程序从命令行读入数字n,将随机的n个正整数先升序排列,再把结果再降序排列。
程序可加入第二个参数"p",则可以进行并行排序。例如:
$ ./sort_task 100000000 p
上面的命令将先升序排列1亿个整数,再降序排列。两次排序都采用并行。
计算任务(或称线程任务),是我们非常重要的一个功能。在使用我们任务流的时候,并不建议在callback里直接进行非常复杂的计算。
所有需要消耗大量CPU时间的计算,都可以封装成计算任务交给系统去调度。计算任务和通信任务在使用方法上并没有什么区别。
系统的算法工厂提供了一些常用的计算任务,比如排序,归并等。用户也可以很方便定义自己的计算任务。
int main(int argc, char *argv[])
{
...
WFSortTask<int> *task;
if (use_parallel_sort)
task = WFAlgoTaskFactory::create_psort_task("sort", array, end, callback);
else
task = WFAlgoTaskFactory::create_sort_task("sort", array, end, callback);
...
task->start();
...
}
和WFHttpTask或WFRedisTask不同,排序任务多了一个模板参数代表要排序的数组数据类型。
create_sort_task和create_psort_task分别产生一个普通排序任务和一个并行排序任务。
这两个调用的参数和返回值并没有区别。
唯一需要特别说明的是第一个参数"sort",这个是计算队列名,用于影响内部的任务调度。本篇文档后面会介绍队列名的用法。
计算任务的启动方法与使用方法和网络通信任务并没有什么区别。
和通信任务一样,我们在callback里处理结果。这个示例里,升序排序之后会再发起一次降序排序。
using namespace algorithm;
void callback(void SortTask<int> *task)
{
SortInput<T> *input = task->get_input();
int *first = input->first;
int *last = input->last;
// print result
...
if (task->user_data == NULL)
{
auto cmp = [](int a1, int a2){ return a2 < a1; };
WFSortTask<int> *reverse;
if (use_parallel_sort)
reverse = WFAlgoTaskFactory::create_psort_task("sort", first, last, cmp, callback);
else
reverse = WFAlgoTaskFactory::create_sort_task("sort", first, last, cmp, callback);
reverse->user_data = (void *)1; /* as a flag */
series_of(task)->push_back(reverse);
}
else
{
// all done. Signal main thread to exit.
...
}
}
计算任务的get_input()接口得到输入数据,get_output()得到输出数据。对于排序任务,输入和输出是相同类型,内容也完全相同。
在WFAlgoTaskFactory.h里,可以看到排序任务输入输出的定义:
namespace algorithm
{
template <typename T>
struct SortInput
{
T *first;
T *last;
};
template <typename T>
using SortOutput = SortInput<T>;
}
template <typename T>
using WFSortTask = WFThreadTask<algorithm::SortInput<T>,
algorithm::SortOutput<T>>;
template <typename T>
using sort_callback_t = std::function<void (WFSortTask<T> *)>;
显然,input或output里的first, last分别为排序数组的首尾指针。
接下来我们会创建一个降序排序的任务,这时候,我们就需要传进去一个比较函数了。
auto cmp = [](int a1, int a2)->bool{ return a2 < a1; };
reverse = WFAlgoTaskFactory::create_sort_task("sort", first, last, cmp, callback);
可以说我们的用法和std::sort()区别不是很大。但我们的first和last是指针,而不是用iterator。
同样,用create_psort_task()可以创建一个并行排序任务。而对series的使用,和通信任务没有区别。
如果你不做任何配置,计算调度器将使用当前机器CPU个数的线程数。你也可以通过以下的方式,修改这个值:
#include "workflow/WFGlobal.h"
int main()
{
struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
settings.compute_threads = 16;
WORKFLOW_library_init(&settings);
...
}
通过上面的配置,我们将创建16个线程用于计算。
内置的并行排序算法,使用分块+二路归并。空间复杂度为O(1)。
算法使用全局配置的计算线程进行计算,但最多使用128个线程。因为不使用额外空间,加速比会小于线程数量,平均CPU占用也比较小。
具体实现可参考WFAlgoTaskFactory.inl
我们的计算任务并没有优化级的概念,唯一可以影响调度顺序的是计算任务的队列名,本示例中队列名为字符串"sort"。
队列名的指定非常简单,需要说明以下几点:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。