本文介绍如何使用两种方法执行特定的分类任务。 一种方法使用普通 pyspark
方法,一种方法使用 synapseml
库。 虽然这些方法的性能相同,但它们突出了synapseml
相较于pyspark
的简单性。
本文中所述的任务根据评论文本预测亚马逊上销售的书籍的特定客户评论是否良好(评级 > 3)或差。 若要生成任务,请使用不同的超参数训练 LogisticRegression 学习器,然后选择最佳模型。
先决条件
将笔记本附加到湖屋。 在左侧,可以选择添加来添加现有湖屋,或创建新的湖屋。
设置
导入必要的 Python 库并获取 Spark 会话:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
读取数据
下载并读取数据:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
提取特征和处理数据
与之前下载的数据集相比,实际数据具有更高的复杂性。 数据集通常具有多种类型的特征,例如文本、数字和分类。 若要显示使用这些数据集的困难,请将两个数值特征添加到数据集:审阅 的字数 和 平均字长:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
使用 pyspark 进行分类
若要使用 pyspark
库选择最佳 LogisticRegression 分类器,必须 显式 执行以下步骤:
- 处理特征
- 标记化文本列
- 将标记化列通过哈希转换为向量
- 将数值特征与向量合并
- 为了处理标签列,请将该列转换为正确的类型。
- 在
train
数据集上使用不同的超参数训练多个LogisticRegression算法 - 计算每个已训练模型的 ROC 曲线下面积,并选择在
test
数据集上计算出的指标最高的模型。 - 评估
validation
集上的最佳模型
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
使用 SynapseML 进行分类
该 synapseml
选项涉及更简单的步骤:
TrainClassifier
估算器对数据进行内部特征化,只要train
、test
、validation
数据集中选择的列表示特征FindBestModel
估算器从已训练的模型池中找到最佳模型。 为此,它会找到在给定指定指标的情况下对数据集执行最佳性能的test
模型ComputeModelStatistics
转换器同时计算评分数据集(在本例中,validation
数据集)上的不同指标
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)