The PySpark concat_ws function allows you to concatenate an array field into a single String field, serving as the opposite of the split function. This lets you perform string operations on a column that was created as an Array column. In this article, we will learn how to use PySpark concat_ws.
The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console to open and access a Jupyter notebook.
version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/
Let's start by creating a Spark Session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Now, let's create a dataframe to work with.
rdd = spark.sparkContext.parallelize([
    (["jan", "2019"], 86000, 30),
    (["jan", "2020"], 71000, 30),
    (["feb", "2019"], 99000, 42),
    (["feb", "2020"], 99500, 36),
    (["mar", "2019"], 92000, 25),
    (["mar", "2020"], 91000, 50)
])
df = spark.createDataFrame(rdd, schema=["month year", "total_revenue", "unique_products_sold"])
df.show()
df.show()
+-----------+-------------+--------------------+
| month year|total_revenue|unique_products_sold|
+-----------+-------------+--------------------+
|[jan, 2019]| 86000| 30|
|[jan, 2020]| 71000| 30|
|[feb, 2019]| 99000| 42|
|[feb, 2020]| 99500| 36|
|[mar, 2019]| 92000| 25|
|[mar, 2020]| 91000| 50|
+-----------+-------------+--------------------+
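Before concatenating, we can confirm that the month year column was inferred as an array of strings with a quick schema check, which should print something like the output below:
df.printSchema()
root
 |-- month year: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- total_revenue: long (nullable = true)
 |-- unique_products_sold: long (nullable = true)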
To use concat_ws, we pass a delimiter and the column we would like to concatenate. In our first example, we use concat_ws with the select method to join the elements of our month year column.
from pyspark.sql.functions import concat_ws, col
df.select(
    concat_ws(", ", col("month year")).alias("MonthYear")
).show()
+---------+
|MonthYear|
+---------+
|jan, 2019|
|jan, 2020|
|feb, 2019|
|feb, 2020|
|mar, 2019|
|mar, 2020|
+---------+
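One behavior worth knowing: concat_ws skips null values rather than returning null. As a minimal sketch, using a throwaway dataframe (null_df) made up just for this illustration:
# The null element is silently dropped from the joined string
null_df = spark.createDataFrame([(["jan", None, "2019"],)], ["parts"])
null_df.select(
    concat_ws(", ", col("parts")).alias("joined")
).show()
+---------+
|   joined|
+---------+
|jan, 2019|
+---------+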
We can also use the withColumn method to return a new DataFrame with the concatenated column.
df.withColumn(
    "MonthYear",
    concat_ws(", ", col("month year"))
).drop("month year").show()
+-------------+--------------------+---------+
|total_revenue|unique_products_sold|MonthYear|
+-------------+--------------------+---------+
| 86000| 30|jan, 2019|
| 71000| 30|jan, 2020|
| 99000| 42|feb, 2019|
| 99500| 36|feb, 2020|
| 92000| 25|mar, 2019|
| 91000| 50|mar, 2020|
+-------------+--------------------+---------+
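concat_ws is not limited to array columns: it also accepts multiple individual columns and joins them with the same delimiter. As a quick sketch (the label column name here is made up for illustration), we can combine the joined month year with the revenue, casting the numeric column to a string explicitly:
from pyspark.sql.functions import concat_ws, col

# Join the flattened array and the revenue with a single delimiter
df.select(
    concat_ws(" - ",
        concat_ws(", ", col("month year")),
        col("total_revenue").cast("string")
    ).alias("label")
).show(truncate=False)
This produces rows like jan, 2019 - 86000.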
Another option is to use the SQL API from PySpark. We first create a temporary view, then we can use the concat_ws function in our SQL select. Because our column name contains a space, we need to wrap it in backticks.
df.createOrReplaceTempView("Sales")
spark.sql("select concat_ws(',', `month year`) as MonthYear from Sales") \
    .show()
+---------+
|MonthYear|
+---------+
| jan,2019|
| jan,2020|
| feb,2019|
| feb,2020|
| mar,2019|
| mar,2020|
+---------+
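Finally, to see the "opposite of split" relationship from the introduction in action, we can round-trip the data: concat_ws collapses the array into a string, and split rebuilds it. A quick sketch:
from pyspark.sql.functions import split, col

# Collapse the array with concat_ws, then rebuild it with split
spark.sql("select concat_ws(',', `month year`) as MonthYear from Sales") \
    .select(split(col("MonthYear"), ",").alias("month year")) \
    .show()
+-----------+
| month year|
+-----------+
|[jan, 2019]|
|[jan, 2020]|
|[feb, 2019]|
|[feb, 2020]|
|[mar, 2019]|
|[mar, 2020]|
+-----------+
This matches the original month year column we started with.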