线程同步 —— Muduo 网络库笔记(2)
2025-03-23

声明:本文为个人阅读《Linux 多线程服务端编程:使用 muduo C++ 网络库》的读书笔记,文中部分标题可能有些更改,但主体内容和书中一致。如有侵权,告知即删。

并发编程有两种模型。运行在多台机器上的多个进程的分布式场景中,通常使用的是 message passing。在单机上也可以使用 message passing 模型作为多个进程的并发模型,更容易保证程序的正确性。

书中总结了线程同步的四项原则,按重要性排列:

  1. 最低限度地共享对象,减少需要同步的场合。共享对象优先考虑 immutable 对象。
  2. 优先使用高级的并发编程数据结构,如 TaskQueue、Producer-Consumer Queue、CountDownLatch 等等。
  3. 迫不得已使用底层同步原语时,只使用互斥锁,慎用可重入锁,不使用信号量。
  4. 除了 atomic 整数,建议不要自行设计编写 lock-free 代码,不要使用“内核级”同步原语。不要凭空猜测不同原语性能的优异,例如 spin lock 和 mutex。
  5. 很多人误认为用锁会让程序变慢,其实真正影响性能的不是锁,而是锁争用。

2.1 互斥锁(mutex)

任何一个时刻最多只有一个线程在此 mutex 划出的临界区内活动。作者给出的使用原则有:

  1. 用 RAII 手法使用 mutex,例如 std::lock_guard
  2. 不要使用可重入锁。
  3. 不要自行调用 lock()unlock() 函数,一切交给栈上的 Guard 对象。这种做法通常成为 Scoped Locking,能避免重复加锁、忘记解锁、多次解锁等蠢货操作。
  4. 使用 Guard 对象发生死锁时,看函数调用栈就能分析加锁的顺序。
  5. 必要时可以通过 PTHREAD_MUTEX_ERRORCHECK 来排错。

2.1.1 只使用不可重入的 mutex

在 POSIX 的叫法中,mutex 分为递归(recursive)和非递归(non-recursive)两种。当然也可以叫可重入(reentrant)和不可重入(non-reentrant)。可重入锁唯一的特点是:同一个线程可以重复对 recursive mutex 加锁。

这不是为了性能,而是为了体现良好的设计意图,可以帮助我们思考代码对锁的请求,在编码阶段发生设计问题。

书中举了一个例子,代码如下。post() 函数会加锁,traverse() 函数也会加锁。未来某天,foo.dot() 间接地调用了 post() 函数:

  1. mutex 是不可重入的,于是死锁了。
  2. mutex 是可重入的,foos.push_back(foo) 存在 crash 的可能。

若是不可重入 mutex,发生死锁或 crash 只需要将各个线程的调用栈打印出来分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::mutex mutex;
std::vector<Foo> foos;

void post(const Foo& foo) {
std::lock_guard<std::mutex> lock(mutex);
foos.push_back(foo);
}

void traverse() {
std::lock_guard<std::mutex> lock(mutex);
for (auto& foo : foos) {
foo.doit();
}
}

如果一个函数可能在加锁的情况下调用,又可能在未加锁的情况下调用,那么就应该拆成两个函数:

  1. 一个函数跟原来函数同名,加锁之后调用第二个函数。
  2. 第二个函数加个后缀,不加锁,执行原有函数逻辑。

类似与这样:

1
2
3
4
5
6
7
8
void post(const Foo& f) {
std::lock_guard<std::mutex> lock(mutex);
postWithLockHold(f);
}

void postWithLockHold(const Foo& f) {
foos.push_back(f);
}

这里可能会出现两个问题:

  1. 误用了加锁版本,死锁了。将各个线程的调用栈打印出来分析即可。
  2. 误用了不加锁版本,数据损坏了。如果在 postWithLockHold 里能检测是否上锁就比较好办,像 muduo::MutexLock 提供了 isLockedByThisThread() 方法,只需要 assert(mutex.isLockedByThisThread()) 就好。

性能上,Linux 的 Pthreads mutex 采用 futex 实现,加锁解锁时不必每次都陷入系统调用。

2.1.2 死锁可能会发生意想不到的地方

有一个 Inventory 的类,记录当前的 Request 对象。它的 addremove 成员函数都是线程安全的,使用了 mutex_ 保护共享数据 requests_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Inventory {
public:
void add(Request* req) {
std::lock_guard<std::mutex> lock(mutex);
requests_.insert(req);
}
void remove(Request* req) {
std::lock_guard<std::mutex> lock(mutex);
requests_.erase(req);
}

void printAll() const {
std::lock_guard<std::mutex> lock(mutex);
for (auto req : requests_) {
req->print();
}
}

private:
mutable std::mutex mutex_;
std::set<Request*> requests_;
};

Inventory g_inventory; // 简单演示,全局变量

Request 类与 Inventory 类的交互逻辑也很简单,在 process 函数中,往全局 g_inventory 中添加自己,析构时从 g_inventory 移除自己。目前看起来整个程序都很线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Request {
public:
void process() __attribute__((noinline)) {
std::lock_guard<std::mutex> lock(mutex_);
g_inventory.add(this);
}

~Request() __attribute__((noinline)) {
std::lock_guard<std::mutex> lock(mutex_);
g_inventory.remove(this);
}

void print() __attribute__((noinline)) {
std::lock_guard<std::mutex> lock(mutex_);
// .....
}

private:
std::mutex mutex_;
};

这两个类实际上存在发生死锁的可能性,书中通过 gdb 查看两个线程的函数调用栈,发现两个线程都等在 mutex 上___lll_lock_wait,发生了死锁。

thread 用于管理 GDB 中的线程相关操作。 apply 对指定线程或所有线程执行后续的 GDB 命令。 all 表示表示命令作用于​所有线程。 bt 即 backtrace,用于打印当前线程的函数调用堆栈信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ gdb ./mutual_deadlock core # gdb <可执行程序名> <崩溃时产生的 core 文件>
(gdb) thread apply all bt # ​打印所有线程的调用堆栈信息

Thread 1 (Thread 31229): # 这是 main() 线程
#0 __lll_lock_wait() at /opt/labs/deepunix/x86_64/lowlevellock.S:136
#1 _L_lock_953() from /lib/libpthread.so.0
#2 _pthread_mutex_lock(mutex=0xecd150) at pthread_mutex_lock.c:61
#3 muduo::MutexLock::lock(this=0xecd150) at test/Mutex.h:49
#4 muduo::MutexLockGuard(this=0xecd150) at test/Mutex.h:75
#5 Request::print(this=0xecd150) at test/MutualDeadLock.cc:51
#6 Inventory::printAll(this=0x605aa0) at test/MutualDeadLock.cc:67
#7 0x00000000000403368 in main() at test/MutualDeadLock.cc:84

Thread 2 (Thread 31230): # 这是 threadFunc() 线程
#0 __lll_lock_wait() at /opt/labs/deepunix/x86_64/lowlevellock.S:136
#1 _L_lock_953() from /lib/libpthread.so.0
#2 _pthread_mutex_lock(mutex=0x605aa0) at pthread_mutex_lock.c:61
#3 muduo::MutexLock::lock(this=0x605aa0) at test/Mutex.h:49
#4 muduo::MutexLockGuard(this=0x605aa0) at test/Mutex.h:75
#5 Inventory::remove(this=0x605aa0, req=0x80) at test/MutualDeadLock.cc:19
#6 ~Request(this=0xecd150) at test/MutualDeadLock.cc:46
#7 threadFunc() at test/MutualDeadLock.cc:76
#8 boost::function<void()>::operator()(this=0x7fff2lc10310) at /usr/include/boost/function/detail/function_template.hpp:1013
#9 muduo::Thread::runInThread(this=0x7ff21c10310) at Thread.cc:113
#10 muduo::Thread::startThread(obj=0x605aa0) at Thread.cc:105
#11 start_thread() at thread_create.c:300
#12 clone() at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112

我们可以注意到 main() 线程先调用了 Inventory::printAll(#6) 再调用 Request::print(#5)threadFunc() 线程先调用 Request::~Request(#6) 再调用 Inventory::remove(#5)。这两个调用序列对两个 mutex 的加锁顺序正好相反,造成了经典的死锁(加锁顺序不一致)。

思考:如图,如果 printAll() 晚于 remove() 执行,还会出现死锁吗?

答案:不会。printAll() 会等待 remove() 释放锁再执行。

思考Invetory::printAllRequest::print 有没有可能与 Request::processInventory::add 发生死锁?

答案:不会。Invetory::printAllRequest::print 时,后者 Request 还未加入到 requests_ 中,无法获取未 Inventory::add 的锁。

2.2 条件变量(Condition Variable)

如果线程需要等待某个条件成立后再执行,我们应该使用条件变量(Condition Variable)。对于 wait 端:

  1. 必须与 mutex 一起使用,等待条件的布尔表达式的读写也要被 mutex 保护。
  2. 在 mutex 上锁的时候才能调用 wait()
  3. 要警惕 spurious wakeup。要么把 wait() 放到 while 循环中,要么将判断布尔表达式 lambda 函数传入 wait() 第二个参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
std::mutex mutex;
std::condition_variable cv;
std::deque<int> queue;

int dequeue() {
std::unique_lock<std::mutex> lock(mutex);

// 第一种写法
while (queue.empty()) {
cv.wait(lock);
}
// 第二种写法
cv.wait(lock, [&] {
return !queue.empty();
});

assert(!queue.empty());
int top = queue.front();
queue.pop_front();

return top;
}

对于 signal/broadcast 端:

  1. 不需要理会 mutex 的上锁状态就可以调用 signal。
  2. 在 signal 之前要修改布尔表达式。
  3. 修改布尔表达式通常要用 mutex 保护。
  4. 区分 signal 和 broadcast。(signal 通常用于表示资源可用,broadcast 通常表明状态变化)
1
2
3
4
5
void enqueue(int x) {
std::unique_lock<std::mutex> lock(mutex);
queue.push_back(x);
cv.notify_one(); // 它可以不在临界区内
}

思考enqueue() 中每次添加元素都会调用 notify(),如果改成只在 queue.size()01 的时候才调用 notify(),会出现什么后果?

答案:若多个 dequeue() 都在等待队列非空,此时有多个 enqueue() 连续调用,只会调用一次 notify(),唤醒了其中一个线程,而其他线程将会等不到下一个 notify(),将一直阻塞下去。

条件变量是非常底层的同步原语,很少直接使用。一般都是用来实现高层的同步措施。下面举一个 CountDownLatch 的例子,类似 Go 中的 WaitGroup。它主要有两种用途:

  1. 主线程等待多个子线程完成初始化。
  2. 多个子线程等待主线程发出“起跑”命令。

代码使用条件变量实现也非常清晰明了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class CountDownLatch {
public:
explicit CountDownLatch(int count) : count_(count) {}

void wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] {
return count_ == 0; // true 才能结束 wait
});
}

void countDown() {
std::lock_guard<std::mutex> lock(mutex_);
--count_;
if (count_ == 0) {
cv_.notify_all();
}
}

private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;
};

2.3 不要用读写锁和信号量

最好不要使用读写锁,因为:

  1. 可能会在持有 read lock 的时候修改了共享数据,此时和无 mutex 并发读写共享数据是一样的。
  2. 读写锁不见得比互斥锁开销小,read lock 还需要额外更新当前 reader 的数目。如果临界区足够小,mutex 往往会更快。
  3. read lock 可能允许提升未 write lock,也可能不允许提升。但最好不要这么做,如果多个 read lock 尝试提升,会造成和可重入锁一样的问题,堆栈难以查明问题。
  4. 追求低延时读取的场景也不适合用读写锁。

如果遇到并发读写,可以考虑 copy-on-write,同时避免 reader 被 writer 阻塞。

作者表明,他还没有遇到过需要使用信号量的场景,使用条件变量配合 mutex 完全可以替代它的功能,而且更不容易出错。信号量的另一个问题在于它也有自己的计数值,通常我们自己的数据结构也有自己的长度值,这就造成了同样的信息存了两份,需要时刻保持一致,额外增加了程序员的负担。如果要控制并发读,可以考虑用线程池

2.4 使用 Pthread 封装 MutexLock、MutexLockGuard、Condition

MutexLock 默认创建 PTHREAD_MUTEX_DEFAULT 类型,预想的应该是 PTHREAD_MUTEX_NORMAL 类型。严格做法应该是传入 mutexattr 参数来显示指定 mutex 类型。

这段代码也没有检查 pthread 调用的返回值。检查返回值的意义是防止 ENOMEM 之类的资源不足的情况,只有在负载很重的应用中才会出现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MutexLock : boost::noncopyable {
public:
MutexLock() : holder_(0) {
pthread_mutex_init(&mutex_, NULL);
}

~MutexLock() {
assert(holder_ == 0);
pthread_mutex_destroy(&mutex_);
}

bool isLockedByThisThread() const {
return holder_ == pthread_self();
}

void assertLocked() const {
assert(isLockedByThisThread());
}

// 仅供 MutexLockGuard 调用
void lock() {
pthread_mutex_lock(&mutex_);
holder_ = pthread_self();
}

// 仅供 MutexLockGuard 调用
void unlock() {
holder_ = 0;
pthread_mutex_unlock(&mutex_);
}

// 仅供 Condition 调用
pthread_mutex_t* getPthreadMutex() {
return &mutex_;
}

private:
pthread_mutex_t mutex_;
pid_t holder_;
};

MutexLockGuard 最后一行的定义的宏,是防止出现这样的语句:MutexLockGuard(mutex);。遗漏的变量名,产生一个临时对象后立马被销毁了,没有做到保护临界区的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class MutexLockGuard : boost::noncopyable {
public:
explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex) {
mutex_.lock();
}

~MutexLockGuard() {
mutex_.unlock();
}

private:
MutexLock& mutex_;
};

#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")

Pthread 的 condition variable 允许在 wait() 的时候指定 mutex。标准库中的 condition_variable 可以在 wait() 的时候指定 mutex。书中指出想不到 condition variable 会和不同 mutex 配合使用的场景,所以 muduo 实现时从简设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Condition : boost::noncopyable {
public:
explicit Condition(MutexLock& mutex) : mutex_(mutex) {
pthread_cond_init(&pcond_, NULL);
}

~Condition() {
pthread_cond_destroy(&pcond_);
}

void wait() {
pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
}

void notify() {
pthread_cond_signal(&pcond_);
}

void notifyAll() {
pthread_cond_broadcast(&pcond_);
}

private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};

如果一个 class 要包含 MutexLock 和 Condition,请注意声明顺序 MutexLock 必须在前:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class CountDownLatch {
public:
explicit CountDownLatch(int count)
: count_(count),
mutex_(),
condition_(mutex_) {}

void wait() {
MutexLockGuard lock(mutex_);
while (count_ > 0) {
condition_.wait();
}
}

void countDown() {
}

private:
mutable MutexLock mutex_; // 必须在 mutex 前
Condition condition_;
int count_;
};

2.5 使用 pthread_once 实现线程安全的 Singleton

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template <typename T>
class Singleton : boost::noncopyable {
public:
static T& instance() {
pthread_once(&ponce_, &Singleton::init);
return *value_;
}

private:
Singleton() {}
~Singleton() {}

static void init() {
value_ = new T();
}

static pthread_once_t ponce_;
static T* value_;
};

template <typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;

template <typename T>
T* Singleton<T>::value_ = nullptr;

使用方式也非常简单,但目前仅支持默认构造函数。如果需要指定构造方式,还需要额外的封装。

1
Foo& foo = Singleton<Foo>::instance();

当然用 std::call_once 也可以:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template <typename T>
class Singleton : boost::noncopyable {
public:
static T& instance() {
std::call_once(init_flag_, &Singleton::init);
return *instance_;
}

private:
Singleton() {}
~Singleton() {}

static void init() {
instance_ = std::unique_ptr<T>(new T());
}

static std::once_flag init_flag_;
static std::unique_ptr<T> instance_;
};

template <typename T>
std::once_flag Singleton<T>::init_flag_ = std::once_flag{};

template <typename T>
std::unique_ptr<T> Singleton<T>::instance_ = nullptr;

2.6 sleep() 只应该出现在测试代码中

它不具备 memory barrier 语意,不保证内存的可见性。它应该只出现在测试代码中,加速复现死锁的情况。如果线程需要等待某个事件发生,应该采用条件变量或 IO 事件回调。采用 sleep 轮询浪费了 CPU 资源。

2.7 借用 shared_ptr 实现 copy-on-write

下面的代码中,我们做到了:

  1. read 不阻塞 read。
  2. read 不阻塞 write。
  3. write 不阻塞 read。
  4. write 阻塞 write。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
std::mutex mutex;
std::shared_ptr<std::vector<Foo>> g_foos;

void traverse() {
std::shared_ptr<std::vector<Foo>> local_foos;

{ // 读取时,先获取指向当前对象的 shared_ptr
std::lock_guard<std::mutex> lock(mutex);
local_foos = g_foos;
assert(!g_foos.unique());
}

for (auto it = local_foos->begin(); it != local_foos->end(); ++it) {
it->doit();
}
}

void post(const Foo& foo) {
std::lock_guard<std::mutex> lock(mutex);

if (!g_foos.unique()) {
// 写时,若发现有其他线程正在读取,
// 则将 Foo 拷贝构造一个新的,reset 至 g_foos 中
g_foos.reset(new std::vector<Foo>(*g_foos));
}

assert(g_foos.unique());
g_foos->push_back(foo);
}

错误 1:直接修改 g_foos 指向的 shared_ptr。这段代码只阻止了 read 拷贝指针,没有阻止 read 读,会留下 shared_ptr 同时读写的隐患。

1
2
3
4
void post(const Foo& foo) {
std::lock_guard<std::mutex> lock(mutex);
g_foos->push_back(foo);
}

错误 2:试图缩小临界区,把 copy 移出临界区。这段代码在多次 write 并发时,可能只有一次 write 生效。

1
2
3
4
5
6
7
void post(const Foo& foo) {
std::shared_ptr<std::vector<Foo>> new_foos(new std::vector<Foo>(*g_foos));
new_foos->push_back(foo);

std::lock_guard<std::mutex> lock(mutex);
g_foos = new_foos;
}

错误 3:把临界区拆成两个小的,把 copy 放到临界区之外。这段代码和上面一样,多次 write 并发时,可能只有一次 write 生效。

1
2
3
4
5
6
7
8
9
10
11
void post(const Foo& foo) {
std::shared_ptr<std::vector<Foo>> old_foos;
{
std::lock_guard<std::mutex> lock(mutex);
old_foos = g_foos;
}
old_foos->push_back(foo);

std::lock_guard<std::mutex> lock(mutex);
g_foos = old_foos;
}

通过 copy-on-write,我们完全不需要读写锁,仅需要 mutex。对于 2.1.2 中的死锁问题,使用这种 copy-on-write 办法,非常简单就解决了。