手搓线程池(C++&Go)——生产消费者模型
本文最后更新于39 天前,其中的信息可能已经过时,如有错误请留言

线程池用到了生产消费者模型,也就是生产者生产一个task,线程作为消费者去消费task(执行任务)。为什么使用线程池,其原因之一在于控制线程的数量(后面会用golang语言实现协程池,同样是为了防止协程数量过多,协程可以理解为用户态线程,其更轻量化,需要占用的资源更小,后面有机会再详细整理相关资料)防止线程数量过多引起系统资源不足而崩溃。

一、用C++实现线程池

手搓线程池需要用到以下工具:

1、条件变量

2、锁(mutex)(条件变量需要用到)

为什么说条件变量需要用到锁呢,因为task是用队列的形式存储的,而queue在C++中,并不是线程安全的,也就是说,不能有两个线程同时操作这个队列,所以在task出队,以及task入队的过程中,必须要加锁。

那么有个问题,线程需要知道taskqueue为不为空,再决定要不要阻塞,任务队列为空,那么线程就阻塞等待,如果任务队列不为空,那么线程就取出任务开始执行,那么实际上要先加锁,再读取队列(防止读取过程中被其他线程写入,造成读取结果的不确定),那么如果读取完发现队列是空的,线程就要阻塞等待,那这个时候,肯定不能继续加着锁等待,否则其他线程将全部无法获取锁从而阻塞。

所以实际上C++中的条件变量类型(condition_variable)在wait方法中,带一个函数,除非函数返回true,否则就会继续阻塞等待,阻塞之前会把锁解开,然后线程开始阻塞,失去CPU时间片,该线程不会再获得CPU时间片,除非被再次唤醒,当线程被唤醒后,将会再次加锁,因为后续要操作task队列。

再来看线程被唤醒后,可能是因为有任务入队,但线程被唤醒后因为要加锁,所以阻塞等待,等到线程真正操作队列的时候,其中的任务可能已经被其他线程取走,比如说当条件变量用notify_all方法时,仅有一个任务入队,就唤醒全部线程,那么只有一个线程能获得task,其他线程必须判断队列是否为空,如果为空,需要继续执行wait方法,即解锁阻塞,等待唤醒。

最终实现如下:

“mythreadpool.h”文件

#pragma once
#include <thread>
#include <vector>
#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <sys/syscall.h>
#include <unistd.h>
using namespace std;
class ThreadPool
{
private:
    vector<thread> threads_;
    queue<function<void()>> taskqueue_;
    mutex mutex_;
    condition_variable cond_;
    atomic_bool stop_;
    string type_;
public:
    ThreadPool(int threadNum);
    ~ThreadPool();
    void Stop();
    void AddTask(function<void()> fn);
    size_t Size();
};

“mythreadpool.cpp”文件

#include "mythreadpool.h"
ThreadPool::ThreadPool(int threadNum):stop_(false)
{
    for (size_t i = 0; i < threadNum; i++) {
        threads_.emplace_back([this]{
            while(!stop_) {
                function<void()> task;
                {
                    unique_lock<mutex> lock(this->mutex_);
                    // while(this->stop_ != true && this->taskqueue_.empty() != false) {
                    //     this->cond_.wait(lock);
                    // }
                    this->cond_.wait(lock, [this]{
                        printf("threadid = %d, queue.empty() = %d\n", syscall(SYS_gettid), this->taskqueue_.empty());
                        return (this->stop_ == true) || (this->taskqueue_.empty() == false);
                    });
                    if (this->stop_) { /* 这里如果为true,taskqueue可能为空 */
                        printf("threadid = %d, stop...\n");
                        return;
                    }
                /* if (this->taskqueue_.empty()) {
                        cout << "task queue empty!\n" << endl;
                        continue;
                    } 这个函数不可能执行到 */
                    task = this->taskqueue_.front();
                    this->taskqueue_.pop();
                }
                printf("threadid = %d, start execute task...\n", syscall(SYS_gettid));
                task();
            }
        });
    }
}
void ThreadPool::Stop()
{
    if (stop_) {
        return;
    }
    stop_ = true;
    cond_.notify_all();
    for (thread &th : threads_) {
        th.join();
    }
}
ThreadPool::~ThreadPool()
{
    Stop();
}
void ThreadPool::AddTask(function<void()> fn)
{
    {
        unique_lock<mutex> lock(this->mutex_);
        taskqueue_.push(fn);
    }
    cond_.notify_one();
}
size_t ThreadPool::Size()
{
    return threads_.size();
}

“main.cpp”文件

#include "mythreadpool.h"
int main(int argc, char *argv[])
{
    if (argc != 2) {
        printf("./main 3\n");
        return -1;
    }
    int threadNum = atoi(argv[1]);
    ThreadPool pool(threadNum);
    pool.AddTask([]{
        printf("threadid = %d, task1\n", syscall(SYS_gettid));
        this_thread::sleep_for(std::chrono::seconds(2));
        printf("task1 end...\n");
    });
    this_thread::sleep_for(std::chrono::seconds(1));
    pool.AddTask([]{
        printf("threadid = %d, task2\n", syscall(SYS_gettid));
        this_thread::sleep_for(std::chrono::seconds(2));
        printf("task2 end...\n");
    });
    pool.AddTask([]{
        printf("threadid = %d, task3\n", syscall(SYS_gettid));
        this_thread::sleep_for(std::chrono::seconds(2));
        printf("task3 end...\n");
    });
    this_thread::sleep_for(std::chrono::seconds(3));
}

编译脚本:

g++ -o main main.cpp mythreadpool.cpp -pthread

二、用Go实现线程池

Go中使用channel实现线程池。

package main

import (
	"fmt"
	"sync"
)

type ThreadPool struct {
	threadNum int
	wg        sync.WaitGroup
	taskQueue chan func() error
}

func (t *ThreadPool) Init(threadNum int, capacity int) {
	t.threadNum = threadNum
	t.wg.Add(threadNum)
	t.taskQueue = make(chan func() error, capacity)
	for i := 0; i < threadNum; i++ {
		go func() {
			defer t.wg.Done()
			for task := range t.taskQueue {
				if err := task(); err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
}

func (t *ThreadPool) AddTask(task func() error) {
	t.taskQueue <- task
}

func (t *ThreadPool) Stop() {
	close(t.taskQueue)
	t.wg.Wait()
}

func f() error {
	fmt.Println("f")
	return nil
}

func main() {
	var threadPool ThreadPool
	threadPool.Init(3, 5)
	for i := 0; i < 10; i++ {
		threadPool.AddTask(f)
	}
	threadPool.Stop()
}

等待后续进一步完善…

感谢阅读!如有疑问请留言

评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇