このチュートリアルでは、小売在庫シナリオで Service Bus のトピックとサブスクリプションを使用する方法について説明します。 これには、Azure portal と .NET を使用した発行/サブスクライブ チャネルが含まれます。 このシナリオの例は、複数の小売店の在庫品の更新です。 このシナリオでは、各店舗または一連の店舗が、在庫品を更新するためのメッセージを受け取ります。
Azure Service Bus は、アプリケーションとサービスの間で情報を送信するマルチテナント クラウド メッセージング サービスです。 非同期操作により、柔軟なブローカー メッセージング、構造化された先入れ先出し型 (FIFO) のメッセージング、および発行/購読機能が可能になります。 概要については、「Service Bus とは」を参照してください。
このチュートリアルでは、サブスクリプションとフィルターを使用してこのシナリオを実装する方法を示します。 まず、3 つのサブスクリプションを持つトピックを作成し、いくつかのルールとフィルターを追加してから、トピックとサブスクリプションにメッセージを送受信します。
このチュートリアルでは、以下の内容を学習します。
- Azure Portal を使用して、Service Bus のトピックとそのトピックへの 3 つのサブスクリプションを作成する
- .NET コードを使用して、サブスクリプションにフィルターを追加する
- 異なる内容の複数のメッセージを作成する
- メッセージを送信し、想定されるサブスクリプションに到着したことを確認する
- サブスクリプションからメッセージを受信する
前提条件
このチュートリアルを完了するには、以下のものが必要です。
- Azure サブスクリプション。 Azure Service Bus など、Azure の各種サービスを使用するには、サブスクリプションが必要です。 始める前に、無料アカウントを作成できます。
- Visual Studio 2019 以降。
Service Bus トピックとサブスクリプション
トピックへの各サブスクリプションは、各メッセージのコピーを受信できます。 トピックは完全にプロトコルであり、意味的には Service Bus キューと互換性があります。 Service Bus のトピックは、フィルターの条件を持つさまざまな選択ルールをサポートしています。メッセージのプロパティを設定または変更するオプションのアクションもあります。 ルールが一致するたびに、メッセージが生成されます。 ルール、フィルター、アクションの詳細については、「 トピック フィルターとアクション」を参照してください。
Azure Portal での名前空間の作成
Azure で Service Bus メッセージング エンティティの使用を開始するには、Azure 全体で一意の名前を持つ名前空間を作成します。 名前空間は、アプリケーション内の Service Bus リソース (キューやトピックなど) のスコープ コンテナーを提供します。
名前空間を作成するには:
Azure portal にサインインします。
左上からポップアップ メニューを選択し、[ すべてのサービス ] ページに移動します。
左側のナビゲーション バーで、[ 統合] を選択します。
[メッセージング サービス>Service Bus まで下にスクロールし、[作成] を選択します。
[名前空間の作成] ページの [基本] タブで、次の手順に従います。
[サブスクリプション] で、名前空間を作成する Azure サブスクリプションを選択します。
[リソース グループ] では、既存のリソース グループを選ぶか、新しく作成します。
次の名前付け規則を満たす 名前空間名 を入力します。
- この名前は Azure 全体で一意である必要があります。 その名前が使用できるかどうかがすぐに自動で確認されます。
- 名前の長さは 6 ~ 50 文字である。
- この名前には、文字、数字、ハイフン
-
のみを含めることができます。 - 名前の先頭は文字、末尾は文字または数字にする必要があります。
- 名前の末尾を
-sb
または-mgmt
にすることはできません。
[ 場所] で、名前空間をホストするリージョンを選択します。
[価格レベル] で、名前空間の価格レベル (Basic、Standard、Premium) を選択します。 このクイック スタートでは、 [Standard] を選択します。
Premium レベルを選択した場合は、名前空間の geo レプリケーションを有効にすることができます。 geo レプリケーション機能により、名前空間のメタデータとデータがプライマリ リージョンから 1 つ以上のセカンダリ リージョンに継続的にレプリケートされます。
重要
トピックとサブスクリプションを使用する場合は、Standard または Premium を選択してください。 トピックとサブスクリプションは、Basic 価格レベルではサポートされていません。
[Premium] 価格レベルを選択した場合は、メッセージング ユニットの数を指定します。 Premium レベルでは、各ワークロードが分離した状態で実行されるように、CPU とメモリのレベルでリソースが分離されます。 このリソース コンテナーは 、メッセージング ユニットと呼ばれます。 Premium 名前空間には、少なくとも 1 つのメッセージング ユニットがあります。 Service Bus の Premium 名前空間ごとに、1 個、2 個、4 個、8 個、または 16 個のメッセージング ユニットを選択できます。 詳細については、「 Service Bus Premium メッセージングレベル」を参照してください。
ページ下部にある [確認と作成] を選択します。
[確認および作成] ページで、設定を確認し、 [作成] を選択します。
リソースのデプロイが成功したら、デプロイ ページで [ リソースに移動 ] を選択します。
Service Bus 名前空間のホーム ページが表示されます。
名前空間への接続文字列を取得する (Azure portal)
新しい名前空間を作成すると、最初の Shared Access Signature (SAS) ポリシーが自動的に生成されます。 このポリシーには、プライマリ キーとセカンダリ キー、および各名前空間のすべての側面を完全に制御できるプライマリ接続文字列とセカンダリ接続文字列が含まれています。 通常の送信者と受信者に対してより制約のある権限を持つ規則を作成する方法の詳細については、 Service Bus の認証と承認に関するセクションを参照してください。
クライアントは、接続文字列を使用して Service Bus の名前空間に接続できます。 名前空間のプライマリ接続文字列をコピーするには、次の手順に従います。
[Service Bus 名前空間] ページで、[設定] を展開し、[共有アクセス ポリシー] を選択します。
[共有アクセス ポリシー] ページで、 [RootManageSharedAccessKey] を選択します。
[SAS ポリシー: RootManageSharedAccessKey] ウィンドウで、[プライマリ接続文字列] の横にあるコピー ボタンを選択します。 これで、後で使用できるように接続文字列がクリップボードにコピーされます。 この値をメモ帳などに一時的に貼り付けます。
このページを使用して、主キー、2 次キー、プライマリ接続文字列、セカンダリ接続文字列をコピーできます。
Azure Portal を使用したトピックの作成
[Service Bus 名前空間] ページで、左側のナビゲーション メニューの [エンティティ] を展開し、[トピック] を選択します。
[ + トピック] を選択します。
トピックの名前を入力します。 他のオプションは既定値のままにしてください。
[作成] を選択します。
トピックに対するサブスクリプションを作成する
前のセクションで作成したトピックを選択します。
[Service Bus トピック] ページで、[+ サブスクリプション] を選択します。
[サブスクリプションの作成] ページで、次の手順に従います。
サブスクリプションの名前として 「S1 」と入力します。
次に、 [作成] を選択してサブスクリプションを作成します。
前の手順を 2 回繰り返して、S2 と S3 というサブスクリプションを作成します。
サブスクリプションに対してフィルター ルールを作成する
名前空間、トピック、サブスクリプションをプロビジョニングし、接続文字列を名前空間に取得したら、サブスクリプションにフィルター規則を作成する準備が整います。 その後、メッセージを送受信します。 こちらの GitHub サンプル フォルダーでコードを調べることができます。
メッセージを送受信する
コードを実行するには、次の手順に従います。
コマンド プロンプト ウィンドウまたは PowerShell プロンプトで、 Service Bus GitHub リポジトリを複製します。
git clone https://github.com/Azure/azure-service-bus.git
サンプル フォルダー
azure-service-bus\samples\DotNet\Azure.Messaging.ServiceBus\BasicSendReceiveTutorialWithFilters
に移動します。このチュートリアルでメモ帳にコピーした接続文字列を取得します。 また、作成したトピックの名前も必要です。
コマンド プロンプトで、次のコマンドを入力します。
dotnet build
BasicSendReceiveTutorialWithFilters\bin\Debug\netcoreapp3.1
フォルダーに移動します。プログラムを実行するには、次のコマンドを入力します。 必ず
myConnectionString
を実際の値に置き換え、myTopicName
を作成したトピックの名前に置き換えてください。dotnet --roll-forward Major BasicSendReceiveTutorialWithFilters.dll -ConnectionString "myConnectionString" -TopicName "myTopicName"
コンソールの指示に従って、まずフィルター作成を選択します。 フィルターの作成処理には、既定のフィルターの削除が含まれています。 PowerShell または CLI を使用する場合は、既定のフィルターを削除する必要はありません。 コードを使用してフィルターを作成する場合は、最初に既定のフィルターを削除します。 コンソール コマンド 1 と 3 は、以前に作成したサブスクリプションに対するフィルターを管理するために役立ちます。
- 1 の実行: 既定のフィルターを削除します。
- 2 の実行: 独自のフィルターを追加します。
- 3 の実行: チュートリアルではこの手順はスキップします。 このオプションは、必要に応じて、設定した独自のフィルターを削除します。 既定のフィルターは再作成されません。
フィルターの作成後は、メッセージを送信できます。 4 キーを押して、トピックに送信されている 10 個のメッセージを観察します。
5 キーを押して、受信中のメッセージを観察します。 10 個のメッセージが表示されない場合は、m キーを押してメニューを表示し、もう一度 5 キーを押します。
リソースをクリーンアップする
不要になったリソースをクリーンアップするには、次の手順に従います。
- Azure portal で名前空間に移動します。
- [Service Bus 名前空間] ページで、コマンド バーから [削除] を選択して、名前空間とそこにあるリソース (キュー、トピック、サブスクリプション) を削除します。
サンプル コードを理解する
このセクションでは、サンプル コードの処理内容の詳細について説明します。
接続文字列とトピックを取得する
コードはまず一連の変数を宣言し、プログラムの残りの処理を実行します。
string ServiceBusConnectionString;
string TopicName;
static string[] Subscriptions = { "S1", "S2", "S3" };
static IDictionary<string, string[]> SubscriptionFilters = new Dictionary<string, string[]> {
{ "S1", new[] { "StoreId IN('Store1', 'Store2', 'Store3')", "StoreId = 'Store4'"} },
{ "S2", new[] { "sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'" } },
{ "S3", new[] { "sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')" } }
};
// You can have only have one action per rule and this sample code supports only one action for the first filter, which is used to create the first rule.
static IDictionary<string, string> SubscriptionAction = new Dictionary<string, string> {
{ "S1", "" },
{ "S2", "" },
{ "S3", "SET sys.Label = 'SalesEvent'" }
};
static string[] Store = { "Store1", "Store2", "Store3", "Store4", "Store5", "Store6", "Store7", "Store8", "Store9", "Store10" };
static string SysField = "sys.To";
static string CustomField = "StoreId";
static int NrOfMessagesPerStore = 1; // Send at least 1.
接続文字列とトピック名は、次のようにコマンド ライン パラメーターを使用して渡されます。 これらは、 Main()
メソッドで読み取られます。
static void Main(string[] args)
{
string ServiceBusConnectionString = "";
string TopicName = "";
for (int i = 0; i < args.Length; i++)
{
if (args[i] == "-ConnectionString")
{
Console.WriteLine($"ConnectionString: {args[i + 1]}");
ServiceBusConnectionString = args[i + 1]; // Alternatively enter your connection string here.
}
else if (args[i] == "-TopicName")
{
Console.WriteLine($"TopicName: {args[i + 1]}");
TopicName = args[i + 1]; // Alternatively enter your queue name here.
}
}
if (ServiceBusConnectionString != "" && TopicName != "")
{
Program P = StartProgram(ServiceBusConnectionString, TopicName);
P.PresentMenu().GetAwaiter().GetResult();
}
else
{
Console.WriteLine("Specify -Connectionstring and -TopicName to execute the example.");
Console.ReadKey();
}
}
既定のフィルターを削除する
サブスクリプションを作成すると、Service Bus でサブスクリプションごとに既定のフィルターが作成されます。 このフィルターを使用すると、トピックに送信されたすべてのメッセージを受信できます。 カスタム フィルターを使用する場合は、次のコードのように既定のフィルターを削除できます。
private async Task RemoveDefaultFilters()
{
Console.WriteLine($"Starting to remove default filters.");
try
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
foreach (var subscription in Subscriptions)
{
await client.DeleteRuleAsync(TopicName, subscription, CreateRuleOptions.DefaultRuleName);
Console.WriteLine($"Default filter for {subscription} has been removed.");
}
Console.WriteLine("All default Rules have been removed.\n");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
await PresentMenu();
}
フィルターの作成
次のコードは、このチュートリアルで定義したカスタム フィルターを追加します。
private async Task CreateCustomFilters()
{
try
{
for (int i = 0; i < Subscriptions.Length; i++)
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
string[] filters = SubscriptionFilters[Subscriptions[i]];
if (filters[0] != "")
{
int count = 0;
foreach (var myFilter in filters)
{
count++;
string action = SubscriptionAction[Subscriptions[i]];
if (action != "")
{
await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
{
Filter = new SqlRuleFilter(myFilter),
Action = new SqlRuleAction(action),
Name = $"MyRule{count}"
});
}
else
{
await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
{
Filter = new SqlRuleFilter(myFilter),
Name = $"MyRule{count}"
});
}
}
}
Console.WriteLine($"Filters and actions for {Subscriptions[i]} have been created.");
}
Console.WriteLine("All filters and actions have been created.\n");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
await PresentMenu();
}
作成したカスタム フィルターを削除する
サブスクリプションに対するすべてのフィルターを削除するには、次のコードでその方法を確認してください。
private async Task CleanUpCustomFilters()
{
foreach (var subscription in Subscriptions)
{
try
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
IAsyncEnumerator<RuleProperties> rules = client.GetRulesAsync(TopicName, subscription).GetAsyncEnumerator();
while (await rules.MoveNextAsync())
{
await client.DeleteRuleAsync(TopicName, subscription, rules.Current.Name);
Console.WriteLine($"Rule {rules.Current.Name} has been removed.");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
Console.WriteLine("All default filters have been removed.\n");
await PresentMenu();
}
メッセージを送信する
トピックにメッセージを送信する処理は、メッセージをキューに送信する処理に似ています。 この例は、タスク一覧と非同期処理を使用してメッセージを送信する方法を示しています。
public async Task SendMessages()
{
try
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var taskList = new List<Task>();
for (int i = 0; i < Store.Length; i++)
{
taskList.Add(SendItems(client, Store[i]));
}
await Task.WhenAll(taskList);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
Console.WriteLine("\nAll messages sent.\n");
}
private async Task SendItems(ServiceBusClient client, string store)
{
// create the sender
ServiceBusSender tc = client.CreateSender(TopicName);
for (int i = 0; i < NrOfMessagesPerStore; i++)
{
Random r = new Random();
Item item = new Item(r.Next(5), r.Next(5), r.Next(5));
// Note the extension class which is serializing an deserializing messages
ServiceBusMessage message = item.AsMessage();
message.To = store;
message.ApplicationProperties.Add("StoreId", store);
message.ApplicationProperties.Add("Price", item.GetPrice().ToString());
message.ApplicationProperties.Add("Color", item.GetColor());
message.ApplicationProperties.Add("Category", item.GetItemCategory());
await tc.SendMessageAsync(message);
Console.WriteLine($"Sent item to Store {store}. Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}"); ;
}
}
メッセージを受信する
メッセージはタスク リストを使用して再び受信され、コードではバッチ処理が使用されます。 バッチ処理を使用して送受信することはできますが、この例はバッチ受信方法のみを示しています。 実際には、ループから抜けることはありませんが、ループを維持するよう、1 分などの長めの期間を設定します。 ブローカーへの受信呼び出しは、この時間の間、開いたままにされます。 メッセージが到着すると、すぐに返されます。 新しい受信呼び出しが発行されます。
この概念は、長いポーリングと呼ばれます。 リポジトリ内のいくつかのサンプルで確認できる受信ポンプを使用することは、より一般的なオプションです。 詳細については、「 Azure portal を使用して Service Bus 名前空間とキューを作成する」を参照してください。
public async Task Receive()
{
var taskList = new List<Task>();
for (var i = 0; i < Subscriptions.Length; i++)
{
taskList.Add(this.ReceiveMessages(Subscriptions[i]));
}
await Task.WhenAll(taskList);
}
private async Task ReceiveMessages(string subscription)
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
ServiceBusReceiver receiver = client.CreateReceiver(TopicName, subscription);
// In reality you would not break out of the loop like in this example but would keep looping. The receiver keeps the connection open
// to the broker for the specified amount of seconds and the broker returns messages as soon as they arrive. The client then initiates
// a new connection. So in reality you would not want to break out of the loop.
// Also note that the code shows how to batch receive, which you would do for performance reasons. For convenience you can also always
// use the regular receive pump which we show in our Quick Start and in other GitHub samples.
while (true)
{
try
{
//IList<Message> messages = await receiver.ReceiveAsync(10, TimeSpan.FromSeconds(2));
// Note the extension class which is serializing an deserializing messages and testing messages is null or 0.
// If you think you did not receive all messages, just press M and receive again via the menu.
IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);
if (messages.Any())
{
foreach (ServiceBusReceivedMessage message in messages)
{
lock (Console.Out)
{
Item item = message.As<Item>();
IReadOnlyDictionary<string, object> myApplicationProperties = message.ApplicationProperties;
Console.WriteLine($"StoreId={myApplicationProperties["StoreId"]}");
if (message.Subject != null)
{
Console.WriteLine($"Subject={message.Subject}");
}
Console.WriteLine(
$"Item data: Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}");
}
await receiver.CompleteMessageAsync(message);
}
}
else
{
break;
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
注
Service Bus リソースは、Service Bus Explorer で管理できます。 Service Bus Explorer を使用すると、ユーザーは Service Bus 名前空間に接続し、簡単な方法でメッセージング エンティティを管理できます。 このツールは、インポート/エクスポート機能や、トピック、キュー、サブスクリプション、リレー サービス、通知ハブ、イベント ハブをテストする機能などの高度な機能を提供します。
関連コンテンツ
このチュートリアルでは、Azure Portal を使用してリソースをプロビジョニングした後、Service Bus のトピックとそのサブスクリプションからメッセージを送受信しました。 以下の方法を学習しました。
- Azure Portal を使用して、Service Bus トピックとそのトピックへの 1 つ以上のサブスクリプションを作成する
- .NET コードを使用してトピック フィルターを追加する
- 異なる内容の 2 つのメッセージを作成する
- メッセージを送信し、所定のサブスクリプションに到着したことを確認する
- サブスクリプションからメッセージを受信する
メッセージの送受信のその他の例については、 GitHub の Service Bus サンプルを参照してください。
Service Bus の公開/サブスクライブ機能の使用方法の詳細については、次のチュートリアルに進んでください。