合肥市科技中心网站,做企业网站服务,网站建设第二年费用,桂林旅游1.线程池简介
我们知道在线程池是一种多线程处理形式#xff0c;处理过程中我们将相应的任务提交给线程池#xff0c;线程池会分配对应的工作线程执行任务或存放在任务队列中#xff0c;等待执行。
面向对象编程中#xff0c;创建和销毁对象是需要消耗一定时间的#xff0…1.线程池简介
我们知道在线程池是一种多线程处理形式处理过程中我们将相应的任务提交给线程池线程池会分配对应的工作线程执行任务或存放在任务队列中等待执行。
面向对象编程中创建和销毁对象是需要消耗一定时间的因为创建一个对象要获取内存资源或者其它更多资源。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题其实这就是一些池化资源技术产生的原因。当然线程池也同样适用这种思想。
因为线程的创建和销毁时需要消耗一定的时间的。假设线程的创建消耗T1线程执行任务的时间T2线程的销毁销毁T3。当T1 T3 T2时候使用线程池技术通过线程的复用就能提高程序的性能。
2.线程池的作用需要大量的线程来完成任务且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务使用线程池技术是非常合适的。因为单个任务小而任务数量巨大你可以想象一个热门网站的点击次数。 但对于长时间的任务比如一个Telnet连接请求线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。对性能要求苛刻的应用比如要求服务器迅速响应客户请求。接受突发性的大量请求但不至于使服务器因此产生大量线程的应用。突发性大量客户请求在没有线程池情况下将产生大量线程虽然理论上大部分操作系统线程数目最大值不是问题短时间内产生大量线程可能使内存到达极限并出现OutOfMemory的错误。
3.线程池的实现我们这里实现的线程池是通过类CThreadPool来实现的。这个线程池类的构造函数如下所示[cpp] view plain copyCThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueueTask *pQueue); 构造函数参数的含义corePoolSize核心池的大小在创建了线程池后默认情况下线程池中并没有任何线程而是等待有任务到来才创建线程去执行任务默认情况下在创建了线程池之后线程池中的线程数为0当有任务到来之后就会创建一个线程去执行任务当线程池中的线程数目达到corePoolSize后就会把达到的任务放到缓存对队列当中。maximumPoolSize线程池最大线程数表示在线程池中最多能创建多少个线程。keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用直到线程池中的线程数不大于corePoolSize即当线程池中的线程数大于corePoolSize时如果一个线程空闲的时间达到keepAliveTime则会终止直到线程池中的线程数不超过corePoolSize。CBlockingQueueTask任务队列用来存放提交到线程池的任务。我们实现的线程池的大概原理是在创建线程池之后线程池中并没有任何工作线程的当使用线程池提供的execute方法向线程池提交任务的时候如果线程池中存在空闲的工作线程那么就会复用该工作线程并不会去创建新的工作线程但是如果没有工作线程或空闲的工作线程并且当然的工作线程数量小于核心池的大小时候会创建一个工作线程去执行任务若当前的工作线程数量达到了核心池的数量那么就会将任务放入到队列中去若队列满的情况下如果没有达到线程池的最大线程数那么就会将创建新的工作线程去执行任务若线程数达到了最大的线程数那么我们是抛出异常这里没有提供一个拒绝的策略后续有时间的会处理目前就向采用抛出异常并且在当前的工作线程数大于核心池数的时候会有超时机制关闭指定某时间的空闲工作线程直到等于核心池的大小。当然目前的实现还是有缺陷的线程是结束了当时并没释放到资源目前没想到好的方法。
为了简化线程池的配置我们提供了一个工厂类来进行线程池的创建。我们的工厂类支持创建三种线程池newCachedThreadPool创建一个可缓存线程池如果线程池长度超过处理需要可灵活回收空闲线程若无可回收则新建线程。newFixedThreadPool 创建一个定长线程池可控制线程最大并发数超出的线程会在队列中等待。newSingleThreadExecutor 创建一个单线程化的线程池它只会用唯一的工作线程来执行任务保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。整个实现的原理就是这样下面 直接上实现我们的实现只适用Linux上并且需要依赖boost库。一些实现是之前写的库。这里只贴出新实现的代码。
1.FactoryThreadPool.h[cpp] view plain copy#ifndef __FACTORY_THREAD_POOL_H__ #define __FACTORY_THREAD_POOL_H__ #include ThreadPool.h #include LinkedBlockingQueue.h //创建线程池的工厂类一般直接使用工厂类创建所需的线程池 //共提供三种线程池固定大小线程池、可缓存的线程池、单个后台线程 class CFactoryThreadPool { public: CFactoryThreadPool() { } ~CFactoryThreadPool() { } //固定大小线程池。每次提交一个任务就创建一个线程池直到线程池达到 //线程池的最大大小。线程池的大小一旦达到最大值就会保持不变。 CAbstractThreadPool *newFixedThreadPool(int iThreads) { CBlockingQueueTask *pBlockingQueue new CLinkedBlockingQueueTask(); return new CThreadPool(iThreads, iThreads, 0, pBlockingQueue); } //可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程那么 //就会回收部分空闲(60s不执行任务)的线程当任务数增加是此线程池 //又可以智能的添加新线程来处理任务线程池的大小依赖于操作系统能创建 //的最大线程的大小 CAbstractThreadPool *newCachedThreadPool() { CBlockingQueueTask *pBlockingQueue new CLinkedBlockingQueueTask(1); return new CThreadPool(0, INT_MAX, 60, pBlockingQueue); } //单个后台线程。这个线程池只有一个线程在工作也就是相当于单线程串行 //执行所有任务。保证所有任务的执行按照提交的顺序。 CAbstractThreadPool * newSingleThreadExecutor() { CBlockingQueueTask *pBlockingQueue new CLinkedBlockingQueueTask(); return new CThreadPool(1, 1, 0, pBlockingQueue); } }; #endif //#ifndef __FACTORY_THREAD_POOL_H__ 2.AbstractThreadPool.h[cpp] view plain copy#ifndef __ABSTRACT_THREAD_POOL_H__ #define __ABSTRACT_THREAD_POOL_H__ #include TaskFuncExecute.h //抽象基类 class CAbstractThreadPool { public: CAbstractThreadPool() { } virtual ~CAbstractThreadPool() { } virtual void execute(const Task task) 0; virtual void shutdown() 0; }; #endif //#ifndef __ABSTRACT_THREAD_POOL_H__ 3.TaskFuncExecute.h[cpp] view plain copy#ifndef __TASK_FUNC_EXECUTE_H__ #define __TASK_FUNC_EXECUTE_H__ #include boost/function.hpp typedef boost::functionvoid(void) Task; //类CTaskFuncExecute用来执行boost::function函数对象里面实现了execute class CTaskFuncExecute { public: CTaskFuncExecute(const Task task nullTask) :m_TaskFunction(task) { } ~CTaskFuncExecute() { } CTaskFuncExecute(const CTaskFuncExecute Excute) { m_TaskFunction Excute.m_TaskFunction; } CTaskFuncExecute operator(const CTaskFuncExecute Excute) { m_TaskFunction Excute.m_TaskFunction; return *this; } void execute() { m_TaskFunction(); } //定义一个空任务实际上不做什么 static void nullTask(void) { //nothing } private: Task m_TaskFunction; }; #endif //#ifndef __TASK_FUNC_EXECUTE_H__ 4.Thread.h[cpp] view plain copy#ifndef __THREAD_H__ #define __THREAD_H__ #include pthread.h #include string #include boost/function.hpp #include TaskFuncExecute.h #include Condition.h //CThread实现了对Linux中的一些线程操作的封装 class CThread { friend void *startThread(void *pObj); public: typedef boost::functionvoid(void) ThreadFunc; enum ThreadStatus {RUNNING, EXIT, JOIN}; CThread(const ThreadFunc Func, const std::string strName); virtual ~CThread(); void start(const Task InitTask); int join(); void exit(void *rval_ptr); void setname(std::string strName); const std::string name(void); pthread_t getSelfId(); ThreadStatus getThreadStatus(); private: ThreadStatus m_Status; //线程的状态 pthread_t m_tId; //线程标识 ThreadFunc m_Func; CMutexLock m_Mutex; std::string m_strThreadName; //线程名 }; #endif //#ifndef __THREAD_H__ 5.thread.cpp[cpp] view plain copy#include Thread.h #include Logger.h #include Exception.h using namespace Log; using namespace Exception; //一个辅助类,实现运行boost::function函数对象 struct threadData { typedef CThread::ThreadFunc ThreadFunc; ThreadFunc m_Func; CThread *m_pThis; Task m_Task; threadData(const ThreadFunc Func, CThread *pThis, const Task InitTask) :m_Func(Func) ,m_pThis(pThis) ,m_Task(InitTask) { } void runThreadFunc() { try { m_Func(); } catch (std::exception ex) { LOG_FATAL runThreadFunc exception : ex.what(); } catch (...) { LOG_FATAL runThreadFunc unknow exception; } } }; void *startThread(void *pObj) { try { if (pObj NULL) throw CException(startThread parament obj is null); threadData *pData static_castthreadData *(pObj); pData-m_Task(); pData-runThreadFunc(); pData-m_pThis-m_Mutex.lock(); pData-m_pThis-m_Status CThread::EXIT; delete pData; pData-m_pThis-m_Mutex.unlock(); } catch (const CException ex) { LOG_FATAL throw exception : ex.what(); } return NULL; } CThread::CThread(const ThreadFunc Func, const std::string strName) :m_Status(RUNNING) ,m_tId(0) ,m_Func(Func) ,m_Mutex() ,m_strThreadName(strName) { } CThread::~CThread() { if (m_Status ! JOIN) { ::pthread_detach(m_tId); } } void CThread::start(const Task InitTask) { threadData *pData new threadData(m_Func, this, InitTask); int iRet ::pthread_create(m_tId, NULL, startThread, pData); if (iRet ! 0) { //创建线程失败认为这是个致命错误会终止程序的运行 LOG_FATAL pthread_create false return err iRet; } LOG_INFO create thread : m_strThreadName tid m_tId; } int CThread::join() { m_Mutex.lock(); if (m_Status JOIN) { m_Mutex.unlock(); //重复的调用join这里需要向日志系统输出错误信息。 LOG_ERROR repeat call pthread_join; return -1; } m_Mutex.unlock(); LOG_INFO join thread, tid m_tId; int iRet ::pthread_join(m_tId, NULL); m_Mutex.lock(); m_Status JOIN; m_Mutex.unlock(); return iRet; } void CThread::exit(void *rval_ptr) { ::pthread_exit(rval_ptr); } const std::string CThread::name(void) { CMutexLockPart lock(m_Mutex); return m_strThreadName; } void CThread::setname(std::string strName) { CMutexLockPart lock(m_Mutex); m_strThreadName strName; } pthread_t CThread::getSelfId() { return ::pthread_self(); } CThread::ThreadStatus CThread::getThreadStatus() { CMutexLockPart lock(m_Mutex); return m_Status; } 6.ThreadPool.h[cpp] view plain copy#ifndef __THREAD_POOL_H__ #define __THREAD_POOL_H__ #include boost/ptr_container/ptr_vector.hpp #include AbstractThreadPool.h #include BlockingQueue.h #include Thread.h #include TaskFuncExecute.h //BlockingQueue由使用者申请在析构程池的时候释放 class CThreadPool : public CAbstractThreadPool { public: CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueueTask *pQueue); ~CThreadPool(); void execute(const Task task); void shutdown(); private: void allWorker(const Task task); bool addBlockingQueue(const Task task); void runInWorkerThread(); Task getTask(); enum ThreadPoolStatus {RUNNING, SHUTDOWN}; ThreadPoolStatus m_eStatue; int m_iCorePoolSize; //核心池的大小 int m_iMaximumPoolSize; //最大的大小 int m_iKeepAliveTime; //空闲线程等待新任务的最长时间.0表示不使用 int m_iCurrentThreadSum; //当前的工作线程 int m_iIdleThreadSum; //空闲的工作线程 CMutexLock m_MutexLock; CCondition m_Condition; CBlockingQueueTask *m_pQueue; boost::ptr_vectorCThread m_vecWorker; //所有的工作线程的集合 }; #endif //#ifndef __THREAD_POOL_H__ 7.ThreadPool.cpp[cpp] view plain copy#include boost/bind.hpp #include ThreadPool.h #include Exception.h #include Logger.h using namespace Exception; using namespace Log; CThreadPool::CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueueTask *pQueue) :m_eStatue(RUNNING) ,m_iCorePoolSize(corePoolSize) ,m_iMaximumPoolSize(maximumPoolSize) ,m_iKeepAliveTime(keepAliveTime) ,m_iCurrentThreadSum(0) ,m_iIdleThreadSum(0) ,m_MutexLock() ,m_Condition(m_MutexLock) ,m_pQueue(pQueue) ,m_vecWorker() { m_vecWorker.clear(); } CThreadPool::~CThreadPool() { m_vecWorker.erase(m_vecWorker.begin(), m_vecWorker.end()); //队列由工厂类创建这里释放掉 if (m_pQueue ! NULL) delete m_pQueue; //这里需要释放队列 } void CThreadPool::execute(const Task task) { //为了简化实现这里没有提供可选的策略。后续有时间可完善 try { if (m_eStatue RUNNING) { m_MutexLock.lock(); if (m_iCorePoolSize 0 || m_iCurrentThreadSum m_iCorePoolSize) { if (m_iIdleThreadSum ! 0) { m_MutexLock.unlock(); //有空闲的工作线程直接放到队列去 (void)addBlockingQueue(task); } else { m_MutexLock.unlock(); //继续分配工作线程 allWorker(task); } } else { if (m_pQueue m_pQueue-full()) { m_MutexLock.unlock(); allWorker(task); } else { m_MutexLock.unlock(); (void)addBlockingQueue(task); } } } else { //线程池处于SHUTDOWN状态此时如果在提交任务 //我们会抛出异常这就是我们使用的默认策略 m_MutexLock.unlock(); throw CException(ThreadPool status SHUTDOWN!); } } catch (const CException ex) { LOG_ERROR Throw exception : ex.what(); } } void CThreadPool::shutdown() { m_MutexLock.lock(); m_eStatue SHUTDOWN; m_Condition.broadcast(); m_MutexLock.unlock(); for (boost::ptr_vectorCThread::iterator iter m_vecWorker.begin(); iter ! m_vecWorker.end(); iter) { iter-join(); } } void CThreadPool::runInWorkerThread() { try { for(;;) { m_MutexLock .lock(); if (m_eStatue SHUTDOWN m_pQueue m_pQueue-empty()) { m_MutexLock.unlock(); return; } m_MutexLock.unlock(); Task task getTask(); if (task NULL) return; task(); //执行任务 } } catch (const CException ex) { LOG_ERROR runInWorkerThread throw exeception ex.what(); } catch (...) { LOG_ERROR runInWorkerThread unknow error; } } Task CThreadPool::getTask() { CMutexLockPart lock(m_MutexLock); while (m_pQueue m_pQueue-empty() m_eStatue RUNNING) { m_iIdleThreadSum; if (m_iKeepAliveTime 0 || (m_iCorePoolSize m_iCurrentThreadSum m_iCorePoolSize)) { m_Condition.wait(); } else { bool bRet m_Condition.waitForSeconds(m_iKeepAliveTime); if (bRet true) { --m_iIdleThreadSum; return NULL; } } --m_iIdleThreadSum; } if (m_pQueue-empty() m_eStatue SHUTDOWN) return NULL; Task task m_pQueue-poll(); if (m_pQueue (m_pQueue-size() 0)) m_Condition.signal(); return task; } void CThreadPool::allWorker(const Task task) { try { CMutexLockPart lock(m_MutexLock); if (m_iCurrentThreadSum m_iMaximumPoolSize) throw CException(Current Threads out of Max Threads); CThread *pWorkerThread new CThread(boost::bind(CThreadPool::runInWorkerThread, this), Worker Thread); pWorkerThread-start(task); m_vecWorker.push_back(pWorkerThread); m_iCurrentThreadSum; } catch (const CException ex) { LOG_ERROR ThreadPool allWorker throw exception ex.what(); } } bool CThreadPool::addBlockingQueue(const Task task) { try { LOG_INFO addBlockingQueue; CMutexLockPart lock(m_MutexLock); if (m_pQueue m_pQueue-full()) return false; if (m_pQueue !m_pQueue-offer(task)) throw CException(ThreadPool add BlockingQueue false); m_Condition.signal(); return true; } catch (const CException ex) { //这种情况应该是不会出现的如果出现了进行日志记录。继续运行 LOG_ERROR ThreadPool addBlockingQueue throw exception ex.what(); return true; } } 8.mainFixedThreadPool.cpp[cpp] view plain copy#include FactoryThreadPool.h #include Logger.h #include stdio.h #include unistd.h #include boost/bind.hpp using namespace Log; //一个计算两个数之和的任务 void Add(int iValue1, int iValue2) { LOG_INFO iValue1 iValue2 iValue1 iValue2; //sleep(1); } //这是固定大小线程池的测试代码这里我们创建了一个固定大小为10的 //线程池该线程池用来执行计算两数之和。 int main(void) { //日志的报警等级为INFO CLogger::setLogLevel(CLogger::DEBUG); //CLogger::setLogLevel(CLogger::INFO); //我们设置日志输出到文件 CLogger::setOutputMode(LOGGER_MODE_LOGFILE); LOG_DEBUG ---------------------start-------------------; LOG_DEBUG FixedThreadPool test start!; CFactoryThreadPool *pFactory new CFactoryThreadPool(); CAbstractThreadPool *pThreadPool pFactory-newFixedThreadPool(10); //向线程池提交200个加法运算的任务 for (int i 0; i 200; i) { pThreadPool-execute(boost::bind(Add, i, i)); } pThreadPool-shutdown(); delete pThreadPool; delete pFactory; LOG_DEBUG ----------------------end--------------------; return 0; } 9.mainCachedThreadPool.cpp[cpp] view plain copy#include FactoryThreadPool.h #include Logger.h #include stdio.h #include unistd.h #include boost/bind.hpp using namespace Log; //一个计算两个数之和的任务 void Add(int iValue1, int iValue2) { LOG_INFO iValue1 iValue2 iValue1 iValue2; sleep(1); } //这是可缓存线程池的测试代码这里我们创建了一个固定大小为10的 //线程池该线程池用来执行计算两数之和。 int main(void) { //日志的报警等级为INFO CLogger::setLogLevel(CLogger::DEBUG); //CLogger::setLogLevel(CLogger::INFO); //我们设置日志输出到文件 CLogger::setOutputMode(LOGGER_MODE_LOGFILE); LOG_DEBUG ---------------------start-------------------; LOG_DEBUG newCachedThreadPool test start!; CFactoryThreadPool *pFactory new CFactoryThreadPool(); CAbstractThreadPool *pThreadPool pFactory-newCachedThreadPool(); //向线程池提交200个加法运算的任务 for (int i 0; i 200; i) { pThreadPool-execute(boost::bind(Add, i, i)); } pThreadPool-shutdown(); delete pThreadPool; delete pFactory; LOG_DEBUG ----------------------end--------------------; return 0; } 10.mainSingleThreadPool.cpp[cpp] view plain copy#include FactoryThreadPool.h #include Logger.h #include stdio.h #include unistd.h #include boost/bind.hpp using namespace Log; //一个计算两个数之和的任务 void Add(int iValue1, int iValue2) { LOG_INFO iValue1 iValue2 iValue1 iValue2; sleep(1); } //这是固定大小线程池的测试代码这里我们创建了一个固定大小为10的 //线程池该线程池用来执行计算两数之和。 int main(void) { //日志的报警等级为INFO CLogger::setLogLevel(CLogger::DEBUG); //CLogger::setLogLevel(CLogger::INFO); //我们设置日志输出到文件 CLogger::setOutputMode(LOGGER_MODE_LOGFILE); LOG_DEBUG ---------------------start-------------------; LOG_DEBUG SingleThreadPool test start!; CFactoryThreadPool *pFactory new CFactoryThreadPool(); CAbstractThreadPool *pThreadPool pFactory-newSingleThreadExecutor(); //向线程池提交200个加法运算的任务 for (int i 0; i 200; i) { pThreadPool-execute(boost::bind(Add, i, i)); } pThreadPool-shutdown(); delete pThreadPool; delete pFactory; LOG_DEBUG ----------------------end--------------------; return 0; } 11.mainThreadPool.cpp[cpp] view plain copy#include ThreadPool.h #include Logger.h #include LinkedBlockingQueue.h #include stdio.h #include unistd.h #include boost/bind.hpp using namespace Log; //一个计算两个数之和的任务 void Add(int iValue1, int iValue2) { LOG_INFO iValue1 iValue2 iValue1 iValue2; sleep(2); } //这是线程池的测试代码这里我们创建了一个大小为2的 //线程池该线程池最大允许的线程数为4 //该线程池用来执行计算两数之和。 int main(void) { //日志的报警等级为INFO CLogger::setLogLevel(CLogger::DEBUG); //CLogger::setLogLevel(CLogger::INFO); //我们设置日志输出到文件 CLogger::setOutputMode(LOGGER_MODE_LOGFILE); LOG_DEBUG ---------------------start-------------------; LOG_DEBUG CommonThreadPool test start!; CThreadPool *pThreadPool new CThreadPool(2, 4, 30, new CLinkedBlockingQueueTask(10)); //向线程池提交200个加法运算的任务 for (int i 0; i 15; i) { pThreadPool-execute(boost::bind(Add, i, i)); } pThreadPool-shutdown(); delete pThreadPool; LOG_DEBUG ----------------------end--------------------; return 0; } 我们共给出了4个测试代码包括三个工厂类创建 的线程池和直接只有CThreadPool创建的线程池的测试。这里没有贴出具体的测试结果。因为测试结果输出是比较多的。