使用命令行工具提交 Spark 作业

适用于:SQL Server 2019 (15.x)

本文提供有关如何使用命令行工具在 SQL Server 大数据群集上运行 Spark 作业的指导。

重要

Microsoft SQL Server 2019 大数据群集附加产品将停用。 对 SQL Server 2019 大数据群集的支持将于 2025 年 2 月 28 日结束。 具有软件保障的 SQL Server 2019 的所有现有用户都将在平台上获得完全支持,在此之前,该软件将继续通过 SQL Server 累积更新进行维护。 有关详细信息,请参阅公告博客文章Microsoft SQL Server 平台上的大数据选项

先决条件

使用 azdata 或 Livy 的 Spark 作业

本文提供了如何使用命令行模式将 Spark 应用程序提交到 SQL Server 大数据群集的示例。

Azure 数据 CLI azdata bdc spark 命令 在命令行上显示 SQL Server 大数据群集 Spark 的所有功能。 本文重点介绍作业提交。 但也 azdata bdc spark 通过命令支持 Python、Scala、SQL 和 R 的 azdata bdc spark session 交互式模式。

如果需要与 REST API 的直接集成,请使用标准 Livy 调用来提交作业。 本文使用 curl Livy 示例中的命令行工具运行 REST API 调用。 有关演示如何使用 Python 代码与 Spark Livy 终结点交互的详细示例,请参阅 GitHub 上的 Livy 终结点中的 Spark

使用大数据群集 Spark 的简单 ETL

此提取、转换和加载(ETL)应用程序遵循常见的数据工程模式。 它从 Apache Hadoop 分布式文件系统 (HDFS) 登陆区域路径加载表格数据。 然后,它使用表格式写入 HDFS 处理的区域路径。

下载 示例应用程序的数据集。 然后使用 PySpark、Spark Scala 或 Spark SQL 创建 PySpark 应用程序。

在以下部分中,你将找到每个解决方案的示例练习。 选择平台的选项卡。 你将使用 azdatacurl.

此示例使用以下 PySpark 应用程序。 它作为名为 parquet_etl_sample.py 的 Python 文件保存在本地计算机上。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

将 PySpark 应用程序复制到 HDFS

将应用程序存储在 HDFS 中,以便群集可以访问该应用程序以供执行。 最佳做法是标准化和管理群集中的应用程序位置,以简化管理。

在此示例用例中,所有 ETL 管道应用程序都存储在 hdfs:/apps/ETL-Pipelines 路径上。 示例应用程序存储在 hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py

运行以下命令 ,将parquet_etl_sample.py 从本地开发或暂存计算机上传到 HDFS 群集。

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

运行 Spark 应用程序

使用以下命令将应用程序提交到 SQL Server 大数据群集 Spark 以供执行。

azdata 命令使用常用指定参数运行应用程序。 有关完整参数选项 azdata bdc spark batch create,请参阅 azdata bdc spark

此应用程序需要 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation 配置参数。 因此,该命令使用 --config 该选项。 此设置演示如何将配置传递到 Spark 会话。

可以使用此选项 --config 指定多个配置参数。 还可以通过在对象中设置配置,在应用程序会话中 SparkSession 指定它们。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

警告

每次创建新批处理时,批名称的“name”或“n”参数应是唯一的。

监视 Spark 作业

这些azdata bdc spark batch命令为 Spark 批处理作业提供管理作。

若要 列出所有正在运行的作业,请运行以下命令。

  • azdata 命令:

    azdata bdc spark batch list -o table
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

若要 获取 具有给定 ID 的 Spark 批处理的信息,请运行以下命令。 从batch idspark batch create中返回 。

  • azdata 命令:

    azdata bdc spark batch info --batch-id 0
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

若要获取具有给定 ID 的 Spark 批处理 的状态信息 ,请运行以下命令。

  • azdata 命令:

    azdata bdc spark batch state --batch-id 0
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

若要 获取 具有给定 ID 的 Spark 批处理的日志,请运行以下命令。

  • azdata 命令:

    azdata bdc spark batch log --batch-id 0
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

后续步骤

有关 Spark 代码故障排除的信息,请参阅 对 PySpark 笔记本进行故障排除

GitHub 上的 SQL Server 大数据群集 Spark 示例提供了全面的 Spark 示例 代码。

若要详细了解 SQL Server 大数据群集和相关方案,请参阅 SQL Server 大数据群集