Azure Cosmos DB の Cassandra 用 API は、さまざまな理由で Apache Cassandra で実行されるエンタープライズ ワークロードに最適な選択肢です。
- 管理と監視のオーバーヘッドなし: これにより、オペレーティング システム、Java 仮想マシン、および yaml ファイルとその相互作用全体で、無数の設定を管理および監視するオーバーヘッドが排除されます。
- 大幅なコスト削減: Azure Cosmos DB ではコストを節約できます。これには、仮想マシン、帯域幅、および該当するライセンスのコストが含まれます。 データ センター、サーバー、SSD ストレージ、ネットワーク、電気コストを管理する必要はありません。
- 既存のコードとツールを使用可能: Azure Cosmos DB では、既存の Cassandra SDK およびツールとのワイヤ プロトコル レベルの互換性が提供されます。 この互換性により、Azure Cosmos DB for Apache Cassandra を少し変更するだけで、既存のコードベースを使用できることが保証されます。
Azure Cosmos DB では、レプリケーション用のネイティブ Apache Cassandra gossip プロトコルはサポートされていません。 移行の必要なダウンタイムがゼロの場合は、別のアプローチが必要です。 このチュートリアルでは、デュアル書き込みプロキシと Apache Spark を使用して、ネイティブ Apache Cassandra クラスターから Azure Cosmos DB for Apache Cassandra にデータをライブ マイグレーションする方法について説明します。
次の図はパターンを示したものです。 デュアル書き込みプロキシは、ライブ変更をキャプチャするために使用されます。 履歴データは、Apache Spark を使用して一括コピーされます。 構成をほとんど、またはまったく変更することなく、プロキシでアプリケーション コードからの接続を受け入れることができます。 すべての要求がソース データベースにルーティングされ、一括コピーが行われている間に、Cassandra の API に書き込みが非同期的にルーティングされます。
前提条件
- Azure Cosmos DB for Apache Cassandra アカウントをプロビジョニングする。
- Azure Cosmos DB for Apache Cassandra への接続の基本を確認する。
- 互換性を確保するために Azure Cosmos DB for Apache Cassandra でサポートされる機能を確認する。
- 検証に cqlsh を使用する。
- ソース クラスターとターゲット Cassandra 用 API エンドポイントの間にネットワーク接続が確立されていることを確認する。
- ソース Cassandra データベースからターゲット API for Cassandra アカウントにキースペース/テーブル スキームを以前に移行したことを確認します。
重要
移行中に Apache Cassandra writetime
を保持する必要がある場合は、テーブルの作成時に次のフラグを設定する必要があります。
with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true
次に例を示します。
CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
name text,
userID int,
address text,
phone int,
PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;
Spark クラスターをプロビジョニングする
Azure Databricks を使用することをお勧めします。 Spark 3.0 以上をサポートするランタイムを使用します。
重要
Azure Databricks アカウントがソース Apache Cassandra クラスターとネットワーク接続されていることを確認する必要があります。 このセットアップには、仮想ネットワークの挿入が必要な場合があります。 詳細については、「Azure Virtual Network に Azure Databricks をデプロイする」を参照してください。
Spark の依存関係を追加する
Apache Spark Cassandra コネクタ ライブラリをクラスターに追加して、ネイティブと Azure Cosmos DB Cassandra 両方のエンドポイントに接続します。 自分のクラスターで、 [ライブラリ]>[新規インストール]>[Maven] の順に選択し、Maven 座標に com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
を追加します。
重要
移行中に行ごとに Apache Cassandra writetime
を保持する必要がある場合は、こちらのサンプルを使用することをお勧めします。 このサンプルの依存関係 JAR には Spark コネクタも含まれているため、前に説明したコネクタ アセンブリではなく、このバージョンをインストールする必要があります。
このサンプルは、履歴データの読み込み後にソースとターゲット間の行比較検証を実行する場合にも便利です。 詳細については、「 履歴データの読み込みを実行する 」および「 ソースとターゲットを検証する」を参照してください。
[インストール] を選択し、インストールが完了したらクラスターを再起動します。
注
Cassandra コネクタ ライブラリがインストールされた後で、必ず Azure Databricks クラスターを再起動してください。
デュアル書き込みプロキシをインストールする
デュアル書き込み中に最適なパフォーマンスを得られるように、ソース Cassandra クラスター内のすべてのノードにプロキシをインストールすることをお勧めします。
#assuming you do not have git already installed
sudo apt-get install git
#assuming you do not have maven already installed
sudo apt install maven
#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git
#change directory
cd cassandra-proxy
#compile the proxy
mvn package
デュアル書き込みプロキシを開始する
ソース Cassandra クラスターのすべてのノードにプロキシをインストールすることをお勧めします。 少なくとも、各ノード上で次のコマンドを実行してプロキシを開始します。
<target-server>
をターゲット クラスターのいずれかのノードの IP またはサーバー アドレスに置き換えます。
<path to JKS file>
をローカル環境の .jks ファイルへのパスに置き換え、<keystore password>
を対応するパスワードに置き換えます。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>
この方法でプロキシを開始する場合は、次の条件を満たすことが前提となります。
- ソースとターゲットのエンドポイントで、ユーザー名とパスワードが同じである。
- ソースとターゲットのエンドポイントに、Secure Sockets Layer (SSL) が実装されている。
ソースとターゲットのエンドポイントでこれらの条件が満たされない場合は、詳細な構成オプションを参照してください。
SSL の構成
SSL の場合は、ソース クラスターで使用されるキーストアなど、既存のキーストアを実装するか、 keytool
を使用して自己署名証明書を作成できます。
keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048
また、ソースまたはターゲットのエンドポイントに SSL が実装されていない場合は、SSL を無効にすることができます。
--disable-source-tls
または --disable-target-tls
のフラグを使用します。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
--source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
--proxy-jks-password <keystore password> --target-username <username> \
--target-password <password> --disable-source-tls true --disable-target-tls true
注
クライアント アプリケーションで、プロキシ経由でデータベースへの SSL 接続を構築するときに、デュアルライト プロキシに使用されるキーストアとパスワードと同じキーストアとパスワードが使用されていることを確認します。
資格情報とポートを構成する
既定では、クライアント アプリはソース資格情報を渡します。 プロキシは資格情報を使用して、ソース クラスターとターゲット クラスターに接続します。 前に説明したように、このプロセスでは、ソースとターゲットの資格情報が同じであることを前提としています。 プロキシを起動するときに、Cassandra エンドポイントのターゲット API に別のユーザー名とパスワードを個別に指定する必要があります。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
--proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
--target-username <username> --target-password <password>
既定のソース ポートとターゲット ポート (指定されていない場合) は 9042 です。 この場合、Cassandra 用 API はポート 10350
で実行されます。
--source-port
または--target-port
を使用してポート番号を指定します。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
--source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
--proxy-jks-password <keystore password> --target-username <username> --target-password <password>
プロキシをリモートでデプロイする
クラスター ノード自体にプロキシをインストールしたくない場合があります。 別のコンピューターにインストールすることをお選びになる場合があります。 そのシナリオでは、 <source-server>
の IP アドレスを指定します。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>
警告
ソース Apache Cassandra クラスター内のすべてのノードでプロキシを実行するのではなく、別のコンピューターにプロキシをリモートでインストールして実行すると、ライブ マイグレーションの実行中のパフォーマンスに影響します。 この構成は機能しますが、クライアント ドライバーはクラスター内のすべてのノードへの接続を開くわけではありません。 クライアントは、接続を行うためにプロキシがインストールされている単一のコーディネーター ノードに依存します。
アプリケーションのコードを変更しなくてよいようにする
既定では、プロキシにより、ポート 29042 がリッスンされます。 このポートを指すアプリケーション コードを変更します。 代わりに、プロキシがリッスンするポートを変更できます。 アプリケーション レベルのコード変更を排除する場合は、次の方法でこの変更を行うことができます。
- ソース Cassandra サーバーを別のポートで実行する。
- プロキシを Cassandra の標準ポート 9042 で実行する。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042
注
クラスター ノードにプロキシをインストールする場合、ノードを再起動する必要はありません。 アプリケーション レベルのコード変更を排除するために、多数のアプリケーション クライアントがあり、標準の Cassandra ポート 9042 でプロキシを実行する場合は、 Apache Cassandra の既定のポートを変更します。 その後、クラスター内のノードを再起動し、ソース ポートをソース Cassandra クラスター用に定義した新しいポートとして構成する必要があります。
次の例では、ポート 3074 で実行するようにソース Cassandra クラスターを変更し、ポート 9042 でクラスターを起動します。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
--proxy-port 9042 --source-port 3074
プロトコルを強制する
このプロキシにはプロトコルを強制する機能があります。これは、ソース エンドポイントがターゲットよりも高度な場合、またはそうしないとサポートされない場合に、必要になることがあります。 その場合は、--protocol-version
と --cql-version
を指定して、プロトコルをターゲットに準拠させることができます。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
--protocol-version 4 --cql-version 3.11
デュアルライト プロキシが実行されたら、アプリケーション クライアントでポートを変更して再起動する必要があります。 または、Cassandra ポートを変更し、クラスターを再起動します (その方法を選択した場合)。 プロキシは、ターゲット エンドポイントへの書き込みの転送を開始します。 詳細については、 監視とメトリックを参照してください。
履歴データの読み込みを実行する
データを読み込むには、Azure Databricks アカウントで Scala ノートブックを作成します。 ソースとターゲットの Cassandra 構成を対応する資格情報に置き換え、ソースとターゲットのキースペースとテーブルを置き換えます。 次のサンプルのように、必要に応じて各テーブルに変数を追加し、実行します。 アプリケーションによってデュアル書き込みプロキシへの要求送信が開始されたら、履歴データを移行する準備は完了です。
重要
データを移行する前に、アプリケーションを迅速に移行するために必要な量にコンテナーのスループットを増やします。 移行を開始する前にスループットをスケーリングすると、データを短時間で移行するのに役立ちます。 履歴データの読み込み中のレート制限から保護するために、Cassandra 用 API でサーバー側の再試行 (SSR) を有効にすることができます。 SSR を有効にする方法の詳細については、「 Azure Cosmos DB for Apache Cassandra 操作のレート制限エラーを防止する」を参照してください。
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "10350",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.option("writetime", timestamp)
.mode(SaveMode.Append)
.save
注
前の Scala サンプルでは、ソース テーブル内のすべてのデータを読み取る前に、 timestamp
が現在の時刻に設定されていることがわかります。 その後、writetime
がこの過去のタイムスタンプに設定されます。 この方法により、履歴データの読み込みからターゲット エンドポイントに書き込まれたレコードが、履歴データの読み取り中にデュアル書き込みプロキシから後のタイム スタンプで入力された更新を上書きできなくなります。
重要
何らかの理由で "正確な" タイムスタンプを保持する必要がある場合は、こちらのサンプルのような、タイムスタンプを保持する履歴データの移行アプローチを採用する必要があります。 サンプルの依存関係 JAR には Spark コネクタも含まれているため、前の前提条件で説明した Spark コネクタ アセンブリをインストールする必要はありません。 両方を Spark クラスターにインストールすると、競合が発生します。
ソースとターゲットを検証する
履歴データの読み込みが完了すると、データベースは同期され、切り替えの準備が整います。 ソースとターゲットを検証して、最終的に切り取る前にそれらが一致することを確認することをお勧めします。
注
前に説明した Cassandra 移行ツールサンプルを使用してwritetime
を保持した場合、このサンプルには、特定の許容範囲に基づいてソースとターゲットの行を比較して移行を検証する機能が含まれています。