[Ace-users] Can I have multiple threads consume messages from the same connection?

Douglas C. Schmidt schmidt at dre.vanderbilt.edu
Tue Dec 18 20:55:17 CST 2007


Hi,

        Sorry for the delay in replying.  It's the end of the semester
and so things are much busier than usual.

>Thank you for your reply.
>And I've got a sample program here.

Thanks, this is helpful.

>It turns out that only one thread exits on my system.

Right, that's because your program doesn't correctly shut down the
threads running the ACE_Task.  In particular,
there's no reason to expect that the two MB_HANGUP messages will be
retrieved once in each of the two threads.  Instead, what's happening
is that one of your threads is getting both messages.

Please see Chapters 6 and 7 of C++NPv2
<www.cs.wustl.edu/~schmidt/ACE/book2/> for detailed discussions on how
to shut down threads in an ACE_Task properly.
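
As a rough illustration of the shutdown pattern discussed above, here is a
minimal sketch that uses only the C++ standard library rather than ACE, so
it is an analogue and not the approach from C++NPv2.  The names Message,
Queue, PoolResult, and run_pool are hypothetical, and the hangup flag
stands in for MB_HANGUP.  The sketch assumes one hangup message is queued
per worker, that each worker stops reading after its first hangup, and that
the caller joins the workers instead of sleeping.  It also keeps per-thread
state in a local variable; in the quoted program, id_ is a single data
member assigned by both service threads, so the printed id shows whichever
thread wrote it last rather than the thread that handled the message.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical message type: `hangup` plays the role of MB_HANGUP.
struct Message {
    bool hangup;
};

// Minimal thread-safe FIFO standing in for the task's message queue.
class Queue {
public:
    void put(Message m) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(m);
        }
        ready_.notify_one();
    }

    // Blocks until a message is available, then removes and returns it.
    Message get() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        Message m = items_.front();
        items_.pop_front();
        return m;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<Message> items_;
};

// Totals reported once every worker has been joined.
struct PoolResult {
    int handled;  // ordinary messages processed across all workers
    int exited;   // workers that left their loop after a hangup
};

// Starts `workers` threads, queues `jobs` ordinary messages followed by one
// hangup per worker, joins the threads, and returns the totals.
PoolResult run_pool(std::size_t workers, int jobs) {
    Queue queue;
    std::mutex stats_mutex;
    PoolResult result{0, 0};

    auto svc = [&] {
        int handled = 0;  // per-thread state lives on the stack, not in a shared member
        for (;;) {
            Message m = queue.get();
            if (m.hangup) break;  // consume exactly one hangup, then stop reading
            ++handled;
        }
        std::lock_guard<std::mutex> lock(stats_mutex);
        result.handled += handled;
        ++result.exited;
    };

    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < workers; ++i) pool.emplace_back(svc);

    for (int i = 0; i < jobs; ++i) queue.put(Message{false});
    for (std::size_t i = 0; i < workers; ++i) queue.put(Message{true});

    for (auto& t : pool) t.join();  // wait for the workers instead of sleeping
    return result;
}
```

Calling run_pool(2, 4) mirrors the two threads and four messages of the
quoted program.  In ACE terms, the join step would roughly correspond to
calling wait() on the task before deleting it, rather than relying on
ACE_OS::sleep().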

Thanks,

        Doug

>But messages are consumed well.
>
>/*****************************************************
>file: messagequeue.cpp
>*****************************************************/
>#include "ace/Task.h"
>#include "ace/OS_NS_stdio.h"
>#include "ace/OS_NS_stdlib.h"
>#include "ace/OS_NS_time.h"
>#include "ace/Message_Block.h"
>#include "ace/Synch.h"
>#include "ace/Thread_Manager.h"
>#include "ace/Thread.h"
>#include "ace/Message_Queue.h"
>
>class Manager : public ACE_Task<ACE_MT_SYNCH>
>{
>public:
>  Manager(ACE_Thread_Manager *tm = 0, ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0)
>    :  ACE_Task<ACE_MT_SYNCH> ( tm, mq ){}
>  virtual ~Manager() {}
>  virtual int open( void * = 0 )
>  { return activate(THR_NEW_LWP, 2);}
>  virtual int put( ACE_Message_Block *block, ACE_Time_Value *timeout = 0 )
>  { return putq(block, timeout); }
>  virtual int svc(void)
>  {
>    id_ = ACE_Thread::self();
>    while(1)
>    {
>      ACE_Message_Block *block = 0;
>      if(this->getq(block)==-1)
>      {
>	ACE_OS::printf("fail to get message. thread: %d\n", id_);
>	return -1;
>      }
>      if(block->msg_type()==ACE_Message_Block::MB_HANGUP)
>      {
>	ACE_OS::printf("exit thread: %d\n", id_);
>	block->release();
>	break;
>      }
>      ACE_OS::printf("get a message. thread %d\n", id_);
>      ACE_OS::sleep(1);
>      block->release();
>    }
>    return 0;
>  }
>private:
>  ACE_thread_t id_;
>};
>
>int main(int argc, char *argv[])
>{
>  Manager *manager = new Manager();
>  manager->open();
>  for(int i = 0; i<4; i++)
>  {
>    ACE_Message_Block *block = new ACE_Message_Block(sizeof(int));
>    manager->put(block);
>    ACE_OS::sleep(2);
>  }
>  ACE_OS::sleep(5);
>  for(int i = 0; i<2; i++)
>  {
>    ACE_Message_Block *block = new ACE_Message_Block(sizeof(int),
>						     ACE_Message_Block::MB_HANGUP);
>    manager->put(block);
>    ACE_OS::sleep(1);
>  }
>  ACE_OS::sleep(10);
>  delete manager;
>  return 0;
>}
>
>And my output is like this:
>get a message. thread -1212511344
>get a message. thread -1220899952
>get a message. thread -1220899952
>get a message. thread -1220899952
>exit thread: -1220899952
>exit thread: -1220899952
>Did I miss something?
>Do I have to create message queue and thread pool by myself when
>working with a task framework?
>
>Thanks for any reply.


-- 
Dr. Douglas C. Schmidt                       Professor and Associate Chair
Electrical Engineering and Computer Science  TEL: (615) 343-8197
Vanderbilt University                        WEB: www.dre.vanderbilt.edu/~schmidt
Nashville, TN 37203                          NET: d.schmidt at vanderbilt.edu


