资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

C++并发编程(四)同步与异步-创新互联

目录

洛江ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!

线程等待

1.设置标志

2.使用C++标准库工具(优先使用)

线程安全的队列

等待一次性发生的事件

异步方式启动任务

future实例和任务的关联

多任务异步调度

线程间异步求值

future中的异常

多个线程同时等待

指定等待时限

C++中时间的表示方式

时长类

时间点

接收超时时限的函数

同步操作简化代码

CSP消息传递进行同步

后续风格并发 

后续函数的连锁调用

等待多个future

线程闩和线程卡

基本线程闩

线程卡 

总结


在并发操作中,除了保护线程间的共享数据,还需要令独立线程上的行为同步,一个线程往往需要等待另一线程的任务完成,才可以执行自己的任务。C++标准库专门为之提供了处理工具:条件变量(conditional variable)和future。

线程等待

假设甲线程需要等待乙线程完成任务才可以执行,可以采取几种不同的方式:

1.设置标志

在共享数据内部维护一个受互斥保护的标志,乙线程完成任务后,该标志成立,进而执行甲线程。

这种方式主要存在两个弊端:线程甲需要不断检查标志,浪费原本有用的处理时间;线程甲每次检查标志都要锁住互斥以施加保护,若此时乙恰好完成任务需要设置标志位,则无法及时对互斥加锁。

改进:我们可以让线程甲调用std::this_thread::sleep_for()函数,使线程休眠,降低线程甲检查标志的频率:

bool flag;
std::mutex m;
void wait_for_flag()
{
	std::unique_locklk(m);
	while (!flag)
	{
		lk.unlock();
		std::this_thread::sleep_for(srd::chrono::milliseconds(100));
		lk.lock();
	}
}
  1. 上述线程首先创建unique_lock对象进行灵活解锁,每轮检测标志位的循环中先解锁等待,后上锁。但是休眠时间很难找到合适的数量,过度休眠会使线程反应不及时导致延迟,休眠期太短则虚耗处理时间。
2.使用C++标准库工具(优先使用)

标准库提供了条件变量的两种实现:std::condition_variable和std::condition_variable_any,声明它们的头文件是。两者需要配合互斥提供同步操作。其中std::condition_variable仅限于配合std::mutex使用,而std::condition_variable_any只要符合成为互斥的最低标准即可,更加通用的代价是会产生额外的开销。下面例子展示了使用方法:

#includestd::mutex m;
std::queueq;
std::condition_variable cond;
void data_preprocess()//线程乙
{
	while (more_data_to_prepare)
	{
		const data = prepare_data();//线程乙准备数据
		{										//此处使用代码块,一旦lk锁离开代码块就会销毁解锁
			std::lock_guardlk(m);	//准备好后互斥加锁
			data_queue.push(data);				//压入数据
		}
		cond.notify_one();//成员函数通知线程甲
	}
}

void data_process()//线程甲
{
	while (true)
	{
		std::unique_locklk(m);//使用unique_lock上锁
		cond.wait(						//wait函数接收等待结束的条件,成立则返回,继续进行操作;否则解锁互斥,线程阻塞
			lk, [] {return !queue.empty()}
		);
		data x = queue.front();
		queue.pop();
		lk.unlock();//此处数据就绪,即便还未开始处理也要先释放锁,因为数据处理可能相当耗时
		process(x);
		if (is_lasy(x))break;
	}
}

注释对代码作简要说明,线程乙的内层花括号运用了C++的RAII过程处理手法,假设其不存在,则互斥锁lk在线程乙通知甲时,lk对象仍未销毁,互斥仍未解锁 ,甲线程仍需作无谓的等待。

wait()函数通过条件函数,判定线程是否应该阻塞,并接收条件变量(std::condition_variable)的唤醒(notify_one())。向wait()函数传入锁对象和函数或可调用对象,函数用于判断加锁的条件,不满足条件时,会解锁,线程进入阻塞状态。其它线程调用notify_one()唤醒本线程(解除阻塞),本线程重新对互斥上锁,查验条件。

这里线程甲中使用std::unique_lock,因为线程等待期间,必须解锁互斥,而等待结束后,必须重新加锁,std::guard_lock无法提供这种灵活性。

在wait()函数调用期间,条件变量可以多次检查给定条件,次数不受限制,且在查验期间互斥会被锁住,当条件成立时wait()才会返回。如果线程甲重新获得互斥,且未不是直接响应线程乙的通知,则称为为唤醒(spurious wake)。若判定函数有副作用(判定函数不是仅仅查验结果,而是附带其它操作),多次副作用会叠加,此时可以顺带提高线程优先级,使等待久的线程优先处理。

下面是一个简单的wait()函数实现:

templatevoid m_wait(std::unique_lock& lk, T pred)
{
	while (!pred())
	{
		lk.unlock();//检验后解锁
		lk.lock();//下次检验前加锁
	}
}
线程安全的队列

我们可以按照上述思路,把队列设计为一线程安全的类,无需在全局中声明互斥和条件变量:

#include#include#include#includetemplateclass thread_safe_queue
{
private:
	std::queueq;
	mutable std::mutex m;
	std::condition_variable cond;
public:
	//构造和复制构造
	thread_safe_queue() {}
	thread_safe_queue(const thread_safe_thread& other)
	{
		std::lock_guardlk(other.m);
		q = other.q;
	}

	void push(T data)
	{
		std::lock_guardlk(m);
		q.push(data);
		cond.notify_one();
	}

	void wait_pop(T& val)
	{
		std::unique_locklk(m);
		cond.wait(lk, [this] {return !q.empty(); });
		val = q.front();
		q.pop();
	}
	std::shared_ptrwait_pop()
	{
		std::unique_locklk(m);
		cond.wait(lk, [this] {return !q.empty(); });
		std::shared_ptrres = std::make_shared(q.front());
		q.pop();
		return res;
	}
	bool try_pop(T& val)
	{
		std::lock_guardlk(m);
		if (q.empty()) return false;
		val = q.front();
		q.pop();
		return true;
	}
	std::shared_ptrtry_pop()
	{
		std::lock_guardlk(m);
		if (q.empty()) return std::shared_ptr();
		std::shared_ptrres = std::make_shared(q.front());
		q.pop();
		return res;
	}

	bool empty() const
	{
		std::lock_guardlk(m);
		return q.empty();
	}
};

即便复制构造函数的形参是const,empty()被声明为const函数,但是其他线程有可能以非const形式引用队列容器对象,或调用某些成员函数改动数据成员,因此仍需锁定互斥。

如果把条件变量适用于多个线程等待同一个目标事件,多个线程因执行wait()而同时等待,notify_one()会调用触发的其中一个线程查验条件,但是并不能确定通知到哪个具体的线程。

还有一种情况是每个线程都需要对notify()作出响应,比如:共享数据的初始化,所有线程都需要等待初始化以处理同一份数据(这种情况可以使用第三章的std::call_once(flag, func)在所有线程中仅调用一次初始化共享数据);所有线程都需要等待共享数据的更新。此时我们可以std::notify_all()函数通知所有线程。

等待一次性发生的事件

若某个线程计划仅等待一次,条件成立一次后便不再需要等待(如判定某项数据是否可用),则刻使用future来模拟这类一次性事件。若线程需要等待某个一次性事件,则需要以恰当的方式获取得一个future,代表目标事件,该线程可以一边执行其他任务,一边在等待future,并以短暂的时间间隔检验目标事件是否发生,线程还可以切换到别的任务,必要时才回头等待future准备。一旦目标事件发,future就进入就绪状态,无法重置。

标准库头文件内有两种future:独占future(std::future<>)和共享future(std::shared_future<>),参考了std::unique_ptr和std::shared_ptr的设计:同一事件仅允许关联一个std::future实例,但可以关联多个std::shared_future,只要目标事件发生,且所有实例都将同时就绪。多个线程访问同一个future对象需要用互斥或其它同步方式保护。

C++并发规约在std::experimental命名空间中给出了上述类模板的扩展版本std::experimental::future<>和std::experimental::shared_future<>,它们有更多功能的成员函数。必须包含

异步方式启动任务

std::async(func, para)函数可以以异步方式启动任务,函数运行完成后,其值由返回的future持有,故我们可以从其返回值中获得future对象。在future对象上调用get(),当前线程就会阻塞,以便future准备妥当返回该值。

与std::thread()类似函数的第一个参数为任务函数的指针,若为某个类的成员函数,还需再给出对象指针/对象本身/std::ref()包装的对象,作为第二个参数;后续参数则为任务函数所需的参数。如果std::async()的参数是右值,通过移动原始参数构建副本。

#include#includeint  f0();
struct X
{
	void f1(int a, int b);
	int f2(int c);
};
X x;

std::futureanswer = std::async(f0);//调用f0
int ans = answer.get();					//获取返回数值

auto R1 = std::async(&X::f1, &x, 1, 2);	//x->f1(1, 2);
auto R2 = std::async(&X::f2, x, 3);		//tmp_x.f2(3);

struct Y
{
	double operator()(double);
};
Y y;
auto R3 = std::async(Y(), 3.14);		//Y()匿名对象移动构造tmp_y对象,执行tmp_y(3.14);
auto R4 = std::async(std::ref(y), 3.14);//y(3.14);

Y f3(Y& y);
auto R5 = std::async(f3, std::ref(y));//f3(y);非const引用参数,使用std::ref包装

默认情况下,std::async()的前置参数为std::lauch::deferred|std::launch::async,表示等待future时,的两种方式:1.在当前线程上延后调用,等future上调用了wait()或get()才开始执行函数;2.另外开启专属的线程运行任务函数,默认为自行选择。

future实例和任务的关联

std::packaged_task<>类模版把future和任务关联起来,该类模版对象在执行任务时会调用关联函数,把返回值保存为future的内部数据。std::packaged_task<>的模版参数是函数签名(比如int(std::string&, double*)等),类型支持隐式转换,不一定严格匹配(比如某函数接收float参数,返回int值,可以为其构建std::packaged_task)。

我们可以把庞杂的操作分解为多个任务,把它们分别包装为std::packaged_task<>对象,再传给任务调度器,隐藏各种复杂任务的细节,将其抽象化。

std::packaged_task<>有成员函数get_future(),它返回std::future<>实例,该future对象的特化类型取决于包装函数的返回值。

另外,std::packaged_task<>是可调用对象,可以将其包装在std::function内,或传递给std::thread。

多任务异步调度

一般的应用会给图形用户界设立专门的线程,若其它线程需要更新界面,则会向GUI线程发送消息,该模式可以用std::packaged_task<>实现,面下面是图形用户界面框架例子

#include#include#include#include#includestd::mutex m;
std::deque>tasks;
std::condition_variable cond;
bool gui_shutdown_message_received();
void get_gui_message();

void gui_thread()
{
	while (!gui_shutdown_message_received())
	{
		get_gui_message();
		std::packaged_tasktask;//声明std::packaged_task对象
		{
			//std::lock_guardlk(m);
			//if (tasks.empty()) continue;
			std::unique_locklk(m);
			cond.wait(lk, [] {return !tasks.empty(); });//线程等待非空

			task = std::move(tasks.front());//从栈中取出任务
			tasks.pop_front();
		}
		task();//运行任务
	}
}
std::thread gui(gui_thread);//GUI线程

std::futurepush_task_for_gui(Func f)//需要包装的任务为f,返回值是void(因为返回的future是void类型)
{
	std::packaged_tasktask(f);//函数f包装到std::packaged_task对象
	std::futureres = task.get_future();//取得与task关联的future对象
	std::lock_guardlk(m);
	tasks.push_back(task);//任务放入队列
	cond.notify_one();
	return res;
}

假如我们需要判断传入push_task_for_gui的任务是否完成,只要等待future就绪即可,否则关联的future会被丢弃。等待future就绪的过程中可以进行其它任务。 

线程间异步求值

有些任务不能用简单的函数调用表达,或某个任务的结果来自多个部分的代码。如:某个应用需要处理大量网络连接,如果给每个连接一个单一的线程,则当连接数较多时,线程数也会过多,超出硬件支持的并发数,导致性能下降,故只能一个线程处理多个连接,每个连接都异步处理,这时我们需要创建std::promise。std::promise同样与std::future相关联,等待数据的线程在future上阻塞,提供数据的线程利用相配的promise设定关联值,使future准备就绪。

若要从std::promise获取std::future对象,调用成员函数get_future()即可。promise的值通过成员函数set_value()显式设置,设置好后future准备就绪。若销毁时仍未设置,则保存的数据由异常替代。

以下例子展示了利用多个promise处理在单个线程中的多个连接:

#includevoid process_connections(connection_set& connections)
{
	while (!done(connections))
	{
		for (connection::iterator it = connections.begin(); it != connections.end(); ++it)
		{
			//传入数据与promise关联的是有效载荷payload
			if (connections->has_imcomming_data())
			{
				//Data类包括数据成员id和payload
				Data data = it->incoming();//从某个连接获取接收的数据
				std::promise& p = it->get_promise(data.id);//获取该连接接收的数据,数据的id与std::promise对象一一对应
				p.set_value(data.payload);
			}
			
			//传出数据与promise关联的是成败的标志
			if (connections->has_outgoing_data())
			{
				Data data = it->outgoing_data();
				connections->send(data.payload);//发出数据
				data.promise.set_value(true);
			}
		}
	}
}

其中载荷指的是除去用于网络传输控制的数据头以外,实际传送的信息本体。 

future中的异常

若由经过std::async()调用的函数抛出异常,则会被保存到future中,代替本应该设定的值,future随即进入就绪状态,等待get()被调用,存储在future中的异常会被重新抛出。同理,std::packaged_task对象也是如此。std::promise也有同样的功能,假如我们不保存值,想要保存异常,则调用成员函数set_exception(),该函数通常用于其catch块中,捕获异常并装填promise:

extern std::promisep;
try()
{
	p.set_value(calculate());
}
catch (...)
{
	p.set_exception(std::current_exception());//此处的std::current_exception用于捕获抛出的异常
}

此外,我们还能用std::make_exception_ptr()直接保存新异常,而不出发抛出行为:

p.set_exception(std::make_exception_ptr(std::logic_error("ERROR!")));

在我们能够预知异常类型的情况下,这种替代方案简化了代码的同时,也有利于编译器优化,优先采用。

如果我们不调用promise的set_value()成员函数,也不执行包装的任务,直接销毁与future相关联的std::promise对象或std::packaged_task对象。如果关联的future未能准备就绪,无论销毁哪一个,其析构函数都会将异常std::future_error存储为异步任务的状态数据,值是std::future_error::broken_promise(std::future_error是一个枚举类,broken_promise是其中一个枚举量)。因为我们一旦创建future对象,就许诺按异步方式给出值或异常,若可以销毁来源,许诺会被破坏。若future没有传入任何数据,则线程永远等不到结果。

多个线程同时等待

如果我们在多个线程上访问同一个std::future对象,而不采取额外的同步措施,将引发数据竞争导致未定义行为,因为std::future模拟了对异步结果的独占行为,第一次调用get()会进行移动操作,故只有一个线程能获取目标值。这时,我们就需要std::shared_future,不同于std::future只能移动构造或移动赋值,它能复制出副本,因此多个对象可以指向同一异步任务的状态数据。

在多个线程访问一对象时,向每个线程传递std::shared_future对象的副本,作为线程的局部变量,由标准库正确地同步,可以安全访问,只要它们通过自有的std::shared_future对象读取状态数据,则访问行为安全。

future和promise都具备成员函数valid()用于判别异步状态是否有效。

std::shared_future的实例依据std::future的实例构造,前者指向的异步数据由后者决定,由于std::future对象独占异步状态,其归属权不为其他任何对象所获得,所以须向构造函数传递std::move(std::future)参数,转移归属权:

std::promisep;
std::futuref(p.get_future());
assert(f.valid());			//此时f有效
std::shared_futuresf(std::move(f));
assert(f.valid());			//f无效,归属权转移
assert(sf.valid());			//sf开始生效

//隐式归属权的转移 std::future转std::shared_future
std::promisepz;
std::shared_futuresfz(pz.get_future());

//使用share()创建新的std::shared_future对象,std::promise可以隐式推导future的类型
std::promise>ps;
auto sfs = ps.get_future().share();
指定等待时限

前面的方法中,只要所等待的事件还未发生,线程就会一直阻塞等待,但是在某些情况下,我们想要限制等待时长,这样我们才有可能以某种形式发送消息,用户可以选择等待或“取消”。

有两种超时机制:一是延迟超时,线程根据等待的时长继续等待;二是绝对超时,等待直到某时间点来临。延迟超时的函数变体以“_for”为后缀,绝对超时的函数以“_until”为后缀。

例如,std::condition_variable含有wait_for()和wait_unitl(),分别对应两个wait的重载,其参数类型为时长类,分别用于处理迟延超时(时间段)和绝对超时(时间点)。

C++中时间的表示方式

时钟类及时间工具在标准库的头文件中定义,系统时钟类为std::chrono::system_clock,若要获时钟类的取当前时刻调用静态成员函数now()即可,每个时钟类都有名为time_point的成员类型,它是该时钟类的自由时间点类。所以,std::chrono::system_clock::now()的返回类型为std::chrono::system_clock::time_point。

时钟类的计时单元是单位时间内时钟的计时次数,属于名为period的成员类型,以每秒计数的次数的分数形式进行表示,如:每秒计数25次,它的计时单元为std::ratio<1,25>;若时钟每间隔2.5秒计数1次,则为std::ratio<5,2>。

若计时速率恒定,则称为恒稳时钟,可以用时钟类的静态成员is_steady判断(恒稳时钟为true)。std::chrono::system_clock不是恒稳时钟,因为它可以调整以消除本地系统时钟的偏差,但也可能导致调用两次now(),后返回的时间可能早于前一个。C++提供了恒稳时钟类std::chrono::steady_clock。高精度时间类std::chrono::high_resolution_clock具备可能实现的最短计时单位。

时长类

标准库中最简单的时间部件std::chrono::duration<>具有两个模版参数,前者指名采用何种类型表示计时单元的数量,后者是分数,其值是每一个计时单元的秒数(与前文一致)。例如:采用double值计数的毫秒时长类型是:std::chrono::duration>,因为1毫秒是1/1000秒,计时单元的数量通过成员函数count()获取。

标准库在std::chrono命名空间中定义了各种时长类型的声明,如:nanoseconds,microseconds,milliseconds,seconds,minutes,hours。

C++14引入了命名空间std::chrono_liters,预定义了一些字面意义的后缀运算符(literal suffix operator),能够简化时长代码:

#includeusing namespace std::chrono_literals;
auto one_day = 24h;
auto half_an_hour = 30min;
auto interval_time = 30120ms;
//std::chrono::duration_cast进行显示转换
std::chrono::seconds s = std::chrono::duration_cast(interval_time);

与整数一起使用,相当于由typedef预设的时长类,比如:30ms相当于std::chrono::milliseconds(30)。还可以通过std::chrono::duration_cast<>进行显示的单位转换,上述例子中转换的结果会被截断为30s。

另外,时长支持加减乘除,5*seconds(1)=seconds(5)。

在线程等待中,所有等待函数都会返回一个状态值,表明目标事件超时或已发生。我们可以借助future进行等待,一旦超时,函数就返回std::future_status::timeout;如果准备,返回std::future_status::ready,任务被延后,则返回std::future_status::deferred。

std::futuref = std::async(task);
if (f.wait_for(std::chrono::milliseconds(30))) == std::future_status::ready)
	do_something_with(f.get());

延迟超时采用标准库内部的恒稳时钟,即便系统时间变化,等待的时间也不会变。

时间点

时间点由类模版std::chrono::time_point<>实例表示,第一个参数表示参考的时钟,第二个参数表示计时单元。若存在时钟类C,其内部应该含有typedef std::chrono::time_pointtime_point,其中time_point为C自身的成员类型,Duration是时长std::chrono::duration的某种特化,也是C的成员类型。

时钟纪元。是一个基础特性,典型的有1970年1月1日,或计算机启动的时刻,可以用time_since_epoch()表示从时钟纪元到给时间点的时长。

若时钟类C1和C2都参考同一个时间纪元,C1的time_point成员类型还可以是typedef std::chrono::time_pointtime_point。

我们可以用std::chrono::time_point指定一个时间点。

时间点可以与时长类相加减,得到新的时间点(std::chrono::time_point<>+ std::chrono::duration)。若两个时间点共享一个时钟,也可以相减得到时长:

auto start = std::chrono::high_resolution_clock::now();
...
auto end = std::chrono::high_resolution_clock::now();
double duration = std::chrono::duration(end - start).count();

使用条件变量的wait_util()设置等待时间500ms的绝对超时:

#include#include#includestd::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop()
{
	auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
	std::unique_locklk(m);
	while (!done)
	{
		if (cv.wait_until(lk, timeout) == std::cv_status::timeout)
			break;
	}
	return done;
}

std::cv_status::timeout标志位判断是否超过某时间点。如果此处使用wiat_for(),那么在等待时间快结束时发生伪唤醒,如果再次等待则需要重新等待新的迟延时间段。

接收超时时限的函数

有时候我们需要推迟线程处理的过程,std::this_thread::sleep_for()std::this_thread::sleep_until()可以满足这个需求,与其它函数一样,_for用于指定时长,_until()用于指定唤醒线程的特定时间点。

我们在给互斥加锁的时候,也可以设定时限,std::timed_mutex和std::recursive_timed_mutex都有成员函数try_lock_for和try_lock_until(),前者尝试在特定时长内上锁,后者尝试在特定时间点之前上锁。 

C++标准库中接受time_point和duration的函数
类/命名空间函数返回值
std::this_thread

sleep_for(duration)

sleep_until(time_point)

std::condition_varilabe

std::condition_variable_any

wait_for(lock,duration)

wait_until(lock,time_point)

std::cv::status::timeout

std::cv::status::no_timeout

std::timed_mutex

std::recursive_timed_mutex

try_lock_for(duration)

try_lock_until(time_point)

bool

true:获取了锁

false:未获取锁

std::shared_timed_mutex

try_lock_shared_for(duration)

try_lock_shared_until(time_point)

bool

true:获取了锁

false:未获取锁

std::unique_lock

std::shared_lock

TimedLockable>

try_lock_for(duration)

try_lock_until(time_point)

bool(在新构建的对象上未获取锁)

true:获取了锁

false:未获取锁

std::future<>/std::shared_future

wait_for(duration)

wait_until(time_point)

等待超时:

std::future_status::timeout

已就绪:

std::future_status::ready

推迟方式执行,尚未执行:

std::future_status::deferred

同步操作简化代码

线程间可以不显示地访问共享数据,由个任务分别预先准备好各自所需数据,借助future将结果发送到其他所需线程。

首先以链表元素的快速排序为例,以下串行方法:

#includetemplatestd::listquick_sort(std::listinput)
{
	if (input.empty()) return input;
	std::listresult;
	result.splice(result.begin(), input, input.begin());//把input的首位元素放入result
	const T& pivot = *result.begin();//记录基准元素
	auto divide_point = std::partion(input.begin(), input.end(), [&](const T& i) {return i< pivot; });//整理并返回分割点(迭代器指向大组开头的元素)
	std::listlower_part;
	lower_part.splice(lower_part.end(), input, input.begin(), divide_point);//截取小于基准元素的部分作为lower_part
	//递归排序
	auto new_lower = quick_sort(std::move(lower_part));
	auto new_higher = quick_sort(std::move(input));
	//归并
	result.splice(result.end(), new_higher);
	result.splice(result.begin(), new_lower);
	return result;
}
	

结合future和函数式编程,并行进行快速排序:

函数式编程(functional programming)是指一种编程风格,函数调用的结果完全取决于参数,而不依赖任何外部状态。(源自数学概念中的函数)

#include#includetemplatestd::listparallel_quick_sort(std::listinput)
{
	if (input.empty()) return input;
	std::listresult;
	result.splice(result.begin(), input, input.begin());//把input的首位元素放入result
	const T& pivot = *result.begin();//记录基准元素
	auto divide_point = std::partion(input.begin(), input.end(), [&](const T& i) {return i< pivot; });//分割点(迭代器指向大组开头的元素)
	std::listlower_part;
	lower_part.splice(lower_part.end(), input, input.begin(), divide_point);//截取小于基准元素的部分作为lower_part
	//递归排序
	std::future>new_lower = std::async(std::parallel_quick_sort, std::move(lower_part));
	auto new_higher = parallel_quick_sort(std::move(input));
	//归并
	result.splice(result.end(), new_higher);
	result.splice(result.begin(), new_lower.get());//等待后台任务完成,返回future,get()移动返回右值引用
	return result;
}

大的变化是,链表的lower部分不再由当前线程执行,通过std::async在另一线程上工作,我们递归n层,就会有n^2个线程同时运行,一旦线程数超出了可调配资源量,就有可能转为同步方式,假如向其他线程传递任务无助于提升性能,则负责调用get()的线程会亲自执行任务。 

CSP消息传递进行同步

除了函数式编程,通信式串行编程(Communicating Sequential Process, CSP)同样可以摆脱共享的可变数据。CSP线程相互完全隔离,没有共享数据,采用通信管道传递消息。因此,每个CSP线程实际上与状态机(state machine)等效,从原始状态起步,只要接收到消息就按某种方式更新自身状态,或向其他CSP线程发送消息。

下面是自动柜员机状态机的简单例子,线程可以处在各个状态,每个状态等待着可以接收的消息,一旦送达消息,线程便接收并处理,还可以转移到新状态,“等待-处理-转移”不断循环进行:

struct card_inserted
{
	std::string account;
};

class atm
{
	messaging::receiver incomming;
	messaging::sender bank;
	messaging::sender interface_hard_ware;
	void (atm::* state)();
	std::string account;
	std::string pin;

	void waiting_for_card()
	{
		interface_hardware.send(display_enter_card());//硬件显示请插卡
		incomming.wait().//等待处理回复
			handle([&](const card_inserted& msg)//连锁调用handle()只接收特定类型的消息,类型不符则继续等待
				account = msg.account;//lamda函数获取帐号信息
				pin = "";
				interface_hardware.send(display_enter_pin());//提示顾客输入密码
				state = &atm::getting_pin;//状态变更为“获取密码”
				);
	}
	void getting_pin();
public:
	void run()
	{
		state = &atm::waiting_for_card;//初始状态设置为等待插入卡
		try
		{
			for (;;)//循环调用各种状态函数
			{
				(this->*state)();
			}
		}
		catch (messaging::close_queue const&)
		{
		}
	}
};

下面是getting_pin的简单实现,wait()之后有三个handle()形成连锁调用链,每个handle()调用自身设定的消息类型,并接收一个lambda函数 :

//获取密码需要处理三种信息
void atm::getting_pin() {
	incomming.wait()
		.handle(
			[&](const digit_pressed& msg)//输入密码数字
			{
				const unsigned pin_length = 4;
				pin += msg.digit;
				if (pin.length() == pin_length)
				{
					bank.send(verify_pin(account, pin, incomming));
					state = &atm::verifying_pin;
				}
			}
		)
		.handle(
				[&](const clear_last_pressed& msg)//删除最后一个输入
			{
				if (!pin.empty())
				{
					pin.resize(pin.length() - 1);
				}
			}
		)
		.handle(
				[&](const cancle_pressed& msg)//取消
			{
				state = &atm::done_processing;
			}
		);
}

这种方式允许我们无须顾虑同步和并发的问题,仅专注于收发的信息。

后续部分为future扩展功能:

后续风格并发 

并发规约技术还提供了future扩展版本,核心是指定后续(continuation)操作,future准备就绪就会运行后续函数。等待future就绪的方法有两种:

1、wait_for()和wait_until()设置时限,

2、调用成员函数wiat(),这可能引发完全阻塞。

而std::experimental::future的成员函数then(),能在future就绪后执行then()内的函数,并把准备就绪的future传入函数作为参数,因此我们无法向后续函数传递参数。

下列代码作出示范:

#include//由于fut1的类型,find_the_question函数参数必须是std::experimental::futurestd::string find_the_question(std::experimental::futurethe_answer);
std::futurefind_the_answer;
auto fut1 = find_the_answer();
auto fut2 = fut1.then(find_the_question);
assert(!fut1.valid());
assert(!fut2.valid());
后续函数的连锁调用

设想当用户登录应用程序时,我们需要向后端服务器发送信息以验证身份;完成身份验证后,我们需再次向后端服务器请求其账户信息;最后一旦取得了信息,就更新并显示呈现。为了让主线程抽身执行其他任务,我们按异步的方式执行这些任务。

std::futureprocess_login(const std::string& username, const std::string& password)
{
	return std::async(std::launch::async, [=]()
		{
			try {
				const user_id id = backend.authenticate_user(username, password);//验证身份
				const user_data info_to_display = backend.request_current_info(id);//请求账户信息
				update_display(info_to_display);//显示用户信息
			}
			catch (std::exception& e)
			{
				display_error(e);
			}
		}
	);
}

这个线程因承担过多任务而发生阻塞,因而我们需要按照后续函数的方式,将任务接合,形成调用链:

std::experimental::futureprocess_login(const std::string& username, const std::string& password)
{
    return  backend.authenticate_user(username, password).
    then([](std::experimental::futureid) {
		return  backend.request_current_info(id.get());
	}).then([](std::experimental::futureinfo_to_display) {
		try {
	    		update_display(info_to_display.get());
			}
			catch (std::exception& e)
			{
				display_error(e);
			}
		}
	);
};

上述例子中,每个函数都接收std::experimental::future对象作为唯一的参数,然后get()获取其值。

针对涉及服务器后端的函数调用,我们需要让其返回future对象。

倘若调用链中任何后续函数抛出了异常,异常会沿着调用链向外传递,在末尾函数中,经info_to_display.get()调用抛出,catch块能集中处理全部异常。

std::experimental::shared_future可以具有多个后续,由于该类允许多个对象指向同一个共享状态,若仅允许一个后续,两个线程之间边引发条件竞争。

等待多个future

若某线程需要等待多个future就绪后,再进行后续操作,此时我们可以采用std::experimental::when_all(),我们向该函数传入一系列需要等待的future,全部就绪后,由它返回一个总领的future:

std::experimental::futureprocess_data(std::vector& vec)
{
	const int size = whatever;
	std::vector>results;//记录结果future的数组
	for (auto begin = vec.begin(), end = vec.end(); beg != end;)
	{
		const int remaining_size = end - begin;
		const int chunk_size = std::min(remaining_size, chunk_size);
		results.push_back(
			std::async(process_chunk, begin + chunk_size);//按块的大小处理异步每个块
		);
		begin += chunk_size;
	}
	return std::experimental::when_all(results.begin(), results.end())//生成新future
		.then([](std::future>>ready_results)//使用then编排后续函数
		{
			std::vector>all_results = ready_results.get();//获取vector
			std::vectorv;
			v.reserve(all_results.size());
			for (auto& f : all_results)
			{
				v.push_back(f.get());//各future上调用get()不会引发阻塞
			}
			return gather_results(v);
		});
}

与when_all()对应的有std::experimental::when_any(),针对给定的多个future,只要有一个就绪,新总领future便随之就绪。

std::experimental::when_any()的返回参数为类型为std::experimental::future>,增加了std::experimental::when_any_result<>结构,包含全体future序列futures以及就绪future的索引index。when_any_result的类模版定义如下:

templatestruct when_any_result {
	size_t index;
	Sequence futures;
};
//获取就绪future的数据实例
futures[index].get();

when_all()和when_any()都接收一对迭代器作为参数,代表容器范围的开头和结尾,两个函数都有可变参数的重载形式,接收多个future直接作为参数,when_all持有tuple而when_any持有when_any_result实例:

std::experimental::futuref1 = std::async(func1);
std::experimental::futuref2 = std::async(func2);
std::experimental::futuref3 = std::async(func3);

std::experimental::future<
	std::tuple<
	std::experimental::future,
	std::experimental::future,
	std::experimental::future>>result =
std::expertmental::when_all(std::move(f1), std::move(f2), std::move(f3));
线程闩和线程卡

线程闩(latch)是一个同步对象,内含计数器,当计数器的值减到0,就会进入就绪状态,只要就绪就会一直保持状态,知道被销毁。

线程卡(barrier)则是可以重复使用的同步构件,针对给定的线程,在他们之间进行同步。

在一个同步周期内,在同一个线程中,线程闩可以多次使用,线程卡则只能用一次。

基本线程闩

std::experimental::latch由头文件定义。构造函数接收唯一一个参数,用于设置计数器的初始值。

当等待的目标事件发生时,就调用count_down(),使计数器减持;

当计数器减到0,就进入就绪态,可用is_ready()检查是否就绪。

我们要使计数器减持,同时要等待就绪,可调用count_ready_and_wait()。

#includevoid foo() {
	const unsigned int thread_count = 5;
	std::experimental::latch done(thread_count);//创建线程闩
	my_data data[thread_count];
	std::vecotr>threads;
	for (unsigned int i = 0; i< thread_count; ++i)
	{
		threads.push_back(std::async(std::launch::async, [&, i] {//发起线程,并把对应future放入容器
			data[i] = make_data(i);
			done.count_down();//线程内计数
				...;//后续处理
			}));
	}
	done.wait();//等待全部数据处理完成
	process_data(data);//处理整体数据
}//threads中的std::future析构,全部线程结束运行
线程卡 

头文件内定义了两种线程卡std::experimental::barrier和std::experimental::flex_barrier。flex相对灵活,开销也更大。

一组线程协同处理某些数据,各线程相互独立处理,操作过程不必同步,在全部线程完成各自的处理后,进行后续处理,此时,我们可以使用线程卡等待线程完成自身任务,调用arrive_and_wait()等待同步组(synchronization group)的其它线程。

与线程闩保持就绪状态不同,线程卡可以重复使用,在组内最后一个线程运行到线程卡后,所有线程都会被释放,线程卡会自我重置,另外线程卡仅阻拦同步组内的线程。可以调用arrive_and_drop()显式地脱离同步组,脱离后线程不会再被阻拦,因此下一个同步周期线程组线程数-1。

下面是线程卡简单的应用,为了充分利用硬件的并行能力,把数据切分为许多数据块进行处理:

std::vectordivide_into_chunks(data_block data, unsigned int num_threads);

void process_data(data_source& source, data_sink& sink) {
	const unsigned concurrency = std::thread::hardware_concurrency();
	const unsigned num_threads = (concurrency >0) ? concurrency : 2;

	std::experimental::barrier sync(num_threads);//初始化线程卡,线程数为num_threads
	std::vectorthreads(num_threads);//C++20的jthread会在析构前自动汇合
	std::vectorchunks;

	result_block result;
	for (unsigned int i = 0; i< num_threads; ++i)
	{
		threads[i] = std::jthread([&, i] {//发起多个线程,每个线程处理一块区数据
				while (!source.done())
				{
					if (!i) {//0号线程分割数据成块
						data_block current_block = source.get_next_data_block();
						chunks = divide_into_chunks(current_block, num_threads);
					}
					sync.arrive_and_wait();//所有其它线程等待上面0号线程的切分完成
					result.set_chunk(i, num_threads, process(chunks[i]));//并行执行区域,操作result
					sync.arrive_and_wait();//等待所有线程处理完成
					if (!i)
					{
						sink.write_data(std::move(result));//0号线程写入结果到sink
					}
				}
			}
		);
	}
}

std::experimental::flex_barrier类的接口与barrier仅在于前者的构造函数除了接收线程数目,还能额外接收补全函数(complerion function)只要全部线程都运行到线程卡处,该函数就会在其中一个线程上运行。其返回值可以设定下个周期的线程数目,-1表示不变,0或正数则作为下次的线程数。

以下例子利用flex_barrier简化了主循环部分,仅需一个同步点(arrive_and_wait()):

void process_data(data_source& source, data_sink& sink) {
	const unsigned concurrency = std::thread::hardware_concurrency();
	const unsigned num_threads = (concurrency >0) ? concurrency : 2;

	std::vectorthreads(num_threads);//C++20的jthread会在析构前自动汇合
	std::vectorchunks;

	auto split_source = [&] {//分块函数
		if (!source.done())
		{
			data_block current_block = source.get_next_data_block();
			chunks = divide_into_chunks(current_block, num_threads);
		}
	};

	result_block result;

	std::experimental::flex_barrier sync(num_threads, [&]
		{
			sink.write_data(std::move(result)); 
			split_source();//由于补全函数在随机线程运行,下一轮循环需要再调用分块函数
			return -1;//返回值是-1则在下一周期线程数不变,0或正数表示下一步周期的线程数目。
		});

	for (unsigned int i = 0; i< num_threads; ++i)
	{
		threads[i] = std::jthread([&, i] {//发起多个线程,每个线程处理一块区数据
			while (!source.done())
			{
				result.set_chunk(i, num_threads, process(chunks[i]));//并行执行区域,操作result
				sync.arrive_and_wait();//等待所有线程处理完成,并运行补全函数
			}
			}
		);
	}
}
总结

条件变量std::condition_variable,成员函数wait(lock, func)用于查验条件,根据条件使线程进行阻塞等待或返回执行,成员函数notify_one()用于在准备完毕后唤醒其中一个wait()的阻塞线程。notify_all()函数则通知所有线程。

异步目标事件的类模版std::future<>和std::shared_future<>,get()函数获取返回值。std::async(func,para)异步方式启动目标函数,返回future对象,可选延迟std::lauch::deferred或异步std::launch::async模式。

std::packaged_task<>类模版,可用于多任务管理,模版参数为函数签名,使用函数进行实例化,get_future()成员函数返回与所包装函数返回值相关联的std::future实例。

std::promise与std::future相关联,可以在一个线程t1中使用set_value()函数,保存一个类型T的值;在线程t2中调用对应promise对象的get_future()函数,获取关联的std::future对象,进而调用std::future::get()函数其获取值。

std::ratio计时单位,每秒等于多少个计时单位,时间点std::time_point,时长类std::duration。预设时长如:std::chrono::milliseconds(30)。时长和时间点支持加减运算,时长支持乘除运算,使用同一参考时钟clock的时间点可以相减得到时长,如:

auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);

std::condition_variable::wait_until(lk, time_point)和std::condition_variable::wait_for(lk, duration)设置等待时长。返回标志位std::cv_status::timeout/std::cv::status::no_timeout。

线程休眠std::this_thread::sleep_for(duration)/std::this_thread::sleep_until(time_point)。

experimental内容:

then(function)进行后续函数调用,函数接收的参数为上一个函数的返回值的类型。

std::experimental::when_all()等待全部线程完成,std::experimental::when_any()等待其中一个,函数接收存放future的容器的两个迭代器,表示范围内future会被等待,返回future包装的容器,即分别返回std::experimental::future<容器>和std::experimental::future>>。

std::experimental::latch(num_threads)线程闩,用线程数进行构造,在线程总调用count_down()成员函数减少计数,主线程中wait()等待所有线程完成处理(计数减少至0),进入就绪状态,is_ready()判断就绪。

std::experimental::barrier线程卡,用线程数进行构造,线程内部arrive_and_wait()函数等待同步组内所有线程运行至线程卡,再进行后续任务。arrive_and_drop()成员函数使得线程脱离同步组。flex_barrier(num, function)还能接收后续函数,函数返回值调整下轮线程数,-1代表线程数不变,>0则表示新的线程数。

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网站标题:C++并发编程(四)同步与异步-创新互联
URL分享:http://cdkjz.cn/article/doppoj.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

业务热线:400-028-6601 / 大客户专线   成都:13518219792   座机:028-86922220