次の方法で共有


PLINQ および TPL 用のカスタム パーティショナー

データ ソースに対する操作を並列化するには、ソースを複数のスレッドで同時にアクセスできる複数のセクションに パーティション分割 することが重要な手順の 1 つです。 PLINQ とタスク並列ライブラリ (TPL) は、並列クエリまたは ForEach ループを記述するときに透過的に動作する既定のパーティショナーを提供します。 より高度なシナリオでは、独自のパーティショナーをプラグインできます。

パーティション分割の種類

データ ソースをパーティション分割する方法は多数あります。 最も効率的なアプローチでは、ソースを複数のサブシーケンスに物理的に分離するのではなく、複数のスレッドが連携して元のソース シーケンスを処理します。 配列やその他のインデックス付きソース (長さが事前にわかっている IList コレクションなど) の場合、 範囲パーティション分割 は最も簡単な種類のパーティション分割です。 すべてのスレッドは一意の開始インデックスと終了インデックスを受け取るので、他のスレッドによって上書きされたり上書きされたりすることなく、ソースの範囲を処理できます。 範囲のパーティション分割に関係する唯一のオーバーヘッドは、範囲を作成する最初の作業です。その後、追加の同期は必要ありません。 そのため、ワークロードが均等に分割されている限り、良好なパフォーマンスを提供できます。 範囲パーティション分割の欠点は、あるスレッドが早期に終了した場合、他のスレッドが作業を完了するのを助けられないということです。

リンク リストまたは長さが不明な他のコレクションの場合は、 チャンク パーティション分割を使用できます。 チャンクパーティション分割では、並列ループまたはクエリ内のすべてのスレッドまたはタスクが、1 つのチャンク内のいくつかのソース要素を消費し、それらを処理してから、追加の要素を取得するために戻ってくる。 パーティショナーは、すべての要素が分散され、重複がないことを保証します。 チャンクには任意のサイズを指定できます。 たとえば、「 方法: 動的パーティションを実装する 」で示されているパーティショナーは、1 つの要素のみを含むチャンクを作成します。 チャンクが大きすぎない限り、この種のパーティション分割は本質的に負荷分散されます。これは、スレッドへの要素の割り当てが事前に決定されていないためです。 ただし、パーティショナーでは、スレッドが別のチャンクを取得する必要があるたびに同期オーバーヘッドが発生します。 このような場合に発生する同期の量は、チャンクのサイズに反比例します。

一般に、範囲のパーティション分割は、デリゲートの実行時間が小さいか中程度で、ソースに多数の要素があり、各パーティションの合計作業はほぼ同等である場合にのみ高速です。 そのため、ほとんどの場合、チャンクのパーティション分割は一般的に高速です。 デリゲートの要素数が少ないソースまたは実行時間が長いソースでは、チャンクと範囲のパーティション分割のパフォーマンスは約等しくなります。

TPL パーティショナーは、動的な数のパーティションもサポートします。 つまり、 ForEach ループで新しいタスクが生成された場合など、その場でパーティションを作成できます。 この機能により、パーティショナーはループ自体と共にスケーリングできます。 動的パーティショナーも本質的に負荷分散されます。 カスタム パーティショナーを作成するときは、 ForEach ループから使用できるように動的パーティション分割をサポートする必要があります。

PLINQ の負荷分散パーティショナーの構成

Partitioner.Create メソッドのいくつかのオーバーロードを使用すると、配列またはIListソースのパーティショナーを作成し、スレッド間でワークロードのバランスを取ろうとするかどうかを指定できます。 パーティショナーを負荷分散するように構成すると、チャンク パーティション分割が使用され、要素は要求に応じて小さなチャンクで各パーティションに渡されます。 この方法は、ループまたはクエリ全体が完了するまで、すべてのパーティションに処理する要素があることを保証するのに役立ちます。 追加のオーバーロードを使用して、任意の IEnumerable ソースの負荷分散パーティション分割を提供できます。

一般に、負荷分散では、パーティションがパーティションに対して比較的頻繁に要素を要求する必要があります。 これに対し、静的パーティション分割を行うパーティショナーでは、範囲パーティション分割またはチャンク パーティション分割を使用して、各パーティショナーに要素を一度に割り当てることができます。 これにより、負荷分散よりもオーバーヘッドが少なくなりますが、1 つのスレッドが他のスレッドよりも大幅に多くの作業を行う場合、実行に時間がかかる場合があります。 既定では、IList または配列が渡されると、PLINQ は常に負荷分散なしで範囲パーティション分割を使用します。 PLINQ の負荷分散を有効にするには、次の例に示すように、 Partitioner.Create メソッドを使用します。

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

特定のシナリオで負荷分散を使用するかどうかを判断する最善の方法は、代表的な負荷とコンピューター構成で操作が完了するまでの時間を実験して測定することです。 たとえば、静的パーティション分割は、少数のコアしか持たないマルチコア コンピューターでは速度が飛躍的に向上することがありますが、比較的多くのコアを持つコンピューターでは速度が低下することがあります。

次の表に、 Create メソッドの使用可能なオーバーロードを示します。 これらのパーティショナーは、PLINQ または Taskでのみ使用できるわけではありません。 また、任意のカスタム並列コンストラクトで使用することもできます。

過負荷 負荷分散を使用する
Create<TSource>(IEnumerable<TSource>) いつも
Create<TSource>(TSource[], Boolean) ブール値引数が true として指定されている場合
Create<TSource>(IList<TSource>, Boolean) ブール値引数が true として指定されている場合
Create(Int32, Int32) 決してない
Create(Int32, Int32, Int32) 決してない
Create(Int64, Int64) 決してない
Create(Int64, Int64, Int64) 決してない

Parallel.ForEach の静的範囲パーティショナーの構成

For ループでは、ループの本体がデリゲートとしてメソッドに提供されます。 そのデリゲートを呼び出すコストは、仮想メソッド呼び出しとほぼ同じです。 一部のシナリオでは、並列ループの本体が十分に小さくなり、各ループイテレーションでのデリゲート呼び出しのコストが大きくなる場合があります。 このような状況では、 Create オーバーロードのいずれかを使用して、ソース要素に対して範囲パーティションの IEnumerable<T> を作成できます。 次に、この範囲のコレクションを、本体が通常のfor ループで構成されるForEach メソッドに渡すことができます。 この方法の利点は、デリゲート呼び出しのコストが、要素ごとに 1 回ではなく、範囲ごとに 1 回だけ発生することです。 次の例は、基本的なパターンを示しています。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

ループ内のすべてのスレッドは、指定したサブ範囲内の開始インデックス値と終了インデックス値を含む独自の Tuple<T1,T2> を受け取ります。 内部 for ループでは、 fromInclusivetoExclusive の値を使用して、配列または IList を直接ループ処理します。

Createオーバーロードの 1 つを使用すると、パーティションのサイズとパーティションの数を指定できます。 このオーバーロードは、要素ごとの作業が非常に少なく、要素ごとに 1 つの仮想メソッド呼び出しでもパフォーマンスに顕著な影響を与えるシナリオで使用できます。

カスタム パーティショナー

シナリオによっては、独自のパーティショナーを実装する価値がある場合や、必要になる場合もあります。 たとえば、クラスの内部構造に関する知識に基づいて、既定のパーティショナーよりも効率的にパーティション分割できるカスタム コレクション クラスがあるとします。 または、ソース コレクション内のさまざまな場所で要素を処理するのにかかる時間に関する知識に基づいて、さまざまなサイズの範囲パーティションを作成することもできます。

基本的なカスタム パーティショナーを作成するには、次の表に示すように、 System.Collections.Concurrent.Partitioner<TSource> からクラスを派生させ、仮想メソッドをオーバーライドします。

メソッド 説明
GetPartitions このメソッドはメイン スレッドによって 1 回呼び出され、IList(IEnumerator(TSource)) を返します。 ループまたはクエリ内の各ワーカー スレッドは、一覧の GetEnumerator を呼び出して、個別のパーティションに対する IEnumerator<T> を取得できます。
SupportsDynamicPartitions GetDynamicPartitionsを実装する場合はtrueを返します。それ以外の場合はfalse
GetDynamicPartitions SupportsDynamicPartitionstrueされている場合は、必要に応じて、GetPartitionsの代わりにこのメソッドを呼び出すことができます。

結果を並べ替え可能にする必要がある場合、または要素へのインデックス付きアクセスが必要な場合は、次の表に示すように、 System.Collections.Concurrent.OrderablePartitioner<TSource> から派生し、その仮想メソッドをオーバーライドします。

メソッド 説明
GetPartitions このメソッドはメイン スレッドによって 1 回呼び出され、 IList(IEnumerator(TSource))を返します。 ループまたはクエリ内の各ワーカー スレッドは、一覧の GetEnumerator を呼び出して、個別のパーティションに対する IEnumerator<T> を取得できます。
SupportsDynamicPartitions GetDynamicPartitionsを実装する場合はtrueを返します。それ以外の場合は false。
GetDynamicPartitions 通常、これは単に GetOrderableDynamicPartitionsを呼び出します。
GetOrderableDynamicPartitions SupportsDynamicPartitionstrueされている場合は、必要に応じて、GetPartitionsの代わりにこのメソッドを呼び出すことができます。

次の表では、3 種類の負荷分散パーティショナーが OrderablePartitioner<TSource> クラスを実装する方法について詳しく説明します。

メソッド/プロパティ 負荷分散なしの IList/配列 負荷分散を使用した IList/配列 IEnumerable
GetOrderablePartitions 範囲パーティション分割を使用する 指定された partitionCount のリスト用に最適化されたチャンク パーティション分割を使用します 静的な数のパーティションを作成してチャンク パーティション分割を使用します。
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions サポートされていない例外をスローします リストと動的パーティション用に最適化されたチャンク パーティション分割を使用します 動的な数のパーティションを作成してチャンク パーティション分割を使用します。
KeysOrderedInEachPartition 戻り値 true 戻り値 true 戻り値 true
KeysOrderedAcrossPartitions 戻り値 true 戻り値 false 戻り値 false
KeysNormalized 戻り値 true 戻り値 true 戻り値 true
SupportsDynamicPartitions 戻り値 false 戻り値 true 戻り値 true

動的パーティション

ForEach メソッドでパーティショナーを使用する場合は、動的な数のパーティションを返すことができる必要があります。 つまり、パーティショナーは、ループの実行中にいつでも新しいパーティションの列挙子をオンデマンドで提供できます。 基本的に、ループは新しい並列タスクを追加するたびに、そのタスクの新しいパーティションを要求します。 データを順序付け可能にする必要がある場合は、各パーティション内の各項目に一意のインデックスが割り当てることができるように、 System.Collections.Concurrent.OrderablePartitioner<TSource> から派生します。

詳細と例については、「 方法: 動的パーティションを実装する」を参照してください。

パーティショナーのコントラクト

カスタム パーティショナーを実装する場合は、次のガイドラインに従って、TPL で PLINQ と ForEach との正しい対話を確保します。

  • partitionsCountに対して 0 以下の引数を指定してGetPartitionsが呼び出された場合は、ArgumentOutOfRangeExceptionをスローします。 PLINQ と TPL は、0 に等しい partitionCount を渡すことはありませんが、その可能性から保護することをお勧めします。

  • GetPartitionsGetOrderablePartitions は常に partitionsCount 数のパーティションを返す必要があります。 パーティショナーがデータを使い切り、要求された数のパーティションを作成できない場合、メソッドは残りのパーティションごとに空の列挙子を返す必要があります。 それ以外の場合、PLINQ と TPL の両方が InvalidOperationExceptionをスローします。

  • GetPartitionsGetOrderablePartitionsGetDynamicPartitions、および GetOrderableDynamicPartitionsnull を返すことはありません (Visual Basic のNothing )。 その場合、PLINQ/TPL は InvalidOperationExceptionをスローします。

  • パーティションを返すメソッドは、データ ソースを完全かつ一意に列挙できるパーティションを常に返す必要があります。 パーティショナーの設計で特に必要な場合を除き、データ ソースまたはスキップされた項目の重複は発生しません。 この規則に従わない場合、出力順序がスクランブリングされる可能性があります。

  • 次のブール値ゲッターは、出力順序がスクランブリングされないように、常に次の値を正確に返す必要があります。

    • KeysOrderedInEachPartition: 各パーティションは、増加するキー インデックスを持つ要素を返します。

    • KeysOrderedAcrossPartitions: 返されるすべてのパーティションの場合、パーティション i のキー インデックスは、パーティション i-1 のキー インデックスよりも高くなります。

    • KeysNormalized:すべてのキー インデックスは、ゼロから始まるギャップなしで単調に増加しています。

  • すべてのインデックスは一意である必要があります。 重複するインデックスがない可能性があります。 この規則に従わない場合、出力順序がスクランブリングされる可能性があります。

  • すべてのインデックスは負でない必要があります。 この規則に従っていない場合、PLINQ/TPL は例外をスローする可能性があります。

こちらも参照ください