Orleans 流式处理 API

应用程序通过 API 与流交互,这与 .NET 中已知的反应扩展 (Rx) 非常相似。 主要区别是 Orleans 流扩展是 异步 的,从而在 Orleans 的分布式和可扩展的计算结构中提升处理效率。

异步流

首先使用 流提供程序 获取流的句柄。 可以将流提供程序视为流工厂,允许实现者自定义流的行为和语义。

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

您可以在粒度内通过调用Grain.GetStreamProvider方法,或者在客户端实例上调用GetStreamProvider方法来获取对流提供程序的引用。

Orleans.Streams.IAsyncStream<T> 是一种针对虚拟流的逻辑、强类型句柄,本质上类似于 Orleans 粒度引用。 对 GetStreamProviderGetStream 的调用纯粹在本地进行。 GetStream需要的参数是一个 GUID 和一个名为流命名空间的额外字符串(可能为 null)。 GUID 和命名空间字符串共同构成流的标识(类似于 IGrainFactory.GetGrain 的参数)。 这种组合在确定流标识方面提供了额外的灵活性。 就像类型中 PlayerGrain 可能存在粒度 7 一样,类型中 ChatRoomGrain 可能存在不同的粒度 7,流 123 可以存在于命名空间中 PlayerEventsStream ,并且命名空间中 ChatRoomMessagesStream 可以存在不同的流 123。

生成和使用

IAsyncStream<T> 实现 IAsyncObserver<T>IAsyncObservable<T> 接口。 这样,您的应用程序可以使用流来生成新事件IAsyncObserver<T>,或订阅并使用事件IAsyncObservable<T>

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

若要将事件生成到流中,应用程序将调用:

await stream.OnNextAsync<T>(event)

若要订阅流,应用程序将调用:

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

要的参数 SubscribeAsync 可以是实现 IAsyncObserver<T> 接口的对象,也可以是 lambda 函数的组合来处理传入事件。 可以通过AsyncObservableExtensions类获得SubscribeAsync的更多选项。 SubscribeAsync 返回一个 StreamSubscriptionHandle<T>,一个不透明句柄,用于取消订阅流(类似于异步版本的 IDisposable)。

await subscriptionHandle.UnsubscribeAsync()

请务必注意,订阅适用于粒度,而不是用于激活。 一旦粒度代码订阅流,此订阅将超过此激活的生存期,并永久保持持久状态,直到粒度代码(可能在不同的激活中)显式取消订阅。 这是虚拟流抽象的核心:不仅所有流在逻辑上始终存在,而且流订阅是持久的,能够在创建它的特定物理激活结束后继续存在。

多重性

流 Orleans 可以有多个生成者和多个使用者。 生成者发布的消息将传递到发布消息之前订阅流的所有使用者。

此外,消费者可以多次订阅同一视频流。 每次订阅时,它都会返回一个唯一的 StreamSubscriptionHandle<T>。 如果数据粒(或客户端)订阅同一流 X 次,它会为每个订阅接收相同的事件 X 次。 使用者还可以取消单个订阅。 您可以通过以下方法找到所有当前的订阅:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

从故障中恢复

如果流生成者死去(或其粒流被停用),则无需执行任何操作。 下次这个粒子想要生成更多事件时,它可以再次获取流句柄并正常地产生新事件。

消费者逻辑稍微更复杂。 如前所述,一旦消费者粒度订阅某个流,该订阅将一直有效,直到粒度显式退订。 如果流使用者死亡(或其粒度已停用),并在流上生成新事件,则使用者粒度会自动重新激活(就像在向流发送消息时自动激活任何常规 Orleans 粒度一样)。 zh-CN: 现在,粒度代码唯一需要做的是提供一个 IAsyncObserver<T> 来处理数据。 使用者需要重新附加处理逻辑作为 OnActivateAsync() 方法的一部分。 为此,它可以调用:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

使用者使用在初始订阅期间获取的上一个句柄来继续处理。 请注意, ResumeAsync 仅使用新逻辑实例 IAsyncObserver 更新现有订阅,并且不会更改此使用者已订阅此流的事实。

消费者如何获取旧的 subscriptionHandle? 共有两个选项。 用户可能已保留从原始 SubscribeAsync 操作返回的句柄,现在可以使用它。 或者,如果消费者没有句柄,可以通过调用 IAsyncStream<T> 来请求其所有活动订阅句柄。

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

然后,使用者可以恢复所有这些应用,或者根据需要取消订阅。

提示

如果使用者粒度直接实现 IAsyncObserver<T> 接口(public class MyGrain<T> : Grain, IAsyncObserver<T>),则从理论上讲,它不应该重新附加 IAsyncObserver 接口,因此不需要调用 ResumeAsync。 流式处理运行时应自动确定粒度已实现 IAsyncObserver 并调用这些 IAsyncObserver 方法。 但是,流式处理运行时当前不支持此功能,并且粒度代码仍需要显式调用 ResumeAsync,即使粒度直接实现了 IAsyncObserver 也是如此。

显式和隐式订阅

默认情况下,流消费者必须显式订阅流。 此订阅通常由粒子(或客户端)接收到的外部消息触发,该消息指示其进行订阅。 例如,在聊天服务中,当用户加入聊天室时,其粒会收到一条包含聊天名称的JoinChatGroup消息,使用户粒订阅到这个聊天流。

此外, Orleans 流支持 隐式订阅。 在此模型中,粒度不会显式订阅。 它是根据其粒度标识和ImplicitStreamSubscriptionAttribute自动且隐式订阅的。 隐式订阅的主要价值是允许流活动自动触发粒子激活,从而引发订阅。 例如,如果使用短信流,一个生产单元想要生成一个流,而另一个消费单元处理它,则生产单元需要获取消费单元的身份,并进行一个组件调用,要求其进行订阅。 只有这样,它才能开始发送事件。 相反,借助隐式订阅,生成者可以直接向流中生成事件,而使用者粒度会自动激活并订阅。 在这种情况下,生成者不需要知道谁正在读取事件。

粒度实现 MyGrainType 可以声明 [ImplicitStreamSubscription("MyStreamNamespace")] 属性。 当事件在具有GUID标识符为XXX和命名空间"MyStreamNamespace"的流中生成时,就会告知流运行时应将其传递给类型为MyGrainType且标识为XXX的粒子。 即,运行时将流 <XXX, MyStreamNamespace> 映射到使用者 grain <XXX, MyGrainType>

ImplicitStreamSubscription导致流式处理运行时自动订阅此粒度并为其传送流事件。 但是,粒度代码仍需要告知运行时它希望如何处理事件。 本质上,它需要附加 IAsyncObserver。 因此,当粒度激活时,内部 OnActivateAsync 的粒度代码需要调用:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

编写订阅逻辑

下面是针对各种情况编写订阅逻辑的准则:显式订阅和隐式订阅、可回退流和非可回退流。 隐式订阅与显式订阅的主要区别在于,对于隐式订阅,每个流命名空间中的晶粒始终只有一个隐式订阅。 无法创建多个订阅(不存在订阅的多重性),也无法取消订阅,而粒度逻辑只需要附加处理逻辑即可。 这也意味着永远不需要恢复隐式订阅。 另一方面,对于显式订阅,需要重新启用订阅;否则,再次订阅会导致同一项被多次订阅。

隐式订阅:

对于隐式订阅,grain 仍需要订阅以附加处理逻辑。 可以通过实现 IStreamSubscriptionObserverIAsyncObserver<T> 接口在使用者粒度中执行此作,从而允许粒度与订阅分开激活。 要订阅流,粒度会创建一个句柄,并在其 await handle.ResumeAsync(this) 方法中调用 OnSubscribed(...)

若要处理消息,请实现 IAsyncObserver<T>.OnNextAsync(...) 接收流数据和序列令牌的方法。 或者,ResumeAsync 方法可以采用一组代表 IAsyncObserver<T> 接口方法的委托:onNextAsynconErrorAsynconCompletedAsync

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

显式订阅:

对于显式订阅,grain 必须调用 SubscribeAsync 来订阅流。 这样可以创建一个订阅并绑定处理逻辑。 显式订阅将一直存在,直到 grain 取消订阅。 如果一个粒子停用并重新激活,它仍然是显式订阅的,但没有附加任何处理逻辑。 在这种情况下,grain 需要重新附加处理逻辑。 为此,在其 OnActivateAsync中,谷物首先需要通过调用 IAsyncStream<T>.GetAllSubscriptionHandles()来找出其订阅。 粒度在希望继续处理的每个句柄上必须执行 ResumeAsync ,或者在完成处理的句柄上执行 UnsubscribeAsync 。 还可以选择将粒度指定 StreamSequenceToken 为调用的参数 ResumeAsync ,从而导致此显式订阅开始从该令牌使用。

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    }
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

流顺序和序列标记

单个生成者和使用者之间的事件传送顺序取决于流提供程序。

使用 SMS,生成者通过控制事件发布方式来显式控制使用者看到的事件顺序。 默认情况下(如果 SimpleMessageStreamProviderOptions.FireAndForgetDelivery SMS 提供程序的选项为 false),如果生成者等待每次 OnNextAsync 调用,事件将按 FIFO 顺序到达。 在 SMS 中,生产者决定如何处理由 Task 调用返回的失败 OnNextAsync 指示的传递中断。

Azure 队列流不能保证 FIFO 顺序,因为基础 Azure 队列在失败情况下不能保证顺序(尽管它们确实保证 FIFO 在无故障执行中的顺序)。 当生产者将事件放入 Azure 队列时,如果队列操作失败,生产者必须尝试操作另一个队列,并随后处理可能出现的重复消息。 在传递端,Orleans 流式处理运行时会将事件出队,并尝试将其传递给消费者进行处理。 仅当成功处理时,运行时才会从队列中删除该事件。 如果传递或处理失败,则不会从队列中删除该事件,以后会自动重新出现。 流式处理运行时尝试再次交付它,这可能会中断 FIFO 订单。 此行为与 Azure 队列的正常语义匹配。

应用程序定义的顺序:若要处理上述排序问题,应用程序可以选择指定其排序。 使用不透明IComparable对象(StreamSequenceToken用于对事件进行排序)来实现此目的。 生成者可以将可选的 StreamSequenceToken 传递给 OnNextAsync 调用。 这StreamSequenceToken传递给使用者,并随事件一同传递。 这样,您的应用程序可以独立于流式运行时环境进行推理,并重新构造其顺序。

可倒退流

某些流仅允许从最新时间点开始订阅,而另一些流则允许“及时返回”。此功能取决于基础队列技术和特定的流提供程序。 例如,Azure 队列服务仅允许使用最新的排队事件,而事件中心服务则允许从任意时间点重播事件(最长直到某个过期时间)。 支持及时回退的 流称为可回退流

可倒退流的使用者可将 StreamSequenceToken 传递给 SubscribeAsync 调用。 运行时提供从该 StreamSequenceToken开始的事件。 null 标记表示使用者希望从最新时间点开始接收事件。

在恢复方案中,倒退流的功能非常有用。 例如,考虑订阅流的粒度,并定期检查其状态以及最新的序列令牌。 从故障中恢复时,粒度可以从最新的检查点序列令牌重新订阅同一流,从而恢复,而不会丢失自上次检查点以来生成的任何事件。

事件中心提供程序可回溯。 可以在 GitHub: Orleans/Azure/Orleans.Streaming.EventHubs 上找到其代码。 短信(现在为 广播频道)和 Azure 队列 提供程序是不可回绕的。

无状态自动横向扩展处理

默认情况下, Orleans 流式处理目标支持大量相对较小的流,每个流由一个或多个有状态粒度处理。 统一地说,所有流的处理都是在许多常规(有状态)粒度之间分片的。 应用程序代码通过分配流 ID 和粒 ID 以及进行显式订阅来控制此分片。 目标是实现分片式有状态处理。

但是,还有一个有趣的场景,即自动横向扩展无状态处理。 在此方案中,应用程序具有少量的流(甚至一个大型流),目标是无状态处理。 例如,一个全局事件流,其中处理涉及解码每个事件,并可能将其转发到其他流,以便进一步有状态处理。 可以通过粒度支持OrleansStatelessWorkerAttribute无状态横向扩展流处理。

无状态自动横向扩展处理的当前状态: 这一功能尚未实现。 尝试从 StatelessWorker 粒度订阅流会导致未定义的行为。 我们正在考虑支持此选项

Grain 和 Orleans 客户端

Orleans 流在 grain 和 Orleans 客户端之间以统一的方式工作。 这意味着您可以在粒和Orleans客户端中使用相同的API来生成和消费事件。 这大大简化了应用程序逻辑,使特定的客户端 API(如 Grain 观察者)变得多余。

完全托管的可靠流订阅-发布

为了跟踪流订阅,Orleans 使用一个名为 Streaming Pub-Sub 的运行时组件,该组件充当流消费者和生成者的汇合点。 Pub-sub 跟踪所有流订阅,保留这些订阅,并与流生成者匹配流使用者。

应用程序可以选择订阅-发布数据的存储位置和方式。 订阅-发布组件本身实现为 grain(称为 PubSubRendezvousGrain),这些 grain 使用 Orleans 声明持久性。 PubSubRendezvousGrain 使用名为 PubSubStore 的存储提供程序。 与任何 grain 一样,可为存储提供程序指定实现。 对于流式处理的 Pub-Sub,您可以在构建 silo 时使用 silo 主机生成器更改 PubSubStore 的实现:

以下配置 Pub-Sub 以将其状态存储在 Azure 表中。

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

这样,Pub-Sub 数据将持久存储在 Azure 表中。 对于初始开发,也可以使用内存存储。 除了订阅-发布以外,Orleans 流运行时还会将事件从生成者传递给使用者,管理分配给活跃使用的流的所有运行时资源,并以透明方式从未使用的流中回收运行时资源。

配置

要使用流,您需要通过土仓主机或集群客户端生成器启用流提供程序。 示例流提供程序设置:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

另请参阅

Orleans 流提供程序