다음을 통해 공유


자습서: C# 및 .NET을 사용하여 비동기 스트림 생성 및 사용

비동기 스트림은 데이터의 스트리밍 원본을 모델링합니다. 데이터 스트림은 종종 요소를 비동기적으로 검색하거나 생성합니다. 비동기 스트리밍 데이터 원본에 대한 자연스러운 프로그래밍 모델을 제공합니다.

이 자습서에서는 다음 방법을 알아봅니다.

  • 데이터 요소 시퀀스를 비동기적으로 생성하는 데이터 원본을 만듭니다.
  • 해당 데이터 원본을 비동기적으로 사용합니다.
  • 비동기 스트림에 대한 취소 및 캡처된 컨텍스트를 지원합니다.
  • 새 인터페이스 및 데이터 원본이 이전 동기 데이터 시퀀스에 선호되는 시기를 인식합니다.

필수 조건

C# 컴파일러를 포함하여 .NET을 실행하도록 컴퓨터를 설정해야 합니다. C# 컴파일러는 Visual Studio 2022 또는 .NET SDK사용할 수 있습니다.

GitHub GraphQL 엔드포인트에 액세스할 수 있도록 GitHub 액세스 토큰을 만들어야 합니다. GitHub 액세스 토큰에 대해 다음 권한을 선택합니다.

  • 저장소:상태
  • 공개_저장소

액세스 토큰을 안전한 장소에 저장하여 GitHub API 엔드포인트에 액세스하는 데 사용할 수 있습니다.

경고

개인 액세스 토큰을 안전하게 유지합니다. 개인 액세스 토큰이 있는 모든 소프트웨어는 액세스 권한을 사용하여 GitHub API를 호출할 수 있습니다.

이 자습서에서는 여러분이 Visual Studio 또는 .NET CLI를 비롯한 C# 및 .NET에 익숙하다고 가정합니다.

시작 애플리케이션 실행

동기 프로그래밍/코드 조각 폴더의 dotnet/docs 리포지토리에서 이 자습서에서 사용되는 시작 애플리케이션에 대한 코드를 가져올 수 있습니다.

시작 애플리케이션은 GitHub GraphQL 인터페이스를 사용하여 dotnet/docs 리포지토리에 기록된 최근 문제를 검색하는 콘솔 애플리케이션입니다. 시작 앱 Main 메서드에 대한 다음 코드를 확인하여 시작합니다.

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

환경 변수를 개인 액세스 토큰으로 GitHubKey 설정하거나 호출의 마지막 인수를 GetEnvVariable 개인 액세스 토큰으로 바꿀 수 있습니다. 원본을 다른 사용자와 공유하는 경우 소스 코드에 액세스 코드를 넣지 마세요. 공유 소스 리포지토리에 액세스 코드를 업로드하지 않습니다.

GitHub 클라이언트를 만든 후 코드는 Main 진행률 보고 개체 및 취소 토큰을 만듭니다. 이러한 객체들이 생성되면, Main는 가장 최근에 생성된 250개의 이슈를 검색하기 위해 RunPagedQueryAsync를 호출합니다. 해당 작업이 완료되면 결과가 표시됩니다.

시작 애플리케이션을 실행할 때 이 애플리케이션이 실행되는 방법에 대한 몇 가지 중요한 관찰을 수행할 수 있습니다. GitHub에서 반환된 각 페이지에 대해 보고된 진행률이 표시됩니다. GitHub가 각각의 새 문제 페이지를 반환하기 전에 눈에 띄는 일시 중지를 관찰할 수 있습니다. 마지막으로 GitHub에서 10페이지를 모두 검색한 후에만 문제가 표시됩니다.

구현 검사

구현은 이전 섹션에서 설명한 동작을 관찰한 이유를 보여줍니다. 코드에서 RunPagedQueryAsync를 검사합니다.

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

이 메서드가 가장 먼저 수행하는 작업은 GraphQLRequest 클래스를 사용하여 POST 개체를 생성하는 것입니다.

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

POST 개체 본문을 형성하고 메서드를 사용하여 단일 문자열로 표시되는 JSON으로 ToJsonText 올바르게 변환하는 데 도움이 되며, 요청 본문에서 줄 바꿈 문자를 모두 제거하여 (백슬래시) 이스케이프 문자로 \ 표시합니다.

이전 코드의 페이징 알고리즘 및 비동기 구조에 집중해 보겠습니다. ( GitHub GraphQL API에 대한 자세한 내용은 GitHub GraphQL 설명서를 참조할 수 있습니다.) 이 메서드는 RunPagedQueryAsync 가장 최근에서 가장 오래된 문제를 열거합니다. 페이지당 25개의 문제를 요청하고, 응답의 구조를 검사하여 이전 페이지를 계속합니다 pageInfo. 이는 다중 페이지 응답에 대한 GraphQL의 표준 페이징 지원을 따릅니다. 이 응답에는 이전 페이지를 요청하는 데 사용되는 pageInfo 값과 hasPreviousPages 값을 포함한 startCursor 개체가 포함됩니다. 문제는 배열에 있습니다 nodes . 메서드는 RunPagedQueryAsync 모든 페이지의 모든 결과를 포함하는 배열에 이러한 노드를 추가합니다.

결과 RunPagedQueryAsync 페이지를 검색하고 복원한 후 진행 상황을 보고하고 취소를 확인합니다. RunPagedQueryAsync가 취소가 요청된 경우 OperationCanceledException를 던집니다.

이 코드에는 개선할 수 있는 몇 가지 요소가 있습니다. 가장 중요한 것은 반환된 RunPagedQueryAsync 모든 문제에 대해 스토리지를 할당해야 한다는 것입니다. 열려 있는 모든 문제를 검색하려면 검색된 모든 문제를 저장하는 데 훨씬 더 많은 메모리가 필요하기 때문에 이 샘플은 250개 문제에서 중지됩니다. 진행률 보고서 및 취소를 지원하는 프로토콜은 알고리즘이 첫 번째 읽기에서 이해하기 어렵게 만듭니다. 더 많은 형식과 API가 관련되어 있습니다. 통신 CancellationTokenSource 및 연결된 CancellationToken을 통해 통신을 추적하여 취소 요청이 발생하는 위치와 취소가 승인되는 위치를 이해해야 합니다.

비동기 스트림은 더 나은 방법을 제공합니다.

비동기 스트림 및 관련 언어 지원은 이러한 모든 문제를 해결합니다. 이제 시퀀스를 생성하는 코드는 yield return 한정자를 사용해 선언된 메서드에서 요소를 반환하기 위해 async를 사용할 수 있습니다. 일반적인 시퀀스를 await foreach 루프를 사용하여 소비하는 것처럼, foreach 루프를 사용하여 비동기 스트림도 소비할 수 있습니다.

이러한 새로운 언어 기능은 .NET Standard 2.1에 추가되고 .NET Core 3.0에서 구현된 세 가지 새로운 인터페이스에 따라 달라집니다.

이러한 세 가지 인터페이스는 대부분의 C# 개발자에게 익숙해야 합니다. 동기 대응과 유사한 방식으로 동작합니다.

익숙하지 않을 수 있는 한 가지 유형은 System.Threading.Tasks.ValueTask입니다. ValueTask 구조체는 System.Threading.Tasks.Task 클래스와 유사한 API를 제공합니다. ValueTask 는 성능상의 이유로 이러한 인터페이스에서 사용됩니다.

비동기 스트림으로 변환

다음으로 메서드를 RunPagedQueryAsync 변환하여 비동기 스트림을 생성합니다. 먼저 RunPagedQueryAsync의 반환형을 IAsyncEnumerable<JToken>로 변경하고, 매개변수 목록에서 취소 토큰 및 진행률 개체를 제거하세요. 아래 코드가 변경 사항을 보여줍니다.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

시작 코드는 다음 코드와 같이 페이지가 검색될 때 각 페이지를 처리합니다.

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

이 세 줄을 다음 코드로 바꿉다.

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

이 메서드의 finalResults 앞부분 선언과 수정한 return 루프 뒤에 오는 문을 제거할 수도 있습니다.

비동기 스트림을 생성하기 위해 변경 내용을 완료했습니다. 완성된 메서드는 다음 코드와 유사합니다.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

다음으로, 비동기 스트림을 사용하도록 컬렉션을 사용하는 코드를 변경합니다. Main에서 문제 모음을 처리하는 다음 코드를 찾으십시오.

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

해당 코드를 다음 루프로 바꿉다.await foreach

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

새 인터페이스 IAsyncEnumerator<T> 는 .에서 IAsyncDisposable파생됩니다. 즉, 이전 루프는 루프가 완료될 때 스트림을 비동기적으로 삭제합니다. 루프가 다음 코드와 같다고 상상할 수 있습니다.

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

기본적으로 스트림 요소는 캡처된 컨텍스트에서 처리됩니다. 컨텍스트 캡처를 사용하지 않도록 설정하려면 확장 메서드를 TaskAsyncEnumerableExtensions.ConfigureAwait 사용합니다. 동기화 컨텍스트 및 현재 컨텍스트 캡처에 대한 자세한 내용은 작업 기반 비동기 패턴 사용에 대한 문서를 참조하세요.

비동기 스트림은 다른 async 메서드와 동일한 프로토콜을 사용하여 취소를 지원합니다. 취소를 지원하기 위해 다음과 같이 비동기 반복기 메서드에 대한 서명을 수정합니다.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute 특성은 전달된 IAsyncEnumerator<T> 토큰을 비동기 반복기의 본문에서 인수로 사용할 수 있도록 하는 GetAsyncEnumerator 코드 생성을 컴파일러에 지시합니다. 내부에서 runQueryAsync토큰의 상태를 검사하고 요청된 경우 추가 작업을 취소할 수 있습니다.

다른 확장 메서드를 WithCancellation사용하여 취소 토큰을 비동기 스트림에 전달합니다. 다음과 같이 문제를 열거하는 루프를 수정합니다.

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

완료된 자습서에 대한 코드는 비동기 프로그래밍/코드 조각 폴더의dotnet/docs 리포지토리에서 가져올 수 있습니다.

완료된 애플리케이션 실행

애플리케이션을 다시 실행합니다. 해당 동작을 시작 애플리케이션의 동작과 대조합니다. 결과의 첫 번째 페이지는 사용 가능한 즉시 열거됩니다. 각 새 페이지가 요청되고 검색될 때 관찰 가능한 일시 중지가 있으며, 다음 페이지의 결과가 빠르게 열거됩니다. try / catch 취소를 처리하는 데 블록이 필요하지 않습니다. 호출자는 컬렉션 열거를 중지할 수 있습니다. 각 페이지가 다운로드될 때 비동기 스트림이 결과를 생성하기 때문에 진행률이 명확하게 보고됩니다. 반환된 각 항목의 상태는 await foreach 루프에 자연스럽게 포함됩니다. 진행률을 추적하기 위해 콜백 개체가 필요하지 않습니다.

코드를 검사하여 메모리 사용이 개선된 것을 볼 수 있습니다. 모든 결과가 열거되기 전에 더 이상 컬렉션을 할당할 필요가 없습니다. 호출자는 결과를 사용하는 방법과 스토리지 컬렉션이 필요한지 여부를 결정할 수 있습니다.

시작 애플리케이션과 완성된 애플리케이션을 모두 실행하면 구현 간의 차이점을 직접 확인할 수 있습니다. 완료한 후 이 자습서를 시작할 때 만든 GitHub 액세스 토큰을 삭제할 수 있습니다. 공격자가 해당 토큰에 액세스한 경우 자격 증명을 사용하여 GitHub API에 액세스할 수 있습니다.

이 자습서에서는 비동기 스트림을 사용하여 데이터 페이지를 반환하는 네트워크 API에서 개별 항목을 읽었습니다. 비동기 스트림은 주식 시세 또는 센서 디바이스와 같은 "종료되지 않는 스트림"에서 읽을 수도 있습니다. 사용 가능한 즉시 다음 항목을 반환하는 MoveNextAsync 호출입니다.