COPY INTO
SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。 これは再試行可能で冪等の操作であり、ソース場所にある既に読み込み済みのファイルはスキップされます。
COPY INTO
には次の機能があります。
- S3、ADLS、ABFS、GCS、Unity カタログ ボリュームなど、クラウド ストレージから簡単に構成できるファイルまたはディレクトリ フィルター。
- CSV、JSON、XML、Avro、 ORC、Parquet、テキスト、バイナリ ファイルなど、複数のソース ファイル形式のサポート
- 既定で 1 回だけ (べき等) のファイル処理
- ターゲット テーブル スキーマの推論、マッピング、マージ、および展開
注
よりスケーラブルで堅牢なファイル インジェスト エクスペリエンスを実現するために、Databricks においては SQL ユーザーによるストリーミング テーブルの活用をお勧めします。 ストリーミング テーブルを参照してください。
警告
COPY INTO
は、削除ベクトルのワークスペース設定を考慮します。 有効にした場合、Databricks Runtime 14.0 以降を実行しているコンピューティングまたは SQL ウェアハウスで COPY INTO
が実行されている場合、ターゲット テーブルで削除ベクトルが有効になります。 有効にすると、削除ベクトルは Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリをブロックします。 「削除ベクトルとは」と「削除ベクトルの自動有効化」を参照してください。
要件
アカウント管理者は、ユーザーが を使用してデータを読み込む前に、「COPY INTO
」の手順に従って、クラウド オブジェクト ストレージ内のデータへのアクセスを構成する必要があります。
例: スキーマレス Delta Lake テーブルにデータを読み込む
注
この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。
COPY INTO
で mergeSchema
を true
に設定することで、後で COPY_OPTIONS
コマンド中にスキーマが推論されるように、空のプレースホルダー Delta テーブルを作成できます。
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上記の SQL ステートメントはアイデンポテント(冪等)であり、Delta テーブルにデータを一度だけ取り込むために実行をスケジュールすることができます。
注
空の Delta テーブルは、COPY INTO
の外部では使用できません。
INSERT INTO
および MERGE INTO
では、スキーマレス Delta テーブルへのデータの書き込みがサポートされていません。 データが COPY INTO
でテーブルに挿入されると、テーブルはクエリを実行することが可能になります。
COPY INTO
のターゲット テーブルの作成を参照してください。
例: スキーマを設定し、Delta Lake テーブルにデータを読み込む
次の例では、Delta テーブルを作成し、COPY INTO
SQL コマンドを使用して Databricks データセットのサンプル データをテーブルに読み込む方法を示しています。 Azure Databricks クラスターにアタッチされているノートブックから、Python、R、Scala、または SQL のサンプル コードを実行できます。 また、Databricks SQL の SQL ウェアハウスに関連付けられているクエリから SQL コードを実行することもできます。
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python(プログラミング言語)
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
スカラ (プログラミング言語)
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
クリーンアップするには、次のコードを実行します。これで、テーブルが削除されます。
Python(プログラミング言語)
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
スカラ (プログラミング言語)
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
メタデータ ファイルをクリーンアップする
VACUUMを実行して、Databricks Runtime 15.2 以降でCOPY INTO
によって作成された参照されていないメタデータ ファイルをクリーンアップできます。
リファレンス
- Databricks Runtime 7.x 以降:
COPY INTO
追加のリソース
- 同じ Delta テーブルに対する複数の
COPY INTO
操作の例など、一般的な使用パターンについては、「COPY INTO
を使用した一般的なデータ読み込みパターン」を参照してください。