データフレームの変更と保存
Apache Spark は、 データを 操作するための主要な構造としてデータフレーム オブジェクトを提供します。 データフレームを使用してデータのクエリと変換を行い、結果をデータ レイクに保持できます。 データフレームにデータを読み込むには、 spark.read 関数を使用し、読み取るデータのファイル形式、パス、および必要に応じてスキーマを指定します。 たとえば、次のコードは、 orders フォルダー内のすべての .csv ファイルから order_details という名前のデータフレームにデータを読み込み、最初の 5 つのレコードを表示します。
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
データ構造を変換する
ソース データをデータフレームに読み込んだ後、データフレーム オブジェクトのメソッドと Spark 関数を使用して変換できます。 データフレームに対する一般的な操作は次のとおりです。
- 行と列のフィルター処理
- 列の名前変更
- 新しい列の作成 (多くの場合、既存の列から派生)
- null またはその他の値を置き換える
次の例では、 split
関数を使用して 、CustomerName 列の値を FirstName と LastName という名前の 2 つの新しい列に分割します。 次に、 drop
メソッドを使用して、元の CustomerName 列を削除します。
from pyspark.sql.functions import split, col
# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))
Spark SQL ライブラリのすべての機能を使用して、行のフィルター処理、列の派生、削除、名前変更、その他の必要なデータ変更の適用によってデータを変換できます。
変換されたデータを保存する
dataFrame が必要な構造になった後、データ レイクでサポートされている形式に結果を保存できます。
次のコード例では、dataFrame をデータ レイク内の Parquet ファイルに保存し、同じ名前の既存のファイルを置き換えます。
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
注
Parquet 形式は、通常、分析ストアへの追加の分析またはインジェストに使用するデータ ファイルに適しています。 Parquet は、ほとんどの大規模な Data Analytics システムでサポートされている非常に効率的な形式です。 実際、データ変換の要件が、単に別の形式 (CSV など) から Parquet にデータを変換することだけである場合があります。