LV100-线程池
本文主要是线程的应用——线程池的相关笔记,若笔记中有错误或者不合适的地方,欢迎批评指正😃。
一、线程池原理
我们需要使用线程的时候就去创建一个线程,不需要就将线程销毁,但是就会有这样一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程都是需要时间的。
这个时候我们就可以使用线程池来解决这个问题了。线程池的主要思想是:在进程开始时创建一定数量的线程,并加到线程池中以等待工作。当服务器收到请求时,它会唤醒池内的一个线程(如果有可用线程),并将需要服务的请求传递给它。一旦线程完成了服务,它会返回到池中再等待工作。如果线程池内没有可用线程,那么服务器会等待,直到有空线程为止。

二、线程池实现
1. 结构体实现
1.1 任务结构体
我们将使用链表构建一个任务队列,每个任务的结构体成员设计如下:
c
typedef struct Task
{
void *(*func)(void *arg);/* 函数指针,指向需要执行的任务函数 */
void *taskArg; /* 任务参数,传递给任务函数的参数 */
struct Task *next; /* 指向下一个任务的指针变量 */
} Task;【成员说明】
func,函数指针变量,可以指向一个返回值为void *的带有一个void *类型参数的真实任务函数。taskArg:void类型的指针变量,表示传给执行真实任务的函数的参数。next:指针域,指向下一个任务结构体。
1.2线程池结构体
我们也是通过链表来存储线程池中的各个工作线程,每个工作线程的结构体成员设计如下:
c
#define POOL_NUM 10 /* 线程池最大线程数 */
typedef struct ThreadPool
{
pthread_mutex_t taskLock;/* 互斥锁 */
pthread_cond_t newTask; /* 条件变量 */
pthread_t tid[POOL_NUM]; /* 工作线程的线程号 tid */
Task *queueHead; /* 任务队列头指针 */
int busyWork; /* 线程池中已使用线程数 */
}ThreadPool;1.3两个结构体联系
上边两个结构体的联系如下图所示:

2. 线程池初始化
2.1 实现步骤
- (1)申请线程池的内存空间并判断是否申请成功;
- (2)对线程池的各个参数进行初始化;
- (3)创建线程,这里要注意不要超过系统可创建的最大线程数。
2.2 函数实现
注意先定义一个指向线程池的全局指针变量:
c
ThreadPool * pool = NULL;/* 定义一个线程池,需要申请内存 */函数实现如下:
c
/**
* @Function: void threadPoolInit(void)
* @Description: 线程池初始化
* @param : none
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int threadPoolInit(void)
{
int i = 0;
/* 1.申请线程池的内存空间 */
pool = (ThreadPool *)malloc(sizeof(ThreadPool));
/* 2. 判断是否申请成功 */
if( pool == NULL)
{
printf("ThreadPool malloc failed!\n");
return -1;
}
/* 3. 初始化互斥锁 */
pthread_mutex_init(&pool->taskLock,NULL);
/* 4. 初始化条件变量 */
pthread_cond_init(&pool->newTask,NULL);
/* 5. 初始化链式任务队列指针 */
pool->queueHead = NULL;
/* 6. 初始化线程池已占用线程数量 */
pool->busyWork=0;
/* 7. 创建所有的工作线程 */
for(i = 0; i < POOL_NUM; i++)
{
pthread_create(&pool->tid[i], NULL, workThread, (void *)i);
}
return 0;
}3. 线程开始函数
3.1 实现步骤
线程被创建之后,就开始运行线程开始函数,在线程开始函数中我们要做的事有:
- (1)获取互斥锁
- (2)等待条件变量,就是等待任务队列中有任务产生;
- (3)收到信号时,从任务队列取出一个任务;
- (4)释放互斥锁;
- (5)传入任务参数,执行任务函数;
- (6)执行完毕后,更新线程池中已被占用的线程数量。
3.2 函数实现
c
/**
* @Function: void *workThread(void *arg)
* @Description: 线程池中线程的运行函数
* @param arg : 线程创建函数传递给工作线程的参数
* @return : 返回一个void指针类型数据
*/
void *workThread(void *arg)
{
/* 定义一个任务结构体指针变量 */
Task *ptask;
printf("This is %d thread\n", (int)arg);
while(1)
{
/* 获取互斥锁 */
pthread_mutex_lock(&pool->taskLock);
/* 等待条件变量信号(收到信号时会唤醒至少一个线程) */
pthread_cond_wait(&pool->newTask, &pool->taskLock);
printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork);
/* 从链式任务队列取出一个任务 */
ptask = pool->queueHead;/* 指向任务队列的头节点 */
pool->queueHead = pool->queueHead->next;/* 头指针指向下一个任务节点 */
/* 解除互斥锁 */
pthread_mutex_unlock(&pool->taskLock);
/* 线程执行任务函数 */
ptask->func(ptask->taskArg);
/* 更新线程池已占用线程数量(任务函数执行完毕后,线程空闲) */
pool->busyWork--;
}
return (void *)0;
}4. 添加任务
4.1 实现步骤
- (1)获取互斥锁
- (2)判断线程池中是否还有空闲的线程可以执行任务,若没有则释放互斥锁,休眠等待,进行下一轮判断前要先获取已经释放的互斥锁;
- (3)申请新任务节点内存,并判断是否申请成功;
- (4)对新任务进行初始化;
- (5)添加新任务到任务队列,任务队列是链表,注意插入的方式;
- (6)新任务到来,需要唤醒一个线程,所以这里已占用线程数需要更新一下;
- (7)发送有新任务的信号,至少唤醒一个空闲线程即可。
4.2 函数实现
c
/**
* @Function: int poolAddTask(int taskArg)
* @Description: 向任务队列中添加新的任务节点
* @param taskArg : 递给实际执行的任务函数的参数
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int poolAddTask(int taskArg)
{
/* 1. 定义两个任务链表结构体指针变量 */
Task * newTask;
Task * p;
pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
/* 2. 判断线程池中是否还有线程可以执行新添加的任务 */
while(pool->busyWork >= POOL_NUM)
{
pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
usleep(10000);/* us休眠 */
pthread_mutex_lock(&pool->taskLock); /* 获取互斥锁 */
}
pthread_mutex_unlock(&pool->taskLock); /* 解除互斥锁 */
/* 3. 申请新线程任务节点内存 */
newTask = (Task *)malloc(sizeof(Task));
/* 4. 判断是否申请成功 */
if( newTask == NULL)
{
printf("newTask malloc failed!\n");
return -1;
}
/* 5. 对新建的任务节点进行初始化 */
newTask->func = realWork;/* 线程运行函数中所执行的真正的任务内容函数 */
newTask->taskArg = (void *)taskArg;
/* 6. 添加新任务到链式任务队列 */
pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
p = pool->queueHead;
if(p == NULL)/* 判断线程池是否为空 */
{
pool->queueHead = newTask;
}
else
{
while(p->next != NULL)
{
p = p->next;
}
p->next = newTask;/* 新任务添加到线程池链表结尾 */
}
/* 7. 更新已使用线程数 */
pool->busyWork++;/* 已使用线程数加一 */
/* 8. 发送信号到工作线程 */
pthread_cond_signal(&pool->newTask);
pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
return 0;
}5. 任务函数
线程调用的任务函数,这里的休眠时间的延长可以更好的反映实验现象。
c
/**
* @Function: void *realWork(void *taskArg)
* @Description: 线程池任务执行内容函数
* @param taskArg : 任务参数
* @return : 返回(void *)0
*/
void *realWork(void *taskArg)
{
printf(" Finish work %d\n",(int)taskArg);
sleep(2);
return (void *)0;
}6. 线程池销毁
6.1 销毁步骤
任务全部执行完毕后,进程结束,需要销毁线程池,释放内存:
- (1)释放任务链表所有节点内存;
- (2)销毁互斥锁;
- (3)销毁条件变量,这里需要注意一下,是若有个线程在
pthread_cond_wait等着,那这个条件变量就不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化吧,应该还有其他解决办法可以兼顾,不过自己也是初学,随着后边深入学习,解决了再更新这里吧。 - (4)销毁线程池。
6.2 函数实现
c
/**
* @Function: void poolDestory(void)
* @Description: 线程池销毁
* @param : none
* @return : none
*/
void poolDestory(void)
{
/* 1. 定义一个任务链表结构体指针变量 */
Task * p;
/* 2. 释放线程任务节点 */
while(pool->queueHead != NULL)
{
p = pool->queueHead;
pool->queueHead = pool->queueHead->next;
free(p);
}
/* 释放互斥锁 */
pthread_mutex_destroy(&pool->taskLock);
/* 释放条件变量 */
/* 是因为有个线程在pthread_cond_wait等着,所以不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化把 */
// pthread_cond_destroy(&pool->newTask);
free(pool);
printf("free end!\n");
}三、完整实例
1. main.c
c
/* 头文件 */
#include <stdio.h>
#include "pthread_pool.h"
/* 主函数 */
int main(int argc, char *argv[])
{
int ret = 0;
int i = 0;
threadPoolInit();
sleep(1);
printf("Start create Task!\n");
for(i = 0; i < 20; i++)
{
ret = poolAddTask(i);
sleep(0.5);
if(ret == 0)
{
printf("Add task[%d] success!\n", i);
}
else
{
printf("Add task[%d] failed!\n", i);
}
}
sleep(5);
poolDestory();
return 0;
}2. pthread_pool.c
c
#include "pthread_pool.h"
ThreadPool * pool = NULL;/* 定义一个线程池,需要申请内存 */
/**
* @Function: void *workThread(void *arg)
* @Description: 线程池中线程的运行函数
* @param arg : 线程创建函数传递给工作线程的参数
* @return : 返回一个void指针类型数据
*/
void *workThread(void *arg)
{
/* 定义一个任务结构体指针变量 */
Task *ptask;
printf("This is %d thread\n", (int)arg);
while(1)
{
/* 获取互斥锁 */
pthread_mutex_lock(&pool->taskLock);
/* 等待条件变量信号(收到信号时会唤醒至少一个线程) */
pthread_cond_wait(&pool->newTask, &pool->taskLock);
printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork);
/* 从链式任务队列取出一个任务 */
ptask = pool->queueHead;/* 指向任务队列的头节点 */
pool->queueHead = pool->queueHead->next;/* 头指针指向下一个任务节点 */
/* 解除互斥锁 */
pthread_mutex_unlock(&pool->taskLock);
/* 线程执行任务函数 */
ptask->func(ptask->taskArg);
/* 更新线程池已占用线程数量(任务函数执行完毕后,线程空闲) */
pool->busyWork--;
}
return (void *)0;
}
/**
* @Function: void threadPoolInit(void)
* @Description: 线程池初始化
* @param : none
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int threadPoolInit(void)
{
int i = 0;
/* 1.申请线程池的内存空间 */
pool = (ThreadPool *)malloc(sizeof(ThreadPool));
/* 2. 判断是否申请成功 */
if( pool == NULL)
{
printf("ThreadPool malloc failed!\n");
return -1;
}
/* 3. 初始化互斥锁 */
pthread_mutex_init(&pool->taskLock,NULL);
/* 4. 初始化条件变量 */
pthread_cond_init(&pool->newTask,NULL);
/* 5. 初始化链式任务队列指针 */
pool->queueHead = NULL;
/* 6. 初始化线程池已占用线程数量 */
pool->busyWork=0;
/* 7. 创建所有的工作线程 */
for(i = 0; i < POOL_NUM; i++)
{
pthread_create(&pool->tid[i], NULL, workThread, (void *)i);
}
return 0;
}
/**
* @Function: int poolAddTask(int taskArg)
* @Description: 向任务队列中添加新的任务节点
* @param taskArg : 递给实际执行的任务函数的参数
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int poolAddTask(int taskArg)
{
/* 1. 定义两个任务链表结构体指针变量 */
Task * newTask;
Task * p;
pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
/* 2. 判断线程池中是否还有线程可以执行新添加的任务 */
while(pool->busyWork >= POOL_NUM)
{
pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
usleep(10000);/* us休眠 */
pthread_mutex_lock(&pool->taskLock); /* 获取互斥锁 */
}
pthread_mutex_unlock(&pool->taskLock); /* 解除互斥锁 */
/* 3. 申请新线程任务节点内存 */
newTask = (Task *)malloc(sizeof(Task));
/* 4. 判断是否申请成功 */
if( newTask == NULL)
{
printf("newTask malloc failed!\n");
return -1;
}
/* 5. 对新建的任务节点进行初始化 */
newTask->func = realWork;/* 线程运行函数中所执行的真正的任务内容函数 */
newTask->taskArg = (void *)taskArg;
/* 6. 添加新任务到链式任务队列 */
pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
p = pool->queueHead;
if(p == NULL)/* 判断线程池是否为空 */
{
pool->queueHead = newTask;
}
else
{
while(p->next != NULL)
{
p = p->next;
}
p->next = newTask;/* 新任务添加到线程池链表结尾 */
}
/* 7. 更新已使用线程数 */
pool->busyWork++;/* 已使用线程数加一 */
/* 8. 发送信号到工作线程 */
pthread_cond_signal(&pool->newTask);
pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
return 0;
}
/**
* @Function: void *realWork(void *taskArg)
* @Description: 线程池任务执行内容函数
* @param taskArg : 任务参数
* @return : 返回(void *)0
*/
void *realWork(void *taskArg)
{
printf(" Finish work %d\n",(int)taskArg);
sleep(2);
return (void *)0;
}
/**
* @Function: void poolDestory(void)
* @Description: 线程池销毁
* @param : none
* @return : none
*/
void poolDestory(void)
{
/* 1. 定义一个任务链表结构体指针变量 */
Task * p;
/* 2. 释放线程任务节点 */
while(pool->queueHead != NULL)
{
p = pool->queueHead;
pool->queueHead = pool->queueHead->next;
free(p);
}
/* 释放互斥锁 */
pthread_mutex_destroy(&pool->taskLock);
/* 释放条件变量 */
/* 是因为有个线程在pthread_cond_wait等着,所以不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化把 */
// pthread_cond_destroy(&pool->newTask);
free(pool);
printf("free end!\n");
}3. pthread_pool.h
c
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
/* 头文件 */
#include <stdio.h>
#include <pthread.h>/* pthread_create pthread_exit pthread_cancel pthread_self */
#include <unistd.h> /* sleep */
#include <string.h> /* strerror */
#include <stdlib.h> /* malloc */
/* 任务结构体定义 */
typedef struct Task
{
void *(*func)(void *arg);/* 函数指针,指向需要执行的任务函数 */
void *taskArg; /* 任务参数,传递给任务函数的参数 */
struct Task *next; /* 指向下一个任务的指针变量 */
} Task;
/* 线程池结构体的定义 */
#define POOL_NUM 10 /* 线程池最大线程数 */
typedef struct ThreadPool
{
pthread_mutex_t taskLock;/* 互斥变量 */
pthread_cond_t newTask; /* 条件变量 */
pthread_t tid[POOL_NUM]; /* 线程池中线程的线程号 tid */
Task *queueHead; /* 指向任务队列队头的指针 */
int busyWork; /* 线程池已在执行任务的线程数 */
}ThreadPool;
extern ThreadPool * pool;
void *workThread(void *arg);/* 线程池中线程的运行函数 */
int threadPoolInit(void);/* 线程池初始化 */
int poolAddTask(int taskArg);/* 向任务队列中添加新的任务节点 */
void *realWork(void *arg);/* 线程池中线程执行的任务函数 */
void poolDestory(void);/* 线程池销毁 */
#endif