《C++ Concurrency in Action》笔记
2022/1/28 20:10:13
本文主要是介绍《C++ Concurrency in Action》笔记,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
《C++ Concurrency in Action》笔记
- 1 你好,C++的并发世界
- 1.1 何谓并发
- 1.1.1 计算机系统中的并发
- 1.1.2 并发的途径
- 多进程并发
- 多线程并发
- 1.2 为什么使用并发?
- 1.2.1 为了分离关注点
- 1.2.2 为了性能
- 1.2.3 什么时候不使用并发
- 1.3 C++中的并发和多线程
- 1.3.1 C++多线程历史
- 1.3.2 新标准支持并发
- 1.3.3 C++线程库的效率
- 1.3.4 平台相关的工具
- 1.4 开始入门
- 2 线程管理
- 2.1 线程管理的基础
- 2.1.1 启动线程
- 2.1.2 等待线程完成
- 2.1.3 特殊情况下的等待
- 2.1.4 后台运行线程
- 2.2 向线程函数传递参数
- 2.3 转移线程所有权
- 2.4 运行时决定线程数量
- 2.5 标识线程
- 3 线程间共享数据
- 3.1 共享数据带来的问题
- 3.1.1 条件竞争
- 3.1.2 避免恶性条件竞争
- 3.2 使用互斥量保护共享数据
- 3.2.1 C++中使用互斥量
- 3.2.2 精心组织代码来保护共享数据
- 3.2.3 发现接口内在的条件竞争
- 3.2.4 死锁:问题描述及解决方案
- 3.2.5 避免死锁的进阶指导
- 3.2.6 std::unique_lock——灵活的锁
- 3.2.7 不同域中互斥量所有权的传递
- 3.2.8 锁的粒度
- 3.3 保护共享数据的替代设施
- 3.3.1 保护共享数据的初始化过程
- 3.3.2 保护很少更新的数据结构
- 3.3.3 嵌套锁
- 4 同步并发操作
- 4.1 等待一个事件或其他条件
- 4.1.1 等待条件达成
- 4.1.2 使用条件变量构建线程安全队列
- 4.2 使用期望等待一次性事件
- 5 C++内存模型和原子类型操作
- 6 基于锁的并发数据结构设计
- 7 无锁并发数据结构设计
- 8 并发代码设计
- 9 高级线程管理
- 10 多线程程序的测试和调试
- 重点
- 线程std::thread对象必须调用join或detach
- shared_mutex也是基于操作系统底层的读写锁pthread_rwlock_t的封装
- 参考
英文书籍《C++ Concurrency in Action》国内有不同人翻译,有一版是4个译者翻译的,豆瓣知乎风评一般。陈晓伟翻译的不错,下面是我对该译者书籍的读书笔记。
1 你好,C++的并发世界
1.1 何谓并发
最简单和最基本的并发,是指两个或更多独立的活动同时发生。
并发在生活中随处可见,我们可以一边走路一边说话,也可以两只手同时作不同的动作,还有我们每个人都过着相互独立的生活——当我在游泳的时候,你可以看球赛,等等。
1.1.1 计算机系统中的并发
计算机领域的并发指的是在单个系统里同时执行多个独立的任务,而非顺序的进行一些活动。
系统通过任务调度,在时间片微粒度下,做任务切换,让用户感觉不到间隙,这种也被称为并发。
多处理器计算机可实现硬件并发。
图1.2显示了四个任务在双核处理器上的任务切换,仍然是将任务整齐地划分为同等大小块的理想情况。实际上,许多因素会使得分割不均和调度不规则。
1.1.2 并发的途径
途径一:一个进程内含一个线程,交互是进程间通信;
途径二:一个进程内含多个线程,交互是线程间通信。
多进程并发
如图1.3所示,独立的进程可以通过进程间常规的通信渠道传递讯息(信号、套接字、文件、管道等等)。不过,这种进程之间的通信通常不是设置复杂,就是速度慢,这是因为操作系统会在进程间提供了一定的保护措施,以避免一个进程去修改另一个进程的数据。还有一个缺点是,运行多个进程所需的固定开销:需要时间启动进程,操作系统需要内部资源来管理进程,等等。
当然,以上的机制也不是一无是处:操作系统在进程间提供附加的保护操作和更高级别的通信机制,意味着可以更容易编写安全的并发代码。实际上,在类似于Erlang的编程环境中,将进程作为并发的基本构造块。
使用多进程实现并发还有一个额外的优势———可以使用远程连接(可能需要联网)的方式,在不同的机器上运行独立的进程。虽然,这增加了通信成本,但在设计精良的系统上,这可能是一个提高并行可用行和性能的低成本方式。
多线程并发
并发的另一个途径,在单个进程中运行多个线程。线程很像轻量级的进程:每个线程相互独立运行,且线程可以在不同的指令序列中运行。但是,进程中的所有线程都共享地址空间,并且所有线程访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。虽然,进程之间通常共享内存,但是这种共享通常是难以建立和管理的。因为,同一数据的内存地址在不同的进程中是不相同。图1.4展示了一个进程中的两个线程通过共享内存进行通信。
地址空间共享,以及缺少线程间数据的保护,使得操作系统的记录工作量减小,所以使用多线程相关的开销远远小于使用多个进程。不过,共享内存的灵活性是有代价的:如果数据要被多个线程访问,那么程序员必须确保每个线程所访问到的数据是一致的(在本书第3、4、5和8章中会涉及,线程间数据共享可能会遇到的问题,以及如何使用工具来避免这些问题)。问题并非无解,只要在编写代码时适当地注意即可,这同样也意味着需要对线程通信做大量的工作。
1.2 为什么使用并发?
主要原因有两个:关注点分离(SOC)和性能。
1.2.1 为了分离关注点
编写软件时,分离关注点是个好主意;通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。即使一些功能区域中的操作需要在同一时刻发生的情况下,依旧可以使用并发分离不同的功能区域;若不显式地使用并发,就得编写一个任务切换框架,或者在操作中主动地调用一段不相关的代码。
1.2.2 为了性能
多处理器系统,现在越来越普遍。如果想要利用日益增长的计算能力,那就必须设计多任务并发式软件。
两种方式利用并发提高性能。一是业务逻辑并行,即任务并行(task parallelism),二是数据并行(data parallelism),如图像处理。
1.2.3 什么时候不使用并发
基本上,不使用并发的唯一原因就是,收益比不上成本。并发代码增加开发成本,更高复杂度增加出错风险。除非潜在的性能增益足够大或关注点分离地足够清晰,否则,别用并发。
此外,线程是有限的资源。如果让太多的线程同时运行,则会消耗很多操作系统资源,从而使得操作系统整体上运行得更加缓慢。不仅如此,因为每个线程都需要一个独立的堆栈空间,所以运行太多的线程也会耗尽进程的可用内存或地址空间。对于一个可用地址空间为4GB(32bit)的平坦架构的进程来说,这的确是个问题:如果每个线程都有一个1MB的堆栈(很多系统都会这样分配),那么4096个线程将会用尽所有地址空间,不会给代码、静态数据或者堆数据留有任何空间。即便64位(或者更大)的系统不存在这种直接的地址空间限制,但其他资源有限:如果你运行了太多的线程,最终也是出会问题的。尽管线程池(参见第9章)可以用来限制线程的数量,但这也并不是什么灵丹妙药,它也有自己的问题。
当客户端/服务器(C/S)应用在服务器端为每一个链接启动一个独立的线程,对于少量的链接是可以正常工作的,但当同样的技术用于需要处理大量链接的高需求服务器时,也会因为线程太多而耗尽系统资源。在这种场景下,使用线程池可以对性能产生优化(参见第9章)。
1.3 C++中的并发和多线程
通过多线程为C++并发提供标准化支持是件新鲜事。只有在C++11标准下,才能编写不依赖平台扩展的多线程代码。了解C++线程库中的众多规则前,先来了解一下其发展的历史。
1.3.1 C++多线程历史
C++98(1998)标准不承认线程的存在,并且各种语言要素的操作效果都以顺序抽象机的形式编写。不仅如此,内存模型也没有正式定义,所以在C++98标准下,没办法在缺少编译器相关扩展的情况下编写多线程应用程序。
当然,编译器供应商可以自由地向语言添加扩展,添加C语言中流行的多线程API———POSIX标准中的C标准和Microsoft Windows API中的那些———这就使得很多C++编译器供应商通过各种平台相关扩展来支持多线程。所以C++本身不支持并行语法时,程序员们已经编写了大量的C++多线程程序了。
1.3.2 新标准支持并发
C++11新标准中不仅有了一个全新的线程感知内存模型,C++标准库也扩展了:包含了用于管理线程(参见第2章)、保护共享数据(参见第3章)、线程间同步操作(参见第4章),以及低级原子操作(参见第5章)的各种类。
新C++线程库很大程度上,是基于上文提到的C++类库的经验积累。特别是,Boost线程库作为新类库的主要模型,很多类与Boost库中的相关类有着相同名称和结构。随着C++标准的进步,Boost线程库也配合着C++标准在许多方面做出改变,因此之前使用Boost的用户将会发现自己非常熟悉C++11的线程库。
新的C++标准直接支持原子操作,允许程序员通过定义语义的方式编写高效的代码,从而无需了解与平台相关的汇编指令。
1.3.3 C++线程库的效率
为了效率,C++类整合了一些底层工具。这样就需要了解相关使用高级工具和使用低级工具的开销差,这个开销差就是抽象代价(abstraction penalty)。该类库在大部分主流平台上都能实现高效(带有非常低的抽象代价)。
1.3.4 平台相关的工具
在C++线程库中提供一个native_handle()
成员函数,允许通过使用平台相关API直接操作底层实现。就其本质而言,任何使用native_handle()
执行的操作都是完全依赖于平台的。
1.4 开始入门
#include <iostream> #include <thread> //① void hello() //② { std::cout << "Hello Concurrent World\n"; } int main() { std::thread t(hello); //③ t.join(); //④ }
2 线程管理
2.1 线程管理的基础
2.1.1 启动线程
void do_some_work(); std::thread my_thread(do_some_work);
class background_task { public: void operator()() const { do_something(); do_something_else(); } }; background_task f; std::thread my_thread(f);
有件事需要注意,当把函数对象传入到线程构造函数中时,需要避免“最令人头痛的语法解析”(C++’s most vexing parse, 中文简介)。如果你传递了一个临时变量,而不是一个命名的变量;C++编译器会将其解析为函数声明,而不是类型对象的定义。
例如:
std::thread my_thread(background_task());
这里相当与声明了一个名为my_thread的函数,这个函数带有一个参数(函数指针指向没有参数并返回background_task对象的函数),返回一个std::thread
对象的函数,而非启动了一个线程。
使用在前面命名函数对象的方式,或使用多组括号①,或使用新统一的初始化语法②,可以避免这个问题。
如下所示:
std::thread my_thread((background_task())); // 1 std::thread my_thread{background_task()}; // 2
使用lambda表达式也能避免这个问题。lambda表达式是C++11的一个新特性,它允许使用一个可以捕获局部变量的局部函数(可以避免传递参数,参见2.2节)。想要具体的了解lambda表达式,可以阅读附录A的A.5节。之前的例子可以改写为lambda表达式的类型:
std::thread my_thread([]{ do_something(); do_something_else(); });
启动了线程,你需要明确是要等待线程结束(加入式——参见2.1.2节),还是让其自主运行(分离式——参见2.1.3节)。如果std::thread
对象销毁之前还没有做出决定,程序就会终止(std::thread
的析构函数会调用std::terminate()
)。因此,即便是有异常存在,也需要确保线程能够正确的_加入_(joined)或_分离_(detached)。
清单2.1 函数已经结束,线程依旧访问局部变量
struct func { int& i; func(int& i_) : i(i_) {} void operator() () { for (unsigned j=0 ; j<1000000 ; ++j) { do_something(i); // 1. 潜在访问隐患:悬空引用 } } }; void oops() { int some_local_state=0; func my_func(some_local_state); std::thread my_thread(my_func); my_thread.detach(); // 2. 不等待线程结束 } // 3. 新线程可能还在运行
这个例子中,已经决定不等待线程结束(使用了detach()②),所以当oops()函数执行完成时③,新线程中的函数可能还在运行。如果线程还在运行,它就会去调用do_something(i)函数①,这时就会访问已经销毁的变量。如同一个单线程程序——允许在函数完成后继续持有局部变量的指针或引用;当然,这从来就不是一个好主意——这种情况发生时,错误并不明显,会使多线程更容易出错。
处理这种情况的常规方法:使线程函数的功能齐全,将数据复制到线程中,而非复制到共享数据中。如果使用一个可调用的对象作为线程函数,这个对象就会复制到线程中,而后原始对象就会立即销毁。但对于对象中包含的指针和引用还需谨慎,例如清单2.1所示。使用一个能访问局部变量的函数去创建线程是一个糟糕的主意(除非十分确定线程会在函数完成前结束)。此外,可以通过join()函数来确保线程在函数完成前结束。
2.1.2 等待线程完成
如果需要等待线程,相关的std::thread
实例需要使用join()。在这种情况下,因为原始线程在其生命周期中并没有做什么事,使得用一个独立的线程去执行函数变得收益甚微,但在实际编程中,原始线程要么有自己的工作要做;要么会启动多个子线程来做一些有用的工作,并等待这些线程结束。
这意味着,只能对一个线程使用一次join();一旦已经使用过join(),std::thread
对象就不能再次加入了,当对其使用joinable()时,将返回false。
2.1.3 特殊情况下的等待
如果想要分离一个线程,可以在线程启动后,直接使用detach()进行分离。如果打算等待对应线程,通常,当倾向于在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免应用被抛出的异常所终止。
清单 2.2 等待线程完成
struct func; // 定义在清单2.1中 void f() { int some_local_state=0; func my_func(some_local_state); std::thread t(my_func); try { do_something_in_current_thread(); } catch(...) { t.join(); // 1 throw; } t.join(); // 2 }
代码使用了try/catch
块确保访问本地状态的线程退出后,函数才结束。
一种方式是使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),并且提供一个类,在析构函数中使用join(),如同下面清单中的代码。看它如何简化f()函数。
清单 2.3 使用RAII等待线程完成
class thread_guard { std::thread& t; public: explicit thread_guard(std::thread& t_): t(t_) {} ~thread_guard() { if(t.joinable()) // 1 { t.join(); // 2 } } thread_guard(thread_guard const&)=delete; // 3 thread_guard& operator=(thread_guard const&)=delete; }; struct func; // 定义在清单2.1中 void f() { int some_local_state=0; func my_func(some_local_state); std::thread t(my_func); thread_guard g(t); do_something_in_current_thread(); } // 4
2.1.4 后台运行线程
使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。也就是说,不会等待这个线程结束;如果线程分离,那么就不可能有std::thread
对象能引用它,分离线程的确在后台运行,所以分离线程不能被加入。不过C++运行库保证,当线程退出时,相关资源的能够正确回收,后台线程的归属和控制C++运行库都会处理。
通常称分离线程为_守护线程_(daemon threads),UNIX中守护线程是指,没有任何显式的用户接口,并在后台运行的线程。这种线程的特点就是长时间运行;线程的生命周期可能会从某一个应用起始到结束,可能会在后台监视文件系统,还有可能对缓存进行清理,亦或对数据结构进行优化。
std::thread t(do_background_work); t.detach(); assert(!t.joinable());
清单2.4 使用分离线程去处理其他文档
void edit_document(std::string const& filename) { open_document_and_display_gui(filename); while(!done_editing()) { user_command cmd=get_user_input(); if(cmd.type==open_new_document) { std::string const new_name=get_filename_from_user(); std::thread t(edit_document,new_name); // 1 t.detach(); // 2 } else { process_user_input(cmd); } } }
2.2 向线程函数传递参数
向std::thread
构造函数中的可调用对象,或函数传递一个参数很简单。需要注意的是,默认参数要拷贝到线程独立内存中,即使参数是引用的形式,也可以在新线程中进行访问。
void f(int i, std::string const& s); std::thread t(f, 3, "hello");
代码创建了一个调用f(3, “hello”)的线程。注意,函数f需要一个std::string
对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *
类型。之后,在线程的上下文中完成字面值向std::string
对象的转化。需要特别要注意,当指向动态变量的指针作为参数传递给线程的情况,代码如下:
void f(int i,std::string const& s); void oops(int some_param) { char buffer[1024]; // 1 sprintf(buffer, "%i",some_param); std::thread t(f,3,buffer); // 2 t.detach(); }
这种情况下,buffer②是一个指针变量,指向本地变量,然后本地变量通过buffer传递到新线程中②。并且,函数有很有可能会在字面值转化成std::string
对象之前崩溃(oops),从而导致一些未定义的行为。并且想要依赖隐式转换将字面值转换为函数期待的std::string
对象,但因std::thread
的构造函数会复制提供的变量,就只复制了没有转换成期望类型的字符串字面值。
解决方案就是在传递到std::thread
构造函数之前就将字面值转化为std::string
对象:
void f(int i,std::string const& s); void not_oops(int some_param) { char buffer[1024]; sprintf(buffer,"%i",some_param); std::thread t(f,3,std::string(buffer)); // 使用std::string,避免悬垂指针 t.detach(); }
还可能遇到相反的情况:期望传递一个引用,但整个对象被复制了。当线程更新一个引用传递的数据结构时,这种情况就可能发生,比如:
void update_data_for_widget(widget_id w,widget_data& data); // 1 void oops_again(widget_id w) { widget_data data; std::thread t(update_data_for_widget,w,data); // 2 display_status(); t.join(); process_widget_data(data); // 3 }
虽然update_data_for_widget①的第二个参数期待传入一个引用,但是std::thread
的构造函数②并不知晓;构造函数无视函数期待的参数类型,并盲目的拷贝已提供的变量。当线程调用update_data_for_widget函数时,传递给函数的参数是data变量内部拷贝的引用,而非数据本身的引用。因此,当线程结束时,内部拷贝数据将会在数据更新阶段被销毁,且process_widget_data将会接收到没有修改的data变量③。对于熟悉std::bind
的开发者来说,问题的解决办法是显而易见的:可以使用std::ref
将参数转换成引用的形式,从而可将线程的调用改为以下形式:
std::thread t(update_data_for_widget,w,std::ref(data));
在这之后,update_data_for_widget就会接收到一个data变量的引用,而非一个data变量拷贝的引用。
如果你熟悉std::bind
,就应该不会对以上述传参的形式感到奇怪,因为std::thread
构造函数和std::bind
的操作都在标准库中定义好了,可以传递一个成员函数指针作为线程函数,并提供一个合适的对象指针作为第一个参数:
class X { public: void do_lengthy_work(); }; X my_x; std::thread t(&X::do_lengthy_work,&my_x); // 1
这段代码中,新线程将my_x.do_lengthy_work()作为线程函数;my_x的地址①作为指针对象提供给函数。也可以为成员函数提供参数:std::thread
构造函数的第三个参数就是成员函数的第一个参数,以此类推(代码如下,译者自加)。
class X { public: void do_lengthy_work(int); }; X my_x; int num(0); std::thread t(&X::do_lengthy_work, &my_x, num);
有趣的是,提供的参数可以移动,但不能拷贝。"移动"是指:原始对象中的数据转移给另一对象,而转移的这些数据就不再在原始对象中保存了(译者:比较像在文本编辑的"剪切"操作)。std::unique_ptr
就是这样一种类型(译者:C++11中的智能指针),这种类型为动态分配的对象提供内存自动管理机制(译者:类似垃圾回收)。同一时间内,只允许一个std::unique_ptr
实现指向一个给定对象,并且当这个实现销毁时,指向的对象也将被删除。移动构造函数(move constructor)和移动赋值操作符(move assignment operator)允许一个对象在多个std::unique_ptr
实现中传递(有关"移动"的更多内容,请参考附录A的A.1.1节)。使用"移动"转移原对象后,就会留下一个空指针(NULL)。移动操作可以将对象转换成可接受的类型,例如:函数参数或函数返回的类型。当原对象是一个临时变量时,自动进行移动操作,但当原对象是一个命名变量,那么转移的时候就需要使用std::move()
进行显示移动。下面的代码展示了std::move
的用法,展示了std::move
是如何转移一个动态对象到一个线程中去的:
void process_big_object(std::unique_ptr<big_object>); std::unique_ptr<big_object> p(new big_object); p->prepare_data(42); std::thread t(process_big_object,std::move(p));
在std::thread
的构造函数中指定std::move(p)
,big_object对象的所有权就被首先转移到新创建线程的的内部存储中,之后传递给process_big_object函数。
标准线程库中和std::unique_ptr
在所属权上有相似语义类型的类有好几种,std::thread
为其中之一。虽然,std::thread
实例不像std::unique_ptr
那样能占有一个动态对象的所有权,但是它能占有其他资源:每个实例都负责管理一个执行线程。执行线程的所有权可以在多个std::thread
实例中互相转移,这是依赖于std::thread
实例的可移动且不可复制性。不可复制保性证了在同一时间点,一个std::thread
实例只能关联一个执行线程;可移动性使得程序员可以自己决定,哪个实例拥有实际执行线程的所有权。
2.3 转移线程所有权
假设要写一个在后台启动线程的函数,想通过新线程返回的所有权去调用这个函数,而不是等待线程结束再去调用;或完全与之相反的想法:创建一个线程,并在函数中转移所有权,都必须要等待线程结束。总之,新线程的所有权都需要转移。
这就是移动引入std::thread
的原因,C++标准库中有很多_资源占有_(resource-owning)类型,比如std::ifstream
,std::unique_ptr
还有std::thread
都是可移动,但不可拷贝。这就说明执行线程的所有权可以在std::thread
实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并且在std::thread
实例之间(t1,t2和t3)转移所有权:
void some_function(); void some_other_function(); std::thread t1(some_function); // 1 std::thread t2=std::move(t1); // 2 t1=std::thread(some_other_function); // 3 std::thread t3; // 4 t3=std::move(t2); // 5 t1=std::move(t3); // 6 赋值操作将使程序崩溃
首先,新线程开始与t1相关联。当显式使用std::move()
创建t2后②,t1的所有权就转移给了t2。之后,t1和执行线程已经没有关联了;执行some_function的函数现在与t2关联。
然后,与一个临时std::thread
对象相关的线程启动了③。为什么不显式调用std::move()
转移所有权呢?因为,所有者是一个临时对象——移动操作将会隐式的调用。
t3使用默认构造方式创建④,与任何执行线程都没有关联。调用std::move()
将与t2关联线程的所有权转移到t3中⑤。因为t2是一个命名对象,需要显式的调用std::move()
。移动操作⑤完成后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。
最后一个移动操作,将some_function线程的所有权转移⑥给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()
终止程序继续运行。
std::thread
支持移动,就意味着线程的所有权可以在函数外进行转移,就如下面程序一样。
清单2.5 函数返回std::thread
对象
std::thread f() { void some_function(); return std::thread(some_function); } std::thread g() { void some_other_function(int); std::thread t(some_other_function,42); return t; }
当所有权可以在函数内部传递,就允许std::thread
实例可作为参数进行传递,代码如下:
void f(std::thread t); void g() { void some_function(); f(std::thread(some_function)); std::thread t(some_function); f(std::move(t)); }
std::thread
对象的容器,如果这个容器是移动敏感的(比如,标准中的std::vector<>
),那么移动操作同样适用于这些容器。了解这些后,就可以写出类似清单2.7中的代码,代码量产了一些线程,并且等待它们结束。
清单2.7 量产线程,等待它们结束
void do_work(unsigned id); void f() { std::vector<std::thread> threads; for(unsigned i=0; i < 20; ++i) { threads.push_back(std::thread(do_work,i)); // 产生线程 } std::for_each(threads.begin(),threads.end(), std::mem_fn(&std::thread::join)); // 对每个线程调用join() }
2.4 运行时决定线程数量
std::thread::hardware_concurrency()
在新版C++标准库中是一个很有用的函数。这个函数将返回能同时并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。
2.5 标识线程
线程标识类型是std::thread::id
,可以通过两种方式进行检索。第一种,可以通过调用std::thread
对象的成员函数get_id()
来直接获取。如果std::thread
对象没有与任何执行线程相关联,get_id()
将返回std::thread::type
默认构造值,这个值表示“无线程”。第二种,当前线程中调用std::this_thread::get_id()
(这个函数定义在<thread>
头文件中)也可以获得线程标识。
std::thread::id
对象可以自由的拷贝和对比,因为标识符就可以复用。如果两个对象的std::thread::id
相等,那它们就是同一个线程,或者都“无线程”。如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有线程。
线程库不会限制你去检查线程标识是否一样,std::thread::id
类型对象提供相当丰富的对比操作;比如,提供为不同的值进行排序。这意味着允许程序员将其当做为容器的键值,做排序,或做其他方式的比较。按默认顺序比较不同值的std::thread::id
,所以这个行为可预见的:当a<b
,b<c
时,得a<c
,等等。标准库也提供std::hash<std::thread::id>
容器,所以std::thread::id
也可以作为无序容器的键值。
std::thread::id master_thread; void some_core_part_of_algorithm() { if(std::this_thread::get_id()==master_thread) { do_master_thread_work(); } do_common_work(); }
3 线程间共享数据
3.1 共享数据带来的问题
当涉及到共享数据时,问题很可能是因为共享数据修改所导致。如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须小心谨慎,才能确保一切所有线程都工作正常。
不变量(invariants)的概念对程序员们编写的程序会有一定的帮助——对于特殊结构体的描述;比如,“变量包含列表中的项数”。不变量通常会在一次更新中被破坏,特别是比较复杂的数据结构,或者一次更新就要改动很大的数据结构。
双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。其中不变量就是节点A中指向“下一个”节点B的指针,还有前向指针。为了从列表中删除一个节点,其两边节点的指针都需要更新。当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。
从一个列表中删除一个节点的步骤如下图:
1. 找到要删除的节点N 2. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点 3. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点 4. 删除节点N
图中b和c在相同的方向上指向和原来已经不一致了,这就破坏了不变量。线程间潜在问题就是修改共享数据,致使不变量遭到破坏。这就是并行代码常见错误:条件竞争。
3.1.1 条件竞争
C++
标准中也定义了数据竞争这个术语,一种特殊的条件竞争:并发的去修改一个独立对象,数据竞争是(可怕的)未定义行为的起因。
恶性条件竞争通常发生于完成对多于一个的数据块的修改时,例如,对两个连接指针的修改。因为操作要访问两个独立的数据块,独立的指令将会对数据块将进行修改,并且其中一个线程可能正在进行时,另一个线程就对数据块进行了访问。因为出现的概率太低,条件竞争很难查找,也很难复现。
3.1.2 避免恶性条件竞争
这里提供一些方法来解决恶性条件竞争,最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。C++
标准库提供很多类似的机制。
另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。不过,这种方式很难得到正确的结果。如果到这个级别,无论是内存模型上的细微差异,还是线程访问数据的能力,都会让工作变的复杂。
另一种处理条件竞争的方式是,使用事务的方式去处理数据结构的更新(这里的"处理"就如同对数据库进行更新一样)。所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”。理论研究中,这是一个很热门的研究领域。这个概念将不会在本书中再进行介绍,因为在C++
中没有对STM进行直接支持。但是,基本思想会在后面提及。
保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量。
3.2 使用互斥量保护共享数据
互斥量是C++
中一种最通用的数据保护机制,但它不是“银弹”;精心组织代码来保护正确的数据,并在接口内部避免竞争条件是非常重要的。但互斥量自身也有问题,也会造成死锁,或是对数据保护的太多(或太少)。
3.2.1 C++中使用互斥量
C++中通过实例化std::mutex
创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。不过,不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard
,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。下面的程序清单中,展示了如何在多线程程序中,使用std::mutex
构造的std::lock_guard
实例,对一个列表进行访问保护。std::mutex
和std::lock_guard
都在<mutex>
头文件中声明。
清单3.1 使用互斥量保护列表
#include <list> #include <mutex> #include <algorithm> std::list<int> some_list; // 1 std::mutex some_mutex; // 2 void add_to_list(int new_value) { std::lock_guard<std::mutex> guard(some_mutex); // 3 some_list.push_back(new_value); } bool list_contains(int value_to_find) { std::lock_guard<std::mutex> guard(some_mutex); // 4 return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end(); }
当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。
当然,也不是总是那么理想,聪明的你一定注意到了:当其中一个成员函数返回的是保护数据的指针或引用时,会破坏对数据的保护。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。互斥量保护的数据需要对接口的设计相当谨慎,要确保互斥量能锁住任何对保护数据的访问,并且不留后门。
3.2.2 精心组织代码来保护共享数据
使用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个std::lock_guard
对象那么简单;一个迷失的指针或引用,将会让这种保护形同虚设。
清单3.2 无意中传递了保护数据的引用
class some_data { int a; std::string b; public: void do_something(); }; class data_wrapper { private: some_data data; std::mutex m; public: template<typename Function> void process_data(Function func) { std::lock_guard<std::mutex> l(m); func(data); // 1 传递“保护”数据给用户函数 } }; some_data* unprotected; void malicious_function(some_data& protected_data) { unprotected=&protected_data; } data_wrapper x; void foo() { x.process_data(malicious_function); // 2 传递一个恶意函数 unprotected->do_something(); // 3 在无保护的情况下访问保护数据 }
例子中process_data看起来没有任何问题,std::lock_guard
对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function
传递进去②,在没有锁定互斥量的情况下调用do_something()
。
3.2.3 发现接口内在的条件竞争
清单3.3 std::stack
容器的实现
template<typename T,typename Container=std::deque<T> > class stack { public: explicit stack(const Container&); explicit stack(Container&& = Container()); template <class Alloc> explicit stack(const Alloc&); template <class Alloc> stack(const Container&, const Alloc&); template <class Alloc> stack(Container&&, const Alloc&); template <class Alloc> stack(stack&&, const Alloc&); bool empty() const; size_t size() const; T& top(); T const& top() const; void push(T const&); void push(T&&); void pop(); void swap(stack&&); };
虽然empty()和size()可能在被调用并返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。
特别地,当栈实例是非共享的,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:
stack<int> s; if (! s.empty()){ // 1 int const value = s.top(); // 2 s.pop(); // 3 do_something(value); }
以上是单线程安全代码:对一个空栈使用top()是未定义行为。对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
怎么解决呢?问题发生在接口设计上,所以解决的方法也就是改变接口设计。
表3.1 一种可能执行顺序
Thread A | Thread B |
---|---|
if (!s.empty); | |
if(!s.empty); | |
int const value = s.top(); | |
int const value = s.top(); | |
s.pop(); | |
do_something(value); | s.pop(); |
do_something(value); |
当线程运行时,调用两次top(),栈没被修改,所以每个线程能得到同样的值。值已被处理了两次。这种条件竞争,因为看起来没有任何错误,就会让这个Bug很难定位。
这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()和pop()。
选项1: 传入一个引用
第一个选项是将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:
std::vector<int> result; some_stack.pop(result);
大多数情况下,这种方式还不错,但有明显的缺点:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看,都是不划算。对于其他的类型,这样也不总能行得通,因为构造函数需要的一些参数,在代码的这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。一个有用的选项可以限制对线程安全的栈的使用,并且能让栈安全的返回所需的值,而不会抛出异常。
选项3:返回指向弹出值的指针
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。缺点就是返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于选择这个方案的接口,使用std::shared_ptr
是个不错的选择;不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当你已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。
例:定义线程安全的堆栈
清单3.4 线程安全的堆栈类定义(概述)
#include <exception> #include <memory> // For std::shared_ptr<> struct empty_stack: std::exception { const char* what() const throw(); }; template<typename T> class threadsafe_stack { public: threadsafe_stack(); threadsafe_stack(const threadsafe_stack&); threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除 void push(T new_value); std::shared_ptr<T> pop(); void pop(T& value); bool empty() const; };
削减接口可以获得最大程度的安全,甚至限制对栈的一些操作。使用std::shared_ptr
可以避免内存分配管理的问题,并避免多次使用new和delete操作。
清单3.5 扩充(线程安全)堆栈
#include <exception> #include <memory> #include <mutex> #include <stack> struct empty_stack: std::exception { const char* what() const throw() { return "empty stack!"; }; }; template<typename T> class threadsafe_stack { private: std::stack<T> data; mutable std::mutex m; public: threadsafe_stack() : data(std::stack<T>()){} threadsafe_stack(const threadsafe_stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; // 1 在构造函数体中的执行拷贝 } threadsafe_stack& operator=(const threadsafe_stack&) = delete; void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } std::shared_ptr<T> pop() { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空 std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值 data.pop(); return res; } void pop(T& value) { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); value=data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } };
之前对top()和pop()函数的讨论中,恶性条件竞争已经出现,因为锁的粒度太小,需要保护的操作并未全覆盖到。不过,锁住的颗粒过大同样会有问题。还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。在第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。虽然这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。
3.2.4 死锁:问题描述及解决方案
试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。例如,一个玩具鼓,需要一个鼓锤和一个鼓才能玩。现在有两个小孩,他们都很喜欢玩这个玩具。当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。当另一孩子想要玩,他就得等待另一孩子玩完才行。再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。之后,他们就去玩具箱里面找这个鼓。其中一个找到了鼓,并且另外一个找到了鼓锤。现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子;但当他们都紧握着自己所有的部分而不给予,那么这个鼓谁都没法玩。
现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。
std::lock
——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序清单中,就来看一下怎么在一个简单的交换操作中使用std::lock
。
清单3.6 交换操作中使用std::lock()
和std::lock_guard
// 这里的std::lock()需要包含<mutex>头文件 class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::lock(lhs.m,rhs.m); // 1 std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2 std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3 swap(lhs.some_detail,rhs.some_detail); } };
使用std::lock
去锁lhs.m或rhs.m时,可能会抛出异常;这种情况下,异常会传播到std::lock
之外。当std::lock
成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以std::lock
要么将两个锁都锁住,要不一个都不锁。
虽然std::lock
可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。
3.2.5 避免死锁的进阶指导
虽然锁是产生死锁的一般原因,但也不排除死锁出现在其他地方。无锁的情况下,仅需要每个std::thread
对象调用join(),两个线程就能产生死锁。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人。以下提供一些个人的指导建议,如何识别死锁,并消除其他线程的等待。
避免嵌套锁
第一个建议往往是最简单的:一个线程已获得一个锁时,再别去获取第二个。如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。当你需要获取多个锁,使用一个std::lock
来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用用户提供的代码
:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)。
使用固定顺序获取锁
当硬性条件要求你获取两个以上(包括两个)的锁,并且不能使用std::lock
单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们获取它们(锁)。
为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。一旦下一个节点上的锁被获取,那么第一个节点的锁就可以释放了,因为没有持有它的必要性了。
这种“手递手”锁的模式允许多个线程访问列表,为每一个访问的线程提供不同的节点。但是,为了避免死锁,节点必须以同样的顺序上锁:如果两个线程试图用互为反向的顺序,使用“手递手”锁遍历列表,他们将执行到列表中间部分时,发生死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并且试图获取节点A上的锁——经典的死锁场景。
当A、C节点中的B节点正在被删除时,如果有线程在已获取A和C上的锁后,还要获取B节点上的锁时,当一个线程遍历列表的时候,这样的情况就可能发生死锁。这样的线程可能会试图首先锁住A节点或C节点(根据遍历的方向),但是后面就会发现,它无法获得B上的锁,因为线程在执行删除任务的时候,已经获取了B上的锁,并且同时也获取了A和C上的锁。
这里提供一种避免死锁的方式,定义遍历的顺序,所以一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁发生的可能性,在不允许反向遍历的列表上。类似的约定常被用来建立其他的数据结构。
使用锁的层次结构
清单3.7 使用层次锁来避免死锁
hierarchical_mutex high_level_mutex(10000); // 1 hierarchical_mutex low_level_mutex(5000); // 2 int do_low_level_stuff(); int low_level_func() { std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 3 return do_low_level_stuff(); } void high_level_stuff(int some_param); void high_level_func() { std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 4 high_level_stuff(low_level_func()); // 5 } void thread_a() // 6 { high_level_func(); } hierarchical_mutex other_mutex(100); // 7 void do_other_stuff(); void other_stuff() { high_level_func(); // 8 do_other_stuff(); } void thread_b() // 9 { std::lock_guard<hierarchical_mutex> lk(other_mutex); // 10 other_stuff(); }
例子也展示了另一点,std::lock_guard<>
模板与用户定义的互斥量类型一起使用。虽然hierarchical_mutex不是C++
标准的一部分,但是它写起来很容易;一个简单的实现在列表3.8中展示出来。尽管它是一个用户定义类型,它可以用于std::lock_guard<>
模板中,因为它的实现有三个成员函数为了满足互斥量操作:lock(), unlock() 和 try_lock()。虽然你还没见过try_lock()怎么使用,但是其使用起来很简单:当互斥量上的锁被一个线程持有,它将返回false,而不是等待调用的线程,直到能够获取互斥量上的锁为止。在std::lock()
的内部实现中,try_lock()会作为避免死锁算法的一部分。
列表3.8 简单的层级互斥量实现
class hierarchical_mutex { std::mutex internal_mutex; unsigned long const hierarchy_value; unsigned long previous_hierarchy_value; static thread_local unsigned long this_thread_hierarchy_value; // 1 void check_for_hierarchy_violation() { if(this_thread_hierarchy_value <= hierarchy_value) // 2 { throw std::logic_error(“mutex hierarchy violated”); } } void update_hierarchy_value() { previous_hierarchy_value=this_thread_hierarchy_value; // 3 this_thread_hierarchy_value=hierarchy_value; } public: explicit hierarchical_mutex(unsigned long value): hierarchy_value(value), previous_hierarchy_value(0) {} void lock() { check_for_hierarchy_violation(); internal_mutex.lock(); // 4 update_hierarchy_value(); // 5 } void unlock() { this_thread_hierarchy_value=previous_hierarchy_value; // 6 internal_mutex.unlock(); } bool try_lock() { check_for_hierarchy_violation(); if(!internal_mutex.try_lock()) // 7 return false; update_hierarchy_value(); return true; } }; thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // 8
超越锁的延伸扩展
当代码已经能规避死锁,std::lock()
和std::lock_guard
能组成简单的锁覆盖大多数情况,但是有时需要更多的灵活性。在这些情况,可以使用标准库提供的std::unique_lock
模板。如std::lock_guard
,这是一个参数化的互斥量模板类,并且它提供很多RAII类型锁用来管理std::lock_guard
类型,可以让代码更加灵活。
3.2.6 std::unique_lock——灵活的锁
std::unqiue_lock
使用更为自由的不变量,这样std::unique_lock
实例不会总与互斥量的数据类型相关,使用起来要比std:lock_guard
更加灵活。首先,可将std::adopt_lock
作为第二个参数传入构造函数,对互斥量进行管理;也可以将std::defer_lock
作为第二个参数传递进去,表明互斥量应保持解锁状态。这样,就可以被std::unique_lock
对象(不是互斥量)的lock()函数的所获取,或传递std::unique_lock
对象到std::lock()
中。清单3.6可以轻易的转换为清单3.9,使用std::unique_lock
和std::defer_lock
①,而非std::lock_guard
和std::adopt_lock
。代码长度相同,几乎等价,唯一不同的就是:std::unique_lock
会占用比较多的空间,并且比std::lock_guard
稍慢一些。保证灵活性要付出代价,这个代价就是允许std::unique_lock
实例不带互斥量:信息已被存储,且已被更新。
清单3.9 交换操作中std::lock()
和std::unique_lock
的使用
class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); // 1 std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); // 1 std::def_lock 留下未上锁的互斥量 std::lock(lock_a,lock_b); // 2 互斥量在这里上锁 swap(lhs.some_detail,rhs.some_detail); } };
列表3.9中,因为std::unique_lock
支持lock(), try_lock()和unlock()成员函数,所以能将std::unique_lock
对象传递到std::lock()
②。这些同名的成员函数在低层做着实际的工作,并且仅更新std::unique_lock
实例中的标志,来确定该实例是否拥有特定的互斥量,这个标志是为了确保unlock()在析构函数中被正确调用。如果实例拥有互斥量,那么析构函数必须调用unlock();但当实例中没有互斥量时,析构函数就不能去调用unlock()。这个标志可以通过owns_lock()成员变量进行查询。
可能如你期望的那样,这个标志被存储在某个地方。因此,std::unique_lock
对象的体积通常要比std::lock_guard
对象大,当使用std::unique_lock
替代std::lock_guard
,因为会对标志进行适当的更新或检查,就会做些轻微的性能惩罚。当std::lock_guard
已经能够满足你的需求,那么还是建议你继续使用它。当需要更加灵活的锁时,最好选择std::unique_lock
,因为它更适合于你的任务。你已经看到一个递延锁的例子,另外一种情况是锁的所有权需要从一个域转到另一个域。
3.2.7 不同域中互斥量所有权的传递
std::unique_lock
是可移动,但不可赋值的类型。
std::unique_lock<std::mutex> get_lock() { extern std::mutex some_mutex; std::unique_lock<std::mutex> lk(some_mutex); prepare_data(); return lk; // 1 } void process_data() { std::unique_lock<std::mutex> lk(get_lock()); // 2 do_something(); }
std::unique_lock
的灵活性同样也允许实例在销毁之前放弃其拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock
的成员函数提供类似于锁定和解锁互斥量的功能。std::unique_lock
实例在销毁前释放锁的能力,当锁没有必要在持有的时候,可以在特定的代码分支对其进行选择性的释放。这对于应用性能来说很重要,因为持有锁的时间增加会导致性能下降,其他线程会等待这个锁的释放,避免超越操作。
3.2.8 锁的粒度
选择粒度对于锁来说很重要,为了保护对应的数据,保证锁有能力保护这些数据也很重要。
如果很多线程正在等待同一个资源,当有线程持有锁的时间过长,这就会增加等待的时间。在可能的情况下,锁住互斥量的同时只能对共享数据进行访问;试图对锁外数据进行处理。特别是做一些费时的动作,比如:对文件的输入/输出操作进行上锁。文件输入/输出通常要比从内存中读或写同样长度的数据慢成百上千倍,所以除非锁已经打算去保护对文件的访问,要么执行输入/输出操作将会将延迟其他线程执行的时间,这很没有必要(因为文件锁阻塞住了很多操作),这样多线程带来的性能效益会被抵消。
这能表示只有一个互斥量保护整个数据结构时的情况,不仅可能会有更多对锁的竞争,也会增加锁持锁的时间。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。
列表3.10 比较操作符中一次锁住一个互斥量
class Y { private: int some_detail; mutable std::mutex m; int get_detail() const { std::lock_guard<std::mutex> lock_a(m); // 1 return some_detail; } public: Y(int sd):some_detail(sd){} friend bool operator==(Y const& lhs, Y const& rhs) { if(&lhs==&rhs) return true; int const lhs_value=lhs.get_detail(); // 2 int const rhs_value=rhs.get_detail(); // 3 return lhs_value==rhs_value; // 4 } };
3.3 保护共享数据的替代设施
互斥量是最通用的机制,但其并非保护共享数据的唯一方式。这里有很多替代方式可以在特定情况下,提供更加合适的保护。
一个特别极端(但十分常见)的情况就是,共享数据在并发访问和初始化时(都需要保护),但是之后需要进行隐式同步。这可能是因为数据作为只读方式创建,所以没有同步问题;或者因为必要的保护作为对数据操作的一部分,所以隐式的执行。任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程(这是没有必要的),并且这会给性能带来不必要的冲击。出于以上的原因,C++
标准提供了一种纯粹保护共享数据初始化过程的机制。
3.3.1 保护共享数据的初始化过程
假设你与一个共享源,构建代价很昂贵,可能它会打开一个数据库连接或分配出很多的内存。
延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:
std::shared_ptr<some_resource> resource_ptr; void foo() { if(!resource_ptr) { resource_ptr.reset(new some_resource); // 1 } resource_ptr->do_something(); }
清单 3.11 使用一个互斥量的延迟初始化(线程安全)过程
std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo() { std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化 if(!resource_ptr) { resource_ptr.reset(new some_resource); // 只有初始化过程需要保护 } lk.unlock(); resource_ptr->do_something(); }
这段代码相当常见了,也足够表现出没必要的线程化问题,很多人能想出更好的一些的办法来做这件事,包括声名狼藉的双重检查锁模式:
void undefined_behaviour_with_double_checked_locking() { if(!resource_ptr) // 1 { std::lock_guard<std::mutex> lk(resource_mutex); if(!resource_ptr) // 2 { resource_ptr.reset(new some_resource); // 3 } } resource_ptr->do_something(); // 4 }
指针第一次读取数据不需要获取锁①,并且只有在指针为NULL时才需要获取锁。然后,当获取锁之后,指针会被再次检查一遍② (这就是双重检查的部分),避免另一的线程在第一次检查后再做初始化,并且让当前线程获取锁。
这个模式为什么声名狼藉呢?因为这里有潜在的条件竞争,未被锁保护的读取操作①没有与其他线程里被锁保护的写入操作③进行同步。因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的some_resource实例,然后调用do_something()④后,得到不正确的结果。这个例子是在一种典型的条件竞争——数据竞争,C++
标准中这就会被指定为“未定义行为”。这种竞争肯定是可以避免的。可以阅读第5章,那里有更多对内存模型的讨论,包括数据竞争的构成。
C++标准委员会也认为条件竞争的处理很重要,所以C++标准库提供了std::once_flag
和std::call_once
来处理这种情况。比起锁住互斥量,并显式的检查指针,每个线程只需要使用std::call_once
,在std::call_once
的结束时,就能安全的知道指针已经被其他的线程初始化了。使用std::call_once
比显式使用互斥量消耗的资源更少,特别是当初始化完成后。
std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; // 1 void init_resource() { resource_ptr.reset(new some_resource); } void foo() { std::call_once(resource_flag,init_resource); // 可以完整的进行一次初始化 resource_ptr->do_something(); }
清单3.12 使用std::call_once
作为类成员的延迟初始化(线程安全)
class X { private: connection_info connection_details; connection_handle connection; std::once_flag connection_init_flag; void open_connection() { connection=connection_manager.open(connection_details); } public: X(connection_info const& connection_details_): connection_details(connection_details_) {} void send_data(data_packet const& data) // 1 { std::call_once(connection_init_flag,&X::open_connection,this); // 2 connection.send_data(data); } data_packet receive_data() // 3 { std::call_once(connection_init_flag,&X::open_connection,this); // 2 return connection.receive_data(); } };
值得注意的是,std::mutex
和std::one_flag
的实例就不能拷贝和移动,所以当你使用它们作为类成员函数,如果你需要用到他们,你就得显示定义这些特殊的成员函数。
3.3.2 保护很少更新的数据结构
试想,为了将域名解析为其相关IP地址,我们在缓存中的存放了一张DNS入口表。通常,给定DNS数目在很长的一段时间内保持不变。虽然,在用户访问不同网站时,新的入口可能会被添加到表中,但是这些数据可能在其生命周期内保持不变。所以定期检查缓存中入口的有效性,就变的十分重要了;但是,这也需要一次更新,也许这次更新只是对一些细节做了改动。
使用std::mutex
来保护数据结构,显的有些反应过度,比起使用std::mutex
实例进行同步,不如使用boost::shared_mutex
来做同步。
清单3.13 使用boost::shared_mutex
对数据结构进行保护
#include <map> #include <string> #include <mutex> #include <boost/thread/shared_mutex.hpp> class dns_entry; class dns_cache { std::map<std::string,dns_entry> entries; mutable boost::shared_mutex entry_mutex; public: dns_entry find_entry(std::string const& domain) const { boost::shared_lock<boost::shared_mutex> lk(entry_mutex); // 1 std::map<std::string,dns_entry>::const_iterator const it= entries.find(domain); return (it==entries.end())?dns_entry():it->second; } void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) { std::lock_guard<boost::shared_mutex> lk(entry_mutex); // 2 entries[domain]=dns_details; } };
3.3.3 嵌套锁
当一个线程已经获取一个std::mutex
时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。之所以可以,是因为C++
标准库提供了std::recursive_mutex
类。互斥量锁住其他线程前,你必须释放你拥有的所有锁,所以当你调用lock()三次时,你也必须调用unlock()三次。正确使用std::lock_guard<std::recursive_mutex>
和std::unique_lock<std::recursive_mutex>
可以帮你处理这些问题。
4 同步并发操作
4.1 等待一个事件或其他条件
晚间在列车上等待凌晨到站出站,要么一直不睡等着,要么等待被列车员唤醒。
第二个选择是在等待线程在检查间隙,使用std::this_thread::sleep_for()
进行周期性的间歇(详见4.3节):
bool flag; std::mutex m; void wait_for_flag() { std::unique_lock<std::mutex> lk(m); while(!flag) { lk.unlock(); // 1 解锁互斥量 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms lk.lock(); // 3 再锁互斥量 } }
这里博主理解为是一个轮询。
4.1.1 等待条件达成
清单4.1 使用std::condition_variable
处理数据等待
std::mutex mut; std::queue<data_chunk> data_queue; // 1 std::condition_variable data_cond; void data_preparation_thread() { while(more_data_to_prepare()) { data_chunk const data=prepare_data(); std::lock_guard<std::mutex> lk(mut); data_queue.push(data); // 2 data_cond.notify_one(); // 3 } } void data_processing_thread() { while(true) { std::unique_lock<std::mutex> lk(mut); // 4 data_cond.wait( lk,[]{return !data_queue.empty();}); // 5 data_chunk data=data_queue.front(); data_queue.pop(); lk.unlock(); // 6 process(data); if(is_last_chunk(data)) break; } }
4.1.2 使用条件变量构建线程安全队列
清单4.2 std::queue
接口
template <class T, class Container = std::deque<T> > class queue { public: explicit queue(const Container&); explicit queue(Container&& = Container()); template <class Alloc> explicit queue(const Alloc&); template <class Alloc> queue(const Container&, const Alloc&); template <class Alloc> queue(Container&&, const Alloc&); template <class Alloc> queue(queue&&, const Alloc&); void swap(queue& q); bool empty() const; size_type size() const; T& front(); const T& front() const; T& back(); const T& back() const; void push(const T& x); void push(T&& x); void pop(); template <class... Args> void emplace(Args&&... args); };
当你忽略构造、赋值以及交换操作时,你就剩下了三组操作:1. 对整个队列的状态进行查询(empty()和size());2.查询在队列中的各个元素(front()和back());3.修改队列的操作(push(), pop()和emplace())。这就和3.2.3中的栈一样了,因此你也会遇到在固有接口上的条件竞争。因此,你需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。与清单4.1中的代码不同的是:当使用队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里我们提供pop()函数的两个变种:try_pop()和wait_and_pop()。try_pop() ,尝试从队列中弹出数据,总会直接返回(当有失败时),即使没有值可检索;wait_and_pop(),将会等待有值可检索的时候才返回。当你使用之前栈的方式来实现你的队列,你实现的队列接口就可能会是下面这样:
清单4.3 线程安全队列的接口
#include <memory> // 为了使用std::shared_ptr template<typename T> class threadsafe_queue { public: threadsafe_queue(); threadsafe_queue(const threadsafe_queue&); threadsafe_queue& operator=( const threadsafe_queue&) = delete; // 不允许简单的赋值 void push(T new_value); bool try_pop(T& value); // 1 std::shared_ptr<T> try_pop(); // 2 void wait_and_pop(T& value); std::shared_ptr<T> wait_and_pop(); bool empty() const; };
清单4.4 从清单4.1中提取push()和wait_and_pop()
#include <queue> #include <mutex> #include <condition_variable> template<typename T> class threadsafe_queue { private: std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond; public: void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); value=data_queue.front(); data_queue.pop(); } }; threadsafe_queue<data_chunk> data_queue; // 1 void data_preparation_thread() { while(more_data_to_prepare()) { data_chunk const data=prepare_data(); data_queue.push(data); // 2 } } void data_processing_thread() { while(true) { data_chunk data; data_queue.wait_and_pop(data); // 3 process(data); if(is_last_chunk(data)) break; } }
清单4.5 使用条件变量的线程安全队列(完整版)
#include <queue> #include <memory> #include <mutex> #include <condition_variable> template<typename T> class threadsafe_queue { private: mutable std::mutex mut; // 1 互斥量必须是可变的 std::queue<T> data_queue; std::condition_variable data_cond; public: threadsafe_queue() {} threadsafe_queue(threadsafe_queue const& other) { std::lock_guard<std::mutex> lk(other.mut); data_queue=other.data_queue; } void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); value=data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop(T& value) { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return false; value=data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } };
4.2 使用期望等待一次性事件
5 C++内存模型和原子类型操作
6 基于锁的并发数据结构设计
7 无锁并发数据结构设计
8 并发代码设计
9 高级线程管理
10 多线程程序的测试和调试
重点
线程std::thread对象必须调用join或detach
- thread析构
- C++11多线程 - Part 2: 连接和分离线程
- Starting thread causing abort()
Destroys the thread object. If *this has an associated thread (joinable() == true), std::terminate() is called.
创建的std::thread对象会在构造函数结束时销毁,因为它是一个局部变量。如果调用了std::thread的析构函数,而该线程是joinable,则调用std::terminate。即程序会调用Abort(),不进行任何清理工作,然后abnormal program termination(程序异常终止)。
shared_mutex也是基于操作系统底层的读写锁pthread_rwlock_t的封装
参考
1、《C++ Concurrency in Action》[陈晓伟 译]
2、github 翻译地址
3、gitbook 在线阅读
4、书中源码
5、学习C++11/14
6、CPP-Concurrency-In-Action-2ed
7、thread析构
8、C++11多线程 - Part 2: 连接和分离线程
9、Starting thread causing abort()
10、Boost Mutex详细解说
这篇关于《C++ Concurrency in Action》笔记的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-07-03微信支付提示下单账户与支付账户不一致-icode9专业技术文章分享
- 2024-07-03微信支付提示订单号重复-icode9专业技术文章分享
- 2024-07-02微服务启动nacos注册上去了,但是一直没有收到请求-icode9专业技术文章分享
- 2024-07-02如何检查文件的编码格式-icode9专业技术文章分享
- 2024-07-02sublime 更改编码格式-icode9专业技术文章分享
- 2024-06-30uniAPP 实现全屏左右滚动滚动的效果-icode9专业技术文章分享
- 2024-06-30如何在本地使用授权或插件-icode9专业技术文章分享
- 2024-06-30伪静态规则配置方法汇总-icode9专业技术文章分享
- 2024-06-29易优CMS安装常见问题汇总-icode9专业技术文章分享
- 2024-06-28易优新手必读安装教程-icode9专业技术文章分享