Apache Spark のスケーラブルな機械学習ライブラリ (MLlib) は、分散環境にモデリング機能を提供します。 Spark パッケージ spark.ml
は、DataFrames 上に構築された一連の高レベル API です。 これらの API は、実用的な機械学習パイプラインの作成と調整に役立ちます。
Spark 機械学習 は、古い RDD ベースのパイプライン API ではなく、この MLlib DataFrame ベースの API を参照します。
機械学習 (ML) パイプラインは、複数の機械学習アルゴリズムを組み合わせた完全なワークフローです。 データの処理と学習には多くの手順が必要であり、一連のアルゴリズムが必要です。 パイプラインは、機械学習プロセスのステージと順序を定義します。 MLlib では、パイプラインのステージは、Transformer と Estimator がそれぞれタスクを実行する PipelineStages の特定のシーケンスによって表されます。
トランスフォーマーは、 transform()
メソッドを使用して 1 つの DataFrame を別のデータフレームに変換するアルゴリズムです。 たとえば、機能トランスフォーマーは、DataFrame の 1 つの列を読み取り、別の列にマップし、マップされた列が追加された新しい DataFrame を出力できます。
Estimator は学習アルゴリズムの抽象化であり、トランスフォーマーを生成するためのデータセットの調整またはトレーニングを担当します。 Estimator は、 fit()
という名前のメソッドを実装します。このメソッドは DataFrame を受け入れ、トランスフォーマーである DataFrame を生成します。
Transformer または Estimator の各ステートレスインスタンスには、パラメーターを指定するときに使用されるユニークな識別子があります。 どちらも、これらのパラメーターを指定するために Uniform API を使用します。
パイプラインの例
ML パイプラインの実際の使用方法を示すために、この例では、HDInsight クラスターの既定のストレージ (Azure Storage または Data Lake Storage) に事前に読み込まれるサンプル HVAC.csv
データ ファイルを使用します。 ファイルの内容を表示するには、 /HdiSamples/HdiSamples/SensorSampleData/hvac
ディレクトリに移動します。
HVAC.csv
には、さまざまな建物の HVAC (暖房、換気、および空調) システムの目標温度と実際の温度の両方を持つ一連の時間が含まれています。 目標は、データに対してモデルをトレーニングし、特定の建物の予測温度を生成することです。
コード例を次に示します。
-
BuildingID
、SystemInfo
(システムの識別子と年齢)、label
(建物が暑すぎる場合は 1.0、それ以外の場合は 0.0) を格納するLabeledDocument
を定義します。 - データの行 (行) を受け取り、目標温度と実際の温度を比較して建物が "ホット" かどうかを判断するカスタム パーサー関数
parseDocument
を作成します。 - ソース データを抽出するときにパーサーを適用します。
- トレーニング データを作成します。
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID
LabeledDocument = Row("BuildingID", "SystemInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
values = [str(x) for x in line.split(',')]
if (values[3] > values[2]):
hot = 1.0
else:
hot = 0.0
textValue = str(values[4]) + " " + str(values[5])
return LabeledDocument((values[6]), textValue, hot)
# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
"wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
このパイプライン例には、 Tokenizer
と HashingTF
(トランスフォーマーの両方)、 Logistic Regression
(Estimator) の 3 つのステージがあります。
training
DataFrame 内の抽出および解析されたデータは、pipeline.fit(training)
が呼び出されたときにパイプラインを通過します。
- 最初のステージ
Tokenizer
では、SystemInfo
入力列 (システム識別子と年齢値で構成) がwords
出力列に分割されます。 この新しいwords
列が DataFrame に追加されます。 - 2 番目のステージ
HashingTF
、新しいwords
列を特徴ベクトルに変換します。 この新しいfeatures
列が DataFrame に追加されます。 これら最初の 2 つの段階はトランスフォーマーです。 - 3 番目のステージである
LogisticRegression
は Estimator であるため、パイプラインはLogisticRegression.fit()
メソッドを呼び出してLogisticRegressionModel
を生成します。
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
Tokenizer
およびHashingTF
トランスフォーマーによって追加された新しいwords
列とfeatures
列、およびLogisticRegression
推定器のサンプルを表示するには、元の DataFrame で PipelineModel.transform()
メソッドを実行します。 運用コードでは、次の手順として、トレーニングを検証するためにテスト DataFrame を渡します。
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
model
オブジェクトを使用して予測を行えるようになりました。 この機械学習アプリケーションの完全なサンプルと、それを実行するための詳細な手順については、 Azure HDInsight での Apache Spark 機械学習アプリケーションの構築に関するページを参照してください。