多线程之实现非阻塞的事件循环
C++ 中实现主线程非阻塞的事件循环的经典模式,通常被称为“生产者-消费者”模型或基于任务队列的线程池模型。
主线程作为“生产者”,其唯一的任务就是将要执行的“事件”或“任务”快速地放入一个线程安全的队列中,然后立即返回,继续做其他事情(比如响应用户界面)。一个或多个后台的“工作线程”(消费者)则不断地从这个队列中取出任务并执行它们。
这样就完美地达成了你的要求:
- 主线程不阻塞:主线程的工作仅仅是
push
一个任务到队列,这是一个非常快的操作。 - 主线程只负责发消息:主线程调用一个
post
或dispatch
函数来提交任务,但不关心任务何时、由谁执行。 - 能够快速返回:
push
操作完成后,主线程的函数调用就结束了。
下面我将分步讲解如何实现,并提供一个完整的、可运行的 C++11/14/17 示例。
# 核心组件
要实现这个模型,我们需要以下几个关键组件:
线程安全的任务队列 (
SafeQueue
):- 一个标准库的队列
std::queue
来存储任务。 - 一个互斥锁
std::mutex
来保护队列,防止多个线程同时读写造成数据竞争。 - 一个条件变量
std::condition_variable
来让工作线程在队列为空时高效地“睡眠”,而不是空转浪费 CPU。当新任务到来时,主线程会唤醒一个工作线程。
- 一个标准库的队列
任务的表示 (
Task
):- 使用
std::function<void()>
是最灵活的方式。它可以封装任何可调用对象,包括普通函数、Lambda 表达式、成员函数等。
- 使用
事件循环/线程池 (
EventLoop
):- 一个或多个工作线程
std::thread
。这些线程在启动后就进入一个循环,不断地尝试从任务队列中获取并执行任务。 - 一个用于通知线程退出的标志位(例如
std::atomic<bool>
)。 - 一个公共的
post(Task task)
方法,供主线程调用以提交任务。
- 一个或多个工作线程
# 代码实现
下面是一个完整的实现。
# 1. SafeQueue.h
- 线程安全队列
这是一个通用的模板类,可以用于任何类型,这里我们用它来存储 Task
。
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class SafeQueue {
public:
void push(T value) {
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(std::move(value));
m_cond.notify_one(); // 唤醒一个等待的线程
}
// wait_and_pop 会阻塞调用者,直到队列中有元素或停止
bool wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(m_mutex);
// 使用 wait 的谓词版本,防止虚假唤醒,并处理停止信号
m_cond.wait(lock, [this] { return !m_queue.empty() || m_stop; });
if (m_queue.empty() && m_stop) {
return false; // 队列为空且已收到停止信号
}
value = std::move(m_queue.front());
m_queue.pop();
return true;
}
void stop() {
std::lock_guard<std::mutex> lock(m_mutex);
m_stop = true;
m_cond.notify_all(); // 唤醒所有等待的线程以使其退出
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
bool m_stop = false;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 2. EventLoop.h
- 事件循环主类
这个类封装了工作线程和任务队列,提供了简单的 post
和 stop
接口。
#pragma once
#include "SafeQueue.h"
#include <functional>
#include <thread>
#include <vector>
#include <iostream>
class EventLoop {
public:
using Task = std::function<void()>;
// 构造函数:创建指定数量的工作线程
explicit EventLoop(size_t num_threads = 1) {
if (num_threads == 0) {
num_threads = std::thread::hardware_concurrency();
}
for (size_t i = 0; i < num_threads; ++i) {
m_threads.emplace_back([this] { worker_loop(); });
}
}
// 析构函数:确保所有线程都已停止并加入
~EventLoop() {
stop();
}
// 禁止拷贝和移动
EventLoop(const EventLoop&) = delete;
EventLoop& operator=(const EventLoop&) = delete;
// 主线程调用的接口:发布一个新任务
void post(Task task) {
m_task_queue.push(std::move(task));
}
// 停止事件循环
void stop() {
m_task_queue.stop(); // 通知队列停止
for (auto& thread : m_threads) {
if (thread.joinable()) {
thread.join();
}
}
}
private:
// 工作线程运行的循环函数
void worker_loop() {
Task task;
// 循环地从队列中获取任务并执行
while (m_task_queue.wait_and_pop(task)) {
try {
task(); // 执行任务
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
}
}
}
SafeQueue<Task> m_task_queue;
std::vector<std::thread> m_threads;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 3. main.cpp
- 使用示例
这个文件展示了主线程如何使用 EventLoop
。
#include "EventLoop.h"
#include <iostream>
#include <chrono>
void some_long_running_task(int id) {
std::cout << "Task " << id << " started on thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
std::cout << "Task " << id << " finished." << std::endl;
}
int main() {
// 创建一个拥有 2 个工作线程的事件循环
EventLoop event_loop(2);
std::cout << "Main thread (" << std::this_thread::get_id() << ") is posting tasks." << std::endl;
// 主线程快速发布多个任务
event_loop.post([] { some_long_running_task(1); });
std::cout << "Main thread: Posted task 1. Returning immediately." << std::endl;
event_loop.post([] { some_long_running_task(2); });
std::cout << "Main thread: Posted task 2. Returning immediately." << std::endl;
int data = 42;
event_loop.post([data] {
std::cout << "Task 3 started on thread " << std::this_thread::get_id()
<< " with captured data: " << data << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task 3 finished." << std::endl;
});
std::cout << "Main thread: Posted task 3. Returning immediately." << std::endl;
// 主线程可以继续做其他事情,不会被上面的任务阻塞
std::cout << "Main thread continues its own work..." << std::endl;
for (int i = 0; i < 5; ++i) {
std::cout << "Main thread is alive..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
std::cout << "Main thread has finished its work and is now waiting for tasks to complete before exiting." << std::endl;
// EventLoop 对象在 main 函数结束时会自动析构,
// 其析构函数会调用 stop() 并等待所有工作线程结束。
// 你也可以显式调用 event_loop.stop();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 如何编译和运行
将以上三个文件(SafeQueue.h
, EventLoop.h
, main.cpp
)放在同一个目录下,使用支持 C++11 或更高版本的编译器进行编译:
g++ -std=c++17 main.cpp -o event_loop_demo -pthread
或者使用 Clang:
clang++ -std=c++17 main.cpp -o event_loop_demo -pthread
运行 ./event_loop_demo
,你将看到类似下面的输出(线程 ID 和任务执行顺序可能不同):
Main thread (0x7f....) is posting tasks.
Main thread: Posted task 1. Returning immediately.
Main thread: Posted task 2. Returning immediately.
Main thread: Posted task 3. Returning immediately.
Main thread continues its own work...
Main thread is alive...
Task 1 started on thread 0x7e....
Task 2 started on thread 0x7d....
Main thread is alive...
Main thread is alive...
Task 3 started on thread 0x7e....
Main thread is alive...
Main thread is alive...
Main thread has finished its work and is now waiting for tasks to complete before exiting.
Task 3 finished.
Task 1 finished.
Task 2 finished.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 关键点解析
- 非阻塞性:
event_loop.post()
内部只做了一件事:对队列push
一个元素并发出通知。这个过程非常快,几乎是瞬间完成,所以主线程可以立即返回,执行后续代码。 - 线程安全:
SafeQueue
使用std::mutex
保证了在任何时刻只有一个线程能修改队列,避免了数据损坏。 - 高效等待:工作线程在没有任务时,会因为
std::condition_variable::wait
而进入睡眠状态,完全不消耗 CPU 资源。只有当主线程post
新任务并通过notify_one
唤醒它时,它才会醒来工作。 - 优雅退出:
EventLoop
的析构函数或stop()
方法会确保所有已提交的任务执行完毕后,工作线程才会正常退出。join()
调用会阻塞主线程直到工作线程完全结束,这是程序正常退出所必需的。 - 灵活性:
std::function<void()>
和 Lambda 表达式的结合使得提交任务变得极其方便和强大。你可以捕捉上下文变量,传递任意参数。
# EventLoop.h 代码详解
# using Task = std::function<void()>;
using Task = std::function<void()>;
我们来详细解析这行代码:
这行代码是 C++11 引入的 类型别名 (Type Alias) 声明。它的作用是为 std::function<void()>
这个复杂的类型起一个更简单、更具描述性的名字——Task
。
我们可以把它分解成三个部分来理解:
# 1. using ... = ...;
(类型别名)
这是 C++11 中定义类型别名的首选方式,它比旧的 typedef
语法更清晰、更通用(尤其是在模板编程中)。
它的基本语法是:
using NewTypeName = ExistingType;
在这段代码里,NewTypeName
就是 Task
,而 ExistingType
就是 std::function<void()>
。所以,这行代码的作用等同于:
// 旧的 typedef 语法,功能相同
typedef std::function<void()> Task;
2
从现在开始,在代码中任何需要 std::function<void()>
的地方,你都可以直接写 Task
,编译器会把它们看作是完全相同的类型。
# 2. std::function
(通用函数包装器)
std::function
是 C++ 标准库 <functional>
中的一个非常强大的工具。它是一个通用的、多态的函数包装器。
可以把它想象成一个“万能盒子”,这个盒子里可以存放 任何可以像函数一样被调用的东西(我们称之为“可调用对象”)。
这些“可调用对象”包括:
- 普通函数 (Free Functions)
- Lambda 表达式 (Lambdas)
- 类的成员函数 (Member Functions)
- 函数对象 (Functors,即重载了
operator()
的类)
std::function
的强大之处在于,它抹平了这些不同可调用对象之间的类型差异,让你能用统一的方式来存储和调用它们。
# 3. <void()>
(函数签名)
std::function
是一个模板类,你需要通过模板参数来告诉它,你想装入的“盒子”里的函数应该长什么样。这个“长什么样”就是函数的 签名 (Signature)。
函数签名包括 返回值类型 和 参数列表。
在 std::function<void()>
中:
void
: 表示这个函数包装器期望的返回值类型是void
(即不返回任何值)。()
: 表示这个函数包装器期望的参数列表是空的(即调用时不需要传递任何参数)。
所以,std::function<void()>
这个完整的类型代表了**“一个不返回任何值且不接受任何参数的可调用对象”**。
# 总结:这行代码的用处是什么?
结合以上三点,using Task = std::function<void()>;
这行代码的用处是:
抽象和简化: 它为
std::function<void()>
这个又长又复杂的类型创建了一个简洁、有意义的别名Task
。这使得代码更易读、更易懂。比较一下:// 使用别名前 SafeQueue<std::function<void()>> m_task_queue; void post(std::function<void()> task); // 使用别名后 SafeQueue<Task> m_task_queue; void post(Task task);
1
2
3
4
5
6
7第二种写法显然意图更清晰:“一个存放任务的安全队列”和“一个提交任务的函数”。
提供极高的灵活性: 因为
Task
是std::function<void()>
的别名,所以事件循环的post
方法可以接受任何符合“无参、无返回值”签名的可调用对象。这使得事件循环的功能变得非常强大。示例:
EventLoop loop; // 1. 提交一个普通函数 void print_hello() { std::cout << "Hello!" << std::endl; } loop.post(print_hello); // 2. 提交一个 Lambda 表达式 (最常用) int value = 123; loop.post([value] { std::cout << "The value is " << value << std::endl; // Lambda可以捕获外部变量,功能非常强大 }); // 3. 提交一个类的成员函数 class MyWorker { public: void do_work() { std::cout << "Worker is working." << std::endl; } }; MyWorker worker; loop.post([&worker] { worker.do_work(); }); // 通过Lambda包装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20如你所见,尽管
print_hello
、lambda 表达式和worker.do_work()
在 C++ 中是完全不同的类型,但std::function
可以将它们全部“装”进Task
这个统一的类型中,让事件循环能够无差别地处理它们。便于维护: 如果将来你需要改变所有任务的签名,比如要求所有任务都接受一个
int
类型的 ID,你只需要修改一处地方即可:// 只需要修改这一行 using Task = std::function<void(int)>;
1
2所有使用
Task
别名的地方都会自动更新,而无需在整个代码库中进行繁琐的查找和替换。
# explicit EventLoop(size_t num_threads = 1) 构造函数
// 构造函数:创建指定数量的工作线程
explicit EventLoop(size_t num_threads = 1) {
if (num_threads == 0) {
num_threads = std::thread::hardware_concurrency();
}
for (size_t i = 0; i < num_threads; ++i) {
m_threads.emplace_back([this] { worker_loop(); });
}
}
2
3
4
5
6
7
8
9
这段代码是 EventLoop
类的构造函数,它的核心作用是初始化并启动处理后台任务的工作线程。我们来逐行解析一下。
# 1. 构造函数签名
explicit EventLoop(size_t num_threads = 1)
explicit
:这是一个 C++ 关键字,用于防止编译器进行不经意的隐式类型转换。例如,它能阻止你意外地写出EventLoop loop = 5;
这样的代码。这是一种良好的编程习惯,能让代码更安全、意图更明确。size_t num_threads = 1
:这定义了构造函数的参数num_threads
,它用来指定要创建多少个工作线程。size_t
是一种适合表示大小和计数的无符号整数类型。= 1
是一个默认值。这意味着,如果你在创建EventLoop
对象时不指定线程数(例如EventLoop my_loop;
),它将默认创建一个工作线程。
# 2. 自动检测线程数
if (num_threads == 0) {
num_threads = std::thread::hardware_concurrency();
}
2
3
这是一个非常实用的特殊处理逻辑。它允许用户通过传递 0
来表达“请自动为我选择一个合适的线程数”。
std::thread::hardware_concurrency()
:这个静态函数会返回一个建议值,表示当前硬件环境支持的并发线程数量。这个值通常等于你电脑 CPU 的核心数或超线程数。- 逻辑:如果用户传入
0
,程序就会自动获取硬件支持的线程数,并用这个值来创建线程池。这使得程序能更好地适应不同的运行环境,充分利用硬件性能。
# 3. 创建并启动线程
for (size_t i = 0; i < num_threads; ++i) {
m_threads.emplace_back([this] { worker_loop(); });
}
2
3
这是整个构造函数的核心,负责真正地创建和启动线程。
for (...)
:这个循环会执行num_threads
次,每次循环创建一个新的线程。m_threads
:这应该是EventLoop
类的一个成员变量,通常是一个std::vector<std::thread>
,用来存放和管理所有被创建出来的线程对象。.emplace_back(...)
:这是std::vector
的一个高效方法,它会在容器的末尾就地构造一个新的元素。在这里,它直接在m_threads
向量的末尾构造一个std::thread
对象,避免了不必要的拷贝,效率更高。emplace_back
会调用std::thread
的构造函数来创建新线程。在 C++ 中,std::thread
对象的构造函数一旦成功执行,一个新的线程就会立即被创建并开始运行。它不像 Java 或 C# 那样需要一个单独的.start()
方法来启动。[this] { worker_loop(); }
:这是一个 Lambda 表达式,也是最关键的部分。它定义了每个新线程启动后应该执行的代码。[this]
:这是 Lambda 的捕获列表。this
代表指向当前EventLoop
对象的指针。捕获this
指针后,Lambda 内部的代码就有权访问这个EventLoop
对象的成员函数和变量了。{ worker_loop(); }
:这是 Lambda 的函数体。它只做一件事:调用当前对象的worker_loop()
成员函数。这个worker_loop()
函数内部通常会包含一个无限循环,不断地从任务队列中取出任务并执行。
# void worker_loop() 工作线程运行的循环函数
void worker_loop() {
Task task;
// 循环地从队列中获取任务并执行
while (m_task_queue.wait_and_pop(task)) {
try {
task(); // 执行任务
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
}
}
}
2
3
4
5
6
7
8
9
10
11
这段 worker_loop()
函数是 每个工作线程的核心。在 EventLoop
的构造函数中创建的所有线程,一旦启动,就会立即开始执行这个函数。可以把它看作是每个后台“工人”的工作指令和日常循环。
它的主要职责是:不断地从共享的任务队列中获取任务,然后安全地执行它。
# 1. 定义任务容器
Task task;
- 在循环开始之前,先定义一个
Task
类型的变量task
。 Task
是std::function<void()>
的别名,所以task
变量就像一个可复用的“容器”,专门用来存放从队列中取出的待执行任务。- 在循环外部定义它可以避免在每次循环迭代时都重新构造和销毁这个对象,是一个微小的性能优化。
# 2. 线程的生命周期循环
while (m_task_queue.wait_and_pop(task)) {
// ... 执行任务 ...
}
2
3
- 这是整个工作线程的生命周期循环。只要
while
的条件为true
,线程就会一直存活并等待新任务。当条件变为false
时,循环结束,线程函数返回,该线程也就正常退出了。 m_task_queue.wait_and_pop(task)
是这里的关键。这个函数调用做了两件核心事情:- 高效等待:如果任务队列
m_task_queue
是空的,线程会在这里阻塞(睡眠),不会空转浪费 CPU 资源。它会一直等到队列中有新任务被放入,或者收到停止信号。 - 获取任务并判断:一旦线程被唤醒,
wait_and_pop
会尝试从队列中取出一个任务,并将其存入task
变量中。- 如果成功获取到一个任务,该函数返回
true
,while
循环继续执行。 - 如果失败(通常是因为收到了停止信号且队列已空),该函数返回
false
,while
循环将终止。
- 如果成功获取到一个任务,该函数返回
- 高效等待:如果任务队列
# 3. 安全地执行任务
try {
task(); // 执行任务
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
}
2
3
4
5
- 这是一个健壮性设计,确保了线程的稳定运行。
try { task(); }
:task()
这行代码会执行刚刚从队列中取出的任务。因为task
是一个std::function
对象,所以可以用函数调用的语法()
来执行它所包装的任何可调用对象(如 Lambda 表达式、普通函数等)。
catch (const std::exception& e)
:- 如果在执行任务(
task()
)的过程中,任务代码抛出了异常,try-catch
块会捕获这个异常。 - 捕获异常后,程序会向标准错误流
std::cerr
打印一条错误信息,而不会导致整个工作线程崩溃。 - 这一点至关重要:它防止了因为单个任务的失败而导致整个线程池损失一个工作线程,保证了系统的稳定性。处理完异常后,
while
循环会继续,线程会尝试获取下一个任务。
- 如果在执行任务(
# EventLoop.h
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class SafeQueue {
public:
void push(T value) {
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(std::move(value));
m_cond.notify_one(); // 唤醒一个等待的线程
}
// wait_and_pop 会阻塞调用者,直到队列中有元素或停止
bool wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(m_mutex);
// 使用 wait 的谓词版本,防止虚假唤醒,并处理停止信号
m_cond.wait(lock, [this] { return !m_queue.empty() || m_stop; });
if (m_queue.empty() && m_stop) {
return false; // 队列为空且已收到停止信号
}
value = std::move(m_queue.front());
m_queue.pop();
return true;
}
void stop() {
std::lock_guard<std::mutex> lock(m_mutex);
m_stop = true;
m_cond.notify_all(); // 唤醒所有等待的线程以使其退出
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
bool m_stop = false;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
首先,要明白这个类的存在意义。
C++ 标准库里的 std::queue
是一个很普通的队列容器,它不是线程安全的。这意味着,如果你在程序中有多个线程,一个线程正在往队列里 push
数据,同时另一个线程正在 pop
数据,程序很可能会崩溃或产生无法预料的错误。这被称为竞争条件 (Race Condition)。
SafeQueue
的目标就是封装一个 std::queue
,并为它加上线程同步机制,使得你可以在多个线程中同时、安全地对它进行操作,而不会出问题。
# 核心组件(私有成员变量)
要实现线程安全,SafeQueue
依赖于三个核心的同步工具和一个状态标志:
std::queue<T> m_queue;
- 这是真正存放数据的底层队列。所有的数据都存储在这里。
std::mutex m_mutex;
(互斥锁)mutex
的意思是“互斥”。你可以把它想象成一把门锁。- 任何线程在访问
m_queue
之前,都必须先获得这把锁。一旦一个线程锁住了它,其他任何线程再想获取这把锁时,就必须在门口排队等待,直到前一个线程释放锁。 - 作用:保证在任何一个时刻,只有一个线程能够访问和修改
m_queue
,从而避免了数据混乱。
std::condition_variable m_cond;
(条件变量)- 这是实现高效等待的关键。你可以把它想象成一个**“通知铃”或“等待室”**。
- 当一个消费者线程(想
pop
数据的线程)发现队列是空的,它没必要一直忙着“加锁->检查->解锁->再加锁...”(这叫忙等,非常浪费 CPU)。 - 相反,它可以调用
m_cond.wait()
,把自己**“挂起”(进入睡眠状态),并临时释放它持有的锁**。它会在这个“等待室”里休息,直到被别人唤醒。 - 当一个生产者线程(
push
数据的线程)成功放入一个数据后,它会调用m_cond.notify_one()
来按响“通知铃”,唤醒一个正在“等待室”里休息的线程。
bool m_stop = false;
(停止标志)- 这是一个简单的布尔标志,用于通知所有线程:我们的程序要关闭了,大家可以下班了。这对于实现优雅退出至关重要。
# 公共接口(成员函数)
# 1. void push(T value)
- 生产者
这个函数负责向队列中安全地添加元素。
void push(T value) {
std::lock_guard<std::mutex> lock(m_mutex); // 1. 自动加锁
m_queue.push(std::move(value)); // 2. 操作队列
m_cond.notify_one(); // 3. 通知一个等待者
}
2
3
4
5
std::lock_guard<std::mutex> lock(m_mutex);
: 这是个非常巧妙的 RAII 技巧。当lock
对象被创建时,它会自动锁住m_mutex
。当push
函数结束,lock
对象被销毁时,它会自动解锁m_mutex
。这能确保锁一定会被释放,即使发生异常。m_queue.push(...)
: 在持有锁的情况下,安全地向队列中添加一个元素。std::move
本身并不移动任何数据,它只是改变了对象的 "值类别"(将左值转换为右值),为后续的移动语义(move semantics)创造条件。当对象被转换为右值后,编译器会优先选择该对象的移动构造函数或移动赋值运算符(如果存在),从而实现资源的转移而非复制,提高性能m_cond.notify_one()
: 按下“通知铃”,告诉一个可能因队列为空而正在等待的线程:“嘿,有新任务来了,你可以醒来工作了!”
# 2. void stop()
- 关闭开关
这个函数用于通知整个队列停止工作。
void stop() {
std::lock_guard<std::mutex> lock(m_mutex); // 1. 加锁
m_stop = true; // 2. 设置停止标志
m_cond.notify_all(); // 3. 唤醒所有等待者
}
2
3
4
5
- 同样,先加锁以保证线程安全。
- 将
m_stop
标志设置为true
。 m_cond.notify_all()
: 唤醒所有正在等待的线程。这一点非常重要,因为程序要关闭了,不能只唤醒一个线程让它退出,而让其他线程永远地等下去。必须叫醒所有人,让他们检查m_stop
标志,然后一起下班回家。
# 3. bool wait_and_pop(T& value)
- 消费者
这是最复杂但也是最核心的函数,负责安全地从队列中取出元素。
bool wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(m_mutex); // 1. 使用更灵活的锁
// 2. 等待条件满足
m_cond.wait(lock, [this] { return !m_queue.empty() || m_stop; });
// 3. 再次检查,处理关闭情况
if (m_queue.empty() && m_stop) {
return false;
}
// 4. 成功获取数据
value = std::move(m_queue.front());
m_queue.pop();
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::unique_lock<std::mutex> lock(m_mutex);
: 这里用unique_lock
而不是lock_guard
。因为condition_variable
在等待时需要临时解锁互斥锁(允许其他线程push
数据),并在被唤醒后重新加锁。unique_lock
提供了这种灵活性,而lock_guard
做不到。m_cond.wait(lock, ...)
: 这是最关键的一步。wait
函数会检查第二个参数——一个 Lambda 表达式(作为判断条件)。- 如果条件为
false
(即m_queue
为空且m_stop
为false
),wait
会原子地做三件事:- 解锁
lock
。 - 让当前线程进入睡眠。
- 当被
notify
唤醒时,重新加锁lock
,并再次检查条件。
- 解锁
- 如果条件为
true
(即队列不为空,或者收到了停止信号),wait
函数直接返回,线程继续向下执行,并且持有锁。 - 使用带判断条件的
wait
版本可以完美处理“虚假唤醒”(线程无故被唤醒)问题,并整合了停止逻辑。
if (m_queue.empty() && m_stop)
: 线程被唤醒后,有两种可能:一是有新任务了,二是收到了停止信号。这个if
判断的就是第二种情况:我是因为要下班了才被唤醒的,并且队列里确实已经没活干了。此时应该返回false
,通知调用者(worker_loop
)退出循环。value = std::move(...)
: 如果代码能执行到这里,说明一定是成功获取到了一个任务。把任务从队列中取出,然后返回true
。
# 总结
SafeQueue
通过 mutex
保证了同一时间只有一个线程能操作队列,通过 condition_variable
实现了高效的生产者-消费者同步(有任务时通知,没任务时等待),并通过一个 stop
标志实现了优雅的线程退出。它是构建多线程任务处理系统的基石。