PySpark Sort and OrderBy

10.08.2021

Intro

When reviewing data, we often want to sort or order it for easier reading. PySpark provides the sort and orderBy methods for sorting dataframes. In this article, we will learn how to use PySpark's orderBy and sort.

Setting Up

The quickest way to get started working with Python is to use the following Docker Compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console to open and access a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating a Spark Session and DataFrame

Let's start by creating a Spark Session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/10/08 16:47:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
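The warnings above are harmless startup noise. As the log output itself suggests, we can quiet things down by adjusting the log level on the SparkContext; a minimal sketch:

spark.sparkContext.setLogLevel("ERROR")  # only show errors from here on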

Now, let's create a dataframe to work with.

rdd = spark.sparkContext.parallelize([
    ("jan", 2019, 86000, 56),
    ("jan", 2020, 71000, 30),
    ("jan", 2021, 90000, 24),

    ("feb", 2019, 99000, 40),
    ("feb", 2020, 83000, 36),
    ("feb", 2021, 69000, 53),

    ("mar", 2019, 80000, 25),
    ("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema=["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+

Sorting By Columns

The basic use case is to call the sort method on a dataframe and pass in the names of the columns to sort by. If you pass multiple columns, PySpark sorts by the first column and uses the later columns to break ties within it.

df.sort("year").show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  feb|2019|        99000|                  40|
|  jan|2019|        86000|                  56|
|  mar|2019|        80000|                  25|
|  feb|2020|        83000|                  36|
|  mar|2020|        91000|                  50|
|  jan|2020|        71000|                  30|
|  feb|2021|        69000|                  53|
|  jan|2021|        90000|                  24|
+-----+----+-------------+--------------------+
df.sort("month", "year").show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+

We can also use the orderBy method, which is an alias for sort, to accomplish the above.

df.orderBy("month", "year").show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+
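As a side note, sort and orderBy also accept a list of column names, which is convenient when the columns to sort by are built up in a variable. This is equivalent to the call above:

# passing a list of column names; same result as df.sort("month", "year")
df.sort(["month", "year"]).show()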

Sorting By Ascending and Descending

To change the direction of the sort, we can use the asc and desc methods. These are column methods, so we call them on the individual columns. By default, columns are sorted in ascending order.

df.sort(df.month.desc(), df.year.asc()).show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
+-----+----+-------------+--------------------+
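The same result can be had without column objects by passing the ascending keyword, which takes one boolean per column; a minimal sketch:

# ascending takes a boolean per column: month descending, year ascending
df.sort("month", "year", ascending=[False, True]).show()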

We can also use PySpark SQL functions, such as col, instead of the dataframe column attribute notation.

from pyspark.sql.functions import col, asc, desc

df.sort(col('month').desc(), col('year').asc()).show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
+-----+----+-------------+--------------------+
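Since we already imported asc and desc above, we can also wrap the column names with those functions directly rather than chaining methods on col; this produces the same ordering:

# asc and desc accept a column name and return a sorted column expression
df.sort(desc("month"), asc("year")).show()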

Also, note that we can do the same with the orderBy method.

df.orderBy(col('month').desc(), col('year').asc()).show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
+-----+----+-------------+--------------------+
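Our sample data contains no nulls, but when a sort column does contain them, the direction methods alone leave null placement to Spark's defaults. Column methods such as asc_nulls_first and desc_nulls_last make the placement explicit; a sketch, assuming a nullable column:

# place any null months last while keeping years ascending with nulls first
df.sort(df.month.desc_nulls_last(), df.year.asc_nulls_first()).show()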

Sorting with SQL

Our final option is to sort with SQL instead of the dataframe methods. We first create a temporary view, then query it with an ORDER BY clause.

df.createOrReplaceTempView("Sales")

spark.sql("select * from Sales ORDER BY month asc").show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  feb|2021|        69000|                  53|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  mar|2020|        91000|                  50|
|  mar|2019|        80000|                  25|
+-----+----+-------------+--------------------+
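Notice that rows within the same month come back in no particular order, since we sorted on month alone. Adding a second key, with an explicit direction per column, makes the ordering deterministic, just as it does in the dataframe API:

# ORDER BY supports a direction per column, like the dataframe methods
spark.sql("select * from Sales ORDER BY month desc, year asc").show()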