[Ace-users] How to generate flow-control with Proactor correctly?

Fan fanyang32 at gmail.com
Tue Aug 21 10:49:58 CDT 2007


ACE VERSION: 5.2.1

HOST MACHINE and OPERATING SYSTEM: x86 (P-III), FreeBSD 6
COMPILER NAME AND VERSION (AND PATCHLEVEL): g++ (GCC) 3.4.6

AREA/CLASS/EXAMPLE AFFECTED:
/examples/C++NPv2/AIO_Client_Logging_Daemon.cpp

DOES THE PROBLEM AFFECT:
EXECUTION? yes

SYNOPSIS: How to generate flow-control with Proactor correctly

DESCRIPTION: I'm trying to develop a proxy with ACE, based loosely on
the code in AIO_Client_Logging_Daemon.cpp. It has AIO_input_handler,
AIO_output_handler, connector, acceptor and proactor components. As in
a typical proxy, the acceptor creates an AIO_input_handler when a
client connects, and then uses the connector to connect to a server.
When the server is connected, the connector creates an
AIO_output_handler. The AIO_input_handler and AIO_output_handler pair
handles data communication between the client and the server. To deal
with flow control, both AIO_input_handler and AIO_output_handler
inherit from ACE_Task<ACE_NULL_SYNCH>. Basically, the writer puts a
Message_Block into the Message_Queue, which buffers it if there is
space available in the queue, calls Write_Stream::write() if the queue
is empty, or returns the error code EWOULDBLOCK if there is no space,
causing the reader to stop issuing subsequent reads. Meanwhile, the
handle_write_event for the stream deals with write completions. For a
partial write, it writes the unwritten bytes again. For a full write,
it gets the next Message_Block from the queue and writes it, and it
also lets the reader read again.
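
The scheme described above can be modeled roughly as follows. This is
a minimal sketch in plain C++ and not ACE code: FlowControlledWriter,
PutResult and the byte-count high water mark are hypothetical names
chosen for illustration, and std::string stands in for a
Message_Block.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical stand-in for a handler's write side; not part of ACE.
class FlowControlledWriter {
public:
  enum class PutResult { Started, Queued, WouldBlock };

  explicit FlowControlledWriter(std::size_t high_water_mark)
    : high_water_mark_(high_water_mark) {}

  // Called by the peer's read path with a newly read block.
  PutResult put(const std::string &block) {
    if (!write_pending_) {            // idle: start a write right away
      in_flight_ = block;
      write_pending_ = true;
      return PutResult::Started;
    }
    if (queued_bytes_ + block.size() > high_water_mark_) {
      reader_paused_ = true;          // peer must stop issuing reads
      return PutResult::WouldBlock;   // block is NOT taken by the queue
    }
    queue_.push_back(block);
    queued_bytes_ += block.size();
    return PutResult::Queued;
  }

  // Called when the pending write completes after sending `sent` bytes.
  void on_write_complete(std::size_t sent) {
    assert(sent <= in_flight_.size());
    in_flight_.erase(0, sent);
    if (!in_flight_.empty())
      return;                         // partial write: remainder stays pending
    if (queue_.empty()) {
      write_pending_ = false;         // nothing left to send
    } else {
      in_flight_ = queue_.front();    // full write: send the next block
      queued_bytes_ -= in_flight_.size();
      queue_.pop_front();
    }
    reader_paused_ = false;           // let the reader read again
  }

  bool reader_paused() const { return reader_paused_; }
  const std::string &in_flight() const { return in_flight_; }
  std::size_t queued_bytes() const { return queued_bytes_; }

private:
  std::size_t high_water_mark_;
  std::size_t queued_bytes_ = 0;
  std::deque<std::string> queue_;
  std::string in_flight_;
  bool write_pending_ = false;
  bool reader_paused_ = false;
};
```

In this model a caller that gets WouldBlock has to hold on to the
block itself and offer it again once reader_paused() is false.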

However, there is still a problem (code snippet follows). It appears,
for example, when the AIO_out_handler's reader is faster than the
AIO_input_handler's writer: the AIO_input_handler's writer queue
becomes full, the stream is under flow control, and the
AIO_out_handler's reader stops reading from the server. In the normal
case, AIO_input_handler's handle_write_stream event gets
Message_Blocks from the queue and calls the writer's write(), the
congestion is alleviated, and the flag is turned on for the
out_handler's reader to read again. But if there is an error (in
general it is "Resource temporarily unavailable") in the
handle_write_stream event, it has to call start_write(0) again so that
the proactor raises a write event again. Otherwise, because my
application is single-threaded, nothing will call the writer's write()
(the output_handler has stopped reading), and the input_handler's
queue will stay full forever. Sometimes this works and sometimes it
doesn't: start_write(0) is called when the error occurs inside
handle_write_event, the write event is handled again later,
start_write(0) is called again upon error, and so on like a loop. This
repeats 2 or 3 times, and then the proactor never raises a write event
again.
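
A minimal sketch of the retry step described above, written as a plain
C++ model rather than ACE code. It assumes that start_write(0)
dequeues the next block, as in the C++NPv2 example, so a failed block
would have to go back to the head of the queue before the write is
re-armed, otherwise it is skipped. RetryingWriter and its members are
hypothetical names. The sketch only models block ordering; it is not a
confirmed fix for the stalled proactor.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical model of the write side; names are not part of ACE.
struct RetryingWriter {
  std::deque<std::string> queue;  // blocks waiting to be written
  std::string in_flight;          // block handed to the pending write
  bool write_pending = false;

  // Start a write for the next queued block, if there is one.
  bool start_write() {
    if (write_pending || queue.empty())
      return false;
    in_flight = queue.front();
    queue.pop_front();
    write_pending = true;
    return true;
  }

  // Completion callback for a write that failed with a transient error.
  void on_transient_failure() {
    assert(write_pending);
    queue.push_front(in_flight);  // failed block goes back to the head
    in_flight.clear();
    write_pending = false;
    start_write();                // re-arm so another completion can fire
  }
};
```

Retrying immediately, as this model does, may simply hit the same
error again; whether a delay is needed before re-issuing the write
depends on the platform's asynchronous I/O behavior.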

I'm new to ACE. Could you please tell me what I should do in this
situation? What's the best way to do flow control with the Proactor in
ACE? Is there an example in ACE that shows how to do flow control with
the Proactor? I know the Gateway example does flow control, but that
example is for the Reactor.


BTW, the example uses this line to control whether it can write or not:
can_write_ = handle () == result.handle (); Would you please explain
the reason?

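/************************* Partial code *************************/
void AIO_Input_Handler::handle_write_stream
  (const ACE_Asynch_Write_Stream::Result &result)
{
  ACE_Message_Block &mblk = result.message_block ();
  if (!result.success ()) {
    // Requeueing the failed block is currently disabled:
    //mblk.rd_ptr (mblk.base ());
    //ungetq (&mblk);
    if (result.error () == EPIPE) {
      // Peer closed the connection; destroy this handler.
      delete this;
    }
    else {
      // Other errors (e.g. "Resource temporarily unavailable"):
      // try to start another write.
      start_write (0);
    }
  }
  else {
    // Continue writing only if the handle is still the one the
    // completed write was issued on.
    can_write_ = handle () == result.handle ();
    if (mblk.length () == 0) {
      // Full write: release the block and start on the next one.
      mblk.release ();
      if (can_write_) {
        start_write (0);
      }
    }
    else if (can_write_) {
      // Partial write: write the remaining bytes.
      start_write (&mblk);
    }
    else {
      // Handle changed: rewind the block and put it back on the queue.
      mblk.rd_ptr (mblk.base ());
      ungetq (&mblk);
      can_write_ = 1;
    }
  }
}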


