线程同步:锁、条件变量、信号量
不同于进程拥有独立的地址,同一进程内的线程除了线程栈外其他数据都是共享的,所以我们要着重关注线程安全问题。线程安全技术包括原子操作和线程同步。原子操作通过确保不可分割的汇编指令实现了线程执行的安全性,这种方法高效,但只适用于较为简单的场景,例如修改某个变量、实现计数等场景。本文着重关注线程同步的方法。
在多线程编程中,线程同步是保证多个线程能够正确、安全地访问共享资源的关键技术。由于线程的调度是不确定的,不加控制的并发访问可能会导致数据竞争(race condition),最终引发不可预测的错误或崩溃。
由此,线程同步的核心目标是确保多个线程对共享资源的访问是有序的、互斥的,防止多个线程同时对同一个资源进行读写操作而导致数据不一致或资源破坏。
针对不同的场景,我们常用这些手段来实现线程同步:锁、信号量、条件变量。
锁
锁(Lock)是最常见的线程同步机制,用于保证同一时刻只有一个线程能访问共享资源。锁通过保护临界区(critical section)来防止多个线程同时修改共享数据,从而避免数据竞争。
常见的锁类型包括:互斥锁(mutex)、读写锁(read-write lock)、自旋锁(spinlock)。互斥锁是最基本的线程同步原语,读写锁和自旋锁分别针对大量读场景和短时间持有锁两种情况做了特定优化,在实际编程中应根据具体场景选择锁类型。
互斥锁
互斥锁是最基本的锁机制,一个线程获得锁后,其他线程必须等待,直到锁被释放。互斥锁保证了同一时间只有一个线程能够访问共享资源。
实现用例
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx; // 定义互斥锁
int count = 0;
void increment() {
for (int i = 0; i < 100000; ++i) {
// 通过加锁保证了count在同一时刻仅会被一个线程操作,但每次count计数都会加锁解锁,将导致严重的性能问题
mtx.lock();
++count;
mtx.unlock();
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final count: " << count << std::endl;
}
优点
- 简单:实现简单,易于理解;
- 保护临界资源:有效地保护临界区,确保同一时间只有一个线程可以访问共享资源。
缺点
- 可能导致死锁:如果线程在持有A锁的情况下等待B锁,而B锁又被另一个线程持有,并且也在等待A锁,这将会产生死锁;
- 性能消耗:频繁地锁定和解锁操作会造成性能开销;
- 优先级反转:高优先级进程可能会因为等待低优进程释放锁而阻塞,导致性能下降。
读写锁
读写锁有三种状态:读锁、写锁、不加锁。
- 某线程申请了读锁:其他线程可以再申请读锁,但不能申请写锁;
- 某线程申请了写锁:其他线程不允许申请读锁或写锁,即独占锁。
优点
- 提高并发性:允许多个读操作同时进行,只在写操作时才完全互斥,适用于读多写少的场景;
- 分离读写控制:读写锁能够更好地控制对共享资源的访问,读操作不会阻塞其他读操作。
缺点
- 复杂度:相对于互斥锁,读写锁的实现和使用较为复杂;
- 写操作饥饿:如果持续有读操作,写操作可能会长时间等待,导致写操作“饥饿”。
自旋锁
自旋锁与互斥锁类似,但是它不会使线程进入阻塞状态,而是持续循环检测锁的状态(轮询),直到获取锁。
优点
- 无阻塞:线程在尝试获取锁时不会进行上下文切换,适用于锁只会被持有很短时间的场景;
- 避免线程切换:由于线程不会进入睡眠状态,因此可以减少线程切换的开销。
缺点
- CPU消耗:如果锁被持有时间较长,自旋锁会导致CPU空转,浪费处理器资源;
- 不公平:某些自旋锁的实现可能导致线程饥饿或优先级反转问题。
条件变量
条件变量允许线程等待某个条件满足后再继续执行,常用于生产者-消费者问题。它结合互斥锁一起使用,等待线程在条件满足时被唤醒,解决了互斥锁仅有锁定或非锁定两种状态的问题。
实现用例:在C++中引入条件变量
- 包含头文件
<condition_variable>
和<mutex>
; - 创建一个互斥锁
std::mutex
和一个条件变量std::condition_variable
; - 在需要等待条件的线程中,首先锁定互斥锁,然后检查条件是否满足。如果不满足条件,则调用条件变量的
wait
方法,同时会自动解锁互斥锁,使其他线程可以访问共享资源。当其他线程通知条件变量时,等待的线程会被唤醒,并重新锁定互斥锁; - 在通知条件的线程中,锁定互斥锁,修改共享资源使得条件满足,然后调用条件变量的
notify_one
或notify_all
方法来通知等待的线程。
/*
生产者线程不断生产数据并放入队列中,当队列满时,生产者线程等待。消费者线程不断从队列中取出数据,当队列为空时,消费者线程等待。通过条件变量和互斥锁的配合,实现了生产者和消费者的同步。
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
// 互斥锁,用于保护共享资源 data_queue
std::mutex mutex_;
// 条件变量,用于线程间的等待和通知
std::condition_variable condition_;
// 存储数据的队列,作为生产者和消费者共享的资源
std::queue<int> data_queue;
// 队列的最大容量
const int max_queue_size = 5;
// 生产者函数
void producer() {
for (int i = 0; i < 10; ++i) {
// 创建 unique_lock,在构造时自动锁定互斥锁
std::unique_lock<std::mutex> lock(mutex_);
// 当队列已满时,等待条件变量的通知
while (data_queue.size() == max_queue_size) {
condition_.wait(lock);
}
// 将数据放入队列
data_queue.push(i);
std::cout << "Produced: " << i << std::endl;
// 通知消费者线程,可能有数据可供消费了
condition_.notify_one();
}
}
// 消费者函数
void consumer() {
while (true) {
// 创建 unique_lock,在构造时自动锁定互斥锁
std::unique_lock<std::mutex> lock(mutex_);
// 当队列为空时,等待条件变量的通知
while (data_queue.empty()) {
condition_.wait(lock);
}
// 取出队列中的数据
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumed: " << data << std::endl;
// 通知生产者线程,可能有空间可以生产新数据了
condition_.notify_one();
}
}
int main() {
// 创建生产者线程并执行 producer 函数
std::thread producer_thread(producer);
// 创建消费者线程并执行 consumer 函数
std::thread consumer_thread(consumer);
// 等待生产者线程结束
producer_thread.join();
// 等待消费者线程结束
consumer_thread.join();
return 0;
}
信号量
信号量(Semaphore)用于控制访问资源的线程数量,常用于限制某一资源的并发访问次数。C++ 标准库中没有直接提供信号量,应通过第三方库或者使用自定义实现。
分类
- 二元信号量(Binary Semaphore):与互斥锁类似,值只允许为 0 或 1,用于确保只有一个线程访问资源;
- 计数信号量(Counting Semaphore):可以允许多个线程同时访问,信号量的值表示可供访问的资源数量。
与锁的关系
- 锁主要是为了互斥,保证只有一个线程能访问资源;信号量则可以限制多个线程并发访问,适合资源有限的场景;
- 信号量在控制资源数量时更加灵活,而锁一般用于单一资源的保护。
自定义信号量实现
实现一个基本的信号量基类应包含这些能力:计数、获取、释放。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
// 信号量类
class Semaphore {
private:
std::mutex mutex_; // 互斥锁,用于同步对计数的访问
std::condition_variable condition_; // 条件变量,用于线程等待和通知
size_t count_; // 计数,表示可用资源的数量
public:
// 构造函数,初始化信号量计数
Semaphore(size_t count = 1) : count_(count) {}
// 获取信号量,减少计数,如果计数小于等于0,则线程等待
void acquire() {
std::unique_lock<std::mutex> lock(mutex_);
// 使用条件变量的wait函数等待,直到计数大于0
condition_.wait(lock, [this] { return count_ > 0; });
// 减少计数,表示获取了一个资源
--count_;
}
// 释放信号量,增加计数,并通知一个等待的线程
void release() {
std::lock_guard<std::mutex> lock(mutex_);
// 增加计数,表示释放了一个资源
++count_;
// 通知一个等待的线程(如果有)
condition_.notify_one();
}
};
int shared_resource = 0; // 共享资源
Semaphore semaphore(3); // 信号量,允许最多3个线程同时访问资源
// 工作线程函数
void worker() {
semaphore.acquire(); // 获取信号量,进入临界区
// 执行一些操作,比如增加共享资源
std::cout << "Thread " << std::this_thread::get_id() << " is accessing the shared resource." << std::endl;
++shared_resource; // 增加共享资源的值
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
semaphore.release(); // 释放信号量,离开临界区
}
int main() {
std::vector<std::thread> threads; // 存储线程的向量
// 创建并启动多个线程
for (int i = 0; i < 10; ++i) {
threads.emplace_back(worker);
}
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
// 打印共享资源的最终值
std::cout << "Final value of shared resource: " << shared_resource << std::endl;
return 0;
}