Orleans 流媒体快速入门

本指南介绍设置和使用 Orleans Streams 的快速方法。 若要详细了解流式处理功能的详细信息,请阅读本文档的其他部分。

所需的配置

在本指南中,您将使用基于内存的流,该流通过粒度消息将数据发送给订阅者。 使用内存中存储提供程序存储订阅列表。 使用基于内存的机制进行流式处理和存储仅适用于本地开发和测试,而不适用于生产环境。

在筒仓上,silo 是一个 ISiloBuilder 时,调用 AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

在群集客户端上,其中 clientIClientBuilder,调用 AddMemoryStreams

client.AddMemoryStreams("StreamProvider");

本指南使用一种简单的消息传递流,这种流使用精细粒度的消息传递技术将流数据发送给订阅者。 使用内存中存储提供程序存储订阅列表;这不是实际生产应用程序的明智选择。

在筒仓里, hostBuilder 是一个 ISiloHostBuilder,请调用 AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

在群集客户端上,其中 clientBuilder 是一个 IClientBuilder,调用 AddSimpleMessageStreamProvider

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

注释

默认情况下,通过简单消息流传递的消息被视为不可变,可以通过引用其他粒度传递。 若要关闭此行为,请将 SMS 提供程序配置为关闭 SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

可以创建流、使用流作为生成者发送数据,以及以订阅者身份接收数据。

生成事件

为流生成事件相对容易。 首先,获取对前面配置中定义的流提供程序的访问权限("StreamProvider"),然后选择一个流并将数据推送到该流。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

为流创建事件相对容易。 首先,获取对前面配置中定义的流提供程序的访问权限("SMSProvider"),然后选择一个流并将数据推送到该流。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

可以看到,流具有 GUID 和命名空间。 这样可以轻松识别独特的数据流。 例如,聊天室的命名空间可以是“房间”,GUID 可以是拥有 RoomGrain的 GUID。

在这里,使用已知聊天室的 GUID。 使用OnNextAsync方法,将数据推送到流中。 让我们在计时器内使用随机数来完成这一操作。 你也可以对数据流使用任何其他数据类型。

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

订阅和接收流数据

若要接收数据,可以使用显式订阅和隐式订阅,在 显式订阅和隐式订阅中更详细地介绍。 此示例使用更轻松的隐式订阅。 当粒度类型想要隐式订阅流时,它使用属性 [ImplicitStreamSubscription(namespace)]

对于你的情况,请定义 ReceiverGrain 如下所示:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

每当数据推送到命名空间中的流中时(如计时器示例),与流具有相同< c2 /> 的 类型的粒度将接收消息。 即使当前不存在粒度的激活,运行时也会自动创建一个新粒度,并向其中发送消息。

为此,请通过设置 OnNextAsync 接收数据的方法来完成订阅过程。 为实现此目标,ReceiverGrain 应在其 OnActivateAsync 中调用类似如下所示的内容:

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

你准备好了! 现在,唯一的要求是一些触发生产者粮食的创建。 然后,它会注册计时器,并开始向所有相关方发送随机整数。

同样,本指南会跳过许多详细信息,并且仅提供高级概述。 阅读 Rx 上的此手册和其他资源的其他部分,了解可用内容及其工作原理。

反应式编程是解决许多问题的强大方法。 例如,可以在订阅者中使用 LINQ 来筛选数字并执行各种有趣的操作。

另请参阅

Orleans 流编程 API