次の方法で共有


コマンド ライン ツールを使用して Spark ジョブを送信する

適用対象: SQL Server 2019 (15.x)

この記事では、コマンドライン ツールを使用して SQL Server ビッグ データ クラスターで Spark ジョブを実行する方法に関するガイダンスを提供します。

Von Bedeutung

Microsoft SQL Server 2019 ビッグ データ クラスターのアドオンは廃止されます。 SQL Server 2019 ビッグ データ クラスターのサポートは、2025 年 2 月 28 日に終了します。 ソフトウェア アシュアランス付きの SQL Server 2019 を使用する既存の全ユーザーはプラットフォームで完全にサポートされ、ソフトウェアはその時点まで SQL Server の累積更新プログラムによって引き続きメンテナンスされます。 詳細については、お知らせのブログ記事と「Microsoft SQL Server プラットフォームのビッグ データ オプション」を参照してください。

[前提条件]

azdata または Livy を使用する Spark ジョブ

この記事では、コマンド ライン パターンを使用して Spark アプリケーションを SQL Server ビッグ データ クラスターに送信する方法の例を示します。

Azure Data CLI azdata bdc spark コマンド は、SQL Server ビッグ データ クラスター Spark のすべての機能をコマンド ラインで表示します。 この記事では、ジョブの送信について説明します。 ただし、 azdata bdc spark では、 azdata bdc spark session コマンドを使用した Python、Scala、SQL、R の対話型モードもサポートされています。

REST API と直接統合する必要がある場合は、標準の Livy 呼び出しを使用してジョブを送信します。 この記事では、Livy の例の curl コマンドライン ツールを使用して、REST API 呼び出しを実行します。 Python コードを使用して Spark Livy エンドポイントを操作する方法を示す詳細な例については、GitHub の Livy エンドポイントからの Spark の使用 に関するページを参照してください。

ビッグ データ クラスター Spark を使用する単純な ETL

この抽出、変換、読み込み (ETL) アプリケーションは、一般的なデータ エンジニアリング パターンに従います。 Apache Hadoop 分散ファイル システム (HDFS) ランディング ゾーン パスから表形式データを読み込みます。 次に、テーブル形式を使用して HDFS で処理されたゾーン パスに書き込みます。

サンプル アプリケーションのデータセットをダウンロードします。 次に、PySpark、Spark Scala、または Spark SQL を使用して PySpark アプリケーションを作成します。

次のセクションでは、各ソリューションのサンプル演習を紹介します。 プラットフォームのタブを選択します。 アプリケーションは、 azdata または curlを使用して実行します。

この例では、次の PySpark アプリケーションを使用します。 ローカル コンピューターに parquet_etl_sample.py という名前の Python ファイルとして保存されます。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

PySpark アプリケーションを HDFS にコピーする

クラスターが実行のためにアプリケーションにアクセスできるように、アプリケーションを HDFS に格納します。 ベスト プラクティスとして、クラスター内のアプリケーションの場所を標準化して管理し、管理を合理化します。

このユース ケースの例では、すべての ETL パイプライン アプリケーションが hdfs:/apps/ETL-Pipelines パスに格納されます。 サンプル アプリケーションは hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py に格納されます。

次のコマンドを実行して ローカル開発またはステージング コンピューターから HDFS クラスターにparquet_etl_sample.pyをアップロードします。

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

Spark アプリケーションを実行する

次のコマンドを使用して、アプリケーションを SQL Server ビッグ データ クラスター Spark に送信して実行します。

azdata コマンドは、一般的に指定されたパラメーターを使用してアプリケーションを実行します。 azdata bdc spark batch createの完全なパラメーター オプションについては、azdata bdc sparkを参照してください。

このアプリケーションには、 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation 構成パラメーターが必要です。 そのため、コマンドは --config オプションを使用します。 このセットアップでは、Spark セッションに構成を渡す方法を示します。

--config オプションを使用して、複数の構成パラメーターを指定できます。 SparkSession オブジェクトで構成を設定して、アプリケーション セッション内で指定することもできます。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Warnung

バッチ名の "name" または "n" パラメーターは、新しいバッチが作成されるたびに一意である必要があります。

Spark ジョブの監視

azdata bdc spark batch コマンドは、Spark バッチ ジョブの管理アクションを提供します。

実行中のすべてのジョブを一覧表示するには、次のコマンドを実行します。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch list -o table
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

指定された ID を持つ Spark バッチの 情報を取得 するには、次のコマンドを実行します。 batch idspark batch createから返されます。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch info --batch-id 0
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

指定された ID を持つ Spark バッチの 状態情報を取得 するには、次のコマンドを実行します。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch state --batch-id 0
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

指定 された ID を 持つ Spark バッチのログを取得するには、次のコマンドを実行します。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch log --batch-id 0
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

次のステップ

Spark コードのトラブルシューティングの詳細については、「 PySpark ノートブックのトラブルシューティング」を参照してください。

包括的な Spark サンプル コードは、GitHub の SQL Server ビッグ データ クラスターの Spark サンプル で入手できます。

SQL Server ビッグ データ クラスターおよびこれに関連するシナリオの詳細については、「SQL Server ビッグ データ クラスター」を参照してください。