PySpark UnionByName

10.12.2021

Intro

When merging two dataframes with union, the columns may be in a different order, or one dataframe may be missing columns entirely. For these cases, PySpark provides the unionByName method. In this article, we will learn how to use PySpark unionByName.

Setting Up

The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console to open up and access a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating the DataFrames

Let's start by creating a Spark Session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
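
One thing worth checking up front: the allowMissingColumns argument we use later in this article requires Spark 3.1 or newer, so you may want to confirm which version your session is running.

# Prints the Spark version of the session, e.g. '3.2.0'
print(spark.version)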

Now, let's create two dataframes. They have the same columns, but in a different order.

rdd = spark.sparkContext.parallelize([    
    ("jan", 2019, 86000, 56),
    ("jan", 2020, 71000, 30),
    ("jan", 2021, 90000, 24),
    
    ("feb", 2019, 99000, 40),
    ("feb", 2020, 83000, 36),
    ("feb", 2021, 69000, 53),
    
    ("mar", 2019, 80000, 25),
    ("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema=["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+
rdd2 = spark.sparkContext.parallelize([    
    ("jan", 2019, 86, 56000),
    ("jan", 2020, 70, 30000),
    ("jan", 2021, 90, 24000),
    
    ("feb", 2019, 99, 40000),
    ("feb", 2020, 83, 36000),
    ("feb", 2021, 69, 53000),
    
    ("mar", 2019, 80, 25000),
    ("mar", 2020, 91, 50000)
])
df2 = spark.createDataFrame(rdd2, schema=["month", "year", "unique_products_sold", "total_revenue"])
df2.show()
+-----+----+--------------------+-------------+
|month|year|unique_products_sold|total_revenue|
+-----+----+--------------------+-------------+
|  jan|2019|                  86|        56000|
|  jan|2020|                  70|        30000|
|  jan|2021|                  90|        24000|
|  feb|2019|                  99|        40000|
|  feb|2020|                  83|        36000|
|  feb|2021|                  69|        53000|
|  mar|2019|                  80|        25000|
|  mar|2020|                  91|        50000|
+-----+----+--------------------+-------------+
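
Before unioning, a quick sanity check (purely illustrative, not required) is to compare the columns attribute of each dataframe, which confirms that they share the same column names in a different order.

# The two dataframes have the same column names, but in a different order
print(df.columns)   # ['month', 'year', 'total_revenue', 'unique_products_sold']
print(df2.columns)  # ['month', 'year', 'unique_products_sold', 'total_revenue']
print(set(df.columns) == set(df2.columns))  # True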

Using UnionByName

Unlike union, which matches columns by position, unionByName matches them by name. So, even though the columns of our two dataframes are in different positions, we can still union them.

df_new = df.unionByName(df2)
df_new.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
|  jan|2019|        56000|                  86|
|  jan|2020|        30000|                  70|
|  jan|2021|        24000|                  90|
|  feb|2019|        40000|                  99|
|  feb|2020|        36000|                  83|
|  feb|2021|        53000|                  69|
|  mar|2019|        25000|                  80|
|  mar|2020|        50000|                  91|
+-----+----+-------------+--------------------+
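
To see why the name matching matters, here is a short sketch of what the plain union method would do with the same two dataframes. Because it stacks rows purely by position, the values from df2 end up under the wrong headers.

# union matches columns by position and ignores the names,
# so df2's unique_products_sold values land under total_revenue
df_positional = df.union(df2)
df_positional.show()
# For the df2 rows, total_revenue now holds the product counts (86, 70, ...)
# and unique_products_sold holds the revenue figures (56000, 30000, ...)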

UnionByName with Missing Columns

The second use case for unionByName is when one of our dataframes is missing columns. Let's drop a column from our second dataframe and pass the allowMissingColumns argument (available since Spark 3.1) to our unionByName call.

df2_dropped = df2.drop('unique_products_sold')
df2_dropped.show()
+-----+----+-------------+
|month|year|total_revenue|
+-----+----+-------------+
|  jan|2019|        56000|
|  jan|2020|        30000|
|  jan|2021|        24000|
|  feb|2019|        40000|
|  feb|2020|        36000|
|  feb|2021|        53000|
|  mar|2019|        25000|
|  mar|2020|        50000|
+-----+----+-------------+
df_new = df.unionByName(df2_dropped, allowMissingColumns=True)
df_new.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
|  jan|2019|        56000|                null|
|  jan|2020|        30000|                null|
|  jan|2021|        24000|                null|
|  feb|2019|        40000|                null|
|  feb|2020|        36000|                null|
|  feb|2021|        53000|                null|
|  mar|2019|        25000|                null|
|  mar|2020|        50000|                null|
+-----+----+-------------+--------------------+

Above you can see that both dataframes were merged, and the column missing from the second dataframe was filled with null values.
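
As a final note, if the schemas differ and you leave out allowMissingColumns (or set it to False), unionByName will refuse the merge. A minimal sketch using the dataframes from above:

# Without allowMissingColumns, a missing column raises an error
try:
    df.unionByName(df2_dropped)
except Exception as e:
    # Spark raises an AnalysisException because
    # unique_products_sold cannot be resolved in df2_dropped
    print(type(e).__name__)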