Spark を使用してデータ ファイルを操作する

完了

Spark を使用する利点の 1 つは、さまざまなプログラミング言語でコードを記述して実行できることです。これにより、既にお持ちのプログラミング スキルを活用し、特定のタスクに最適な言語を使用することができます。 新しい Azure Databricks Spark ノートブックの既定の言語は PySpark です。これは、データ操作と視覚化の強力なサポートにより、データ サイエンティストやアナリストによって一般的に使用される、Spark 最適化バージョンの Python です。 さらに、 Scala (対話形式で使用できる Java 派生言語) や SQL ( Spark SQL ライブラリに含まれる一般的に使用される SQL 言語のバリアント) などの言語を使用して、リレーショナル データ構造を操作できます。 ソフトウェア エンジニアは、 Java などのフレームワークを使用して Spark で実行されるコンパイル済みソリューションを作成することもできます。

データフレームを使用してデータを探索する

ネイティブでは、Spark は回復性のある分散データセット (RDD) と呼ばれるデータ構造を使用します。ただし、RDD で直接動作するコードを記述できますが、Spark で構造化データを操作するために最も一般的に使用されるデータ構造は、Spark SQL ライブラリの一部として提供されるデータフレームです。 Spark のデータフレームは、ユビキタスな Pandas Python ライブラリのデータフレームに似ていますが、Spark の分散処理環境で動作するように最適化されています。

注意

データフレーム API に加えて、Spark SQL では、Java と Scala でサポートされている厳密に型指定された "データセット" API が提供されます。 このモジュールでは、Dataframe API に焦点を当てます。

データフレームにデータを読み込む

仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 Databricks File System (DBFS) ストレージのデータ フォルダーに 、products.csv という名前のコンマ区切りのテキスト ファイルに次の データ があるとします。

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Spark ノートブックでは、次の PySpark コードを使用してデータフレームにデータを読み込み、最初の 10 行を表示できます。

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

先頭の %pyspark 行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 製品データの例と同等の Scala コードを次に示します。

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

マジック %spark は Scala を指定するために使用されます。

ヒント

Notebook インターフェイスの各セルに使用する言語を選択することもできます。

前に示した両方の例では、次のような出力が生成されます。

ProductID ProductName カテゴリ 定価
771 Mountain-100 Silver、38 マウンテン バイク 3399.9900
772 Mountain-100 Silver,42 マウンテン バイク 3399.9900
773 Mountain-100 Silver,44 マウンテン バイク 3399.9900
... ... ... ...

データフレーム スキーマを指定する

前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

次の PySpark の例は、 product-data.csvという名前 のファイルから読み込まれるデータフレームのスキーマをこの形式で指定する方法を示しています。

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

ここでも、結果は次のようになります。

ProductID ProductName カテゴリ 定価
771 Mountain-100 Silver、38 マウンテン バイク 3399.9900
772 Mountain-100 Silver,42 マウンテン バイク 3399.9900
773 Mountain-100 Silver,44 マウンテン バイク 3399.9900
... ... ... ...

データフレームのフィルター処理とグループ化を行う

Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductName 列と ListPrice 列を取得します。

pricelist_df = df.select("ProductID", "ListPrice")

このコード例の結果は次のようになります。

ProductID 定価
771 3399.9900
772 3399.9900
773 3399.9900
... ...

ほとんどのデータ操作メソッドと共通して、 select は新しいデータフレーム オブジェクトを返します。

ヒント

データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。

pricelist_df = df["ProductID", "ListPrice"]

メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、Select メソッドとwhere メソッドをチェーンして、Mountain Bikes または Road Bikes のカテゴリを持つ製品の ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

このコード例の結果は次のようになります。

ProductName 定価
Mountain-100 Silver、38 3399.9900
ロード-750 ブラック、52 539.9900
... ...

データをグループ化および集計するには、 groupBy メソッドと集計関数を使用できます。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

このコード例の結果は次のようになります。

カテゴリ カウント
ヘッドセット 3
ホイール 14
マウンテン バイク 32
... ...

Spark で SQL 式を使用する

Dataframe API は Spark SQL という名前の Spark ライブラリの一部であり、データ アナリストは SQL 式を使用してデータのクエリと操作を行います。

Spark カタログでデータベース オブジェクトを作成する

Spark カタログは、ビューやテーブルなどのリレーショナル データ オブジェクトのメタストアです。 Spark ランタイムでは、このカタログを使用して、任意の Spark 対応言語で記述されたコードと、一部のデータ アナリストや開発者にとってより自然な SQL 式をシームレスに統合できます。

Spark カタログでクエリを実行するためにデータフレーム内のデータを使用できるようにする最も簡単な方法の 1 つは、次のコード例に示すように、一時ビューを作成することです。

df.createOrReplaceTempView("products")

"ビュー" は一時的なもので、現在のセッションの終了時に自動的に削除されます。 また、カタログに保持される "テーブル" を作成して、Spark SQL を使用してクエリを実行できるデータベースを定義することもできます。

注意

このモジュールでは Spark カタログ テーブルについて詳しく説明しませんが、いくつかの重要な点を確認しておくことをお勧めします。

  • spark.catalog.createTable メソッドを使用して、空のテーブルを作成できます。 テーブルは、カタログに関連付けられているストレージの場所に、基になるデータを格納するメタデータ構造です。 テーブルを削除すると、基になるデータも削除されます。
  • データフレームをテーブルとして保存するには、saveAsTable メソッドを使用します。
  • メソッドを使用して "外部"spark.catalog.createExternalTable テーブルを作成できます。 外部テーブルではカタログ内のメタデータが定義されますが、外部ストレージの場所 (通常は、データ レイク内のフォルダー) から基になるデータが取得されます。 外部テーブルを削除しても、基になるデータは削除されません。

Spark SQL API を使用してデータのクエリを実行する

任意の言語で記述されたコードで Spark SQL API を使用して、カタログ内のデータに対してクエリを実行できます。 たとえば、次の PySpark コードでは、SQL クエリを使用して 製品 ビューからデータフレームとしてデータを返します。

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

このコード例の結果は、次の表のようになります。

ProductName 定価
Mountain-100 Silver、38 3399.9900
ロード-750 ブラック、52 539.9900
... ...

SQL コードを使用する

前の例では、Spark SQL API を使用して、Spark コードに SQL 式を埋め込む方法を示しました。 また、ノートブックで %sql マジックを使用して、次のようにカタログ内のオブジェクトに対してクエリを行う SQL コードを実行することもできます。

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

この SQL コード例では、次のように、ノートブックにテーブルとして自動的に表示される結果セットが返されます。

カテゴリ 製品数
ビブショーツ 3
バイク ラック 1
バイク スタンド 1
... ...