次の方法で共有


COPY INTOを使用してデータを読み込む

COPY INTO SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。 これは再試行可能で冪等の操作であり、ソース場所にある既に読み込み済みのファイルはスキップされます。

COPY INTO には次の機能があります。

  • S3、ADLS、ABFS、GCS、Unity カタログ ボリュームなど、クラウド ストレージから簡単に構成できるファイルまたはディレクトリ フィルター。
  • CSV、JSON、XML、AvroORCParquet、テキスト、バイナリ ファイルなど、複数のソース ファイル形式のサポート
  • 既定で 1 回だけ (べき等) のファイル処理
  • ターゲット テーブル スキーマの推論、マッピング、マージ、および展開

よりスケーラブルで堅牢なファイル インジェスト エクスペリエンスを実現するために、Databricks においては SQL ユーザーによるストリーミング テーブルの活用をお勧めします。 ストリーミング テーブルを参照してください。

警告

COPY INTO は、削除ベクトルのワークスペース設定を考慮します。 有効にした場合、Databricks Runtime 14.0 以降を実行しているコンピューティングまたは SQL ウェアハウスで COPY INTO が実行されている場合、ターゲット テーブルで削除ベクトルが有効になります。 有効にすると、削除ベクトルは Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリをブロックします。 「削除ベクトルとは」と「削除ベクトルの自動有効化」を参照してください。

要件

アカウント管理者は、ユーザーが を使用してデータを読み込む前に、「COPY INTO」の手順に従って、クラウド オブジェクト ストレージ内のデータへのアクセスを構成する必要があります。

例: スキーマレス Delta Lake テーブルにデータを読み込む

この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。

COPY INTOmergeSchematrue に設定することで、後で COPY_OPTIONS コマンド中にスキーマが推論されるように、空のプレースホルダー Delta テーブルを作成できます。

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

上記の SQL ステートメントはアイデンポテント(冪等)であり、Delta テーブルにデータを一度だけ取り込むために実行をスケジュールすることができます。

空の Delta テーブルは、COPY INTO の外部では使用できません。 INSERT INTO および MERGE INTO では、スキーマレス Delta テーブルへのデータの書き込みがサポートされていません。 データが COPY INTO でテーブルに挿入されると、テーブルはクエリを実行することが可能になります。

COPY INTO のターゲット テーブルの作成を参照してください。

例: スキーマを設定し、Delta Lake テーブルにデータを読み込む

次の例では、Delta テーブルを作成し、COPY INTO SQL コマンドを使用して Databricks データセットのサンプル データをテーブルに読み込む方法を示しています。 Azure Databricks クラスターにアタッチされているノートブックから、Python、R、Scala、または SQL のサンプル コードを実行できます。 また、Databricks SQLSQL ウェアハウスに関連付けられているクエリから SQL コードを実行することもできます。

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python(プログラミング言語)

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

スカラ (プログラミング言語)

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

クリーンアップするには、次のコードを実行します。これで、テーブルが削除されます。

Python(プログラミング言語)

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

スカラ (プログラミング言語)

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

メタデータ ファイルをクリーンアップする

VACUUMを実行して、Databricks Runtime 15.2 以降でCOPY INTOによって作成された参照されていないメタデータ ファイルをクリーンアップできます。

リファレンス

追加のリソース