次の方法で共有


R で DataFrame とテーブルを操作する

Von Bedeutung

Databricks Runtime 16.0 以降では、Databricks の SparkR は 非推奨 となります。 Databricks では、代わりに sparklyr 使用することをお勧めします。

この記事では、 SparkRsparklyrdplyr などの R パッケージを使用して、R data.frameSpark DataFrames、およびメモリ内テーブルを操作する方法について説明します。

SparkR、sparklyr、および dplyr を使用する場合、これらのすべてのパッケージで特定の操作を完了でき、最も使い慣れたパッケージを使用できることに注意してください。 たとえば、クエリを実行するには、 SparkR::sqlsparklyr::sdf_sqldplyr::selectなどの関数を呼び出すことができます。 場合によっては、これらのパッケージのうち 1 つまたは 2 つだけで操作を完了できる場合があり、選択する操作は使用シナリオによって異なります。 たとえば、 sparklyr::sdf_quantile の呼び出し方法は、両方の関数が分位点を計算する場合でも、 dplyr::percentile_approxの呼び出し方法とは若干異なります。

SparkR と sparklyr の間のブリッジとして SQL を使用できます。 たとえば、 SparkR::sql を使用して、sparklyr を使用して作成したテーブルに対してクエリを実行できます。 sparklyr::sdf_sqlを使用して、SparkR で作成したテーブルに対してクエリを実行できます。 また、 dplyr コードは、実行前に常にメモリ内の SQL に変換されます。 API の相互運用性SQL 変換も参照してください。

SparkR、sparklyr、および dplyr を読み込む

SparkR、sparklyr、および dplyr パッケージは、Azure Databricks クラスターにインストールされている Databricks ランタイムに含まれています。 そのため、これらのパッケージの呼び出しを開始する前に、通常の install.package を呼び出す必要はありません。 ただし、まずこれらのパッケージはlibraryで読み込む必要があります。 たとえば、Azure Databricks ワークスペースの R ノートブック 内からノートブック セルで次のコードを実行して、SparkR、sparklyr、dplyr を読み込みます。

library(SparkR)
library(sparklyr)
library(dplyr)

sparklyr をクラスターに接続する

sparklyr を読み込んだ後、 sparklyr::spark_connect を呼び出してクラスターに接続し、 databricks 接続方法を指定する必要があります。 たとえば、ノートブック セルで次のコードを実行して、ノートブックをホストするクラスターに接続します。

sc <- spark_connect(method = "databricks")

これに対し、Azure Databricks ノートブックでは、SparkR で使用するためにクラスターに SparkSession が既に確立されているため、SparkR の呼び出しを開始する前に SparkR::sparkR.session を呼び出す必要はありません。

JSON データ ファイルをワークスペースにアップロードする

この記事のコード例の多くは、Azure Databricks ワークスペース内の特定の場所にある、特定の列名とデータ型を持つデータに基づいています。 このコード例のデータは、GitHub 内から book.json という名前の JSON ファイルに由来します。 このファイルを取得してワークスペースにアップロードするには:

  1. GitHub の books.json ファイルに移動し、テキスト エディターを使用して、その内容をローカル コンピューターのどこか books.json という名前のファイルにコピーします。
  2. Azure Databricks ワークスペースのサイドバーで、[ カタログ] をクリックします。
  3. [ テーブルの作成] をクリックします。
  4. [ ファイルのアップロード ] タブで、 books.json ファイルをローカル コンピューターから [アップロードするファイルのドロップ] ボックスにドロップ します。 または、 クリックして参照を選択し、ローカル コンピューターから books.json ファイルを参照します。

既定では、Azure Databricks は、パス books.jsonを使用して、ローカル ファイルをワークスペース内の /FileStore/tables/books.json の場所にアップロードします。

[CREATE Table with UI]\(UI でテーブルを作成する\) または [Create Table in Notebook]\(ノートブックのテーブルの作成\) をクリックしないでください。 この記事のコード例では、この DBFS の場所にあるアップロードされた books.json ファイル内のデータを使用します。

JSON データを DataFrame に読み込む

sparklyr::spark_read_jsonを使用して、アップロードされた JSON ファイルを DataFrame に読み込み、接続、JSON ファイルへのパス、およびデータの内部テーブル表現の名前を指定します。 この例では、 book.json ファイルに複数の行が含まれていることを指定する必要があります。 ここで列のスキーマを指定することは省略可能です。 それ以外の場合、sparklyr は既定で列のスキーマを推論します。 たとえば、ノートブック セルで次のコードを実行して、アップロードされた JSON ファイルのデータを jsonDF という名前の DataFrame に読み取ります。

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

SparkR::headSparkR::show、またはsparklyr::collectを使用して、DataFrame の最初の行を印刷できます。 既定では、 head は既定で最初の 6 行を出力します。 showcollect は最初の 10 行を印刷します。 たとえば、ノートブック セルで次のコードを実行して、 jsonDFという名前の DataFrame の最初の行を出力します。

head(jsonDF)

# Source: spark<?> [?? x 8]
#   author                  country        image…¹ langu…² link  pages title  year
#   <chr>                   <chr>          <chr>   <chr>   <chr> <int> <chr> <int>
# 1 Chinua Achebe           Nigeria        images… English "htt…   209 Thin…  1958
# 2 Hans Christian Andersen Denmark        images… Danish  "htt…   784 Fair…  1836
# 3 Dante Alighieri         Italy          images… Italian "htt…   928 The …  1315
# 4 Unknown                 Sumer and Akk… images… Akkadi… "htt…   160 The … -1700
# 5 Unknown                 Achaemenid Em… images… Hebrew  "htt…   176 The …  -600
# 6 Unknown                 India/Iran/Ir… images… Arabic  "htt…   288 One …  1200
# … with abbreviated variable names ¹​imageLink, ²​language

show(jsonDF)

# Source: spark<jsonTable> [?? x 8]
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

collect(jsonDF)

# A tibble: 100 × 8
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

SQL クエリを実行し、テーブルへの書き込みとテーブルからの読み取りを行う

dplyr 関数を使用して、DataFrame で SQL クエリを実行できます。 たとえば、ノートブック セルで次のコードを実行して、 dplyr::group_bydployr::count を使用して、 jsonDFという名前の DataFrame から作成者別のカウントを取得します。 dplyr::arrangedplyr::descを使用して、結果をカウントで降順に並べ替えます。 次に、既定で最初の 10 行を印刷します。

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

その後、 sparklyr::spark_write_table を使用して、Azure Databricks のテーブルに結果を書き込みます。 たとえば、ノートブック セルで次のコードを実行してクエリを再実行し、結果を json_books_agg という名前のテーブルに書き込みます。

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

テーブルが作成されたことを確認するには、 sparklyr::sdf_sqlSparkR::showDF を使用してテーブルのデータを表示できます。 たとえば、ノートブック セルで次のコードを実行してテーブルを DataFrame に照会し、 sparklyr::collect を使用して DataFrame の最初の 10 行を既定で出力します。

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

また、 sparklyr::spark_read_table を使用して同様の操作を行うこともできます。 たとえば、ノートブック セルで次のコードを実行して、 jsonDF という名前の上記の DataFrame を DataFrame に照会し、 sparklyr::collect を使用して DataFrame の最初の 10 行を既定で出力します。

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

DataFrame に列を追加し、列の値を計算する

dplyr 関数を使用すると、列を DataFrames に追加したり、列の値を計算したりできます。

たとえば、ノートブック セルで次のコードを実行して、 jsonDFという名前の DataFrame の内容を取得します。 dplyr::mutateを使用して today という名前の列を追加し、この新しい列に現在のタイムスタンプを入力します。 次に、これらの内容を withDate という名前の新しい DataFrame に書き込み、 dplyr::collect を使用して、新しい DataFrame の最初の 10 行を既定で印刷します。

dplyr::mutate は、Hive の組み込み関数 (UDF とも呼ばれます) と組み込みの集計関数 (UDF とも呼ばれます) に準拠する引数のみを受け入れます。 一般的な情報については、「 Hive 関数」を参照してください。 このセクションの日付関連関数の詳細については、「 日付関数」を参照してください。

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

dplyr::mutateを使用して、withDate DataFrame の内容にさらに 2 つの列を追加します。 新しい month 列と year 列には、 today 列の月と年の数値が含まれます。 次に、 withMMyyyyという名前の新しい DataFrame にこれらの内容を書き込み、 dplyr::select と共に dplyr::collect を使用して、新しい DataFrame の最初の 10 行の authortitlemonth 、および year 列を既定で印刷します。

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

dplyr::mutateを使用して、withMMyyyy DataFrame の内容にさらに 2 つの列を追加します。 新しいformatted_date列にはyyyy-MM-dd列のtoday部分が含まれますが、新しいday列には新しいformatted_date列からの数値の日が含まれます。 次に、 withUnixTimestampという名前の新しい DataFrame にこれらの内容を書き込み、 dplyr::selectdplyr::collect を使用して、新しい DataFrame の最初の 10 行の titleformatted_date、および day 列を既定で印刷します。

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

一時ビューを作成する

既存の DataFrame に基づく名前付き一時ビューをメモリ内に作成できます。 たとえば、ノートブック セルで次のコードを実行して、 SparkR::createOrReplaceTempView を使用して jsonTable という名前の前の DataFrame の内容を取得し、 timestampTableという名前の一時ビューを作成します。 次に、 sparklyr::spark_read_table を使用して、一時ビューの内容を読み取ります。 sparklyr::collectを使用して、一時テーブルの最初の 10 行を既定で印刷します。

createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

DataFrame で統計分析を実行する

sparklyr と dplyr を統計分析に使用できます。

たとえば、統計を実行する DataFrame を作成します。 これを行うには、ノートブック セルで次のコードを実行し、 sparklyr::sdf_copy_to を使用して、R に組み込まれている iris データセットの内容を iris という名前の DataFrame に書き込みます。 sparklyr::sdf_collectを使用して、一時テーブルの最初の 10 行を既定で印刷します。

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

次に、 dplyr::group_by を使用して、 Species 列ごとに行をグループ化します。 dplyr::summarizedplyr::percentile_approxを使用して、Sepal_LengthSpecies列の 25 番目、50 番目、75 番目、および 100 番目の分位点の集計統計を計算します。 sparklyr::collect を使用して結果を印刷します。

dplyr::summarize は、Hive の組み込み関数 (UDF とも呼ばれます) と組み込みの集計関数 (UDF とも呼ばれます) に準拠する引数のみを受け入れます。 一般的な情報については、「 Hive 関数」を参照してください。 percentile_approxの詳細については、「組み込みの集計関数 (UDAF)」を参照してください。

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      1.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      1.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      1.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

同様の結果は、たとえば、 sparklyr::sdf_quantileを使用して計算できます。

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9