Often you will have multiple datasets, tables, or dataframes that you would like to combine. For example, you may have customers and their purchases and would like to see these in a single dataframe. PySpark provides the join method to merge dataframes. In this article, we will learn how to work with PySpark joins.
The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml file, paste the following code, then run docker-compose up. You will then see a link in the console that opens a Jupyter notebook.
version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/
We begin by creating a spark session and importing a few libraries.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Next, let's create some fake sales data in a data frame.
from datetime import datetime, date
rdd = spark.sparkContext.parallelize([
("jan", 2019, 86000, 56),
("jan", 2020, 81000, 30),
("jan", 2021, 90000, 24),
("feb", 2019, 99000, 40),
("feb", 2020, 83000, 36),
("feb", 2021, 79000, 53),
("mar", 2019, 80000, 25),
("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema = ["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        81000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        79000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+
from pyspark.sql.types import *
web_visits_rdd = spark.sparkContext.parallelize([
("jan", 2019, 186000),
("jan", 2020, 181000),
("jan", 2021, 190000),
("feb", 2019, 199000),
("feb", 2020, 183000),
("feb", 2021, 179000),
("mar", 2019, 180000),
("mar", 2020, 191000)
])
web_visits_df = spark.createDataFrame(web_visits_rdd, schema = ["month", "year", "webvisits"])
web_visits_df.show()
+-----+----+---------+
|month|year|webvisits|
+-----+----+---------+
|  jan|2019|   186000|
|  jan|2020|   181000|
|  jan|2021|   190000|
|  feb|2019|   199000|
|  feb|2020|   183000|
|  feb|2021|   179000|
|  mar|2019|   180000|
|  mar|2020|   191000|
+-----+----+---------+
The basic join is done with the join method. You call it on one dataframe and pass two parameters: first the dataframe you would like to join with, then the column to match on, using the on named parameter.
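Here is an example joining the two dataframes above on the month column. Because month alone is not unique in either dataframe, each sales row is paired with every web-visits row for the same month.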
inner_join_df = df.join(web_visits_df, on = 'month')
inner_join_df.show()
+-----+----+-------------+--------------------+----+---------+
|month|year|total_revenue|unique_products_sold|year|webvisits|
+-----+----+-------------+--------------------+----+---------+
|  feb|2019|        99000|                  40|2019|   199000|
|  feb|2019|        99000|                  40|2020|   183000|
|  feb|2019|        99000|                  40|2021|   179000|
|  feb|2020|        83000|                  36|2019|   199000|
|  feb|2020|        83000|                  36|2020|   183000|
|  feb|2020|        83000|                  36|2021|   179000|
|  feb|2021|        79000|                  53|2019|   199000|
|  feb|2021|        79000|                  53|2020|   183000|
|  feb|2021|        79000|                  53|2021|   179000|
|  mar|2019|        80000|                  25|2019|   180000|
|  mar|2019|        80000|                  25|2020|   191000|
|  mar|2020|        91000|                  50|2019|   180000|
|  mar|2020|        91000|                  50|2020|   191000|
|  jan|2019|        86000|                  56|2019|   186000|
|  jan|2019|        86000|                  56|2020|   181000|
|  jan|2019|        86000|                  56|2021|   190000|
|  jan|2020|        81000|                  30|2019|   186000|
|  jan|2020|        81000|                  30|2020|   181000|
|  jan|2020|        81000|                  30|2021|   190000|
|  jan|2021|        90000|                  24|2019|   186000|
+-----+----+-------------+--------------------+----+---------+
only showing top 20 rows
If the join columns have different names in the two dataframes, you can pass a condition that compares the two columns to the on named parameter.
web_visits_df = web_visits_df.withColumnRenamed('month', 'mon')
inner_join_df = df.join(web_visits_df, on = df.month == web_visits_df.mon)
inner_join_df.show()
+-----+----+-------------+--------------------+---+----+---------+
|month|year|total_revenue|unique_products_sold|mon|year|webvisits|
+-----+----+-------------+--------------------+---+----+---------+
|  feb|2019|        99000|                  40|feb|2019|   199000|
|  feb|2019|        99000|                  40|feb|2020|   183000|
|  feb|2019|        99000|                  40|feb|2021|   179000|
|  feb|2020|        83000|                  36|feb|2019|   199000|
|  feb|2020|        83000|                  36|feb|2020|   183000|
|  feb|2020|        83000|                  36|feb|2021|   179000|
|  feb|2021|        79000|                  53|feb|2019|   199000|
|  feb|2021|        79000|                  53|feb|2020|   183000|
|  feb|2021|        79000|                  53|feb|2021|   179000|
|  mar|2019|        80000|                  25|mar|2019|   180000|
|  mar|2019|        80000|                  25|mar|2020|   191000|
|  mar|2020|        91000|                  50|mar|2019|   180000|
|  mar|2020|        91000|                  50|mar|2020|   191000|
|  jan|2019|        86000|                  56|jan|2019|   186000|
|  jan|2019|        86000|                  56|jan|2020|   181000|
|  jan|2019|        86000|                  56|jan|2021|   190000|
|  jan|2020|        81000|                  30|jan|2019|   186000|
|  jan|2020|        81000|                  30|jan|2020|   181000|
|  jan|2020|        81000|                  30|jan|2021|   190000|
|  jan|2021|        90000|                  24|jan|2019|   186000|
+-----+----+-------------+--------------------+---+----+---------+
only showing top 20 rows
We can also specify the type of join using the how named parameter. This parameter provides an interface for all the normal joins you would expect in a SQL or pandas environment. Here is an example of an outer join.
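# web_visits_df was renamed above, so restore the 'month' column before joining on it
web_visits_df = web_visits_df.withColumnRenamed('mon', 'month')
outer_join_df = df.join(web_visits_df, on = 'month', how = "outer")
outer_join_df.show()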
+-----+----+-------------+--------------------+----+---------+
|month|year|total_revenue|unique_products_sold|year|webvisits|
+-----+----+-------------+--------------------+----+---------+
|  feb|2019|        99000|                  40|2019|   199000|
|  feb|2019|        99000|                  40|2020|   183000|
|  feb|2019|        99000|                  40|2021|   179000|
|  feb|2020|        83000|                  36|2019|   199000|
|  feb|2020|        83000|                  36|2020|   183000|
|  feb|2020|        83000|                  36|2021|   179000|
|  feb|2021|        79000|                  53|2019|   199000|
|  feb|2021|        79000|                  53|2020|   183000|
|  feb|2021|        79000|                  53|2021|   179000|
|  mar|2019|        80000|                  25|2019|   180000|
|  mar|2019|        80000|                  25|2020|   191000|
|  mar|2020|        91000|                  50|2019|   180000|
|  mar|2020|        91000|                  50|2020|   191000|
|  jan|2019|        86000|                  56|2019|   186000|
|  jan|2019|        86000|                  56|2020|   181000|
|  jan|2019|        86000|                  56|2021|   190000|
|  jan|2020|        81000|                  30|2019|   186000|
|  jan|2020|        81000|                  30|2020|   181000|
|  jan|2020|        81000|                  30|2021|   190000|
|  jan|2021|        90000|                  24|2019|   186000|
+-----+----+-------------+--------------------+----+---------+
only showing top 20 rows