[Ace-users] Can I have multiple thread consume message from the same connection?

kun niu haoniukun at gmail.com
Sun Dec 9 07:58:04 CST 2007


On 12月9日, 下午9时56分, kun niu <haoniu... at gmail.com> wrote:
> On 12月9日, 上午3时03分, schm... at dre.vanderbilt.edu (Douglas C. Schmidt)
> wrote:
>
>
>
> > Hi,
>
> > >I'm sorry for the inconvenience I've caused.
> > >>You can't have multiple threads simultaneously reading from the same
> > >>TCP connection.  Please see the discussions in Chapters 4 and 6 of
> > >>C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
> > >>handle this situation with the ACE Reactor and ACE Task frameworks.
> > >In actual fact, I'm not reading from the same tcp connection with
> > >multiple threads.
>
> > Ok, then you might want to rephrase your "Subject:" line since that's
> > what you imply!
>
> > >In fact, I'm using the Task framework.
> > >I have one thread which reads messages from the connection and puts
> > >them in the message queue created by the Task framework.
> > >Several other threads, created by the thread manager in the task,
> > >process these messages.
>
> > Ok.
>
> > >Here's my problem-report-form:
>
> > >    ACE VERSION: 5.6
>
> > Great, thanks for using the PRF.
>
> > >    HOST MACHINE and OPERATING SYSTEM:
> > >Intel i386
> > >Debian lenny
>
> > >    COMPILER NAME AND VERSION (AND PATCHLEVEL):
> > >g++ version 4.2.3 20071123 (prerelease) (Debian 4.2.2-4)
>
> > >    THE $ACE_ROOT/ace/config.h FILE [if you use a link to a platform-
> > >    specific file, simply state which one]:
> > >#include "ace/config-linux.h"
>
> > >    THE $ACE_ROOT/include/makeinclude/platform_macros.GNU FILE
> > >include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU
>
> > >    DOES THE PROBLEM AFFECT:
> > >        EXECUTION?
> > >my application is affected
>
> > >    SYNOPSIS:
> > >messages put in the queue are processed several times.
>
> > >    DESCRIPTION:
> > >My main problem is like this:
> > >Can I create a task when opening an event_handler for reading contents
> > >from the connection?
>
> > I don't understand this question.
>
> > >I find that after putting a message in the message queue by calling
> > >task's putq method, I can continuously get message from the message
> > >queue by calling getq.
>
> > What do you mean by "continuously get message"?
>
> > >Part of my code is listed here; please point out my errors, if any.
> > >Thanks in advance.
> > >  class Read_Handler : public ACE_Event_Handler
> > >  {
> > >  protected:
> > >    int queued_count_;
> > >    int deferred_close_;
> > >    ACE_SYNCH_MUTEX lock_;
> > >    ACE_SOCK_Stream connection_;
> > >    IMCC_Task_Manager *manager_;
>
> > >  public:
> > >    Read_Handler(ACE_Reactor *r):
> > >      ACE_Event_Handler(r),
> > >      queued_count_(0),
> > >      deferred_close_(0),
> > >      manager_(0) {}
> > >    virtual int open();
> > >    virtual int handle_input(ACE_HANDLE);
> > >    virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
> > >    ACE_SOCK_Stream &peer() { return this->connection_; }
> > >    virtual ACE_HANDLE get_handle() const
> > >      { return this->connection_.get_handle(); }
> > >  };
> > >int Read_Handler::open ()
> > >{
> > >  this->manager_ = new Task_Manager ();
> > >  if (this->manager_ == NULL)
> > >    return -1;
> > >  if (this->manager_->open () == -1)
> > >  {
> > >    delete this->manager_;
> > >    return -1;
> > >  }
> > >  return reactor()->
> > >    register_handler (this,
> > >                      ACE_Event_Handler::READ_MASK);
>
> > >}
>
> > >int Read_Handler::handle_input (ACE_HANDLE handle =
> > >ACE_INVALID_HANDLE)
> > >{
> > >  ACE_Message_Block *block = new ACE_Message_Block
> > >(ACE_DEFAULT_CDR_BUFSIZE);
> > >  ssize_t bytes_received;
> > >  if((bytes_received = connection_.recv (block->wr_ptr (),
> > >                                         ACE_DEFAULT_CDR_BUFSIZE)) !=
> > >-1 )
> > >  {
> > >    block->wr_ptr (bytes_received);
> > >    ACE_Message_Block *payload = NULL;
> > >    ACE_NEW_RETURN (payload,
> > >                    ACE_Message_Block (reinterpret_cast<char *>
> > >(this)), -1);
> > >    payload->cont (block);
> > >    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
> > >    if (this->manager_->put (payload) == -1)
> > >    {
> > >      payload->release ();
> > >      payload = 0;
> > >      return -1;
> > >    }
> > >    ++queued_count_;
> > >    return 0;
> > >  }
> > >  block->release ();
> > >  block = 0;
> > >  return -1;
> > >}
>
> > >  class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
> > >  {
> > >  public:
> > >    Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
> > >                      ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
> > >      ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
> > >    virtual ~Task_Manager() {}
> > >    virtual int open (void * = 0)
> > >    { return activate (THR_NEW_LWP, MAX_THREADS); }
> > >    virtual int put (ACE_Message_Block *block, ACE_Time_Value *timeout
> > >= 0)
> > >    { return putq(block, timeout); }
> > >    virtual int svc (void);
>
> > >}
>
> > >int Task_Manager::svc (void)
> > >{
> > >  for (ACE_Message_Block *block; getq(block) != -1;)
> > >  {
> > >    Read_Handler *read_handler =
> > >      reinterpret_cast<Read_Handler *>(block->rd_ptr());
> > >    ACE_Message_Block *message = block->cont();
> > >    ACE_InputCDR cdr(message);
> > >    ACE_CDR::ULong length;
> > >    cdr>>length;
> > >    block->release();
> > >  }
> > >  return 0;
> > >}
>
> > This code looks right to me - please explain exactly where you are
> > having a problem.
>
> > Thanks,
>
> > Doug
> > --
> > Dr. Douglas C. Schmidt                       Professor and Associate Chair
> > Electrical Engineering and Computer Science  TEL: (615) 343-8197
> > Vanderbilt University                        WEB:www.dre.vanderbilt.edu/~schmidt
> > Nashville, TN 37203                          NET: d.schm... at vanderbilt.edu
>
> Thank you for your reply.
> And I've got a sample program here.
> It turns out that only one thread exits on my system.
> But messages are consumed well.
>
> /*****************************************************
> file: messagequeue.cpp
> *****************************************************/
> #include "ace/Task.h"
> #include "ace/OS_NS_stdio.h"
> #include "ace/OS_NS_stdlib.h"
> #include "ace/OS_NS_time.h"
> #include "ace/Message_Block.h"
> #include "ace/Synch.h"
> #include "ace/Thread_Manager.h"
> #include "ace/Thread.h"
> #include "ace/Message_Queue.h"
>
> class Manager : public ACE_Task<ACE_MT_SYNCH>
> {
> public:
>   Manager(ACE_Thread_Manager *tm = 0, ACE_Message_Queue<ACE_MT_SYNCH>
> *mq = 0)
>     :  ACE_Task<ACE_MT_SYNCH> ( tm, mq ){}
>   virtual ~Manager() {}
>   virtual int open( void * = 0 )
>   { return activate(THR_NEW_LWP, 2);}
>   virtual int put( ACE_Message_Block *block, ACE_Time_Value *timeout =
> 0 )
>   { return putq(block, timeout); }
>   virtual int svc(void)
>   {
>     id_ = ACE_Thread::self();
>     while(1)
>     {
>       ACE_Message_Block *block = 0;
>       if(this->getq(block)==-1)
>       {
>         ACE_OS::printf("fail to get message. thread: %d\n", id_);
>         return -1;
>       }
>       if(block->msg_type()==ACE_Message_Block::MB_HANGUP)
>       {
>         ACE_OS::printf("exit thread: %d\n", id_);
>         block->release();
>         break;
>       }
>       ACE_OS::printf("get a message. thread %d\n", id_);
>       ACE_OS::sleep(1);
>       block->release();
>     }
>     return 0;
>   }
> private:
>   ACE_thread_t id_;
>
> };
>
> int main(int argc, char *argv[])
> {
>   Manager *manager = new Manager();
>   manager->open();
>   for(int i = 0; i<4; i++)
>   {
>     ACE_Message_Block *block = new ACE_Message_Block(sizeof(int));
>     manager->put(block);
>     ACE_OS::sleep(2);
>   }
>   ACE_OS::sleep(5);
>   for(int i = 0; i<2; i++)
>   {
>     ACE_Message_Block *block = new ACE_Message_Block(sizeof(int),
>                                                      ACE_Message_Block::MB_HANGUP);
>     manager->put(block);
>     ACE_OS::sleep(1);
>   }
>   ACE_OS::sleep(10);
>   delete manager;
>   return 0;
>
> }
>
> And my output is like this:
> get a message. thread -1212511344
> get a message. thread -1220899952
> get a message. thread -1220899952
> get a message. thread -1220899952
> exit thread: -1220899952
> exit thread: -1220899952
> Did I miss something?
> Do I have to create message queue and thread pool by myself when
> working with a task framework?
>
> Thanks for any reply.

And it can be compiled with the following command:
g++ -I$ACE_ROOT -L$ACE_ROOT/lib -lpthread -ldl -lrt -lACE -D_REENTRANT
messagequeue.cpp -o messagequeue


More information about the Ace-users mailing list