多线程之 std::condition_variable
# C++ 中 std::condition_variable
的使用详解与示例
在 C++ 的多线程编程中,std::condition_variable
是一个至关重要的同步原语。它允许一个或多个线程等待某个特定条件成立。当另一个线程修改了该条件并发出通知时,等待的线程将被唤醒并继续执行。这在解决许多并发问题时非常有用,尤其是经典的“生产者-消费者”问题。
本文将详细介绍 std::condition_variable
的概念、核心函数,并通过一个完整的生产者-消费者示例来展示其具体用法。
# 核心概念
想象一个场景:一个线程(消费者)需要处理由另一个线程(生产者)产生的数据。如果当前没有数据可供处理,消费者线程应该暂停执行,而不是在一个循环中空耗 CPU 资源去反复检查是否有新数据。当生产者线程产生了新数据后,它需要一种方式来通知正在等待的消费者线程。
std::condition_variable
正是为此而生。它与一个互斥锁(std::mutex
)协同工作,以实现线程间的等待和通知机制。
- 等待 (Waiting): 一个线程可以阻塞(暂停执行)在一个条件变量上,直到被通知。
- 通知 (Notifying): 另一个线程在满足了特定条件后,可以通知一个或所有正在该条件变量上等待的线程。
# 主要成员函数
std::condition_variable
包含在 <condition_variable>
头文件中。以下是其最核心的三个成员函数:
wait(std::unique_lock<std::mutex>& lock)
:- 调用此函数的线程必须首先获得一个
std::unique_lock
管理的互斥锁。 - 当
wait
被调用时,它会自动释放传入的锁,并阻塞当前线程。 - 当线程被唤醒时(通过
notify_one
或notify_all
),wait
函数会重新尝试获取锁。一旦成功获取锁,函数才会返回。 - 虚假唤醒 (Spurious Wakeup):
wait
函数有时可能会在没有收到任何通知的情况下被唤醒。为了处理这种情况,wait
通常被放在一个while
循环中,循环检查等待的条件是否真正满足。
- 调用此函数的线程必须首先获得一个
wait(std::unique_lock<std::mutex>& lock, Predicate pred)
:- 这是
wait
的一个重载版本,它接受一个返回布尔值的可调用对象(通常是 Lambda 表达式)作为第二个参数。 - 这个版本等价于:
while (!pred()) { wait(lock); }
1
2
3 - 它能自动处理虚假唤醒问题,只有当
pred
返回true
时,wait
才会返回。这是推荐的使用方式。
- 这是
notify_one()
:- 唤醒一个正在该条件变量上等待的线程。
- 如果有多个线程在等待,只会唤醒其中一个(具体是哪一个由操作系统调度决定)。
- 如果没有任何线程在等待,此调用将不产生任何效果。
notify_all()
:- 唤醒所有正在该条件变量上等待的线程。
关键搭档:std::mutex
与 std::unique_lock
std::condition_variable
的操作(wait
, notify_one
, notify_all
)必须与 std::mutex
结合使用,以保护共享数据(即那个“条件”)。wait
函数强制要求使用 std::unique_lock
而不是 std::lock_guard
,因为它需要在等待期间释放锁,并在被唤醒后重新获取锁,而 std::unique_lock
提供了这种灵活的锁定和解锁能力。
# 示例:生产者-消费者问题
下面我们通过一个经典的生产者-消费者模型来演示 std::condition_variable
的实际应用。在这个模型中:
- 生产者 (Producer) 线程负责向一个共享队列中添加数据。
- 消费者 (Consumer) 线程负责从该队列中取出数据进行处理。
- 共享队列的最大容量是有限的。
同步条件:
- 当队列满时,生产者必须等待,直到消费者取走数据腾出空间。
- 当队列空时,消费者必须等待,直到生产者加入新的数据。
# 代码实现
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
// 共享数据和同步原语
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
const unsigned int MAX_QUEUE_SIZE = 5;
// 生产者线程函数
void producer(int id) {
for (int i = 0; i < 10; ++i) {
int value = id * 100 + i;
{
// 获取锁
std::unique_lock<std::mutex> lock(mtx);
// 当队列已满时,等待消费者通知
// 使用 Lambda 表达式作为 wait 的第二个参数,防止虚假唤醒
cv.wait(lock, [] { return data_queue.size() < MAX_QUEUE_SIZE; });
// 生产数据
data_queue.push(value);
std::cout << "Producer " << id << " produced: " << value << ". Queue size is now " << data_queue.size() << std::endl;
} // lock 在此处自动释放
// 通知一个可能在等待的消费者
cv.notify_one();
// 模拟生产耗时
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
// 消费者线程函数
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
int value;
{
// 获取锁
std::unique_lock<std::mutex> lock(mtx);
// 当队列为空时,等待生产者通知
cv.wait(lock, [] { return !data_queue.empty(); });
// 消费数据
value = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << id << " consumed: " << value << ". Queue size is now " << data_queue.size() << std::endl;
} // lock 在此处自动释放
// 通知一个可能在等待的生产者
cv.notify_one();
// 模拟消费耗时
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
int main() {
std::thread p1(producer, 1);
std::thread c1(consumer, 1);
p1.join();
c1.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 代码剖析
共享资源:
data_queue
是生产者和消费者共享的数据队列。mtx
是一个互斥锁,用于保护对data_queue
的访问,防止多个线程同时修改它导致数据竞争。cv
是条件变量,用于协调生产者和消费者的等待与通知。
生产者
producer
:- 在每次循环中,首先通过
std::unique_lock<std::mutex> lock(mtx);
获取互斥锁。 - 接下来是关键的一步:
cv.wait(lock, [] { return data_queue.size() < MAX_QUEUE_SIZE; });
wait
检查 Lambda 表达式[] { return data_queue.size() < MAX_QUEUE_SIZE; }
的返回值。- 如果队列未满(表达式返回
true
),wait
直接返回,线程继续执行。 - 如果队列已满(表达式返回
false
),wait
会原子地释放锁lock
并使当前生产者线程进入阻塞(等待)状态。
- 当线程从
wait
返回(意味着条件满足且已重新获得锁)后,它安全地向队列中添加数据。 - 在离开
{}
作用域时,unique_lock
会自动释放互斥锁。 - 最后,调用
cv.notify_one()
。这会唤醒一个可能因队列为空而正在等待的消费者线程。
- 在每次循环中,首先通过
消费者
consumer
:- 逻辑与生产者非常相似。
- 它也先获取同一个互斥锁。
- 调用
cv.wait(lock, [] { return !data_queue.empty(); });
来等待队列中有数据。- 如果队列为空,
wait
释放锁并阻塞消费者线程。
- 如果队列为空,
- 当被唤醒且条件满足后,它从队列中取出数据。
- 离开作用域后锁被释放。
- 调用
cv.notify_one()
,这可能会唤醒一个因队列已满而正在等待的生产者线程。
# 总结
std::condition_variable
是 C++ 并发编程中实现线程间高效协作的强大工具。正确使用它可以避免线程忙等待(busy-waiting),从而显著降低 CPU 消耗,提升程序性能。
使用 std::condition_variable
的核心要点:
- 始终与
std::mutex
配对使用,以保护共享的条件状态。 - 使用
std::unique_lock
来管理互斥锁。 - 在
while
循环中调用wait
或使用带有谓词(Predicate)的wait
重载版本,以处理虚假唤醒。 - 在修改了共享条件并释放锁之后,通过
notify_one()
或notify_all()
来通知其他等待线程。