企业信息查询系统官网山东,seo wordpress 主题,物流公司哪家便宜又好,wordpress 文章目录插件目录
学习目标
1线程池
2.UDP通信
3本地socket通信 学习目标 了解线程池模型的设计思想能看懂线程池实现源码掌握tcp和udp的优缺点和使用场景说出udp服务器通信流程说出udp客户端通信流程独立实现udp服务器代码独立实现udp客户端代码熟练掌握本地套接字进行本地进程通信
1…目录
学习目标
1线程池
2.UDP通信
3本地socket通信 学习目标 了解线程池模型的设计思想能看懂线程池实现源码掌握tcp和udp的优缺点和使用场景说出udp服务器通信流程说出udp客户端通信流程独立实现udp服务器代码独立实现udp客户端代码熟练掌握本地套接字进行本地进程通信
1线程池
什么是线程池?
是一个抽象的概念, 若干个线程组合到一起, 形成线程池.
为什么需要线程池?
多线程版服务器一个客户端就需要创建一个线程! 若客户端太多, 显然不太合适.
什么时候需要创建线程池呢简单的说如果一个应用需要频繁的创建和销毁线程而任务执行的时间又非常短这样线程创建和销毁的带来的开销就不容忽视这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计则没有必要使用线程池了。
实现的时候类似于生产者和消费者.
线程池和任务池:
任务池相当于共享资源, 所以需要使用互斥锁, 当任务池中没有任务的时候需要让线程阻塞, 所以需要使用条件变量.
如何让线程执行不同的任务?
使用回到函数, 在任务中设置任务执行函数, 这样可以起到不同的任务执行不同的函数.
通过阅读线程池代码思考如下问题?
熟悉结构体 threadpool_t线程池如何创建起来? 各种初始化mallocpthread_createpthread_cond_init pthread_mutex_init线程池内都有几类线程? 2类管理线程工作线程管理者线程的任务是什么?任务如何实现? 任务是添加线程或者删除线程通过2个算法删除线程 wait_exit_thr_num 10工作线程如何工作? 等待有任务抢到任务修改busy_thr_num 执行任务 修改 busy_thr_num --线程池是如何销毁的? 自爆shutdown 诱杀 讲解代码threadpoolsimple.c 讲解代码 pthreadpool.c
threadsimplepool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include stdio.h
#include stdlib.h
#include string.h
#include sys/types.h
#include unistd.h
#include pthread.htypedef struct _PoolTask
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;typedef struct _ThreadPool
{int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组int job_push;//入队位置int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件}ThreadPool;void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum 代表线程个数maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数#endifthreadsimplepool.c
//简易版线程池
#include threadpoolsimple.hThreadPool *thrPool NULL;int beginnum 1000;void *thrRun(void *arg)
{//printf(begin call %s-----\n,__FUNCTION__);ThreadPool *pool (ThreadPool*)arg;int taskpos 0;//任务位置PoolTask *task (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务先要尝试加锁pthread_mutex_lock(thrPool-pool_lock);//无任务并且线程池不是要摧毁while(thrPool-job_num 0 !thrPool-shutdown ){//如果没有任务线程会阻塞pthread_cond_wait(thrPool-not_empty_task,thrPool-pool_lock);}if(thrPool-job_num){//有任务需要处理taskpos (thrPool-job_pop)%thrPool-max_job_num;//printf(task out %d...tasknum%d tid%lu\n,taskpos,thrPool-tasks[taskpos].tasknum,pthread_self());//为什么要拷贝避免任务被修改生产者会添加任务memcpy(task,thrPool-tasks[taskpos],sizeof(PoolTask));task-arg task;thrPool-job_num--;//task thrPool-tasks[taskpos];pthread_cond_signal(thrPool-empty_task);//通知生产者}if(thrPool-shutdown){//代表要摧毁线程池此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(thrPool-pool_lock);free(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(thrPool-pool_lock);task-task_func(task-arg);//执行回调函数}//printf(end call %s-----\n,__FUNCTION__);
}//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf(begin call %s-----\n,__FUNCTION__);thrPool (ThreadPool*)malloc(sizeof(ThreadPool));thrPool-thr_num thrnum;thrPool-max_job_num maxtasknum;thrPool-shutdown 0;//是否摧毁线程池1代表摧毁thrPool-job_push 0;//任务队列添加的位置thrPool-job_pop 0;//任务队列出队的位置thrPool-job_num 0;//初始化的任务个数为0thrPool-tasks (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(thrPool-pool_lock,NULL);pthread_cond_init(thrPool-empty_task,NULL);pthread_cond_init(thrPool-not_empty_task,NULL);int i 0;thrPool-threads (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(attr);pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);for(i 0;i thrnum;i){pthread_create(thrPool-threads[i],attr,thrRun,(void*)thrPool);//创建多个线程}//printf(end call %s-----\n,__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool-shutdown 1;//开始自爆pthread_cond_broadcast(pool-not_empty_task);//诱杀 int i 0;for(i 0; i pool-thr_num ; i){pthread_join(pool-threads[i],NULL);}pthread_cond_destroy(pool-not_empty_task);pthread_cond_destroy(pool-empty_task);pthread_mutex_destroy(pool-pool_lock);free(pool-tasks);free(pool-threads);free(pool);
}//添加任务到线程池
void addtask(ThreadPool *pool)
{//printf(begin call %s-----\n,__FUNCTION__);pthread_mutex_lock(pool-pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool-max_job_num pool-job_num){pthread_cond_wait(pool-empty_task,pool-pool_lock);}int taskpos (pool-job_push)%pool-max_job_num;//printf(add task %d tasknum%d\n,taskpos,beginnum);pool-tasks[taskpos].tasknum beginnum;pool-tasks[taskpos].arg (void*)pool-tasks[taskpos];pool-tasks[taskpos].task_func taskRun;pool-job_num;pthread_mutex_unlock(pool-pool_lock);pthread_cond_signal(pool-not_empty_task);//通知包身工//printf(end call %s-----\n,__FUNCTION__);
}//任务回调函数
void taskRun(void *arg)
{PoolTask *task (PoolTask*)arg;int num task-tasknum;printf(task %d is runing %lu\n,num,pthread_self());sleep(1);printf(task %d is done %lu\n,num,pthread_self());
}int main()
{create_threadpool(3,20);int i 0;for(i 0;i 50 ; i){addtask(thrPool);//模拟添加任务}sleep(20);destroy_threadpool(thrPool);return 0;
}2.UDP通信
TCP传输控制协议, 面向连接的稳定的可靠的安全的数据流传递
稳定和可靠: 丢包重传
数据有序: 序号和确认序号
流量控制: 滑动窗口
UDP用户数据报协议
面向无连接的不稳定不可靠不安全的数据报传递---更像是收发短信
UDP传输不需要建立连接传输效率更高在稳定的局域网内环境相对可靠
UDP通信相关函数介绍: ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,struct sockaddr *src_addr, socklen_t *addrlen); 函数说明: 接收消息
参数说明:
sockfd 套接字buf 要接受的缓冲区len 缓冲区的长度flags 标志位 一般填0src_addr 原地址 传出参数 addrlen 发送方地址长度 返回值
成功: 返回读到的字节数
失败: 返回 -1 设置errno 调用该函数相当于TCP通信的recvaccept函数 ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,const struct sockaddr *dest_addr, socklen_t addrlen); 函数说明: 发送数据
参数说明:
sockfd 套接字
dest_addr 目的地址addrlen 目的地址长度返回值
成功: 返回写入的字节数
失败: 返回-1设置errno 通过man 2 bind, 可以查看bind函数的相关信息, 后面还有示例代码, 可以参考.
本地套接字服务器的流程
可以使用TCP的方式, 必须按照tcp的流程 也可以使用UDP的方式, 必须按照udp的流程
tcp的本地套接字服务器流程
创建套接字 socket(AF_UNIX,SOCK_STREAM,0)绑定 struct sockaddr_un 强转侦听 listen获得新连接 accept循环通信 read-write关闭文件描述符 close
tcp本地套接字客户端流程
调用socket创建套接字调用bind函数将socket文件描述和socket文件进行绑定.
不是必须的, 若无显示绑定会进行隐式绑定但服务器不知道谁连接了.
调用connect函数连接服务端循环通信read-write关闭文件描述符 close
编写代码并进行测试
编写udp代码并进行测试
测试:
多开器几个客户端经过测试表明:, udp天然支持多客户端, 这点和TCP不同, TCP需要维护连接.
//udp服务端
#include stdio.h
#include stdlib.h
#include string.h
#include sys/types.h
#include unistd.h
#include arpa/inet.h
#include netinet/in.h
#include ctype.hint main()
{//创建socketint cfd socket(AF_INET, SOCK_DGRAM, 0);if(cfd0){perror(socket error);return -1;}//绑定struct sockaddr_in serv;struct sockaddr_in client;bzero(serv, sizeof(serv));serv.sin_family AF_INET;serv.sin_port htons(8888);serv.sin_addr.s_addr htonl(INADDR_ANY);bind(cfd, (struct sockaddr *)serv, sizeof(serv));int i;int n;socklen_t len;char buf[1024];while(1){//读取数据memset(buf, 0x00, sizeof(buf));len sizeof(client);n recvfrom(cfd, buf, sizeof(buf), 0, (struct sockaddr *)client, len);//将大写转换为小写for(i0; in; i){buf[i] toupper(buf[i]);}printf([%d]:n[%d], buf[%s]\n, ntohs(client.sin_port), n, buf);//发送数据sendto(cfd, buf, n, 0, (struct sockaddr *)client, len);}//关闭套接字close(cfd);return 0;
}使用nc命令进行测试: nc -u 127.1 8888
//udp客户端
#include stdio.h
#include stdlib.h
#include string.h
#include sys/types.h
#include unistd.h
#include arpa/inet.h
#include netinet/in.h
#include ctype.hint main()
{//创建socketint cfd socket(AF_INET, SOCK_DGRAM, 0);if(cfd0){perror(socket error);return -1;}int n;char buf[1024];struct sockaddr_in serv;serv.sin_family AF_INET;serv.sin_port htons(8888);inet_pton(AF_INET, 127.0.0.1, serv.sin_addr.s_addr);while(1){//读标准输入数据memset(buf, 0x00, sizeof(buf));n read(STDIN_FILENO, buf, sizeof(buf));//发送数据sendto(cfd, buf, n, 0, (struct sockaddr *)serv, sizeof(serv));//读取数据memset(buf, 0x00, sizeof(buf));n recvfrom(cfd, buf, sizeof(buf), 0, NULL, NULL);printf(n[%d], buf[%s]\n, n, buf);}//关闭套接字close(cfd);return 0;
}3本地socket通信
回顾一些linux系统有哪些文件类型?
回顾一些linux系统下有哪些常见的IPC机制?
通过查询: man 7 unix 可以查到unix本地域socket通信相关信息:
#include sys/socket.h
#include sys/un.h
int socket(int domain, int type, int protocol);
函数说明: 创建本地域socket
函数参数:
domain: AF_UNIX or AF_LOCAL
type: SOCK_STREAM或者SOCK_DGRAM
protocol: 0 表示使用默认协议
函数返回值:
成功: 返回文件描述符.
失败: 返回-1, 并设置errno值.
创建socket成功以后, 会在内核创建缓冲区, 下图是客户端和服务端内核缓冲区示意图. int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
函数说明: 绑定套接字
函数参数:
socket: 由socket函数返回的文件描述符
addr: 本地地址
addlen: 本地地址长度
函数返回值:
成功: 返回文件描述符.
失败: 返回-1, 并设置errno值. 需要注意的是: bind函数会自动创建socket文件, 若在调用bind函数之前socket文件已经存在, 则调用bind会报错, 可以使用unlink函数在bind之前先删除文件. struct sockaddr_un { sa_family_t sun_family; /* AF_UNIX or AF_LOCAL*/ char sun_path[108]; /* pathname */ }; 通过man 2 bind, 可以查看bind函数的相关信息, 后面还有示例代码, 可以参考.
本地套接字服务器的流程
可以使用TCP的方式, 必须按照tcp的流程 也可以使用UDP的方式, 必须按照udp的流程
tcp的本地套接字服务器流程
创建套接字 socket(AF_UNIX,SOCK_STREAM,0)绑定 struct sockaddr_un 强转侦听 listen获得新连接 accept循环通信 read-write关闭文件描述符 close
tcp本地套接字客户端流程
调用socket创建套接字调用bind函数将socket文件描述和socket文件进行绑定.
不是必须的, 若无显示绑定会进行隐式绑定但服务器不知道谁连接了.
调用connect函数连接服务端循环通信read-write关闭文件描述符 close 编写代码并进行测试
测试客户端工具:
man nc
-U Specifies to use UNIX-domain sockets. 例如: nc -U sock.s size offsetof(struct sockaddr_un, sun_path) strlen(un.sun_path);
#define offsetof(type, member) ((int)((type *)0)-member) //本地socket通信服务端
#include stdio.h
#include stdlib.h
#include string.h
#include sys/types.h
#include unistd.h
#include arpa/inet.h
#include netinet/in.h
#include sys/un.hint main()
{//创建socketint lfd socket(AF_UNIX, SOCK_STREAM, 0);if(lfd0){perror(socket error);return -1;}//删除socket文件,避免bind失败unlink(./server.sock);//绑定bindstruct sockaddr_un serv;bzero(serv, sizeof(serv));serv.sun_family AF_UNIX;strcpy(serv.sun_path, ./server.sock); int ret bind(lfd, (struct sockaddr *)serv, sizeof(serv));if(ret0){perror(bind error);return -1;}//监听listenlisten(lfd, 10);//接收新的连接-acceptstruct sockaddr_un client;bzero(client, sizeof(client));int len sizeof(client);int cfd accept(lfd, (struct sockaddr *)client, len);if(cfd0){perror(accept error); return -1;}printf(client-[%s]\n, client.sun_path);int n;char buf[1024];while(1){//读数据memset(buf, 0x00, sizeof(buf)); n read(cfd, buf, sizeof(buf));if(n0){printf(read error or client close, n[%d]\n, n);break;}printf(n[%d], buf[%s]\n, n, buf);//发送数据write(cfd, buf, n);}close(lfd);return 0;
} //本地socket通信客户端
#include stdio.h
#include stdlib.h
#include string.h
#include sys/types.h
#include unistd.h
#include arpa/inet.h
#include netinet/in.h
#include sys/un.hint main()
{//创建socketint cfd socket(AF_UNIX, SOCK_STREAM, 0);if(cfd0){perror(socket error);return -1;}//删除socket文件,避免bind失败unlink(./client.sock);//绑定bindstruct sockaddr_un client;bzero(client, sizeof(client));client.sun_family AF_UNIX;strcpy(client.sun_path, ./client.sock); int ret bind(cfd, (struct sockaddr *)client, sizeof(client));if(ret0){perror(bind error);return -1;}struct sockaddr_un serv;bzero(serv, sizeof(serv));serv.sun_family AF_UNIX;strcpy(serv.sun_path, ./server.sock);ret connect(cfd, (struct sockaddr *)serv, sizeof(serv));if(ret0){perror(connect error); return -1;}int n;char buf[1024];while(1){memset(buf, 0x00, sizeof(buf));n read(STDIN_FILENO, buf, sizeof(buf));//发送数据write(cfd, buf, n);//读数据memset(buf, 0x00, sizeof(buf)); n read(cfd, buf, sizeof(buf));if(n0){printf(read error or client close, n[%d]\n, n);break;}printf(n[%d], buf[%s]\n, n, buf);}close(cfd);return 0;
} 测试客户端工具: man nc -U Specifies to use UNIX-domain sockets. 例如: nc -U sock.s size offsetof(struct sockaddr_un, sun_path) strlen(un.sun_path); #define offsetof(type, member) ((int)((type *)0)-member) //线程池完整版本
#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** function threadpool_create* descCreates a threadpool_t object.* param thr_num thread num* param max_thr_num max thread size* param queue_max_size size of the queue.* return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** function threadpool_add* desc add a new task in the queue of a thread pool* param pool Thread pool to which add the task.* param function Pointer to the function that will perform the task.* param argument Argument to be passed to the function.* return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** function threadpool_destroy* desc Stops and destroys a thread pool.* param pool Thread pool to destroy.* return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** desc get the thread num* pool pool threadpool* return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif#include stdlib.h
#include pthread.h
#include unistd.h
#include assert.h
#include stdio.h
#include string.h
#include signal.h
#include errno.h
#include threadpool.h#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10 /*如果queue_size MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct
{void *(*function)(void *); /* 函数指针回调函数 */void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 *//* 描述线程池相关信息 */
struct threadpool_t
{pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时添加任务的线程阻塞等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位线程池使用状态true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool NULL;do {if((pool (threadpool_t *)malloc(sizeof(threadpool_t))) NULL) { printf(malloc threadpool fail);break; /*跳出do while*/}pool-min_thr_num min_thr_num;pool-max_thr_num max_thr_num;pool-busy_thr_num 0;pool-live_thr_num min_thr_num; /* 活着的线程数 初值最小线程数 */pool-wait_exit_thr_num 0;pool-queue_size 0; /* 有0个产品 */pool-queue_max_size queue_max_size;pool-queue_front 0;pool-queue_rear 0;pool-shutdown false; /* 不关闭线程池 *//* 根据最大线程上限数 给工作线程数组开辟空间, 并清零 */pool-threads (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool-threads NULL) {printf(malloc threads fail);break;}memset(pool-threads, 0, sizeof(pthread_t)*max_thr_num);/* 队列开辟空间 */pool-task_queue (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool-task_queue NULL) {printf(malloc task_queue fail\n);break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init((pool-lock), NULL) ! 0|| pthread_mutex_init((pool-thread_counter), NULL) ! 0|| pthread_cond_init((pool-queue_not_empty), NULL) ! 0|| pthread_cond_init((pool-queue_not_full), NULL) ! 0){printf(init the lock or cond fail\n);break;}//启动工作线程pthread_attr_t attr;pthread_attr_init(attr);pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);for (i 0; i min_thr_num; i) {pthread_create((pool-threads[i]), attr, threadpool_thread, (void *)pool);/*pool指向当前线程池*/printf(start thread 0x%x...\n, (unsigned int)pool-threads[i]);}//创建管理者线程pthread_create((pool-adjust_tid), attr, adjust_thread, (void *)pool);return pool;} while (0);/* 前面代码调用失败时,释放poll存储空间 */threadpool_free(pool);return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)num[i]); /* 向线程池中添加任务 process: 小写----大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock((pool-lock));/* 为真队列已经满 调wait阻塞 */while ((pool-queue_size pool-queue_max_size) (!pool-shutdown)) {pthread_cond_wait((pool-queue_not_full), (pool-lock));}if (pool-shutdown) {pthread_cond_broadcast((pool-queue_not_empty));pthread_mutex_unlock((pool-lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool-task_queue[pool-queue_rear].arg ! NULL) {pool-task_queue[pool-queue_rear].arg NULL;}/*添加任务到任务队列里*/pool-task_queue[pool-queue_rear].function function;pool-task_queue[pool-queue_rear].arg arg;pool-queue_rear (pool-queue_rear 1) % pool-queue_max_size; /* 队尾指针移动, 模拟环形 */pool-queue_size;/*添加完任务后队列不为空唤醒线程池中 等待处理任务的线程*/pthread_cond_signal((pool-queue_not_empty));pthread_mutex_unlock((pool-lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程等待任务队列里有任务否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock((pool-lock));/*queue_size 0 说明没有任务调 wait 阻塞在条件变量上, 若有任务跳过该while*/while ((pool-queue_size 0) (!pool-shutdown)) { printf(thread 0x%x is waiting\n, (unsigned int)pthread_self());pthread_cond_wait((pool-queue_not_empty), (pool-lock));//暂停到这/*清除指定数目的空闲线程如果要结束的线程个数大于0结束线程*/if (pool-wait_exit_thr_num 0) {pool-wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool-live_thr_num pool-min_thr_num) {printf(thread 0x%x is exiting\n, (unsigned int)pthread_self());pool-live_thr_num--;pthread_mutex_unlock((pool-lock));//pthread_detach(pthread_self());pthread_exit(NULL);}}}/*如果指定了true要关闭线程池里的每个线程自行退出处理---销毁线程池*/if (pool-shutdown) {pthread_mutex_unlock((pool-lock));printf(thread 0x%x is exiting\n, (unsigned int)pthread_self());//pthread_detach(pthread_self());pthread_exit(NULL); /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function pool-task_queue[pool-queue_front].function;task.arg pool-task_queue[pool-queue_front].arg;pool-queue_front (pool-queue_front 1) % pool-queue_max_size; /* 出队模拟环形队列 */pool-queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast((pool-queue_not_full));/*任务取出后立即将 线程池琐 释放*/pthread_mutex_unlock((pool-lock));/*执行任务*/ printf(thread 0x%x start working\n, (unsigned int)pthread_self());pthread_mutex_lock((pool-thread_counter)); /*忙状态线程数变量琐*/pool-busy_thr_num; /*忙状态线程数1*/pthread_mutex_unlock((pool-thread_counter));(*(task.function))(task.arg); /*执行回调函数任务*///task.function(task.arg); /*执行回调函数任务*//*任务结束处理*/ printf(thread 0x%x end working\n, (unsigned int)pthread_self());pthread_mutex_lock((pool-thread_counter));pool-busy_thr_num--; /*处理掉一个任务忙状态数线程数-1*/pthread_mutex_unlock((pool-thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool (threadpool_t *)threadpool;while (!pool-shutdown) {sleep(DEFAULT_TIME); /*定时 对线程池管理*/pthread_mutex_lock((pool-lock));int queue_size pool-queue_size; /* 关注 任务数 */int live_thr_num pool-live_thr_num; /* 存活 线程数 */pthread_mutex_unlock((pool-lock));pthread_mutex_lock((pool-thread_counter));int busy_thr_num pool-busy_thr_num; /* 忙着的线程数 */pthread_mutex_unlock((pool-thread_counter));/* 创建新线程 算法 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如3010 40100*/if (queue_size MIN_WAIT_TASK_NUM live_thr_num pool-max_thr_num) {pthread_mutex_lock((pool-lock)); int add 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i 0; i pool-max_thr_num add DEFAULT_THREAD_VARY pool-live_thr_num pool-max_thr_num; i) {if (pool-threads[i] 0 || !is_thread_alive(pool-threads[i])) {pthread_create((pool-threads[i]), NULL, threadpool_thread, (void *)pool);add;pool-live_thr_num;}}pthread_mutex_unlock((pool-lock));}/* 销毁多余的空闲线程 算法忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) live_thr_num live_thr_num pool-min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock((pool-lock));pool-wait_exit_thr_num DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */pthread_mutex_unlock((pool-lock));for (i 0; i DEFAULT_THREAD_VARY; i) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal((pool-queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool NULL) {return -1;}pool-shutdown true;/*先销毁管理线程*///pthread_join(pool-adjust_tid, NULL);for (i 0; i pool-live_thr_num; i) {/*通知所有的空闲线程*/pthread_cond_broadcast((pool-queue_not_empty));}/*for (i 0; i pool-live_thr_num; i) {pthread_join(pool-threads[i], NULL);}*/threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool NULL) {return -1;}if (pool-task_queue) {free(pool-task_queue);}if (pool-threads) {free(pool-threads);pthread_mutex_lock((pool-lock));pthread_mutex_destroy((pool-lock));pthread_mutex_lock((pool-thread_counter));pthread_mutex_destroy((pool-thread_counter));pthread_cond_destroy((pool-queue_not_empty));pthread_cond_destroy((pool-queue_not_full));}free(pool);pool NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum -1;pthread_mutex_lock((pool-lock));all_threadnum pool-live_thr_num;pthread_mutex_unlock((pool-lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum -1;pthread_mutex_lock((pool-thread_counter));busy_threadnum pool-busy_thr_num;pthread_mutex_unlock((pool-thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc pthread_kill(tid, 0); //发0号信号测试线程是否存活if (kill_rc ESRCH) {return false;}return true;
}/*测试*/ #if 1
/* 线程池中的线程模拟处理业务 */
void *process(void *arg)
{printf(thread 0x%x working on task %d\n ,(unsigned int)pthread_self(),*(int *)arg);sleep(1);printf(task %d is end\n, *(int *)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp threadpool_create(3,100,100); /*创建线程池池里最小3个线程最大100队列最大100*/printf(pool inited);//int *num (int *)malloc(sizeof(int)*20);int num[20], i;for (i 0; i 20; i) {num[i]i;printf(add task %d\n,i);threadpool_add(thp, process, (void*)num[i]); /* 向线程池中添加任务 */}sleep(10); /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif