U-SQL 可编程性指南 - UDT 和 UDAGG

使用用户定义的类型:UDT

用户定义的类型或 UDT 是 U-SQL 的另一个可编程性功能。 U-SQL UDT 的行为类似于常规 C# 用户定义类型。 C# 是一种强类型语言,允许使用内置类型和用户自定义类型。

当 UDT 在行集中的顶点之间传递时,U-SQL 无法隐式序列化或反序列化任意 UDT。 这意味着用户必须通过使用 IFormatter 接口来提供一个明确的格式化器。 这为 U-SQL 提供了 UDT 的序列化和反序列化方法。

注释

U-SQL 的内置提取器和输出器当前无法将 UDT 数据进行序列化或反序列化,即使已配置 IFormatter 格式化程序也是如此。 因此,当你使用 OUTPUT 语句将 UDT 数据写入文件或使用提取器读取它时,必须将其作为字符串或字节数组传递。 然后,显式调用序列化和反序列化代码(即 UDT 的 ToString() 方法。 另一方面,用户定义的提取器和输出器可以读取和写入 UDT。

如果尝试在 EXTRACTOR 或 OUTPUTTER 中使用 UDT(在先前的 SELECT 之外),如下所示:

@rs1 =
    SELECT
        MyNameSpace.Myfunction_Returning_UDT(filed1) AS myfield
    FROM @rs0;

OUTPUT @rs1
    TO @output_file
    USING Outputters.Text();

收到以下错误:

Error	1	E_CSC_USER_INVALIDTYPEINOUTPUTTER: Outputters.Text was used to output column myfield of type
MyNameSpace.Myfunction_Returning_UDT.

Description:

Outputters.Text only supports built-in types.

Resolution:

Implement a custom outputter that knows how to serialize this type, or call a serialization method on the type in
the preceding SELECT.	C:\Users\sergeypu\Documents\Visual Studio 2013\Projects\USQL-Programmability\
USQL-Programmability\Types.usql	52	1	USQL-Programmability

若要在输出器中使用 UDT,必须将其序列化为使用 ToString() 方法的字符串,或创建自定义输出器。

目前无法在 GROUP BY 中使用 UDT。 如果在 GROUP BY 中使用 UDT,则会引发以下错误:

Error	1	E_CSC_USER_INVALIDTYPEINCLAUSE: GROUP BY doesn't support type MyNameSpace.Myfunction_Returning_UDT
for column myfield

Description:

GROUP BY doesn't support UDT or Complex types.

Resolution:

Add a SELECT statement where you can project a scalar column that you want to use with GROUP BY.
C:\Users\sergeypu\Documents\Visual Studio 2013\Projects\USQL-Programmability\USQL-Programmability\Types.usql
62	5	USQL-Programmability

若要定义 UDT,必须:

  1. 添加以下命名空间:
using Microsoft.Analytics.Interfaces
using System.IO;
  1. 添加 Microsoft.Analytics.Interfaces,这是 UDT 接口所必需的。 此外, System.IO 可能需要定义 IFormatter 接口。

  2. 使用 SqlUserDefinedType 属性定义用户定义的类型。

SqlUserDefinedType 用于将程序集中的类型定义标记为 U-SQL 中的用户定义类型(UDT)。 属性上的属性反映了 UDT 的物理特征。 此类无法被继承。

SqlUserDefinedType 是 UDT 定义的必需属性。

类的构造函数:

  • SqlUserDefinedTypeAttribute (类型格式化程序)

  • 类型格式化程序:定义 UDT 格式化程序所需的参数,必须在此处传递接口IFormatter的类型。

[SqlUserDefinedType(typeof(MyTypeFormatter))]
public class MyType
{ … }
  • 典型的 UDT 还需要定义 IFormatter 接口,如以下示例所示:
public class MyTypeFormatter : IFormatter<MyType>
{
    public void Serialize(MyType instance, IColumnWriter writer, ISerializationContext context)
    { … }

    public MyType Deserialize(IColumnReader reader, ISerializationContext context)
    { … }
}

IFormatter接口用于序列化和反序列化根类型为<类型参数引用“T”>的对象图。

<typeparam name=“T”>表示要序列化和反序列化的对象图的根类型。

  • 反序列化:对提供的流中的数据进行反序列化,并重建对象图。

  • 序列化:使用提供的流的给定根对对象或对象图进行序列化。

MyType 实例:类型的实例。 IColumnWriter 编写器/ IColumnReader 读取器:底层列流。 ISerializationContext context:枚举,用于定义一组标志,用于指定序列化期间流的源或目标上下文。

  • 中间:指定源或目标上下文不是持久存储区。

  • 持久性:指定源或目标上下文是持久存储区。

作为常规 C# 类型,U-SQL UDT 定义可以包含运算符的替代,例如 +/===/!=。 它还可以包括静态方法。 例如,如果要将此 UDT 用作 U-SQL MIN 聚合函数的参数,则必须定义 < 运算符重载。

在本指南的前面部分,我们演示了从特定日期以格式 Qn:Pn (Q1:P10)标识会计期间的示例。 以下示例演示如何为会计周期值定义自定义类型。

下面是包含自定义 UDT 和 IFormatter 接口的代码隐藏部分的示例:

[SqlUserDefinedType(typeof(FiscalPeriodFormatter))]
public struct FiscalPeriod
{
    public int Quarter { get; private set; }

    public int Month { get; private set; }

    public FiscalPeriod(int quarter, int month):this()
    {
        this.Quarter = quarter;
        this.Month = month;
    }

    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj))
        {
            return false;
        }

        return obj is FiscalPeriod && Equals((FiscalPeriod)obj);
    }

    public bool Equals(FiscalPeriod other)
    {
return this.Quarter.Equals(other.Quarter) && this.Month.Equals(other.Month);
    }

    public bool GreaterThan(FiscalPeriod other)
    {
return this.Quarter.CompareTo(other.Quarter) > 0 || this.Month.CompareTo(other.Month) > 0;
    }

    public bool LessThan(FiscalPeriod other)
    {
return this.Quarter.CompareTo(other.Quarter) < 0 || this.Month.CompareTo(other.Month) < 0;
    }

    public override int GetHashCode()
    {
        unchecked
        {
            return (this.Quarter.GetHashCode() * 397) ^ this.Month.GetHashCode();
        }
    }

    public static FiscalPeriod operator +(FiscalPeriod c1, FiscalPeriod c2)
    {
return new FiscalPeriod((c1.Quarter + c2.Quarter) > 4 ? (c1.Quarter + c2.Quarter)-4 : (c1.Quarter + c2.Quarter), (c1.Month + c2.Month) > 12 ? (c1.Month + c2.Month) - 12 : (c1.Month + c2.Month));
    }

    public static bool operator ==(FiscalPeriod c1, FiscalPeriod c2)
    {
        return c1.Equals(c2);
    }

    public static bool operator !=(FiscalPeriod c1, FiscalPeriod c2)
    {
        return !c1.Equals(c2);
    }
    public static bool operator >(FiscalPeriod c1, FiscalPeriod c2)
    {
        return c1.GreaterThan(c2);
    }
    public static bool operator <(FiscalPeriod c1, FiscalPeriod c2)
    {
        return c1.LessThan(c2);
    }
    public override string ToString()
    {
        return (String.Format("Q{0}:P{1}", this.Quarter, this.Month));
    }

}

public class FiscalPeriodFormatter : IFormatter<FiscalPeriod>
{
    public void Serialize(FiscalPeriod instance, IColumnWriter writer, ISerializationContext context)
    {
        using (var binaryWriter = new BinaryWriter(writer.BaseStream))
        {
            binaryWriter.Write(instance.Quarter);
            binaryWriter.Write(instance.Month);
            binaryWriter.Flush();
        }
    }

    public FiscalPeriod Deserialize(IColumnReader reader, ISerializationContext context)
    {
        using (var binaryReader = new BinaryReader(reader.BaseStream))
        {
var result = new FiscalPeriod(binaryReader.ReadInt16(), binaryReader.ReadInt16());
            return result;
        }
    }
}

定义的类型包括两个数字:季度和月份。 此处定义了运算符 ==/!=/>/< 和静态方法 ToString()

如前所述,UDT 可以在 SELECT 表达式中使用,但不能在 OUTPUTTER/EXTRACTOR 中使用,而无需自定义序列化。 它要么必须序列化为带有 ToString() 的字符串,要么与自定义的 OUTPUTTER/EXTRACTOR 一起使用。

现在,让我们讨论 UDT 的用法。 在代码隐藏部分中,我们已将 GetFiscalPeriod 函数更改为以下内容:

public static FiscalPeriod GetFiscalPeriodWithCustomType(DateTime dt)
{
    int FiscalMonth = 0;
    if (dt.Month < 7)
    {
        FiscalMonth = dt.Month + 6;
    }
    else
    {
        FiscalMonth = dt.Month - 6;
    }

    int FiscalQuarter = 0;
    if (FiscalMonth >= 1 && FiscalMonth <= 3)
    {
        FiscalQuarter = 1;
    }
    if (FiscalMonth >= 4 && FiscalMonth <= 6)
    {
        FiscalQuarter = 2;
    }
    if (FiscalMonth >= 7 && FiscalMonth <= 9)
    {
        FiscalQuarter = 3;
    }
    if (FiscalMonth >= 10 && FiscalMonth <= 12)
    {
        FiscalQuarter = 4;
    }

    return new FiscalPeriod(FiscalQuarter, FiscalMonth);
}

可以看到,它返回 FiscalPeriod 类型的值。

下面提供了如何在 U-SQL 基本脚本中进一步使用它的示例。 此示例演示 U-SQL 脚本中不同形式的 UDT 调用。

DECLARE @input_file string = @"c:\work\cosmos\usql-programmability\input_file.tsv";
DECLARE @output_file string = @"c:\work\cosmos\usql-programmability\output_file.tsv";

@rs0 =
    EXTRACT
        guid string,
        dt DateTime,
        user String,
        des String
    FROM @input_file USING Extractors.Tsv();

@rs1 =
    SELECT
        guid AS start_id,
        dt,
        DateTime.Now.ToString("M/d/yyyy") AS Nowdate,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt).Quarter AS fiscalquarter,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt).Month AS fiscalmonth,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt) + new USQL_Programmability.CustomFunctions.FiscalPeriod(1,7) AS fiscalperiod_adjusted,
        user,
        des
    FROM @rs0;

@rs2 =
    SELECT
        start_id,
        dt,
        DateTime.Now.ToString("M/d/yyyy") AS Nowdate,
        fiscalquarter,
        fiscalmonth,
        USQL_Programmability.CustomFunctions.GetFiscalPeriodWithCustomType(dt).ToString() AS fiscalperiod,

           // This user-defined type was created in the prior SELECT.  Passing the UDT to this subsequent SELECT would have failed if the UDT was not annotated with an IFormatter.
           fiscalperiod_adjusted.ToString() AS fiscalperiod_adjusted,
           user,
           des
    FROM @rs1;

OUTPUT @rs2
    TO @output_file
    USING Outputters.Text();

下面是完整代码隐藏部分的示例:

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace USQL_Programmability
{
    public class CustomFunctions
    {
        static public DateTime? ToDateTime(string dt)
        {
            DateTime dtValue;

            if (!DateTime.TryParse(dt, out dtValue))
                return Convert.ToDateTime(dt);
            else
                return null;
        }

        public static FiscalPeriod GetFiscalPeriodWithCustomType(DateTime dt)
        {
            int FiscalMonth = 0;
            if (dt.Month < 7)
            {
                FiscalMonth = dt.Month + 6;
            }
            else
            {
                FiscalMonth = dt.Month - 6;
            }

            int FiscalQuarter = 0;
            if (FiscalMonth >= 1 && FiscalMonth <= 3)
            {
                FiscalQuarter = 1;
            }
            if (FiscalMonth >= 4 && FiscalMonth <= 6)
            {
                FiscalQuarter = 2;
            }
            if (FiscalMonth >= 7 && FiscalMonth <= 9)
            {
                FiscalQuarter = 3;
            }
            if (FiscalMonth >= 10 && FiscalMonth <= 12)
            {
                FiscalQuarter = 4;
            }

            return new FiscalPeriod(FiscalQuarter, FiscalMonth);
        }        [SqlUserDefinedType(typeof(FiscalPeriodFormatter))]
        public struct FiscalPeriod
        {
            public int Quarter { get; private set; }

            public int Month { get; private set; }

            public FiscalPeriod(int quarter, int month):this()
            {
                this.Quarter = quarter;
                this.Month = month;
            }

            public override bool Equals(object obj)
            {
                if (ReferenceEquals(null, obj))
                {
                    return false;
                }

                return obj is FiscalPeriod && Equals((FiscalPeriod)obj);
            }

            public bool Equals(FiscalPeriod other)
            {
return this.Quarter.Equals(other.Quarter) &&    this.Month.Equals(other.Month);
            }

            public bool GreaterThan(FiscalPeriod other)
            {
return this.Quarter.CompareTo(other.Quarter) > 0 || this.Month.CompareTo(other.Month) > 0;
            }

            public bool LessThan(FiscalPeriod other)
            {
return this.Quarter.CompareTo(other.Quarter) < 0 || this.Month.CompareTo(other.Month) < 0;
            }

            public override int GetHashCode()
            {
                unchecked
                {
                    return (this.Quarter.GetHashCode() * 397) ^ this.Month.GetHashCode();
                }
            }

            public static FiscalPeriod operator +(FiscalPeriod c1, FiscalPeriod c2)
            {
return new FiscalPeriod((c1.Quarter + c2.Quarter) > 4 ? (c1.Quarter + c2.Quarter)-4 : (c1.Quarter + c2.Quarter), (c1.Month + c2.Month) > 12 ? (c1.Month + c2.Month) - 12 : (c1.Month + c2.Month));
            }

            public static bool operator ==(FiscalPeriod c1, FiscalPeriod c2)
            {
                return c1.Equals(c2);
            }

            public static bool operator !=(FiscalPeriod c1, FiscalPeriod c2)
            {
                return !c1.Equals(c2);
            }
            public static bool operator >(FiscalPeriod c1, FiscalPeriod c2)
            {
                return c1.GreaterThan(c2);
            }
            public static bool operator <(FiscalPeriod c1, FiscalPeriod c2)
            {
                return c1.LessThan(c2);
            }
            public override string ToString()
            {
                return (String.Format("Q{0}:P{1}", this.Quarter, this.Month));
            }

        }

        public class FiscalPeriodFormatter : IFormatter<FiscalPeriod>
        {
public void Serialize(FiscalPeriod instance, IColumnWriter writer, ISerializationContext context)
            {
                using (var binaryWriter = new BinaryWriter(writer.BaseStream))
                {
                    binaryWriter.Write(instance.Quarter);
                    binaryWriter.Write(instance.Month);
                    binaryWriter.Flush();
                }
            }

public FiscalPeriod Deserialize(IColumnReader reader, ISerializationContext context)
            {
                using (var binaryReader = new BinaryReader(reader.BaseStream))
                {
var result = new FiscalPeriod(binaryReader.ReadInt16(), binaryReader.ReadInt16());
                    return result;
                }
            }
        }
    }
}

使用用户定义的聚合:UDAGG

用户定义的聚合是指 U-SQL 没有默认提供的任何聚合相关函数。 该示例可以用作聚合,用于执行自定义数学计算、字符串连接、字符串操作等。

用户定义的聚合基类定义如下所示:

    [SqlUserDefinedAggregate]
    public abstract class IAggregate<T1, T2, TResult> : IAggregate
    {
        protected IAggregate();

        public abstract void Accumulate(T1 t1, T2 t2);
        public abstract void Init();
        public abstract TResult Terminate();
    }

SqlUserDefinedAggregate 指示类型应注册为用户定义的聚合。 此类无法被继承。

对于 UDAGG 定义,SqlUserDefinedType 属性是 可选的

基类允许传递三个抽象参数:两个作为输入参数,一个作为结果。 数据类型是可变的,应在类继承期间定义。

public class GuidAggregate : IAggregate<string, string, string>
{
    string guid_agg;

    public override void Init()
    { … }

    public override void Accumulate(string guid, string user)
    { … }

    public override string Terminate()
    { … }
}
  • Init 在计算期间为每个组调用一次。 它为每个聚合组提供初始化例程。
  • 累积被执行一次,对每个值。 它提供聚合算法的主要功能。 它可用于聚合在类继承期间定义的各种数据类型的值。 它可以接受变量数据类型的两个参数。
  • 处理结束时对每个聚合组执行一次终止,以输出每个组的结果。

若要声明正确的输入和输出数据类型,请使用类定义,如下所示:

public abstract class IAggregate<T1, T2, TResult> : IAggregate
  • T1:要累积的第一个参数
  • T2:要累积的第二个参数
  • TResult:terminate 函数的返回类型

例如:

public class GuidAggregate : IAggregate<string, int, int>

public class GuidAggregate : IAggregate<string, string, string>

在 U-SQL 中使用 UDAGG

若要使用 UDAGG,请先在后台代码中定义它,或者从现有的可编程性 DLL 中引用它,如前所述。

然后使用以下语法:

AGG<UDAGG_functionname>(param1,param2)

下面是 UDAGG 的示例:

public class GuidAggregate : IAggregate<string, string, string>
{
    string guid_agg;

    public override void Init()
    {
        guid_agg = "";
    }

    public override void Accumulate(string guid, string user)
    {
        if (user.ToUpper()== "USER1")
        {
            guid_agg += "{" + guid + "}";
        }
    }

    public override string Terminate()
    {
        return guid_agg;
    }

}

基本的 U-SQL 脚本:

DECLARE @input_file string = @"\usql-programmability\input_file.tsv";
DECLARE @output_file string = @" \usql-programmability\output_file.tsv";

@rs0 =
    EXTRACT
            guid string,
            dt DateTime,
            user String,
            des String
    FROM @input_file
    USING Extractors.Tsv();

@rs1 =
    SELECT
        user,
        AGG<USQL_Programmability.GuidAggregate>(guid,user) AS guid_list
    FROM @rs0
    GROUP BY user;

OUTPUT @rs1 TO @output_file USING Outputters.Text();

在此用例场景中,我们将特定用户的类 GUID 串联起来。

后续步骤