ストリーミング テーブルは、ストリーミングまたは増分データ処理をサポートするテーブルです。 ストリーミング テーブルは、Lakeflow 宣言型パイプラインによってサポートされます。 ストリーミング テーブルが更新されるたびに、ソース テーブルに追加されたデータがストリーミング テーブルに追加されます。 ストリーミング テーブルは、手動で、またはスケジュールに従って更新できます。
更新を実行またはスケジュールする方法の詳細については、「 Lakeflow 宣言型パイプラインで更新を実行する」を参照してください。
構文
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
パラメーター
REFRESH
指定されている場合、テーブルを作成するか、既存のテーブルとその内容を更新します。
プライベート
プライベート ストリーミング テーブルを作成します。
- これらはカタログに追加されず、定義パイプライン内でのみアクセスできます
- カタログ内の既存のオブジェクトと同じ名前を持つことができます。 パイプライン内で、プライベート ストリーミング テーブルとカタログ内のオブジェクトの名前が同じである場合、この名前への参照はプライベート ストリーミング テーブルに解決されます。
- プライベート ストリーミング テーブルは、1 回の更新中ではなく、パイプラインの有効期間全体でのみ保持されます。
プライベート ストリーミング テーブルは、以前は
TEMPORARY
パラメーターを使用して作成されました。table_name
新しく作成されたテーブルの名前。 完全修飾のテーブル名は一意にする必要があります。
テーブル仕様
この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。
-
列名は一意で、クエリの出力列にマップする必要があります。
-
列のデータ型を指定します。 Azure Databricks でサポートされているすべてのデータ型が、ストリーミング テーブルでサポートされているわけではありません。
column_comment
列を記述する任意の
STRING
リテラル。 このオプションは、column_type
と共に指定する必要があります。 列タイプが指定されていない場合、列コメントはスキップされます。-
テーブルにデータが取り込まれるときにデータを検証する制約を追加します。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
-
重要
この機能はパブリック プレビュー段階にあります。
列マスク関数を追加して、機密データを匿名化します。
「行フィルターと列マスクを使用して機密性の高いテーブル データをフィルター処理する」を参照してください。
-
テーブル制約
重要
この機能はパブリック プレビュー段階にあります。
スキーマを指定する際には、主キーと外部キーを定義できます。 制約は情報提供のみを目的としており、強制されるものではありません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。
注
テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインである必要があります。
テーブル条項
必要に応じて、テーブルのパーティション分割、コメント、ユーザー定義プロパティを指定します。 各サブ句は、1 回だけ指定できます。
DELTA の使用
データ形式を指定します。 唯一のオプションは DELTA です。
この句は省略可能で、既定値は DELTA です。
でパーティション分割
テーブル内のパーティション分割に使用する 1 つ以上の列の省略可能なリスト。
CLUSTER BY
と相互に排他的です。液体クラスタリングは、クラスタリング用の柔軟で最適化されたソリューションを提供します。 Lakeflow 宣言パイプラインの
CLUSTER BY
ではなく、PARTITIONED BY
を使用することを検討してください。CLUSTER BY
テーブルでリキッド クラスタリングを有効にし、クラスタリング キーとして使用する列を定義します。
PARTITIONED BY
と相互に排他的です。表に液体クラスタリングを使用するを参照してください。
場所
テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。
コメント
テーブルについて説明するオプションの
STRING
リテラル。TBLPROPERTIES
テーブルのテーブル プロパティの省略可能なリスト。
で ROW FILTER
重要
この機能はパブリック プレビュー段階にあります。
行フィルター関数をテーブルに追加します。 今後そのテーブルのクエリでは、関数が TRUE に評価される行のサブセットを受け取ります。 これは、関数が呼び出し元ユーザーの ID およびグループ メンバーシップを検査して、特定の行をフィルター処理するかどうかを決定できるため、きめ細かいアクセスの制御に役立ちます。
ROW FILTER
の条項を参照してください。-
この句により、
query
からデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python とSkipChangeCommits
オプションを使用してエラーを処理できます。query
とtable_specification
を一緒に指定するとき、table_specification
に指定されているテーブル スキーマに、query
から返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。table_specification
で指定されているが、query
から返されない列はクエリ時にnull
値を返します。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
必要なアクセス許可
パイプラインの実行時のユーザーには、次のアクセス許可が必要です。
- ストリーミング テーブルによって参照されるベース テーブルに対する
SELECT
権限。 - 親カタログに対する
USE CATALOG
権限と親スキーマに対するUSE SCHEMA
権限。 - ストリーミング テーブルのスキーマに対する
CREATE MATERIALIZED VIEW
権限。
ストリーミング テーブルが定義されているパイプラインをユーザーが更新できるようにするには、次が必要です。
- 親カタログに対する
USE CATALOG
権限と親スキーマに対するUSE SCHEMA
権限。 - ストリーミング テーブルの所有権またはストリーミング テーブルに対する
REFRESH
権限。 - ストリーミング テーブルの所有者は、ストリーミング テーブルによって参照されるベース テーブルに対する
SELECT
権限を持っている必要があります。
ユーザーが結果のストリーミング テーブルに対してクエリを実行できるようにするには、次が必要です。
- 親カタログに対する
USE CATALOG
権限と親スキーマに対するUSE SCHEMA
権限。 - ストリーミング テーブルに対する
SELECT
権限。
制限事項
- テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
-
ALTER TABLE
コマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESH
または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。 -
INSERT INTO
やMERGE
などの DML コマンドを利用してテーブル スキーマを導き出すことはできません。 - 次のコマンドは、ストリーミング テーブルではサポートされていません。
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- テーブルの名前変更や所有者の変更はサポートされていません。
- 生成された列、ID 列、既定の列はサポートされていません。
例示
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;