你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用双重写入代理和 Apache Spark 将数据从 Apache Cassandra 实时迁移到 Azure Cosmos DB for Apache Cassandra

Azure Cosmos DB 中 Cassandra 的 API 对于在 Apache Cassandra 上运行的企业工作负荷来说是一个不错的选择,原因有多种:

  • 无管理和监视开销:它不需跨操作系统、Java 虚拟机和 yaml 文件及其交互管理和监视大量设置的开销
  • 节省大量成本: 可以使用 Azure Cosmos DB 节省成本,其中包括虚拟机、带宽和任何适用的许可证的成本。 无需管理数据中心、服务器、SSD 存储、网络和电力成本。
  • 能够使用现有的代码和工具: Azure Cosmos DB 提供的线路协议级别与现有 Cassandra SDK 和工具兼容。 此兼容性确保只需经过细微的更改,就可以将现有代码库用于 Azure Cosmos DB for Apache Cassandra。

Azure Cosmos DB 不支持用于复制的原生 Apache Cassandra Gossip 协议。 如果零停机时间是迁移的要求,则需要采用不同的方法。 本教程介绍如何使用双重写入代理Apache Spark 将数据从原生 Apache Cassandra 群集实时迁移到 Azure Cosmos DB for Apache Cassandra。

下图演示了该模式。 双写入代理用于捕获实时更改。 使用 Apache Spark 批量复制历史数据。 只需进行少量的配置更改甚至不进行任何配置更改,代理就能接受来自应用程序代码的连接。 它将所有请求路由到源数据库,并在进行大容量复制时,将写操作异步路由至 Cassandra 的 API。

演示如何将数据实时迁移到 Azure Managed Instance for Apache Cassandra 的动画。

先决条件

重要

在迁移过程中,如果需要保留 Apache Cassandra writetime,则必须在创建表时设置以下标志:

with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true

例如:

CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
 name text,
 userID int,
 address text,
 phone int,
 PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;

预配 Spark 群集

建议使用 Azure Databricks。 使用支持 Spark 3.0 或更高版本的运行时。

重要

你需要确保 Azure Databricks 帐户与源 Apache Cassandra 群集之间已建立网络连接。 此设置可能需要虚拟网络注入。 有关详细信息,请参阅在 Azure 虚拟网络中部署 Azure Databricks

显示如何查找 Azure Databricks 运行时版本的屏幕截图。

添加 Spark 依赖项

将 Apache Spark Cassandra 连接器库添加到群集,以便连接到原生终结点和 Azure Cosmos DB Cassandra 终结点。 在群集中,选择“库”“安装新库”>“Maven”,然后在 Maven 坐标中添加

重要

如果需要在迁移期间保留每一行的 Apache Cassandra writetime,建议使用此示例。 此示例中的依赖项 JAR 还包含 Spark 连接器,因此应安装此版本,而不是前面所述的连接器程序集。

如果要在历史数据加载完成后在源和目标之间执行行比较验证,此示例也很有用。 有关详细信息,请参阅 运行历史数据加载验证源和目标

显示如何在 Azure Databricks 中搜索 Maven 包的屏幕截图。

选择“安装”,然后在安装完成后重启群集。

注意

安装 Cassandra 连接器库后,请务必重启 Azure Databricks 群集。

安装双重写入代理

为了在双重写入期间获得最佳性能,我们建议在源 Cassandra 群集中的所有节点上安装代理。

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

启动双重写入代理

建议在源 Cassandra 群集中的所有节点上安装该代理。 至少,请运行以下命令在每个节点上启动代理。 请将 <target-server> 替换为目标群集中某个节点上的 IP 地址或服务器地址。 请将 <path to JKS file> 替换为本地 jks 文件的路径,将 <keystore password> 替换为相应的密码。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

以这种方式启动代理的前提是满足以下条件:

  • 源和目标终结点具有相同的用户名和密码。
  • 源和目标终结点实现安全套接字层 (SSL)。

如果源和目标终结点无法满足这些条件,请继续阅读以了解其他配置选项。

配置 SSL

对于 SSL,可以实现现有的密钥存储,例如源群集使用的密钥存储,或者使用 keytool以下命令创建自签名证书:

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

如果源或目标终结点不实现 SSL,则你还可为其禁用 SSL。 使用 --disable-source-tls--disable-target-tls 标志:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> \
  --target-password <password> --disable-source-tls true  --disable-target-tls true 

注意

确保客户端应用程序在通过代理与数据库建立 SSL 连接时使用的密钥存储和密码与用于双重写入代理的密码相同。

配置凭据和端口

默认情况下,客户端应用会传递源凭据。 代理使用凭据连接到源群集和目标群集。 如前所述,此过程假设源和目标凭据相同。 在启动代理时,必须为 Cassandra 终结点的目标 API 单独指定不同的用户名和密码。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

未指定的默认源和目标端口为 9042。 在这种情况下,Cassandra API 在端口 10350上运行。 使用 --source-port--target-port 指定端口号:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

远程部署代理

在某些情况下,你不希望在群集节点上自行安装代理。 你可能更喜欢在单独的计算机上安装它。 在这种情况下,请指定以下 IP <source-server>地址:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

警告

在单独的计算机上安装和运行代理,而不是在源 Apache Cassandra 群集中的所有节点上运行代理会影响实时迁移时的性能。 虽然此配置在功能上有效,但客户端驱动程序无法打开与群集中的所有节点的连接。 客户端依赖于安装代理的单个协调器节点来建立连接。

实现零应用程序代码更改

默认情况下,代理侦听端口 29042。 将应用程序代码更改为指向此端口。 可以改为更改代理侦听的端口。 如果要通过以下方法消除应用程序级代码更改,可以进行此更改:

  • 让源 Cassandra 服务器在其他端口上运行。
  • 让代理在标准 Cassandra 端口 9042 上运行。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

注意

在群集节点上安装代理不需要重启节点。 如果有许多应用程序客户端,并且更喜欢在标准 Cassandra 端口 9042 上运行代理,以便消除应用程序级代码更改,请更改 Apache Cassandra 默认端口。 然后,需要重启群集中的节点,并将源端口配置为为源 Cassandra 群集定义的新端口。

在以下示例中,我们将源 Cassandra 群集更改为在端口 3074 上运行,并在端口 9042 上启动群集:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
 --proxy-port 9042 --source-port 3074

强制实施协议

代理提供强制实施协议的功能。如果源终结点比目标更高级或者不受支持,可能需要强制实施协议。 在这种情况下,可以指定 --protocol-version--cql-version 来强制实施协议,以便与目标相符:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
  --protocol-version 4 --cql-version 3.11

运行双写入代理后,需要更改应用程序客户端上的端口并重启。 或者,如果选择此选项,请更改 Cassandra 端口并重启群集。 代理开始将写入转发到目标终结点。 有关信息,请参阅 监视和指标

运行历史数据加载

若要加载数据,请在 Azure Databricks 帐户中创建一个 Scala 笔记本。 将源和目标 Cassandra 配置替换为相应的凭据,并替换源和目标密钥空间和表。 根据需要在以下示例中为每个表添加更多变量,然后运行该示例。 在应用程序开始向双重写入代理发送请求后,你便可以迁移历史数据。

重要

在迁移数据之前,请将容器吞吐量提高到应用程序快速迁移所需的量。 在开始迁移之前缩放吞吐量有助于在更少的时间内迁移数据。 为了帮助防止在历史数据加载期间速率限制,可以在用于 Cassandra 的 API 中启用服务器端重试(SSR)。 有关如何启用 SSR 和详细信息的说明,请参阅防止 Azure Cosmos DB for Apache Cassandra 操作发生速率限制错误

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

注意

在前面的 Scala 示例中,你会注意到,在读取源表中的所有数据之前,timestamp 正设置为当前时间。 然后 writetime 设置为此回溯时间戳。 此方法可以确保通过加载历史数据而写入到目标终结点的记录,无法覆盖在读取历史数据时从双重写入代理传入的具有更迟时间戳的更新内容。

重要

如果你出于任何原因需要保留确切的时间戳,应采用可保留时间戳的历史数据迁移方法,如此示例所示。 示例中的依赖项 JAR 还包含 Spark 连接器,因此无需安装前面先决条件中提到的 Spark 连接器程序集。 在 Spark 群集中安装这两个组件会导致冲突。

验证源和目标

历史数据加载完成后,数据库应已同步并准备好进行直接转换。 我们建议验证源和目标以确保它们匹配,然后再最终进行直接转换。

注意

如果使用前面提到的 Cassandra 迁移程序示例进行保留writetime,则此示例包含通过基于特定容差比较源和目标中的验证迁移的功能。

后续步骤