从 Salesforce 引入数据

本页介绍如何使用 Lakeflow Connect 从 Salesforce 引入数据并将其加载到 Azure Databricks 中。

在您开始之前

若要创建引入管道,必须满足以下要求:

  • 你的工作区必须启用 Unity Catalog。

  • 必须为工作区启用无服务器计算。 请参阅启用无服务器计算

  • 如果计划创建新的连接,必须对元存储拥有 CREATE CONNECTION 特权。

    如果连接器支持基于 UI 的管道创作,可以通过完成此页上的步骤,同时创建连接和管道。 但是,如果使用基于 API 的管道创作,则必须在目录资源管理器中创建连接,然后才能完成此页上的步骤。 请参阅连接到托管的数据引入源

  • 如果计划使用现有连接:您必须在连接对象上具有 USE CONNECTION 特权或 ALL PRIVILEGES

  • 你必须对目标目录拥有 USE CATALOG 特权。

  • 你必须对现有架构拥有USE SCHEMACREATE TABLE特权,或者对目标目录拥有CREATE SCHEMA特权。

若要从 Salesforce 引入,建议执行以下步骤:

  • 创建 Databricks 可用于检索数据的 Salesforce 用户。 确保用户具有 API 访问权限,并有权访问计划引入的所有对象。

创建引入管道

所需权限:连接上的 USE CONNECTIONALL PRIVILEGES

此步骤介绍如何创建引入管道。 每个引入的表都被写入一个流式处理表,该表具有相同的名称,但名称中的所有字母都转换为小写。

Databricks 用户界面

  1. 在 Azure Databricks 工作区的边栏中,单击 “数据引入”。

  2. 在“添加数据”页上的“Databricks 连接器”下,单击“Salesforce”。

    Salesforce 引入向导随即打开。

  3. 在向导的 “管道 ”页上,输入引入管道的唯一名称。

  4. “目标目录 ”下拉列表中,选择一个目录。 引入的数据和事件日志将会写入到此目录。

  5. 选择存储访问 Salesforce 数据所需的凭据的 Unity 目录连接。

    如果没有 Salesforce 连接,请单击“ 创建连接”。 必须对元存储拥有 CREATE CONNECTION 权限。

  6. 单击“ 创建管道”并继续

  7. “源 ”页上,选择要引入的表,然后单击“ 下一步”。

    如果选择 “所有表”,Salesforce 引入连接器会将源架构中的所有现有表和将来表写入目标架构。 每个管道最多有 250 个对象。

  8. “目标 ”页上,选择要写入的 Unity 目录和架构。

    如果不想使用现有架构,请单击“ 创建架构”。 要在父目录上拥有USE CATALOGCREATE SCHEMA权限。

  9. 单击“ 保存管道”并继续

  10. (可选)在 “设置” 页上,单击“ 创建计划”。 设置刷新目标表的频率。

  11. (可选)设置管道操作成功或失败的电子邮件通知。

  12. 单击“ 保存”并运行管道

Databricks 资产捆绑包

此选项卡介绍如何使用 Databricks 资产捆绑包部署引入管道。 捆绑包可以包含作业和任务的 YAML 定义,使用 Databricks CLI 进行管理,并且可以在不同的目标工作区(如开发、过渡和生产)中共享和运行。 有关详细信息,请参阅 Databricks 资产捆绑包

可以在管道定义中使用以下表配置属性,来选择或取消选择需要导入的特定列:

  • include_columns:(可选)指定要添加且用于引入的列的列表。 如果使用此选项显式地添加列,则管道会自动排除将来添加到源的列。 若要引入将来的列,必须将它们添加到列表中。
  • exclude_columns:(可选)指定要从引入中排除的列的列表。 如果使用此选项显式排除某些列,管道会自动包含将来在源中新增的列。 若要引入将来的列,必须将它们添加到列表中。
  1. 使用 Databricks CLI 创建新捆绑包:

    databricks bundle init
    
  2. 将两个新资源文件添加到捆绑包:

    • 管道定义文件(resources/sfdc_pipeline.yml)。
    • 控制数据引入频率(resources/sfdc_job.yml)的工作流文件。

    以下是一个示例 resources/sfdc_pipeline.yml 文件:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  table_configuration:
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    以下是一个示例 resources/sfdc_job.yml 文件:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. 使用 Databricks CLI 部署管道:

    databricks bundle deploy
    

Databricks 命令行界面 (CLI)

可以在管道定义中使用以下表配置属性,来选择或取消选择需要导入的特定列:

  • include_columns:(可选)指定要添加且用于引入的列的列表。 如果使用此选项显式地添加列,则管道会自动排除将来添加到源的列。 若要引入将来的列,必须将它们添加到列表中。
  • exclude_columns:(可选)指定要从引入中排除的列的列表。 如果使用此选项显式排除某些列,管道会自动包含将来在源中新增的列。 若要引入将来的列,必须将它们添加到列表中。

若要创建管道:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

更新管道:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

若要获取管道定义:

databricks pipelines get "<pipeline-id>"

若要删除管道:

databricks pipelines delete "<pipeline-id>"

若要获得详细信息,可以运行:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

示例 JSON 管道定义

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

在管道上启动、计划和设置警报

可以在管道详细信息页上为管道创建计划。

  1. 创建管道后,重新访问 Azure Databricks 工作区,然后单击 管道

    新管道将显示在管道列表中。

  2. 若要查看管道详细信息,请单击管道名称。

  3. 在管道详细信息页上,可以通过单击“计划”来计划管道。

  4. 若要在管道上设置通知,请单击 设置,然后添加通知。

对于添加到管道的每个调度,Lakeflow Connect 会自动为其创建任务。 引入管道是作业中的任务。 可以选择将更多任务添加到作业。

注释

管道运行时,可能会看到给定表的两个源视图。 一个视图包含公式字段的快照。 另一个视图包含非公式字段的增量数据拉取。 这些视图在目标表中联接。

示例:将两个 Salesforce 对象引入单独的架构

本部分中的示例管道定义将两个 Salesforce 对象引入单独的架构。 多目标管道支持仅限 API。

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

示例:将一个 Salesforce 对象导入三次

本节中的示例管道定义将 Salesforce 对象引入三个不同的目标表中。 多目标管道支持仅限 API。

可以选择重命名引入的表。 如果重命名管道中的表,它将变为仅限 API 的管道,并且不能再在 UI 中编辑管道。

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

其他资源