[Ace-users] Can I have multiple threads consume messages from the same connection?

kun niu haoniukun at gmail.com
Sat Dec 8 08:53:14 CST 2007


Dear all,
I have run into the following problem.
My client program keeps a long TCP connection with my server program.
In my read event handler's open() method, I create a task and activate
it.
When I receive a message, I call the task's put() method, which puts the
message on the task's message queue.
But I found that each time a message arrives, all of the waiting threads
that call getq() are woken up.
This leads to a fatal error in my program.
I wonder whether my model is correct. Please help me out.
Thanks in advance.
Here's part of my code:

  class Read_Handler : public ACE_Event_Handler
  {
  protected:
    int queued_count_;
    int deferred_close_;
    ACE_SYNCH_MUTEX lock_;
    ACE_SOCK_Stream connection_;
    IMCC_Task_Manager *manager_;

  public:
    Read_Handler(ACE_Reactor *r):
      ACE_Event_Handler(r),
      queued_count_(0),
      deferred_close_(0),
      manager_(0) {}
    virtual int open();
    virtual int handle_input(ACE_HANDLE);
    virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
    ACE_SOCK_Stream &peer() { return this->connection_; }
    virtual ACE_HANDLE get_handle() const
      { return this->connection_.get_handle(); }
  };
int Read_Handler::open ()
{
  this->manager_ = new Task_Manager ();
  if (this->manager_ == NULL)
    return -1;
  if (this->manager_->open () == -1)
  {
    LOG4CXX_DEBUG (Logger::getRootLogger(), "manager open fail");
    delete this->manager_;
    return -1;
  }
  LOG4CXX_DEBUG (Logger::getRootLogger(), "manager open ok");
  return reactor()->
    register_handler (this,
		      ACE_Event_Handler::READ_MASK);
}

int Read_Handler::handle_input (ACE_HANDLE handle =
ACE_INVALID_HANDLE)
{
  ACE_Message_Block *block = new ACE_Message_Block
(ACE_DEFAULT_CDR_BUFSIZE);
  ssize_t bytes_received;
  if((bytes_received = connection_.recv (block->wr_ptr (),
					 ACE_DEFAULT_CDR_BUFSIZE)) != -1 )
  {
    block->wr_ptr (bytes_received);
    ACE_Message_Block *payload = NULL;
    ACE_NEW_RETURN (payload,
		    ACE_Message_Block (reinterpret_cast<char *> (this)), -1);
    payload->cont (block);
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
    if (this->manager_->put (payload) == -1)
    {
      payload->release ();
      payload = 0;
      return -1;
    }
    ++queued_count_;
    return 0;
  }
  block->release ();
  block = 0;
  return -1;
}
  class Task_Manager : public ACE_Task<ACE_MT_SYNCH>
  {
  public:
    Task_Manager(ACE_Thread_Manager *thr_mgr = 0,
		      ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0) :
      ACE_Task<ACE_MT_SYNCH>(thr_mgr, mq) {}
    virtual ~Task_Manager() {}
    virtual int open (void * = 0)
    { return activate (THR_NEW_LWP, MAX_THREADS); }
    virtual int put (ACE_Message_Block *block, ACE_Time_Value *timeout
= 0)
    { return putq(block, timeout); }
    virtual int svc (void);
}

int Task_Manager::svc (void)
{
  for (ACE_Message_Block *block; getq(block) != -1;)
  {
    LOG4CXX_DEBUG (Logger::getRootLogger(), "get Message");
    Read_Handler *read_handler =
      reinterpret_cast<Read_Handler *>(block->rd_ptr());
    ACE_Message_Block *message = block->cont();
    LOG4CXX_DEBUG (Logger::getRootLogger(), "print message");
    ACE_InputCDR cdr(message);
    ACE_CDR::ULong length;
    cdr>>length;
    block->release();
  }
  return 0;
}


More information about the Ace-users mailing list