次の方法で共有


デュアル書き込みプロキシを使用した Azure Managed Instance for Apache Cassandra へのライブ マイグレーション

可能であれば、Apache Cassandra ネイティブ機能を使用して、 ハイブリッド クラスターを構成して既存のクラスターから Azure Managed Instance for Apache Cassandra にデータを移行することをお勧めします。 この機能では、Apache Cassandra のゴシップ プロトコルが使用され、ソース データセンターから新しいマネージド インスタンス データセンターにデータがシームレスにレプリケートされます。

ソース データベースのバージョンに互換性がない場合や、ハイブリッド クラスターのセットアップが不可能なシナリオがある場合があります。 このチュートリアルでは、デュアル書き込みプロキシと Apache Spark を使用して、Azure Managed Instance for Apache Cassandra にデータのライブ マイグレーションを行う方法について説明します。 ライブ変更はデュアル書き込みプロキシを使用してキャプチャされ、履歴データは Apache Spark を使用して一括コピーされます。 このアプローチには次のような利点があります。

  • アプリケーションの変更が最小限。 構成をほとんど、またはまったく変更することなく、プロキシでアプリケーション コードからの接続を受け入れることができます。 すべての要求がソース データベースにルーティングされ、非同期的にセカンダリ ターゲットに書き込みがルーティングされます。
  • クライアントでのワイヤー プロトコルの依存関係。 この方法はバックエンド リソースや内部プロトコルに依存しないため、Apache Cassandra ワイヤ プロトコルを実装する任意のソースまたはターゲットの Cassandra システムで使用できます。

次の図はこのアプローチを示したものです。

Azure Managed Instance for Apache Cassandra へのデータのライブ マイグレーションを示すアニメーション。

前提条件

Spark クラスターをプロビジョニングする

Spark 3.0 をサポートする Azure Databricks ランタイム バージョン 7.5 を選択することをお勧めします。

Azure Databricks ランタイムのバージョンを見つける方法を示すスクリーンショット。

Spark の依存関係を追加する

Apache Spark Cassandra コネクタ ライブラリをクラスターに追加して、ワイヤ プロトコルに互換性のある任意の Apache Cassandra エンドポイントに接続できるようにする必要があります。 自分のクラスターで、 [ライブラリ]>[新規インストール]>[Maven] の順に選択し、Maven 座標に com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 を追加します。

重要

移行中に行ごとに Apache Cassandra writetime を保持する必要がある場合は、こちらのサンプルを使用することをお勧めします。 このサンプルの依存関係 jar にも Spark コネクタが含まれているため、コネクタ アセンブリの代わりにこのバージョンをインストールします。

このサンプルは、履歴データの読み込み後にソースとターゲット間の行比較検証を実行する場合にも便利です。 「 履歴データの読み込みを実行する 」および「 ソースとターゲットを検証する」を参照してください。

Azure Databricks で Maven パッケージを検索する方法を示すスクリーンショット。

[インストール] を選択し、インストールが完了したらクラスターを再起動します。

Cassandra コネクタ ライブラリがインストールされた後で、必ず Azure Databricks クラスターを再起動してください。

デュアル書き込みプロキシをインストールする

デュアル書き込み中に最適なパフォーマンスを得られるように、ソース Cassandra クラスター内のすべてのノードにプロキシをインストールすることをお勧めします。

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

デュアル書き込みプロキシを開始する

ソース Cassandra クラスターのすべてのノードにプロキシをインストールすることをお勧めします。 少なくとも、各ノード上で次のコマンドを実行してプロキシを開始します。 <target-server> をターゲット クラスターのいずれかのノードの IP またはサーバー アドレスに置き換えます。 <path to JKS file>をローカルの jks ファイルへのパスに置き換え、<keystore password>を対応するパスワードに置き換えます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

この方法でプロキシを開始する場合は、次の条件を満たすことが前提となります。

  • ソースとターゲットのエンドポイントで、ユーザー名とパスワードが同じである。
  • ソースとターゲットのエンドポイントに、Secure Sockets Layer (SSL) が実装されている。

ソースとターゲットのエンドポイントでこれらの条件が満たされない場合は、詳細な構成オプションを参照してください。

SSL の構成

SSL の場合は、ソース クラスターで使用されるキーストアなど、既存のキーストアを実装するか、 keytoolを使用して自己署名証明書を作成できます。

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

また、ソースまたはターゲットのエンドポイントに SSL が実装されていない場合は、SSL を無効にすることができます。 --disable-source-tls または --disable-target-tls のフラグを使用します。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password> \
  --disable-source-tls true  --disable-target-tls true 

クライアント アプリケーションで、プロキシを使用するデータベースへの SSL 接続を構築するときに、デュアルライト プロキシに使用されるキーストアとパスワードと同じキーストアとパスワードが使用されていることを確認します。

資格情報とポートを構成する

既定では、ソース資格情報はクライアント アプリから渡されます。 プロキシは、ソース クラスターとターゲット クラスターへの接続に資格情報を使用します。 前に説明したように、このプロセスでは、ソースとターゲットの資格情報が同じであることを前提としています。 必要な場合は、プロキシを開始するときに、ターゲット Cassandra エンドポイントに異なるユーザー名とパスワードを指定することができます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

既定のソース ポートとターゲット ポート (指定されていない場合) は 9042 です。 ターゲットまたはソースの Cassandra エンドポイントが異なるポートで実行されている場合は、--source-port または --target-port を使用して別のポート番号を指定できます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

プロキシをリモートでデプロイする

クラスター ノード自体にプロキシをインストールしたくない場合があります。 別のコンピューターにインストールしたい場合。 そのシナリオでは、 <source-server>の IP アドレスを指定します。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

警告

ソース Apache Cassandra クラスター内のすべてのノードでプロキシを実行するのではなく、別のコンピューターでプロキシをリモートで実行できます。 その場合は、クラスター内のノードがあるのと同じ数のマシンにプロキシをデプロイすることをお勧めします。 system.peers で IP アドレスの置換を設定します。 プロキシでこの構成を使用します。 この方法を使用しない場合は、クライアント ドライバーがクラスター内のすべてのノードへの接続を開くことができないため、ライブ マイグレーションの実行中にパフォーマンスに影響する可能性があります。

アプリケーションのコードを変更しなくてよいようにする

既定では、プロキシにより、ポート 29042 がリッスンされます。 このポートを指すアプリケーション コードを変更する必要があります。 または、プロキシがリッスンするポートを変更することもできます。 アプリケーション レベルのコード変更を排除する場合は、次の方法を使用できます。

  • ソース Cassandra サーバーを別のポートで実行する。
  • プロキシを Cassandra の標準ポート 9042 で実行する。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

クラスター ノードにプロキシをインストールする場合、ノードを再起動する必要はありません。 多数のアプリケーション クライアントがあり、標準の Cassandra ポート 9042 でプロキシを実行してアプリケーション レベルのコード変更を排除する場合は、 Apache Cassandra の既定のポートを変更します。 その後、クラスター内のノードを再起動し、ソース Cassandra クラスターに対して定義した新しいポートに、ソース ポートを構成する必要があります。

次の例では、ポート 3074 で実行するようにソース Cassandra クラスターを変更し、ポート 9042 でクラスターを起動します。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074

プロトコルを強制する

このプロキシにはプロトコルを強制する機能があります。これは、ソース エンドポイントがターゲットよりも高度な場合、またはそうしないとサポートされない場合に、必要になることがあります。 その場合は、--protocol-version--cql-version を指定して、プロトコルをターゲットに準拠させることができます。

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11

デュアルライト プロキシが実行されたら、アプリケーション クライアントのポートを変更して再起動します。 または、Cassandra ポートを変更し、その方法を選択した場合はクラスターを再起動します。 プロキシは、ターゲット エンドポイントへの書き込みの転送を開始します。 プロキシ ツールで使用できる監視とメトリックについてはこちらを参照してください。

履歴データの読み込みを実行する

データを読み込むには、Azure Databricks アカウントで Scala ノートブックを作成します。 ソースとターゲットの Cassandra 構成を対応する資格情報に置き換え、ソースとターゲットのキースペースとテーブルを置き換えます。 次のサンプルのように、必要に応じて各テーブルに変数を追加し、実行します。 アプリケーションによってデュアル書き込みプロキシへの要求送信が開始されたら、履歴データを移行する準備は完了です。

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

前の Scala サンプルでは、ソース テーブル内のすべてのデータを読み取る前に、 timestamp が現在の時刻に設定されています。 その後、writetime がこの過去のタイムスタンプに設定されます。 この方法により、履歴データの読み込みからターゲット エンドポイントに書き込まれたレコードが、履歴データの読み取り中にデュアル書き込みプロキシから後のタイム スタンプで入力された更新を上書きできなくなります。

重要

何らかの理由で "正確な" タイムスタンプを保持する必要がある場合は、こちらのサンプルのような、タイムスタンプを保持する履歴データの移行アプローチを採用する必要があります。 サンプルの依存関係 jar には Spark コネクタも含まれているため、前の前提条件で説明した Spark コネクタ アセンブリをインストールする必要はありません。 両方を Spark クラスターにインストールすると、競合が発生します。

ソースとターゲットを検証する

履歴データの読み込みが完了すると、データベースは同期され、切り替えの準備が整います。 ソースとターゲットを検証して、最終的に切り取る前にそれらが一致することを確認することをお勧めします。

writetimeを保持するために前のセクションで説明した Cassandra 移行機能のサンプルを使用した場合は、特定の許容範囲に基づいてソースとターゲットの行を比較することで、移行を検証する機能があります。

次のステップ