diff --git a/14-thread-pool-and-work-thread.md b/14-thread-pool-and-work-thread.md new file mode 100644 index 0000000000000000000000000000000000000000..8e6370772d471716025d24d939e57e60517bc075 --- /dev/null +++ b/14-thread-pool-and-work-thread.md @@ -0,0 +1,86 @@ +# 线程池与工作线程 +由于cpp-tbox是基于事件驱动的编程模型,对于事件的处理要求不能阻塞。然而实际的工作中,多多少少会遇到一些需要阻塞的事务。比如说:大运算、调用第三方库的阻塞性接口等。 +这时,就得需要借助`ThreadPool`与`WorkThread`来解决问题。 + +## ThreadPool +所谓线程池,就是预先创建指定个数的线程,让他们等待队列传入的任务。当队列中有新的任务后,其中一个线程就能抢到任务,并开始执行。如果同时来多个任务,那么这些空闲的线程便从逐一从队列中领取任务。工作线程在执行完任务后,会继续检查任务队列中是否还有其它的任务。如果有,会继续领取任务并执行。否则,观察当前任务数是否大于预设个数,如果是则结束线程,不是则处理空闲等待状态。 +有了线程池,我们在开发中就不需要临时为阻塞性的函数调用创建线程,而是很方便地将这些阻塞性的函数调用委托给线程池,由线程池中的工作线程去执行。 + +### 基础用法 +本节通过一个简单的示例来引导大家掌握`ThreadPool`的基础使用方法。 + +![](images/048-thread-pool-simple-code.png) + +L21,在`onStart()`函数中直接使用`ctx().thread_pool()->execute(...)`委派任务给线程池执行。这是因为在tbox.main框架中,ctx 自带一个线程池对象。我们只需要通过`ctx().thread_pool()`即可获取并使用。 +L22~27,在工作线程中打印日志并执行`Fibonacci()`函数模拟CPU密集的运算。 + +执行结果: +![](images/049-thread-pool-simple-run.png) + +注意观察上面日志中的线程ID,9983是Loop线程,9984是工作线程,9985是监控线程。 +(2)与(3)都是在工作线程中打印的。 +在最后黄框标记为线程池任务执行的耗时报告:"cost 201 + 722190",表示从任务派发到执行的等待时长为201us,而任务的执行花了722190us。 + +[示例工程目录](examples/15-thread-pool-simple) + +### 进阶用法 +上面对线程池的使用仅仅是让工作线程做了一件事情,不需要后续的处理。然而实际业务中往往不是这样的,比如:解析请求 --> 写数据库 --> 回复;其中"写数据库"这步操作是阻塞的。我们需要在让线程池写完数据后之后再在Loop线程中执行回复的动作。 +这就引入`execute()`的第二个参数: +``` +TaskToken execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb, int prio); +``` +这个参数传入的是一个函数对象,即工作线程执行完后,由Loop线程执行的后续处理。 + +接下来,我用另一个较为复杂一点的示例向大家展示它的使用场景: +![](images/050-thread-pool-advance-code.png) + +这个示例也是让程序计算`Fibonacci()`。不同的是,让它一次性计算多个数值,而且Loop线程要在所有的计算都完成之后打印计算结果。 +实现的方式大致为:Loop线程向线程池委派计算任务,在每一个工作线程完成了任务之后,让Loop线程记录计算结果,检查是否所有的计算都已完成。如果已完成,Loop线程打印所有的结果。 + +具体的实现: +(1) 定义两个成员变量`unfinished_task_`,用于记录还有哪些任务没有完成;再定义`results_`记录不同数值计算所得的结果; +(2) 在`onStart()`中调4次`startCalculateTask()`,启4个计算任务; +(3) 在`startCalculateTask()`中,将需要计算的数值写入到`unfinished_task_`集合中。还需要定义`Tmp`结构体用于传递数据。注意,这里是Loop线程在执行,操作成员变量是安全的;并创建了`Tmp`结构体作为与子线程进行数据交换的媒介; +(4) 这里由工作线程在进行计算。在这里,是从`tmp->n`中取值进行操作,完成之后也是将结果记录到`tmp->result`中。整个过程没有操作成员变量,不需要加锁; +(5) 这里是由Loop线程执行的匿名函数,它会在工作线程执行完成之后被Loop线程调用。它的职责是处理`tmp`中的结果。 +(6)(7) 这是在做计算结果的处理。它有被Loop线程调用的,直接访问成员变量是安全的,无需加锁。 + + +编译执行的效果: +![](images/051-thread-pool-advance-run-1.png) +注意观察可见: +1. 确实是子线程在进行计算; +2. 最后的结果打印是主线程; +3. 所有的任务都只被一个工作线程执行; + +Q: 为什么只有一个工作线程呢? +A: 因为在tbox.main的框架配置中,线程池的默认配置为:最小0个,最大1个线程; + +如果想看到并行运算的效果,那要将最大线程数调大一点。在运行时指定`thread_pool.max`参数的值即可: +``` +./demo -s 'thread_pool.max=5' +``` +运行效果: +![](images/052-thread-pool-advance-run-2.png) +从上可以看到,程序在启动时就为每一个任务创建了一个工作线程。 + +[示例工程目录](examples/16-thread-pool-advance) + +## WorkThread +工作线程与线程池的接口很像,区别在于`WorkThread`内部只有一个工作线程,相当于简化版本的`ThreadPool`。由于`WorkThread`只有一个工作线程,提交给它的任务是按顺执行的。为此,我们会使用它来实现对执行顺序有要求的场景。 + +本节将用示例展示`WorkThread`的使用方法。如下图所示: +![](images/053-work-thread-code.png) +(1) 首先,需要`#include `; +(2) 要在定义成员变量`worker_`; +(3) 将上一节课中的`ctx().thread_pool()->`替代为`worker_.`即可; + +运行效果: +![](images/054-work-thread-run.png) +可以看到,与线程池很相像; + +[示例工程目录](examples/17-work-thread) + +------- +[[返回主页]](README.md) + diff --git a/README.md b/README.md index 4e1414f3418901bccd4cead96d1777feae032288..59c2d15e90620e2c158b12f967ceaf27ad13d24d 100644 --- a/README.md +++ b/README.md @@ -51,11 +51,14 @@ ## 日志输出 -## 线程池的使用 +## 线程池与工作线程的使用 +由于cpp-tbox是基于事件驱动的编程模型,对于事件的处理要求不能阻塞。然而实际的工作中,多多少少会遇到一些需要阻塞的事务。比如说:大运算、调用第三方库的阻塞性接口等。 +这时,就得需要借助`ThreadPool`与`WorkThread`来解决问题。 +[[点击前往]](14-thread-pool-and-work-thread.md) ## 子线程向主线程委派任务 -## 定时器池使用 +## 定时器池的使用 前面了解了定时器的使用,这里来了解一种创建定时器更方便的方式。 [[点击前往]](12-timer-pool.md) diff --git a/examples/15-thread-pool-simple/Makefile b/examples/15-thread-pool-simple/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..526d44604420f9f10215e8eba8db574873bc2ddb --- /dev/null +++ b/examples/15-thread-pool-simple/Makefile @@ -0,0 +1,24 @@ +TARGET:=demo + +OBJECTS:=app_main.o + +CXXFLAGS:=-I$(HOME)/.tbox/include -DLOG_MODULE_ID='"demo"' -std=c++11 +LDFLAGS:=-L$(HOME)/.tbox/lib -rdynamic +LIBS:=\ + -ltbox_main \ + -ltbox_coroutine \ + -ltbox_trace \ + -ltbox_terminal \ + -ltbox_network \ + -ltbox_eventx \ + -ltbox_event \ + -ltbox_log \ + -ltbox_util \ + -ltbox_base \ + -lpthread -ldl + +$(TARGET): $(OBJECTS) + g++ -o $@ $^ $(LDFLAGS) $(LIBS) + +clean: + rm *.o diff --git a/examples/15-thread-pool-simple/app_main.cpp b/examples/15-thread-pool-simple/app_main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0e2f597b49d7f6b01bca635b93a9c4f0db248c7d --- /dev/null +++ b/examples/15-thread-pool-simple/app_main.cpp @@ -0,0 +1,41 @@ +#include +#include + +//! CPU密集运算 +uint64_t Fibonacci(int n) { + if (n <= 1) return n; + return Fibonacci(n - 1) + Fibonacci(n - 2); +} + +class MyModule : public tbox::main::Module { + public: + explicit MyModule(tbox::main::Context &ctx) + : tbox::main::Module("my", ctx) + { } + + public: + virtual bool onStart() { + int n = 40; + LogTrace("start caculate fibonacci %d", n); + //! 委托线程池执行任务 + ctx().thread_pool()->execute( + [n] { + //! 在线程池中执行 + LogTrace("fibonacci %d calculating", n); + auto result = Fibonacci(n); //! 进行CPU密集的运算 + LogTrace("caculate fibonacci %d result: %llu", n, result); + } + ); + + LogTag(); + return true; + } +}; + +namespace tbox { +namespace main { +void RegisterApps(Module &apps, Context &ctx) { + apps.add(new MyModule(ctx)); +} +} +} diff --git a/examples/16-thread-pool-advance/Makefile b/examples/16-thread-pool-advance/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..526d44604420f9f10215e8eba8db574873bc2ddb --- /dev/null +++ b/examples/16-thread-pool-advance/Makefile @@ -0,0 +1,24 @@ +TARGET:=demo + +OBJECTS:=app_main.o + +CXXFLAGS:=-I$(HOME)/.tbox/include -DLOG_MODULE_ID='"demo"' -std=c++11 +LDFLAGS:=-L$(HOME)/.tbox/lib -rdynamic +LIBS:=\ + -ltbox_main \ + -ltbox_coroutine \ + -ltbox_trace \ + -ltbox_terminal \ + -ltbox_network \ + -ltbox_eventx \ + -ltbox_event \ + -ltbox_log \ + -ltbox_util \ + -ltbox_base \ + -lpthread -ldl + +$(TARGET): $(OBJECTS) + g++ -o $@ $^ $(LDFLAGS) $(LIBS) + +clean: + rm *.o diff --git a/examples/16-thread-pool-advance/app_main.cpp b/examples/16-thread-pool-advance/app_main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7085c1522368c7fe44360dd7d402163c247f5e09 --- /dev/null +++ b/examples/16-thread-pool-advance/app_main.cpp @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include + +//! CPU密集运算 +uint64_t Fibonacci(int n) { + if (n <= 1) return n; + return Fibonacci(n - 1) + Fibonacci(n - 2); +} + +class MyModule : public tbox::main::Module { + public: + explicit MyModule(tbox::main::Context &ctx) + : tbox::main::Module("my", ctx) + { } + + public: + virtual bool onStart() { + startCalculateTask(45); + startCalculateTask(40); + startCalculateTask(43); + startCalculateTask(42); + return true; + } + + //! 启动计算任务 + void startCalculateTask(int n) { + struct Tmp { + int n = 0; + uint64_t result = 0; + }; + auto tmp = std::make_shared(); + tmp->n = n; + unfinished_task_.insert(n); + + LogTrace("start caculate fibonacci %d", n); + ctx().thread_pool()->execute( + [tmp] { + //! 工作线程中执行,要避免让它访问到公共资源 + LogTrace("fibonacci %d underway", tmp->n); + tmp->result = Fibonacci(tmp->n); + }, + [this, tmp] { + //! Loop线程中执行,允许访问公共资源 + onCaculateTaskFinished(tmp->n, tmp->result); + } + ); + //! Q: 为什么不能在工作线程中直接调用onCaculateTaskFinished()? + //! A: 因为onCaculateTaskFinished()函数会访问到公共变量unfinished_task_,result_,这是非常危险的事情。 + //! Loop线程与工作线程都去访问就会导致变量竞态,如果没有加锁就会引起不可预知的异常。 + //! 为此,onCaculateTaskFinished()必须在工作线程做完事情后的后续处理函数中调用,后续处理函数是由 + //! Loop线程进执行,所以是安全的。 + //! 由于工作线程不能直接访问公共数据,那么它正常工作需要的数据与执行的结果都需要通过Tmp这个结构体 + //! 进行传递。由于tmp所指的对象被Loop线程与工作线程的访问在时间上是隔离的,所以是安全的。 + } + + //! 计算任务完成后的处理 + void onCaculateTaskFinished(int n, uint64_t result) { + LogTrace("caculate fibonacci %d result: %llu", n, result); + unfinished_task_.erase(n); //! 从未完成任务中移除n + results_[n] = result; //! 将结果存放到result_中 + + if (unfinished_task_.empty()) { + LogTrace("=== all task finished ==="); + for (auto item : results_) { + std::cout << item.first << " --> " << item.second << std::endl; + } + } + } + + private: + std::set unfinished_task_; + std::map results_; +}; + +namespace tbox { +namespace main { +void RegisterApps(Module &apps, Context &ctx) { + apps.add(new MyModule(ctx)); +} +} +} diff --git a/examples/17-work-thread/Makefile b/examples/17-work-thread/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..526d44604420f9f10215e8eba8db574873bc2ddb --- /dev/null +++ b/examples/17-work-thread/Makefile @@ -0,0 +1,24 @@ +TARGET:=demo + +OBJECTS:=app_main.o + +CXXFLAGS:=-I$(HOME)/.tbox/include -DLOG_MODULE_ID='"demo"' -std=c++11 +LDFLAGS:=-L$(HOME)/.tbox/lib -rdynamic +LIBS:=\ + -ltbox_main \ + -ltbox_coroutine \ + -ltbox_trace \ + -ltbox_terminal \ + -ltbox_network \ + -ltbox_eventx \ + -ltbox_event \ + -ltbox_log \ + -ltbox_util \ + -ltbox_base \ + -lpthread -ldl + +$(TARGET): $(OBJECTS) + g++ -o $@ $^ $(LDFLAGS) $(LIBS) + +clean: + rm *.o diff --git a/examples/17-work-thread/app_main.cpp b/examples/17-work-thread/app_main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fafa6834cb4b61f95d72748b4b84735995e8b1ce --- /dev/null +++ b/examples/17-work-thread/app_main.cpp @@ -0,0 +1,44 @@ +#include +#include +#include + +//! CPU密集运算 +uint64_t Fibonacci(int n) { + if (n <= 1) return n; + return Fibonacci(n - 1) + Fibonacci(n - 2); +} + +class MyModule : public tbox::main::Module { + public: + explicit MyModule(tbox::main::Context &ctx) + : tbox::main::Module("my", ctx) + { } + + public: + virtual bool onStart() { + int n = 40; + LogTrace("start caculate fibonacci %d", n); + //! 委托线程池执行任务 + worker_.execute( + [n] { + //! 在线程池中执行 + LogTrace("fibonacci %d calculating", n); + auto result = Fibonacci(n); //! 进行CPU密集的运算 + LogTrace("caculate fibonacci %d result: %llu", n, result); + } + ); + + LogTag(); + return true; + } + private: + tbox::eventx::WorkThread worker_; +}; + +namespace tbox { +namespace main { +void RegisterApps(Module &apps, Context &ctx) { + apps.add(new MyModule(ctx)); +} +} +} diff --git a/images/048-thread-pool-simple-code.png b/images/048-thread-pool-simple-code.png new file mode 100644 index 0000000000000000000000000000000000000000..36f44c5087ac13c5002d4834c74cf01c852624b8 Binary files /dev/null and b/images/048-thread-pool-simple-code.png differ diff --git a/images/049-thread-pool-simple-run.png b/images/049-thread-pool-simple-run.png new file mode 100644 index 0000000000000000000000000000000000000000..71f79c5778541f597a32fea394f927672d4011e5 Binary files /dev/null and b/images/049-thread-pool-simple-run.png differ diff --git a/images/050-thread-pool-advance-code.png b/images/050-thread-pool-advance-code.png new file mode 100644 index 0000000000000000000000000000000000000000..2fc4a80a842d184a253e2c7c69bc74024aac88a2 Binary files /dev/null and b/images/050-thread-pool-advance-code.png differ diff --git a/images/051-thread-pool-advance-run-1.png b/images/051-thread-pool-advance-run-1.png new file mode 100644 index 0000000000000000000000000000000000000000..d24febb318f997ebbbbab7111005d8df87523a6c Binary files /dev/null and b/images/051-thread-pool-advance-run-1.png differ diff --git a/images/052-thread-pool-advance-run-2.png b/images/052-thread-pool-advance-run-2.png new file mode 100644 index 0000000000000000000000000000000000000000..047ccef960d94d3e65fce0160bfb9a8bc1fae401 Binary files /dev/null and b/images/052-thread-pool-advance-run-2.png differ diff --git a/images/053-work-thread-code.png b/images/053-work-thread-code.png new file mode 100644 index 0000000000000000000000000000000000000000..9b7690bf34472468ab91f0400b3f5e61c6696676 Binary files /dev/null and b/images/053-work-thread-code.png differ diff --git a/images/054-work-thread-run.png b/images/054-work-thread-run.png new file mode 100644 index 0000000000000000000000000000000000000000..eb7c05233479649c74e9ddaeee0bc35e87916725 Binary files /dev/null and b/images/054-work-thread-run.png differ