[Ace-users] Can I have multiple thread consume message from the same connection?

Douglas C. Schmidt schmidt at dre.vanderbilt.edu
Sat Dec 8 13:03:16 CST 2007


Hi,

>I'm sorry for the inconvenience I've caused.
>>You can't have multiple threads simultaneously reading from the same
>>TCP connection.  Please see the discussions in Chapters 4 and 6 of
>>C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
>>handle this situation with the ACE Reactor and ACE Task frameworks.
>In actual fact, I'm not reading from the same TCP connection with
>multiple threads.

Ok, then you might want to rephrase your "Subject:" line since that's
what you imply!

>In fact, I'm using the Task framework.
>I have one thread which reads messages from the connection and puts
>them in the message queue created by the Task framework.
>Several other threads, created by the thread manager in the task,
>process these messages.

Ok.
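
To make sure we're talking about the same design, here is a minimal
sketch of the "one reader, several workers" arrangement described above.
It is an analogy written with the C++ standard library, not ACE code:
WorkQueue, put(), get(), close(), and run_pipeline() are hypothetical
names that only stand in for what putq() and getq() do on the task's
message queue. The point it illustrates is that each enqueued message
should be dequeued by exactly one worker.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the synchronized queue owned by a task.
class WorkQueue {
public:
  // Producer side, playing the role of putq().
  void put(int id) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      items_.push(id);
    }
    not_empty_.notify_one();
  }

  // Consumer side, playing the role of getq(): blocks until an item is
  // available. Returns false once the queue is closed and drained.
  bool get(int &id) {
    std::unique_lock<std::mutex> guard(lock_);
    not_empty_.wait(guard, [this] { return !items_.empty() || closed_; });
    if (items_.empty())
      return false;
    id = items_.front();
    items_.pop();
    return true;
  }

  // Wake all blocked consumers so they can exit after draining.
  void close() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      closed_ = true;
    }
    not_empty_.notify_all();
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::queue<int> items_;
  bool closed_ = false;
};

// One producer (the calling thread) enqueues ids 0..n_messages-1 while
// n_workers threads drain the queue. Returns, per id, how many times it
// was processed.
std::vector<int> run_pipeline(int n_messages, int n_workers) {
  assert(n_messages >= 0 && n_workers > 0);
  WorkQueue queue;
  std::vector<int> processed(static_cast<std::size_t>(n_messages), 0);
  std::mutex processed_lock;

  std::vector<std::thread> workers;
  for (int i = 0; i < n_workers; ++i) {
    workers.emplace_back([&queue, &processed, &processed_lock] {
      int id = 0;
      while (queue.get(id)) {
        std::lock_guard<std::mutex> guard(processed_lock);
        ++processed[static_cast<std::size_t>(id)];
      }
    });
  }

  for (int id = 0; id < n_messages; ++id)
    queue.put(id);
  queue.close();

  for (std::thread &worker : workers)
    worker.join();
  return processed;
}
```

Calling run_pipeline(100, 4) and checking that every count is 1 is the
kind of test that would show whether messages are really being handed
out more than once, or whether more messages are being enqueued than
expected.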

>Here's my problem-report-form:
>
>    ACE VERSION: 5.6

Great, thanks for using the PRF.

>    HOST MACHINE and OPERATING SYSTEM:
>Intel i386
>Debian lenny
>
>    COMPILER NAME AND VERSION (AND PATCHLEVEL):
>g++ version 4.2.3 20071123 (prerelease) (Debian 4.2.2-4)
>
>    THE $ACE_ROOT/ace/config.h FILE [if you use a link to a platform-
>    specific file, simply state which one]:
>#include "ace/config-linux.h"
>
>    THE $ACE_ROOT/include/makeinclude/platform_macros.GNU FILE
>include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU
>
>
>    DOES THE PROBLEM AFFECT:
>        EXECUTION?
>my application is affected
>
>    SYNOPSIS:
>messages put in the queue are processed several times.
>
>    DESCRIPTION:
>My main problem is like this:
>Can I create a task when opening an event_handler for reading contents
>from the connection?

I don't understand this question.

>I find that after putting a message in the message queue by calling
>task's putq method, I can continuously get message from the message
>queue by calling getq.

What do you mean by "continuously get message"?

>Part of my code is listed here; please point out my errors, if any.
>Thanks in advance.
>  class Read_Handler : public ACE_Event_Handler
>  {
>  protected:
>    int queued_count_;
>    int deferred_close_;
>    ACE_SYNCH_MUTEX lock_;
>    ACE_SOCK_Stream connection_;
>    IMCC_Task_Manager *manager_;
>
>  public:
>    Read_Handler(ACE_Reactor *r):
>      ACE_Event_Handler(r),
>      queued_count_(0),
>      deferred_close_(0),
>      manager_(0) {}
>    virtual int open();
>    virtual int handle_input(ACE_HANDLE);
>    virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
>    ACE_SOCK_Stream &peer() { return this->connection_; }
>    virtual ACE_HANDLE get_handle() const
>      { return this->connection_.get_handle(); }
>  };
>int Read_Handler::open ()
>{
>  this->manager_ = new Task_Manager ();
>  if (this->manager_ == NULL)
>    return -1;
>  if (this->manager_->open () == -1)
>  {
>    delete this->manager_;
>    return -1;
>  }
>  return reactor()->
>    register_handler (this,
>                      ACE_Event_Handler::READ_MASK);
>
>}
>
>int Read_Handler::handle_input (ACE_HANDLE handle = ACE_INVALID_HANDLE)
>{
>  ACE_Message_Block *block =
>    new ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE);
>  ssize_t bytes_received;
>  if ((bytes_received = connection_.recv (block->wr_ptr (),
>                                          ACE_DEFAULT_CDR_BUFSIZE)) != -1)
>  {
>    block->wr_ptr (bytes_received);
>    ACE_Message_Block *payload = NULL;
>    ACE_NEW_RETURN (payload,
>                    ACE_Message_Block (reinterpret_cast<char *> (this)),
>                    -1);
>    payload->cont (block);
>    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
>    if (this->manager_->put (payload) == -1)
>    {
>      payload->release ();
>      payload = 0;
>      return -1;
>    }
>    ++queued_count_;
>    return 0;
>  }
>  block->release ();
>  block = 0;
>  return -1;
>}
>
>  class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
>  {
>  public:
>    Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
>                      ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
>      ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
>    virtual ~Task_Manager() {}
>    virtual int open (void * = 0)
>    { return activate (THR_NEW_LWP, MAX_THREADS); }
>    virtual int put (ACE_Message_Block *block,
>                     ACE_Time_Value *timeout = 0)
>    { return putq(block, timeout); }
>    virtual int svc (void);
>
>};
>
>int Task_Manager::svc (void)
>{
>  for (ACE_Message_Block *block; getq(block) != -1;)
>  {
>    Read_Handler *read_handler =
>      reinterpret_cast<Read_Handler *>(block->rd_ptr());
>    ACE_Message_Block *message = block->cont();
>    ACE_InputCDR cdr(message);
>    ACE_CDR::ULong length;
>    cdr>>length;
>    block->release();
>  }
>  return 0;
>}

This code looks right to me - please explain exactly where you are
having a problem.

Thanks,

Doug
-- 
Dr. Douglas C. Schmidt                       Professor and Associate Chair
Electrical Engineering and Computer Science  TEL: (615) 343-8197
Vanderbilt University                        WEB: www.dre.vanderbilt.edu/~schmidt
Nashville, TN 37203                          NET: d.schmidt at vanderbilt.edu


