WRY

Where Are You?
You are on the brave land,
To experience, to remember...

0%

Cpp多线程初探

c++11改进的内容

锁相关的完整代码可在Github仓库中查看

互斥锁

C++提供互斥锁mutex,来实现多线程之间的互斥,成员函数如下:

方法名称 用途
(constructor) Construct mutex (public member function )
1. 返回一个没有被锁定的状态
2. 不可复制、拷贝、移动
lock Lock mutex (public member function )
1. 如果锁没有被其他线程占有,该线程将会拥有该锁,直到使用unlock进行释放
2. 如果锁被其他线程获取到了,将等待用有锁的线程unlock之后,才有可能获得
3. 同线程锁多次,会死锁
try_lock Lock mutex if not locked (public member function )
1. 若没有被占有,占有直到主动声明释放
2. 若被其他线程占有,返回false,不阻塞
3. 同线程锁多次,会死锁
unlock Unlock mutex (public member function )
1. 若线程占有锁,释放
2. 若没有占有锁,会发生未知的行为
native_handle Get native handle (public member function )
不知道是个啥

应用例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 编译 & 执行命令
// g++ -pthread -o main mutex.cpp && ./main
#include<mutex>
#include<thread>
#include<iostream>
using namespace std;

mutex mtx;

void print_block (int n, char c) {
this_thread::sleep_for(std::chrono::seconds(1));
mtx.lock();
// 互斥区
for (int i=0; i<n; ++i) { cout << c; }
cout << '\n';
mtx.unlock();
}

int main(void){
mutex lock;
// mutex l2 = lock; // 禁止赋值
// mutex l3 = move(lock); // 禁止移动

thread th1 (print_block, 50, '*'); // 创建并执行线程
thread th2 (print_block, 50, '$');

th1.join();
th2.join();

return 0;
}

条件变量

条件变量condition_variable实现了线程主动进入阻塞状态等待被其他线程进行唤醒。

A condition variable is an object able to block the calling thread until notified to resume. It uses a unique_lock (over a mutex) to lock the thread when one of its wait functions is called. The thread remains blocked until woken up by another thread that calls a notification function on the same condition_variable object. Objects of type condition_variable always use unique_lock<mutex> to wait: for an alternative that works with any kind of lockable type, see condition_variable_any

条件变量阻塞线程,直到被其他线程通知。

这里额外简单解释一下unique_lock

  • unique_lock是一个管理锁的对象
  • 在构造时期会让锁处于lock的阶段(需要通过如下的构造函数样式explicit unique_lock (mutex_type& m);
  • 在对象被析构的时候,自动的释放锁
  • 此外还提供了获取锁和释放锁的接口
方法名称 含义
(constructor) Construct condition_variable (public member function )
(destructor) Destroy condition_variable (public member function )
wait Wait until notified (public member function )
1. 阻塞当前线程
2. 自动调用lck.unlock(),允许其他线程继续执行
3. 当notified类型的函数被调用时,会执行lck.lock(),让线程继续执行
wait_for Wait for timeout or until notified (public member function )
1. 等待被唤醒或者倒计时结束唤醒
2. 返回一个被唤醒的原因,比如timeout
wait_until Wait until notified or time point (public member function )
1. 和wait_for类似,该方法会在到了某个绝对时间点的时候被唤醒
notify_one Notify one (public member function )
1. 如果有被阻塞的,随机选取一个进行唤醒
2. 如果没有被阻塞的,什么都不做
notify_all Notify all (public member function )
1. 如果有被阻塞的,唤醒全部
2. 如果没有被阻塞的,什么都不做

应用例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>           // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
std::unique_lock<std::mutex> lck(mtx); // 上锁
while (!ready) {
cv.wait(lck); // 在线程被阻塞之前,释放锁
std::cout<<".";
}
// ...
std::cout << std::endl << "thread " << id << '\n';
}

void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}

int main () {
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i) threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go
for (auto& th : threads) th.join();
return 0;
}

信号量

C++中只提供了互斥条件变量,但是没有信号量的开箱即用,可以使用锁加信号量来实现一个信号量,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;

class semaphore{
private:
int count;
mutex mtk; // 控制count的互斥访问
condition_variable cv; // 实现主动阻塞和被动唤醒
public:
semaphore(int val=1):count(val){}
void wait(){ // 可以理解是申请资源
unique_lock<mutex> lck(mtk); // 上锁
if(--count<0) cv.wait(lck); // 在wait的时候就释放掉了锁
}
void signal(){ // 可以理解是释放资源
unique_lock<mutex> lck(mtk); // 上锁
if(++count<=0) cv.notify_one(); // 此处并没有释放锁
} //函数退出在执行对象lck的析构的时候,释放掉了锁
};

生产者-消费者模型

参考CSDN的博客,使用队列模拟生产消费过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include"02-semaphore.hpp"
#include<queue>
#include<thread>
#include<iostream>
using namespace std;

class ProCon{
int count;
queue<int> que;
semaphore emptyBuffers, fullBuffers; // 空位和有数据位
mutex mtx;
public:
ProCon(int size = 1):emptyBuffers(size), fullBuffers(0), count(size){}
void push(int val){
emptyBuffers.wait(); // 占用一个空位资源
mtx.lock();
que.push(val);
cout<<"after push: queue's size: "<<que.size()<<endl;
mtx.unlock();
fullBuffers.signal(); // 生产一个可用资源
}
int pop(){
fullBuffers.wait(); // 占用一个有数据资源
mtx.lock();
int ans = que.front();
que.pop();
mtx.unlock();
emptyBuffers.signal();
return ans;
}
};

读者-写者模型

读者写者模型具体分为优先读优先写公平读写

优先读

因为读-读之间共享写锁,读-写和写-写之间是互斥的,所以最简单版本的实现就是读优先的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include"02-semaphore.hpp"
#include<iostream>
#include<mutex>
#include<string>
using namespace std;

class ReaderWriterRPriority{ // 读者优先的读者写者模型
private:
mutex mtx; // 控制读者数量变量的访问
int count=0; // 读者数量
semaphore rw; // 读写互斥锁 读-读共享,读-写、写-写互斥
string mem="hello";
public:
ReaderWriterRPriority():rw(1){};
void read(){
mtx.lock();
if(count++==0) rw.wait(); // 读写互斥锁。
mtx.unlock();

cout<<mem<<endl;

mtx.lock();
if(--count==0) rw.signal();
mtx.unlock();
}
void write(const string &s){
rw.wait();
mem = s;
rw.signal();
}
};

公平读写

公平读写锁,如果有写进程拿到了锁,后面的读就需要排队了

  • 信号量flag,保证在写之前的读都可以读,在出现写之后,必须写了才能允许后面的读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ReaderWriterFair{ // 公平的读者写者模型
private:
mutex mtx; // 控制读者数量变量的访问
semaphore flag; // 控制读者-写者之间的公平竞争 类似与串行化了
semaphore rw; // 读写锁
int count=0; // 读者数量
string mem="hello";
public:
ReaderWriterFair():rw(1), flag(1){};
void read(){
flag.wait(); // 如果有写进程拿到了,锁,必须等待写进程写完才可读
mtx.lock();
if(count++==0) rw.wait(); // 占用内容的互斥
mtx.unlock();
flag.signal();

cout<<mem<<endl;

mtx.lock();
if(--count==0) rw.signal();
mtx.unlock();
}
void write(const string &s){
flag.wait(); // 阻塞后面的读进程因为已经有读进程而可以先读
rw.wait();
mem = s;
rw.signal();
flag.signal();
}
};

写优先

写优先,需要在读优先的基础上添加:

  • 一个信号量w,保证在有写进程的时候,阻塞读(实现了公平)
  • 一个变量和一把锁,控制写进程数量,实现对w尽可能多的占有 (只要一直有写,就能一直阻塞读,实现了写优先)
  • 一个flag信号量,禁止读进程排队,让写进程更有机会拿到w (削弱了读抢到w锁的机会)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ReaderWriterWPriority{ // 写者优先的读者写者模型
private:
mutex mtx, mtxWriter; // 控制读者、写者数量变量的访问
semaphore flag; // 控制读者不能排队
semaphore rw, w; // 读写锁, 写锁
int count=0, countWriter=0; // 读者数量, 写者数量
string mem="hello";
public:
ReaderWriterWPriority():rw(1), w(1), flag(1){};
void read(){
flag.wait(); // 读队列上禁止排队, 为了让写进程有机会拿到s_write锁 读者一个一个的来,减少了获取到w锁的概率
w.wait(); // 用于被写进程阻塞
mtx.lock();
if(count++==0) rw.wait(); // 占用内容的互斥
mtx.unlock();
w.signal();
flag.signal();

cout<<mem<<endl;

mtx.lock(); // 释放资源
if(--count==0) rw.signal();
mtx.unlock();
}
void write(const string &s){
mtxWriter.lock();
if(countWriter++==0) w.wait(); // 写者优先锁 拿到之后会阻塞之后的读进程
mtxWriter.unlock();
// 修改内容
rw.wait();
mem = s;
rw.signal();
// 尝试释放写占领锁
mtxWriter.lock();
if(--countWriter==0) w.signal(); // 写者优先锁 拿到之后会阻塞之后的读进程
mtxWriter.unlock();
}
};

多线程数据结构

unordered_map

网上没有找到比较好的关于c++多线程版本的unordered_map的介绍,这里整理一下Java中的HashMapHashTableConcurrentHashMap的介绍。关于unordered_map的实现细节,参考《STL源码剖析》读书笔记

HashMap

HashMap是最简单的不支持多线程并发的。多线程下的HashMap使用put操作会引起死循环。

HashTable

HashTable是支持多线程并发的,但是效率较为低下,主要实现策略是将HashMap中部分方法转变成同步方法,相当于串行化了操作,且无论是读操作还是写操作他们都给整个集合上锁。

ConcurrentHashMap

HashTable效率低下的原因是全局数据竞争同一个锁,ConcurrentHashMap使用了锁分段,将数据分段加锁,提高并发执行效率。

一个ConcurrentHashMap中包含了一个Segment数组(第一层hash,同时也是加锁的对象),每个segment又包含一个HashEntry数组(第二层hash,类似于HashMap)

锁分段技术:每个segment的读写都是高度自治的,segment之间互不影响。

降低了锁住的数据范围之后,关心的重点就是并发操作同一个Segment时的控制

  • 同时写入:阻塞
  • 一写一读:并发 ??/

具体的Get和Put操作:

  • Get方法:

    1. 为输入的Key做Hash运算,得到hash值。
    2. 通过hash值,定位到对应的Segment对象
    3. 再次通过hash值,定位到Segment当中数组的具体位置。
  • Put方法:

    1. 为输入的Key做Hash运算,得到hash值。
    2. 通过hash值,定位到对应的Segment对象
    3. 获取可重入锁
    4. 再次通过hash值,定位到Segment当中数组的具体位置
    5. 插入或覆盖HashEntry对象
    6. 释放锁
  • size方法:

    先按照乐观锁算一遍,不乐观在悲观算一遍

    1. 遍历所有的Segment。
    2. 把Segment的元素数量累加起来。
    3. 把Segment的修改次数累加起来。
    4. 判断所有Segment的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数+1;如果不是。说明没有修改,统计结束。
    5. 如果尝试次数超过阈值,则对每一个Segment加锁,再重新统计。
    6. 再次判断所有Segment的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。
    7. 释放锁,统计结束。