Compartir a través de


Migre en directo datos de Apache Cassandra a Azure Cosmos DB for Apache Cassandra mediante el proxy de doble escritura y Apache Spark

LA API para Cassandra en Azure Cosmos DB es una opción excelente para cargas de trabajo empresariales que se ejecutan en Apache Cassandra por varias razones:

  • Sin sobrecarga de administración y supervisión: Elimina la sobrecarga de administrar y supervisar una gran cantidad de configuraciones en sistemas operativos, máquinas virtuales Java y archivos yaml y sus interacciones.
  • Ahorro significativo de costos: Puede ahorrar costos con Azure Cosmos DB, lo que incluye el costo de las máquinas virtuales, el ancho de banda y las licencias aplicables. No es necesario administrar los centros de datos, los servidores, el almacenamiento SSD, las redes y los costos de electricidad.
  • Posibilidad de usar código y herramientas existentes: Azure Cosmos DB proporciona compatibilidad de nivel de protocolo de conexión con SDK y herramientas existentes de Cassandra. Esta compatibilidad garantiza que pueda usar el código base existente con Azure Cosmos DB for Apache Cassandra con cambios triviales.

Azure Cosmos DB no admite el protocolo de gossip de Apache Cassandra nativo para la replicación. Cuando cero tiempo de inactividad es un requisito para la migración, es necesario un enfoque diferente. En este tutorial se describe cómo migrar en directo datos a Azure Cosmos DB for Apache Cassandra desde un clúster nativo de Apache Cassandra mediante un proxy de doble escritura y Apache Spark.

En la siguiente imagen, se ilustra este patrón. El proxy de doble escritura se usa para capturar los cambios en directo. Los datos históricos se copian de forma masiva mediante Apache Spark. El proxy puede aceptar conexiones desde el código de las aplicaciones con pocos o ningún cambio de configuración. Enruta todas las solicitudes a la base de datos de origen y redirige de forma asincrónica las escrituras a la API de Cassandra mientras se realiza la copia masiva.

Animación que muestra la migración de datos en vivo a Azure Managed Instance for Apache Cassandra.

Prerrequisitos

Importante

Si tiene un requisito para conservar Apache Cassandra writetime durante la migración, se deben establecer las siguientes marcas al crear tablas:

with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true

Por ejemplo:

CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
 name text,
 userID int,
 address text,
 phone int,
 PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;

Aprovisionamiento de un clúster de Spark

Se recomienda usar Azure Databricks. Use un runtime compatible con Spark 3.0 o posterior.

Importante

Debe asegurarse de que su cuenta de Azure Databricks tiene conectividad de red con el clúster Apache Cassandra de origen. Esta configuración puede requerir la inserción de red virtual. Para más información, consulte Implementación de Azure Databricks en la red virtual de Azure.

Captura de pantalla que muestra la búsqueda de la versión de Azure Databricks Runtime.

Adición de dependencias de Spark

Agregue la biblioteca de conectores de Cassandra de Apache Spark a su clúster para conectarse a los puntos de conexión nativos y de Cassandra de Azure Cosmos DB. En el clúster, seleccione Libraries>Install New>Maven (Bibliotecas > Instalar nueva > Maven) y, después, agregue com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 en las coordenadas de Maven.

Importante

Si tiene el requisito de conservar Apache Cassandra writetime para cada fila durante la migración, le recomendamos que utilice esta muestra. El archivo JAR de dependencia de este ejemplo también contiene el conector de Spark, por lo que debe instalar esta versión en lugar del paquete del conector descrito anteriormente.

Este ejemplo también es útil si desea realizar una validación de comparación de filas entre el origen y el destino después de completar la carga de datos histórica. Para obtener más información, consulte Ejecución de la carga de datos históricos y Validación del origen y el destino.

Captura de pantalla que muestra la búsqueda de paquetes Maven en Azure Databricks.

Seleccione Install (Instalar) y asegúrese de reiniciar el clúster cuando se complete la instalación.

Nota

Asegúrese de reiniciar el clúster de Azure Databricks después de instalar la biblioteca del conector de Cassandra.

Instalación del proxy de doble escritura

Para obtener un rendimiento óptimo durante las escrituras duales, se recomienda instalar el proxy en todos los nodos del clúster de Cassandra de origen.

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

Inicio del proxy de doble escritura

Se recomienda instalar el proxy en todos los nodos del clúster de Cassandra de origen. Como mínimo, ejecute el siguiente comando para iniciar el proxy en cada nodo. Reemplace <target-server> por una dirección IP o de servidor de uno de los nodos del clúster de destino. Reemplace <path to JKS file> por la ruta de acceso a un archivo .jks local, y <keystore password>, por la contraseña correspondiente.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

Al iniciar el proxy de esta manera, se da por hecho que se cumple lo siguiente:

  • Los puntos de conexión de origen y destino tienen el mismo nombre de usuario y la misma contraseña.
  • Los puntos de conexión de origen y destino implementan el protocolo SSL (Capa de sockets seguros).

Si los puntos de conexión de origen y destino no pueden cumplir estos criterios, siga leyendo para conocer más opciones de configuración.

Configuración de SSL

Para SSL, puede implementar un almacén de claves existente, por ejemplo, el que usa el clúster de origen o crear un certificado autofirmado mediante keytool:

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

También puede deshabilitar SSL para los puntos de conexión de origen o destino si no implementan este protocolo. Use las marcas --disable-source-tls o --disable-target-tls:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> \
  --target-password <password> --disable-source-tls true  --disable-target-tls true 

Nota

Asegúrese de que la aplicación cliente usa el mismo almacén de claves y contraseña que los usados para el proxy de escritura dual al compilar conexiones SSL a la base de datos a través del proxy.

Configuración de las credenciales y el puerto

De forma predeterminada, la aplicación cliente pasa las credenciales de origen. El proxy usa las credenciales para establecer conexiones con los clústeres de origen y de destino. Como hemos mencionado anteriormente, en este proceso se da por supuesto que las credenciales del origen y el destino son las mismas. Debe especificar un nombre de usuario y una contraseña diferentes para la API de destino para el punto de conexión de Cassandra por separado al iniciar el proxy:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

Los puertos de origen y destino predeterminados, cuando no se especifican, son 9042. En este caso, la API para Cassandra se ejecuta en el puerto 10350. Use --source-port o --target-port para especificar números de puerto:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

Implementación del proxy de forma remota

Puede haber circunstancias en las que no quiera instalar el proxy en los nodos del clúster. Es posible que prefiera instalarlo en una máquina independiente. En ese escenario, especifique la dirección IP de <source-server>:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

Advertencia

La instalación y ejecución del proxy de forma remota en una máquina independiente en lugar de ejecutarlo en todos los nodos del clúster de Apache Cassandra de origen afecta al rendimiento mientras se produce la migración en vivo. Aunque esta configuración funciona funcionalmente, el controlador de cliente no puede abrir conexiones a todos los nodos del clúster. El cliente se basa en el nodo de coordinación único donde se instala el proxy para realizar conexiones.

Cero cambios en el código de las aplicaciones

De forma predeterminada, el proxy escucha en el puerto 29042. Cambie el código de la aplicación para que apunte a este puerto. En su lugar, puede cambiar el puerto en el que escucha el proxy. Puede realizar este cambio si desea eliminar los cambios de código de nivel de aplicación por:

  • Haciendo que el servidor de Cassandra de origen se ejecute en otro puerto.
  • Haciendo que el proxy se ejecute en el puerto estándar 9042 de Cassandra.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

Nota

La instalación del proxy en los nodos del clúster no requiere reiniciar los nodos. Si tiene muchos clientes de aplicación y prefiere ejecutar el proxy en el puerto estándar de Cassandra 9042 para eliminar los cambios de código de nivel de aplicación, cambie el puerto predeterminado de Apache Cassandra. A continuación, debe reiniciar los nodos del clúster y configurar el puerto de origen para que sea el nuevo puerto que definió para el clúster de Cassandra de origen.

En el ejemplo siguiente, cambiamos el clúster de Cassandra de origen para que se ejecute en el puerto 3074 e iniciamos el clúster en el puerto 9042:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
 --proxy-port 9042 --source-port 3074

Imposición de protocolos

El proxy tiene funcionalidad para imponer protocolos, lo que podría ser necesario si el punto de conexión de origen es más avanzado que el de destino o no se admite por otro motivo. En ese caso, puede especificar --protocol-version y --cql-version para exigir que el protocolo cumpla con el destino:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
  --protocol-version 4 --cql-version 3.11

Después de ejecutar el proxy de doble escritura, debe cambiar el puerto en el cliente de la aplicación y reiniciarlo. O bien, cambie el puerto de Cassandra y reinicie el clúster, si elige ese enfoque. El proxy inicia el reenvío de escrituras al punto de conexión de destino. Para obtener información, consulte Supervisión y métricas.

Ejecución de la carga de datos históricos

Para cargar los datos, cree un cuaderno de Scala en la cuenta de Azure Databricks. Reemplace las configuraciones de Cassandra de origen y de destino por las credenciales correspondientes, así como los espacios de claves y las tablas de origen y destino. Agregue más variables para cada tabla según sea necesario en el ejemplo siguiente y ejecútelo. Cuando la aplicación empiece a enviar solicitudes al proxy de doble escritura, estará listo para migrar datos históricos.

Importante

Antes de migrar los datos, aumente el rendimiento del contenedor a la cantidad necesaria para que la aplicación se migre rápidamente. El escalado del rendimiento antes de iniciar la migración le ayuda a migrar los datos en menos tiempo. Para ayudar a proteger contra la limitación de velocidad durante la carga de datos históricos, puede habilitar reintentos del lado servidor (SSR) en la API para Cassandra. Para obtener instrucciones sobre cómo habilitar SSR y más información, consulte Prevención de errores de limitación de velocidad para las operaciones de Azure Cosmos DB para Apache Cassandra.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

Nota

En el ejemplo anterior de Scala, observará que timestamp se establece en la hora actual antes de leer todos los datos de la tabla de origen. Después, writetime se antedata con esta marca de tiempo. Este enfoque garantiza que los registros escritos desde la carga de datos históricos en el punto de conexión de destino no puedan sobrescribir las actualizaciones que se incluyen con una marca de tiempo posterior del proxy de escritura dual mientras se leen los datos históricos.

Importante

Si, por algún motivo, necesita conservar las marcas de tiempo exactas, debe seguir un enfoque de migración de datos históricos que mantenga las marcas de tiempo, como en este ejemplo. El archivo JAR de dependencia del ejemplo también contiene el conector de Spark, por lo que no es necesario instalar el ensamblado del conector de Spark mencionado en los requisitos previos anteriores. Tener ambos instalados en el clúster de Spark provoca conflictos.

Validación del origen y el destino

Una vez completada la carga de datos históricos, las bases de datos deben estar sincronizadas y listas para la transición. Se recomienda validar el origen y el destino para asegurarse de que coinciden antes de que finalmente se corten.

Nota

Si usó el ejemplo de migración Cassandra mencionado anteriormente para conservar writetime, este ejemplo incluye la capacidad de validar el de migración mediante comparando filas en origen y destino en función de determinadas tolerancias.

Paso siguiente