[Ace-users] Can I have multiple thread consume message from the same connection?
kun niu
haoniukun at gmail.com
Sun Dec 9 07:58:04 CST 2007
On 12月9日, 下午9时56分, kun niu <haoniu... at gmail.com> wrote:
> On 12月9日, 上午3时03分, schm... at dre.vanderbilt.edu (Douglas C. Schmidt)
> wrote:
>
>
>
> > Hi,
>
> > >I'm sorry for the inconvenience I've caused.
> > >>You can't have multiple threads simultanously reading from the same
> > >>TCP connection. Please see the discussions in Chapters 4 and 6 of
> > >>C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
> > >>handle this situation with the ACE Reactor and ACE Task frameworks.
> > >In actual fact, I'm not reading from the same tcp connection with
> > >multiple thread.
>
> > Ok, then you might want to rephrase your "Subject:" line since that's
> > what you imply!
>
> > >In fact, I'm using the Task framework.
> > >I have one thread which reads message from the connection and puts the
> > >message in the message queue created by the Task framework.
> > >And several other threads process these messages created by thread
> > >manager in the task.
>
> > Ok.
>
> > >Here's my problem-report-form:
>
> > > ACE VERSION: 5.6
>
> > Great, thanks for usin gthe PRF.
>
> > > HOST MACHINE and OPERATING SYSTEM:
> > >Intel i386
> > >Debian lenny
>
> > > COMPILER NAME AND VERSION (AND PATCHLEVEL):
> > >g++ version 4.2.3 20071123 (prerelease) (Debian 4.2.2-4)
>
> > > THE $ACE_ROOT/ace/config.h FILE [if you use a link to a platform-
> > > specific file, simply state which one]:
> > >#include "ace/config-linux.h"
>
> > > THE $ACE_ROOT/include/makeinclude/platform_macros.GNU FILE
> > >include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU
>
> > > DOES THE PROBLEM AFFECT:
> > > EXECUTION?
> > >my application is affected
>
> > > SYNOPSIS:
> > >messages put in the queue are processed several times.
>
> > > DESCRIPTION:
> > >My main problem is like this:
> > >Can I create a task when opening a event_handler for reading contents
> > >from the connection?
>
> > I don't understand this question.
>
> > >I find that after putting a message in the message queue by calling
> > >task's putq method, I can continuously get message from the message
> > >queue by calling getq.
>
> > What do you mean by "continuously get message"?
>
> > >Part of my code are listed here, please point out my error if any.
> > >Thanks in advance.
> > > class Read_Handler : public ACE_Event_Handler
> > > {
> > > protected:
> > > int queued_count_;
> > > int deferred_close_;
> > > ACE_SYNCH_MUTEX lock_;
> > > ACE_SOCK_Stream connection_;
> > > IMCC_Task_Manager *manager_;
>
> > > public:
> > > Read_Handler(ACE_Reactor *r):
> > > ACE_Event_Handler(r),
> > > queued_count_(0),
> > > deferred_close_(0),
> > > manager_(0) {}
> > > virtual int open();
> > > virtual int handle_input(ACE_HANDLE);
> > > virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
> > > ACE_SOCK_Stream &peer() { return this->connection_; }
> > > virtual ACE_HANDLE get_handle() const
> > > { return this->connection_.get_handle(); }
> > > };
> > >int Read_Handler::open ()
> > >{
> > > this->manager_ = new Task_Manager ();
> > > if (this->manager_ == NULL)
> > > return -1;
> > > if (this->manager_->open () == -1)
> > > {
> > > delete this->manager_;
> > > return -1;
> > > }
> > > return reactor()->
> > > register_handler (this,
> > > ACE_Event_Handler::READ_MASK);
>
> > >}
>
> > >int Read_Handler::handle_input (ACE_HANDLE handle =
> > >ACE_INVALID_HANDLE)
> > >{
> > > ACE_Message_Block *block = new ACE_Message_Block
> > >(ACE_DEFAULT_CDR_BUFSIZE);
> > > ssize_t bytes_received;
> > > if((bytes_received = connection_.recv (block->wr_ptr (),
> > > ACE_DEFAULT_CDR_BUFSIZE)) !=
> > >-1 )
> > > {
> > > block->wr_ptr (bytes_received);
> > > ACE_Message_Block *payload = NULL;
> > > ACE_NEW_RETURN (payload,
> > > ACE_Message_Block (reinterpret_cast<char *>
> > >(this)), -1);
> > > payload->cont (block);
> > > ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
> > > if (this->manager_->put (payload) == -1)
> > > {
> > > payload->release ();
> > > payload = 0;
> > > return -1;
> > > }
> > > ++queued_count_;
> > > return 0;
> > > }
> > > block->release ();
> > > block = 0;
> > > return -1;
> > >}
>
> > > class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
> > > {
> > > public:
> > > Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
> > > ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
> > > ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
> > > virtual ~Task_Manager() {}
> > > virtual int open (void * = 0)
> > > { return activate (THR_NEW_LWP, MAX_THREADS); }
> > > virtual int put (ACE_Message_Block *block, ACE_Time_Value *timeout
> > >= 0)
> > > { return putq(block, timeout); }
> > > virtual int svc (void);
>
> > >}
>
> > >int Task_Manager::svc (void)
> > >{
> > > for (ACE_Message_Block *block; getq(block) != -1;)
> > > {
> > > Read_Handler *read_handler =
> > > reinterpret_cast<Read_Handler *>(block->rd_ptr());
> > > ACE_Message_Block *message = block->cont();
> > > ACE_InputCDR cdr(message);
> > > ACE_CDR::ULong length;
> > > cdr>>length;
> > > block->release();
> > > }
> > > return 0;
> > >}
>
> > This code looks right to me - please explain exactly where you are
> > having a problem.
>
> > Thanks,
>
> > Doug
> > --
> > Dr. Douglas C. Schmidt Professor and Associate Chair
> > Electrical Engineering and Computer Science TEL: (615) 343-8197
> > Vanderbilt University WEB:www.dre.vanderbilt.edu/~schmidt
> > Nashville, TN 37203 NET: d.schm... at vanderbilt.edu
>
> Thank you for your reply.
> And I've got a sample program here.
> It turns out that only one thread exits on my system.
> But messages are consumed well.
>
> /*****************************************************
> file: messagequeue.cpp
> *****************************************************/
> #include "ace/Task.h"
> #include "ace/OS_NS_stdio.h"
> #include "ace/OS_NS_stdlib.h"
> #include "ace/OS_NS_time.h"
> #include "ace/Message_Block.h"
> #include "ace/Synch.h"
> #include "ace/Thread_Manager.h"
> #include "ace/Thread.h"
> #include "ace/Message_Queue.h"
>
> class Manager : public ACE_Task<ACE_MT_SYNCH>
> {
> public:
> Manager(ACE_Thread_Manager *tm = 0, ACE_Message_Queue<ACE_MT_SYNCH>
> *mq = 0)
> : ACE_Task<ACE_MT_SYNCH> ( tm, mq ){}
> virtual ~Manager() {}
> virtual int open( void * = 0 )
> { return activate(THR_NEW_LWP, 2);}
> virtual int put( ACE_Message_Block *block, ACE_Time_Value *timeout =
> 0 )
> { return putq(block, timeout); }
> virtual int svc(void)
> {
> id_ = ACE_Thread::self();
> while(1)
> {
> ACE_Message_Block *block = 0;
> if(this->getq(block)==-1)
> {
> ACE_OS::printf("fail to get message. thread: %d\n", id_);
> return -1;
> }
> if(block->msg_type()==ACE_Message_Block::MB_HANGUP)
> {
> ACE_OS::printf("exit thread: %d\n", id_);
> block->release();
> break;
> }
> ACE_OS::printf("get a message. thread %d\n", id_);
> ACE_OS::sleep(1);
> block->release();
> }
> return 0;
> }
> private:
> ACE_thread_t id_;
>
> };
>
> int main(int argc, char *argv[])
> {
> Manager *manager = new Manager();
> manager->open();
> for(int i = 0; i<4; i++)
> {
> ACE_Message_Block *block = new ACE_Message_Block(sizeof(int));
> manager->put(block);
> ACE_OS::sleep(2);
> }
> ACE_OS::sleep(5);
> for(int i = 0; i<2; i++)
> {
> ACE_Message_Block *block = new ACE_Message_Block(sizeof(int),
> ACE_Message_Block::MB_HANGUP);
> manager->put(block);
> ACE_OS::sleep(1);
> }
> ACE_OS::sleep(10);
> delete manager;
> return 0;
>
> }
>
> And my output is like this:
> get a message. thread -1212511344
> get a message. thread -1220899952
> get a message. thread -1220899952
> get a message. thread -1220899952
> exit thread: -1220899952
> exit thread: -1220899952
> Did I missed something?
> Do I have to create message queue and thread pool by myself when
> working with a task framework?
>
> Thanks for any reply.
And it can be compiled with the following command:
g++ -I$ACE_ROOT -L$ACE_ROOT/lib -lpthread -ldl -lrt -lACE -D_REENTRANT
messagequeue.cpp -o messagequeue
More information about the Ace-users
mailing list