本文共 3940 字,大约阅读时间需要 13 分钟。
ACE_Task 是ACE 中的任务或主动对象“处理结构”的基类。在ACE 中使用了此类来实现主动对象模式。所有希望成为“主动对象”的对象都必须从此类派生。你也可以把ACE_Task看作是更高级的、更为面向对象的线程类。相当于我们具体业务是继承ACE_Task这个类进行实现的。
ACE_Task处理的是对象,因而在构造OO程序时更便于思考。因此,在大多数情况下,当你需要构建多线程程序时,较好的选择是使用ACE_Task 的子类。这样做有若干好处。首要的是刚刚所提到的,这可以产生更好的OO软件。其次,你不必操心你的线程入口是否是静态的,因为ACE_Task 的入口是一个常规的成员函数。而且,我们会看到ACE_Task 还包括了一种用于与其他任务进行通信的易于使用的机制。
要创建任务,需要进行以下步骤:
ACE 中的每个任务都有一个底层消息队列(ACE_Message_Block),这个消息队列被用作任务间通信的一种方法。当一个任务想要与另一任务“谈话”时,它创建一个消息,并将此消息放入它想要与之谈话的任务的消息队列使用putq方法。接收任务通常用getq () 从消息队列里获取消息。如果队列中没有数据可用,它就进入休眠状态。如果有其他任务将消息插入它的队列,它就会苏醒过来,从队列中拾取数据并处理它。因而,在这种情况下,接收任务将从发送任务那里接收消息,并以应用特定的方式作出反馈。
这样的体系结构大大简化了多线程程序的编程模型
下一个例子演示两个任务怎样使用它们的底层消息队列进行通信。这个例子包含了经典的生产者-消费者问题的实现。生产者任务生成数据,将它发送给消费者任务。消费者任务随后消费这个数据。使用ACE_Task 构造,我们可将生产者和消费者看作是不同的ACE_Task 类型的对象。这两种任务使用底层消息队列进行通信。
#include "ace/Task.h" class ProduceAudio : public ACE_Task{public: ProduceAudio(ACE_Thread_Manager *thr_man=0, ACE_Message_Queue *mq=0); ~ProduceAudio(void); int open(void*); int svc(void);};
#include "ProduceAudio.h" #include "ace/Log_Msg.h"#include "ace/OS.h"#include "Converter.h"#includeusing namespace std; ProduceAudio::ProduceAudio(ACE_Thread_Manager *thr_man, ACE_Message_Queue *mq) :ACE_Task (thr_man,mq){} ProduceAudio::~ProduceAudio(void){ ACE_DEBUG((LM_DEBUG, "(%t) ~ProduceAudio()\n")); } int ProduceAudio::open(void*) { ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio task opened\n")); activate(THR_NEW_LWP,1); return 0; } int ProduceAudio::svc(void){ ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio::svc() running\n")); string s("message"); for ( int i=0;i<3;++i) { ACE_Message_Block * blk = new ACE_Message_Block(10); blk->copy( (s + lexical_cast (i)).c_str()); this->putq(blk); //this->put(blk); ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio::svc() put(%s),now msg_queue()->message_count()[%d]\n",blk->rd_ptr(), this->msg_queue()->message_count())); ACE_OS::sleep(1); } ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio::svc() return\n")); return 0;}
消费者类 获取从其他线程putq传送过来的底层数据,就是不同线程间的通信
#include "ace/Task.h" class SendToServer : public ACE_Task{public: SendToServer(ACE_Thread_Manager *thr_man=0, ACE_Message_Queue *mq=0); ~SendToServer(void); int open(void*); int svc(void);};
#include "SendToServer.h" #include "ace/OS.h"#includemain函数using namespace std; SendToServer::SendToServer(ACE_Thread_Manager *thr_man, ACE_Message_Queue *mq) :ACE_Task (thr_man,mq){} SendToServer::~SendToServer(void){ ACE_DEBUG((LM_DEBUG, "(%t) ~SendToServer()\n")); } int SendToServer::open(void*) { ACE_DEBUG((LM_DEBUG, "(%t) SendToServer task opened\n")); activate(THR_NEW_LWP,1); return 0; } int SendToServer::svc(void){ ACE_DEBUG((LM_DEBUG, "(%t) SendToServer::svc() running\n")); ACE_Message_Block * blk = NULL; int count =0; for ( ; count<3;) { if (this->msg_queue()->message_count()>0) { this->getq(blk); ++count; ACE_DEBUG((LM_DEBUG,"SendToServer get :%s\n",blk->rd_ptr())); blk->release(); } ACE_OS::sleep(1); } ACE_DEBUG((LM_DEBUG, "(%t) SendToServer::svc() return\n")); return 0;}
#include "ace/Thread_Manager.h"#include "SendToServer.h"#include "ProduceAudio.h" #ifdef _DEBUG #pragma comment (lib,"ACEd.lib") #else #pragma comment (lib,"ACE.lib") #endif int main(int argc, char* argv[]){ SendToServer consumer(NULL,NULL); ProduceAudio producer(NULL,consumer.msg_queue()); producer.open(NULL); consumer.open(NULL); ACE_Thread_Manager::instance()->wait(); return 0;}
转载地址:http://ywucn.baihongyu.com/