[Ace-users] Can I have multiple thread consume message from the same connection?

kun niu haoniukun at gmail.com
Sat Dec 8 12:49:22 CST 2007


I'm sorry for the inconvenience I've caused.
>You can't have multiple threads simultanously reading from the same
>TCP connection.  Please see the discussions in Chapters 4 and 6 of
>C++NPv2 <www.cs.wustl.edu/~schmidt/ACE/book2/> for examples of how to
>handle this situation with the ACE Reactor and ACE Task frameworks.
In actual fact, I'm not reading from the same tcp connection with
multiple thread.
In fact, I'm using the Task framework.
I have one thread which reads message from the connection and puts the
message in the message queue created by the Task framework.
And several other threads process these messages created by thread
manager in the task.

Here's my problem-report-form:

    ACE VERSION: 5.6

    HOST MACHINE and OPERATING SYSTEM:
Intel i386
Debian lenny

    COMPILER NAME AND VERSION (AND PATCHLEVEL):
g++ version 4.2.3 20071123 (prerelease) (Debian 4.2.2-4)

    THE $ACE_ROOT/ace/config.h FILE [if you use a link to a platform-
    specific file, simply state which one]:
#include "ace/config-linux.h"

    THE $ACE_ROOT/include/makeinclude/platform_macros.GNU FILE
include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU


    DOES THE PROBLEM AFFECT:
        EXECUTION?
my application is affected

    SYNOPSIS:
messages put in the queue are processed several times.

    DESCRIPTION:
My main problem is like this:
Can I create a task when opening a event_handler for reading contents
from the connection?
I find that after putting a message in the message queue by calling
task's putq method, I can continuously get message from the message
queue by calling getq.
Part of my code are listed here, please point out my error if any.
Thanks in advance.
  class Read_Handler : public ACE_Event_Handler
  {
  protected:
    int queued_count_;
    int deferred_close_;
    ACE_SYNCH_MUTEX lock_;
    ACE_SOCK_Stream connection_;
    IMCC_Task_Manager *manager_;

  public:
    Read_Handler(ACE_Reactor *r):
      ACE_Event_Handler(r),
      queued_count_(0),
      deferred_close_(0),
      manager_(0) {}
    virtual int open();
    virtual int handle_input(ACE_HANDLE);
    virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
    ACE_SOCK_Stream &peer() { return this->connection_; }
    virtual ACE_HANDLE get_handle() const
      { return this->connection_.get_handle(); }
  };
int Read_Handler::open ()
{
  this->manager_ = new Task_Manager ();
  if (this->manager_ == NULL)
    return -1;
  if (this->manager_->open () == -1)
  {
    delete this->manager_;
    return -1;
  }
  return reactor()->
    register_handler (this,
                      ACE_Event_Handler::READ_MASK);

}

int Read_Handler::handle_input (ACE_HANDLE handle =
ACE_INVALID_HANDLE)
{
  ACE_Message_Block *block = new ACE_Message_Block
(ACE_DEFAULT_CDR_BUFSIZE);
  ssize_t bytes_received;
  if((bytes_received = connection_.recv (block->wr_ptr (),
                                         ACE_DEFAULT_CDR_BUFSIZE)) !=
-1 )
  {
    block->wr_ptr (bytes_received);
    ACE_Message_Block *payload = NULL;
    ACE_NEW_RETURN (payload,
                    ACE_Message_Block (reinterpret_cast<char *>
(this)), -1);
    payload->cont (block);
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
    if (this->manager_->put (payload) == -1)
    {
      payload->release ();
      payload = 0;
      return -1;
    }
    ++queued_count_;
    return 0;
  }
  block->release ();
  block = 0;
  return -1;
}

  class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
  {
  public:
    Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
                      ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
      ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
    virtual ~Task_Manager() {}
    virtual int open (void * = 0)
    { return activate (THR_NEW_LWP, MAX_THREADS); }
    virtual int put (ACE_Message_Block *block, ACE_Time_Value *timeout
= 0)
    { return putq(block, timeout); }
    virtual int svc (void);

}

int Task_Manager::svc (void)
{
  for (ACE_Message_Block *block; getq(block) != -1;)
  {
    Read_Handler *read_handler =
      reinterpret_cast<Read_Handler *>(block->rd_ptr());
    ACE_Message_Block *message = block->cont();
    ACE_InputCDR cdr(message);
    ACE_CDR::ULong length;
    cdr>>length;
    block->release();
  }
  return 0;
}


More information about the Ace-users mailing list