次の方法で共有


CREATE STREAMING TABLE (Lakeflow 宣言型パイプライン)

ストリーミング テーブルは、ストリーミングまたは増分データ処理をサポートするテーブルです。 ストリーミング テーブルは、Lakeflow 宣言型パイプラインによってサポートされます。 ストリーミング テーブルが更新されるたびに、ソース テーブルに追加されたデータがストリーミング テーブルに追加されます。 ストリーミング テーブルは、手動で、またはスケジュールに従って更新できます。

更新を実行またはスケジュールする方法の詳細については、「 Lakeflow 宣言型パイプラインで更新を実行する」を参照してください。

構文

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]

パラメーター

  • REFRESH

    指定されている場合、テーブルを作成するか、既存のテーブルとその内容を更新します。

  • プライベート

    プライベート ストリーミング テーブルを作成します。

    • これらはカタログに追加されず、定義パイプライン内でのみアクセスできます
    • カタログ内の既存のオブジェクトと同じ名前を持つことができます。 パイプライン内で、プライベート ストリーミング テーブルとカタログ内のオブジェクトの名前が同じである場合、この名前への参照はプライベート ストリーミング テーブルに解決されます。
    • プライベート ストリーミング テーブルは、1 回の更新中ではなく、パイプラインの有効期間全体でのみ保持されます。

    プライベート ストリーミング テーブルは、以前は TEMPORARY パラメーターを使用して作成されました。

  • table_name

    新しく作成されたテーブルの名前。 完全修飾のテーブル名は一意にする必要があります。

  • テーブル仕様

    この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。

    • column_identifier

      列名は一意で、クエリの出力列にマップする必要があります。

    • カラムタイプ (column_type)

      列のデータ型を指定します。 Azure Databricks でサポートされているすべてのデータ型が、ストリーミング テーブルでサポートされているわけではありません。

    • column_comment

      列を記述する任意のSTRING リテラル。 このオプションは、column_type と共に指定する必要があります。 列タイプが指定されていない場合、列コメントはスキップされます。

    • column_constraint

      テーブルにデータが取り込まれるときにデータを検証する制約を追加します。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。

    • MASK 句

      重要

      この機能はパブリック プレビュー段階にあります。

      列マスク関数を追加して、機密データを匿名化します。

      行フィルターと列マスクを使用して機密性の高いテーブル データをフィルター処理する」を参照してください。

  • テーブル制約

    重要

    この機能はパブリック プレビュー段階にあります。

    スキーマを指定する際には、主キーと外部キーを定義できます。 制約は情報提供のみを目的としており、強制されるものではありません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。

    テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインである必要があります。

  • テーブル条項

    必要に応じて、テーブルのパーティション分割、コメント、ユーザー定義プロパティを指定します。 各サブ句は、1 回だけ指定できます。

    • DELTA の使用

      データ形式を指定します。 唯一のオプションは DELTA です。

      この句は省略可能で、既定値は DELTA です。

    • でパーティション分割

      テーブル内のパーティション分割に使用する 1 つ以上の列の省略可能なリスト。 CLUSTER BY と相互に排他的です。

      液体クラスタリングは、クラスタリング用の柔軟で最適化されたソリューションを提供します。 Lakeflow 宣言パイプラインのCLUSTER BYではなく、PARTITIONED BYを使用することを検討してください。

    • CLUSTER BY

      テーブルでリキッド クラスタリングを有効にし、クラスタリング キーとして使用する列を定義します。 PARTITIONED BY と相互に排他的です。

      表に液体クラスタリングを使用するを参照してください。

    • 場所

      テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。

    • コメント

      テーブルについて説明するオプションの STRING リテラル。

    • TBLPROPERTIES

      テーブルのテーブル プロパティの省略可能なリスト。

    • で ROW FILTER

    重要

    この機能はパブリック プレビュー段階にあります。

    行フィルター関数をテーブルに追加します。 今後そのテーブルのクエリでは、関数が TRUE に評価される行のサブセットを受け取ります。 これは、関数が呼び出し元ユーザーの ID およびグループ メンバーシップを検査して、特定の行をフィルター処理するかどうかを決定できるため、きめ細かいアクセスの制御に役立ちます。

    ROW FILTER の条項を参照してください。

  • クエリ

    この句により、query からデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python と SkipChangeCommits オプションを使用してエラーを処理できます。

    querytable_specification を一緒に指定するとき、table_specification に指定されているテーブル スキーマに、query から返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。 table_specification で指定されているが、query から返されない列はクエリ時に null 値を返します。

    ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。

必要なアクセス許可

パイプラインの実行時のユーザーには、次のアクセス許可が必要です。

  • ストリーミング テーブルによって参照されるベース テーブルに対する SELECT 権限。
  • 親カタログに対する USE CATALOG 権限と親スキーマに対する USE SCHEMA 権限。
  • ストリーミング テーブルのスキーマに対する CREATE MATERIALIZED VIEW 権限。

ストリーミング テーブルが定義されているパイプラインをユーザーが更新できるようにするには、次が必要です。

  • 親カタログに対する USE CATALOG 権限と親スキーマに対する USE SCHEMA 権限。
  • ストリーミング テーブルの所有権またはストリーミング テーブルに対する REFRESH 権限。
  • ストリーミング テーブルの所有者は、ストリーミング テーブルによって参照されるベース テーブルに対する SELECT 権限を持っている必要があります。

ユーザーが結果のストリーミング テーブルに対してクエリを実行できるようにするには、次が必要です。

  • 親カタログに対する USE CATALOG 権限と親スキーマに対する USE SCHEMA 権限。
  • ストリーミング テーブルに対する SELECT 権限。

制限事項

  • テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
  • ALTER TABLE コマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESH または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。
  • INSERT INTOMERGE などの DML コマンドを利用してテーブル スキーマを導き出すことはできません。
  • 次のコマンドは、ストリーミング テーブルではサポートされていません。
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • テーブルの名前変更や所有者の変更はサポートされていません。
  • 生成された列、ID 列、既定の列はサポートされていません。

例示

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;