PySpark DataFrame Join

09.25.2021

Intro

Often you will have multiple datasets, tables, or DataFrames that you would like to combine. For example, you may have customers and their purchases and want to see them in a single DataFrame. PySpark provides the join method for merging DataFrames. In this article, we will learn how to work with PySpark joins.

Setting Up

The quickest way to get started with PySpark is the following Docker Compose file. Simply create a docker-compose.yml, paste in the code below, then run docker-compose up. The console will then print a link you can open to access a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating a PySpark Data Frame

We begin by importing SparkSession and creating a Spark session, the entry point for working with DataFrames.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Next, let's create some fake sales data in a data frame.

rdd = spark.sparkContext.parallelize([    
    ("jan", 2019, 86000, 56),
    ("jan", 2020, 81000, 30),
    ("jan", 2021, 90000, 24),
    ("feb", 2019, 99000, 40),
    ("feb", 2020, 83000, 36),
    ("feb", 2021, 79000, 53),
    ("mar", 2019, 80000, 25),
    ("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema = ["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        81000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        79000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+

Next, let's create a second DataFrame holding the number of web visits for the same months.

web_visits_rdd = spark.sparkContext.parallelize([    
    ("jan", 2019, 186000),
    ("jan", 2020, 181000),
    ("jan", 2021, 190000),
    ("feb", 2019, 199000),
    ("feb", 2020, 183000),
    ("feb", 2021, 179000),
    ("mar", 2019, 180000),
    ("mar", 2020, 191000)
])

web_visits_df = spark.createDataFrame(web_visits_rdd, schema = ["month", "year", "webvisits"])
web_visits_df.show()
+-----+----+---------+
|month|year|webvisits|
+-----+----+---------+
|  jan|2019|   186000|
|  jan|2020|   181000|
|  jan|2021|   190000|
|  feb|2019|   199000|
|  feb|2020|   183000|
|  feb|2021|   179000|
|  mar|2019|   180000|
|  mar|2020|   191000|
+-----+----+---------+

Joining Data

The basic join is done with the join method. Call it on one DataFrame and pass two arguments: the DataFrame you would like to join with, and the column to match on, given through the on named parameter. By default, join performs an inner join, so only rows whose key appears in both DataFrames are kept.

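Here is an example joining the two DataFrames above on the month column. Because month alone does not identify a single row, each row is paired with every row of the same month in the other DataFrame (3 × 3 rows for jan and feb, 2 × 2 for mar). The result therefore has 22 rows and two year columns, and show() prints only the first 20.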

inner_join_df = df.join(web_visits_df, on = 'month')
inner_join_df.show()
+-----+----+-------------+--------------------+----+---------+
|month|year|total_revenue|unique_products_sold|year|webvisits|
+-----+----+-------------+--------------------+----+---------+
|  feb|2019|        99000|                  40|2019|   199000|
|  feb|2019|        99000|                  40|2020|   183000|
|  feb|2019|        99000|                  40|2021|   179000|
|  feb|2020|        83000|                  36|2019|   199000|
|  feb|2020|        83000|                  36|2020|   183000|
|  feb|2020|        83000|                  36|2021|   179000|
|  feb|2021|        79000|                  53|2019|   199000|
|  feb|2021|        79000|                  53|2020|   183000|
|  feb|2021|        79000|                  53|2021|   179000|
|  mar|2019|        80000|                  25|2019|   180000|
|  mar|2019|        80000|                  25|2020|   191000|
|  mar|2020|        91000|                  50|2019|   180000|
|  mar|2020|        91000|                  50|2020|   191000|
|  jan|2019|        86000|                  56|2019|   186000|
|  jan|2019|        86000|                  56|2020|   181000|
|  jan|2019|        86000|                  56|2021|   190000|
|  jan|2020|        81000|                  30|2019|   186000|
|  jan|2020|        81000|                  30|2020|   181000|
|  jan|2020|        81000|                  30|2021|   190000|
|  jan|2021|        90000|                  24|2019|   186000|
+-----+----+-------------+--------------------+----+---------+
only showing top 20 rows

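If the join column has a different name in each DataFrame, pass a join condition comparing the two columns to the on named parameter instead. To demonstrate, we rename month to mon in web_visits_df. Note that with a condition, both key columns (month and mon) are kept in the result.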

web_visits_df = web_visits_df.withColumnRenamed('month', 'mon')

inner_join_df = df.join(web_visits_df, on = df.month == web_visits_df.mon)
inner_join_df.show()
+-----+----+-------------+--------------------+---+----+---------+
|month|year|total_revenue|unique_products_sold|mon|year|webvisits|
+-----+----+-------------+--------------------+---+----+---------+
|  feb|2019|        99000|                  40|feb|2019|   199000|
|  feb|2019|        99000|                  40|feb|2020|   183000|
|  feb|2019|        99000|                  40|feb|2021|   179000|
|  feb|2020|        83000|                  36|feb|2019|   199000|
|  feb|2020|        83000|                  36|feb|2020|   183000|
|  feb|2020|        83000|                  36|feb|2021|   179000|
|  feb|2021|        79000|                  53|feb|2019|   199000|
|  feb|2021|        79000|                  53|feb|2020|   183000|
|  feb|2021|        79000|                  53|feb|2021|   179000|
|  mar|2019|        80000|                  25|mar|2019|   180000|
|  mar|2019|        80000|                  25|mar|2020|   191000|
|  mar|2020|        91000|                  50|mar|2019|   180000|
|  mar|2020|        91000|                  50|mar|2020|   191000|
|  jan|2019|        86000|                  56|jan|2019|   186000|
|  jan|2019|        86000|                  56|jan|2020|   181000|
|  jan|2019|        86000|                  56|jan|2021|   190000|
|  jan|2020|        81000|                  30|jan|2019|   186000|
|  jan|2020|        81000|                  30|jan|2020|   181000|
|  jan|2020|        81000|                  30|jan|2021|   190000|
|  jan|2021|        90000|                  24|jan|2019|   186000|
+-----+----+-------------+--------------------+---+----+---------+
only showing top 20 rows

Specifying the Type of Join

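We can also choose the type of join with the how named parameter. It accepts the usual SQL join types, such as "inner" (the default), "left", "right", "outer", "left_semi", and "left_anti". Here is an example of an outer join. Because every month appears in both DataFrames, it returns the same rows as the inner join; the join types only differ when a key is missing from one side.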

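# web_visits_df's month column was renamed to mon above, so rename it back first
web_visits_df = web_visits_df.withColumnRenamed('mon', 'month')

outer_join_df = df.join(web_visits_df, on = 'month', how = "outer")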
outer_join_df.show()
+-----+----+-------------+--------------------+----+---------+
|month|year|total_revenue|unique_products_sold|year|webvisits|
+-----+----+-------------+--------------------+----+---------+
|  feb|2019|        99000|                  40|2019|   199000|
|  feb|2019|        99000|                  40|2020|   183000|
|  feb|2019|        99000|                  40|2021|   179000|
|  feb|2020|        83000|                  36|2019|   199000|
|  feb|2020|        83000|                  36|2020|   183000|
|  feb|2020|        83000|                  36|2021|   179000|
|  feb|2021|        79000|                  53|2019|   199000|
|  feb|2021|        79000|                  53|2020|   183000|
|  feb|2021|        79000|                  53|2021|   179000|
|  mar|2019|        80000|                  25|2019|   180000|
|  mar|2019|        80000|                  25|2020|   191000|
|  mar|2020|        91000|                  50|2019|   180000|
|  mar|2020|        91000|                  50|2020|   191000|
|  jan|2019|        86000|                  56|2019|   186000|
|  jan|2019|        86000|                  56|2020|   181000|
|  jan|2019|        86000|                  56|2021|   190000|
|  jan|2020|        81000|                  30|2019|   186000|
|  jan|2020|        81000|                  30|2020|   181000|
|  jan|2020|        81000|                  30|2021|   190000|
|  jan|2021|        90000|                  24|2019|   186000|
+-----+----+-------------+--------------------+----+---------+
only showing top 20 rows