广州营销型网站制作,汕头企业建站系统模板,新站突然网站停止收录,中国建筑网官网云筑网目录
function_wrapper.hpp:
stealing_queue.hpp
thread_pool_steal_hpp 参考#xff1a;《C并发编程实战》
对于thread_pool_steal.hpp的代码有改动#xff0c;不然运行不了
function_wrapper.hpp: //包装可调用对象#xff0c;对外消除对象型别#xff0c;还需要有一…目录
function_wrapper.hpp:
stealing_queue.hpp
thread_pool_steal_hpp 参考《C并发编程实战》
对于thread_pool_steal.hpp的代码有改动不然运行不了
function_wrapper.hpp: //包装可调用对象对外消除对象型别还需要有一个函数调用符 // // //私有成员 // 1.一个虚基类没有的话在3很难定义一个指向实例化类的指针 // struct impl_base{ // 1.一个纯虚函数用来让派生类继承3执行函数 // 2.虚析构函数 // } // 2.因为需要接受任意类型的可调用对象所以内部需要封装一个模板类派生自1 // templatetypename F // struct impl_type{ // 1.成员变量传入的函数 // 2.构造函数移动构造成员变量 // 3.成员函数执行函数 // } // 3.一个指向实例化后的指针在这里看出确实需要一个基类可以通过基类指针构 造派生类指针 //公有成员 // 1.构造函数因为接收的对象是任意类型所以是模板构造函数并且使用万能引 用和完美转发实例化私有成员的3. // 2.()运算符的重载通过调用私有成员2.的3.实现 // 3.默认构造函数 default // 4.移动拷贝构造函数 // 5.移动复制构造函数 //
#ifndef _FUNCTION_WRAPPER_HPP_
#define _FUNCTION_WRAPPER_HPP_#includememory
class function_wrapper
{struct impl_base{virtual void call() 0;virtual ~impl_base(){};}; templatetypename Functionstruct impl_type:impl_base{Function f;impl_type(Function f_):f(std::move(f_)){}void call(){ f(); }};std::unique_ptrimpl_base impl;
public:function_wrapper() default;templatetypename Functionfunction_wrapper(Function f):impl(new impl_typeFunction(std::move(f))) {}void operator()(){impl-call();}function_wrapper(function_wrapper other):impl(std::move(other.impl)) {}function_wrapper operator(function_wrapper other){impl std::move(other.impl);return *this;}function_wrapper(function_wrapper other) delete;function_wrapper(const function_wrapper other) delete;function_wrapper operator(const function_wrapper other)delete;};
#endif
stealing_queue.hpp //可以进行任务窃取的队列 // //私有成员 // 1.一个双端队列pop操作在队头进行steal操作在队尾进行存储的内容为function_wrapper类 // 2.互斥 保证安全因为不存储线程所以不需要条件变量传递线程运行信息 //公有成员 // 1.默认构造函数 // 2.push // 3.try_pop // 4.try_steal基本和pop都一样就是弹出队尾元素//在线程池中定义一个存储窃取 队列的vector,vector的索引代表每个线程的标识这样每个线程就可以通过这个vector访问窃取队列。 // 5.empty
#ifndef _STEALING_QUEUE_HPP
#define _STEALING_QUEUE_HPP#includefunction_wrapper.hpp
#includedeque
#includemutex
class work_stealing_queue
{typedef function_wrapper data_type;std::dequedata_type the_queue;mutable std::mutex mut;
public:work_stealing_queue(){}work_stealing_queue(const work_stealing_queue othre) delete;work_stealing_queue operator(const work_stealing_queue othre) delete;void push(data_type data){std::lock_guardstd::mutex lk(mut);the_queue.push_front(std::move(data));}bool try_pop(data_type data){std::lock_guardstd::mutex lk(mut);if(the_queue.empty())return false;else{data std::move(the_queue.front());the_queue.pop_front();return true;}}bool try_steal(data_type data){std::lock_guardstd::mutex lk(mut);if(the_queue.empty())return false;else{data std::move(the_queue.front());the_queue.pop_back();return true;}}bool empty() const{std::lock_guardstd::mutex lk(mut);return the_queue.empty();}
};#endif
thread_pool_steal_hpp
//私有成员 // 1.控制线程正常运行的原子变量抛出异常设置为true // 2.线程池的池队列基于普通的线程安全队列 // 3.提供索引的队列存储指向窃取队列的指针 // 4.存放线程的队列 // 5.封装可联结线程 // 6.静态本地线程变量 本地线程队列 // 7.静态本地线程变量 索引 // // 8.work_thread工作函数任务函数 // 9.判断能否从 本地线程队列获取任务 // 10.判断能否从 线程池队列获取任务 // 11.判断能否从 其他线程窃取任务 //公有成员 // 1.构造函数初始化原子变量可联结线程类。 // { // try // { // for() // {初始化提供索引的队列 } // for() // {初始化线程} // } // catch // } // 2.提交任务函数//为了获取返回值应该返回future传入各种函数所以应该是模 板函数{ // 1.传入函数打包给pakage_task // 2.获取future // 3.判断是传入本地线程队列还是线程池队列 // 4.return future // } // 3.run_package_task//work_thread的主要部分 // { // if(判断从哪个队列获得任务) // { // task() // } // else 交出cpu时间。 // }
// 4.析构函数
#ifndef _THREAD_POOL_STEAL_HPP_
#define _THREAD_POOL_STEAL_HPP_#include threadsafe_queue.hpp
#include ThreadRAII.h
#include stealing_queue.hpp
#include function_wrapper.hpp
#include stealing_queue.hpp#includeatomic
#includevector
#includememory
#includethread
#includefuture
class thread_pool_steal
{typedef function_wrapper task_type;std::atomic_bool done;threadsafe_queuetask_type pool_work_queue;std::vectorstd::unique_ptrwork_stealing_queue queues;std::vectorstd::thread threads;join_threads joiner;static thread_local work_stealing_queue* local_work_queue;static thread_local unsigned my_index ;void work_thread(unsigned index){my_index index;local_work_queue queues[my_index].get();while(!done){run_pending_task();} }bool pop_from_local(task_type task){return local_work_queue local_work_queue-try_pop(task);}bool pop_from_pool(task_type task){return pool_work_queue.try_pop(task);}bool pop_from_steal(task_type task){work_stealing_queue steal_queue;for(unsigned i 0; iqueues.size(); i){unsigned index (my_index i1)%queues.size();if(queues[index]-try_steal(task)){return true;}}return false;}
public:thread_pool_steal():done(false),joiner(threads){unsigned number std::thread::hardware_concurrency();try{for(unsigned i 0; inumber ;i){queues.push_back(std::unique_ptrwork_stealing_queue(new work_stealing_queue));}for(unsigned i 0; inumber; i){threads.push_back(std::thread(thread_pool_steal::work_thread,this,i));}}catch(...){done true;throw;}}~thread_pool_steal(){done true;}template typename Functionstd::futuretypename std::result_ofFunction()::type submit(Function f){typedef typename std::result_ofFunction()::type result_type;std::packaged_taskresult_type() task(f);std::futureresult_type res(task.get_future());int index 0;for(auto ptr:queues){if(ptr-empty()){ptr-push(std::move(task));index -1;break;}index;}if(index0){pool_work_queue.push(std::move(task));}return res;}void run_pending_task(){task_type task;if(pop_from_local(task) || pop_from_pool(task) || pop_from_steal(task)){task();}else{std::this_thread::yield();}}};
thread_local work_stealing_queue* thread_pool_steal::local_work_queue nullptr;
thread_local unsigned thread_pool_steal::my_index 0;
#endif