使用 sparklyr

R 语言 sparklyr 资源充当 Apache Spark 的接口。 sparklr 资源提供一种机制,用于与熟悉的 R 接口的 Spark 交互。 通过 Spark 批处理作业定义或交互式 Microsoft Fabric 笔记本使用 sparklyr。

sparklyr 与其他 tidyverse 包一起使用 - 例如 dplyr。 Microsoft Fabric 在每个运行时版本中分发 sparklyr 和 tidyverse 的最新稳定版本。 可以导入这些资源并开始使用 API。

先决条件

  • 打开或创建笔记本。 若要了解如何操作,请参阅 如何使用 Microsoft Fabric 笔记本

  • 将语言选项设置为 SparkR (R) 以更改主要语言。

  • 将笔记本附加到湖屋。 在左侧,选择 添加 以添加现有湖屋或创建新的湖屋。

将 sparklyr 连接到 Synapse Spark 群集

spark_connect() 函数连接方法可建立连接 sparklyr 连接。 该函数生成一个名为 synapse的新连接方法,该方法连接到现有的 Spark 会话。 这大大减少了 sparklyr 会话开始时间。 此连接方法在 开源 sparklyr 项目中可用。 使用method = "synapse"时,可以在同一会话中同时使用这两个sparklyrSparkR会话,并轻松地在它们之间共享数据。 以下笔记本单元代码示例使用函数 spark_connect()

# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)

使用 sparklyr 读取数据

新的 Spark 会话不包含任何数据。 然后,必须将数据加载到 Spark 会话的内存中,或将 Spark 指向数据的位置,以便会话可以按需访问数据。

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)

使用 sparklyr,还可以使用 ABFS 路径值从湖屋文件获取 writeread 数据。 要读取和写入湖屋,请先将湖屋添加到会话。 在笔记本左侧,选择“添加”以添加现有的湖屋。 此外,还可以创建湖屋。

若要查找 ABFS 路径,请右键单击 Lakehouse 中的 “文件” 文件夹,然后选择“ 复制 ABFS 路径”。 在以下代码示例中粘贴要替换 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files 的路径:

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)

使用 sparklyr 操作数据

sparklyr 提供了处理 Spark 中的数据的不同方法,包括:

  • dplyr 命令
  • SparkSQL
  • Spark 的功能转换器

使用 dplyr

可以使用熟悉的 dplyr 命令在 Spark 中准备数据。 这些命令在 Spark 中运行,防止 R 和 Spark 之间进行不必要的数据传输。

# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup

使用 dplyr 操作数据资源提供了有关将 dplyr 与 Spark 配合使用的详细信息。 sparklyr 并将 dplyr R 命令转换为 Spark SQL。 使用 show_query() 来展示生成的查询。

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

使用 SQL

还可以直接对 Spark 群集中的表执行 SQL 查询。 spark_connection() 对象实现适用于 Spark 的 DBI 接口,因此可以使用 dbGetQuery() 执行 SQL 并将结果作为 R 数据帧返回:

library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")

使用功能转换器

上述两种方法都依赖于 SQL 语句。 Spark 提供命令,使某些数据转换更加方便,而无需使用 SQL。 例如, ft_binarizer() 该命令简化了新列的创建过程,该列指示另一列中的值是否超过特定阈值:

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)

Reference -FT 资源提供了 Spark 功能转换器的完整列表,可通过该sparklyr列表获取。

sparklyrSparkR 之间共享数据

使用 method = "synapse"sparklyr 连接到 synapse spark 群集时,sparklyrSparkR 均可用于同一会话,并且可在它们之间轻松共享数据。 可以在 sparklyr 中创建 spark 表,并从 SparkR 读取它:

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparklr)

机器学习

以下示例使用 ml_linear_regression() 来拟合线性回归模型。 该模型使用内置mtcars数据集来尝试根据汽车的重量(mpg)和汽车发动机的缸数()来预测汽车的油耗(wtcyl)。 此处的所有情况都假定 mpg 与我们每个特征之间存在线性关系。

生成测试和训练数据集

使用拆分,其中 70% 用于训练,30% 用于测试模型。 此比率的更改会导致不同的模型:

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)

训练模型

训练逻辑回归模型。

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

使用summary() 了解我们的模型质量以及每个预测因子的统计学意义。

summary(fit)

使用模型

调用 ml_predict() 以将模型应用于测试数据集:

pred <- ml_predict(fit, partitions$test)

head(pred)

有关通过 sparklyr 提供的 Spark ML 模型列表,请访问 参考 - ML

断开与 Spark 群集的连接

呼叫 spark_disconnect(),或选择笔记本功能区顶部的 “停止会话 ”按钮,结束 Spark 会话:

spark_disconnect(sc)

详细了解 R 功能: