[Ace-users] Can I have multiple thread consume message from the same connection?
kun niu
haoniukun at gmail.com
Sun Dec 9 07:56:54 CST 2007
On 12月9日, 上午3时03分, schm... at dre.vanderbilt.edu (Douglas C. Schmidt)
wrote:
> Hi,
>
> >I'm sorry for the inconvenience I've caused.
> >>You can't have multiple threads simultanously reading from the same
> >>TCP connection. Please see the discussions in Chapters 4 and 6 of
> >>C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
> >>handle this situation with the ACE Reactor and ACE Task frameworks.
> >In actual fact, I'm not reading from the same tcp connection with
> >multiple thread.
>
> Ok, then you might want to rephrase your "Subject:" line since that's
> what you imply!
>
> >In fact, I'm using the Task framework.
> >I have one thread which reads message from the connection and puts the
> >message in the message queue created by the Task framework.
> >And several other threads process these messages created by thread
> >manager in the task.
>
> Ok.
>
> >Here's my problem-report-form:
>
> > ACE VERSION: 5.6
>
> Great, thanks for usin gthe PRF.
>
>
>
> > HOST MACHINE and OPERATING SYSTEM:
> >Intel i386
> >Debian lenny
>
> > COMPILER NAME AND VERSION (AND PATCHLEVEL):
> >g++ version 4.2.3 20071123 (prerelease) (Debian 4.2.2-4)
>
> > THE $ACE_ROOT/ace/config.h FILE [if you use a link to a platform-
> > specific file, simply state which one]:
> >#include "ace/config-linux.h"
>
> > THE $ACE_ROOT/include/makeinclude/platform_macros.GNU FILE
> >include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU
>
> > DOES THE PROBLEM AFFECT:
> > EXECUTION?
> >my application is affected
>
> > SYNOPSIS:
> >messages put in the queue are processed several times.
>
> > DESCRIPTION:
> >My main problem is like this:
> >Can I create a task when opening a event_handler for reading contents
> >from the connection?
>
> I don't understand this question.
>
> >I find that after putting a message in the message queue by calling
> >task's putq method, I can continuously get message from the message
> >queue by calling getq.
>
> What do you mean by "continuously get message"?
>
>
>
> >Part of my code are listed here, please point out my error if any.
> >Thanks in advance.
> > class Read_Handler : public ACE_Event_Handler
> > {
> > protected:
> > int queued_count_;
> > int deferred_close_;
> > ACE_SYNCH_MUTEX lock_;
> > ACE_SOCK_Stream connection_;
> > IMCC_Task_Manager *manager_;
>
> > public:
> > Read_Handler(ACE_Reactor *r):
> > ACE_Event_Handler(r),
> > queued_count_(0),
> > deferred_close_(0),
> > manager_(0) {}
> > virtual int open();
> > virtual int handle_input(ACE_HANDLE);
> > virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
> > ACE_SOCK_Stream &peer() { return this->connection_; }
> > virtual ACE_HANDLE get_handle() const
> > { return this->connection_.get_handle(); }
> > };
> >int Read_Handler::open ()
> >{
> > this->manager_ = new Task_Manager ();
> > if (this->manager_ == NULL)
> > return -1;
> > if (this->manager_->open () == -1)
> > {
> > delete this->manager_;
> > return -1;
> > }
> > return reactor()->
> > register_handler (this,
> > ACE_Event_Handler::READ_MASK);
>
> >}
>
> >int Read_Handler::handle_input (ACE_HANDLE handle =
> >ACE_INVALID_HANDLE)
> >{
> > ACE_Message_Block *block = new ACE_Message_Block
> >(ACE_DEFAULT_CDR_BUFSIZE);
> > ssize_t bytes_received;
> > if((bytes_received = connection_.recv (block->wr_ptr (),
> > ACE_DEFAULT_CDR_BUFSIZE)) !=
> >-1 )
> > {
> > block->wr_ptr (bytes_received);
> > ACE_Message_Block *payload = NULL;
> > ACE_NEW_RETURN (payload,
> > ACE_Message_Block (reinterpret_cast<char *>
> >(this)), -1);
> > payload->cont (block);
> > ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
> > if (this->manager_->put (payload) == -1)
> > {
> > payload->release ();
> > payload = 0;
> > return -1;
> > }
> > ++queued_count_;
> > return 0;
> > }
> > block->release ();
> > block = 0;
> > return -1;
> >}
>
> > class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
> > {
> > public:
> > Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
> > ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
> > ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
> > virtual ~Task_Manager() {}
> > virtual int open (void * = 0)
> > { return activate (THR_NEW_LWP, MAX_THREADS); }
> > virtual int put (ACE_Message_Block *block, ACE_Time_Value *timeout
> >= 0)
> > { return putq(block, timeout); }
> > virtual int svc (void);
>
> >}
>
> >int Task_Manager::svc (void)
> >{
> > for (ACE_Message_Block *block; getq(block) != -1;)
> > {
> > Read_Handler *read_handler =
> > reinterpret_cast<Read_Handler *>(block->rd_ptr());
> > ACE_Message_Block *message = block->cont();
> > ACE_InputCDR cdr(message);
> > ACE_CDR::ULong length;
> > cdr>>length;
> > block->release();
> > }
> > return 0;
> >}
>
> This code looks right to me - please explain exactly where you are
> having a problem.
>
> Thanks,
>
> Doug
> --
> Dr. Douglas C. Schmidt Professor and Associate Chair
> Electrical Engineering and Computer Science TEL: (615) 343-8197
> Vanderbilt University WEB:www.dre.vanderbilt.edu/~schmidt
> Nashville, TN 37203 NET: d.schm... at vanderbilt.edu
Thank you for your reply.
And I've got a sample program here.
It turns out that only one thread exits on my system.
But messages are consumed well.
/*****************************************************
file: messagequeue.cpp
*****************************************************/
#include "ace/Task.h"
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_stdlib.h"
#include "ace/OS_NS_time.h"
#include "ace/Message_Block.h"
#include "ace/Synch.h"
#include "ace/Thread_Manager.h"
#include "ace/Thread.h"
#include "ace/Message_Queue.h"
class Manager : public ACE_Task<ACE_MT_SYNCH>
{
public:
Manager(ACE_Thread_Manager *tm = 0, ACE_Message_Queue<ACE_MT_SYNCH>
*mq = 0)
: ACE_Task<ACE_MT_SYNCH> ( tm, mq ){}
virtual ~Manager() {}
virtual int open( void * = 0 )
{ return activate(THR_NEW_LWP, 2);}
virtual int put( ACE_Message_Block *block, ACE_Time_Value *timeout =
0 )
{ return putq(block, timeout); }
virtual int svc(void)
{
id_ = ACE_Thread::self();
while(1)
{
ACE_Message_Block *block = 0;
if(this->getq(block)==-1)
{
ACE_OS::printf("fail to get message. thread: %d\n", id_);
return -1;
}
if(block->msg_type()==ACE_Message_Block::MB_HANGUP)
{
ACE_OS::printf("exit thread: %d\n", id_);
block->release();
break;
}
ACE_OS::printf("get a message. thread %d\n", id_);
ACE_OS::sleep(1);
block->release();
}
return 0;
}
private:
ACE_thread_t id_;
};
int main(int argc, char *argv[])
{
Manager *manager = new Manager();
manager->open();
for(int i = 0; i<4; i++)
{
ACE_Message_Block *block = new ACE_Message_Block(sizeof(int));
manager->put(block);
ACE_OS::sleep(2);
}
ACE_OS::sleep(5);
for(int i = 0; i<2; i++)
{
ACE_Message_Block *block = new ACE_Message_Block(sizeof(int),
ACE_Message_Block::MB_HANGUP);
manager->put(block);
ACE_OS::sleep(1);
}
ACE_OS::sleep(10);
delete manager;
return 0;
}
And my output is like this:
get a message. thread -1212511344
get a message. thread -1220899952
get a message. thread -1220899952
get a message. thread -1220899952
exit thread: -1220899952
exit thread: -1220899952
Did I missed something?
Do I have to create message queue and thread pool by myself when
working with a task framework?
Thanks for any reply.
More information about the Ace-users
mailing list