PySpark Map

10.14.2021

Intro

The PySpark map method lets us iterate over the rows of an RDD and transform each one. Mapping is a common functional operation, and PySpark lets us apply it at scale. In this article, we will learn how to use PySpark map.

Setting Up

The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste in the following code, then run docker-compose up. The console will then print a link you can open to access a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating the Data

Let's start by creating a Spark Session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/10/15 12:25:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
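
These warnings are harmless for our purposes. If you would like a quieter console, the log output itself points at the fix: you can lower the log level through the SparkContext (a minimal sketch; pick whichever level suits you):

spark.sparkContext.setLogLevel("ERROR")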

Now, let's create a DataFrame to work with.

rdd = spark.sparkContext.parallelize([    
    ("jan", 2019, 86000, 56),
    ("jan", 2020, 71000, 30),
    ("jan", 2021, 90000, 24),
    
    ("feb", 2019, 99000, 40),
    ("feb", 2020, 83000, 36),
    ("feb", 2021, 69000, 53),
    
    ("mar", 2019, 80000, 25),
    ("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema=["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+
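
If you want to confirm the types Spark inferred from the Python tuples, printSchema is a quick check. For string and int values you should see something like the following:

df.printSchema()
root
 |-- month: string (nullable = true)
 |-- year: long (nullable = true)
 |-- total_revenue: long (nullable = true)
 |-- unique_products_sold: long (nullable = true)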

Using Map

To use map, we first have to access the .rdd property of our DataFrame. The map function returns a new RDD, which we can then convert back to a DataFrame. Here is an example that merges our first two columns. Notice that we use positional indexes to access the column values.

rdd = df.rdd.map(lambda x: 
    (x[0] + "," + str(x[1]), x[2], x[3] * 100)
)  

df_mapped = rdd.toDF(["MonthYear", "Revenue", "Product Sold"])

df_mapped.show()
+---------+-------+------------+
|MonthYear|Revenue|Product Sold|
+---------+-------+------------+
| jan,2019|  86000|        5600|
| jan,2020|  71000|        3000|
| jan,2021|  90000|        2400|
| feb,2019|  99000|        4000|
| feb,2020|  83000|        3600|
| feb,2021|  69000|        5300|
| mar,2019|  80000|        2500|
| mar,2020|  91000|        5000|
+---------+-------+------------+
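
Relying on positional indexes can become brittle as the schema grows. A small sketch of an alternative is to have the mapping function return Row objects so that toDF can pick up the field names directly (Row field names cannot contain spaces, so the last column is renamed here):

from pyspark.sql import Row

rdd_rows = df.rdd.map(lambda x: Row(
    MonthYear=x[0] + "," + str(x[1]),
    Revenue=x[2],
    ProductsSold=x[3] * 100,
))

df_rows = rdd_rows.toDF()
df_rows.show()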

If we prefer, we can also refer to the columns by name inside map.

rdd = df.rdd.map(lambda x: 
    (x.month + "," + str(x.year), x.total_revenue, x.unique_products_sold * 100)
)  

df_mapped = rdd.toDF(["MonthYear", "Revenue", "Product Sold"])

df_mapped.show()
+---------+-------+------------+
|MonthYear|Revenue|Product Sold|
+---------+-------+------------+
| jan,2019|  86000|        5600|
| jan,2020|  71000|        3000|
| jan,2021|  90000|        2400|
| feb,2019|  99000|        4000|
| feb,2020|  83000|        3600|
| feb,2021|  69000|        5300|
| mar,2019|  80000|        2500|
| mar,2020|  91000|        5000|
+---------+-------+------------+
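
As the transformation logic grows, a lambda can become hard to read. The same map call works with a regular named function; here is a small refactor of the example above:

def summarize(row):
    return (row.month + "," + str(row.year), row.total_revenue, row.unique_products_sold * 100)

rdd_named = df.rdd.map(summarize)
df_named = rdd_named.toDF(["MonthYear", "Revenue", "Product Sold"])
df_named.show()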

MapPartitions

One final example: if you want to transform a whole partition at a time, you can use the mapPartitions function. Your mapping function receives an iterator over a single partition, so you need to iterate over each row in that partition and yield the transformed rows. The result keeps the original partitioning, as we verify below.

def transformPartitions(data):
    for x in data:
        yield (x.month + "," + str(x.year), x.total_revenue, x.unique_products_sold * 100)

rdd = df.rdd.mapPartitions(transformPartitions)  

df_mapped = rdd.toDF(["MonthYear", "Revenue", "Product Sold"])

df_mapped.show()
+---------+-------+------------+
|MonthYear|Revenue|Product Sold|
+---------+-------+------------+
| jan,2019|  86000|        5600|
| jan,2020|  71000|        3000|
| jan,2021|  90000|        2400|
| feb,2019|  99000|        4000|
| feb,2020|  83000|        3600|
| feb,2021|  69000|        5300|
| mar,2019|  80000|        2500|
| mar,2020|  91000|        5000|
+---------+-------+------------+
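
If you want to verify that the partitioning is preserved, you can compare the partition counts before and after the call (a quick check; the actual numbers depend on your local configuration):

print(df.rdd.getNumPartitions())
print(rdd.getNumPartitions())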