SQL を使用してデータを変換する
データフレーム構造を提供する SparkSQL ライブラリでは、データを操作する方法として SQL を使用することもできます。 この方法では、SQL クエリを使用してデータフレーム内のデータのクエリと変換を行い、結果をテーブルとして保持できます。
注
テーブルは、ファイルに対するメタデータの抽象化です。 データはリレーショナル テーブルに格納されませんが、テーブルはデータ レイク内のファイルに対してリレーショナル レイヤーを提供します。
テーブルとビューを定義する
Spark のテーブル定義は、ファイルに対するリレーショナル抽象化をカプセル化するメタデータ レイヤーである メタストアに格納されます。 外部 テーブルは、指定したデータ レイクの場所にあるファイルを参照するメタストア内のリレーショナル テーブルです。 このデータにアクセスする場合は、テーブルに対してクエリを実行するか、データ レイクから直接ファイルを読み取ります。
注
外部テーブルは基になるファイルに「疎バインド」されているため、テーブルを削除してもファイルは削除されません。 これにより、Spark を使用して変換の負荷を高め、データをレイクに保持することができます。 これが完了したら、テーブルを削除し、ダウンストリーム プロセスはこれらの最適化された構造にアクセスできます。 また、基になるデータ ファイルがメタストアに関連付けられた内部管理ストレージの場所に格納される、マネージド テーブルを定義することもできます。 マネージド テーブルはファイルに "密バインド" され、マネージド テーブルを削除すると、関連付けられているファイルが削除されます。
次のコード例では、(CSV ファイルから読み込まれた) データフレームを外部テーブル名として sales_orders保存します。 ファイルは、データ レイクの /sales_orders_table フォルダーに格納されます。
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')
SQL を使用してデータのクエリと変換を行う
テーブルを定義したら、SQL を使用してデータのクエリと変換を行うことができます。 次のコードでは、Year と Month という名前の 2 つの新しい派生列を作成し、新しい派生列が追加された新しいテーブル transformed_orders を作成します。
# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")
# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')
新しいテーブルのデータ ファイルは、Year=*NNNN* / Month=*N*形式のフォルダーの階層に格納され、各フォルダーには、年および月別の対応する注文の Parquet ファイルが含まれています。
メタストアのクエリを実行する
この新しいテーブルはメタストアで作成されているため、SQL を使用して最初の行の %%sql マジック キーを使用して直接クエリを実行し、次のスクリプトに示すように SQL 構文が使用されることを示すことができます。
%%sql
SELECT * FROM transformed_orders
WHERE Year = 2021
AND Month = 1
テーブルを削除する
外部テーブルを操作する場合は、DROP
コマンドを使用して、データ レイク内のファイルに影響を与えずにメタストアからテーブル定義を削除できます。 この方法では、SQL を使用してデータを変換した後にメタストアをクリーンアップし、変換されたデータ ファイルをダウンストリームのデータ分析およびインジェスト プロセスで使用できるようにします。
%%sql
DROP TABLE transformed_orders;
DROP TABLE sales_orders;