PySpark ForEach

10.16.2021

Intro

The PySpark foreach method lets us iterate over the rows in a DataFrame. Unlike map and flatMap, foreach does not transform the data or return any values; it simply runs a function on each row for its side effects. In this article, we will learn how to use PySpark foreach.

Setting Up

The quickest way to get started working with PySpark is to use the following docker compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console that opens a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating the Data

Let's start by creating a Spark Session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Now, let's create a dataframe to work with.

rdd = spark.sparkContext.parallelize([    
    ("jan", 2019, 86000, 56),
    ("jan", 2020, 71000, 30),
    ("jan", 2021, 90000, 24),
    
    ("feb", 2019, 99000, 40),
    ("feb", 2020, 83000, 36),
    ("feb", 2021, 69000, 53),
    
    ("mar", 2019, 80000, 25),
    ("mar", 2020, 91000, 50)
])
df = spark.createDataFrame(rdd, schema = ["month", "year", "total_revenue", "unique_products_sold"])
df.show()
+-----+----+-------------+--------------------+
|month|year|total_revenue|unique_products_sold|
+-----+----+-------------+--------------------+
|  jan|2019|        86000|                  56|
|  jan|2020|        71000|                  30|
|  jan|2021|        90000|                  24|
|  feb|2019|        99000|                  40|
|  feb|2020|        83000|                  36|
|  feb|2021|        69000|                  53|
|  mar|2019|        80000|                  25|
|  mar|2020|        91000|                  50|
+-----+----+-------------+--------------------+

Using ForEach

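Here we call foreach through the .rdd property of our DataFrame (DataFrame.foreach is a shorthand for the same call). A common use of foreach is to print out data. Here we print each row. Because the function runs on the executors, the rows are not printed in any guaranteed order.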

df.rdd.foreach(lambda x: 
              print(x[0], x[1], x[2], x[3]))  
mar 2019 80000 25
feb 2021 69000 53
jan 2019 86000 56
feb 2020 83000 36
feb 2019 99000 40
mar 2020 91000 50
jan 2020 71000 30
jan 2021 90000 24

If we want, we can also use the column names in foreach.

df.rdd.foreach(lambda x: print((x.month, x.year)))  
('jan', 2020)
('jan', 2021)
('feb', 2021)
('jan', 2019)
('feb', 2019)
('mar', 2020)
('feb', 2020)
('mar', 2019)

Other ways to iterate

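As a side note, we can also use the collect method and the toLocalIterator method to loop through a DataFrame on the driver. The collect method loads every row into driver memory at once, while toLocalIterator fetches one partition at a time. Here are some examples.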

for row in df.collect():
    print(row.month)
jan
jan
jan
feb
feb
feb
mar
mar

for row in df.rdd.toLocalIterator():
    print(row.month)
jan
jan
jan
feb
feb
feb
mar
mar