この記事では、GraphFrames ユーザー ガイドの例を示します。
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
GraphFrame の作成
頂点とエッジのデータフレームから GraphFrame を作成できます。
- 頂点データフレーム: 頂点データフレームには、グラフ内の各頂点の一意の ID を指定する
id
という名前の特殊な列が含まれている必要があります。 - エッジ データフレーム: エッジ DataFrame には、
src
(エッジのソース頂点 ID) とdst
(エッジの宛先頂点 ID) の 2 つの特殊な列が含まれている必要があります。
どちらの DataFrame にも、任意の他の列を含めることができます。 これらの列は、頂点属性とエッジ属性を表すことができます。
頂点とエッジを作成する
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
次の頂点とエッジからグラフを作成しましょう。
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
基本的なグラフと DataFrame クエリ
GraphFrame は、ノードの次数などの単純なグラフ クエリを提供します。
また、GraphFrame は頂点とエッジの DataFrame のペアとしてグラフを表すので、頂点とエッジの DataFrame に対して強力なクエリを直接実行するのは簡単です。 これらのデータフレームは、GraphFrame の頂点およびエッジ フィールドとして使用できます。
display(g.vertices)
display(g.edges)
頂点の入次数:
display(g.inDegrees)
頂点の出次数:
display(g.outDegrees)
頂点の次数:
display(g.degrees)
頂点 DataFrame に対してクエリを直接実行できます。 たとえば、グラフ内で最年少の年齢を見つけることができます。
val youngest = g.vertices.groupBy().min("age")
display(youngest)
同様に、エッジ DataFrame に対してクエリを実行できます。 たとえば、グラフ内の "フォロー" リレーションシップの数をカウントします。
val numFollows = g.edges.filter("relationship = 'follow'").count()
モチーフ検索
モチーフを使用して、エッジと頂点を含むより複雑なリレーションシップを構築します。 次のセルは、両方向のエッジにある頂点のペアを検索します。 結果は DataFrame で、列名はモチーフ キーになります。
API の詳細については、GraphFrame ユーザー ガイドの を参照してください。
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
結果は DataFrame であるため、モチーフの上にさらに複雑なクエリを作成できます。 1 人が 30 歳以上であるすべての相互関係を見つけましょう。
val filtered = motifs.filter("b.age > 30")
display(filtered)
ステートフル クエリ
上記の例のように、ほとんどのモチーフ クエリはステートレスで簡単に表現できます。 次の例では、モチーフ内のパスに沿って状態を伝達する、より複雑なクエリを示します。 これらのクエリを表現するには、GraphFrame のモチーフ検索と結果のフィルターを組み合わせて、フィルターでシーケンス操作を使用して一連の DataFrame 列を構築します。
たとえば、一連の関数によって定義されたプロパティを持つ 4 つの頂点のチェーンを識別するとします。 つまり、a->b->c->d
4 つの頂点のチェーンの中で、この複雑なフィルターに一致するチェーンのサブセットを特定します。
- パスの状態を初期化します。
- 頂点 a に基づいて状態を更新します。
- 頂点 b に基づいて状態を更新します。
- c と d の場合など。
- 最終的な状態が何らかの条件と一致する場合、フィルターはチェーンを受け入れます。
次のコード スニペットは、このプロセスを示しています。このプロセスでは、3 つのエッジのうち少なくとも 2 つが "フレンド" リレーションシップになるように、4 つの頂点のチェーンを識別します。 この例では、状態は "friend" エッジの現在の数です。一般に、任意の DataFrame 列を指定できます。
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
サブグラフ
GraphFrames には、エッジと頂点をフィルター処理してサブグラフを構築するための API が用意されています。 これらのフィルターは一緒に構成できます。 たとえば、次のサブグラフには、友人であり、30 歳以上のユーザーのみが含まれています。
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
複雑なトリプレット フィルター
次の例は、エッジとその "src" 頂点と "dst" 頂点で動作するトリプレット フィルターに基づいてサブグラフを選択する方法を示しています。 より複雑なモチーフを使用してトリプレットを超えるこの例を拡張するのは簡単です。
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
標準グラフ アルゴリズム
このセクションでは、GraphFrames に組み込まれている標準的なグラフ アルゴリズムについて説明します。
幅優先検索 (BFS)
"Esther" から 32歳 のユーザー < を検索する。
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
検索では、エッジ フィルターとパスの最大長が制限される場合もあります。
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
接続されているコンポーネント
各頂点の接続されたコンポーネント メンバーシップを計算し、各頂点にコンポーネント ID が割り当てられたグラフを返します。
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
強く接続されたコンポーネント
各頂点の厳密に接続されたコンポーネント (SCC) を計算し、その頂点を含む SCC に各頂点が割り当てられたグラフを返します。
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
ラベルの伝達
ネットワーク内のコミュニティを検出するための静的ラベル伝達アルゴリズムを実行します。
ネットワーク内の各ノードは、最初は独自のコミュニティに割り当てられます。 すべてのスーパーステップで、ノードはすべての近隣ノードにコミュニティ所属を送信し、受信メッセージのモード コミュニティ所属に状態を更新します。
LPA は、グラフの標準的なコミュニティ検出アルゴリズムです。 計算上は安価ですが、(1) 収束は保証されず、(2) 単純な解決策が得られます (すべてのノードが 1 つのコミュニティに識別されます)。
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
PageRank
接続に基づいてグラフ内の重要な頂点を識別します。
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
最短パス
指定されたランドマーク頂点のセットに対して、頂点 ID によって指定されたランドマークへの最短経路を計算します。
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
三角形のカウント
各頂点を通過する三角形の数を計算します。
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()