线程池用到了生产消费者模型,也就是生产者生产一个task,线程作为消费者去消费task(执行任务)。为什么使用线程池,其原因之一在于控制线程的数量(后面会用golang语言实现协程池,同样是为了防止协程数量过多,协程可以理解为用户态线程,其更轻量化,需要占用的资源更小,后面有机会再详细整理相关资料)防止线程数量过多引起系统资源不足而崩溃。
一、用C++实现线程池
手搓线程池需要用到以下工具:
1、条件变量
2、锁(mutex)(条件变量需要用到)
为什么说条件变量需要用到锁呢,因为task是用队列的形式存储的,而queue在C++中,并不是线程安全的,也就是说,不能有两个线程同时操作这个队列,所以在task出队,以及task入队的过程中,必须要加锁。
那么有个问题,线程需要知道taskqueue为不为空,再决定要不要阻塞,任务队列为空,那么线程就阻塞等待,如果任务队列不为空,那么线程就取出任务开始执行,那么实际上要先加锁,再读取队列(防止读取过程中被其他线程写入,造成读取结果的不确定),那么如果读取完发现队列是空的,线程就要阻塞等待,那这个时候,肯定不能继续加着锁等待,否则其他线程将全部无法获取锁从而阻塞。
所以实际上C++中的条件变量类型(condition_variable)在wait方法中,带一个函数,除非函数返回true,否则就会继续阻塞等待,阻塞之前会把锁解开,然后线程开始阻塞,失去CPU时间片,该线程不会再获得CPU时间片,除非被再次唤醒,当线程被唤醒后,将会再次加锁,因为后续要操作task队列。
再来看线程被唤醒后,可能是因为有任务入队,但线程被唤醒后因为要加锁,所以阻塞等待,等到线程真正操作队列的时候,其中的任务可能已经被其他线程取走,比如说当条件变量用notify_all方法时,仅有一个任务入队,就唤醒全部线程,那么只有一个线程能获得task,其他线程必须判断队列是否为空,如果为空,需要继续执行wait方法,即解锁阻塞,等待唤醒。
最终实现如下:
“mythreadpool.h”文件
#pragma once
#include <thread>
#include <vector>
#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <sys/syscall.h>
#include <unistd.h>
using namespace std;
class ThreadPool
{
private:
vector<thread> threads_;
queue<function<void()>> taskqueue_;
mutex mutex_;
condition_variable cond_;
atomic_bool stop_;
string type_;
public:
ThreadPool(int threadNum);
~ThreadPool();
void Stop();
void AddTask(function<void()> fn);
size_t Size();
};
“mythreadpool.cpp”文件
#include "mythreadpool.h"
ThreadPool::ThreadPool(int threadNum):stop_(false)
{
for (size_t i = 0; i < threadNum; i++) {
threads_.emplace_back([this]{
while(!stop_) {
function<void()> task;
{
unique_lock<mutex> lock(this->mutex_);
// while(this->stop_ != true && this->taskqueue_.empty() != false) {
// this->cond_.wait(lock);
// }
this->cond_.wait(lock, [this]{
printf("threadid = %d, queue.empty() = %d\n", syscall(SYS_gettid), this->taskqueue_.empty());
return (this->stop_ == true) || (this->taskqueue_.empty() == false);
});
if (this->stop_) { /* 这里如果为true,taskqueue可能为空 */
printf("threadid = %d, stop...\n");
return;
}
/* if (this->taskqueue_.empty()) {
cout << "task queue empty!\n" << endl;
continue;
} 这个函数不可能执行到 */
task = this->taskqueue_.front();
this->taskqueue_.pop();
}
printf("threadid = %d, start execute task...\n", syscall(SYS_gettid));
task();
}
});
}
}
void ThreadPool::Stop()
{
if (stop_) {
return;
}
stop_ = true;
cond_.notify_all();
for (thread &th : threads_) {
th.join();
}
}
ThreadPool::~ThreadPool()
{
Stop();
}
void ThreadPool::AddTask(function<void()> fn)
{
{
unique_lock<mutex> lock(this->mutex_);
taskqueue_.push(fn);
}
cond_.notify_one();
}
size_t ThreadPool::Size()
{
return threads_.size();
}
“main.cpp”文件
#include "mythreadpool.h"
int main(int argc, char *argv[])
{
if (argc != 2) {
printf("./main 3\n");
return -1;
}
int threadNum = atoi(argv[1]);
ThreadPool pool(threadNum);
pool.AddTask([]{
printf("threadid = %d, task1\n", syscall(SYS_gettid));
this_thread::sleep_for(std::chrono::seconds(2));
printf("task1 end...\n");
});
this_thread::sleep_for(std::chrono::seconds(1));
pool.AddTask([]{
printf("threadid = %d, task2\n", syscall(SYS_gettid));
this_thread::sleep_for(std::chrono::seconds(2));
printf("task2 end...\n");
});
pool.AddTask([]{
printf("threadid = %d, task3\n", syscall(SYS_gettid));
this_thread::sleep_for(std::chrono::seconds(2));
printf("task3 end...\n");
});
this_thread::sleep_for(std::chrono::seconds(3));
}
编译脚本:
g++ -o main main.cpp mythreadpool.cpp -pthread
二、用Go实现线程池
Go中使用channel实现线程池。
package main
import (
"fmt"
"sync"
)
type ThreadPool struct {
threadNum int
wg sync.WaitGroup
taskQueue chan func() error
}
func (t *ThreadPool) Init(threadNum int, capacity int) {
t.threadNum = threadNum
t.wg.Add(threadNum)
t.taskQueue = make(chan func() error, capacity)
for i := 0; i < threadNum; i++ {
go func() {
defer t.wg.Done()
for task := range t.taskQueue {
if err := task(); err != nil {
fmt.Println(err)
}
}
}()
}
}
func (t *ThreadPool) AddTask(task func() error) {
t.taskQueue <- task
}
func (t *ThreadPool) Stop() {
close(t.taskQueue)
t.wg.Wait()
}
func f() error {
fmt.Println("f")
return nil
}
func main() {
var threadPool ThreadPool
threadPool.Init(3, 5)
for i := 0; i < 10; i++ {
threadPool.AddTask(f)
}
threadPool.Stop()
}
等待后续进一步完善…
评论