この記事では、Mosaic Streaming を使用して、Apache Spark から PyTorch と互換性のある形式にデータを変換する方法について説明します。
Mosaic Streaming は、オープン ソースのデータ読み込みライブラリです。 これにより、Apache Spark DataFrame として既に読み込まれているデータセットからのディープ ラーニング モデルの単一ノードまたは分散トレーニングと評価が可能になります。 Mosaic Streaming は主に Mosaic Composer をサポートするものですが、ネイティブの PyTorch、PyTorch Lightning、TorchDistributor とも統合されます。 Mosaic Streaming は、従来の PyTorch DataLoader と比べて一連の長所を持ち、これには以下が含まれます。
- 画像、テキスト、ビデオ、マルチモーダル データを含むあらゆるデータ型との互換性。
- 主要なクラウド ストレージ プロバイダー (AWS、OCI、GCS、Azure、Databricks UC Volume、そして Cloudflare R2、Coreweave、Backblaze b2 などの S3 互換オブジェクト ストア) のサポート
- 正確性の保証、パフォーマンス、柔軟性、使いやすさの最大化。 詳細については、主要機能ページを参照してください。
Mosaic Streaming に関する一般的な情報については、Streaming API のドキュメントを参照してください。
注
Mosaic Streaming は、Databricks Runtime 15.2 ML 以上のすべてのバージョンにプレインストールされています。
Mosaic Streaming を使用して Spark DataFrame からデータを読み込む
Mosaic Streaming は、Apache Spark から Mosaic Data Shard (MDS) 形式へ変換するための簡単なワークフローを提供しており、この形式は後に分散環境で使用するために読み込むことができます。
推奨されるワークフロー:
- Apache Spark を使用してデータを読み込み、必要に応じて前処理を実行します。
-
streaming.base.converters.dataframe_to_mds
を使用すると、データフレームを一時的な保存のためにディスクに保存したり、永続的な保存のために Unity Catalog ボリュームに保存したりできます。 このデータは MDS 形式で保存され、圧縮とハッシュのサポートによってさらに最適化することができます。 高度なユース ケースでは、UDF を使用したデータの前処理を含めることもできます。 詳細については、Spark DataFrame から MDS へのチュートリアルを参照してください。 -
streaming.StreamingDataset
を使用すると、必要なデータをメモリに読み込むことができます。StreamingDataset
は PyTorch の IterableDataset のバージョンであり、弾力的に確定的なシャッフル機能を備えています。これにより、高速なミッドエポック再開が可能になります。 詳細については、StreamingDataset のドキュメントを参照してください。 -
streaming.StreamingDataLoader
を使用すると、トレーニング/評価/テストのために必要なデータを読み込むことができます。StreamingDataLoader
は PyTorch の DataLoader のバージョンであり、追加のチェックポイント/再開インターフェイスを提供します。このインターフェイスでは、このランクのモデルによって見られるサンプルの数が追跡されます。
エンド ツー エンドの例については、次のノートブックを参照してください。
Mosaic Streaming を使用した Spark から PyTorch へのデータ読み込みの簡略化ノートブック
トラブルシューティング: 認証エラー
StreamingDataset
を使用して Unity カタログ ボリュームからデータを読み込むときに次のエラーが表示される場合は、次に示すように環境変数を設定します。
ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.
注
TorchDistributor
を使用して分散トレーニングを実行するときにこのエラーが発生した場合は、ワーカー ノードに環境変数も設定する必要があります。
db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods
def your_training_function():
import os
os.environ['DATABRICKS_HOST'] = db_host
os.environ['DATABRICKS_TOKEN'] = db_token
# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)