Compartir a través de


Particionadores personalizados para PLINQ y TPL

Para paralelizar una operación en un origen de datos, uno de los pasos esenciales es dividir el origen en varias secciones a las que varios subprocesos pueden acceder simultáneamente. PLINQ y la Task Parallel Library (TPL) proporcionan particionadores predeterminados que funcionan de forma transparente al escribir una consulta o un bucle paralelo. Para escenarios más avanzados, puede conectar su propio particionador.

Tipos de particiones

Hay muchas maneras de crear particiones de un origen de datos. En los enfoques más eficaces, varios subprocesos cooperan para procesar la secuencia de origen original, en lugar de separar físicamente el origen en varias subsecuencias. En el caso de las matrices y otros orígenes indexados, como IList colecciones en las que se conoce la longitud de antemano, la creación de particiones de intervalo es el tipo más sencillo de creación de particiones. Cada subproceso recibe índices exclusivos de apertura y cierre, para poder procesar su rango del origen sin sobrescribir subprocesos ni ser sobrescrito por algún subproceso. La única sobrecarga implicada en la creación de particiones de intervalos es el trabajo inicial de crear los intervalos; no se requiere ninguna sincronización adicional después de eso. Por lo tanto, puede proporcionar un buen rendimiento siempre que la carga de trabajo se divida uniformemente. Una desventaja de la partición por intervalos es que si un hilo finaliza temprano, no puede ayudar a los otros hilos a terminar su trabajo.

Para las listas vinculadas u otras colecciones cuya longitud no se conoce, puede usar la partición por bloques. En la creación de particiones de fragmentos, cada subproceso o tarea de un bucle paralelo o consulta consume algún número de elementos de origen en un fragmento, los procesa y, a continuación, vuelve a recuperar elementos adicionales. El particionador garantiza que todos los elementos se distribuyen y que no hay duplicados. Un fragmento puede tener cualquier tamaño. Por ejemplo, el particionador que se muestra en How to: Implement Dynamic Partitions crea fragmentos que contienen solo un elemento. Siempre que los fragmentos no sean demasiado grandes, este tipo de creación de particiones tiene un equilibrio de carga inherente, porque la asignación de elementos a los subprocesos no es predeterminada. Sin embargo, el particionador incurre en la sobrecarga de sincronización cada vez que el subproceso necesita obtener otro fragmento. La cantidad de sincronización en que se incurre en estos casos es inversamente proporcional al tamaño de los fragmentos.

En general, la creación de particiones por rangos solo es más rápida cuando el tiempo de ejecución del delegado es de bajo a moderado y el origen tiene un gran número de elementos y el trabajo total de cada partición es más o menos equivalente. Por tanto, la creación de particiones por fragmentos suele ser más rápida en la mayoría de los casos. En fuentes con un número reducido de elementos o mayor tiempo de ejecución del delegado, el rendimiento de la partición en fragmentos y en intervalos es aproximadamente equivalente.

Los particionadores de TPL también admiten un número dinámico de particiones. Esto significa que pueden crear particiones sobre la marcha, por ejemplo, cuando el ForEach bucle genera una nueva tarea. Esta característica permite al particionador escalar junto con el bucle en sí mismo. Los particionadores dinámicos también son intrínsecamente equilibradores de carga. Cuando se crea un particionador personalizado, debe admitir la creación de particiones dinámicas para poder usarlas desde un bucle ForEach.

Configuración de particiones de equilibrio de carga para PLINQ

Algunas sobrecargas del método Partitioner.Create permiten crear un particionador para una matriz o un origen IList y especificar si debe intentar equilibrar la carga de trabajo entre los subprocesos. Cuando el particionador está configurado para distribuir la carga, se utiliza el particionamiento en fragmentos y los elementos se entregan a cada partición en pequeños fragmentos a medida que se solicitan. Este enfoque ayuda a garantizar que todas las particiones tengan elementos que procesar hasta que se complete todo el bucle o consulta. Se puede usar una sobrecarga adicional para proporcionar particiones de equilibrio de carga de cualquier origen IEnumerable.

En general, el equilibrio de carga requiere que las particiones soliciten elementos con relativamente frecuencia desde el particionador. Por el contrario, un particionador que realiza particiones estáticas puede asignar los elementos de una vez a cada particionador mediante particionamiento por intervalos o bloques. Esto requiere menos sobrecarga que el equilibrio de carga, pero puede tardar más tiempo en ejecutarse si un subproceso termina con mucho más trabajo que los demás. Por defecto, cuando se le pasa un IList o una matriz, PLINQ siempre utiliza el particionamiento de intervalo sin equilibrio de carga. Para habilitar el equilibrio de carga para PLINQ, use el Partitioner.Create método , como se muestra en el ejemplo siguiente.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

La mejor manera de determinar si se debe usar el equilibrio de carga en cualquier escenario determinado es experimentar y medir cuánto tiempo tardan en completarse las operaciones en cargas representativas y configuraciones de equipo. Por ejemplo, la creación de particiones estáticas podría proporcionar una velocidad significativa en un equipo de varios núcleos que solo tiene unos pocos núcleos, pero podría provocar ralentizaciones en los equipos que tienen relativamente muchos núcleos.

En la tabla siguiente se enumeran las sobrecargas disponibles del Create método . Estos particionadores no se limitan a usar solo con PLINQ o Task. También se pueden usar con cualquier construcción paralela personalizada.

Sobrecarga Usa el balanceo de carga
Create<TSource>(IEnumerable<TSource>) Siempre
Create<TSource>(TSource[], Boolean) Cuando el argumento booleano se especifica como true
Create<TSource>(IList<TSource>, Boolean) Cuando el argumento booleano se especifica como true
Create(Int32, Int32) Nunca
Create(Int32, Int32, Int32) Nunca
Create(Int64, Int64) Nunca
Create(Int64, Int64, Int64) Nunca

Configuración de particionadores de intervalo estático para Parallel.ForEach

En un bucle For, el cuerpo del bucle se proporciona al método como un delegado. El costo de invocar ese delegado es aproximadamente el mismo que una llamada de método virtual. En algunos escenarios, el cuerpo de un bucle paralelo podría ser lo suficientemente pequeño como para que el costo de la invocación del delegado en cada iteración del bucle sea significativo. En estas situaciones, puede usar una de las sobrecargas Create para crear una interfaz IEnumerable<T> de particiones por rangos de los elementos de origen. A continuación, puede pasar esta colección de intervalos a un método ForEach cuyo cuerpo consta de un bucle regular for. La ventaja de este enfoque es que el costo de invocación del delegado se incurre solo una vez por intervalo, en lugar de una vez por elemento. En el ejemplo siguiente se muestra el patrón básico.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Cada subproceso del bucle recibe su propio Tuple<T1,T2> que contiene los valores de índice de inicio y fin en el subrango especificado. El bucle for interno usa los valores fromInclusive y toExclusive para recorrer en bucle la matriz o IList directamente.

Una de las sobrecargas Create le permite especificar el tamaño de las particiones y el número de particiones. Esta sobrecarga se puede usar en escenarios en los que el trabajo por elemento es tan bajo que incluso una llamada de método virtual por elemento tiene un impacto notable en el rendimiento.

Particionadores personalizados

En algunos escenarios, puede ser útil o incluso necesario implementar su propio particionador. Por ejemplo, puede tener una clase de colección personalizada que pueda dividir en particiones de forma más eficaz que los particionadores predeterminados, en función de su conocimiento de la estructura interna de la clase. O bien, puede que desee crear particiones de intervalo de diferentes tamaños en función del conocimiento de cuánto tiempo tardará en procesar elementos en diferentes ubicaciones de la colección de origen.

Para crear un particionador personalizado básico, derive una clase de System.Collections.Concurrent.Partitioner<TSource> e invalide los métodos virtuales, como se describe en la tabla siguiente.

Método Descripción
GetPartitions El subproceso principal llama a este método una vez y devuelve un IList(IEnumerator(TSource)). Cada subproceso de trabajo del bucle o la consulta puede llamar a GetEnumerator en la lista para recuperar IEnumerator<T> a través de una partición distinta.
SupportsDynamicPartitions Devuelve true si implementa GetDynamicPartitions, de lo contrario, false.
GetDynamicPartitions Si SupportsDynamicPartitions es true, este método se puede llamar opcionalmente en lugar de GetPartitions.

Si los resultados deben ser ordenables o necesita acceso indizado a los elementos, derive de System.Collections.Concurrent.OrderablePartitioner<TSource> e invalide sus métodos virtuales como se describe en la tabla siguiente.

Método Descripción
GetPartitions Este método es llamado una vez por el subproceso principal y devuelve un IList(IEnumerator(TSource)). Cada subproceso de trabajo del bucle o la consulta puede llamar a GetEnumerator en la lista para recuperar IEnumerator<T> a través de una partición distinta.
SupportsDynamicPartitions Devuelve true si implementa GetDynamicPartitions; de lo contrario, false.
GetDynamicPartitions Normalmente, esto simplemente llama a GetOrderableDynamicPartitions.
GetOrderableDynamicPartitions Si SupportsDynamicPartitions es true, este método se puede llamar opcionalmente en lugar de GetPartitions.

En la tabla siguiente se proporcionan detalles adicionales sobre cómo implementan la OrderablePartitioner<TSource> clase los tres tipos de particionadores de equilibrio de carga.

Método/Propiedad IList/matriz sin equilibrio de carga IList/Array con equilibrio de carga IEnumerable
GetOrderablePartitions Usa el particionamiento por rangos Usa la creación de particiones por fragmentos optimizada para listas de la clase partitionCount especificada Usa la creación de particiones por fragmentos mediante la creación de un número estático de particiones.
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Genera una excepción no admitida Usa la creación de particiones de fragmentos optimizada para listas y particiones dinámicas Utiliza particionamiento de fragmentos creando un número dinámico de particiones.
KeysOrderedInEachPartition Devuelve true Devuelve true Devuelve true
KeysOrderedAcrossPartitions Devuelve true Devuelve false Devuelve false
KeysNormalized Devuelve true Devuelve true Devuelve true
SupportsDynamicPartitions Devuelve false Devuelve true Devuelve true

Particiones dinámicas

Si considera que se debe usar el particionador en un ForEach método, debe ser capaz de devolver un número dinámico de particiones. Esto significa que el particionador puede proporcionar un enumerador para una nueva partición a petición en cualquier momento durante la ejecución del bucle. Básicamente, siempre que el bucle agrega una nueva tarea paralela, solicita una nueva partición para esa tarea. Si necesita que se puedan ordenar los datos, derive de System.Collections.Concurrent.OrderablePartitioner<TSource> para que cada elemento de cada partición tenga asignado un índice único.

Para obtener más información y un ejemplo, vea Cómo: Implementar particiones dinámicas.

Contrato para particionadores

Al implementar un particionador personalizado, siga estas instrucciones para ayudar a garantizar la interacción correcta con PLINQ y ForEach en el TPL:

  • Si se llama a GetPartitions con un argumento de cero o menos para partitionsCount, se produce ArgumentOutOfRangeException. Aunque PLINQ y TPL nunca pasarán una clase partitionCount igual a 0, no obstante, se recomienda adoptar medidas preventivas para evitar esta posibilidad.

  • GetPartitions y GetOrderablePartitions siempre deben devolver partitionsCount el número de particiones. Si el particionador se queda sin datos y no puede crear tantas particiones como se solicite, el método debe devolver un enumerador vacío para cada una de las particiones restantes. De lo contrario, PLINQ y TPL producirán una excepción InvalidOperationException.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitionsy GetOrderableDynamicPartitions nunca debe devolver null (Nothing en Visual Basic). Si lo hacen, PLINQ/TPL producirán una excepción InvalidOperationException.

  • Los métodos que devuelven particiones siempre deben devolver particiones que puedan enumerar de forma completa y única el origen de datos. No debe haber ninguna duplicación en el origen de datos ni en los elementos omitidos, a menos que el diseño del particionador lo requiera específicamente. Si no se sigue esta regla, el orden de salida puede estar desordenado.

  • Los siguientes captadores booleanos deben devolver siempre con precisión los siguientes valores para que no se altere el orden de salida:

    • KeysOrderedInEachPartition: cada partición devuelve elementos con índices clave crecientes.

    • KeysOrderedAcrossPartitions: para todas las particiones que se devuelven, los índices de clave de la partición i son superiores a los índices de clave de la partición i-1.

    • KeysNormalized: todos los índices clave están aumentando monotonicamente sin brechas, empezando por cero.

  • Todos los índices deben ser únicos. Es posible que no haya índices duplicados. Si no se sigue esta regla, el orden de salida puede estar desordenado.

  • Todos los índices deben ser no negativos. Si no se sigue esta regla, PLINQ/TPL puede producir excepciones.

Consulte también