C++ 实现线程池

前置知识:

  • c++
  • 操作系统

C++实现线程池

前言

  本文使用C++基本库,实现了固定线程数量的线程池。

线程池基础

  对于了解过线程的人而言,线程池的概念并不难理解。为了避免频繁地创建与销毁线程,节省开销,预先创建一定数量的线程作为“池子”,并使用池内的线程来进行使用,节省开销。
  理论并不困难,但在实现上面,由于现代C++自身的晦涩难懂,标准库的难以使用,而且网络上的教程不说人话,使得使用C++开发线程池存在不少困难。本文便理清这些困难,帮助更好使用C++。

thread介绍

  在C++11之后,标准库增加了thread库,使得c++有了原生的线程支持。这里简单介绍一下库的几个方法。
  构造函数:thread只有默认构造函数和移动构造函数,没有拷贝构造。在默认构造中,声明如下:

1
2
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );

  它接收函数与参数,并执行。在实际运用中,除了传递函数指针,也可以直接使用lambda表达式。
  方法:thread有多种方法,具体如下:

| Observers | | | ———————————————————— | ———————————————————— | | joinable | checks whether the thread is joinable, i.e. potentially running in parallel context (public member function) | | get_id | returns the id of the thread (public member function) | | native_handle | returns the underlying implementation-defined thread handle (public member function) | | hardware_concurrency[static] | returns the number of concurrent threads supported by the implementation (public static member function) | | Operations | | | join | waits for a thread to finish its execution (public member function) | | detach | permits the thread to execute independently from the thread handle (public member function) | | swap | swaps two thread objects (public member function) |

  在本次中主要使用了detach,即,将线程放开,到后台运行。

实现思路

  可以看到,c11的thread库在线程创建时候便需要与一个函数相关联,无法更改。这里采用的实现思路为:将需要执行的函数当作对象,存入池子中。每个线程先阻塞,在函数存入时便唤醒,获取函数,得到函数参数,并执行该函数,在完成后再次阻塞。
  由此,我们可以得到每个thread的主体内容:
  - 一个指针,指向线程池主体,来获取锁与函数。
  - 函数主体为死循环,在检测到池子内有任务后便获取函数并执行。在检测到函数主题关闭后退出循环。在其他时刻阻塞。
  - thread理应detach,在后台运行。

  根据此流程,我们也可以得到线程池主体控制程序的内容:
  - 锁,用于协调各个线程。
  - 任务队列,用于存放函数。
  - 条件变量,用于唤醒线程。
  - 标识退出的变量。

  基本思路理清,我们可以写出对应代码。

Controller类

  Controller类对应的便是控制程序。如上所述,我们构建相应的成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template <class F, class Args, class Ret>
class Controller
{
public:
std::mutex mu;//锁
std::queue<std::tuple<F, Args, Ret &>> tasks;//任务队列。存放函数的地方。
std::condition_variable cond_v;//条件变量,用于阻塞和唤醒线程
bool closed = false;//标志是否结束
Controller() = default;//默认构造函数
Controller(const Controller &ctrl)//拷贝构造函数。
{
//由于条件变量无法拷贝构造,这里只是占位,并不会实际发生拷贝
}
};

  模板的三个参数分别对应函数主体,函数参数,函数返回值。因此返回值在队列里存放的为左引用。
  拷贝构造函数不实际执行拷贝,只是占位便于代码编写。
  就像之前上一部分所述一样,我们实现了控制类的编写。

ThreadPool 类

  代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

template <class F, class Args, class Ret>
class ThreadPool
{
public:
Controller<F, Args, Ret> controller;//控制类
std::shared_ptr<Controller<F, Args, Ret>> shr_ptr;//指向控制类的指针 被所有线程共享

//构造函数。构造出给定数量的线程并且放入后台执行。
ThreadPool(size_t size) : controller()
{
this->shr_ptr = std::make_shared<Controller<F, Args, Ret>>(this->controller);
//循环构造给定数量thread 并进入让其执行死循环
for (size_t i = 0; i < size; i++)
{
std::thread([ptr = shr_ptr] {
std::unique_lock<std::mutex> mux(ptr->mu);
for (;;)
{
if (!ptr->tasks.empty())//如果任务非空,则从队列里获取并执行任务
{
auto current = std::move(ptr->tasks.front());
F fun = std::get<0>(current);
Args arg = std::get<1>(current);
Ret &re = std::get<2>(current);
ptr->tasks.pop();
mux.unlock();
re = fun(arg);
mux.lock();
}
else if (ptr->closed)//退出循环
{
break;
}
else//进入阻塞
{
ptr->cond_v.wait(mux);
}
}
}).detach();
}
}

//析构函数 先上锁通知类已经关闭 再利用条件变量通知所有线程
~ThreadPool()
{
if ((bool)shr_ptr)
{
{
std::lock_guard<std::mutex> mu(shr_ptr->mu);
shr_ptr->closed = true;
}

shr_ptr->cond_v.notify_all();
}
}

//执行函数 先上锁写入任务,再通知一个线程进行执行
void excute(F &&f, Args &&args, Ret &ret)
{
{
std::lock_guard<std::mutex> mu(shr_ptr->mu); //上锁 析构时候解锁 便于写入
shr_ptr->tasks.emplace(std::tuple<F, Args, Ret &>(std::forward<F>(f), std::forward<Args>(args), ret));
}

shr_ptr->cond_v.notify_one();
}
};


  如注释所写一般,我们使用c++实现了线程池。

总结

  本次花了一晚上时间,使用c++实现了一个线程池。虽然只是固定数量的线程池,但对于c++并发而言,算是一个很好的入门。就我个人而言,本次也复习了不少c++相关知识,更接触了不少现代c++特性。可以看出,本文在左值右值上并没有过多阐述,因为本人对完美转发掌握尚不熟练。本次部分代码也是依葫芦画瓢。之后的c++学习过程中还得强化概念与使用方法。

Author

王钦砚

Posted on

2020-10-05

Licensed under

CC BY-NC-SA 4.0

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×