教程第 1 部分:使用 Apache Spark 将数据引入到 Microsoft Fabric lakehouse

本教程以 delta lake 格式将数据引入 Fabric 湖屋。 我们在此处定义一些重要术语:

  • 湖屋 - 湖屋是文件、文件夹、和/或表的集合,充当数据湖的数据库。 Spark 引擎和 SQL 引擎使用 Lakehouse 资源来处理大数据。 使用开源 Delta 格式表时,该处理包括增强的 ACID 事务功能。

  • Delta Lake - Delta Lake 是一个开源存储层,用于将 ACID 事务、可缩放的元数据管理和批处理和流式数据处理引入 Apache Spark。 作为数据表格式,Delta Lake 使用基于文件的事务日志扩展 Parquet 数据文件,以便进行 ACID 事务和可缩放的元数据管理。

  • Azure 开放数据集 是特选的公共数据集,用于向机器学习解决方案添加特定于方案的功能。 这会导致更准确的模型。 开放数据集是驻留在 Microsoft Azure 存储上的云资源。 Apache Spark、REST API、数据工厂和其他工具可以访问开放数据集。

在本教程中,你将使用 Apache Spark 来:

  • 从 Azure 开放数据集容器读取数据。
  • 将数据写入 Fabric 湖屋 Delta 表。

先决条件

  • 将湖屋添加到此笔记本。 在本教程中,首先从公共 Blob 下载数据。 然后,数据存储在该湖屋资源中。

在笔记本中继续操作

本教程随附了 1-ingest-data.ipynb 笔记本。

银行流失数据

数据集包含 10,000 个客户的流失状态信息。 它还包括可能影响变动率的属性 , 例如:

  • 信用评分
  • 地理位置(德国、法国、西班牙)
  • 性别(男性、女性)
  • 年龄
  • 任期(客户是该银行的客户年数)
  • 帐户余额
  • 估计工资
  • 客户通过银行购买的产品数
  • 信用卡状态(客户是否有信用卡)
  • 活动成员状态(客户是否具有活动银行客户状态)

该数据集还包括以下列:

  • 行号
  • 客户 ID
  • 客户姓氏

这些列不应影响客户离开银行的决定。

客户银行账户的关闭定义了该客户的流失。 数据集 exited 列是指客户的放弃。 关于这些属性的相关信息非常少,因此你必须在没有数据集背景信息的情况下继续进行。 我们的目标是了解这些属性如何为 exited 状态做出贡献。

示例数据集行:

“CustomerID” "Surname" “CreditScore” 地理 性别 年龄 “任期” “平衡” “产品数量” “HasCrCard” "IsActiveMember" "EstimatedSalary" "Exited"
15634602 哈格雷夫 619 法国 女性 42 2 0.00 1 1 1 101348.88 1
15647311 Hill 608 西班牙 女性 41 1 83807.86 1 0 1 112542.58 0

下载数据集并上传到 Lakehouse

提示

定义以下参数时,可以轻松地将此笔记本用于不同的数据集:

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

DATA_ROOT = "/lakehouse/default"
DATA_FOLDER = "Files/churn"  # folder with data files
DATA_FILE = "churn.csv"  # data file name

以下代码片段下载数据集的公开可用版本,然后将该资源存储在 Fabric Lakehouse 中:

重要

在运行笔记本之前,请确保向其添加湖屋。 否则会导致错误。

import os, requests
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    file_list = [DATA_FILE]
    download_path = f"{DATA_ROOT}/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

使用刚刚引入的数据: