客户端允许非粒度代码与 Orleans 群集交互。 客户端使应用程序代码能够与群集中托管的粒度和流进行通信。 有两种方法可以获取客户端,具体取决于托管客户端代码的位置:在与孤岛相同的进程中或在单独的进程中。 本文讨论这两个选项,从建议的方法开始:在与粒度代码相同的进程中共同托管客户端代码。
共同托管的客户端
如果在与粒度代码相同的进程中托管客户端代码,可以直接从托管应用程序的依赖项注入容器获取客户端。 在这种情况下,客户端直接与它连接到的独立存储通信,并可以利用该独立存储对集群的额外知识。
此方法提供了多种优势,包括减少网络和 CPU 开销、降低延迟以及提高吞吐量和可靠性。 客户端使用Silo对集群拓扑和状态的了解,无需单独的网关。 这可以避免网络跃点和序列化/反序列化往返,从而通过最大程度地减少客户端和粒度之间的所需节点数来提高可靠性。 如果该粒子是无状态工作粒子,或者恰好在托管客户端的同一孤岛上被激活,则根本不需要进行序列化或网络通信,从而使客户端能够获得额外的性能和可靠性提升。 共同托管客户端和粒度代码还可以通过无需部署和监视两个不同的应用程序二进制文件来简化部署和应用程序拓扑。
此方法也有缺点,主要是粒度代码不再与客户端进程隔离。 因此,客户端代码中的问题(例如阻止 I/O 或锁争用导致线程饥饿)可能会影响粒度代码性能。 即使没有此类代码缺陷,也可能会发生 干扰邻居 效应,因为客户端代码在与粒度代码相同的处理器上执行,从而给 CPU 缓存带来额外的压力,并增加本地资源的争用。 此外,识别这些问题的来源会变得更加困难,因为监视系统无法从逻辑上区分客户端代码和粒度代码。
尽管存在这些缺点,但使用粒度代码共同托管客户端代码是一种常用的选项,也是大多数应用程序推荐的方法。 由于基于以下几个理由,上述缺点通常很小:
- 客户端代码通常非常 薄 (例如,将传入的 HTTP 请求转换为粒度调用)。 因此, 干扰性邻居 的影响最小,成本与其他所需的网关相当。
- 如果出现性能问题,典型的工作流可能涉及 CPU 探查器和调试器等工具。 这些工具在快速识别问题源方面仍然有效,即使在同一进程中执行客户端代码和粒度代码也是如此。 换句话说,虽然指标变得粗糙,并且无法精确识别问题的源,但更详细的工具仍然有效。
从主机获取客户端
如果使用 .NET 通用主机托管,则客户端会自动在主机的 依赖项注入 容器中可用。 可以将其注入到 ASP.NET 控制器或 IHostedService实现等服务中。
从 ISiloHost 获取客户端接口,或者也可以使用 IGrainFactory 或 IClusterClient。
var client = host.Services.GetService<IClusterClient>();
await client.GetGrain<IMyGrain>(0).Ping();
外部客户端
客户端代码可以在托管粒度代码的 Orleans 群集外部运行。 在这种情况下,外部客户端作为群集和所有应用程序的粒的连接器或通道。 通常,使用前端 Web 服务器上的客户端连接到 Orleans 充当中间层的群集,并执行业务逻辑。
在典型设置中,前端 Web 服务器:
- 接收 Web 请求。
- 执行必要的身份验证和授权验证。
- 确定应处理请求的粒度。
- 使用 Microsoft.Orleans.Client NuGet 包,来对 grain 进行一个或多个方法调用。
- 处理粒度调用的成功完成或失败以及任何返回的值。
- 向 Web 请求发送响应。
初始化粒度客户端
在使用粒度客户端对群集中 Orleans 托管的粒度进行调用之前,需要配置、初始化并连接到群集。
通过 UseOrleansClient 和多个补充选项类提供配置,这些选项类包含用于以编程方式配置客户端的配置属性层次结构。 有关详细信息,请参阅 客户端配置。
请考虑以下客户端配置示例:
// Alternatively, call Host.CreateDefaultBuilder(args) if using the
// Microsoft.Extensions.Hosting NuGet package.
using IHost host = new HostBuilder()
.UseOrleansClient(clientBuilder =>
{
clientBuilder.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
});
clientBuilder.UseAzureStorageClustering(
options => options.ConfigureTableServiceClient(connectionString))
})
.Build();
启动host
客户端后,其构造的服务提供程序实例即可进行配置并可用。
通过 ClientBuilder 和几个包含配置属性层次结构的补充选项类来提供配置,以编程方式配置客户端。 有关详细信息,请参阅 客户端配置。
客户端配置示例:
var client = new ClientBuilder()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
})
.UseAzureStorageClustering(
options => options.ConnectionString = connectionString)
.ConfigureApplicationParts(
parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
.Build();
最后,需要在构造的客户端对象上调用 Connect()
该方法以将其连接到 Orleans 群集。 这是一个返回Task
的异步方法,因此需要使用await
或.Wait()
来等待其完成。
await client.Connect();
调用粒度
从客户端发起对 grains 的调用与 从 grain 代码中发起此类调用没有什么不同。 在这两种情况下,使用相同的 IGrainFactory.GetGrain<TGrainInterface>(Type, Guid) 方法(其中 T
目标粒度接口) 来获取粒度引用。 区别在于工厂对象调用 IGrainFactory.GetGrain。 在客户端代码中,通过连接的客户端对象执行此作,如以下示例所示:
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
Task joinGameTask = player.JoinGame(game)
await joinGameTask;
对一个粒度方法的调用将根据粒度接口规则的要求返回Task或Task<TResult>。 客户端可以使用 await
关键字以异步方式等待返回的Task
,而不会阻塞线程,或者在某些情况下,使用Wait()
方法阻止当前执行线程。
从客户端代码调用格雷恩与从另一个格雷恩内部进行调用之间的主要区别在于格雷恩的单线程执行模型。 运行时 Orleans 将粒度限制为单线程,而客户端可以是多线程。
Orleans 客户端不提供任何此类保证,因此由客户端负责使用其环境的相应同步构造(锁、事件 Tasks
等)管理其并发性。
接收通知
有时,简单的请求-响应模式是不够的,客户端需要接收异步通知。 例如,当用户关注的人发布新消息时,用户可能需要通知。
使用 观察程序 是一种机制,允许将客户端对象公开为粒度状目标,以便由粒度调用。 向观察者发出的调用不提供任何成功或失败的指示,因为它们作为单向的,尽力而为的信息发送。 因此,应用程序代码有责任在必要时在观察者的基础上构建更高层次的可靠性机制。
将异步消息传送到客户端的另一种机制是 Streams。 流提供了每个消息传送成功或失败的指示,从而能够与客户端进行可靠的通信。
客户端连接
有两种情况:群集客户端可能会遇到连接问题:
- 客户端尝试连接到孤岛时。
- 对从连接的群集客户端获取的粒度引用进行调用时。
在第一种情况下,客户端尝试连接到孤岛。 如果客户端无法连接到任何孤岛,则会引发一个异常,指示出了问题。 可以注册一个 IClientConnectionRetryFilter 来处理异常,并决定是否重试。 如果未提供重试筛选器,或者重试筛选器返回 false
,客户端将永久放弃。
using Orleans.Runtime;
internal sealed class ClientConnectRetryFilter : IClientConnectionRetryFilter
{
private int _retryCount = 0;
private const int MaxRetry = 5;
private const int Delay = 1_500;
public async Task<bool> ShouldRetryConnectionAttempt(
Exception exception,
CancellationToken cancellationToken)
{
if (_retryCount >= MaxRetry)
{
return false;
}
if (!cancellationToken.IsCancellationRequested &&
exception is SiloUnavailableException siloUnavailableException)
{
await Task.Delay(++ _retryCount * Delay, cancellationToken);
return true;
}
return false;
}
}
有两种情况:群集客户端可能会遇到连接问题:
- 最初调用IClusterClient.Connect()方法时。
- 对从连接的群集客户端获取的粒度引用进行调用时。
在第一种情况下,该方法 Connect
会引发一个异常,指示出了问题。 这通常是 (但不一定) a SiloUnavailableException. 如果发生这种情况,群集客户端实例不可用,应将其释放。 可以选择向 Connect
方法提供重试筛选器函数,例如,在进行另一次尝试之前,可以等待指定的持续时间。 如果未提供重试筛选器,或者重试筛选器返回 false
,客户端将永久放弃。
如果 Connect
成功返回,则保证群集客户端在释放之前可用。 这意味着,即使客户端遇到连接问题,它也会尝试无限期恢复。 您可以在GatewayOptions由ClientBuilder提供的对象上配置确切的恢复行为,例如:
var client = new ClientBuilder()
// ...
.Configure<GatewayOptions>(
options => // Default is 1 min.
options.GatewayListRefreshPeriod = TimeSpan.FromMinutes(10))
.Build();
在第二种情况下,当粒度调用期间发生连接问题时,客户端会抛出一个 SiloUnavailableException。 可以按如下方式处理:
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
try
{
await player.JoinGame(game);
}
catch (SiloUnavailableException)
{
// Lost connection to the cluster...
}
在这种情况下,粒度引用不会失效;在重新建立连接后,可以重试同一引用的调用。
依赖项注入
使用 .NET 泛型主机在程序中创建外部客户端的建议方法是通过依赖项注入注入单一 IClusterClient 实例。 然后,可以将此实例接受为托管服务、ASP.NET 控制器等中的构造函数参数。
注释
在与其连接的同一 Orleans 进程中共同托管一个 silo 时,无需 手动创建客户端;Orleans 将自动提供一个客户端并适当地管理其生命周期。
在不同的进程中(在不同的计算机上)连接到群集时,常见的模式是创建如下所示的托管服务:
using Microsoft.Extensions.Hosting;
namespace Client;
public sealed class ClusterClientHostedService : IHostedService
{
private readonly IClusterClient _client;
public ClusterClientHostedService(IClusterClient client)
{
_client = client;
}
public Task StartAsync(CancellationToken cancellationToken)
{
// Use the _client to consume grains...
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;
}
public class ClusterClientHostedService : IHostedService
{
private readonly IClusterClient _client;
public ClusterClientHostedService(IClusterClient client)
{
_client = client;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// A retry filter could be provided here.
await _client.Connect();
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _client.Close();
_client.Dispose();
}
}
注册如下所示的服务:
await Host.CreateDefaultBuilder(args)
.UseOrleansClient(builder =>
{
builder.UseLocalhostClustering();
})
.ConfigureServices(services =>
{
services.AddHostedService<ClusterClientHostedService>();
})
.RunConsoleAsync();
示例:
下面是上一示例的扩展版本,展示了一个客户端应用程序:它连接到 Orleans,找到玩家账户,使用观察者模式订阅玩家参与的游戏会话更新,并打印通知,直到程序被手动终止。
try
{
using IHost host = Host.CreateDefaultBuilder(args)
.UseOrleansClient((context, client) =>
{
client.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
})
.UseAzureStorageClustering(
options => options.ConfigureTableServiceClient(
context.Configuration["ORLEANS_AZURE_STORAGE_CONNECTION_STRING"]));
})
.UseConsoleLifetime()
.Build();
await host.StartAsync();
IGrainFactory client = host.Services.GetRequiredService<IGrainFactory>();
// Hardcoded player ID
Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
IGameGrain? game = null;
while (game is null)
{
Console.WriteLine(
$"Getting current game for player {playerId}...");
try
{
game = await player.GetCurrentGame();
if (game is null) // Wait until the player joins a game
{
await Task.Delay(TimeSpan.FromMilliseconds(5_000));
}
}
catch (Exception ex)
{
Console.WriteLine($"Exception: {ex.GetBaseException()}");
}
}
Console.WriteLine(
$"Subscribing to updates for game {game.GetPrimaryKey()}...");
// Subscribe for updates
var watcher = new GameObserver();
await game.ObserveGameUpdates(
client.CreateObjectReference<IGameObserver>(watcher));
Console.WriteLine(
"Subscribed successfully. Press <Enter> to stop.");
}
catch (Exception e)
{
Console.WriteLine(
$"Unexpected Error: {e.GetBaseException()}");
}
await RunWatcherAsync();
// Block the main thread so that the process doesn't exit.
// Updates arrive on thread pool threads.
Console.ReadLine();
static async Task RunWatcherAsync()
{
try
{
var client = new ClientBuilder()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
})
.UseAzureStorageClustering(
options => options.ConnectionString = connectionString)
.ConfigureApplicationParts(
parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
.Build();
// Hardcoded player ID
Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
IGameGrain game = null;
while (game is null)
{
Console.WriteLine(
$"Getting current game for player {playerId}...");
try
{
game = await player.GetCurrentGame();
if (game is null) // Wait until the player joins a game
{
await Task.Delay(TimeSpan.FromMilliseconds(5_000));
}
}
catch (Exception ex)
{
Console.WriteLine($"Exception: {ex.GetBaseException()}");
}
}
Console.WriteLine(
$"Subscribing to updates for game {game.GetPrimaryKey()}...");
// Subscribe for updates
var watcher = new GameObserver();
await game.SubscribeForGameUpdates(
await client.CreateObjectReference<IGameObserver>(watcher));
Console.WriteLine(
"Subscribed successfully. Press <Enter> to stop.");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(
$"Unexpected Error: {e.GetBaseException()}");
}
}
}
/// <summary>
/// Observer class that implements the observer interface.
/// Need to pass a grain reference to an instance of
/// this class to subscribe for updates.
/// </summary>
class GameObserver : IGameObserver
{
public void UpdateGameScore(string score)
{
Console.WriteLine("New game score: {0}", score);
}
}