适用于:SQL Server 2019 (15.x)
本文提供有关如何使用命令行工具在 SQL Server 大数据群集上运行 Spark 作业的指导。
重要
Microsoft SQL Server 2019 大数据群集附加产品将停用。 对 SQL Server 2019 大数据群集的支持将于 2025 年 2 月 28 日结束。 具有软件保障的 SQL Server 2019 的所有现有用户都将在平台上获得完全支持,在此之前,该软件将继续通过 SQL Server 累积更新进行维护。 有关详细信息,请参阅公告博客文章和 Microsoft SQL Server 平台上的大数据选项。
先决条件
- 配置并登录到群集的 SQL Server 2019 大数据工具:
azdata
- 执行
curl
对 Livy 的 REST API 调用的应用程序
使用 azdata 或 Livy 的 Spark 作业
本文提供了如何使用命令行模式将 Spark 应用程序提交到 SQL Server 大数据群集的示例。
Azure 数据 CLI azdata bdc spark
命令 在命令行上显示 SQL Server 大数据群集 Spark 的所有功能。 本文重点介绍作业提交。 但也 azdata bdc spark
通过命令支持 Python、Scala、SQL 和 R 的 azdata bdc spark session
交互式模式。
如果需要与 REST API 的直接集成,请使用标准 Livy 调用来提交作业。 本文使用 curl
Livy 示例中的命令行工具运行 REST API 调用。 有关演示如何使用 Python 代码与 Spark Livy 终结点交互的详细示例,请参阅 GitHub 上的 Livy 终结点中的 Spark 。
使用大数据群集 Spark 的简单 ETL
此提取、转换和加载(ETL)应用程序遵循常见的数据工程模式。 它从 Apache Hadoop 分布式文件系统 (HDFS) 登陆区域路径加载表格数据。 然后,它使用表格式写入 HDFS 处理的区域路径。
下载 示例应用程序的数据集。 然后使用 PySpark、Spark Scala 或 Spark SQL 创建 PySpark 应用程序。
在以下部分中,你将找到每个解决方案的示例练习。 选择平台的选项卡。 你将使用 azdata
或 curl
.
此示例使用以下 PySpark 应用程序。 它作为名为 parquet_etl_sample.py 的 Python 文件保存在本地计算机上。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t',
header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
"feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
"catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
"catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
"catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")
# Print the data frame inferred schema
df.printSchema()
tot_rows = df.count()
print("Number of rows:", tot_rows)
# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")
# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")
print("Sample ETL pipeline completed")
将 PySpark 应用程序复制到 HDFS
将应用程序存储在 HDFS 中,以便群集可以访问该应用程序以供执行。 最佳做法是标准化和管理群集中的应用程序位置,以简化管理。
在此示例用例中,所有 ETL 管道应用程序都存储在 hdfs:/apps/ETL-Pipelines 路径上。 示例应用程序存储在 hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py。
运行以下命令 ,将parquet_etl_sample.py 从本地开发或暂存计算机上传到 HDFS 群集。
azdata bdc hdfs cp --from-path parquet_etl_sample.py --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"
运行 Spark 应用程序
使用以下命令将应用程序提交到 SQL Server 大数据群集 Spark 以供执行。
该 azdata
命令使用常用指定参数运行应用程序。 有关完整参数选项 azdata bdc spark batch create
,请参阅 azdata bdc spark
。
此应用程序需要 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation
配置参数。 因此,该命令使用 --config
该选项。 此设置演示如何将配置传递到 Spark 会话。
可以使用此选项 --config
指定多个配置参数。 还可以通过在对象中设置配置,在应用程序会话中 SparkSession
指定它们。
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
警告
每次创建新批处理时,批名称的“name”或“n”参数应是唯一的。
监视 Spark 作业
这些azdata bdc spark batch
命令为 Spark 批处理作业提供管理作。
若要 列出所有正在运行的作业,请运行以下命令。
azdata
命令:azdata bdc spark batch list -o table
使用 Livy 的
curl
命令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
若要 获取 具有给定 ID 的 Spark 批处理的信息,请运行以下命令。 从batch id
spark batch create
中返回 。
azdata
命令:azdata bdc spark batch info --batch-id 0
使用 Livy 的
curl
命令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
若要获取具有给定 ID 的 Spark 批处理 的状态信息 ,请运行以下命令。
azdata
命令:azdata bdc spark batch state --batch-id 0
使用 Livy 的
curl
命令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
若要 获取 具有给定 ID 的 Spark 批处理的日志,请运行以下命令。
azdata
命令:azdata bdc spark batch log --batch-id 0
使用 Livy 的
curl
命令:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
后续步骤
有关 Spark 代码故障排除的信息,请参阅 对 PySpark 笔记本进行故障排除。
GitHub 上的 SQL Server 大数据群集 Spark 示例提供了全面的 Spark 示例 代码。
若要详细了解 SQL Server 大数据群集和相关方案,请参阅 SQL Server 大数据群集。