有时,简单的消息/响应模式是不够的,客户端需要接收异步通知。 例如,当用户发布新的即时消息时,用户可能需要通知。
客户端观察者是一种允许异步通知客户端的机制。 观察程序接口必须继承自IGrainObserver,并且所有方法都必须返回void
、Task、Task<TResult>或ValueTaskValueTask<TResult>。 我们不推荐使用返回类型 void
,因为这可能会促使在实现中使用 async void
。 这是一种危险的模式,因为它可能会导致应用程序崩溃(如果从方法引发异常)。 相反,对于尽力而为的通知场景,请考虑将 OneWayAttribute 应用到观察者的接口方法。 这会导致接收方不发送方法调用的响应,并在调用站点立即返回该方法,而无需等待观察者的响应。 粒度通过像任何粒度接口方法一样调用观察程序的方法。 运行时 Orleans 可确保请求和响应的传递。 观察者模式的常见用例是当应用程序中发生 Orleans 事件时,让客户端接收通知。 提供此类通知的模块应提供一个 API 来添加或删除观察者。 此外,公开允许取消现有订阅的方法通常很方便。
可以使用实用工具类 ObserverManager<TObserver> 来简化观察到的粒度类型的开发。 与在故障后可以自动根据需要重新激活的“grains”不同,客户端不具备容错能力:出现故障的客户端可能永远无法恢复。 因此,该 ObserverManager<T>
实用工具在配置的持续时间后删除订阅。 活跃客户端应定期重新订阅,以保持其订阅的活跃状态。
若要订阅通知,客户端必须先创建实现观察程序接口的本地对象。 然后,它调用观察者工厂 CreateObjectReference 的方法,将对象转换为谷物引用。 然后,可以将此引用传递给通知单元上的订阅方法。
其他粒度还可以使用此模型来接收异步通知。 Grains 可以实现 IGrainObserver 接口。 与客户端订阅情况不同,订阅粒子只是实现观察者接口并传入对自身的引用(例如,this.AsReference<IMyGrainObserverInterface>()
)。 不需要CreateObjectReference()
,因为颗粒已经可寻址。
代码示例
假设你有一个定期向客户发送消息的微服务。 为简单起见,示例中的消息是一个字符串。 首先,定义接收消息的客户端上的接口。
接口如下所示:
public interface IChat : IGrainObserver
{
Task ReceiveMessage(string message);
}
唯一的特别要求是接口必须继承自 IGrainObserver
。
现在,任何想要观察这些消息的客户端都应实现IChat
接口的类。
最简单的情况如下所示:
public class Chat : IChat
{
public Task ReceiveMessage(string message)
{
Console.WriteLine(message);
return Task.CompletedTask;
}
}
在服务器上,接下来应该设置一个Grain,用于将这些聊天消息发送到客户端。 模块还应为客户端提供一种机制来订阅或退订通知。 对于订阅,粒度可以使用实用工具类 ObserverManager<TObserver>的实例。
注释
ObserverManager<TObserver> 是自版本 7.0 以来的 Orleans 一部分。 对于旧版本,可以复制以下 实现 。
class HelloGrain : Grain, IHello
{
private readonly ObserverManager<IChat> _subsManager;
public HelloGrain(ILogger<HelloGrain> logger)
{
_subsManager =
new ObserverManager<IChat>(
TimeSpan.FromMinutes(5), logger);
}
// Clients call this to subscribe.
public Task Subscribe(IChat observer)
{
_subsManager.Subscribe(observer, observer);
return Task.CompletedTask;
}
//Clients use this to unsubscribe and no longer receive messages.
public Task UnSubscribe(IChat observer)
{
_subsManager.Unsubscribe(observer);
return Task.CompletedTask;
}
}
若要向客户端发送消息,请使用 Notify
实例的方法 ObserverManager<IChat>
。 该方法接受一个Action<T>
方法或一个 lambda 表达式(其中T
是IChat
类型)。 可以在接口上调用任何方法,以将其发送到客户端。 在本例中,我们只有一种方法, ReceiveMessage
并且服务器上的发送代码如下所示:
public Task SendUpdateMessage(string message)
{
_subsManager.Notify(s => s.ReceiveMessage(message));
return Task.CompletedTask;
}
现在,我们的服务器有一种方法向观察者客户端发送消息,以及订阅/取消订阅的两种方法。 客户端实现了能够观察粒度消息的类。 最后一步是使用以前实现 Chat
的类在客户端上创建观察者引用,并在订阅后允许它接收消息。
代码如下所示:
//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();
//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);
//Subscribe the instance to receive messages.
await friend.Subscribe(obj);
现在,每当服务器程序调用该方法 SendUpdateMessage
时,所有订阅的客户端都会收到消息。 在我们的客户端代码中, Chat
变量 c
中的实例接收消息并将其输出到控制台。
重要
传递给 CreateObjectReference
的对象通过 WeakReference<T> 保存,因此,如果没有其他引用,则会进行垃圾回收。
应为每个不希望收集的观察者保留引用。
注释
观察程序本质上是不可靠的,因为托管观察者的客户端可能会失败,并且恢复后创建的观察程序具有不同的(随机化)标识。 ObserverManager<TObserver> 依赖于观察者定期重新订阅,如上所述,因此它可以删除非活动的观察者。
执行模型
可以通过调用 IGrainFactory.CreateObjectReference 来注册 IGrainObserver 的实现。 每次调用该方法都会创建指向该实现的新引用。 Orleans 执行发送到每个引用的请求,逐个并完整地完成。 观察者是不可重入的,因此,Orleans 不会将并发请求交错给观察者。 如果多个观察者同时接收请求,则这些请求可以并行执行。 属性(例如 AlwaysInterleaveAttribute 或 ReentrantAttribute)不影响观察者方法的执行;执行模型无法自定义。