[2]:
# Quiet warnings and Spark logging so the example output stays readable.
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
# Create (or reuse) a SparkSession for the examples in this chapter.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("error")
Chapter 6: Old SQL, New Tricks - Running SQL on PySpark#
Introduction#
This section explains how to use the Spark SQL API in PySpark and how it compares with the DataFrame API. It also covers how to switch between the two APIs seamlessly, along with some practical tips and tricks.
Running SQL with PySpark#
PySpark offers two main ways to perform SQL operations:
Using spark.sql()#
The spark.sql() function allows you to execute SQL queries directly.
[10]:
# Create a table via spark.sql()
spark.sql("DROP TABLE IF EXISTS people")
spark.sql("""
CREATE TABLE people USING PARQUET
AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)
""")
[10]:
DataFrame[]
[11]:
# Use spark.sql() to select data from a table
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
+-------+---+
| name|age|
+-------+---+
|Charlie| 30|
+-------+---+
Using the PySpark DataFrame API#
The PySpark DataFrame API provides equivalent functionality to SQL but with a Pythonic approach.
[12]:
# Read a table using the DataFrame API
people_df = spark.read.table("people")
# Use DataFrame API to select data
people_df.select("name", "age").filter("age > 21").show()
+-------+---+
| name|age|
+-------+---+
|Charlie| 30|
+-------+---+
SQL vs. DataFrame API in PySpark#
When to use which API depends on your background and the specific task:
SQL API:
- Ideal for users with SQL backgrounds who are more comfortable writing SQL queries.
DataFrame API:
- Preferred by Python developers as it aligns with Python syntax and idioms.
- Provides greater flexibility for complex transformations, especially with user-defined functions (UDFs); see the sketch after this list.
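To make the UDF point concrete, here is a minimal sketch of a Python function applied inline in a chained DataFrame transformation; the age_bucket helper is hypothetical and not used elsewhere in this chapter. In SQL you would have to register the function first before you could call it.
[ ]:
from pyspark.sql.functions import udf

# Hypothetical Python helper turned into a UDF.
@udf("string")
def age_bucket(age):
    return "30s" if age >= 30 else "under 30"

# Apply it inline, mixed with ordinary DataFrame operations.
people_df.withColumn("bucket", age_bucket("age")).show()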
Code Examples: SQL vs. DataFrame API#
The following examples compare how common tasks look in the SQL API and the DataFrame API, giving you a sense of their differences and when one might be more suitable than the other.
Example: SELECT and FILTER Operation#
SQL API:
[15]:
spark.sql("SELECT name FROM people WHERE age > 21").show()
+-------+
| name|
+-------+
|Charlie|
+-------+
DataFrame API:
[16]:
spark.read.table("people").select("name").filter("age > 21").show()
+-------+
| name|
+-------+
|Charlie|
+-------+
Example: JOIN Operation#
[18]:
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders USING PARQUET
AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103, 3, 300) t(order_id, customer_id, amount)
""")
[18]:
DataFrame[]
SQL API:
[19]:
spark.sql("""
SELECT p.name, o.order_id
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+--------+
| name|order_id|
+-------+--------+
|Charlie| 103|
| Alice| 101|
| Bob| 102|
+-------+--------+
DataFrame API:
[20]:
people_df = spark.read.table("people")
orders_df = spark.read.table("orders")
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id)
.select(people_df.name, orders_df.order_id)
.show()
)
+-------+--------+
| name|order_id|
+-------+--------+
|Charlie| 103|
| Alice| 101|
| Bob| 102|
+-------+--------+
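DataFrame.join() also accepts a join type as its third argument when you need something other than the default inner join. A minimal sketch using a left join follows; with this toy data every person has a matching order, so the result happens to look the same as the inner join.
[ ]:
# Keep every person, even those without a matching order.
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id, "left")
.select(people_df.name, orders_df.order_id)
.show()
)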
Example: GROUP BY and Aggregate Operation#
SQL API:
[21]:
spark.sql("""
SELECT p.name, SUM(o.amount) AS total_amount
FROM people p
JOIN orders o ON p.id = o.customer_id
GROUP BY p.name
""").show()
+-------+------------+
| name|total_amount|
+-------+------------+
|Charlie| 300|
| Alice| 200|
| Bob| 150|
+-------+------------+
DataFrame API:
[22]:
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing Python's built-in sum
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id)
.groupBy("name")
.agg(spark_sum("amount").alias("total_amount"))
.show()
)
+-------+------------+
| name|total_amount|
+-------+------------+
|Charlie| 300|
| Alice| 200|
| Bob| 150|
+-------+------------+
Example: Window Operations#
SQL API:
[23]:
spark.sql("""
SELECT
p.name,
o.amount,
RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+------+----+
| name|amount|rank|
+-------+------+----+
| Alice| 200| 1|
| Bob| 150| 1|
|Charlie| 300| 1|
+-------+------+----+
DataFrame API:
[24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Define the window specification
window_spec = Window.partitionBy("name").orderBy(orders_df.amount.desc())
# Window operation with RANK
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id)
.withColumn("rank", rank().over(window_spec))
.select("name", "amount", "rank")
.show()
)
+-------+------+----+
| name|amount|rank|
+-------+------+----+
| Alice| 200| 1|
| Bob| 150| 1|
|Charlie| 300| 1|
+-------+------+----+
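Because each person in this toy data has exactly one order, every rank above is 1. As a variation (a sketch, not part of the original cells), ranking all orders by amount without a partition produces distinct ranks; Spark will warn that a window with no partition moves all data to a single partition.
[ ]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Rank every order by amount, highest first, across the whole table.
overall_spec = Window.orderBy(orders_df.amount.desc())
orders_df.withColumn("rank", rank().over(overall_spec)).show()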
Example: UNION Operation#
SQL API:
- The UNION operator combines rows from two queries and removes duplicates by default.
[25]:
spark.sql("CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)")
[25]:
DataFrame[]
[26]:
spark.sql("""
SELECT * FROM people
UNION
SELECT * FROM people2
""").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
| 1| Alice| 10|
| 2| Bob| 20|
| 4| David| 35|
+---+-------+---+
DataFrame API:
- The union() method is used to combine two DataFrames, but it does not remove duplicates by default.
- To match the behavior of SQL’s UNION, we use the .dropDuplicates() method to eliminate duplicates after the union operation.
[27]:
people_df = spark.read.table("people")
people2_df = spark.read.table("people2")
# This will have duplicate values.
people_df.union(people2_df).show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
| 1| Alice| 10|
| 2| Bob| 20|
| 1| Alice| 10|
| 4| David| 35|
+---+-------+---+
[28]:
# Remove duplicate values
people_df.union(people2_df).dropDuplicates().show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
| 1| Alice| 10|
| 2| Bob| 20|
| 4| David| 35|
+---+-------+---+
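If you want to keep duplicates instead, SQL’s UNION ALL corresponds to union() without the dropDuplicates() call; a minimal sketch:
[ ]:
# UNION ALL keeps duplicate rows, matching union() with no dropDuplicates().
spark.sql("""
SELECT * FROM people
UNION ALL
SELECT * FROM people2
""").show()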
Example: SET Configurations#
SQL API:
[29]:
spark.sql("SET spark.sql.shuffle.partitions=8")
[29]:
DataFrame[key: string, value: string]
[30]:
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)
+----------------------------+-----+
|key |value|
+----------------------------+-----+
|spark.sql.shuffle.partitions|8 |
+----------------------------+-----+
DataFrame API:
[31]:
spark.conf.set("spark.sql.shuffle.partitions", 10)
[32]:
spark.conf.get("spark.sql.shuffle.partitions")
[32]:
'10'
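To revert an overridden setting, Spark SQL provides the RESET command and the runtime configuration API provides spark.conf.unset(); a short sketch, not part of the original cells:
[ ]:
# Drop the override so the session falls back to the default value.
spark.sql("RESET spark.sql.shuffle.partitions")
# Equivalent through the configuration API.
spark.conf.unset("spark.sql.shuffle.partitions")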
Example: Listing Tables and Views#
SQL API:
[33]:
spark.sql("SHOW TABLES").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| default| orders| false|
| default| people| false|
| | people2| true|
+---------+---------+-----------+
DataFrame API:
[34]:
tables = spark.catalog.listTables()
for table in tables:
print(f"Name: {table.name}, isTemporary: {table.isTemporary}")
Name: orders, isTemporary: False
Name: people, isTemporary: False
Name: people2, isTemporary: True
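The catalog API can also describe a single table; for example, spark.catalog.listColumns() returns the columns of a table (a sketch, not shown in the original cells):
[ ]:
# Inspect the columns of the people table through the catalog API.
for column in spark.catalog.listColumns("people"):
    print(f"Name: {column.name}, dataType: {column.dataType}")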
DataFrame API Exclusive Functions#
Certain operations are exclusive to the DataFrame API and have no direct SQL counterpart, such as:
withColumn: Adds or modifies columns in a DataFrame.
[35]:
people_df.withColumn("new_col", people_df["age"] + 10).show()
+---+-------+---+-------+
| id| name|age|new_col|
+---+-------+---+-------+
| 3|Charlie| 30| 40|
| 1| Alice| 10| 20|
| 2| Bob| 20| 30|
+---+-------+---+-------+
[39]:
people_df.withColumn("age", people_df["age"] + 10).show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 40|
| 1| Alice| 20|
| 2| Bob| 30|
+---+-------+---+
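Other column-level helpers follow the same pattern; a minimal sketch with withColumnRenamed() and drop(), two standard DataFrame methods not used elsewhere in this chapter:
[ ]:
# Rename the age column, then drop it entirely.
people_df.withColumnRenamed("age", "age_years").drop("age_years").show()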
Using SQL and DataFrame API Interchangeably#
PySpark supports switching between SQL and the DataFrame API, making it easy to mix and match the two.
Chaining DataFrame Operations on SQL Outputs#
Because spark.sql() returns a DataFrame, you can chain DataFrame operations directly on the result of a SQL query to build efficient and readable transformations.
[36]:
# Chaining DataFrame operations on SQL results
spark.sql("SELECT name, age FROM people").filter("age > 21").show()
+-------+---+
| name|age|
+-------+---+
|Charlie| 30|
+-------+---+
Using selectExpr()#
The selectExpr() method allows you to run SQL expressions within the DataFrame API.
[37]:
people_df.selectExpr("name", "age + 1 AS age_plus_one").show()
+-------+------------+
| name|age_plus_one|
+-------+------------+
|Charlie| 31|
| Alice| 11|
| Bob| 21|
+-------+------------+
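A closely related option is expr() from pyspark.sql.functions, which evaluates a SQL expression into a Column so you can mix it with regular column operations inside select(); a minimal sketch:
[ ]:
from pyspark.sql.functions import expr

# expr() turns a SQL expression string into a Column.
people_df.select("name", expr("age + 1").alias("age_plus_one")).show()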
Querying a DataFrame in SQL#
You can create a temporary view from a DataFrame and run SQL queries on it.
[38]:
# First create a temp view on top of the DataFrame.
people_df.createOrReplaceTempView("people_view")
# Then it can be referenced in SQL.
spark.sql("SELECT * FROM people_view WHERE age > 21").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
+---+-------+---+
Use Python User-Defined Functions in SQL#
You can register Python user-defined functions (UDFs) and call them from SQL queries, enabling custom transformations in plain SQL syntax.
[41]:
from pyspark.sql.functions import udf
# Define the UDF
@udf("string")
def uppercase_name(name):
return name.upper()
# Register the UDF
spark.udf.register("uppercase_name", uppercase_name)
# Use it in SQL
spark.sql("SELECT name, uppercase_name(name) FROM people_view WHERE age > 21").show()
+-------+--------------------+
| name|uppercase_name(name)|
+-------+--------------------+
|Charlie| CHARLIE|
+-------+--------------------+
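Because @udf returns a column-level function, the same decorated uppercase_name can also be called directly in the DataFrame API without going through SQL; a minimal sketch:
[ ]:
# Call the decorated UDF on a DataFrame column directly.
people_df.select("name", uppercase_name("name")).show()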