次の方法で共有


Azure Functions 用 RedisStreamTrigger

RedisStreamTrigger はストリームから新規エントリを読み取り、それらの要素を関数に提示します。

関数トリガーの可用性のスコープ

トリガーの種類 Azure Managed Redis Azure Cache for Redis (アジュール・キャッシュ・フォー・レディス)
ストリーム イエス イエス

重要

Azure Managed Redis または Azure Cache for Redis の Enterprise レベルを使用する場合は、ポート 6380 または 6379 ではなくポート 10000 を使用します。

重要

現在1、Redis トリガーは、従量課金プランで実行されている関数ではサポートされていません。

重要

Functions の Node.js v4 モデルは、Azure Cache for Redis 拡張機能ではまだサポートされていません。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。

重要

Functions 用 Python v2 モデルは、Azure Cache for Redis 拡張機能ではまだサポートされていません。 v2 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。

重要

.NET 関数の場合は、"インプロセス" モデルより、"分離ワーカー モデル" を使うことをお勧めします。 in-processisolated worker モデルの比較については、azure Functions の .NET の isolated worker モデルと in-process モデルの違いを参照してください。

実行モデル 説明
分離ワーカー モデル 関数コードは、別の .NET ワーカー プロセスで実行されます。 .NET と .NET Framework のサポートされているバージョンで使います。 詳細については、 分離ワーカー モデルで C# Azure Functions を実行するためのガイドを参照してください。
インプロセス モデル 関数コードは、Functions ホスト プロセスと同じプロセスで実行されます。 .NET の長期サポート (LTS) バージョンのみをサポートします。 詳細については、「Azure Functions を使用する C# クラス ライブラリ関数を開発する」を参照してください。
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
    internal class SimpleStreamTrigger
    {
        private readonly ILogger<SimpleStreamTrigger> logger;

        public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
        {
            this.logger = logger;
        }

        [Function(nameof(SimpleStreamTrigger))]
        public void Run(
            [RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
        {
            logger.LogInformation(entry);
        }
    }
}

package com.function.RedisStreamTrigger;

import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;

public class SimpleStreamTrigger {
    @FunctionName("SimpleStreamTrigger")
    public void run(
            @RedisStreamTrigger(
                name = "req",
                connection = "redisConnectionString",
                key = "streamTest",
                pollingIntervalInMs = 1000,
                maxBatchSize = 1)
                String message,
            final ExecutionContext context) {
            context.getLogger().info(message);
    }
}

このサンプルでは、同じ index.js ファイルと、function.json ファイル内のバインディング データを使用します。

index.js ファイルは次のとおりです。

module.exports = async function (context, entry) {
    context.log(entry);
}

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "index.js"
}

このサンプルでは、同じ run.ps1 ファイルと、function.json ファイル内のバインディング データを使用します。

run.ps1 ファイルは次のとおりです。

param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "run.ps1"
}

Python v1 プログラミング モデルでは、関数フォルダー内の個別の function.json ファイルでバインドを定義する必要があります。 詳細については、「Python 開発者ガイド」を参照してください。

このサンプルでは、同じ __init__.py ファイルと、function.json ファイル内のバインディング データを使用します。

__init__.py ファイルは次のとおりです。

import logging

def main(entry: str):
    logging.info(entry)

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "__init__.py"
}

属性

パラメーター 説明 必要 既定値
Connection キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... イエス
Key 読み取り元のキー。 イエス
PollingIntervalInMs Redis サーバーをポーリングする頻度 (ミリ秒単位)。 省略可能 1000
MessagesPerWorker 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使われます。 省略可能 100
Count Redis から一度にプルする要素の数。 省略可能 10
DeleteAfterProcess 関数が処理後にストリーム エントリを削除するかどうかを示します。 省略可能 false

注釈

パラメーター 説明 必要 既定値
name entry イエス
connection キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... イエス
key 読み取り元のキー。 イエス
pollingIntervalInMs Redis をポーリングする頻度 (ミリ秒単位)。 省略可能 1000
messagesPerWorker 各関数のワーカーが処理する必要のあるメッセージの数。 関数をスケーリングするワーカーの数を決定するために使用されます。 省略可能 100
count Redis から一度に読み取るエントリの数。 エントリは並列で処理されます。 省略可能 10
deleteAfterProcess 関数の実行後にストリーム エントリを削除するかどうか。 省略可能 false

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明 必要 既定値
type イエス
deleteAfterProcess 省略可能 false
connection キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... イエス
key 読み取り元のキー。 イエス
pollingIntervalInMs Redis をポーリングする頻度 (ミリ秒単位)。 省略可能 1000
messagesPerWorker (省略可能) 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使用されます。 省略可能 100
count Redis から一度に読み取るエントリの数。 これらは並列で処理されます。 省略可能 10
name イエス
direction イエス

完全な例については、セクションの例を参照してください。

使用方法

RedisStreamTrigger Azure 関数は、ストリームから新規エントリを読み取り、それらのエントリを関数に提示します。

トリガーは、構成可能な固定間隔で Redis をポーリングし、XREADGROUP を使ってストリームから要素を読み取ります。

関数のすべてのインスタンスのコンシューマー グループは、関数の名前、つまり、SimpleStreamTriggerです。

各関数インスタンスは、 WEBSITE_INSTANCE_ID を使用するか、グループ内のコンシューマー名として使用するランダム GUID を生成して、関数のスケールアウトされたインスタンスがストリームから同じメッセージを読み取らないようにします。

説明
byte[] チャネルからのメッセージ。
string チャネルからのメッセージ。
Custom トリガーは、Json.NET シリアル化を使用して、チャネルからのメッセージを string からカスタム型にマップします。