如何:转换使用取消的 OpenMP 循环以使用并发运行时

某些并行循环不需要执行所有迭代。 例如,搜索值的算法可以在找到值后终止。 OpenMP 不提供中断并行循环的机制。 但是,可以使用布尔值或标志来启用循环迭代,以指示已找到解决方案。 并发运行时提供允许一个任务取消其他尚未启动的任务的功能。

此示例演示如何将一个不需要运行所有迭代的 OpenMP parallelfor 循环转换为使用并发运行时取消机制。

示例

此示例同时使用 OpenMP 和并发运行时来实现 std::any_of 算法的并行版本。 此示例的 OpenMP 版本使用标志来协调所有满足条件的并行循环迭代。 使用并发运行时的版本使用 concurrency::structured_task_group::cancel 方法在满足条件时停止整个操作。

// concrt-omp-parallel-any-of.cpp
// compile with: /EHsc /openmp
#include <ppl.h>
#include <array>
#include <random>
#include <iostream>

using namespace concurrency;
using namespace std;

// Uses OpenMP to determine whether a condition exists in 
// the specified range of elements.
template <class InIt, class Predicate>
bool omp_parallel_any_of(InIt first, InIt last, const Predicate& pr)
{
   typedef typename std::iterator_traits<InIt>::value_type item_type;

   // A flag that indicates that the condition exists.
   bool found = false;

   #pragma omp parallel for
      for (int i = 0; i < static_cast<int>(last-first); ++i)
      {
         if (!found)
         {
            item_type& cur = *(first + i);

            // If the element satisfies the condition, set the flag to 
            // cancel the operation.
            if (pr(cur)) {
               found = true;
            }
         }
      }

   return found;
}

// Uses the Concurrency Runtime to determine whether a condition exists in 
// the specified range of elements.
template <class InIt, class Predicate>
bool concrt_parallel_any_of(InIt first, InIt last, const Predicate& pr)
{
   typedef typename std::iterator_traits<InIt>::value_type item_type;

   structured_task_group tasks;

   // Create a predicate function that cancels the task group when
   // an element satisfies the condition.
   auto for_each_predicate = [&pr, &tasks](const item_type& cur) {
      if (pr(cur)) {
         tasks.cancel();
      }
   };

   // Create a task that calls the predicate function in parallel on each
   // element in the range.
   auto task = make_task([&]() {
       parallel_for_each(first, last, for_each_predicate);
   });

   // The condition is satisfied if the task group is in the cancelled state.
   return tasks.run_and_wait(task) == canceled;
}

int wmain()
{
   // The length of the array.
   const size_t size = 100000;
   
   // Create an array and initialize it with random values.
   array<int, size> a;   
   generate(begin(a), end(a), mt19937(42));

   // Search for a value in the array by using OpenMP and the Concurrency Runtime.

   const int what = 9114046;
   auto predicate = [what](int n) -> bool { 
      return (n == what);
   };

   wcout << L"Using OpenMP..." << endl;
   if (omp_parallel_any_of(begin(a), end(a), predicate))
   {
      wcout << what << L" is in the array." << endl;
   }
   else
   {
      wcout << what << L" is not in the array." << endl;
   }

   wcout << L"Using the Concurrency Runtime..." << endl;
   if (concrt_parallel_any_of(begin(a), end(a), predicate))
   {
      wcout << what << L" is in the array." << endl;
   }
   else
   {
      wcout << what << L" is not in the array." << endl;
   }
}

本示例生成以下输出。

Using OpenMP...
9114046 is in the array.
Using the Concurrency Runtime...
9114046 is in the array.

在使用 OpenMP 的版本中,将执行循环的所有迭代,即使设置了标志。 此外,如果任务具有任何子任务,则标志还必须可供这些子任务用来传达取消信息。 在并发运行时中,当任务组被取消时,运行时会取消整个工作树,包括子任务。 concurrency::parallel_for_each 算法使用任务来执行工作。 因此,当循环的一次迭代取消根任务时,也会取消整个计算树。 取消工作树后,运行时不会启动新任务。 但是,运行时允许已经开始的任务完成。 因此,对于 parallel_for_each 算法,活动循环迭代可以清理其资源。

在此示例的两个版本中,如果数组包含要搜索的值的多个副本,则多个循环迭代可以同时设置结果并取消整个操作。 如果问题要求在满足条件时只有一个任务执行工作,则可以使用同步基元(如关键部分)。

还可以使用异常处理来取消使用 PPL 的任务。 有关取消的详细信息,请参阅 PPL 中的取消操作

有关 parallel_for_each 和其他并行算法的详细信息,请参阅并行算法

编译代码

复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 concrt-omp-parallel-any-of.cpp 的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc /openmp concrt-omp-parallel-any-of.cpp

另请参阅

从 OpenMP 迁移至并发运行时
PPL 中的取消操作
并行算法