適用対象: SQL Server 2019 (15.x)
この記事では、コマンドライン ツールを使用して SQL Server ビッグ データ クラスターで Spark ジョブを実行する方法に関するガイダンスを提供します。
Von Bedeutung
Microsoft SQL Server 2019 ビッグ データ クラスターのアドオンは廃止されます。 SQL Server 2019 ビッグ データ クラスターのサポートは、2025 年 2 月 28 日に終了します。 ソフトウェア アシュアランス付きの SQL Server 2019 を使用する既存の全ユーザーはプラットフォームで完全にサポートされ、ソフトウェアはその時点まで SQL Server の累積更新プログラムによって引き続きメンテナンスされます。 詳細については、お知らせのブログ記事と「Microsoft SQL Server プラットフォームのビッグ データ オプション」を参照してください。
[前提条件]
-
SQL Server 2019 ビッグ データ ツール が構成され、クラスターにログインしました。
azdata
- Livy への REST API 呼び出しを実行する
curl
アプリケーション
azdata または Livy を使用する Spark ジョブ
この記事では、コマンド ライン パターンを使用して Spark アプリケーションを SQL Server ビッグ データ クラスターに送信する方法の例を示します。
Azure Data CLI azdata bdc spark
コマンド は、SQL Server ビッグ データ クラスター Spark のすべての機能をコマンド ラインで表示します。 この記事では、ジョブの送信について説明します。 ただし、 azdata bdc spark
では、 azdata bdc spark session
コマンドを使用した Python、Scala、SQL、R の対話型モードもサポートされています。
REST API と直接統合する必要がある場合は、標準の Livy 呼び出しを使用してジョブを送信します。 この記事では、Livy の例の curl
コマンドライン ツールを使用して、REST API 呼び出しを実行します。 Python コードを使用して Spark Livy エンドポイントを操作する方法を示す詳細な例については、GitHub の Livy エンドポイントからの Spark の使用 に関するページを参照してください。
ビッグ データ クラスター Spark を使用する単純な ETL
この抽出、変換、読み込み (ETL) アプリケーションは、一般的なデータ エンジニアリング パターンに従います。 Apache Hadoop 分散ファイル システム (HDFS) ランディング ゾーン パスから表形式データを読み込みます。 次に、テーブル形式を使用して HDFS で処理されたゾーン パスに書き込みます。
サンプル アプリケーションのデータセットをダウンロードします。 次に、PySpark、Spark Scala、または Spark SQL を使用して PySpark アプリケーションを作成します。
次のセクションでは、各ソリューションのサンプル演習を紹介します。 プラットフォームのタブを選択します。 アプリケーションは、 azdata
または curl
を使用して実行します。
この例では、次の PySpark アプリケーションを使用します。 ローカル コンピューターに parquet_etl_sample.py という名前の Python ファイルとして保存されます。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t',
header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
"feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
"catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
"catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
"catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")
# Print the data frame inferred schema
df.printSchema()
tot_rows = df.count()
print("Number of rows:", tot_rows)
# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")
# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")
print("Sample ETL pipeline completed")
PySpark アプリケーションを HDFS にコピーする
クラスターが実行のためにアプリケーションにアクセスできるように、アプリケーションを HDFS に格納します。 ベスト プラクティスとして、クラスター内のアプリケーションの場所を標準化して管理し、管理を合理化します。
このユース ケースの例では、すべての ETL パイプライン アプリケーションが hdfs:/apps/ETL-Pipelines パスに格納されます。 サンプル アプリケーションは hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py に格納されます。
次のコマンドを実行して 、 ローカル開発またはステージング コンピューターから HDFS クラスターにparquet_etl_sample.pyをアップロードします。
azdata bdc hdfs cp --from-path parquet_etl_sample.py --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"
Spark アプリケーションを実行する
次のコマンドを使用して、アプリケーションを SQL Server ビッグ データ クラスター Spark に送信して実行します。
azdata
コマンドは、一般的に指定されたパラメーターを使用してアプリケーションを実行します。
azdata bdc spark batch create
の完全なパラメーター オプションについては、azdata bdc spark
を参照してください。
このアプリケーションには、 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation
構成パラメーターが必要です。 そのため、コマンドは --config
オプションを使用します。 このセットアップでは、Spark セッションに構成を渡す方法を示します。
--config
オプションを使用して、複数の構成パラメーターを指定できます。
SparkSession
オブジェクトで構成を設定して、アプリケーション セッション内で指定することもできます。
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
Warnung
バッチ名の "name" または "n" パラメーターは、新しいバッチが作成されるたびに一意である必要があります。
Spark ジョブの監視
azdata bdc spark batch
コマンドは、Spark バッチ ジョブの管理アクションを提供します。
実行中のすべてのジョブを一覧表示するには、次のコマンドを実行します。
azdata
コマンドは、次のことを行います。azdata bdc spark batch list -o table
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
指定された ID を持つ Spark バッチの 情報を取得 するには、次のコマンドを実行します。
batch id
はspark batch create
から返されます。
azdata
コマンドは、次のことを行います。azdata bdc spark batch info --batch-id 0
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
指定された ID を持つ Spark バッチの 状態情報を取得 するには、次のコマンドを実行します。
azdata
コマンドは、次のことを行います。azdata bdc spark batch state --batch-id 0
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
指定 された ID を 持つ Spark バッチのログを取得するには、次のコマンドを実行します。
azdata
コマンドは、次のことを行います。azdata bdc spark batch log --batch-id 0
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
次のステップ
Spark コードのトラブルシューティングの詳細については、「 PySpark ノートブックのトラブルシューティング」を参照してください。
包括的な Spark サンプル コードは、GitHub の SQL Server ビッグ データ クラスターの Spark サンプル で入手できます。
SQL Server ビッグ データ クラスターおよびこれに関連するシナリオの詳細については、「SQL Server ビッグ データ クラスター」を参照してください。