[Ace-users] Can I have multiple thread consume message from the same connection?

kun niu haoniukun at gmail.com
Sun Dec 9 07:56:54 CST 2007


On 12月9日, 上午3时03分, schm... at dre.vanderbilt.edu (Douglas C. Schmidt)
wrote:
> Hi,
>
> >I'm sorry for the inconvenience I've caused.
> >>You can't have multiple threads simultanously reading from the same
> >>TCP connection.  Please see the discussions in Chapters 4 and 6 of
> >>C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
> >>handle this situation with the ACE Reactor and ACE Task frameworks.
> >In actual fact, I'm not reading from the same tcp connection with
> >multiple thread.
>
> Ok, then you might want to rephrase your "Subject:" line since that's
> what you imply!
>
> >In fact, I'm using the Task framework.
> >I have one thread which reads message from the connection and puts the
> >message in the message queue created by the Task framework.
> >And several other threads process these messages created by thread
> >manager in the task.
>
> Ok.
>
> >Here's my problem-report-form:
>
> >    ACE VERSION: 5.6
>
> Great, thanks for usin gthe PRF.
>
>
>
> >    HOST MACHINE and OPERATING SYSTEM:
> >Intel i386
> >Debian lenny
>
> >    COMPILER NAME AND VERSION (AND PATCHLEVEL):
> >g++ version 4.2.3 20071123 (prerelease) (Debian 4.2.2-4)
>
> >    THE $ACE_ROOT/ace/config.h FILE [if you use a link to a platform-
> >    specific file, simply state which one]:
> >#include "ace/config-linux.h"
>
> >    THE $ACE_ROOT/include/makeinclude/platform_macros.GNU FILE
> >include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU
>
> >    DOES THE PROBLEM AFFECT:
> >        EXECUTION?
> >my application is affected
>
> >    SYNOPSIS:
> >messages put in the queue are processed several times.
>
> >    DESCRIPTION:
> >My main problem is like this:
> >Can I create a task when opening a event_handler for reading contents
> >from the connection?
>
> I don't understand this question.
>
> >I find that after putting a message in the message queue by calling
> >task's putq method, I can continuously get message from the message
> >queue by calling getq.
>
> What do you mean by "continuously get message"?
>
>
>
> >Part of my code are listed here, please point out my error if any.
> >Thanks in advance.
> >  class Read_Handler : public ACE_Event_Handler
> >  {
> >  protected:
> >    int queued_count_;
> >    int deferred_close_;
> >    ACE_SYNCH_MUTEX lock_;
> >    ACE_SOCK_Stream connection_;
> >    IMCC_Task_Manager *manager_;
>
> >  public:
> >    Read_Handler(ACE_Reactor *r):
> >      ACE_Event_Handler(r),
> >      queued_count_(0),
> >      deferred_close_(0),
> >      manager_(0) {}
> >    virtual int open();
> >    virtual int handle_input(ACE_HANDLE);
> >    virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
> >    ACE_SOCK_Stream &peer() { return this->connection_; }
> >    virtual ACE_HANDLE get_handle() const
> >      { return this->connection_.get_handle(); }
> >  };
> >int Read_Handler::open ()
> >{
> >  this->manager_ = new Task_Manager ();
> >  if (this->manager_ == NULL)
> >    return -1;
> >  if (this->manager_->open () == -1)
> >  {
> >    delete this->manager_;
> >    return -1;
> >  }
> >  return reactor()->
> >    register_handler (this,
> >                      ACE_Event_Handler::READ_MASK);
>
> >}
>
> >int Read_Handler::handle_input (ACE_HANDLE handle =
> >ACE_INVALID_HANDLE)
> >{
> >  ACE_Message_Block *block = new ACE_Message_Block
> >(ACE_DEFAULT_CDR_BUFSIZE);
> >  ssize_t bytes_received;
> >  if((bytes_received = connection_.recv (block->wr_ptr (),
> >                                         ACE_DEFAULT_CDR_BUFSIZE)) !=
> >-1 )
> >  {
> >    block->wr_ptr (bytes_received);
> >    ACE_Message_Block *payload = NULL;
> >    ACE_NEW_RETURN (payload,
> >                    ACE_Message_Block (reinterpret_cast<char *>
> >(this)), -1);
> >    payload->cont (block);
> >    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
> >    if (this->manager_->put (payload) == -1)
> >    {
> >      payload->release ();
> >      payload = 0;
> >      return -1;
> >    }
> >    ++queued_count_;
> >    return 0;
> >  }
> >  block->release ();
> >  block = 0;
> >  return -1;
> >}
>
> >  class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
> >  {
> >  public:
> >    Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
> >                      ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
> >      ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
> >    virtual ~Task_Manager() {}
> >    virtual int open (void * = 0)
> >    { return activate (THR_NEW_LWP, MAX_THREADS); }
> >    virtual int put (ACE_Message_Block *block, ACE_Time_Value *timeout
> >= 0)
> >    { return putq(block, timeout); }
> >    virtual int svc (void);
>
> >}
>
> >int Task_Manager::svc (void)
> >{
> >  for (ACE_Message_Block *block; getq(block) != -1;)
> >  {
> >    Read_Handler *read_handler =
> >      reinterpret_cast<Read_Handler *>(block->rd_ptr());
> >    ACE_Message_Block *message = block->cont();
> >    ACE_InputCDR cdr(message);
> >    ACE_CDR::ULong length;
> >    cdr>>length;
> >    block->release();
> >  }
> >  return 0;
> >}
>
> This code looks right to me - please explain exactly where you are
> having a problem.
>
> Thanks,
>
> Doug
> --
> Dr. Douglas C. Schmidt                       Professor and Associate Chair
> Electrical Engineering and Computer Science  TEL: (615) 343-8197
> Vanderbilt University                        WEB:www.dre.vanderbilt.edu/~schmidt
> Nashville, TN 37203                          NET: d.schm... at vanderbilt.edu

Thank you for your reply.
And I've got a sample program here.
It turns out that only one thread exits on my system.
But messages are consumed well.

/*****************************************************
file: messagequeue.cpp
*****************************************************/
#include "ace/Task.h"
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_stdlib.h"
#include "ace/OS_NS_time.h"
#include "ace/Message_Block.h"
#include "ace/Synch.h"
#include "ace/Thread_Manager.h"
#include "ace/Thread.h"
#include "ace/Message_Queue.h"

class Manager : public ACE_Task<ACE_MT_SYNCH>
{
public:
  Manager(ACE_Thread_Manager *tm = 0, ACE_Message_Queue<ACE_MT_SYNCH>
*mq = 0)
    :  ACE_Task<ACE_MT_SYNCH> ( tm, mq ){}
  virtual ~Manager() {}
  virtual int open( void * = 0 )
  { return activate(THR_NEW_LWP, 2);}
  virtual int put( ACE_Message_Block *block, ACE_Time_Value *timeout =
0 )
  { return putq(block, timeout); }
  virtual int svc(void)
  {
    id_ = ACE_Thread::self();
    while(1)
    {
      ACE_Message_Block *block = 0;
      if(this->getq(block)==-1)
      {
	ACE_OS::printf("fail to get message. thread: %d\n", id_);
	return -1;
      }
      if(block->msg_type()==ACE_Message_Block::MB_HANGUP)
      {
	ACE_OS::printf("exit thread: %d\n", id_);
	block->release();
	break;
      }
      ACE_OS::printf("get a message. thread %d\n", id_);
      ACE_OS::sleep(1);
      block->release();
    }
    return 0;
  }
private:
  ACE_thread_t id_;
};

int main(int argc, char *argv[])
{
  Manager *manager = new Manager();
  manager->open();
  for(int i = 0; i<4; i++)
  {
    ACE_Message_Block *block = new ACE_Message_Block(sizeof(int));
    manager->put(block);
    ACE_OS::sleep(2);
  }
  ACE_OS::sleep(5);
  for(int i = 0; i<2; i++)
  {
    ACE_Message_Block *block = new ACE_Message_Block(sizeof(int),
						     ACE_Message_Block::MB_HANGUP);
    manager->put(block);
    ACE_OS::sleep(1);
  }
  ACE_OS::sleep(10);
  delete manager;
  return 0;
}

And my output is like this:
get a message. thread -1212511344
get a message. thread -1220899952
get a message. thread -1220899952
get a message. thread -1220899952
exit thread: -1220899952
exit thread: -1220899952
Did I missed something?
Do I have to create message queue and thread pool by myself when
working with a task framework?

Thanks for any reply.


More information about the Ace-users mailing list