次の方法で共有


データフロー (タスク並列ライブラリ)

タスク並列ライブラリ (TPL) には、コンカレンシーが有効なアプリケーションの堅牢性を高めるのに役立つデータフロー コンポーネントが用意されています。 これらのデータフロー コンポーネントは、まとめて TPL データフロー ライブラリと呼ばれます。 このデータフロー モデルは、粗いデータフローおよびパイプライン処理タスクに対してインプロセス メッセージ の受け渡しを提供することで、アクターベースのプログラミングを促進します。 データフロー コンポーネントは、TPL の型とスケジュール インフラストラクチャに基づいて構築され、非同期プログラミングのための C#、Visual Basic、F# 言語のサポートと統合されます。 これらのデータフロー コンポーネントは、非同期的に相互に通信する必要がある複数の操作がある場合や、データが使用可能になったときに処理する必要がある場合に便利です。 たとえば、Web カメラから画像データを処理するアプリケーションを考えてみましょう。 データフロー モデルを使用することで、アプリケーションはイメージ フレームが使用可能になったときに処理できます。 アプリケーションで画像フレームを拡張する場合 (たとえば、光補正や赤目除去を実行するなど)、データフロー コンポーネントの パイプライン を作成できます。 パイプラインの各ステージでは、TPL によって提供される機能など、より粗い並列処理機能を使用してイメージを変換する場合があります。

このドキュメントでは、TPL データフロー ライブラリの概要について説明します。 プログラミング モデル、定義済みのデータフロー ブロックの種類、およびアプリケーションの特定の要件を満たすようにデータフロー ブロックを構成する方法について説明します。

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は.NET と共に配布されません。 Visual Studio で System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューから [NuGet パッケージの管理] 選択し、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI 使用してインストールするには、dotnet add package System.Threading.Tasks.Dataflow実行します。

プログラミング モデル

TPL データフロー ライブラリは、高スループットで待機時間が短い、CPU 負荷の高い I/O 集中型アプリケーションをメッセージを渡して並列化するための基盤を提供します。 また、データのバッファリング方法とシステム内の移動方法を明示的に制御することもできます。 データフロー プログラミング モデルについて理解を深めるために、ディスクからイメージを非同期的に読み込み、それらのイメージの複合を作成するアプリケーションを検討してください。 従来のプログラミング モデルでは、通常、タスクと共有データへのアクセスを調整するために、コールバックと同期オブジェクト (ロックなど) を使用する必要があります。 データフロー プログラミング モデルを使用すると、イメージをディスクから読み取る際に処理するデータフロー オブジェクトを作成できます。 データフロー モデルでは、使用可能になったときのデータの処理方法と、データ間の依存関係を宣言します。 ランタイムはデータ間の依存関係を管理するため、多くの場合、共有データへのアクセスを同期する要件を回避できます。 さらに、ランタイム はデータの非同期到着に基づいて動作をスケジュールするため、基になるスレッドを効率的に管理することで、データフローの応答性とスループットを向上させることができます。 データフロー プログラミング モデルを使用して Windows フォーム アプリケーションに画像処理を実装する例については、「 チュートリアル: Windows フォーム アプリケーションでのデータフローの使用」を参照してください。

ソースとターゲット

TPL データフロー ライブラリは、データをバッファー処理して処理するデータ構造であるデータ フロー ブロックで構成されます。 TPL は、 ソース ブロックターゲット ブロック、伝達子ブロックの 3 種類のデータフロー ブロックを定義します。 ソース ブロックはデータのソースとして機能し、そこから読み取ることができます。 ターゲット ブロックはデータの受信者として機能し、書き込むことができます。 伝達子ブロックは、ソース ブロックとターゲット ブロックの両方として機能し、読み取りと書き込みを行うことができます。 TPL は、ソースを表す System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> インターフェイス、ターゲットを表す System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 、伝達子を表す System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> を定義します。 IPropagatorBlock<TInput,TOutput> は、 ISourceBlock<TOutput>ITargetBlock<TInput>の両方から継承されます。

TPL データフロー ライブラリには、 ISourceBlock<TOutput>ITargetBlock<TInput>、および IPropagatorBlock<TInput,TOutput> インターフェイスを実装する定義済みのデータフロー ブロックの種類がいくつか用意されています。 これらのデータフロー ブロックの種類については、このドキュメントの「 定義済みのデータフロー ブロックの種類」セクションで説明します。

ブロックの接続

データフロー ブロックを接続して、データフロー ブロックの線形シーケンスである パイプライン、またはデータフロー ブロックのグラフである ネットワークを形成できます。 パイプラインはネットワークの 1 つの形式です。 パイプラインまたはネットワークでは、ソースは、データが使用可能になると、非同期的にデータをターゲットに伝達します。 ISourceBlock<TOutput>.LinkTo メソッドは、ソース データフロー ブロックをターゲット ブロックにリンクします。 ソースは、0 個以上のターゲットにリンクできます。ターゲットは、0 個以上のソースからリンクできます。 パイプラインまたはネットワークに対して、またはパイプラインまたはネットワークから同時にデータフロー ブロックを追加または削除できます。 定義済みのデータフロー ブロックの種類は、リンクとリンク解除のすべてのスレッド セーフの側面を処理します。

データフロー ブロックを接続して基本的なパイプラインを形成する例については、「 チュートリアル: データフロー パイプラインの作成」を参照してください。 データフロー ブロックを接続してより複雑なネットワークを形成する例については、「 チュートリアル: Windows フォーム アプリケーションでのデータフローの使用」を参照してください。 ソースがターゲットにメッセージを提供した後にソースからターゲットのリンクを解除する例については、「 方法: データフロー ブロックのリンクを解除する」を参照してください。

フィルタリング

ISourceBlock<TOutput>.LinkTo メソッドを呼び出してソースをターゲットにリンクする場合、そのメッセージの値に基づいてターゲット ブロックがメッセージを受け入れるか拒否するかを決定するデリゲートを指定できます。 このフィルター処理メカニズムは、データフロー ブロックが特定の値のみを受け取っていることを保証する便利な方法です。 定義済みのデータフロー ブロックの種類のほとんどで、ソース ブロックが複数のターゲット ブロックに接続されている場合、ターゲット ブロックがメッセージを拒否すると、ソースはそのメッセージを次のターゲットに提供します。 ソースがターゲットにメッセージを提供する順序は、ソースによって定義され、ソースの種類によって異なる場合があります。 ほとんどのソース ブロックの種類は、1 つのターゲットがそのメッセージを受け入れた後、メッセージの提供を停止します。 この規則の 1 つの例外は、一部のターゲットがメッセージを拒否した場合でも、すべてのターゲットに各メッセージを提供する BroadcastBlock<T> クラスです。 フィルター処理を使用して特定のメッセージのみを処理する例については、「 チュートリアル: Windows フォーム アプリケーションでのデータフローの使用」を参照してください。

Von Bedeutung

定義済みの各ソース データフロー ブロックの種類では、メッセージが受信した順序で伝達されることを保証するため、ソース ブロックが次のメッセージを処理するには、すべてのメッセージをソース ブロックから読み取る必要があります。 そのため、フィルター処理を使用して複数のターゲットをソースに接続する場合は、少なくとも 1 つのターゲット ブロックが各メッセージを受信することを確認します。 そうしないと、アプリケーションがデッドロックする可能性があります。

メッセージ パッシング

データフロー プログラミング モデルは、 メッセージの受け渡しの概念に関連しています。この概念では、プログラムの独立したコンポーネントがメッセージを送信することによって相互に通信します。 アプリケーション コンポーネント間でメッセージを伝達する 1 つの方法は、 Post (同期) メソッドと SendAsync (非同期) メソッドを呼び出してターゲット データフロー ブロックにメッセージを送信し、ソース ブロックからメッセージを受信する ReceiveReceiveAsync、および TryReceive メソッドを呼び出すことです。 これらのメソッドをデータフロー パイプラインまたはネットワークと組み合わせるには、入力データをヘッド ノード (ターゲット ブロック) に送信し、パイプラインのターミナル ノードまたはネットワークのターミナル ノード (1 つ以上のソース ブロック) から出力データを受信します。 また、 Choose メソッドを使用して、提供されている最初のソースからデータを読み取り、そのデータに対してアクションを実行することもできます。

ソース ブロックは、 ITargetBlock<TInput>.OfferMessage メソッドを呼び出すことによって、ターゲット ブロックにデータを提供します。 ターゲット ブロックは、メッセージの受け入れ、メッセージの拒否、またはメッセージの延期のいずれかの方法で、提供されたメッセージに応答します。 ターゲットがメッセージを受け入れると、 OfferMessage メソッドは Acceptedを返します。 ターゲットがメッセージを拒否すると、 OfferMessage メソッドは Declinedを返します。 ターゲットがソースからメッセージを受信しなくなったことが必要な場合、 OfferMessageDecliningPermanentlyを返します。 定義済みのソース ブロックタイプは、そのような戻り値を受信した後にリンクされたターゲットにメッセージを提供せず、そのようなターゲットからのリンクを自動的に解除します。

ターゲット ブロックが後で使用するためにメッセージを延期すると、 OfferMessage メソッドは Postponedを返します。 メッセージを延期するターゲット ブロックは、後で ISourceBlock<TOutput>.ReserveMessage メソッドを呼び出して、提供されたメッセージの予約を試みることができます。 この時点で、メッセージは引き続き使用可能であり、ターゲット ブロックで使用できるか、またはメッセージが別のターゲットによって取得されています。 ターゲット ブロックは、後でメッセージを必要とする場合、またはメッセージが不要になった場合は、それぞれ ISourceBlock<TOutput>.ConsumeMessage または ReleaseReservation メソッドを呼び出します。 メッセージ予約は通常、非貪欲モードで動作するデータフローブロックの種類によって使用されます。 非貪欲モードについては、このドキュメントの後半で説明します。 ターゲット ブロックでは、延期されたメッセージを予約する代わりに、 ISourceBlock<TOutput>.ConsumeMessage メソッドを使用して、延期されたメッセージを直接使用することもできます。

データフロー ブロックの完了

データフロー ブロックでは 、完了の概念もサポートされています。 完了状態のデータフロー ブロックでは、それ以上の処理は実行されません。 各データフロー ブロックには、ブロック System.Threading.Tasks.Task 完了状態を表す 完了タスクと呼ばれるオブジェクトが関連付けられています。 Task オブジェクトが完了するまで待機できるため、完了タスクを使用して、データフロー ネットワークの 1 つ以上のターミナル ノードが完了するのを待つことができます。 IDataflowBlock インターフェイスは、Completeメソッドを定義します。このメソッドは、完了する要求をデータフロー ブロックに通知し、データフロー ブロックの完了タスクを返す Completion プロパティを定義します。 ISourceBlock<TOutput>ITargetBlock<TInput>の両方がIDataflowBlock インターフェイスを継承します。

データフロー ブロックがエラーなしで完了したか、1 つ以上のエラーが発生したか、取り消されたかを判断するには、2 つの方法があります。 最初の方法は、Task.Waittry- ブロック (Visual Basic でcatchTry-) の完了タスクに対してCatch メソッドを呼び出す方法です。 次の例では、入力値が 0 未満の場合にActionBlock<TInput>をスローするArgumentOutOfRangeException オブジェクトを作成します。 この例では、完了タスクで AggregateException を呼び出すと、Wait がスローされます。 ArgumentOutOfRangeExceptionには、InnerExceptions オブジェクトの AggregateException プロパティを使用してアクセスします。

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

この例では、実行データフロー ブロックのデリゲートで例外が処理されないケースを示します。 このようなブロックの本体で例外を処理することをお勧めします。 ただし、これを行うことができない場合、ブロックは取り消されたかのように動作し、受信メッセージを処理しません。

データフロー ブロックが明示的に取り消されると、AggregateException オブジェクトには OperationCanceledException プロパティにInnerExceptionsが含まれます。 データフローの取り消しの詳細については、「 キャンセルの有効化 」セクションを参照してください。

データフロー ブロックの完了状態を確認する 2 つ目の方法は、完了タスクの継続を使用するか、C# と Visual Basic の非同期言語機能を使用して完了タスクを非同期的に待機することです。 Task.ContinueWith メソッドに指定するデリゲートは、継続元タスクを表すTask オブジェクトを受け取ります。 Completion プロパティの場合、継続のデリゲートは完了タスク自体を受け取ります。 次の例は前の例に似ていますが、 ContinueWith メソッドを使用して、データフロー操作全体の状態を出力する継続タスクも作成します。

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

継続タスクの本文で IsCanceled などのプロパティを使用して、データフロー ブロックの完了状態に関する追加情報を確認することもできます。 継続タスクやそれらがキャンセルやエラー処理とどのように関係するかの詳細については、「継続タスクを使用したタスクの連結」、「タスクのキャンセル」、および「例外処理」を参照してください。

定義済みのデータフロー ブロックの種類

TPL データフロー ライブラリには、定義済みのデータフロー ブロックの種類がいくつか用意されています。 これらの型は、 バッファー ブロック実行ブロック、 グループ化ブロックの 3 つのカテゴリに分かれています。 次のセクションでは、これらのカテゴリを構成するブロックの種類について説明します。

バッファリング ブロック

バッファー ブロックは、データ コンシューマーが使用するデータを保持します。 TPL データフロー ライブラリには、 System.Threading.Tasks.Dataflow.BufferBlock<T>System.Threading.Tasks.Dataflow.BroadcastBlock<T>System.Threading.Tasks.Dataflow.WriteOnceBlock<T>の 3 種類のバッファリング ブロックが用意されています。

BufferBlock<T>

BufferBlock<T> クラスは、汎用非同期メッセージング構造を表します。 このクラスは、複数のソースから書き込んだり、複数のターゲットから読み取ったりできるメッセージの先入れ先出し (FIFO) キューを格納します。 ターゲットが BufferBlock<T> オブジェクトからメッセージを受信すると、そのメッセージはメッセージ キューから削除されます。 そのため、 BufferBlock<T> オブジェクトは複数のターゲットを持つことができますが、各メッセージを受信するターゲットは 1 つだけです。 BufferBlock<T> クラスは、複数のメッセージを別のコンポーネントに渡す必要があり、そのコンポーネントが各メッセージを受信する必要がある場合に便利です。

次の基本的な例では、Int32 オブジェクトにいくつかのBufferBlock<T>値をポストし、そのオブジェクトからそれらの値を読み取ります。

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

BufferBlock<T> オブジェクトにメッセージを書き込み、 オブジェクトからメッセージを読み取る方法を示す完全な例については、「方法: データフロー ブロックへのメッセージの書き込みとデータフロー ブロックからのメッセージの読み取り」を参照してください。

BroadcastBlock<T>

BroadcastBlock<T> クラスは、複数のメッセージを別のコンポーネントに渡す必要があるが、そのコンポーネントに必要なのは最新の値だけである場合に便利です。 このクラスは、メッセージを複数のコンポーネントにブロードキャストする場合にも便利です。

次の基本的な例では、 Double 値を BroadcastBlock<T> オブジェクトにポストしてから、そのオブジェクトからその値を複数回読み取ります。 値は読み取った後 BroadcastBlock<T> オブジェクトから削除されないため、毎回同じ値を使用できます。

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

BroadcastBlock<T>を使用して複数のターゲット ブロックにメッセージをブロードキャストする方法を示す完全な例については、「方法: データフロー ブロックでタスク スケジューラを指定する」を参照してください。

WriteOnceBlock<T>

WriteOnceBlock<T> クラスは、BroadcastBlock<T> クラスに似ていますが、WriteOnceBlock<T> オブジェクトを 1 回だけ書き込むことができる点が異なります。 WriteOnceBlock<T>は C# 読み取り専用 (Visual Basic では ReadOnly) キーワードに似ていると考えることができます。ただし、WriteOnceBlock<T> オブジェクトは、構築時ではなく値を受け取った後に不変になります。 BroadcastBlock<T> クラスと同様に、ターゲットがWriteOnceBlock<T> オブジェクトからメッセージを受信しても、そのメッセージはそのオブジェクトから削除されません。 そのため、複数のターゲットがメッセージのコピーを受信します。 WriteOnceBlock<T> クラスは、複数のメッセージの最初のメッセージのみを伝達する場合に便利です。

次の基本的な例では、String オブジェクトに複数のWriteOnceBlock<T>値をポストし、そのオブジェクトから値を読み取ります。 WriteOnceBlock<T> オブジェクトは 1 回だけ書き込むことができるため、WriteOnceBlock<T> オブジェクトはメッセージを受信した後、後続のメッセージを破棄します。

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

WriteOnceBlock<T>を使用して、完了した最初の操作の値を受け取る方法を示す完全な例については、「方法: データフロー ブロックのリンクを解除する」を参照してください。

実行ブロック

実行ブロックは、受信したデータの各部分に対してユーザー指定のデリゲートを呼び出します。 TPL データフロー ライブラリには、 ActionBlock<TInput>System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>の 3 種類の実行ブロックが用意されています。

ActionBlock<T>

ActionBlock<TInput> クラスは、データを受信するときにデリゲートを呼び出すターゲット ブロックです。 ActionBlock<TInput> オブジェクトは、データが使用可能になったときに非同期的に実行されるデリゲートと考えてください。 ActionBlock<TInput> オブジェクトに指定するデリゲートには、Action<T>型またはSystem.Func<TInput, Task>型を指定できます。 ActionBlock<TInput>Action<T> オブジェクトを使用すると、デリゲートが戻ったときに各入力要素の処理が完了したと見なされます。 ActionBlock<TInput>System.Func<TInput, Task> オブジェクトを使用すると、返されたTask オブジェクトが完了した場合にのみ、各入力要素の処理が完了したと見なされます。 これら 2 つのメカニズムを使用すると、各入力要素の同期処理と非同期処理の両方に ActionBlock<TInput> を使用できます。

次の基本的な例では、Int32 オブジェクトに複数のActionBlock<TInput>値をポストします。 ActionBlock<TInput> オブジェクトは、これらの値をコンソールに出力します。 次の使用例は、ブロックを完了状態に設定し、すべてのデータフロー タスクが完了するまで待機します。

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

ActionBlock<TInput> クラスでデリゲートを使用する方法を示す完全な例については、「方法: データフロー ブロックがデータを受信したときにアクションを実行する」を参照してください。

TransformBlock<TInput、TOutput>

TransformBlock<TInput,TOutput> クラスは、ソースとターゲットの両方として機能することを除き、ActionBlock<TInput> クラスに似ています。 TransformBlock<TInput,TOutput> オブジェクトに渡すデリゲートは、TOutput型の値を返します。 TransformBlock<TInput,TOutput> オブジェクトに指定するデリゲートには、System.Func<TInput, TOutput>型またはSystem.Func<TInput, Task<TOutput>>型を指定できます。 TransformBlock<TInput,TOutput>System.Func<TInput, TOutput> オブジェクトを使用すると、デリゲートが戻ったときに各入力要素の処理が完了したと見なされます。 TransformBlock<TInput,TOutput>で使用するSystem.Func<TInput, Task<TOutput>> オブジェクトを使用する場合、各入力要素の処理は、返されたTask<TResult> オブジェクトが完了したときにのみ完了したと見なされます。 ActionBlock<TInput>と同様に、これら 2 つのメカニズムを使用して、各入力要素の同期処理と非同期処理の両方にTransformBlock<TInput,TOutput>を使用できます。

次の基本的な例では、入力の平方根を計算する TransformBlock<TInput,TOutput> オブジェクトを作成します。 TransformBlock<TInput,TOutput> オブジェクトはInt32値を入力として受け取り、Double値を出力として生成します。

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Windows フォーム アプリケーションで画像処理を実行するデータフロー ブロックのネットワークで TransformBlock<TInput,TOutput> を使用する完全な例については、「 チュートリアル: Windows フォーム アプリケーションでのデータフローの使用」を参照してください。

TransformManyBlock<TInput、TOutput>

TransformManyBlock<TInput,TOutput> クラスはTransformBlock<TInput,TOutput> クラスに似ていますが、TransformManyBlock<TInput,TOutput>では、各入力値に対して 1 つの出力値ではなく、各入力値に対して 0 個以上の出力値が生成される点が異なります。 TransformManyBlock<TInput,TOutput> オブジェクトに指定するデリゲートには、System.Func<TInput, IEnumerable<TOutput>>型またはSystem.Func<TInput, Task<IEnumerable<TOutput>>>型を指定できます。 TransformManyBlock<TInput,TOutput>System.Func<TInput, IEnumerable<TOutput>> オブジェクトを使用すると、デリゲートが戻ったときに各入力要素の処理が完了したと見なされます。 TransformManyBlock<TInput,TOutput>System.Func<TInput, Task<IEnumerable<TOutput>>> オブジェクトを使用する場合、各入力要素の処理は、返されたSystem.Threading.Tasks.Task<IEnumerable<TOutput>> オブジェクトが完了したときにのみ完了したと見なされます。

次の基本的な例では、文字列を個々の文字シーケンスに分割する TransformManyBlock<TInput,TOutput> オブジェクトを作成します。 TransformManyBlock<TInput,TOutput> オブジェクトはString値を入力として受け取り、Char値を出力として生成します。

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

TransformManyBlock<TInput,TOutput>を使用して、データフロー パイプライン内の入力ごとに複数の独立した出力を生成する完全な例については、「チュートリアル: データフロー パイプラインの作成」を参照してください。

並列処理の次数

すべての ActionBlock<TInput>TransformBlock<TInput,TOutput>、および TransformManyBlock<TInput,TOutput> オブジェクトは、ブロックで処理できる状態になるまで入力メッセージをバッファーします。 既定では、これらのクラスはメッセージを受信した順序で一度に 1 つのメッセージで処理します。 また、並列処理の次数を指定して、 ActionBlock<TInput>TransformBlock<TInput,TOutput> 、および TransformManyBlock<TInput,TOutput> オブジェクトが複数のメッセージを同時に処理できるようにすることもできます。 同時実行の詳細については、このドキュメントで後述する「並列処理の次数の指定」セクションを参照してください。 実行データフロー ブロックが一度に複数のメッセージを処理できるように並列処理の次数を設定する例については、「 方法: データフロー ブロックで並列処理の次数を指定する」を参照してください。

デリゲート型の概要

次の表は、オブジェクトの ActionBlock<TInput>TransformBlock<TInput,TOutput>、および TransformManyBlock<TInput,TOutput> に指定できるデリゲート型をまとめたものです。 このテーブルでは、デリゲート型が同期的に動作するか非同期的に動作するかを指定します。

タイプ 同期デリゲートタイプ 非同期的なデリゲート型
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

また、実行ブロック型を操作するときにラムダ式を使用することもできます。 実行ブロックでラムダ式を使用する方法を示す例については、「 方法: データフロー ブロックがデータを受信したときにアクションを実行する」を参照してください。

ブロックのグループ化

グループ化ブロックは、1 つ以上のソースからのデータを、さまざまな制約の下で結合します。 TPL データフロー ライブラリには、 BatchBlock<T>JoinBlock<T1,T2>BatchedJoinBlock<T1,T2>の 3 種類の結合ブロックが用意されています。

BatchBlock<T>

BatchBlock<T> クラスは、バッチと呼ばれる一連の入力データを出力データの配列に結合します。 BatchBlock<T> オブジェクトを作成するときに、各バッチのサイズを指定します。 BatchBlock<T> オブジェクトは、指定した数の入力要素を受け取ると、それらの要素を含む配列を非同期的に伝達します。 BatchBlock<T> オブジェクトが完了状態に設定されていても、バッチを形成するのに十分な要素が含まれていない場合は、残りの入力要素を含む最終的な配列が伝達されます。

BatchBlock<T> クラスは貪欲モードまたは非貪欲モードで動作します。 既定のグリーディーモードでは、BatchBlock<T> オブジェクトは提供されるすべてのメッセージを受け入れ、指定した要素数を受け取った後に配列を出力します。 非貪欲モードでは、BatchBlock<T> オブジェクトは、十分な数のソースからメッセージが提供されてバッチが形成されるまで、すべての受信メッセージの処理を延期します。 通常、貪欲モードは、処理オーバーヘッドが少ないため、非貪欲モードよりも優れたパフォーマンスを発揮します。 ただし、アトミックな方法で複数のソースからの消費をコーディネートする必要がある場合は、非貪欲モードを使用できます。 Greedy コンストラクターの False パラメーターでdataflowBlockOptionsBatchBlock<T>に設定して、非貪欲モードを指定します。

次の基本的な例では、バッチ内の 10 個の要素を保持するInt32 オブジェクトにいくつかのBatchBlock<T>値をポストします。 この例では、すべての値が BatchBlock<T>から確実に伝達されるように、 Complete メソッドを呼び出します。 Complete メソッドは、BatchBlock<T> オブジェクトを完了状態に設定するため、BatchBlock<T> オブジェクトは残りの要素を最終的なバッチとして伝達します。

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

BatchBlock<T>を使用してデータベース挿入操作の効率を向上させる完全な例については、「チュートリアル: BatchBlock と BatchedJoinBlock を使用して効率を向上させる」を参照してください。

JoinBlock<T1、T2、...>

JoinBlock<T1,T2>クラスとJoinBlock<T1,T2,T3> クラスは、入力要素を収集し、それらの要素を含むSystem.Tuple<T1,T2>またはSystem.Tuple<T1,T2,T3>オブジェクトを伝達します。 JoinBlock<T1,T2>クラスとJoinBlock<T1,T2,T3> クラスは、ITargetBlock<TInput>から継承されません。 代わりに、Target1を実装するプロパティ、Target2Target3、およびITargetBlock<TInput>が提供されます。

BatchBlock<T>と同様に、JoinBlock<T1,T2>JoinBlock<T1,T2,T3>は貪欲モードまたは非貪欲モードで動作します。 既定のグリーディーモードでは、JoinBlock<T1,T2> または JoinBlock<T1,T2,T3> オブジェクトは提供されるすべてのメッセージを受け入れ、各ターゲットが少なくとも 1 つのメッセージを受信した後にタプルを送信します。 非貪欲モードでは、JoinBlock<T1,T2> または JoinBlock<T1,T2,T3> オブジェクトは、タプルの作成に必要なデータがすべてのターゲットに提供されるまで、すべての受信メッセージを延期します。 この時点で、ブロックは 2 フェーズ コミット プロトコルを使用して、ソースから必要なすべての項目をアトミックに取得します。 この延期により、別のエンティティがそれまでの間にデータを使用して、システム全体が前進できるようになります。

次の基本的な例は、 JoinBlock<T1,T2,T3> オブジェクトが値を計算するために複数のデータを必要とするケースを示しています。 次の使用例は、算術演算を実行するために 2 つのJoinBlock<T1,T2,T3>値とInt32値を必要とするChar オブジェクトを作成します。

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

非貪欲モードで JoinBlock<T1,T2> オブジェクトを使用してリソースを協調的に共有する完全な例については、「方法: JoinBlock を使用して複数のソースからデータを読み取る」を参照してください。

BatchedJoinBlock<T1、T2、...>

BatchedJoinBlock<T1,T2>クラスとBatchedJoinBlock<T1,T2,T3> クラスは、入力要素のバッチを収集し、それらの要素を含むSystem.Tuple(IList(T1), IList(T2))またはSystem.Tuple(IList(T1), IList(T2), IList(T3))オブジェクトを伝達します。 BatchedJoinBlock<T1,T2>は、BatchBlock<T>JoinBlock<T1,T2>の組み合わせと考えてください。 BatchedJoinBlock<T1,T2> オブジェクトを作成するときに、各バッチのサイズを指定します。 BatchedJoinBlock<T1,T2>には、Target1を実装するプロパティ (Target2ITargetBlock<TInput>) も用意されています。 指定した数の入力要素がすべてのターゲットから受信されると、 BatchedJoinBlock<T1,T2> オブジェクトは、それらの要素を含む System.Tuple(IList(T1), IList(T2)) オブジェクトを非同期的に伝達します。

次の基本的な例では、結果、BatchedJoinBlock<T1,T2>値、およびInt32オブジェクトであるエラーを保持するException オブジェクトを作成します。 この例では、複数の操作を実行し、結果を Target1 プロパティに書き込み、エラーを Target2 オブジェクトの BatchedJoinBlock<T1,T2> プロパティに書き込みます。 成功した操作と失敗した操作の数は事前に不明であるため、 IList<T> オブジェクトを使用すると、各ターゲットは 0 個以上の値を受け取ります。

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

BatchedJoinBlock<T1,T2>を使用して、プログラムがデータベースから読み取る間に発生する結果と例外の両方をキャプチャする完全な例については、「チュートリアル: BatchBlock と BatchedJoinBlock を使用して効率を向上させる」を参照してください。

データフロー ブロックの動作の構成

データフロー ブロック型のコンストラクターに System.Threading.Tasks.Dataflow.DataflowBlockOptions オブジェクトを指定することで、追加のオプションを有効にすることができます。 これらのオプションは、基になるタスクを管理するスケジューラや並列処理の次数などの動作を制御します。 DataflowBlockOptionsには、特定のデータフロー ブロック型に固有の動作を指定する派生型もあります。 次の表は、各データフロー ブロックの種類に関連付けられているオプションの種類をまとめたものです。

データフロー ブロックの種類 DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

次のセクションでは、 System.Threading.Tasks.Dataflow.DataflowBlockOptionsSystem.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions、および System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions クラスで使用できる重要な種類のデータフロー ブロック オプションに関する追加情報を提供します。

タスク スケジューラの指定

すべての定義済みデータフロー ブロックでは、TPL タスク スケジューリング メカニズムを使用して、データをターゲットに伝達する、ソースからデータを受信する、データが使用可能になったときにユーザー定義デリゲートを実行するなどのアクティビティを実行します。 TaskScheduler は、タスクをスレッドにキューに入れるタスク スケジューラを表す抽象クラスです。 既定のタスク スケジューラ ( Default) では、 ThreadPool クラスを使用してキューに登録し、作業を実行します。 データフロー ブロック オブジェクトを構築するときに TaskScheduler プロパティを設定することで、既定のタスク スケジューラをオーバーライドできます。

同じタスク スケジューラが複数のデータフロー ブロックを管理する場合、それらのブロック全体にポリシーを適用できます。 たとえば、同じ ConcurrentExclusiveSchedulerPair オブジェクトの排他スケジューラを対象とするように複数のデータフロー ブロックが構成されている場合、これらのブロック間で実行されるすべての作業がシリアル化されます。 同様に、これらのブロックが同じ ConcurrentExclusiveSchedulerPair オブジェクトの同時実行スケジューラをターゲットにするように構成されていて、そのスケジューラが最大コンカレンシー レベルを持つよう構成されている場合、これらのブロックのすべての作業は、その同時実行操作の数に制限されます。 ConcurrentExclusiveSchedulerPair クラスを使用して読み取り操作を並列で実行し、書き込み操作を他のすべての操作のみで実行する例については、「方法: データフロー ブロックでタスク スケジューラを指定する」を参照してください。 TPL のタスク スケジューラの詳細については、 TaskScheduler クラスのトピックを参照してください。

並列処理の次数の指定

既定では、TPL データフロー ライブラリが提供する 3 つの実行ブロックの種類( ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>)は、一度に 1 つのメッセージを処理します。 これらのデータフロー ブロックの種類では、受信した順序でメッセージも処理されます。 これらのデータフロー ブロックでメッセージを同時に処理できるようにするには、データフロー ブロック オブジェクトを構築するときに ExecutionDataflowBlockOptions.MaxDegreeOfParallelism プロパティを設定します。

MaxDegreeOfParallelismの既定値は 1 で、データフロー ブロックが一度に 1 つのメッセージを処理することを保証します。 このプロパティを 1 より大きい値に設定すると、データフロー ブロックで複数のメッセージを同時に処理できます。 このプロパティを DataflowBlockOptions.Unbounded に設定すると、基になるタスク スケジューラはコンカレンシーの最大レベルを管理できます。

Von Bedeutung

並列処理の最大次数を 1 より大きく指定すると、複数のメッセージが同時に処理されるため、メッセージが受信した順序で処理されない可能性があります。 ただし、メッセージがブロックから出力される順序は、受信される順序と同じです。

MaxDegreeOfParallelism プロパティは並列処理の最大限度を表しているため、データフロー ブロックは指定したよりも並列処理の程度が低い場合があります。 データフロー ブロックの機能要件を満たすために、または使用可能なシステム リソースがないために、並列処理の程度が低くなる可能性があります。 データフロー ブロックでは、指定したよりも並列処理が選択されることはありません。

MaxDegreeOfParallelism プロパティの値は、各データフロー ブロック オブジェクトに対して排他的です。 たとえば、4 つのデータフロー ブロック オブジェクトがそれぞれ並列処理の最大次数として 1 を指定している場合、4 つのデータフロー ブロック オブジェクトすべてが並列で実行される可能性があります。

長い操作を並列で実行できるように並列処理の最大次数を設定する例については、「 方法: データフロー ブロックで並列処理の次数を指定する」を参照してください。

タスクあたりのメッセージ数の指定

定義済みのデータフロー ブロックの種類では、タスクを使用して複数の入力要素を処理します。 これにより、データを処理するために必要なタスク オブジェクトの数を最小限に抑えることができます。これにより、アプリケーションをより効率的に実行できます。 ただし、データフロー ブロックの 1 つのセットのタスクがデータを処理している場合、他のデータフロー ブロックのタスクは、メッセージをキューに格納して処理時間を待つ必要がある場合があります。 データフロー タスク間の公平性を高めるために、 MaxMessagesPerTask プロパティを設定します。 MaxMessagesPerTaskDataflowBlockOptions.Unbounded (既定値) に設定されている場合、データフロー ブロックで使用されるタスクは、使用可能な数のメッセージを処理します。 MaxMessagesPerTaskUnbounded以外の値に設定されている場合、データフロー ブロックは、Task オブジェクトごとに最大でこの数のメッセージを処理します。 MaxMessagesPerTask プロパティを設定すると、タスク間の公平性が向上する可能性がありますが、必要以上に多くのタスクが作成され、パフォーマンスが低下する可能性があります。

キャンセルの有効化

TPL は、タスクが協調的にキャンセルを調整できるようにするメカニズムを提供します。 データフロー ブロックがこのキャンセル メカニズムに参加できるようにするには、 CancellationToken プロパティを設定します。 この CancellationToken オブジェクトが取り消された状態に設定されている場合、このトークンを監視するすべてのデータフロー ブロックは現在のアイテムの実行を終了しますが、後続の項目の処理は開始しません。 また、これらのデータフロー ブロックは、バッファー内のメッセージをクリアし、任意のソース ブロックとターゲット ブロックへの接続を解放し、取り消された状態に遷移します。 取り消された状態に遷移することで、 Completion プロパティは、処理中に例外が発生しない限り、 Status プロパティを Canceled に設定します。 その場合、 StatusFaulted に設定されます。

Windows フォーム アプリケーションで取り消しを使用する方法を示す例については、「 方法: データフロー ブロックを取り消す」を参照してください。 TPL での取り消しの詳細については、「 タスクの取り消し」を参照してください。

最長一致と最短一致の動作の指定

複数のグループ化データフローブロックの種類は貪欲モードまたは非貪欲モードで動作できます。 既定では、定義済みのデータフロー ブロックは貪欲モードで動作します。

JoinBlock<T1,T2>などの結合ブロックの種類の場合、貪欲モードとは、結合するために必要な対応データがまだ利用できなくても、ブロックが即座にデータを受け入れることを意味します。 非貪欲モードでは、ブロックが結合を完了するために各ターゲットにメッセージが届くまで、すべての受信メッセージの処理を延期します。 延期されたメッセージのいずれかが使用できなくなった場合、結合ブロックは延期されたすべてのメッセージを解放し、プロセスを再開します。 BatchBlock<T> クラスの場合、貪欲モードと非貪欲モードの動作は似ていますが、非貪欲モードでは、BatchBlock<T> オブジェクトは、異なるソースからのメッセージがバッチを完了するのに十分な数集まるまで、すべての受信メッセージの処理を遅らせます。

データフローブロックに対して非貪欲モードを指定するには、GreedyFalse に設定します。 非貪欲モードを使用して複数の結合ブロックがデータ ソースをより効率的に共有できる方法の例については、「方法: JoinBlock を使用して複数のソースからデータを読み取る」を参照してください。

カスタム データフロー ブロック

TPL データフロー ライブラリには多くの定義済みブロックタイプが用意されていますが、カスタム動作を実行する追加のブロックタイプを作成できます。 ISourceBlock<TOutput>またはITargetBlock<TInput> インターフェイスを直接実装するか、Encapsulate メソッドを使用して、既存のブロック型の動作をカプセル化する複雑なブロックを構築します。 カスタム データフロー ブロック機能を実装する方法を示す例については、「 チュートリアル: カスタム データフロー ブロック型の作成」を参照してください。

タイトル 説明
方法: データフロー ブロックにメッセージを書き込み、データフロー ブロックからメッセージを読み取る BufferBlock<T> オブジェクトにメッセージを書き込み、メッセージを読み取る方法を示します。
方法: Producer-Consumer データフロー パターンを実装する データフロー モデルを使用してプロデューサー/コンシューマー パターンを実装する方法について説明します。プロデューサーはデータフロー ブロックにメッセージを送信し、コンシューマーはそのブロックからメッセージを読み取ります。
方法: データフロー ブロックがデータを受信したときにアクションを実行する 実行データフロー ブロックの種類、 ActionBlock<TInput>TransformBlock<TInput,TOutput>、および TransformManyBlock<TInput,TOutput>にデリゲートを提供する方法について説明します。
チュートリアル: データフロー パイプラインの作成 Web からテキストをダウンロードし、そのテキストに対して操作を実行するデータフロー パイプラインを作成する方法について説明します。
方法: データフロー ブロックのリンクを解除する LinkTo メソッドを使用して、ソースがターゲットにメッセージを提供した後でターゲット ブロックのリンクを解除する方法を示します。
チュートリアル: Windows フォーム アプリケーションでのデータフローの使用 Windows フォーム アプリケーションで画像処理を実行するデータフロー ブロックのネットワークを作成する方法を示します。
方法: データフロー ブロックをキャンセルする Windows フォーム アプリケーションでキャンセルを使用する方法を示します。
方法: JoinBlock を使用して複数のソースからデータを読み取る JoinBlock<T1,T2> クラスを使用して、複数のソースからデータを利用可能な場合に操作を実行する方法と、複数の結合ブロックがデータソースをより効率的に共有できるようにするために、非貪欲モードを使用する方法について説明します。
方法: データフロー ブロックで並列処理の次数を指定する MaxDegreeOfParallelism プロパティを設定して、実行データフロー ブロックで一度に複数のメッセージを処理できるようにする方法について説明します。
方法: データフロー ブロックでタスク スケジューラを指定する アプリケーションでデータフローを使用するときに、特定のタスク スケジューラを関連付ける方法を示します。
チュートリアル: BatchBlock と BatchedJoinBlock を使用した効率の向上 BatchBlock<T> クラスを使用してデータベース挿入操作の効率を向上させる方法と、BatchedJoinBlock<T1,T2> クラスを使用して、プログラムがデータベースから読み取る際に発生する結果と例外の両方をキャプチャする方法について説明します。
チュートリアル: カスタム データフロー ブロック型の作成 カスタム動作を実装するデータフロー ブロック型を作成する 2 つの方法を示します。
タスク並列ライブラリ (TPL) .NET Framework アプリケーションでの並列および同時実行プログラミングを簡略化するライブラリである TPL について説明します。