c  多线程系列二 自定义线程执行器

多线程编程是现代计算机程序开发不可或缺的一部分。在多线程编程中,线程执行器(Thread Executor)扮演着关键的角色,它负责管理和调度线程的执行。Java语言提供了Executor框架和ThreadPoolExecutor类来方便地管理并发任务,但是在C语言中,没有现成的线程执行器供我们使用。本文将介绍如何自定义一个C语言的线程执行器,并提供案例说明。

## 线程执行器的概念

线程执行器(Thread Executor)是负责管理和调度线程的执行的机制。它为我们隐藏了线程的创建和管理,使我们可以专注于编写业务逻辑。一些常用的线程执行器,如Java中的Executor框架和ThreadPoolExecutor类、C++中的std::thread等。

线程执行器通常会提供以下几个功能:

1. 线程池管理:线程池是由若干个线程组成的线程集合,线程执行器可以管理和调度线程池中的所有线程,并可以控制线程池的大小和任务队列。

2. 任务调度:线程执行器会接收任务并将任务分配给线程池中的线程,从而实现多任务并发执行。

3. 任务队列:任务队列是存储任务的数据结构,线程执行器会将新接收到的任务放入任务队列中,并让线程池中的线程从任务队列中取出任务执行。

## 自定义线程执行器的实现步骤

自定义C语言的线程执行器的实现步骤如下:

1. 定义线程池结构体:线程池结构体包含线程池的大小、标识线程池是否正在运行、任务队列、任务队列条件变量、任务队列互斥锁、线程数组等字段。

2. 实现线程池初始化函数:初始化线程池,包括线程池大小、任务队列初始化、互斥锁和条件变量的初始化等。

3. 实现线程池销毁函数:销毁线程池,释放线程池及其相关资源。

4. 实现添加任务函数:将任务添加到任务队列中。

5. 实现线程函数:线程函数中实现任务的获取和执行。

6. 实现线程执行器的入口函数:在入口函数中初始化线程池、添加任务,并等待任务执行完毕后销毁线程池。

下面是一个基本的线程执行器的代码框架:

```c

typedef struct ThreadPool ThreadPool;

typedef void (*TaskFunc)(void*);

struct ThreadPool {

int pool_size; // 线程池大小

int is_running; // 线程池是否正在运行

pthread_mutex_t mutex; // 互斥锁

pthread_cond_t cond; // 条件变量

TaskQueue task_queue; // 任务队列

pthread_t* threads; // 线程数组

};

int thread_pool_init(ThreadPool* pool, int pool_size);

int thread_pool_destroy(ThreadPool* pool);

void thread_pool_add_task(ThreadPool* pool, TaskFunc func, void* arg);

void* thread_func(void* arg);

void thread_pool_execute(ThreadPool* pool);

```

## 线程执行器的使用方法

我们来看看如何使用自定义的线程执行器。

首先,在使用线程执行器之前,我们需要定义任务函数。任务函数需要接收一个`void*`类型的参数,返回类型为void。例如:

```c

void task_func(void* arg) {

int* num = (int*) arg;

printf("num: %d\n", *num);

}

```

然后,我们需要初始化线程池。

```c

ThreadPool pool;

thread_pool_init(&pool, 4); // 创建一个大小为4的线程池

```

之后,我们可以添加任务到线程池中。

```c

int num = 10;

thread_pool_add_task(&pool, task_func, &num); // 添加任务

```

在添加完所有任务后,我们需要等待所有任务执行完毕后销毁线程池。

```c

thread_pool_execute(&pool); // 等待任务执行完毕

thread_pool_destroy(&pool); // 销毁线程池

```

## 线程执行器的案例说明:求素数

下面我们通过一个求素数的例子来说明如何使用自定义线程执行器。算法的作用是计算小于等于给定整数n的所有素数。

我们将这个算法分解为多个任务,每一个任务处理一个范围内的整数。例如,第一个任务处理从1到n/4的整数,第二个任务处理从n/4+1到n/2的整数,以此类推。每个任务最终会返回它处理范围内的素数个数。

我们将任务的返回值累加起来,就可以获得小于等于n的素数总个数。

下面是完整的代码:

```c

#include

#include

#include

#define MAX_N 10000000

#define THREAD_POOL_SIZE 4

typedef struct {

int start;

int end;

int count;

} TaskArgs;

typedef struct {

int pool_size;

int is_running;

pthread_mutex_t mutex;

pthread_cond_t cond;

TaskArgs task_queue[MAX_N];

int queue_head;

int queue_tail;

pthread_t threads[THREAD_POOL_SIZE];

} ThreadPool;

int is_prime(int n) {

if (n <= 1) return 0;

for (int i = 2; i * i <= n; ++i) {

if (n % i == 0) {

return 0;

}

}

return 1;

}

void task_func(void* arg) {

TaskArgs* task_args = (TaskArgs*) arg;

int count = 0;

for (int i = task_args->start; i < task_args->end; ++i) {

if (is_prime(i)) {

++count;

}

}

task_args->count = count;

}

int thread_pool_init(ThreadPool* pool, int pool_size) {

pool->pool_size = pool_size;

pool->queue_head = 0;

pool->queue_tail = 0;

pool->is_running = 1;

pthread_mutex_init(&pool->mutex, NULL);

pthread_cond_init(&pool->cond, NULL);

for (int i = 0; i < pool_size; ++i) {

pthread_create(&pool->threads[i], NULL, thread_func, pool);

}

return 0;

}

int thread_pool_destroy(ThreadPool* pool) {

pool->is_running = 0;

pthread_cond_broadcast(&pool->cond);

for (int i = 0; i < pool->pool_size; ++i) {

pthread_join(pool->threads[i], NULL);

}

pthread_mutex_destroy(&pool->mutex);

pthread_cond_destroy(&pool->cond);

return 0;

}

void thread_pool_add_task(ThreadPool* pool, void (*task_func)(void*), void* arg) {

pthread_mutex_lock(&pool->mutex);

pool->task_queue[pool->queue_tail].start = ((TaskArgs*) arg)->start;

pool->task_queue[pool->queue_tail].end = ((TaskArgs*) arg)->end;

pool->task_queue[pool->queue_tail].count = 0;

pool->queue_tail = (pool->queue_tail + 1) % MAX_N;

pthread_cond_signal(&pool->cond);

pthread_mutex_unlock(&pool->mutex);

}

void* thread_func(void* arg) {

ThreadPool* pool = (ThreadPool*) arg;

while (pool->is_running) {

TaskArgs task_args;

pthread_mutex_lock(&pool->mutex);

while (pool->queue_head == pool->queue_tail) {

pthread_cond_wait(&pool->cond, &pool->mutex);

if (!pool->is_running) {

pthread_mutex_unlock(&pool->mutex);

return NULL;

}

}

task_args = pool->task_queue[pool->queue_head];

pool->queue_head = (pool->queue_head + 1) % MAX_N;

pthread_mutex_unlock(&pool->mutex);

task_func(&task_args);

}

return NULL;

}

void thread_pool_execute(ThreadPool* pool, int n) {

int range_size = n / THREAD_POOL_SIZE;

for (int i = 0; i < THREAD_POOL_SIZE; ++i) {

TaskArgs task_args;

task_args.start = range_size * i + 1;

task_args.end = range_size * (i + 1) + 1;

thread_pool_add_task(pool, task_func, &task_args);

}

int total_count = 0;

for (int i = 0; i < THREAD_POOL_SIZE; ++i) {

TaskArgs task_args;

pthread_mutex_lock(&pool->mutex);

while (pool->task_queue[pool->queue_head].count == 0) {

pthread_cond_wait(&pool->cond, &pool->mutex);

}

task_args = pool->task_queue[pool->queue_head];

pool->queue_head = (pool->queue_head + 1) % MAX_N;

pthread_mutex_unlock(&pool->mutex);

total_count += task_args.count;

}

printf("%d\n", total_count);

}

int main() {

ThreadPool pool;

thread_pool_init(&pool, THREAD_POOL_SIZE);

thread_pool_execute(&pool, 1000000);

thread_pool_destroy(&pool);

return 0;

}

```

在这个例子中,我们使用线程池分别计算了从1到n/4,n/4+1到n/2,n/2+1到3n/4,3n/4+1到n四个范围内的素数个数,最后将这四个个数相加得到结果。

壹涵网络我们是一家专注于网站建设、企业营销、网站关键词排名、AI内容生成、新媒体营销和短视频营销等业务的公司。我们拥有一支优秀的团队,专门致力于为客户提供优质的服务。

我们致力于为客户提供一站式的互联网营销服务,帮助客户在激烈的市场竞争中获得更大的优势和发展机会!

点赞(2) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿
发表
评论
返回
顶部