修改和保存数据帧

已完成

Apache Spark 提供 数据帧 对象作为处理数据的主要结构。 可以使用数据帧来查询和转换数据,并将结果保存在数据湖中。 若要将数据加载到数据帧中,请使用 spark.read 函数,指定要读取的数据的文件格式、路径和(可选)架构。 例如,以下代码将数据从 订单 文件夹中的所有 .csv 文件加载到名为 order_details 的数据帧中,然后显示前五条记录。

order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))

转换数据结构

将源数据加载到数据帧后,可以使用数据帧对象的方法和 Spark 函数对其进行转换。 数据帧的典型作包括:

  • 筛选行和列
  • 重命名列
  • 创建新列,通常派生自现有列
  • 替换 null 或其他值

在以下示例中,代码使用 split 函数将 CustomerName 列中的值分隔为两个名为 FirstNameLastName 的新列。 然后使用该方法 drop 删除原始 CustomerName 列。

from pyspark.sql.functions import split, col

# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")

display(transformed_df.limit(5))

可以使用 Spark SQL 库的全部功能来转换数据,方法是筛选行、派生、删除、重命名列以及应用其他任何必需的数据修改。

保存转换后的数据

在数据帧采用所需的结构后,可以将结果保存到数据湖中受支持的格式。

以下代码示例将数据帧保存到数据湖中的 Parquet 文件中,替换任何同名的现有文件。

transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")

注释

对于用于进一步分析或引入到分析存储的数据文件,通常首选 Parquet 格式。 Parquet 是一种非常高效的格式,大多数大规模数据分析系统都支持这种格式。 事实上,有时数据转换要求可能只是将数据从其他格式(如 CSV)转换为 Parquet!