R 语言 sparklyr 资源充当 Apache Spark 的接口。 sparklr 资源提供一种机制,用于与熟悉的 R 接口的 Spark 交互。 通过 Spark 批处理作业定义或交互式 Microsoft Fabric 笔记本使用 sparklyr。
sparklyr
与其他 tidyverse 包一起使用 - 例如 dplyr。 Microsoft Fabric 在每个运行时版本中分发 sparklyr 和 tidyverse 的最新稳定版本。 可以导入这些资源并开始使用 API。
先决条件
获取 Microsoft Fabric 订阅。 或者,注册免费的 Microsoft Fabric 试用版。
登录到 Microsoft Fabric。
使用主页左下侧的体验切换器切换到 Fabric。
打开或创建笔记本。 若要了解如何操作,请参阅 如何使用 Microsoft Fabric 笔记本。
将语言选项设置为 SparkR (R) 以更改主要语言。
将笔记本附加到湖屋。 在左侧,选择 添加 以添加现有湖屋或创建新的湖屋。
将 sparklyr 连接到 Synapse Spark 群集
spark_connect()
函数连接方法可建立连接 sparklyr
连接。 该函数生成一个名为 synapse
的新连接方法,该方法连接到现有的 Spark 会话。 这大大减少了 sparklyr
会话开始时间。 此连接方法在 开源 sparklyr 项目中可用。 使用method = "synapse"
时,可以在同一会话中同时使用这两个sparklyr
SparkR
会话,并轻松地在它们之间共享数据。 以下笔记本单元代码示例使用函数 spark_connect()
:
# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)
使用 sparklyr 读取数据
新的 Spark 会话不包含任何数据。 然后,必须将数据加载到 Spark 会话的内存中,或将 Spark 指向数据的位置,以便会话可以按需访问数据。
# load the sparklyr package
library(sparklyr)
# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)
head(mtcars_tbl)
使用 sparklyr
,还可以使用 ABFS 路径值从湖屋文件获取 write
和 read
数据。 要读取和写入湖屋,请先将湖屋添加到会话。 在笔记本左侧,选择“添加”以添加现有的湖屋。 此外,还可以创建湖屋。
若要查找 ABFS 路径,请右键单击 Lakehouse 中的 “文件” 文件夹,然后选择“ 复制 ABFS 路径”。 在以下代码示例中粘贴要替换 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files
的路径:
temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"
# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')
# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv)
head(mtcarsDF)
使用 sparklyr 操作数据
sparklyr
提供了处理 Spark 中的数据的不同方法,包括:
dplyr
命令- SparkSQL
- Spark 的功能转换器
使用 dplyr
可以使用熟悉的 dplyr
命令在 Spark 中准备数据。 这些命令在 Spark 中运行,防止 R 和 Spark 之间进行不必要的数据传输。
# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)
cargroup <- group_by(mtcars_tbl, cyl) %>%
count() %>%
arrange(desc(n))
cargroup
使用 dplyr
操作数据资源提供了有关将 dplyr 与 Spark 配合使用的详细信息。 sparklyr
并将 dplyr
R 命令转换为 Spark SQL。 使用 show_query()
来展示生成的查询。
# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)
使用 SQL
还可以直接对 Spark 群集中的表执行 SQL 查询。 spark_connection()
对象实现适用于 Spark 的 DBI 接口,因此可以使用 dbGetQuery()
执行 SQL 并将结果作为 R 数据帧返回:
library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")
使用功能转换器
上述两种方法都依赖于 SQL 语句。 Spark 提供命令,使某些数据转换更加方便,而无需使用 SQL。 例如, ft_binarizer()
该命令简化了新列的创建过程,该列指示另一列中的值是否超过特定阈值:
mtcars_tbl %>%
ft_binarizer("mpg", "over_20", threshold = 20) %>%
select(mpg, over_20) %>%
head(5)
Reference -FT 资源提供了 Spark 功能转换器的完整列表,可通过该sparklyr
列表获取。
在 sparklyr
和 SparkR
之间共享数据
使用 method = "synapse"
将 sparklyr
连接到 synapse spark 群集时,sparklyr
和 SparkR
均可用于同一会话,并且可在它们之间轻松共享数据。 可以在 sparklyr
中创建 spark 表,并从 SparkR
读取它:
# load the sparklyr package
library(sparklyr)
# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)
# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")
head(mtcars_sparklr)
机器学习
以下示例使用 ml_linear_regression()
来拟合线性回归模型。 该模型使用内置mtcars
数据集来尝试根据汽车的重量(mpg
)和汽车发动机的缸数()来预测汽车的油耗(wt
cyl
)。 此处的所有情况都假定 mpg
与我们每个特征之间存在线性关系。
生成测试和训练数据集
使用拆分,其中 70% 用于训练,30% 用于测试模型。 此比率的更改会导致不同的模型:
# split the dataframe into test and training dataframes
partitions <- mtcars_tbl %>%
select(mpg, wt, cyl) %>%
sdf_random_split(training = 0.7, test = 0.3, seed = 2023)
训练模型
训练逻辑回归模型。
fit <- partitions$training %>%
ml_linear_regression(mpg ~ .)
fit
使用summary()
了解我们的模型质量以及每个预测因子的统计学意义。
summary(fit)
使用模型
调用 ml_predict()
以将模型应用于测试数据集:
pred <- ml_predict(fit, partitions$test)
head(pred)
有关通过 sparklyr 提供的 Spark ML 模型列表,请访问 参考 - ML 。
断开与 Spark 群集的连接
呼叫 spark_disconnect()
,或选择笔记本功能区顶部的 “停止会话 ”按钮,结束 Spark 会话:
spark_disconnect(sc)
相关内容
详细了解 R 功能: