本指南介绍设置和使用 Orleans Streams 的快速方法。 若要详细了解流式处理功能的详细信息,请阅读本文档的其他部分。
所需的配置
在本指南中,您将使用基于内存的流,该流通过粒度消息将数据发送给订阅者。 使用内存中存储提供程序存储订阅列表。 使用基于内存的机制进行流式处理和存储仅适用于本地开发和测试,而不适用于生产环境。
在筒仓上,silo
是一个 ISiloBuilder 时,调用 AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
在群集客户端上,其中 client
是 IClientBuilder,调用 AddMemoryStreams。
client.AddMemoryStreams("StreamProvider");
本指南使用一种简单的消息传递流,这种流使用精细粒度的消息传递技术将流数据发送给订阅者。 使用内存中存储提供程序存储订阅列表;这不是实际生产应用程序的明智选择。
在筒仓里, hostBuilder
是一个 ISiloHostBuilder
,请调用 AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
在群集客户端上,其中 clientBuilder
是一个 IClientBuilder
,调用 AddSimpleMessageStreamProvider。
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
注释
默认情况下,通过简单消息流传递的消息被视为不可变,可以通过引用其他粒度传递。 若要关闭此行为,请将 SMS 提供程序配置为关闭 SimpleMessageStreamProviderOptions.OptimizeForImmutableData。
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
可以创建流、使用流作为生成者发送数据,以及以订阅者身份接收数据。
生成事件
为流生成事件相对容易。 首先,获取对前面配置中定义的流提供程序的访问权限("StreamProvider"
),然后选择一个流并将数据推送到该流。
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
为流创建事件相对容易。 首先,获取对前面配置中定义的流提供程序的访问权限("SMSProvider"
),然后选择一个流并将数据推送到该流。
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
可以看到,流具有 GUID 和命名空间。 这样可以轻松识别独特的数据流。 例如,聊天室的命名空间可以是“房间”,GUID 可以是拥有 RoomGrain
的 GUID。
在这里,使用已知聊天室的 GUID。 使用OnNextAsync
方法,将数据推送到流中。 让我们在计时器内使用随机数来完成这一操作。 你也可以对数据流使用任何其他数据类型。
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
订阅和接收流数据
若要接收数据,可以使用显式订阅和隐式订阅,在 显式订阅和隐式订阅中更详细地介绍。 此示例使用更轻松的隐式订阅。 当粒度类型想要隐式订阅流时,它使用属性 [ImplicitStreamSubscription(namespace)]
。
对于你的情况,请定义 ReceiverGrain
如下所示:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
每当数据推送到命名空间中的
为此,请通过设置 OnNextAsync
接收数据的方法来完成订阅过程。 为实现此目标,ReceiverGrain
应在其 OnActivateAsync
中调用类似如下所示的内容:
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
你准备好了! 现在,唯一的要求是一些触发生产者粮食的创建。 然后,它会注册计时器,并开始向所有相关方发送随机整数。
同样,本指南会跳过许多详细信息,并且仅提供高级概述。 阅读 Rx 上的此手册和其他资源的其他部分,了解可用内容及其工作原理。
反应式编程是解决许多问题的强大方法。 例如,可以在订阅者中使用 LINQ 来筛选数字并执行各种有趣的操作。