[Ace-users] Can I have multiple threads consume messages from the same connection?
Douglas C. Schmidt
schmidt at dre.vanderbilt.edu
Sat Dec 8 12:03:32 CST 2007
Hi,
>I've got such a problem.
Thanks very much for your email. To ensure that we have proper
version/platform/compiler information, please make sure you fill out
the appropriate problem report form (PRF), which is in
$ACE_ROOT/PROBLEM-REPORT-FORM
$TAO_ROOT/PROBLEM-REPORT-FORM
or in
$ACE_ROOT/BUG-REPORT-FORM
$TAO_ROOT/BUG-REPORT-FORM
in older versions of ACE+TAO. Make sure to include this information
when asking any questions about ACE+TAO, since otherwise we have to
"guess" what version/platform/compiler/options you're using, which is
very error-prone and slows down our responsiveness. If you don't use
the PRF, it is less likely that someone from the core ACE+TAO
developer team will be able to answer your question.
Naturally, we encourage and appreciate other members of the ACE+TAO
user community who can respond to questions that they have the answers
to.
>My client program keeps a long TCP connection with my server program.
>In my read event handler's open method, I created a task and activated
>it.
>When I receive a message, I'll call the task's put method and put the
>message in the message_queue.
>But I found that each time a message arrives, the waiting threads
>which call getq are all invoked.
>This leads to a fatal error in my program.
>I wonder if my model is correct. And please help me out.
You can't have multiple threads simultaneously reading from the same
TCP connection. Please see the discussions in Chapters 4 and 6 of
C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
handle this situation with the ACE Reactor and ACE Task frameworks.
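As a rough illustration of that division of labor, here is a minimal
sketch in which exactly one thread feeds a synchronized queue and a
pool of workers drains it. It assumes the reading thread hands over
complete messages, and it uses only the standard C++ library rather
than ACE: Sync_Queue and run_pool are hypothetical names made up for
this example, not ACE classes or code from C++NPv2.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a synchronized message queue (the role that
// ACE_Message_Queue<ACE_MT_SYNCH> plays in the quoted code).
class Sync_Queue
{
public:
  // Called only by the single thread that reads the connection.
  void put (std::string msg)
  {
    {
      std::lock_guard<std::mutex> guard (lock_);
      queue_.push (std::move (msg));
    }
    not_empty_.notify_one ();
  }

  // Marks the end of input and wakes every waiting worker.
  void close ()
  {
    {
      std::lock_guard<std::mutex> guard (lock_);
      closed_ = true;
    }
    not_empty_.notify_all ();
  }

  // Blocks until a message is available. Returns false once the queue
  // has been closed and fully drained.
  bool get (std::string &out)
  {
    std::unique_lock<std::mutex> guard (lock_);
    not_empty_.wait (guard, [this] { return closed_ || !queue_.empty (); });
    if (queue_.empty ())
      return false;
    out = std::move (queue_.front ());
    queue_.pop ();
    return true;
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::queue<std::string> queue_;
  bool closed_ = false;
};

// Runs one producer (standing in for the thread that reads the socket)
// and n_workers consumers (standing in for the task's service threads).
// Returns the total number of messages the workers processed.
std::size_t run_pool (const std::vector<std::string> &incoming,
                      std::size_t n_workers)
{
  Sync_Queue queue;
  std::mutex count_lock;
  std::size_t processed = 0;

  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < n_workers; ++i)
    workers.emplace_back ([&] {
      std::string msg;
      while (queue.get (msg))
        {
          // Real message processing would go here.
          std::lock_guard<std::mutex> guard (count_lock);
          ++processed;
        }
    });

  // Only this thread ever touches the "connection".
  for (const std::string &msg : incoming)
    queue.put (msg);
  queue.close ();

  for (std::thread &t : workers)
    t.join ();
  return processed;
}
```

For instance, run_pool ({"a", "b", "c"}, 4) hands three messages to
four workers. Because the queue is only touched while its lock is held,
each message is dequeued by exactly one worker no matter how many are
waiting.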
Thanks,
Doug
>Thanks in advance.
>Here's part of my code:
>
> class Read_Handler : public ACE_Event_Handler
> {
> protected:
> int queued_count_;
> int deferred_close_;
> ACE_SYNCH_MUTEX lock_;
> ACE_SOCK_Stream connection_;
> Task_Manager *manager_;
>
> public:
> Read_Handler(ACE_Reactor *r):
> ACE_Event_Handler(r),
> queued_count_(0),
> deferred_close_(0),
> manager_(0) {}
> virtual int open();
> virtual int handle_input(ACE_HANDLE);
> virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
> ACE_SOCK_Stream &peer() { return this->connection_; }
> virtual ACE_HANDLE get_handle() const
> { return this->connection_.get_handle(); }
> };
>int Read_Handler::open ()
>{
> this->manager_ = new Task_Manager ();
> if (this->manager_ == NULL)
> return -1;
> if (this->manager_->open () == -1)
> {
> LOG4CXX_DEBUG (Logger::getRootLogger(), "manager open fail");
> delete this->manager_;
> return -1;
> }
> LOG4CXX_DEBUG (Logger::getRootLogger(), "manager open ok");
> return reactor()->
> register_handler (this,
> ACE_Event_Handler::READ_MASK);
>}
>
>int Read_Handler::handle_input (ACE_HANDLE handle = ACE_INVALID_HANDLE)
>{
> ACE_Message_Block *block =
> new ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE);
> ssize_t bytes_received;
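> // recv() returns 0 once the peer has closed the connection, so only
> // a positive byte count is treated as data.
> if((bytes_received = connection_.recv (block->wr_ptr (),
> ACE_DEFAULT_CDR_BUFSIZE)) > 0 )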
> {
> block->wr_ptr (bytes_received);
> ACE_Message_Block *payload = NULL;
> ACE_NEW_RETURN (payload,
> ACE_Message_Block (reinterpret_cast<char *> (this)), -1);
> payload->cont (block);
> ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
> if (this->manager_->put (payload) == -1)
> {
> payload->release ();
> payload = 0;
> return -1;
> }
> ++queued_count_;
> return 0;
> }
> block->release ();
> block = 0;
> return -1;
>}
> class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
> {
> public:
> Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
> ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
> ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
> virtual ~Task_Manager() {}
> virtual int open (void * = 0)
> { return activate (THR_NEW_LWP, MAX_THREADS); }
> virtual int put (ACE_Message_Block *block,
> ACE_Time_Value *timeout = 0)
> { return putq(block, timeout); }
> virtual int svc (void);
> };
>
>int Task_Manager::svc (void)
>{
> for (ACE_Message_Block *block; getq(block) != -1;)
> {
> LOG4CXX_DEBUG (Logger::getRootLogger(), "get Message");
> Read_Handler *read_handler =
> reinterpret_cast<Read_Handler *>(block->rd_ptr());
> ACE_Message_Block *message = block->cont();
> LOG4CXX_DEBUG (Logger::getRootLogger(), "print message");
> ACE_InputCDR cdr(message);
> ACE_CDR::ULong length;
> cdr>>length;
> block->release();
> }
> return 0;
>}
--
Dr. Douglas C. Schmidt Professor and Associate Chair
Electrical Engineering and Computer Science TEL: (615) 343-8197
Vanderbilt University WEB: www.dre.vanderbilt.edu/~schmidt
Nashville, TN 37203 NET: d.schmidt at vanderbilt.edu