以半结构化变体类型导入数据

重要

此功能目前以公共预览版提供。

在 Databricks Runtime 15.3 及更高版本中,可以使用VARIANT类型引入半结构化数据。 本文介绍了行为并提供使用自动加载器和COPY INTO从云对象存储引入数据的示例模式、Kafka 的流式处理记录以及用于使用变体数据新建表或使用变体类型插入新记录的 SQL 命令。 下表汇总了支持的文件格式和 Databricks Runtime 版本支持:

文件格式 支持的 Databricks Runtime 版本
JSON(JavaScript 对象表示法) 15.3 及更高版本
XML 16.4 及更高版本
CSV 16.4 及更高版本

请参阅查询变体数据

创建一个包含变体列的表

VARIANT是 Databricks Runtime 15.3 及更高版本中的标准 SQL 类型,受 Delta Lake 支持的表支持。 Azure Databricks 上的托管表默认使用 Delta Lake,因此可以使用以下语法创建包含单个VARIANT列的空表:

CREATE TABLE table_name (variant_column VARIANT)

或者,可以使用 PARSE_JSON JSON 字符串上的函数或 FROM_XML XML 字符串上的函数来使用 CTAS 语句创建具有变体列的表。 以下示例创建一个包含两个列的表:

  • 从 JSON 字符串中提取的id列作为STRING类型。
  • variant_column列包含编码为VARIANT类型的整个 JSON 字符串。
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

注意

Databricks 建议将字段提取并存储为计划用于加速查询和优化存储布局的非变体列。

VARIANT列不能用于聚类分析键、分区或 Z 顺序键。 VARIANT 数据类型不能用于比较、分组、排序和设置操作。 有关限制的完整列表,请参阅 限制

使用parse_json插入数据

如果目标表已包含编码为VARIANT的列,则可以使用parse_json将 JSON 字符串记录作为VARIANT插入,如以下示例所示:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python语言

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

使用from_xml插入数据

如果目标表已包含编码为 VARIANT的列,则可以使用 from_xml 将 XML 字符串记录插入为 VARIANT。 例如:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Python语言

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

使用 from_csv 插入数据

如果目标表已包含编码为 VARIANT的列,则可以使用 from_xml 将 XML 字符串记录插入为 VARIANT。 例如:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Python语言

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

将数据从云对象存储引入为变体

自动加载程序可用于将受支持的文件源中的所有数据作为目标表中的单个 VARIANT 列加载。 由于VARIANT能够灵活应对架构和类型更改,并且维护数据源中存在的区分大小写和NULL值,因此此模式对于大多数引入方案都十分可靠,但需要注意以下事项:

  • 格式不正确的记录不能使用 VARIANT 类型进行编码。
  • VARIANT类型只能容纳最大大小为 16mb 的记录。

注意

Variant 对超大型记录的处理方式与对损坏记录的处理类似。 在默认 PERMISSIVE 处理模式下,过大的记录会被捕获到 corruptRecordColumn

由于整个记录记录为单个 VARIANT 列,因此在引入期间不会发生架构演变,rescuedDataColumn 不被支持。 以下示例假定目标表已存在单个VARIANT列。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

还可以在定义架构或传递VARIANT时指定schemaHints。 引用的源字段中的数据必须包含有效的记录。 以下示例演示了这些语法:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

COPY INTO与变体配合使用

Databricks 建议在可用时使用自动加载程序,而不是 COPY INTO

COPY INTO 支持将支持的数据源的全部内容作为单个列引入。 以下示例新建包含单个VARIANT列的表,然后使用COPY INTO从 JSON 文件源引入记录。

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

将 Kafka 数据作为变体流式传输

许多 Kafka 流使用 JSON 对有效负载进行编码。 使用VARIANT引入 Kafka 流会使这些工作负载对架构更改非常可靠。

以下示例演示了如何读取 Kafka 流式处理源,将key强制转换为STRING,将value强制转换为VARIANT,并写出到目标表。

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)