[Ace-users] [ace-users] Can I have multiple thread consume message from the same connection?
Douglas C. Schmidt
schmidt at dre.vanderbilt.edu
Tue Dec 18 20:55:17 CST 2007
Hi,
Sorry for the delay in replying. It's the end of the semester
and so things are much busier than usual.
>Thank you for your reply.
>And I've got a sample program here.
Thanks, this is helpful.
>It turns out that only one thread exits on my system.
Right, that's because your program doesn't correctly shut down
the threads running the ACE_Task. In particular,
there's no reason to expect that the two MB_HANGUP messages will be
retrieved once in each of the two threads. Instead, what's happening
is that one of your threads is getting both messages.
Please see Chapters 6 and 7 of C++NPv2
<www.cs.wustl.edu/~schmidt/ACE/book2/> for detailed discussions on how
to shut down threads in an ACE_Task properly.
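
To make the shutdown idea concrete, here is a minimal sketch using only
the C++ standard library. It is not ACE code and not the approach from
the book: BlockingQueue, MsgType, PoolResult and run_pool are
hypothetical names invented for this illustration. It assumes one
HANGUP message is queued per worker and that a worker returns as soon
as it dequeues one, so no worker can consume two. It also keeps the
thread id in a local variable. In the quoted program, id_ is a data
member written by both threads, so the id it prints may not identify
the thread that actually dequeued a message.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical message type: HANGUP plays the role of MB_HANGUP.
enum class MsgType { DATA, HANGUP };

// Minimal FIFO blocking queue shared by all workers. This is a stand-in
// for the task's message queue, not an ACE class.
class BlockingQueue {
public:
  void put(MsgType m) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      items_.push_back(m);
    }
    ready_.notify_one();
  }

  // Blocks until a message is available, then removes and returns it.
  MsgType get() {
    std::unique_lock<std::mutex> guard(lock_);
    ready_.wait(guard, [this] { return !items_.empty(); });
    MsgType m = items_.front();
    items_.pop_front();
    return m;
  }

private:
  std::mutex lock_;
  std::condition_variable ready_;
  std::deque<MsgType> items_;
};

struct PoolResult {
  std::size_t data_handled = 0;    // DATA messages consumed
  std::size_t threads_exited = 0;  // distinct threads that saw HANGUP
};

// Starts n_threads workers, feeds them n_data DATA messages followed by
// one HANGUP per worker, then joins every worker.
PoolResult run_pool(std::size_t n_threads, std::size_t n_data) {
  assert(n_threads > 0);
  BlockingQueue queue;
  std::mutex result_lock;
  std::set<std::thread::id> exited;
  std::size_t handled = 0;

  auto worker = [&] {
    // Local to this thread, so it cannot be overwritten by another worker.
    const std::thread::id self = std::this_thread::get_id();
    for (;;) {
      MsgType m = queue.get();
      std::lock_guard<std::mutex> guard(result_lock);
      if (m == MsgType::HANGUP) {
        exited.insert(self);
        return;  // exit after exactly one HANGUP
      }
      ++handled;
    }
  };

  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < n_threads; ++i) threads.emplace_back(worker);
  for (std::size_t i = 0; i < n_data; ++i) queue.put(MsgType::DATA);
  for (std::size_t i = 0; i < n_threads; ++i) queue.put(MsgType::HANGUP);
  for (std::thread &t : threads) t.join();  // wait instead of sleeping

  PoolResult result;
  result.data_handled = handled;
  result.threads_exited = exited.size();
  return result;
}
```

Calling run_pool(2, 4) from main mirrors the sample program: two
workers, four data messages, two hangups. Joining the workers replaces
the fixed sleep before the task is destroyed.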
Thanks,
Doug
>But messages are consumed well.
>
>/*****************************************************
>file: messagequeue.cpp
>*****************************************************/
>#include "ace/Task.h"
>#include "ace/OS_NS_stdio.h"
>#include "ace/OS_NS_stdlib.h"
>#include "ace/OS_NS_time.h"
>#include "ace/Message_Block.h"
>#include "ace/Synch.h"
>#include "ace/Thread_Manager.h"
>#include "ace/Thread.h"
>#include "ace/Message_Queue.h"
>
>class Manager : public ACE_Task<ACE_MT_SYNCH>
>{
>public:
>  Manager(ACE_Thread_Manager *tm = 0, ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0)
>    : ACE_Task<ACE_MT_SYNCH> ( tm, mq ){}
>  virtual ~Manager() {}
>  virtual int open( void * = 0 )
>  { return activate(THR_NEW_LWP, 2);}
>  virtual int put( ACE_Message_Block *block, ACE_Time_Value *timeout = 0 )
>  { return putq(block, timeout); }
>  virtual int svc(void)
>  {
>    id_ = ACE_Thread::self();
>    while(1)
>    {
>      ACE_Message_Block *block = 0;
>      if(this->getq(block)==-1)
>      {
>        ACE_OS::printf("fail to get message. thread: %d\n", id_);
>        return -1;
>      }
>      if(block->msg_type()==ACE_Message_Block::MB_HANGUP)
>      {
>        ACE_OS::printf("exit thread: %d\n", id_);
>        block->release();
>        break;
>      }
>      ACE_OS::printf("get a message. thread %d\n", id_);
>      ACE_OS::sleep(1);
>      block->release();
>    }
>    return 0;
>  }
>private:
>  ACE_thread_t id_;
>};
>
>int main(int argc, char *argv[])
>{
>  Manager *manager = new Manager();
>  manager->open();
>  for(int i = 0; i<4; i++)
>  {
>    ACE_Message_Block *block = new ACE_Message_Block(sizeof(int));
>    manager->put(block);
>    ACE_OS::sleep(2);
>  }
>  ACE_OS::sleep(5);
>  for(int i = 0; i<2; i++)
>  {
>    ACE_Message_Block *block = new ACE_Message_Block(sizeof(int),
>                                                     ACE_Message_Block::MB_HANGUP);
>    manager->put(block);
>    ACE_OS::sleep(1);
>  }
>  ACE_OS::sleep(10);
>  delete manager;
>  return 0;
>}
>
>And my output is like this:
>get a message. thread -1212511344
>get a message. thread -1220899952
>get a message. thread -1220899952
>get a message. thread -1220899952
>exit thread: -1220899952
>exit thread: -1220899952
>Did I miss something?
>Do I have to create message queue and thread pool by myself when
>working with a task framework?
>
>Thanks for any reply.
--
Dr. Douglas C. Schmidt Professor and Associate Chair
Electrical Engineering and Computer Science TEL: (615) 343-8197
Vanderbilt University WEB: www.dre.vanderbilt.edu/~schmidt
Nashville, TN 37203 NET: d.schmidt at vanderbilt.edu