Skip to content

LV100-线程池

本文主要是线程的应用——线程池的相关笔记,若笔记中有错误或者不合适的地方,欢迎批评指正😃。

一、线程池原理

我们需要使用线程的时候就去创建一个线程,不需要就将线程销毁,但是就会有这样一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程都是需要时间的。

这个时候我们就可以使用线程池来解决这个问题了。线程池的主要思想是:在进程开始时创建一定数量的线程,并加到线程池中以等待工作。当服务器收到请求时,它会唤醒池内的一个线程(如果有可用线程),并将需要服务的请求传递给它。一旦线程完成了服务,它会返回到池中再等待工作。如果线程池内没有可用线程,那么服务器会等待,直到有空线程为止。

image-20230628213732591

二、线程池实现

1. 结构体实现

1.1 任务结构体

我们将使用链表构建一个任务队列,每个任务的结构体成员设计如下:

c
typedef struct Task
{
	void *(*func)(void *arg);/* 函数指针,指向需要执行的任务函数 */
	void *taskArg;           /* 任务参数,传递给任务函数的参数 */
	struct Task *next;       /* 指向下一个任务的指针变量 */
} Task;

成员说明

  • func,函数指针变量,可以指向一个返回值为void *的带有一个void *类型参数的真实任务函数。
  • taskArgvoid类型的指针变量,表示传给执行真实任务的函数的参数。
  • next:指针域,指向下一个任务结构体。

1.2线程池结构体

我们也是通过链表来存储线程池中的各个工作线程,每个工作线程的结构体成员设计如下:

c
#define POOL_NUM 10 /* 线程池最大线程数 */
typedef struct ThreadPool
{
	pthread_mutex_t taskLock;/* 互斥锁 */
	pthread_cond_t newTask;  /* 条件变量 */

	pthread_t tid[POOL_NUM]; /* 工作线程的线程号 tid */
	Task *queueHead;         /* 任务队列头指针 */
	int busyWork;            /* 线程池中已使用线程数 */

}ThreadPool;

1.3两个结构体联系

上边两个结构体的联系如下图所示:

image-20220527173459949

2. 线程池初始化

2.1 实现步骤

  • (1)申请线程池的内存空间并判断是否申请成功;
  • (2)对线程池的各个参数进行初始化;
  • (3)创建线程,这里要注意不要超过系统可创建的最大线程数。

2.2 函数实现

注意先定义一个指向线程池的全局指针变量:

c
ThreadPool * pool = NULL;/* 定义一个线程池,需要申请内存 */

函数实现如下:

c
/**
 * @Function: void threadPoolInit(void)
 * @Description: 线程池初始化
 * @param   : none
 * @return  : 返回一个整型数值
 *            0,成功;
 *           -1,失败
 */
int threadPoolInit(void)
{
	int i = 0;
	/* 1.申请线程池的内存空间 */
	pool = (ThreadPool *)malloc(sizeof(ThreadPool));
	/* 2. 判断是否申请成功 */
	if( pool == NULL)
	{
		printf("ThreadPool malloc failed!\n");
		return -1;
	}
	/* 3. 初始化互斥锁 */
	pthread_mutex_init(&pool->taskLock,NULL);
	/* 4. 初始化条件变量 */
	pthread_cond_init(&pool->newTask,NULL);
	/* 5. 初始化链式任务队列指针 */
	pool->queueHead = NULL;
	/* 6. 初始化线程池已占用线程数量 */
	pool->busyWork=0;
	/* 7. 创建所有的工作线程 */
	for(i = 0; i < POOL_NUM; i++)
	{
		pthread_create(&pool->tid[i], NULL, workThread, (void *)i);
	}
	return 0;
}

3. 线程开始函数

3.1 实现步骤

线程被创建之后,就开始运行线程开始函数,在线程开始函数中我们要做的事有:

  • (1)获取互斥锁
  • (2)等待条件变量,就是等待任务队列中有任务产生;
  • (3)收到信号时,从任务队列取出一个任务;
  • (4)释放互斥锁;
  • (5)传入任务参数,执行任务函数;
  • (6)执行完毕后,更新线程池中已被占用的线程数量。

3.2 函数实现

c
/**
 * @Function: void *workThread(void *arg)
 * @Description: 线程池中线程的运行函数
 * @param arg : 线程创建函数传递给工作线程的参数
 * @return    : 返回一个void指针类型数据
 */
void *workThread(void *arg)
{
	/* 定义一个任务结构体指针变量 */
	Task *ptask;
	printf("This is %d thread\n", (int)arg);
	while(1)
	{
		/* 获取互斥锁 */
		pthread_mutex_lock(&pool->taskLock);  

		/* 等待条件变量信号(收到信号时会唤醒至少一个线程) */
		pthread_cond_wait(&pool->newTask, &pool->taskLock);
		printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork);
		/* 从链式任务队列取出一个任务 */
		ptask = pool->queueHead;/* 指向任务队列的头节点 */
		pool->queueHead = pool->queueHead->next;/* 头指针指向下一个任务节点 */
		/* 解除互斥锁 */
		pthread_mutex_unlock(&pool->taskLock);

		/* 线程执行任务函数 */
		ptask->func(ptask->taskArg);
		/* 更新线程池已占用线程数量(任务函数执行完毕后,线程空闲) */
		pool->busyWork--;
	}
	return (void *)0;
}

4. 添加任务

4.1 实现步骤

  • (1)获取互斥锁
  • (2)判断线程池中是否还有空闲的线程可以执行任务,若没有则释放互斥锁,休眠等待,进行下一轮判断前要先获取已经释放的互斥锁;
  • (3)申请新任务节点内存,并判断是否申请成功;
  • (4)对新任务进行初始化;
  • (5)添加新任务到任务队列,任务队列是链表,注意插入的方式;
  • (6)新任务到来,需要唤醒一个线程,所以这里已占用线程数需要更新一下;
  • (7)发送有新任务的信号,至少唤醒一个空闲线程即可。

4.2 函数实现

c
/**
 * @Function: int poolAddTask(int taskArg)
 * @Description: 向任务队列中添加新的任务节点
 * @param taskArg : 递给实际执行的任务函数的参数
 * @return  : 返回一个整型数值
 *            0,成功;
 *           -1,失败
 */
int poolAddTask(int taskArg)
{
	/* 1. 定义两个任务链表结构体指针变量 */
	Task * newTask;
	Task * p;

	pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */

	/* 2. 判断线程池中是否还有线程可以执行新添加的任务 */
	while(pool->busyWork >= POOL_NUM)
	{
		pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
		usleep(10000);/* us休眠 */
		pthread_mutex_lock(&pool->taskLock);  /* 获取互斥锁 */
	}
	pthread_mutex_unlock(&pool->taskLock); /* 解除互斥锁 */

	/* 3. 申请新线程任务节点内存 */
	newTask = (Task *)malloc(sizeof(Task));
	/* 4. 判断是否申请成功 */
	if( newTask == NULL)
	{
		printf("newTask malloc failed!\n");
		return -1;
	}
	/* 5. 对新建的任务节点进行初始化 */
	newTask->func =  realWork;/* 线程运行函数中所执行的真正的任务内容函数 */
	newTask->taskArg = (void *)taskArg;

	/* 6. 添加新任务到链式任务队列 */
	pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
	p = pool->queueHead;
	if(p == NULL)/* 判断线程池是否为空 */
	{
		pool->queueHead = newTask;
	}
	else
	{
		while(p->next != NULL)
		{
			p = p->next;
		}
		p->next = newTask;/* 新任务添加到线程池链表结尾 */

	}
	/* 7. 更新已使用线程数 */
	pool->busyWork++;/* 已使用线程数加一 */
	/* 8. 发送信号到工作线程 */
	pthread_cond_signal(&pool->newTask);

	pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */

	return 0;
}

5. 任务函数

线程调用的任务函数,这里的休眠时间的延长可以更好的反映实验现象。

c
/**
 * @Function: void *realWork(void *taskArg)
 * @Description: 线程池任务执行内容函数
 * @param taskArg : 任务参数
 * @return    : 返回(void *)0
 */
void *realWork(void *taskArg)
{
	printf(" Finish work %d\n",(int)taskArg);
	sleep(2);
	return (void *)0;
}

6. 线程池销毁

6.1 销毁步骤

任务全部执行完毕后,进程结束,需要销毁线程池,释放内存:

  • (1)释放任务链表所有节点内存;
  • (2)销毁互斥锁;
  • (3)销毁条件变量,这里需要注意一下,是若有个线程在pthread_cond_wait等着,那这个条件变量就不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化吧,应该还有其他解决办法可以兼顾,不过自己也是初学,随着后边深入学习,解决了再更新这里吧。
  • (4)销毁线程池。

6.2 函数实现

c
/**
 * @Function: void poolDestory(void)
 * @Description: 线程池销毁
 * @param   : none
 * @return  : none
 */
void poolDestory(void)
{

	/* 1. 定义一个任务链表结构体指针变量 */
	Task * p;
	/* 2. 释放线程任务节点 */
	while(pool->queueHead != NULL)
	{
		p = pool->queueHead;
		pool->queueHead = pool->queueHead->next;
		free(p);
	}

	/* 释放互斥锁 */
	pthread_mutex_destroy(&pool->taskLock);

	/* 释放条件变量 */
	/* 是因为有个线程在pthread_cond_wait等着,所以不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化把 */
	// pthread_cond_destroy(&pool->newTask);

	free(pool);
	printf("free end!\n");
}

三、完整实例

1. main.c

c
/* 头文件 */
#include <stdio.h>
#include "pthread_pool.h"
/* 主函数 */
int main(int argc, char *argv[])
{
	int ret = 0;
	int i = 0;
	threadPoolInit();
	sleep(1);
	printf("Start create Task!\n");
	for(i = 0; i < 20; i++)
	{
		ret = poolAddTask(i);
		sleep(0.5);
		if(ret == 0)
		{
			printf("Add task[%d] success!\n", i);
		}
		else
		{
			printf("Add task[%d] failed!\n", i);
		}
	}
	
	sleep(5);
	poolDestory();
	return 0;
}

2. pthread_pool.c

c
#include "pthread_pool.h"

ThreadPool * pool = NULL;/* 定义一个线程池,需要申请内存 */
/**
 * @Function: void *workThread(void *arg)
 * @Description: 线程池中线程的运行函数
 * @param arg : 线程创建函数传递给工作线程的参数
 * @return    : 返回一个void指针类型数据
 */
void *workThread(void *arg)
{
	/* 定义一个任务结构体指针变量 */
	Task *ptask;
	printf("This is %d thread\n", (int)arg);
	while(1)
	{
		/* 获取互斥锁 */
		pthread_mutex_lock(&pool->taskLock);  
		
		/* 等待条件变量信号(收到信号时会唤醒至少一个线程) */
		pthread_cond_wait(&pool->newTask, &pool->taskLock);
		printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork);
		/* 从链式任务队列取出一个任务 */
		ptask = pool->queueHead;/* 指向任务队列的头节点 */
		pool->queueHead = pool->queueHead->next;/* 头指针指向下一个任务节点 */
		/* 解除互斥锁 */
		pthread_mutex_unlock(&pool->taskLock);

		/* 线程执行任务函数 */
		ptask->func(ptask->taskArg);
		/* 更新线程池已占用线程数量(任务函数执行完毕后,线程空闲) */
		pool->busyWork--;
	}
	return (void *)0;
}

/**
 * @Function: void threadPoolInit(void)
 * @Description: 线程池初始化
 * @param   : none
 * @return  : 返回一个整型数值
 *            0,成功;
 *           -1,失败
 */
int threadPoolInit(void)
{
	int i = 0;
	/* 1.申请线程池的内存空间 */
	pool = (ThreadPool *)malloc(sizeof(ThreadPool));
	/* 2. 判断是否申请成功 */
	if( pool == NULL)
	{
		printf("ThreadPool malloc failed!\n");
		return -1;
	}
	/* 3. 初始化互斥锁 */
	pthread_mutex_init(&pool->taskLock,NULL);
	/* 4. 初始化条件变量 */
	pthread_cond_init(&pool->newTask,NULL);
	/* 5. 初始化链式任务队列指针 */
	pool->queueHead = NULL;
	/* 6. 初始化线程池已占用线程数量 */
	pool->busyWork=0;
	/* 7. 创建所有的工作线程 */
	for(i = 0; i < POOL_NUM; i++)
	{
		pthread_create(&pool->tid[i], NULL, workThread, (void *)i);
	}
	return 0;
}

/**
 * @Function: int poolAddTask(int taskArg)
 * @Description: 向任务队列中添加新的任务节点
 * @param taskArg : 递给实际执行的任务函数的参数
 * @return  : 返回一个整型数值
 *            0,成功;
 *           -1,失败
 */
int poolAddTask(int taskArg)
{
	/* 1. 定义两个任务链表结构体指针变量 */
	Task * newTask;
	Task * p;

	pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */

	/* 2. 判断线程池中是否还有线程可以执行新添加的任务 */
	while(pool->busyWork >= POOL_NUM)
	{
		pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
		usleep(10000);/* us休眠 */
		pthread_mutex_lock(&pool->taskLock);  /* 获取互斥锁 */
	}
	pthread_mutex_unlock(&pool->taskLock); /* 解除互斥锁 */

	/* 3. 申请新线程任务节点内存 */
	newTask = (Task *)malloc(sizeof(Task));
	/* 4. 判断是否申请成功 */
	if( newTask == NULL)
	{
		printf("newTask malloc failed!\n");
		return -1;
	}
	/* 5. 对新建的任务节点进行初始化 */
	newTask->func =  realWork;/* 线程运行函数中所执行的真正的任务内容函数 */
	newTask->taskArg = (void *)taskArg;

	/* 6. 添加新任务到链式任务队列 */
	pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
	p = pool->queueHead;
	if(p == NULL)/* 判断线程池是否为空 */
	{
		pool->queueHead = newTask;
	}
	else
	{
		while(p->next != NULL)
		{
			p = p->next;
		}
		p->next = newTask;/* 新任务添加到线程池链表结尾 */

	}
	/* 7. 更新已使用线程数 */
	pool->busyWork++;/* 已使用线程数加一 */
	/* 8. 发送信号到工作线程 */
	pthread_cond_signal(&pool->newTask);

	pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */

	return 0;
}

/**
 * @Function: void *realWork(void *taskArg)
 * @Description: 线程池任务执行内容函数
 * @param taskArg : 任务参数
 * @return    : 返回(void *)0
 */
void *realWork(void *taskArg)
{
	printf(" Finish work %d\n",(int)taskArg);
	sleep(2);
	return (void *)0;
}

/**
 * @Function: void poolDestory(void)
 * @Description: 线程池销毁
 * @param   : none
 * @return  : none
 */
void poolDestory(void)
{

	/* 1. 定义一个任务链表结构体指针变量 */
	Task * p;
	/* 2. 释放线程任务节点 */
	while(pool->queueHead != NULL)
	{
		p = pool->queueHead;
		pool->queueHead = pool->queueHead->next;
		free(p);
	}

	/* 释放互斥锁 */
	pthread_mutex_destroy(&pool->taskLock);

	/* 释放条件变量 */
	/* 是因为有个线程在pthread_cond_wait等着,所以不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化把 */
	// pthread_cond_destroy(&pool->newTask);

	free(pool);
	printf("free end!\n");
}

3. pthread_pool.h

c
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
/* 头文件 */
#include <stdio.h>
#include <pthread.h>/* pthread_create pthread_exit pthread_cancel pthread_self */
#include <unistd.h> /* sleep */
#include <string.h> /* strerror */
#include <stdlib.h> /* malloc */

/* 任务结构体定义 */
typedef struct Task
{
	void *(*func)(void *arg);/* 函数指针,指向需要执行的任务函数 */
	void *taskArg;           /* 任务参数,传递给任务函数的参数 */
	struct Task *next;       /* 指向下一个任务的指针变量 */
} Task;

/* 线程池结构体的定义 */
#define POOL_NUM 10 /* 线程池最大线程数 */
typedef struct ThreadPool
{
	pthread_mutex_t taskLock;/* 互斥变量 */
	pthread_cond_t newTask;  /* 条件变量 */

	pthread_t tid[POOL_NUM]; /* 线程池中线程的线程号 tid */
	Task *queueHead;         /* 指向任务队列队头的指针 */
	int busyWork;            /* 线程池已在执行任务的线程数 */

}ThreadPool;

extern ThreadPool * pool;
void *workThread(void *arg);/* 线程池中线程的运行函数 */
int threadPoolInit(void);/* 线程池初始化 */
int poolAddTask(int taskArg);/* 向任务队列中添加新的任务节点 */
void *realWork(void *arg);/* 线程池中线程执行的任务函数 */
void poolDestory(void);/* 线程池销毁 */
#endif