次の方法で共有


テーブルに液体クラスタリングを使用する

Liquid クラスタリングは、テーブルのパーティション分割と ZORDER を置き換えて、データ レイアウトの決定を簡素化し、クエリのパフォーマンスを最適化します。 既存のデータを書き換えることなくクラスタリング キーを再定義する柔軟性が提供され、時間の経過と共に分析ニーズと共にデータ レイアウトを進化させることができます。 液体クラスタリングは、ストリーミング テーブルと具体化されたビューの両方に適用されます。

重要

液体クラスタリングは、Delta Lake と Apache Iceberg のパブリック プレビューで一般提供されています。

液体クラスタリングが有効になっているすべての Delta Lake テーブルについて、Databricks では Databricks Runtime 15.2 以降を使用することをお勧めします。 制限付きのパブリック プレビュー のサポートは、Databricks Runtime 13.3 LTS 以降で利用できます。 行レベルのコンカレンシーは Databricks Runtime 13.3 LTS 以降でサポートされており、削除ベクターが有効になっているすべてのテーブルについて Databricks Runtime 14.2 以降で一般提供されています。 Azure Databricks での分離レベルと書き込みの競合に関するページを参照してください。

液体クラスタリングが有効になっているすべての Apache Iceberg テーブルでは、Databricks Runtime 16.4 LTS 以降が必要です。

リキッド クラスタリングの用途

Databricks では、ストリーミング テーブル (ST) とマテリアライズド ビュー (MV) の両方を含むすべての新しいテーブルに対してリキッド クラスタリングを推奨しています。 クラスタリングのメリットが得られるシナリオの例を次に示します。

  • カーディナリティの高い列でフィルター処理されることが多いテーブル。
  • データ分散に大きな偏りがあるテーブル。
  • 急速に大きくなり、メンテナンスとチューニング作業が必要なテーブル。
  • 同時書き込みの要件を持つテーブル。
  • 時間の経過と共に変化するアクセス パターンを持つテーブル。
  • 一般的なパーティション キーでは、パーティションが多すぎるか、少なすぎるテーブルが残ってしまう可能性があるテーブル。

リキッド クラスタリングを有効にする

既存のテーブルで、またはテーブルの作成時に、リキッド クラスタリングを有効にすることができます。 クラスタリングはパーティション分割または ZORDER と互換性がありません。また、Azure Databricks を使用して、テーブル内のデータに対するすべてのレイアウト操作と最適化操作を管理する必要があります。 リキッド クラスタリングが有効になった後、OPTIMIZE ジョブを通常どおり実行して、データを増分でクラスター化します。 「クラスタリングをトリガーする方法」を参照してください。

リキッド クラスタリングを有効にするには、次の例のように、テーブル作成ステートメントに CLUSTER BY フレーズを追加します。

Databricks Runtime 14.2 以降では、Python または Scala で DataFrame API と DeltaTable API を使用して、Delta Lake テーブルの液体クラスタリングを有効にすることができます。

SQL

-- Create an empty Delta table
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;

Apache Iceberg の場合、管理された Iceberg テーブルで Liquid Clustering を有効にする場合は、削除ベクターと行 ID を明示的に無効にする必要があります。

Python(プログラミング言語)

# Create an empty Delta table
(DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute())

# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

スカラ (プログラミング言語)

// Create an empty Delta table
DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute()

// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

Databricks Runtime 16.0 以降では、次の例のように、構造化ストリーミング書き込みを使用して、液体クラスタリングが有効になっているテーブルを作成できます。

:::

SQL

CREATE TABLE table1 (
  col0 STRING,
  col1 DATE,
  col2 BIGINT
)
CLUSTER BY (col0, col1)
TBLPROPERTIES (
  'clusterByAuto' = 'true'
);

Python(プログラミング言語)

(spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")
)

スカラ (プログラミング言語)

spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

警告

液体クラスタリングを有効にして作成されたデルタ テーブルでは、作成時に多数の Delta テーブル機能が有効になり、Delta ライター バージョン 7 とリーダー バージョン 3 が使用されます。 これらの機能の一部の有効化をオーバーライドできます。 「既定の機能の有効化をオーバーライドする (省略可能)」を参照してください。

テーブル プロトコルのバージョンをダウングレードすることはできません。また、有効になっている Delta リーダー プロトコル テーブルの機能がすべてサポートされていない Delta Lake クライアントでは、クラスタリングが有効になっているテーブルを読み取ることはできません。 Delta Lake の機能の互換性とプロトコルに関する記事を参照してください。

次の構文を使用して、既存のパーティション分割されていない Delta テーブルで液体クラスタリングを有効にします。

-- Alter an existing table
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

Apache Iceberg の場合、既存のマネージド Iceberg テーブルで液体クラスタリングを有効にする場合は、削除ベクターと行 ID を明示的に無効にする必要があります。

重要

既定の動作では、以前に書き込まれたデータにクラスタリングは適用されません。 すべてのレコードに対して再クラスター化を強制するには、OPTIMIZE FULLを使用する必要があります。 「すべてのレコードに対して再クラスター化を強制する」をご覧ください。

クラスタリング キーを削除するには、次の構文を使用します。

ALTER TABLE table_name CLUSTER BY NONE;

自動液体クラスタリング

Databricks Runtime 15.4 LTS 以降では、Unity カタログマネージド Delta テーブルに対して自動液体クラスタリングを有効にすることができます。 自動液体クラスタリングを有効にすると、Azure Databricks では、クエリのパフォーマンスを最適化するためにクラスタリング キーがインテリジェントに選択されます。 CLUSTER BY AUTO句を使用して、自動液体クラスタリングを有効にします。

有効にすると、キーの自動選択操作とクラスタリング操作がメンテナンス操作として非同期的に実行され、テーブルに対して予測最適化が有効になっている必要があります。 「Unity Catalog 管理テーブルの予測最適化」を参照してください。

クラスタリング キーを識別するために、Azure Databricks はテーブルの履歴クエリ ワークロードを分析し、最適な候補列を識別します。 クラスタリング キーは、データ スキップによるコスト削減の予測がデータ クラスタリング コストよりも高い場合に変更されます。

データのクエリ方法が時間の経過と同時に変化する場合、またはクエリのパフォーマンスがデータ分布の変化を示している場合、自動液体クラスタリングによって新しいキーが選択され、パフォーマンスが最適化されます。

自動液体クラスタリングによってキーが選択されていない場合、その理由は次のようになります。

  • テーブルが小さすぎて、液体クラスタリングの恩恵を受け得ない。
  • テーブルには、適切なクラスタリング スキームが既に用意されています。 たとえば、以前に 1 回適用された適切なキーがある場合や、指定されたクエリ パターンに対して挿入順序が既に適切に実行されている (時系列で挿入され、タイムスタンプでクエリされるデータなど) などです。
  • このテーブルには頻繁に行われるクエリは存在しません。
  • Databricks Runtime 15.4 LTS 以降を使用していません。

データとクエリの特性に関係なく、すべての Unity カタログマネージド テーブルに自動液体クラスタリングを適用できます。 これらの機能により、データの使用パターンに基づいてデータ レイアウトがインテリジェントに最適化され、ヒューリスティックによってクラスタリング キーを選択するコストが高いかどうかを判断できます。

自動クラスタリングが有効になっているテーブルは、液体クラスタリングをサポートするすべての Databricks Runtime バージョンから読み取りまたは書き込みが可能です。 ただし、インテリジェント キーの選択は、Databricks Runtime 15.4 LTS で導入されたメタデータに依存します。 Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードに役立ち、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。

自動液体クラスタリングを有効または無効にする

新規または既存のテーブルで自動液体クラスタリングを有効または無効にするには、次の構文を使用します。

SQL

-- Create an empty table.
CREATE OR REPLACE TABLE table1(column01 int, column02 string) CLUSTER BY AUTO;

-- Enable automatic liquid clustering on an existing table,
-- including tables that previously had manually specified keys.
ALTER TABLE table1 CLUSTER BY AUTO;

-- Disable automatic liquid clustering on an existing table.
ALTER TABLE table1 CLUSTER BY NONE;

-- Disable automatic liquid clustering by setting the clustering keys
-- to chosen clustering columns or new columns.
ALTER TABLE table1 CLUSTER BY (column01, column02);

CREATE OR REPLACE table_nameを指定せずにCLUSTER BY AUTOを実行し、テーブルが既に存在し、自動液体クラスタリングが有効になっている場合、テーブルのAUTO設定とクラスタリング列 (適用されている場合) は、テーブルが置き換えられるときに保持されます。 予測最適化では、このテーブルのクエリ の履歴ワークロードも維持され、最適なクラスタリング キーが識別されます。

Python(プログラミング言語)

df = spark.read.table("table1")
df.write
  .format("delta")
  .option(“clusterByAuto”, “true”)
  .saveAsTable(...)

# To set clustering columns and auto, which serves as a way to give a hint
# for the initial selection.
df.write
  .format("delta")
  .clusterBy("clusteringColumn1", "clusteringColumn2")
  .option(“clusterByAuto”, “true”)
  .saveAsTable(...)

# Using DataFrameWriterV2
df.writeTo(...).using("delta")
  .option(“clusterByAuto”, “true”)
  .create()

# To set clustering columns and auto, which serves as a way to give a hint
# for the initial selection.
df.writeTo(...).using("delta")
  .clusterBy("clusteringColumn1", "clusteringColumn2")
  .option(“clusterByAuto”, “true”)
  .create()

# Similar syntax can also be used to set clusterByAuto for streaming tables.
spark.readStream.table("source_table")
  .writeStream
  .option("clusterByAuto", "true")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

# Or to specify a hint for the clustering columns by specifying both auto and columns together
spark.readStream.table("source_table")
  .writeStream
 .clusterBy("column1", "column2")
  .option("clusterByAuto", "true")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

Python API は、Databricks Runtime 16.4 以降で使用できます。

.clusterBy.option('clusterByAuto', 'true)と共に使用する場合は、次のようにします。

  • これが初めて自動液体クラスタリングを設定する場合、常に手動入力が考慮され、 .clusterByでクラスタリング列が設定されます。
  • これが既に自動液体クラスタリングを含むテーブルである場合は、 .clusterBy を使用するヒントを 1 回受け入れることもできます。 たとえば、.clusterByで指定された列は、テーブルにクラスタリング列がすでに設定されていない場合にのみ設定されます。これは、ユーザーによる設定または自動化された液体クラスタリング方式による設定を含みます。

Python は、テーブルを作成または置換する場合にのみ使用できます。 SQL を使用して、既存のテーブルの clusterByAuto 状態を変更します。

自動クラスタリングが有効になっているかどうかを確認する

テーブルで自動液体クラスタリングが有効になっているかどうかを確認するには、 DESCRIBE TABLE または SHOW TBLPROPERTIESを使用します。

自動液体クラスタリングが有効になっている場合、 clusterByAuto プロパティは true に設定されます。 clusteringColumns プロパティには、自動的または手動で選択された現在のクラスタリング列が表示されます。

制限事項

Apache Iceberg では、自動液体クラスタリングを使用できません。

既定の機能の有効化をオーバーライドする (省略可能)

リキッド クラスタリングの有効化中に Delta テーブル機能を有効にする既定の動作をオーバーライドできます。 これにより、これらのテーブル機能に関連付けられているリーダー プロトコルとライター プロトコルがアップグレードされなくなります。 次の手順を実行するには、既存のテーブルが必要です。

  1. ALTER TABLE を使用して、1 つ以上の機能を無効にするテーブル プロパティを設定します。 たとえば、削除ベクトルを無効にするには、次を実行します。

    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
    
  2. 次を実行して、テーブルでリキッド クラスタリングを有効にします。

    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
    

次の表に、オーバーライドできる Delta 機能と、有効化が Databricks Runtime バージョンとの互換性に与える影響に関する情報を示します。

Delta 機能 ランタイムの互換性 有効化をオーバーライドするプロパティ リキッド クラスタリングに対する無効化の影響
削除ベクトル 読み取りと書き込みには、Databricks Runtime 12.2 LTS 以降が必要です。 'delta.enableDeletionVectors' = false 行レベルのコンカレンシーが無効になっているため、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルのコンカレンシーでの書き込みの競合」を参照してください。
DELETEMERGE、および UPDATE コマンドの実行速度が低下する場合があります。
行追跡 書き込みには、Databricks Runtime 13.3 LTS 以降が必要です。 任意の Databricks Runtime バージョンから読み取ることができます。 'delta.enableRowTracking' = false 行レベルのコンカレンシーが無効になっているため、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルのコンカレンシーでの書き込みの競合」を参照してください。
チェックポイント V2 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以降が必要です。 'delta.checkpointPolicy' = 'classic' リキッド クラスタリングの動作には影響ありません。

クラスタリング キーを選択する

Databricks では、サポートされているテーブルに対して自動液体クラスタリングが推奨されます。 自動液体クラスタリングを参照してください。

Databricks では、クエリ フィルターで最もよく使用される列に基づいてクラスタリング キーを選択することをお勧めします。 クラスタリング キーは任意の順序で定義できます。 2 つの列が高度に相関している場合は、そのうちの 1 つのみをクラスタリング キーとして含める必要があります。

最大 4 つのクラスタリング キーを指定できます。 小さいテーブル (10 TB 未満) の場合、たとえば 4 つのクラスターキーを使用することで、1 つの列でフィルター処理を行う際に、2 つのクラスターキーを使用する場合と比較してパフォーマンスが低下する可能性があります。 ただし、テーブル サイズが大きくなると、単一列クエリに対してクラスタリング キーを使用する場合のパフォーマンスの違いはごくわずかになります。

クラスタリング キーとして収集された統計がある列のみを指定できます。 既定では、Delta テーブルの最初の 32 列で統計が収集されます。 「Delta 統計の列を指定する」を参照してください。

クラスタリングでは、クラスタリング キーについて次のデータ型がサポートされています。

  • タイムスタンプ
  • TimestampNTZ (Databricks Runtime 14.3 LTS 以降が必要です)
  • 整数
  • 長い
  • 短い
  • 浮く
  • ダブル
  • 10 進法
  • バイト

既存のテーブルを変換する場合は、次の推奨事項を検討してください。

現在のデータ最適化手法 クラスタリング キーの推奨事項
Hive スタイルのパーティション分割 パーティション列をクラスタリング キーとして使用します。
Z オーダーのインデックス作成 ZORDER BY 列をクラスタリング キーとして使用します。
Hive スタイルのパーティション分割と Z オーダー パーティション列と ZORDER BY 列の両方をクラスタリング キーとして使用します。
カーディナリティを減らすために生成された列 (タイムスタンプの日付など) 元の列をクラスタリング キーとして使用し、生成された列を作成しないでください。

クラスター化されたテーブルにデータを書き込む

クラスター化された Delta テーブルに書き込むには、液体クラスタリングで使用されるすべての Delta 書き込みプロトコル テーブル機能をサポートする Delta ライター クライアントを使用する必要があります。 クラスター化された Iceberg テーブルに書き込むには、Unity カタログの Iceberg REST Catalog API を使用できます。 Azure Databricks では、Databricks Runtime 13.3 LTS 以降を使用する必要があります。

書き込み時にクラスター化される操作は次のとおりです。

  • INSERT INTO 操作
  • CTASRTAS のステートメント
  • Parquet 形式からの COPY INTO
  • spark.write.mode("append")

構造化ストリーミング書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。 その他にも適用される制限があります。 「制限事項」を参照してください。

書き込み時のクラスタリングがトリガーされるのは、トランザクション内のデータがサイズのしきい値を超えた場合だけです。 これらのしきい値はクラスタリング列の数によって異なり、Unity Catalog マネージド テーブルにおいてはその他の Delta テーブルにおいてよりも低くなります。

クラスタリング列の数 Unity Catalog マネージド テーブルのしきい値サイズ その他の Delta テーブルのしきい値サイズ
1 64 MB 256 MB
2 256 MB 1 GB
3 512 MB 2 GB
4 1 GB 4 GB

すべての操作にリキッド クラスタリングが適用されるわけではないため、Databricks では、すべてのデータが効率的にクラスター化されるように、OPTIMIZE を頻繁に実行することをお勧めします。

クラスタリングをトリガーする方法

予測最適化では、有効なテーブルに対して OPTIMIZE コマンドが自動的に実行されます。 「Unity Catalog 管理テーブルの予測最適化」を参照してください。 予測最適化を使用する場合、Databricks では、スケジュールされた OPTIMIZE ジョブを無効にすることをお勧めします。

クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS 以降を使用する必要があります。 テーブルで OPTIMIZE コマンドを使用します。

OPTIMIZE table_name;

リキッド クラスタリングは増分です。つまり、クラスター化する必要があるデータに対応するために、必要に応じてデータのみが書き換えられます。 クラスター化するデータと一致しないクラスタリング キーを持つデータ ファイルは書き換えされません。

予測最適化を使用していない場合、Databricks では、クラスター データに対して通常の OPTIMIZE ジョブをスケジュールすることをお勧めします。 多数の更新または挿入が発生しているテーブルの場合、Databricks では、1 時間または 2 時間ごとに OPTIMIZE ジョブをスケジュールすることをお勧めしています。 リキッド クラスタリングは増分であるため、クラスター化されたテーブルのほとんどの OPTIMIZE ジョブは迅速に実行されます。

すべてのレコードの再クラスター化を強制する

Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスター化できます。

OPTIMIZE table_name FULL;

重要

OPTIMIZE FULL 実行すると、必要に応じて既存のすべてのデータが再クラスター化されます。 指定したキーでクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。

クラスタリングを初めて有効にしたとき、またはクラスタリング キーを変更するときに、OPTIMIZE FULL を実行します。 以前に OPTIMIZE FULL を実行していて、クラスタリング キーに変更がない場合、OPTIMIZE FULLOPTIMIZEと同じように実行されます。 このシナリオでは、 OPTIMIZE は増分アプローチを使用し、以前に圧縮されていないファイルのみを書き換える。 データ レイアウトに現在のクラスタリング キーが反映されるようにするには、常に OPTIMIZE FULL を使用します。

クラスター化されたテーブルからデータを読み取る

削除ベクトルの読み取りをサポートする Delta Lake クライアントを使用して、クラスター化された Delta テーブル内のデータを読み取ることができます。 Iceberg REST Catalog API を使用すると、クラスター化された Iceberg テーブル内のデータを読み取ることができます。

SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";

クラスタリング キーを変更する

テーブルのクラスタリング キーは、次の例のように、ALTER TABLE コマンドを実行することでいつでも変更できます。

ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

クラスタリング キーを変更すると、後続の OPTIMIZE 操作と書き込み操作では新しいクラスタリング アプローチが使用されますが、既存のデータは書き換えられません。

次の例のように、キーを NONE に設定してクラスタリングを無効にすることもできます。

ALTER TABLE table_name CLUSTER BY NONE;

クラスター キーを NONE に設定しても、既にクラスター化されているデータは書き換えられませんが、今後の OPTIMIZE 操作ではクラスタリング キーを使用できなくなります。

外部エンジンからの液体クラスタリングを利用する

外部 Iceberg エンジンから管理された Iceberg テーブルで液体クラスタリングを有効にすることができます。 液体クラスタリングを有効にするには、テーブルの作成時にパーティション列を指定します。 Unity カタログでは、パーティションがクラスタリング キーとして解釈されます。 たとえば、OSS Spark で次のコマンドを実行します。

CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;

液体クラスタリングを無効にできます。

ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;

Iceberg パーティションの進化を使用してクラスタリング キーを変更できます。

ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;

バケット変換を使用してパーティションを指定すると、Unity Catalog によって式が削除され、その列がクラスタリング キーとして使用されます。

CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));

テーブルがクラスター化される方法を確認する

次の例のように、DESCRIBE コマンドを使用して、テーブルのクラスタリング キーを確認できます。

DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

リキッド クラスタリングありのテーブルの互換性

Databricks Runtime 14.1 以降でリキッド クラスタリングありで作成されたテーブルでは、既定で v2 チェックポイントが使用されます。 Databricks Runtime 13.3 LTS 以降では、v2 チェックポイントを使用してテーブルを読み書きできます。

Databricks Runtime 12.2 LTS 以降では、v2 チェックポイントを無効にし、テーブル プロトコルをダウングレードすることでリキッド クラスタリングありのテーブルを読み取ることができます。 Delta Lake テーブル機能の削除とテーブル プロトコルのダウングレードに関する記事を参照してください。

制限事項

次の制限があります。

  • Databricks Runtime 15.1 以下では、書き込み時のクラスタリングは、フィルター、結合、集計を含むソース クエリをサポートしていません。
  • 構造化ストリーミング ワークロードでは、書き込み時のクラスタリングはサポートされていません。
  • Databricks Runtime 15.4 LTS 以下では、構造化ストリーミング書き込みを使用して液体クラスタリングが有効になっているテーブルを作成することはできません。 構造化ストリーミングを使用して、リキッド クラスタリングが有効になっている既存のテーブルにデータを書き込むことはできます。
  • Iceberg テーブルでは削除ベクトルと行追跡がサポートされていないため、行レベルのコンカレンシーは、マネージド Iceberg テーブルではサポートされていません。