次の方法で共有


方法: データフロー ブロックのリンクを解除する

このドキュメントでは、ソースからターゲット データフロー ブロックのリンクを解除する方法について説明します。

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は.NET と共に配布されません。 Visual Studio で System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューから [NuGet パッケージの管理] 選択し、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI 使用してインストールするには、dotnet add package System.Threading.Tasks.Dataflow実行します。

次の例では、3 つの TransformBlock<TInput,TOutput> オブジェクトを作成し、それぞれが TrySolution メソッドを呼び出して値を計算します。 この例は、完了させるために、TrySolution に対する最初の呼び出しからの結果のみが必要です。

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
   // Receives the value from the first provided source that has
   // a message.
   public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
   {
      // Create a WriteOnceBlock<T> object and link it to each source block.
      var writeOnceBlock = new WriteOnceBlock<T>(e => e);
      foreach (var source in sources)
      {
         // Setting MaxMessages to one instructs
         // the source block to unlink from the WriteOnceBlock<T> object
         // after offering the WriteOnceBlock<T> object one message.
         source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
      }
      // Return the first value that is offered to the WriteOnceBlock object.
      return writeOnceBlock.Receive();
   }

   // Demonstrates a function that takes several seconds to produce a result.
   static int TrySolution(int n, CancellationToken ct)
   {
      // Simulate a lengthy operation that completes within three seconds
      // or when the provided CancellationToken object is cancelled.
      SpinWait.SpinUntil(() => ct.IsCancellationRequested,
         new Random().Next(3000));

      // Return a value.
      return n + 42;
   }

   static void Main(string[] args)
   {
      // Create a shared CancellationTokenSource object to enable the
      // TrySolution method to be cancelled.
      var cts = new CancellationTokenSource();

      // Create three TransformBlock<int, int> objects.
      // Each TransformBlock<int, int> object calls the TrySolution method.
      Func<int, int> action = n => TrySolution(n, cts.Token);
      var trySolution1 = new TransformBlock<int, int>(action);
      var trySolution2 = new TransformBlock<int, int>(action);
      var trySolution3 = new TransformBlock<int, int>(action);

      // Post data to each TransformBlock<int, int> object.
      trySolution1.Post(11);
      trySolution2.Post(21);
      trySolution3.Post(31);

      // Call the ReceiveFromAny<T> method to receive the result from the
      // first TransformBlock<int, int> object to finish.
      int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);

      // Cancel all calls to TrySolution that are still active.
      cts.Cancel();

      // Print the result to the console.
      Console.WriteLine($"The solution is {result}.");

      cts.Dispose();
   }
}

/* Sample output:
The solution is 53.
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
    ' Receives the value from the first provided source that has 
    ' a message.
    Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
        ' Create a WriteOnceBlock<T> object and link it to each source block.
        Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
        For Each source In sources
            ' Setting MaxMessages to one instructs
            ' the source block to unlink from the WriteOnceBlock<T> object
            ' after offering the WriteOnceBlock<T> object one message.
            source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
        Next source
        ' Return the first value that is offered to the WriteOnceBlock object.
        Return writeOnceBlock.Receive()
    End Function

    ' Demonstrates a function that takes several seconds to produce a result.
    Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
        ' Simulate a lengthy operation that completes within three seconds
        ' or when the provided CancellationToken object is cancelled.
        SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))

        ' Return a value.
        Return n + 42
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a shared CancellationTokenSource object to enable the 
        ' TrySolution method to be cancelled.
        Dim cts = New CancellationTokenSource()

        ' Create three TransformBlock<int, int> objects. 
        ' Each TransformBlock<int, int> object calls the TrySolution method.
        Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
        Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)

        ' Post data to each TransformBlock<int, int> object.
        trySolution1.Post(11)
        trySolution2.Post(21)
        trySolution3.Post(31)

        ' Call the ReceiveFromAny<T> method to receive the result from the 
        ' first TransformBlock<int, int> object to finish.
        Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)

        ' Cancel all calls to TrySolution that are still active.
        cts.Cancel()

        ' Print the result to the console.
        Console.WriteLine("The solution is {0}.", result)

        cts.Dispose()
    End Sub
End Class

' Sample output:
'The solution is 53.
'

この例では、終了した最初の TransformBlock<TInput,TOutput> オブジェクトから値を受け取るために、ReceiveFromAny(T) メソッドを定義します。 ReceiveFromAny(T) メソッドは、ISourceBlock<TOutput> オブジェクトの配列を受け取り、これらの各オブジェクトを WriteOnceBlock<T> オブジェクトにリンクします。 LinkTo メソッドを使用してソース データフロー ブロックをターゲット ブロックにリンクすると、データが使用可能になると、ソースはメッセージをターゲットに伝達します。 WriteOnceBlock<T> クラスは提供された最初のメッセージのみを受け入れるため、ReceiveFromAny(T) メソッドは、Receive メソッドを呼び出して結果を生成します。 これにより、WriteOnceBlock<T> オブジェクトに提供される最初のメッセージが生成されます。 LinkTo メソッドには、MaxMessages プロパティを持つ DataflowLinkOptions オブジェクトを受け取るオーバーロードされたバージョンがあり、1に設定されている場合、ターゲットがソースから 1 つのメッセージを受信した後にターゲットからのリンクを解除するようにソース ブロックに指示します。 WriteOnceBlock<T> オブジェクトがメッセージを受信した後、ソースの配列と WriteOnceBlock<T> オブジェクトの関係が不要になったため、WriteOnceBlock<T> オブジェクトのソースからのリンクを解除することが重要です。

TrySolution の残りの呼び出しが値を計算した後で終了できるようにするには、TrySolution メソッドは、ReceiveFromAny(T) の呼び出しが戻った後に取り消された CancellationToken オブジェクトを受け取ります。 SpinUntil メソッドは、この CancellationToken オブジェクトが取り消されたときに返します。

こちらもご覧ください