PySpark ConcatWS

10.22.2021

Intro

The PySpark concat_ws function allows you to concatenate an array field into a single String field. It serves as the opposite of the split function, letting you perform string operations on a column that was created as an array column. In this article, we will learn how to use PySpark concat_ws.

Setting Up

The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console to open up and access a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating the DataSet

Let's start by creating a Spark Session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Now, let's create a dataframe to work with.

rdd = spark.sparkContext.parallelize([    
    (["jan", "2019"], 86000, 30),
    (["jan", "2020"], 71000, 30),
    
    (["feb", "2019"], 99000, 42),
    (["feb", "2020"], 99500, 36),
    
    (["mar", "2019"], 92000, 25),
    (["mar", "2020"], 91000, 50)
])
df = spark.createDataFrame(rdd, schema = ["month year", "total_revenue", "unique_products_sold"])
df.show()
+-----------+-------------+--------------------+
| month year|total_revenue|unique_products_sold|
+-----------+-------------+--------------------+
|[jan, 2019]|        86000|                  30|
|[jan, 2020]|        71000|                  30|
|[feb, 2019]|        99000|                  42|
|[feb, 2020]|        99500|                  36|
|[mar, 2019]|        92000|                  25|
|[mar, 2020]|        91000|                  50|
+-----------+-------------+--------------------+
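
Since concat_ws operates on array columns, it can help to confirm what Spark inferred for our schema. This quick check is not part of the original walkthrough, but running printSchema should show month year as an array of strings, along the lines of:

df.printSchema()
root
 |-- month year: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- total_revenue: long (nullable = true)
 |-- unique_products_sold: long (nullable = true)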

Using Concat_WS

To use concat_ws, we pass a delimiter and the column we would like to concatenate. In our first example, we use concat_ws with the select method to join the elements of our month year column.

from pyspark.sql.functions import concat_ws, col

df.select(
    concat_ws(", ", col("month year")).alias('MonthYear')
).show()
+---------+
|MonthYear|
+---------+
|jan, 2019|
|jan, 2020|
|feb, 2019|
|feb, 2020|
|mar, 2019|
|mar, 2020|
+---------+
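
concat_ws is not limited to a single array column; you can pass several columns and the delimiter is placed between every value. As a quick sketch beyond the original example, we could fold total_revenue into the result as well, casting it to a string for clarity:

df.select(
    concat_ws(" - ", col("month year"), col("total_revenue").cast("string")).alias("summary")
).show(truncate=False)

This should produce values like jan - 2019 - 86000 for each row.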

We can also use the withColumn method to return a new DataFrame with the concatenated column.

df.withColumn(
    "MonthYear",
    concat_ws(", ", col("month year"))
).drop('month year').show()
+-------------+--------------------+---------+
|total_revenue|unique_products_sold|MonthYear|
+-------------+--------------------+---------+
|        86000|                  30|jan, 2019|
|        71000|                  30|jan, 2020|
|        99000|                  42|feb, 2019|
|        99500|                  36|feb, 2020|
|        92000|                  25|mar, 2019|
|        91000|                  50|mar, 2020|
+-------------+--------------------+---------+

Another option we have is to use the SQL API from PySpark. We first create a temporary table, then we can call concat_ws in our SQL select. Because the column name contains a space, it needs to be wrapped in backticks; quoting it as a string literal would just concatenate the literal text 'month year'.

df.createOrReplaceTempView("Sales")
spark.sql("select concat_ws(',', 'month year') as MonthYear from Sales") \
    .show()
+----------+
| MonthYear|
+----------+
|month year|
|month year|
|month year|
|month year|
|month year|
|month year|
+----------+
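
To close the loop on the idea that concat_ws is the opposite of split, here is a small sketch (not part of the original example) that concatenates the array column and then splits the result back into an array:

from pyspark.sql.functions import split

df.select(
    split(concat_ws(",", col("month year")), ",").alias("month_year_array")
).show()

The output should show the original [jan, 2019] style arrays again, confirming that the two functions round-trip cleanly.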