実行データフロー ブロック の種類は、データを受信するときにユーザー指定のデリゲートを呼び出します。
System.Threading.Tasks.Dataflow.ActionBlock<TInput>、System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>、およびSystem.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>クラスは、実行データフロー ブロックの種類です。 実行データフロー ブロックに作業関数を指定する場合は、 delegate
キーワード (Visual Basic でSub
)、 Action<T>、 Func<T,TResult>、またはラムダ式を使用できます。 このドキュメントでは、 Func<T,TResult> 式とラムダ式を使用して、実行ブロックでアクションを実行する方法について説明します。
注
TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は.NET と共に配布されません。 Visual Studio で System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューから [NuGet パッケージの管理] 選択し、System.Threading.Tasks.Dataflow
パッケージをオンラインで検索します。 または、.NET Core CLI 使用してインストールするには、dotnet add package System.Threading.Tasks.Dataflow
実行します。
例
次の例では、データフローを使用してディスクからファイルを読み取り、そのファイル内の 0 のバイト数を計算します。 TransformBlock<TInput,TOutput>を使用してファイルを読み取り、0 バイト数を計算し、ActionBlock<TInput> 0 バイトの数をコンソールに出力します。 TransformBlock<TInput,TOutput> オブジェクトは、ブロックがデータを受信したときに処理を実行するFunc<T,TResult> オブジェクトを指定します。 ActionBlock<TInput> オブジェクトはラムダ式を使用して、読み取られた 0 バイトの数をコンソールに出力します。
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
// Computes the number of zero bytes that the provided file
// contains.
static int CountBytes(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = File.OpenRead(path))
{
int bytesRead = 0;
do
{
bytesRead = fileStream.Read(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
static void Main(string[] args)
{
// Create a temporary file on disk.
string tempFile = Path.GetTempFileName();
// Write random data to the temporary file.
using (var fileStream = File.OpenWrite(tempFile))
{
Random rand = new Random();
byte[] buffer = new byte[1024];
for (int i = 0; i < 512; i++)
{
rand.NextBytes(buffer);
fileStream.Write(buffer, 0, buffer.Length);
}
}
// Create an ActionBlock<int> object that prints to the console
// the number of bytes read.
var printResult = new ActionBlock<int>(zeroBytesRead =>
{
Console.WriteLine($"{Path.GetFileName(tempFile)} contains {zeroBytesRead} zero bytes.");
});
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytes = new TransformBlock<string, int>(
new Func<string, int>(CountBytes));
// Link the TransformBlock<string, int> object to the
// ActionBlock<int> object.
countBytes.LinkTo(printResult);
// Create a continuation task that completes the ActionBlock<int>
// object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });
// Post the path to the temporary file to the
// TransformBlock<string, int> object.
countBytes.Post(tempFile);
// Requests completion of the TransformBlock<string, int> object.
countBytes.Complete();
// Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait();
// Delete the temporary file.
File.Delete(tempFile);
}
}
/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
' Computes the number of zero bytes that the provided file
' contains.
Private Shared Function CountBytes(ByVal path As String) As Integer
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = File.OpenRead(path)
Dim bytesRead As Integer = 0
Do
bytesRead = fileStream.Read(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
Shared Sub Main(ByVal args() As String)
' Create a temporary file on disk.
Dim tempFile As String = Path.GetTempFileName()
' Write random data to the temporary file.
Using fileStream = File.OpenWrite(tempFile)
Dim rand As New Random()
Dim buffer(1023) As Byte
For i As Integer = 0 To 511
rand.NextBytes(buffer)
fileStream.Write(buffer, 0, buffer.Length)
Next i
End Using
' Create an ActionBlock<int> object that prints to the console
' the number of bytes read.
Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))
' Link the TransformBlock<string, int> object to the
' ActionBlock<int> object.
countBytes.LinkTo(printResult)
' Create a continuation task that completes the ActionBlock<int>
' object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(Sub() printResult.Complete())
' Post the path to the temporary file to the
' TransformBlock<string, int> object.
countBytes.Post(tempFile)
' Requests completion of the TransformBlock<string, int> object.
countBytes.Complete()
' Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait()
' Delete the temporary file.
File.Delete(tempFile)
End Sub
End Class
' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'
TransformBlock<TInput,TOutput> オブジェクトにラムダ式を指定できますが、この例ではFunc<T,TResult>を使用して、他のコードで CountBytes
メソッドを使用できるようにします。
ActionBlock<TInput> オブジェクトはラムダ式を使用します。これは、実行する作業がこのタスクに固有であり、他のコードでは役に立たない可能性があるためです。 タスク並列ライブラリでのラムダ式の動作の詳細については、「 PLINQ および TPL のラムダ式」を参照してください。
データフロー ドキュメントのデリゲート型の概要セクションでは、ActionBlock<TInput>、TransformBlock<TInput,TOutput>、およびTransformManyBlock<TInput,TOutput>オブジェクトに提供できるデリゲート型の概要を示します。 また、このテーブルでは、デリゲート型が同期的に動作するか非同期的に動作するかを指定します。
堅牢なプログラミング
この例では、データフロー ブロックのタスクを同期的に実行するために、TransformBlock<TInput,TOutput> オブジェクトにFunc<T,TResult>型のデリゲートを提供します。 データフロー ブロックが非同期的に動作できるようにするには、データフロー ブロックに Func<T, Task<TResult>>
型のデリゲートを指定します。 データフロー ブロックが非同期的に動作する場合、データフロー ブロックのタスクは、返された Task<TResult> オブジェクトが完了したときにのみ完了します。 次の例では、 CountBytes
メソッドを変更し、 async 演算子と await 演算子 (Visual Basic では Async および Await ) を使用して、指定されたファイル内の 0 バイトの合計数を非同期的に計算します。
ReadAsync メソッドは、ファイル読み取り操作を非同期的に実行します。
// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
' Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
非同期ラムダ式を使用して、実行データフロー ブロックでアクションを実行することもできます。 次の例では、ラムダ式を使用して非同期的に処理を実行するように、前の例で使用した TransformBlock<TInput,TOutput> オブジェクトを変更します。
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
' Asynchronously read from the file stream.
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function)
こちらもご覧ください
.NET