Orleans 交易

Orleans 支持针对持久粒度状态的分布式 ACID 事务。 事务是使用 Microsoft.Transactions NuGet 包实现的。 本文中示例应用的源代码由四个项目组成:

  • 抽象:包含粒度接口和共享类的类库。
  • 粒度:包含粒度实现的类库。
  • 服务器:一个控制台应用,它使用抽象和粒类库,并充当Orleans孤岛。
  • 客户端:使用表示客户端的抽象类库的 Orleans 控制台应用。

设置

Orleans 交易是选择加入的。 筒仓和客户端都必须配置为使用事务。 如果未配置它们,则对粒度实现上事务方法的任何调用都会收到一个 OrleansTransactionsDisabledException。 若要在仓库上启用事务,请调用仓库主机生成器上的 SiloBuilderExtensions.UseTransactions

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

同样,若要在客户端上启用事务,请调用 ClientBuilderExtensions.UseTransactions 客户端主机生成器:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

事务状态存储

若要使用事务,需要配置数据存储区。 若要支持使用事务的各种数据存储,Orleans 使用存储抽象 ITransactionalStateStorage<TState>。 这种抽象特定于事务的需求,与通用粒度存储(IGrainStorage) 不同。 要使用特定于事务的存储,请使用 ITransactionalStateStorage 的任何实现来配置孤立宿主(例如使用 Azure AddAzureTableTransactionalStateStorage)。

例如,请考虑以下主机生成器配置:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

出于开发目的,如果特定于事务的存储不适用于所需的数据存储,则可以改用 IGrainStorage 实现。 对于未配置存储的任何事务状态,事务会尝试通过网桥故障转移至粒度存储系统。 通过桥访问粮食存储的事务状态效率较低,将来可能不受支持。 因此,我们建议仅出于开发目的使用此方法。

晶粒界面

若要使粒度支持事务,必须使用 TransactionAttribute 在其粒接口上将方法标记为事务的一部分。 该属性需要指示粒度调用在事务环境中的行为方式,如以下 TransactionOption 值详述:

  • TransactionOption.Create:调用是事务性的,并且将始终创建新的事务上下文(它启动一个新事务),即使在现有事务上下文中调用也是如此。
  • TransactionOption.Join:调用是事务性的,但只能在现有事务的上下文中调用。
  • TransactionOption.CreateOrJoin:调用是事务性的。 如果在事务的上下文中调用,它将使用该上下文,否则将创建新的上下文。
  • TransactionOption.Suppress:调用不是事务性的,但可以从事务内部调用。 如果在事务上下文中调用,则上下文不会被传递到调用中。
  • TransactionOption.Supported:尽管调用不是事务性的,但它支持事务。 如果在事务上下文中进行调用,则上下文会在调用时被传递。
  • TransactionOption.NotAllowed:调用不是事务性的,不能从事务内部调用。 如果在事务的上下文中调用,它将抛出NotSupportedException

可以将调用标记为 TransactionOption.Create,这意味着调用总是启动其事务。 例如,Transfer 下述 ATM 模块中的操作始终启动一个涉及这两个引用账户的新事务。

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

事务操作 WithdrawDeposit 被标记为帐户粒度 TransactionOption.Join。 只能在现有事务的上下文中调用,如果在IAtmGrain.Transfer期间调用,那么就是这种情况。 GetBalance 的调用已被标记为 CreateOrJoin,因此可以从现有事务中调用(例如通过 IAtmGrain.Transfer),或者单独调用它。

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

重要注意事项

不能将 OnActivateAsync 标记为事务性,因为任何此类调用都需要在调用之前进行适当的设置。 它仅适用于颗粒应用 API。 这意味着尝试读取事务状态作为这些方法的一部分会在运行时引发异常。

粒度实现

粒度实现需要使用 ITransactionalState<TState> Facet 通过 ACID 事务管理粒度状态。

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

通过传递给事务状态方面的同步函数执行对持久状态的所有读取或写入访问。 这样,事务系统就可以以事务方式执行或取消这些操作。 若要在粒度中使用事务状态,请定义要持久保存的可序列化状态类,并使用 a 在 TransactionalStateAttribute粒度的构造函数中声明事务状态。 此属性声明状态名称和(可选)要使用的事务状态存储。 有关详细信息,请参阅 安装程序

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

例如, Balance 状态对象的定义如下:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

上述状态对象:

  • 使用 GenerateSerializerAttribute 来指示 Orleans 代码生成器生成序列化程序。
  • 具有用 IdAttribute 修饰以唯一标识成员的 Value 属性。

然后,在实现中如下所示地使用状态 Balance 对象 AccountGrain

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

重要

事务粒度必须标有 ReentrantAttribute ,以确保事务上下文正确传递给粒度调用。

在前面的示例中,TransactionalStateAttribute 声明 balance 构造函数参数应与名为 "balance" 的事务状态相关联。 使用此声明,Orleans 注入一个 ITransactionalState<TState> 实例,该实例从名为 "TransactionStore" 的事务状态存储中加载状态。 可以通过PerformUpdate修改状态,或者通过PerformRead读取状态。 事务基础结构确保作为事务一部分执行的任何此类更改(即使是在多个跨Orleans群集分布的粒度之间)在完成创建事务的粒度调用IAtmGrain.Transfer后,要么全部提交要么全部撤销。

从客户端调用事务方法

调用事务粒方法的推荐方式是使用ITransactionClient。 配置Orleans客户端时,Orleans会自动向依赖注入服务提供器注册ITransactionClient。 使用ITransactionClient创建事务上下文,并在该上下文中调用事务性粒方法。 以下示例演示如何使用ITransactionClient来调用事务粒方法。

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

在前面的客户端代码中:

  • IHostBuilder 被配置为 UseOrleansClient.
    • 使用 IClientBuilder 本地主机群集和事务。
  • 从服务提供商处检索 IClusterClient 接口和 ITransactionClient 接口。
  • fromto变量被分配了它们各自的IAccountGrain引用。
  • 这个 ITransactionClient 用于创建事务并进行调用:
    • Withdraw from帐户粒度引用。
    • Deposit to帐户粒度引用。

除非在transactionDelegate中抛出异常或指定了矛盾的transactionOption,否则事务始终提交。 虽然建议使用 ITransactionClient 此方法调用事务粒度方法,但也可以直接从其他粒度调用它们。

从另一粒度调用事务方法

像任何其他粒度方法一样,在粒度接口上调用事务方法。 使用ITransactionClient的替代方案是,在IAccountGrain接口上调用下面实现中的Transfer方法(即事务性)。

请考虑 AtmGrain 的实现,它解析两个引用的帐户片段,并发出对 WithdrawDeposit 的相应调用。

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

客户端应用代码可以按如下所示以事务方式调用 AtmGrain.Transfer

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

在前面的调用中,一个 IAtmGrain 用于将 100 个货币单位从一个帐户转移到另一个帐户。 转移完成后,将查询这两个帐户以获取其当前余额。 货币转移以及两个帐户查询都作为 ACID 交易执行。

如前面的示例所示,事务可以在类似于其他粒度调用的Task中返回值。 但是,在调用失败时,它们不会引发应用程序异常,而是引发OrleansTransactionExceptionTimeoutException。 如果应用程序在事务期间引发异常,并且该异常导致事务失败(而不是由于其他系统故障而失败),则应用程序异常将成为内部异常 OrleansTransactionException

如果抛出OrleansTransactionAbortedException类型的事务异常,则事务失败,可以重试。 引发的任何其他异常都表示事务以未知状态终止。 由于事务是分布式作,因此处于未知状态的事务可能已成功、失败或仍在进行中。 因此,建议在验证状态或重试操作之前,让调用超时期限(SiloMessagingOptions.SystemResponseTimeout)过去,以避免级联中止。