c++11改进的内容
锁
锁相关的完整代码可在Github 仓库中查看
互斥锁
C++提供互斥锁mutex,来实现多线程 之间的互斥 ,成员函数如下:
(constructor)
Construct mutex (public member function ) 1. 返回一个没有被锁定的状态 2. 不可复制、拷贝、移动
lock
Lock mutex (public member function ) 1. 如果锁没有被其他线程占有,该线程将会拥有该锁,直到使用unlock进行释放 2. 如果锁被其他线程获取到了,将等待用有锁的线程unlock之后,才有可能获得 3. 同线程锁多次,会死锁
try_lock
Lock mutex if not locked (public member function ) 1. 若没有被占有,占有直到主动声明释放 2. 若被其他线程占有,返回false,不阻塞 3. 同线程锁多次,会死锁
unlock
Unlock mutex (public member function ) 1. 若线程占有锁,释放 2. 若没有占有锁,会发生未知的行为
native_handle
Get native handle (public member function ) 不知道是个啥
应用例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include <mutex> #include <thread> #include <iostream> using namespace std ;mutex mtx; void print_block (int n, char c) { this_thread::sleep_for(std ::chrono::seconds(1 )); mtx.lock(); for (int i=0 ; i<n; ++i) { cout << c; } cout << '\n' ; mtx.unlock(); } int main (void ) { mutex lock; thread th1 (print_block, 50 , '*' ) ; thread th2 (print_block, 50 , '$' ) ; th1.join(); th2.join(); return 0 ; }
条件变量
条件变量condition_variable实现了线程主动进入阻塞状态等待被其他线程进行唤醒。
A condition variable is an object able to block the calling thread until notified to resume. It uses a unique_lock (over a mutex ) to lock the thread when one of its wait functions is called. The thread remains blocked until woken up by another thread that calls a notification function on the same condition_variable object. Objects of type condition_variable always use unique_lock<mutex> to wait: for an alternative that works with any kind of lockable type , see condition_variable_any
条件变量阻塞线程,直到被其他线程通知。
这里额外简单解释一下unique_lock:
unique_lock是一个管理锁的对象
在构造时期会让锁处于lock的阶段(需要通过如下的构造函数样式explicit unique_lock (mutex_type& m);)
在对象被析构的时候,自动的释放锁
此外还提供了获取锁和释放锁的接口
(constructor)
Construct condition_variable (public member function )
(destructor)
Destroy condition_variable (public member function )
wait
Wait until notified (public member function ) 1. 阻塞当前线程 2. 自动调用lck.unlock(),允许其他线程继续执行 3. 当notified类型的函数被调用时,会执行lck.lock(),让线程继续执行
wait_for
Wait for timeout or until notified (public member function ) 1. 等待被唤醒或者倒计时结束唤醒 2. 返回一个被唤醒的原因,比如timeout
wait_until
Wait until notified or time point (public member function ) 1. 和wait_for类似,该方法会在到了某个绝对时间点的时候被唤醒
notify_one
Notify one (public member function ) 1. 如果有被阻塞的,随机选取一个进行唤醒 2. 如果没有被阻塞的,什么都不做
notify_all
Notify all (public member function ) 1. 如果有被阻塞的,唤醒全部 2. 如果没有被阻塞的,什么都不做
应用例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std ::mutex mtx;std ::condition_variable cv;bool ready = false ;void print_id (int id) { std ::unique_lock<std ::mutex> lck (mtx) ; while (!ready) { cv.wait(lck); std ::cout <<"." ; } std ::cout << std ::endl << "thread " << id << '\n' ; } void go () { std ::unique_lock<std ::mutex> lck (mtx) ; ready = true ; cv.notify_all(); } int main () { std ::thread threads[10 ]; for (int i=0 ; i<10 ; ++i) threads[i] = std ::thread(print_id, i); std ::cout << "10 threads ready to race...\n" ; go(); for (auto & th : threads) th.join(); return 0 ; }
信号量
C++中只提供了互斥 和条件变量 ,但是没有信号量的开箱即用,可以使用锁加信号量来实现一个信号量,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> using namespace std ;class semaphore { private : int count; mutex mtk; condition_variable cv; public : semaphore(int val=1 ):count(val){} void wait () { unique_lock<mutex> lck (mtk) ; if (--count<0 ) cv.wait(lck); } void signal () { unique_lock<mutex> lck (mtk) ; if (++count<=0 ) cv.notify_one(); } };
生产者-消费者模型
参考CSDN的博客 ,使用队列模拟生产消费过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include "02-semaphore.hpp" #include <queue> #include <thread> #include <iostream> using namespace std ;class ProCon { int count; queue <int > que; semaphore emptyBuffers, fullBuffers; mutex mtx; public : ProCon(int size = 1 ):emptyBuffers(size), fullBuffers(0 ), count(size){} void push (int val) { emptyBuffers.wait(); mtx.lock(); que.push(val); cout <<"after push: queue's size: " <<que.size()<<endl ; mtx.unlock(); fullBuffers.signal(); } int pop () { fullBuffers.wait(); mtx.lock(); int ans = que.front(); que.pop(); mtx.unlock(); emptyBuffers.signal(); return ans; } };
读者-写者模型
读者写者模型具体分为优先读 、优先写 和公平读写 。
优先读
因为读-读之间共享写锁,读-写和写-写之间是互斥的,所以最简单版本的实现就是读优先的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include "02-semaphore.hpp" #include <iostream> #include <mutex> #include <string> using namespace std ;class ReaderWriterRPriority { private : mutex mtx; int count=0 ; semaphore rw; string mem="hello" ; public : ReaderWriterRPriority():rw(1 ){}; void read () { mtx.lock(); if (count++==0 ) rw.wait(); mtx.unlock(); cout <<mem<<endl ; mtx.lock(); if (--count==0 ) rw.signal(); mtx.unlock(); } void write (const string &s) { rw.wait(); mem = s; rw.signal(); } };
公平读写
公平读写锁,如果有写进程拿到了锁,后面的读就需要排队了
信号量flag,保证在写之前的读都可以读,在出现写之后,必须写了才能允许后面的读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class ReaderWriterFair { private : mutex mtx; semaphore flag; semaphore rw; int count=0 ; string mem="hello" ; public : ReaderWriterFair():rw(1 ), flag(1 ){}; void read () { flag.wait(); mtx.lock(); if (count++==0 ) rw.wait(); mtx.unlock(); flag.signal(); cout <<mem<<endl ; mtx.lock(); if (--count==0 ) rw.signal(); mtx.unlock(); } void write (const string &s) { flag.wait(); rw.wait(); mem = s; rw.signal(); flag.signal(); } };
写优先
写优先,需要在读优先的基础上添加:
一个信号量w,保证在有写进程的时候,阻塞读(实现了公平)
一个变量和一把锁,控制写进程数量,实现对w尽可能多的占有 (只要一直有写,就能一直阻塞读,实现了写优先)
一个flag信号量,禁止读进程排队,让写进程更有机会拿到w (削弱了读抢到w锁的机会)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class ReaderWriterWPriority { private : mutex mtx, mtxWriter; semaphore flag; semaphore rw, w; int count=0 , countWriter=0 ; string mem="hello" ; public : ReaderWriterWPriority():rw(1 ), w(1 ), flag(1 ){}; void read () { flag.wait(); w.wait(); mtx.lock(); if (count++==0 ) rw.wait(); mtx.unlock(); w.signal(); flag.signal(); cout <<mem<<endl ; mtx.lock(); if (--count==0 ) rw.signal(); mtx.unlock(); } void write (const string &s) { mtxWriter.lock(); if (countWriter++==0 ) w.wait(); mtxWriter.unlock(); rw.wait(); mem = s; rw.signal(); mtxWriter.lock(); if (--countWriter==0 ) w.signal(); mtxWriter.unlock(); } };
多线程数据结构
unordered_map
网上没有找到比较好的关于c++多线程版本的unordered_map的介绍,这里整理一下Java中的HashMap、HashTable和ConcurrentHashMap的介绍。关于unordered_map的实现细节,参考《STL源码剖析》读书笔记 。
HashMap
HashMap是最简单的不支持多线程并发的。多线程下的HashMap使用put操作会引起死循环。
HashTable
HashTable是支持多线程并发的,但是效率较为低下,主要实现策略是将HashMap中部分方法转变成同步方法,相当于串行化了操作,且无论是读操作还是写操作他们都给整个集合上锁。
ConcurrentHashMap
HashTable效率低下的原因是全局数据竞争同一个锁,ConcurrentHashMap使用了锁分段,将数据分段加锁,提高并发执行效率。
一个ConcurrentHashMap中包含了一个Segment数组(第一层hash,同时也是加锁的对象),每个segment又包含一个HashEntry数组(第二层hash,类似于HashMap)
锁分段技术:每个segment的读写都是高度自治的,segment之间互不影响。
降低了锁住的数据范围之后,关心的重点就是并发操作同一个Segment时的控制
具体的Get和Put操作:
Get方法:
为输入的Key做Hash运算,得到hash值。
通过hash值,定位到对应的Segment对象
再次通过hash值,定位到Segment当中数组的具体位置。
Put方法:
为输入的Key做Hash运算,得到hash值。
通过hash值,定位到对应的Segment对象
获取可重入锁
再次通过hash值,定位到Segment当中数组的具体位置
插入或覆盖HashEntry对象
释放锁
size方法:
先按照乐观锁算一遍,不乐观在悲观算一遍
遍历所有的Segment。
把Segment的元素数量累加起来。
把Segment的修改次数累加起来。
判断所有Segment的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数+1;如果不是。说明没有修改,统计结束。
如果尝试次数超过阈值,则对每一个Segment加锁,再重新统计。
再次判断所有Segment的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。
释放锁,统计结束。