次の方法で共有


チュートリアル: Apache Spark DataFrame を使用してデータを読み込んで変換する

このチュートリアルでは、Azure Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使ってデータを読み込んで変換する方法について説明します。

このチュートリアルを最後まで進めると、DataFrame とは何であるかを理解し、以下のタスクを快適に実行できます。

Python(プログラミング言語)

Apache Spark PySpark API リファレンスも参照してください。

スカラ (プログラミング言語)

Apache Spark Scala API リファレンスも参照してください。

R

Apache SparkR API リファレンスも参照してください。

DataFrame とは

DataFrame は、潜在的に異なる型の列を持つ 2 次元のラベル付きデータ構造です。 DataFrame は、スプレッドシート、SQL テーブル、または一連のオブジェクトのディクショナリのようなものと考えることができます。 Apache Spark DataFrames には、一般的なデータ分析の問題を効率的に解決できるようにする豊富な機能セット (列の選択、フィルター、結合、集計) が用意されています。

Apache Spark DataFrames は、Resilient Distributed Datasets (RDD) に基づいて構築された抽象化です。 Spark DataFrames と Spark SQL では、統合された計画と最適化エンジンが使用されるため、Azure Databricks でサポートされているすべての言語 (Python、SQL、Scala、R) でほぼ同じパフォーマンスを得ることができます。

要件

次のチュートリアルを完了するには、次の要件を満たす必要があります。

  • このチュートリアルの例を使用するには、ワークスペースで Unity カタログ が有効になっている必要があります。

  • このチュートリアルの例では、Unity カタログ ボリューム を使用してサンプル データを格納します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ、スキーマ、およびボリューム名を使用して、例で使用されるボリューム パスを設定します。

  • Unity カタログには、次のアクセス許可が必要です。

    • このチュートリアルで使うボリュームに対する READ VOLUMEWRITE VOLUME、または ALL PRIVILEGES
    • USE SCHEMA または ALL PRIVILEGES は、このチュートリアルで使用されるスキーマです。
    • このチュートリアルで使用するカタログとしてUSE CATALOGまたはALL PRIVILEGESを使用します。

    これらのアクセス許可を設定するには、Databricks 管理者または Unity カタログの権限とセキュリティ保護可能なオブジェクトを参照してください。

ヒント

この記事の完成したノートブックについては、「DataFrame チュートリアルのノートブック」を参照してください。

ステップ 1: 変数を定義して CSV ファイルを読み込む

この手順では、このチュートリアルで使用する変数を定義し、赤ちゃんの名前データを含む CSV ファイルを health.data.ny.gov から Unity カタログ ボリュームに読み込みます。

  1. 新規アイコン アイコンをクリックして、新しいノートブックを開きます。 Azure Databricks ノートブックを操作する方法については、「ノートブックの外観をカスタマイズする」を参照してください。

  2. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 <catalog-name><schema-name><volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。 <table_name>を、選択したテーブル名に置き換えます。 このチュートリアルの後半で、このテーブルに赤ちゃんの名前データを読み込みます。

    Python(プログラミング言語)

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    スカラ (プログラミング言語)

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Shift+Enter キーを押してセルを実行し、新しい空のセルを作成します。

  4. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、rows.csv コマンドを使って、health.data.ny.gov から Unity Catalog ボリュームに ファイルをコピーします。

    Python(プログラミング言語)

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    スカラ (プログラミング言語)

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ステップ 2: DataFrame を作成する

このステップでは、テスト データを含む df1 という名前の DataFrame を作成してから、その内容を表示します。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、テスト データを含む DataFrame を作成し、DataFrame の内容とスキーマを表示します。

    Python(プログラミング言語)

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    スカラ (プログラミング言語)

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ステップ 3: CSV ファイルから DataFrame にデータを読み込む

この手順では、以前に Unity カタログ ボリュームに読み込んだ CSV ファイルから、 df_csv という名前の DataFrame を作成します。 spark.read.csv を参照してください。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードでは、赤ちゃんの名前のデータを CSV ファイルから DataFrame df_csv に読み込み、DataFrame の内容を表示します。

    Python(プログラミング言語)

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    スカラ (プログラミング言語)

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

サポートされている多くのファイル形式からデータを読み込むことができます。

ステップ 4: DataFrame を表示して操作する

次の方法を使って、赤ちゃんの名前の DataFrame を表示して操作します。

Apache Spark DataFrame のスキーマを表示する方法について説明します。 Apache Spark では、 スキーマ という用語を使用して、DataFrame 内の列の名前とデータ型を参照します。

また、Azure Databricks では、スキーマという用語を使用して、カタログに登録されているテーブルのコレクションを記述します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、2 つの DataFrame のスキーマを表示する .printSchema() メソッドを使用して DataFrames のスキーマを示します。2 つの DataFrame を結合するための準備をします。

    Python(プログラミング言語)

    df_csv.printSchema()
    df1.printSchema()
    

    スカラ (プログラミング言語)

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame の列の名前を変更する

DataFrame の列の名前を変更する方法について説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、 df1_csv DataFrame 内の列の名前を、 df1 DataFrame 内のそれぞれの列と一致するように変更します。 このコードでは、Apache Spark withColumnRenamed() メソッドを使います。

    Python(プログラミング言語)

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    スカラ (プログラミング言語)

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame を結合する

1 つの DataFrame の行を別のものに追加する新しい DataFrame を作成する方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark union() メソッドを使って、最初の DataFrame df の内容と、CSV ファイルから読み込まれた赤ちゃんの名前のデータを含む DataFrame df_csv を結合します。

    Python(プログラミング言語)

    df = df1.union(df_csv)
    display(df)
    

    スカラ (プログラミング言語)

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame で行をフィルター処理する

Apache Spark .filter() または .where() メソッドを使用して行をフィルター処理することで、データ セット内で最も一般的な赤ちゃんの名前を見つけられます。 フィルター処理を使用して、DataFrame で返す行または変更する行のサブセットを選択します。 以下の例に示すように、パフォーマンスや構文に違いはありません。

.filter() メソッドの使用

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark .filter() メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。

    Python(プログラミング言語)
    display(df.filter(df["Count"] > 50))
    
    スカラ (プログラミング言語)
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

.where() メソッドの使用

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark .where() メソッドを使って、DataFrame 内で数が 50 より多い行を表示します。

    Python(プログラミング言語)
    display(df.where(df["Count"] > 50))
    
    スカラ (プログラミング言語)
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

データフレームから列を選択し、頻度で並べ替えます

赤ちゃんの名前の頻度について学ぶために、返す DataFrame の列を指定するselect()方法を使用します。 Apache Spark orderbydesc 関数を使って結果を並べ替えます。

Apache Spark 用の pyspark.sql モジュールでは、SQL 関数がサポートされています。 このチュートリアルで使うこれらの関数の中に、Apache Spark orderBy()desc()expr() 関数があります。 必要に応じてこれらの関数をセッションにインポートし、それらを使用できるようにします。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、desc() 関数をインポートしてから、Apache Spark select() メソッドおよび Apache Spark orderBy()desc() 関数を使って、最も一般的な名前とその数を降順に表示します。

    Python(プログラミング言語)

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    スカラ (プログラミング言語)

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

サブセットの DataFrame を作成する

既存の DataFrame からサブセット DataFrame を作成する方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark filter メソッドを使い、年、数、性別でデータを制限して新しい DataFrame を作成します。 Apache Spark select() メソッドを使用して列を制限します。 また、Apache Spark orderBy()desc() 関数を使って、数の順で新しい DataFrame を並べ替えます。

    Python(プログラミング言語)

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    スカラ (プログラミング言語)

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ステップ 5: DataFrameを保存する

DataFrame を保存する方法を説明します。 DataFrame をテーブルに保存するか、データフレームをファイルまたは複数のファイルに書き込むことができます。

DataFrame をテーブルに保存する

Azure Databricks では、既定ですべてのテーブルに Delta Lake 形式が使用されます。 DataFrame を保存するには、カタログとスキーマに対する CREATE テーブル権限が必要です。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、このチュートリアルの開始時に定義した変数を使用して、DataFrame の内容をテーブルに保存します。

    Python(プログラミング言語)

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    スカラ (プログラミング言語)

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

ほとんどの Apache Spark アプリケーションは、大規模なデータ セットに対して分散方式で動作します。 Apache Spark は、1 つのファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake では Parquet フォルダーとファイルを分割します。 多くのデータ システムでは、これらのファイルのディレクトリを読み取ることができます。 Azure Databricks では、ほとんどのアプリケーションでファイル パスに対してテーブルを使用することをお勧めします。

DataFrame を JSON ファイルに保存する

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。

    Python(プログラミング言語)

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    スカラ (プログラミング言語)

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame を JSON ファイルから読み取る

Apache Spark spark.read.format() メソッドを使って、ディレクトリから DataFrame に JSON データを読み取る方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、前の例で保存した JSON ファイルが表示されます。

    Python(プログラミング言語)

    display(spark.read.format("json").json("/tmp/json_data"))
    

    スカラ (プログラミング言語)

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

追加タスク: PySpark、Scala、R で SQL クエリを実行する

Apache Spark の DataFrame には、SQL と PySpark、Scala、R を組み合わせる次のオプションがあります。このチュートリアル用に作成したのと同じノートブックで、次のコードを実行できます。

SQL クエリとして列を指定する

Apache Spark の selectExpr() メソッドの使用方法を説明します。 これは、SQL 式を受け取って更新された DataFrame を返す select() メソッドのバリエーションです。 このメソッドでは、upper などの SQL 式を使用できます。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark selectExpr() メソッドと SQL upper 式を使用して、文字列列を大文字に変換します (列の名前を変更します)。

    Python(プログラミング言語)

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    スカラ (プログラミング言語)

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

expr()を使用して列に SQL 構文を使用する

Apache Spark expr() 関数をインポートして使用して、列を指定する任意の場所で SQL 構文を使用する方法について説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、 expr() 関数をインポートし、Apache Spark expr() 関数と SQL lower 式を使用して、文字列列を小文字に変換します (列の名前を変更します)。

    Python(プログラミング言語)

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    スカラ (プログラミング言語)

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

spark.sql() 関数を使用して任意の SQL クエリを実行する

Apache Spark の spark.sql() 関数を使って任意の SQL クエリを実行する方法を説明します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードでは、Apache Spark spark.sql() 関数を使用して、SQL 構文を使用して SQL テーブルに対してクエリを実行します。

    Python(プログラミング言語)

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    スカラ (プログラミング言語)

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Shift+Enter キーを押してセルを実行してから、次のセルに移動します。

DataFrame チュートリアルのノートブック

以下のノートブックには、このチュートリアルのクエリ例が含まれています。

Python(プログラミング言語)

Python を使用する DataFrame チュートリアル

ノートブックを入手

スカラ (プログラミング言語)

Scala を使用する DataFrame チュートリアル

ノートブックを入手

R

R を使用する DataFrame チュートリアル

ノートブックを入手

その他のリソース