次の方法で共有


チュートリアル: C# と .NET を使用して非同期ストリームを生成して使用する

非同期ストリームは 、データのストリーミング ソースをモデル化します。 データ ストリームは、多くの場合、要素を非同期的に取得または生成します。 これらは、非同期ストリーミング データ ソース用の自然なプログラミング モデルを提供します。

このチュートリアルで学習する内容は次のとおりです。

  • データ要素のシーケンスを非同期的に生成するデータ ソースを作成します。
  • そのデータ ソースを非同期的に使用します。
  • 非同期ストリームのキャンセルとキャプチャされたコンテキストをサポートします。
  • 新しいインターフェイスとデータ ソースが以前の同期データ シーケンスより優先されるタイミングを認識します。

[前提条件]

C# コンパイラを含め、.NET を実行するようにマシンを設定する必要があります。 C# コンパイラは、 Visual Studio 2022 または .NET SDK で使用できます。

GitHub GraphQL エンドポイントにアクセスできるように、 GitHub アクセス トークン を作成する必要があります。 GitHub アクセス トークンに対して次のアクセス許可を選択します。

  • リポジトリ:ステータス
  • パブリックリポジトリ

アクセス トークンを安全な場所に保存して、それを使用して GitHub API エンドポイントにアクセスできるようにします。

Warnung

個人用アクセス トークンをセキュリティで保護します。 個人用アクセス トークンを持つソフトウェアは、アクセス権を使用して GitHub API 呼び出しを行うことができます。

このチュートリアルでは、Visual Studio や .NET CLI など、C# と .NET について理解していることを前提としています。

スターター アプリケーションを実行する

このチュートリアルで使用するスターター アプリケーションのコードは、非同期プログラミング/スニペット フォルダー内の dotnet/docs リポジトリから取得できます。

スターター アプリケーションは、 GitHub GraphQL インターフェイスを使用して dotnet/docs リポジトリに書き込まれた最近の問題を取得するコンソール アプリケーションです。 まず、スターター アプリの Main メソッドの次のコードを確認します。

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

GitHubKey環境変数を個人用アクセス トークンに設定することも、GetEnvVariableの呼び出しの最後の引数を個人用アクセス トークンに置き換えることもできます。 ソースを他のユーザーと共有する場合は、ソース コードにアクセス コードを配置しないでください。 アクセス コードを共有ソース リポジトリにアップロードしないでください。

GitHub クライアントを作成した後、 Main のコードによって、進行状況レポート オブジェクトとキャンセル トークンが作成されます。 これらのオブジェクトが作成されると、MainRunPagedQueryAsync呼び出して、作成された最新の 250 個の問題を取得します。 そのタスクが完了すると、結果が表示されます。

スターター アプリケーションを実行すると、このアプリケーションの実行方法に関するいくつかの重要な観察を行うことができます。 GitHub から返された各ページの進行状況が報告されます。 GitHub が新しい問題の各ページを返す前に、顕著な一時停止を観察できます。 最後に、問題は、GitHub から 10 ページすべてが取得された後にのみ表示されます。

実装を確認する

この実装では、前のセクションで説明した動作を観察した理由が明らかになります。 RunPagedQueryAsync のコードを調べます:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

このメソッドが最初に行うことは、 GraphQLRequest クラスを使用して POST オブジェクトを作成することです。

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

これは、POST オブジェクト本体を形成するのに役立ち、 ToJsonText メソッドを使用して単一の文字列として提示される JSON に正しく変換します。これにより、要求本文から改行文字がすべて削除され、 \ (バックスラッシュ) エスケープ文字でマークされます。

前のコードのページング アルゴリズムと非同期構造に焦点を当ててみましょう。 (GitHub GraphQL API の詳細については、 GitHub GraphQL のドキュメント を参照してください)。 RunPagedQueryAsync メソッドは、最新のものから最も古いものまでの問題を列挙します。 1 ページあたり 25 個の問題を要求し、前のページを続行するために応答の pageInfo 構造を調べます。 これは、複数ページの応答に対する GraphQL の標準的なページング のサポートに従います。 応答には、pageInfo値と、前のページの要求に使用するhasPreviousPages値を含むstartCursor オブジェクトが含まれます。 問題は nodes 配列にあります。 RunPagedQueryAsync メソッドは、すべてのページからのすべての結果を含む配列にこれらのノードを追加します。

結果のページを取得して復元した後、 RunPagedQueryAsync は進行状況を報告し、取り消しを確認します。 取り消しが要求された場合、RunPagedQueryAsyncOperationCanceledException を投げます。

このコードには、改善できる要素がいくつかあります。 最も重要なのは、 RunPagedQueryAsync 返されるすべての問題にストレージを割り当てる必要があります。 開いているすべての問題を取得するには、取得したすべての問題を格納するためにはるかに多くのメモリが必要になるため、このサンプルは 250 の問題で停止します。 進行状況レポートと取り消しをサポートするためのプロトコルにより、アルゴリズムの最初の読み取りを理解するのが困難になります。 より多くの型と API が関係しています。 取り消しが要求される場所と許可されている場所を理解するには、 CancellationTokenSource とそれに関連付けられている CancellationToken を介して通信をトレースする必要があります。

非同期ストリームは、より良い方法を提供します

非同期ストリームと関連する言語サポートは、これらすべての問題に対処します。 シーケンスを生成するコードは、 yield return を使用して、 async 修飾子で宣言されたメソッド内の要素を返すようになりました。 await foreach ループを使用して任意のシーケンスを使用するのと同じように、foreach ループを使用して非同期ストリームを使用できます。

これらの新しい言語機能は、.NET Standard 2.1 に追加され、.NET Core 3.0 に実装された 3 つの新しいインターフェイスに依存します。

これら 3 つのインターフェイスは、ほとんどの C# 開発者にとってなじみのあるものでなければなりません。 これらは、同期版と同じように動作します。

なじみのない型の 1 つは System.Threading.Tasks.ValueTaskです。 ValueTask構造体は、System.Threading.Tasks.Task クラスと同様の API を提供します。 ValueTask は、パフォーマンス上の理由から、これらのインターフェイスで使用されます。

非同期ストリームへの変換

次に、 RunPagedQueryAsync メソッドを変換して非同期ストリームを生成します。 まず、 RunPagedQueryAsync のシグネチャを変更して IAsyncEnumerable<JToken>を返し、次のコードに示すようにキャンセル トークンと進行状況オブジェクトをパラメーター リストから削除します。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

スターター コードは、次のコードに示すように、ページが取得されるときに各ページを処理します。

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

これら 3 行を次のコードに置き換えます。

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

このメソッドの前の finalResults の宣言と、変更したループに続く return ステートメントを削除することもできます。

非同期ストリームを生成するための変更が完了しました。 完成したメソッドは、次のコードのようになります。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

次に、非同期ストリームを使用するようにコレクションを使用するコードを変更します。 問題のコレクションを処理する Main で、次のコードを見つけます。

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

そのコードを次の await foreach ループに置き換えます。

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

新しいインターフェイス IAsyncEnumerator<T> は、 IAsyncDisposableから派生します。 つまり、上記のループは、ループの終了時にストリームを非同期的に破棄します。 ループは次のコードのようになります。

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

既定では、ストリーム要素はキャプチャされたコンテキストで処理されます。 コンテキストのキャプチャを無効にする場合は、 TaskAsyncEnumerableExtensions.ConfigureAwait 拡張メソッドを使用します。 同期コンテキストと現在のコンテキストのキャプチャの詳細については、 タスク ベースの非同期パターンの使用に関する記事を参照してください。

非同期ストリームでは、他の async メソッドと同じプロトコルを使用したキャンセルがサポートされます。 キャンセルをサポートするには、非同期反復子メソッドのシグネチャを次のように変更します。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute属性により、コンパイラはIAsyncEnumerator<T>のコードを生成し、GetAsyncEnumeratorに渡されたトークンをその引数として非同期反復子の本体に表示します。 runQueryAsync内では、トークンの状態を調べ、要求された場合はそれ以上の作業を取り消すことができます。

別の拡張メソッド ( WithCancellation) を使用して、キャンセル トークンを非同期ストリームに渡します。 次のように、問題を列挙するループを変更します。

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

完成したチュートリアルのコードは、非同期プログラミング/スニペット フォルダーの dotnet/docs リポジトリから取得できます。

完成したアプリケーションを実行する

アプリケーションをもう一度実行する その動作とスターター アプリケーションの動作を比較します。 結果の最初のページは、使用可能になるとすぐに列挙されます。 新しい各ページが要求されて取得されると、監視可能な一時停止が発生し、次のページの結果がすぐに列挙されます。 取り消しを処理するために try / catch ブロックは必要ありません。呼び出し元はコレクションの列挙を停止できます。 各ページがダウンロードされると非同期ストリームによって結果が生成されるため、進行状況が明確に報告されます。 返される各問題の状態は、 await foreach ループにシームレスに含まれます。 進行状況を追跡するためにコールバック オブジェクトは必要ありません。

コードを調べることで、メモリ使用量の改善を確認できます。 列挙する前にすべての結果を格納するためにコレクションを割り当てる必要がなくなりました。 呼び出し元は、結果を使用する方法と、ストレージ コレクションが必要かどうかを判断できます。

スターター アプリケーションと完成したアプリケーションの両方を実行すると、実装の違いを自分で確認できます。 完了したら、このチュートリアルを開始したときに作成した GitHub アクセス トークンを削除できます。 攻撃者がそのトークンにアクセスした場合、資格情報を使用して GitHub API にアクセスする可能性があります。

このチュートリアルでは、非同期ストリームを使用して、データのページを返すネットワーク API から個々の項目を読み取ります。 非同期ストリームは、ストック ティッカーやセンサー デバイスなどの "終わることのないストリーム" から読み取ることもできます。 MoveNextAsyncの呼び出しは、次の項目が使用可能になるとすぐに返されます。