Computing operations over a window, or subset, of data is a common task. Often we want to rank rows within those subsets; for example, we may want to see the top sales for each month. In this article, we will learn how to use PySpark window functions.
The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console that you can open to access a Jupyter notebook.
version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/
We begin by importing SparkSession and creating a Spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
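By default the builder simply attaches to (or starts) a local Spark instance in the notebook. If you would like the application to show up with a recognizable name in the Spark UI (typically on port 4040, which the compose file above exposes), you can also chain appName before getOrCreate; the name "pyspark-windowing" below is just an example:

# Optional: give the application an explicit name so it is easy to spot in the Spark UI.
# "pyspark-windowing" is an arbitrary example name.
spark = SparkSession.builder.appName("pyspark-windowing").getOrCreate()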
Next, let's create some fake sales data in a DataFrame.
rdd = spark.sparkContext.parallelize([
    ("jan", 2019, 86000, 56),
    ("jan", 2019, 96000, 56),
    ("jan", 2019, 76000, 56),
    ("jan", 2020, 81000, 30),
    ("jan", 2020, 71000, 30),
    ("jan", 2021, 90000, 24),
    ("feb", 2019, 99000, 40),
    ("feb", 2019, 89000, 40),
    ("feb", 2020, 83000, 36),
    ("feb", 2021, 79000, 53),
    ("feb", 2021, 69000, 53),
    ("mar", 2019, 80000, 25),
    ("mar", 2019, 84000, 25),
    ("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema=["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
| jan|2019| 86000| 56|
| jan|2019| 96000| 56|
| jan|2019| 76000| 56|
| jan|2020| 81000| 30|
| jan|2020| 71000| 30|
| jan|2021| 90000| 24|
| feb|2019| 99000| 40|
| feb|2019| 89000| 40|
| feb|2020| 83000| 36|
| feb|2021| 79000| 53|
| feb|2021| 69000| 53|
| mar|2019| 80000| 25|
| mar|2019| 84000| 25|
| mar|2020| 91000| 50|
+-----+----+-------------+--------------------+
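As an aside, for a small dataset like this you don't strictly need to go through an RDD; spark.createDataFrame also accepts a plain Python list of tuples. A minimal sketch of the same idea (only a few of the rows are repeated, and df_direct is just an example name):

# Building the DataFrame directly from a list of tuples, skipping the RDD step.
# Only a few of the rows from above are shown here for brevity.
data = [
    ("jan", 2019, 86000, 56),
    ("feb", 2019, 99000, 40),
    ("mar", 2020, 91000, 50),
]
df_direct = spark.createDataFrame(data, schema=["month", "year", "total_revenue", "unique_products_sold"])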
To create a window, we need to use the Window class. We can then use the resulting window specification to add a new column to our DataFrame. Below is an example of ranking all sales in our dataset by total revenue.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
win = Window.orderBy(df['total_revenue'].desc())
df = df.withColumn('rank', row_number().over(win))
df.show()
+-----+----+-------------+--------------------+----+
|month|year|total_revenue|unique_products_sold|rank|
+-----+----+-------------+--------------------+----+
| feb|2019| 99000| 40| 1|
| jan|2019| 96000| 56| 2|
| mar|2020| 91000| 50| 3|
| jan|2021| 90000| 24| 4|
| feb|2019| 89000| 40| 5|
| jan|2019| 86000| 56| 6|
| mar|2019| 84000| 25| 7|
| feb|2020| 83000| 36| 8|
| jan|2020| 81000| 30| 9|
| mar|2019| 80000| 25| 10|
| feb|2021| 79000| 53| 11|
| jan|2019| 76000| 56| 12|
| jan|2020| 71000| 30| 13|
| feb|2021| 69000| 53| 14|
+-----+----+-------------+--------------------+----+
21/09/28 01:10:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
The above example doesn't add much; it is simply a sorted list with an index, and the warning tells us that Spark had to move every row to a single partition to compute it. However, if we want to rank within each month, we can add partitionBy to the window. This is also handy if you want to find the top products sold or top movies watched per month.
win = Window.partitionBy("month").orderBy(df['total_revenue'].desc())
df = df.withColumn('rank', row_number().over(win))
df.show()
+-----+----+-------------+--------------------+----+
|month|year|total_revenue|unique_products_sold|rank|
+-----+----+-------------+--------------------+----+
| feb|2019| 99000| 40| 1|
| feb|2019| 89000| 40| 2|
| feb|2020| 83000| 36| 3|
| feb|2021| 79000| 53| 4|
| feb|2021| 69000| 53| 5|
| mar|2020| 91000| 50| 1|
| mar|2019| 84000| 25| 2|
| mar|2019| 80000| 25| 3|
| jan|2019| 96000| 56| 1|
| jan|2021| 90000| 24| 2|
| jan|2019| 86000| 56| 3|
| jan|2020| 81000| 30| 4|
| jan|2019| 76000| 56| 5|
| jan|2020| 71000| 30| 6|
+-----+----+-------------+--------------------+----+
If we only want to see the top three per month, we can then filter on the rank column.
df.filter(col('rank') < 4).show()
+-----+----+-------------+--------------------+----+
|month|year|total_revenue|unique_products_sold|rank|
+-----+----+-------------+--------------------+----+
| feb|2019| 99000| 40| 1|
| feb|2019| 89000| 40| 2|
| feb|2020| 83000| 36| 3|
| mar|2020| 91000| 50| 1|
| mar|2019| 84000| 25| 2|
| mar|2019| 80000| 25| 3|
| jan|2019| 96000| 56| 1|
| jan|2021| 90000| 24| 2|
| jan|2019| 86000| 56| 3|
+-----+----+-------------+--------------------+----+
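Note that row_number always hands out a strict 1, 2, 3, ... sequence, so if two rows in a month had the same total_revenue they would be ordered arbitrarily. If you want tied rows to share a position, the rank and dense_rank functions can be swapped in the same way. A minimal sketch, assuming the df and window from above (tie_rank and tie_dense_rank are just example column names):

from pyspark.sql.functions import rank, dense_rank

win = Window.partitionBy("month").orderBy(df['total_revenue'].desc())

# rank() leaves gaps after ties (1, 1, 3, ...); dense_rank() does not (1, 1, 2, ...).
df_ties = (df
    .withColumn('tie_rank', rank().over(win))
    .withColumn('tie_dense_rank', dense_rank().over(win)))
df_ties.show()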
More window functions can be found in the PySpark SQL functions reference: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions
If you search that page for "Window function", you will find the functions designed specifically for windows.
For more partitioning and frame options, see the Window API reference: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#window
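Window functions are not limited to ranking. Ordinary aggregates such as sum and avg can also be computed over a window, which lets every row carry a per-group value without collapsing the group. A minimal sketch, assuming the df from above (monthly_total and pct_of_month are just example column names):

from pyspark.sql.functions import sum as sum_, round as round_

month_win = Window.partitionBy("month")

# Attach each month's total revenue to every row, then express each row's
# revenue as a percentage of that monthly total.
df_share = (df
    .withColumn('monthly_total', sum_('total_revenue').over(month_win))
    .withColumn('pct_of_month', round_(col('total_revenue') / col('monthly_total') * 100, 1)))
df_share.show()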