[ace-users] ACE_TP_Reactor:random crash sometimes

Douglas C. Schmidt schmidt at dre.vanderbilt.edu
Tue Jun 12 08:26:18 CDT 2007


Hi Mihai,

   Thanks for the example.  It looks to me like you're concurrently dispatching
notification event handlers with I/O event handlers.  Is that correct?
If so, those dispatches aren't serialized, as per Steve's comments.

Thanks,

        Doug

> --- "Douglas C. Schmidt" <schmidt at dre.vanderbilt.edu>
> wrote:
> > 
> > Hi Mihai,
> > 
> >    That behavior shouldn't be happening in general,
> > but without seeing
> > your sample program it's hard to know what's going
> > on.  Can you post a
> > very simple example that illustrates what's
> > happening?
> 
> Sure. Here is a simple example showing this
> behaviour:
> 
> RequestResponse_Handler.cpp:
> ----------------------------------------
> 
> #include "ace_common.h"
> #include "RequestResponse_Handler.h"
> using namespace std;
> 
> RequestResponse_Handler::RequestResponse_Handler
> (ACE_Thread_Manager *thr_mgr)
>     : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
> (thr_mgr),
> 	notifier_ (0, this, ACE_Event_Handler::WRITE_MASK)
> {
>   this->reactor (ACE_Reactor::instance ());
> }
> 
> 
> int RequestResponse_Handler::open (void *p) {
>   ACE_DEBUG ((LM_DEBUG,
>              ACE_TEXT
> ("RequestResponse_Handler::open\n")));
>   if (super::open (p) == -1)
>     return -1;    
>   this->notifier_.reactor (this->reactor ());
>   this->msg_queue ()->notification_strategy
> (&this->notifier_);   
> }
> 
> 
> int RequestResponse_Handler::handle_output
> (ACE_HANDLE)
> {
>   ACE_DEBUG ((LM_DEBUG,
>              ACE_TEXT ("(%t)
> RequestResponse_Handler::handle_output\n")));  
> 
>   ACE_Message_Block *mb;
>   ACE_Time_Value nowait (ACE_OS::gettimeofday ());
>    
>   //here we send messages from the synchronized
> ACE_Message_Queue inherited from ACE_Task
>   while (-1 != this->getq (mb, &nowait))
>     {
>       ssize_t send_cnt =
>         this->peer ().send (mb->rd_ptr (), mb->length
> ());
> 	  if (send_cnt == -1) {
>         ACE_DEBUG ((LM_DEBUG,
>              ACE_TEXT ("ouputing error\n")));
> 		return -1;
> 	  }
>       else {
>         mb->rd_ptr (static_cast<size_t> (send_cnt));
>       }    
>       if (mb->length () > 0)
>         {
>           this->ungetq (mb);
>           break;
>         }
>       mb->release ();
>     }
> 
>  
> 	if (this->msg_queue ()->is_empty ()) 
>      this->reactor ()->cancel_wakeup
>       (this, ACE_Event_Handler::WRITE_MASK);
>     else
>     this->reactor ()->schedule_wakeup
>       (this, ACE_Event_Handler::WRITE_MASK);
> 
> 
>   return 0;
> }
> 
> int RequestResponse_Handler::handle_input (ACE_HANDLE
> fd)
> {
>   ACE_TCHAR buffer[BUFSIZ];
>   memset(buffer,0,BUFSIZ*sizeof(ACE_TCHAR));
>   ssize_t result = this->peer ().recv (buffer,BUFSIZ);
> 
>   if (result > 0)
>   {
> 
>       ACE_DEBUG ((LM_DEBUG,
>                   ACE_TEXT ("(%t) svr input; fd: 0x%x;
> input: %s\n"),
>                   fd,
>                   buffer));
> 
>       ACE_Message_Block *mb = new
> ACE_Message_Block(sizeof(buffer)+1);
>       mb->copy(buffer,sizeof(buffer));
>       this->putq(mb); 
> 
>       return -1;
>   }
>   else
>     ACE_DEBUG ((LM_DEBUG,
>                 ACE_TEXT ("(%t)
> RequestResponse_Handler: 0x%x peer closed (0x%x)\n"),
>                 this, fd));
>   return -1;
> }
> 
> int RequestResponse_Handler::handle_close (ACE_HANDLE
> fd, ACE_Reactor_Mask)
> {
>   ACE_DEBUG ((LM_DEBUG,
>               ACE_TEXT ("(%t) svr close; fd: 0x%x\n"),
>               fd));
> 
>   return super::handle_close();
> 
> }
> 
> 
> ----------------------------------
> main.cpp:
> ----------------------------------
> #include "ace_common.h"
> #include "RequestResponse_Handler.h"
> #include "WorkerThread.h"
> 
> 
> 
> static const ACE_TCHAR *rendezvous = ACE_TEXT
> ("127.0.0.1:10010");
> int svr_thrno = 3; // number of worker threads
> 
> typedef ACE_Strategy_Acceptor<RequestResponse_Handler,
> ACE_SOCK_ACCEPTOR> ACCEPTOR;
> 
> 
> int ACE_TMAIN (int, ACE_TCHAR *[])
> {
>   ACE_TP_Reactor sr;
>   ACE_Reactor new_reactor (&sr);
>   ACE_Reactor::instance (&new_reactor);
> 
>   ACCEPTOR acceptor;
>   ACE_INET_Addr accept_addr (rendezvous);
> 
>   if (acceptor.open (accept_addr) == -1)
>     ACE_ERROR_RETURN ((LM_ERROR,
>                        ACE_TEXT ("%p\n"),
>                        ACE_TEXT ("open")),
>                       1);
> 
>   ACE_DEBUG ((LM_DEBUG,
>              ACE_TEXT ("(%t) Spawning %d server
> threads...\n"),
>              svr_thrno));
> 
>   WorkerThread worker;
>   worker.activate (THR_NEW_LWP | THR_JOINABLE,
> svr_thrno);
> 
>   ACE_Thread_Manager::instance ()->wait ();
> 
>   return 0;
> }
> 
> 
> ----------------------------------
> WorkerThread.h:
> ----------------------------------
> #include "ace_common.h"
> 
> #ifndef _WORKER_THREAD
> #define _WORKER_THREAD
> 
> static int reactor_event_hook (ACE_Reactor *)
> {
>   ACE_DEBUG ((LM_DEBUG,
>               ACE_TEXT ("(%t) handling events
> ....\n")));
> 
>   return 0;
> }
> 
> class WorkerThread : public ACE_Task_Base
> {
> public:
>   virtual int svc (void)
>   {
>     ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) Running the
> event loop\n")));
> 
>     int result =ACE_Reactor::instance
> ()->run_reactor_event_loop(&reactor_event_hook);
> 
>     if (result == -1)
>       ACE_ERROR_RETURN ((LM_ERROR,
>                          ACE_TEXT ("(%t) %p\n"),
>                          ACE_TEXT ("Error handling
> events")),
>                         0);
> 
>     ACE_DEBUG ((LM_DEBUG,
>                 ACE_TEXT ("(%t) Done handling
> events.\n")));
> 
>     return 0;
>   }
> };
> 
> #endif
> 
> 
> 
> -----------------------------------
> RequestResponse_Handler.h:
> -----------------------------------
> 
> #ifndef __REQUEST_RESPONSE_HANDLER_H_
> #define __REQUEST_RESPONSE_HANDLER_H_
> 
> #include "ace_common.h"
> 
> ACE_BEGIN_VERSIONED_NAMESPACE_DECL
> class ACE_Thread_Manager;
> ACE_END_VERSIONED_NAMESPACE_DECL
> 
> class RequestResponse_Handler : public
> ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
> {
> 
>   typedef ACE_Svc_Handler<ACE_SOCK_STREAM,
> ACE_MT_SYNCH> super;
> 
> public:
>   RequestResponse_Handler (ACE_Thread_Manager *tm =
> 0);
>   virtual int open (void *p);
>   
>   virtual ~RequestResponse_Handler() {
> 	ACE_DEBUG ((LM_DEBUG,
>              ACE_TEXT ("(%t)
> ~RequestResponse_Handler()\n"))); 
> 	
>   }
>   
> 
> protected:
>   virtual int handle_input (ACE_HANDLE fd =
> ACE_INVALID_HANDLE);
>   virtual int handle_close (ACE_HANDLE fd,
> ACE_Reactor_Mask = 0);
>   virtual int handle_output (ACE_HANDLE);
> 
> private:
> 	ACE_Reactor_Notification_Strategy notifier_;
> };
> 
> #endif 
> 
> --------------------------------------------
> 
> 
> I'm testing the program with 'telnet localhost 10010'
> and I sometimes get the following output (and,
> occasionally, a crash as well):
> 
> RequestResponse_Handler::open
> (2544) handling events ....
> (2544) handling events ....
> (2588) handling events ....
> (2544) svr input; fd: 0x704; input: g
> (2588) RequestResponse_Handler::handle_output
> (2544) svr close; fd: 0x704
> (2544) ~RequestResponse_Handler()
> (2544) handling events ....
> (2588) handling events ....
> 
> As you can see thread 2588 calls handle_output() on
> our handle while thread 2544 is still working with
> that handle:
>   
> (2544) svr input; fd: 0x704; input: g       (this is
> from handle_input() )
> (2588) RequestResponse_Handler::handle_output
> (2544) svr close; fd: 0x704  (this is from
> handle_close() )
> 
> 
> 
> 
>        
> ____________________________________________________________________________________
> Building a website is a piece of cake. Yahoo! Small Business gives you all the tools to get online.
> http://smallbusiness.yahoo.com/webhosting 
> 
> _______________________________________________
> ace-users mailing list
> ace-users at mail.cse.wustl.edu
> http://mail.cse.wustl.edu/mailman/listinfo/ace-users



More information about the Ace-users mailing list