在 Salesforce Data Cloud 上运行联合查询

本文介绍如何设置 Lakehouse Federation,以便对不受 Azure Databricks 管理的 Salesforce Data Cloud 数据运行联合查询。 若要详细了解 Lakehouse Federation,请参阅“什么是 Lakehouse Federation?”。

若要使用 Lakehouse Federation 连接到 Salesforce Data Cloud 数据库,必须在 Azure Databricks Unity Catalog 元存储中创建以下内容:

  • 与 Salesforce Data Cloud 数据库的连接
  • 在 Unity Catalog 中镜像您的 Salesforce Data Cloud 数据库的一个外来目录,以便您可以使用 Unity Catalog 查询语法和数据治理工具来管理 Azure Databricks 用户对数据库的访问权限。

我应使用哪个 Salesforce 连接器?

Databricks 为 Salesforce 提供了多个连接器。 有两个零复制连接器:Salesforce Data Cloud 文件共享连接器和 Salesforce Data Cloud 查询联合连接器。 通过它们,可以在 Salesforce Data Cloud 中查询数据,而无需移动数据。 还有一个 Salesforce 引入连接器,用于从各种 Salesforce 产品(包括 Salesforce Data Cloud 和 Salesforce Sales Cloud)复制数据。

下表总结了 Databricks 中 Salesforce 连接器之间的差异:

连接器 用例 支持的 Salesforce 产品
Salesforce Data Cloud 文件共享 在 Lakehouse Federation 中使用 Salesforce Data Cloud 文件共享连接器时,Databricks 会调用 Salesforce Data-as-Service (DaaS) API 来直接读取基础云对象存储位置中的数据。 查询在 Databricks 计算上运行,而无需使用 JDBC 协议。
与查询联合相比,文件共享非常适合联合大量数据。 它提高了从多个数据源读取文件的性能,以及更好的下推功能。 请参阅 Lakehouse Federation for Salesforce Data Cloud File Sharing
Salesforce 数据云
Salesforce Data Cloud 查询联合身份验证 在 Lakehouse Federation 中使用 Salesforce Data Cloud 查询联合连接器时,Databricks 使用 JDBC 连接到源数据并将查询向下推送到 Salesforce。 请参阅 在 Salesforce Data Cloud 上运行联合查询 Salesforce 数据云
Salesforce 数据导入 使用 Lakeflow Connect 中的 Salesforce 引入连接器,可以从 Salesforce Platform 数据(包括 Salesforce Data Cloud 和 Salesforce Sales Cloud 中的数据)创建完全托管的引入管道。 此连接器不仅利用 CDP 数据,而且还利用数据智能平台中的 CRM 数据来最大化价值。 请参阅从 Salesforce 引入数据 Salesforce Data Cloud、Salesforce Sales Cloud 等。 有关受支持的 Salesforce 产品的综合列表,请参阅 Salesforce 引入连接器支持哪些 Salesforce 产品?

开始之前

工作区要求:

  • 启用了 Unity Catalog 的工作区。

计算要求:

  • 从计算资源到目标数据库系统的网络连接。 请参阅 Lakehouse Federation 网络建议
  • Azure Databricks 计算必须使用 Databricks Runtime 15.2 或更高版本以及 标准专用 访问模式。
  • SQL 仓库必须是专业或无服务器,并且必须使用 2024.30 或更高版本。

所需的权限:

  • 若要创建连接,你必须是元存储管理员或对附加到工作区的 Unity Catalog 元存储具有 CREATE CONNECTION 权限的用户。
  • 若要创建外部目录,你必须对元存储具有 CREATE CATALOG 权限,并且是连接的所有者或对连接具有 CREATE FOREIGN CATALOG 特权。

后面每个基于任务的部分都指定了其他权限要求。

创建与 Salesforce 连接的应用程序

Salesforce 连接的应用允许外部应用程序使用 API 和标准协议与 Salesforce 集成。 本部分介绍如何使用 SSO 创建连接的应用,以允许 Databricks 使用 Salesforce 进行身份验证。

注意

有关更详细的说明,请参阅 Salesforce Data Cloud 文档中的创建连接应用

若要创建与 Salesforce 连接的应用,请执行以下操作:

  1. 在 Data Cloud 的右上角,单击“设置”
  2. 在“平台工具”下,单击“应用”>“应用管理器”
  3. 单击“新建连接的应用”
  4. 输入姓名和联系人电子邮件地址。
  5. 启用 OAuth 设置:
    1. 输入回调 URL,格式如下:https://<databricks_instance_url>/login/oauth/salesforce.html。 例如:https://cust-success.cloud.databricks.com/login/oauth/salesforce.html
    2. (可选)如果计划在下一步中使用 SQL 创建 Azure Databricks 连接和外部目录,则 Salesforce Connected 应用程序还需要支持重定向 URI https://login.salesforce.com/services/oauth2/success。 如果计划使用目录资源管理器创建 Azure Databricks 连接和外部目录,则无需如此。 Databricks 建议使用目录资源管理器,因为它需要的手动步骤比其他方法更少。
    3. 添加以下Scopes
      • 访问所有 Data Cloud API 资源 (cdp_api)
      • 通过 API 管理用户数据 (api)
      • 对 Data Cloud 数据执行 ANSI SQL 查询 (cdp_query_api)
      • 随时执行请求(refresh_token、offline_access
    4. 单击“ 保存”。
    5. 单击“继续” 。
  6. 在“连接的应用程序概述”页面上,单击“管理使用者详细信息”。 系统将提示你进行身份验证。
  7. 身份验证成功后,页面会显示使用者密钥和使用者机密。 保存这些值。 创建 Azure Databricks 连接时需要使用。

创建 Azure Databricks 连接

连接指定用于访问外部数据库系统的路径和凭据。 若要创建连接,可以使用目录资源管理器,或者使用 Azure Databricks 笔记本或 Databricks SQL 查询编辑器中的 CREATE CONNECTION SQL 命令。

注意

你还可以使用 Databricks REST API 或 Databricks CLI 来创建连接。 请参阅 POST /api/2.1/unity-catalog/connectionsUnity Catalog 命令

所需的权限:具有 CREATE CONNECTION 特权的元存储管理员或用户。

目录资源管理器

  1. 在 Azure Databricks 工作区中,单击 “数据”图标。目录

  2. 在“目录”窗格顶部,单击“添加”或“加号”图标“添加”图标,然后从菜单中选择“添加连接”

    或者,在“快速访问”页中,单击“外部数据 >”按钮,转到“连接”选项卡,然后单击“创建连接

  3. 在“设置连接”向导的“连接基本信息”页面上,输入用户友好的连接名称

  4. 选择 Salesforce Data Cloud 的连接类型

  5. (可选)添加注释。

  6. 身份验证 页上,输入 Salesforce Data Cloud 的以下连接属性:

    • (可选)选择“是沙盒”
    • 客户端 ID:Salesforce 连接的应用使用者密钥。
    • 客户端密码:Salesforce 连接的应用使用者机密。
    • 客户端范围cdp_api api cdp_query_api refresh_token offline_access
  7. 单击“使用 Salesforce 登录”

  8. (OAuth) 系统会提示使用 SSO 凭据登录到 Salesforce Data Cloud。

  9. 成功登录后,将定向回 Databricks“设置连接”向导。 已将“使用 Salesforce 登录”按钮替换为 Successfully authorized 消息。

  10. 单击“创建连接”。

  11. 目录概览 页上,输入外部目录的名称。 外部目录镜像外部数据系统中的数据库,以便可以使用 Azure Databricks 和 Unity Catalog 查询和管理对该数据库中数据的访问。

  12. 输入 Salesforce 数据空间。

  13. (可选)单击 测试连接 以确认它是否正常工作。

  14. 单击 创建目录

  15. 在“访问权限”页上,选择用户可以在其中访问所创建的目录的工作区。 您可以选择所有工作区均具有访问权限,或单击分配到工作区,选择工作区,然后单击分配

  16. 更改将能够管理对目录中所有对象的访问的“所有者”。 开始在文本框中键入主体,然后单击返回的结果中的主体。

  17. 授予对目录的“权限”。 单击“授予”

    1. 指定 主体 谁有权访问目录中的对象。 开始在文本框中键入主体,然后单击返回的结果中的主体。
    2. 选择要授予每个主体的“特权预设”。 默认情况下,向所有帐户用户授予 BROWSE
      • 从下拉菜单中选择 数据读取器,以授予对目录中对象的 read 特权。
      • 从下拉菜单中选择 数据编辑器,以授予对目录中对象的 readmodify 权限。
      • 手动选择要授予的权限。
    3. 单击授权
  18. 单击“下一步”。

  19. 在“元数据”页上,指定标记键值对。 有关详细信息,请参阅 将标记应用于 Unity 目录安全对象

  20. (可选)添加注释。

  21. 单击“ 保存”。

SQL

Databricks 建议使用目录资源管理器创建连接和外部目录,因为它需要的手动步骤比其他方法更少。

如果计划使用 SQL 创建 Azure Databricks 连接和外部目录,则 Salesforce Connected 应用程序需要支持重定向 URI https://login.salesforce.com/services/oauth2/success。 如果使用目录资源管理器,则无需如此。

  1. 生成 PKCE 代码验证器和代码质询代码。 可以使用联机工具(例如 https://tonyxu-io.github.io/pkce-generator/)或运行以下 Python 脚本执行此操作:

    %python
    
    import base64
    import re
    import os
    import hashlib
    
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
    code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
    
    code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
    code_challenge = code_challenge.replace('=', '')
    print(f"pkce_verifier  = \"{code_verifier}\"")
    print(f"code_challenge = \"{code_challenge}\"")
    
  2. 访问以下 URL 并使用 Salesforce 凭据进行身份验证,以获取 authorization_code(用你的参数替换 <client_id><code_challenge>)。

    https://login.salesforce.com/services/oauth2/authorize
    ?client_id=<client_id>
    &redirect_uri=https://login.salesforce.com/services/oauth2/success
    &response_type=code
    &code_challenge=<code_challenge>
    

    URL 编码的授权代码在重定向 URL 中可见。

  3. 在笔记本或 Databricks SQL 查询编辑器中运行以下命令:

    CREATE CONNECTION '<Connection name>' TYPE salesforce_data_cloud
    OPTIONS (
      client_id '<Consumer key from Salesforce Connected App>',
      client_secret '<Consumer secret from Salesforce Connected App>',
      pkce_verifier '<pkce_verifier from the last step>',
      authorization_code '<URL decoded `authorization_code`, should end with == instead of %3D%3D>',
      oauth_redirect_uri "https://login.salesforce.com/services/oauth2/success",
      oauth_scope "cdp_api api cdp_query_api refresh_token offline access",
      is_sandbox "false"
      );
    

    Databricks 建议对凭据等敏感值使用 Azure Databricks 机密而不是纯文本字符串。 例如:

    CREATE CONNECTION '<Connection name>' TYPE salesforce_data_cloud
    OPTIONS (
      client_id secret ('<Secret scope>','<Secret key client id>'),
      client_secret secret ('<Secret scope>','<Secret key client secret>'),
      pkce_verifier '<pkce_verifier from the last step>',
      authorization_code '<URL decoded `authorization_code`, should end with == instead of %3D%3D>',
      oauth_redirect_uri "https://login.salesforce.com/services/oauth2/success",
      oauth_scope "cdp_api api cdp_query_api refresh_token offline access",
      is_sandbox "false"
      );
    

    有关设置机密的详细信息,请参阅机密管理

创建外部目录

注意

如果使用 UI 创建与数据源的连接,则包含外部目录创建,你可以跳过此步骤。

外部目录镜像外部数据系统中的数据库,以便可以使用 Azure Databricks 和 Unity Catalog 查询和管理对该数据库中数据的访问。 若要创建外部目录,请使用与已定义的数据源的连接。

要创建外部目录,可以使用目录资源管理器,或在 Azure Databricks 笔记本或 SQL 查询编辑器中使用 CREATE FOREIGN CATALOG SQL 命令。 你还可以使用 Databricks REST API 或 Databricks CLI 来创建目录。 请参阅 POST /api/2.1/unity-catalog/catalogsUnity Catalog 命令

所需的权限:对元存储的 CREATE CATALOG 权限以及连接的所有权或对连接的 CREATE FOREIGN CATALOG 特权。

目录资源管理器

  1. 在 Azure Databricks 工作区中,单击 “数据”图标以打开目录资源管理器
  2. 单击右上角的“创建目录”。
  3. 输入 Salesforce Data Cloud 目录的以下属性。
    • 目录名称:目录的用户友好名称。
    • 类型Foreign
    • 连接名称:将用于创建目录的连接的名称。
    • 数据空间:Salesforce 数据空间。
  4. 单击“创建”。

SQL

在笔记本或 SQL 查询编辑器中运行以下 SQL 命令。 括号中的项是可选的。

CREATE FOREIGN CATALOG [IF NOT EXISTS] '<catalog-name>' USING CONNECTION '<connection-name>'
OPTIONS (dataspace '<dataspace>');

请替换以下值:

  • <catalog-name>:
  • <connection-name>:
  • <dataspace>:Salesforce 数据空间。 例如 default

支持的下推

支持以下下推:

  • 筛选器
  • 预测
  • 限制
  • 聚合
  • 抵消
  • 强制转换
  • Contains、Startswith、Endswith

数据类型映射

从 Salesforce Data Cloud 读取数据到 Spark 时,数据类型的映射关系如下:

Salesforce Data Cloud 类型 Spark 类型
布尔值 BooleanType
日期 日期类型
日期/时间 时间戳类型
电子邮件、电话、文本、URL 字符串类型
数字、百分比 DecimalType(38,18)

限制

  • 每个 Databricks 目录仅支持一个 Salesforce 数据空间。
  • 在 Databricks Runtime 16.1 及更早版本中,不支持区分大小写的表和架构名称。 这包括 Salesforce Data Cloud 上的名称(以大写字母表示)。 例如, MyTable 不支持。