[Ace-users] How to use TP_Reactor + Manager/Work mode thread pool

jackyhwei roarsoft at gmail.com
Tue Oct 30 01:02:24 CDT 2007


Hello sir,

I'm in a case which i need to build up a tcp socket server, and i
wanna use reactor framework to setup my socket server, and use
Mananger/Worker mode queue to get reactor & response to client socket.
i wrote my main code as below, but i find i can not activate my thread
CManagerTask.

so i wondering , does there any matter if i mixed ACE_Task &
ACE_Reactor?

BTW: Fedora Core 6 + ACE5.6.0
////////////////////////////////////////////////////////////////////////////////////
class CReactorTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
	CReactorTask(void);
public:
	~CReactorTask(void);

public:
	virtual int svc(void)
	{
		ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Running the event loop\n")));

		int result = ACE_Reactor::instance ()-
>run_reactor_event_loop(&reactor_event_hook);

		if (result == -1)
			ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT
("Error handling events")), 0);

		ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) Done handling events.\n")));

		return 0;
	};

	static int reactor_event_hook (ACE_Reactor *)
	{
		ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) handling events ....\n")));

		return 0;
	}
};

class CManagerTask: public ACE_Task<ACE_MT_SYNCH>
//class CManagerTask: public ACE_Task_Base, private IManager
{
public:
	CManagerTask(void)
		: shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
	{
		UA_LOG_FUNC((LM_DEBUG, "11111\n"));
	};

	~CManagerTask(void);

public:
	virtual int svr()
	{
                       ACE_DEBUG((LM_DEBUG, "111111111111111\n"));
	};

public:
	//virtual int return_to_work(CWorkerTask *worker);
	int return_to_work(CWorkerTask *worker);

private:
	int create_worker_pool(int nPoolSize);

private:
	int shutdown_;
	ACE_Thread_Mutex workers_lock_;
	ACE_Condition<ACE_Thread_Mutex> workers_cond_;
	ACE_Unbounded_Queue<CWorkerTask* > workers_;
};

class CWorkerTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
	CWorkerTask(IManager* pManager);
	~CWorkerTask(void);

public:
	virtual int svc()
	{
                       ACE_DEBUG((LM_DEBUG, "ddddddd\n"));
	};

private:
	IManager *manager_;
	ACE_thread_t thread_id_;
};


int ACE_TMAIN (int, ACE_TCHAR *[])
{
	ACE_TP_Reactor sr;
	ACE_Reactor new_reactor (&sr);
	ACE_Reactor::instance (&new_reactor);

	int svr_thrno = 5;

	//Setup TCP Socket, bind port 8014
	ACE_INET_Addr addr_ (8014);
	CReactorAcceptor acceptor;
	acceptor.reactor (ACE_Reactor::instance ());
	if (acceptor.open (addr_) == -1)
		return 1;

	UA_LOG_FUNC((LM_DEBUG, "TCP Socket Create Successfully, Bind Port
8014\n"));

	//Spawn server thread
	CReactorTask tcpServer;
	tcpServer.activate (THR_NEW_LWP | THR_JOINABLE, svr_thrno);
	ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Spawning %d server threads...
\n"), svr_thrno));

	CManagerTask manager;
	manager.activate();

	// Setup timer controller
	ACE_Time_Value foo_tv (1);
	CTimerHandler vTimer;
	if (ACE_Reactor::instance()->schedule_timer(&vTimer, 0, foo_tv,
foo_tv) == -1)
		ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);

	ACE_Thread_Manager::instance()->wait();

	return (0);
}



More information about the Ace-users mailing list