PySpark Create Dataframe

09.21.2021

Intro

There are many ways to create a DataFrame in Spark. You can supply the data yourself, convert a pandas DataFrame, or read from a number of sources such as a database or even a Kafka stream. In this article, we will learn how to create DataFrames in PySpark.

Setting Up

The quickest way to get started working with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste the following code, then run docker-compose up. You will then see a link in the console to open up and access a Jupyter notebook.

version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/

Creating a PySpark Data Frame

We begin by importing a few libraries and creating a Spark session.

from datetime import datetime, date

import pandas as pd
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

We can create a DataFrame using the pyspark.sql Row class as follows:

df = spark.createDataFrame([
    Row(amount=20000, month='jan', date=datetime(2000, 1, 1, 12, 0)),
    Row(amount=40000, month='feb', date=datetime(2000, 2, 1, 12, 0)),
    Row(amount=50000, month='mar', date=datetime(2000, 3, 1, 12, 0))
])

df.show()
+------+-----+-------------------+
|amount|month|               date|
+------+-----+-------------------+
| 20000|  jan|2000-01-01 12:00:00|
| 40000|  feb|2000-02-01 12:00:00|
| 50000|  mar|2000-03-01 12:00:00|
+------+-----+-------------------+
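
Spark infers the column types from the Python values in each Row. To confirm what it inferred, printSchema is a quick check; the commented output below is what I would expect for this data, with Python ints mapped to longs and datetimes to timestamps.

# Inspect the schema Spark inferred from the Row objects.
df.printSchema()

# Expected output:
# root
#  |-- amount: long (nullable = true)
#  |-- month: string (nullable = true)
#  |-- date: timestamp (nullable = true)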

We can also start with a pandas DataFrame. This means you can import data using any of the normal pandas methods and then pass the resulting DataFrame to Spark.

pd_df = pd.DataFrame({
    "amount": [20000, 40000, 50000],
    "month": ["jan", "feb", "mar"],
    "date": [datetime(2000, 1, 1, 12, 0), datetime(2000, 2, 1, 12, 0), datetime(2000, 3, 1, 12, 0)]
})

df = spark.createDataFrame(pd_df)

df.show()
+------+-----+-------------------+
|amount|month|               date|
+------+-----+-------------------+
| 20000|  jan|2000-01-01 12:00:00|
| 40000|  feb|2000-02-01 12:00:00|
| 50000|  mar|2000-03-01 12:00:00|
+------+-----+-------------------+
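
As a sketch of that workflow, the snippet below assumes a local sales.csv file (a hypothetical path) and loads it with pandas before handing it to Spark. Enabling Arrow is optional but usually speeds up the pandas-to-Spark conversion on Spark 3.x.

import pandas as pd

# Optional: use Apache Arrow to speed up the pandas-to-Spark conversion (Spark 3.x).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# "sales.csv" is a placeholder; any pandas reader (read_csv, read_excel, ...) works here.
pd_df = pd.read_csv("sales.csv", parse_dates=["date"])
df = spark.createDataFrame(pd_df)
df.show()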

We can also pass a list of tuples to the spark.sparkContext.parallelize method to create a Spark RDD. Then, we can create a DataFrame from this RDD.

rdd = spark.sparkContext.parallelize([
    (20000, 'jan', datetime(2000, 1, 1, 12, 0)),
    (40000, 'feb', datetime(2000, 2, 1, 12, 0)),
    (50000, 'mar', datetime(2000, 3, 1, 12, 0))
])
df = spark.createDataFrame(rdd, schema=["amount", "month", "date"])
df.show()
+------+-----+-------------------+
|amount|month|               date|
+------+-----+-------------------+
| 20000|  jan|2000-01-01 12:00:00|
| 40000|  feb|2000-02-01 12:00:00|
| 50000|  mar|2000-03-01 12:00:00|
+------+-----+-------------------+
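
Passing only column names leaves the types up to inference. If you want full control, you can pass an explicit schema instead; here is a minimal sketch using StructType with the same RDD.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Define each column's name, type, and nullability explicitly.
schema = StructType([
    StructField("amount", IntegerType(), True),
    StructField("month", StringType(), True),
    StructField("date", TimestampType(), True)
])

df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()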

Other Data Sources

To read from files, we can use their corresponding methods. For example, here is how we would read from a CSV file.

df = spark.read.csv("file.csv")
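
By default Spark treats every CSV column as a string and does not expect a header row. In practice you will usually want something like the following, which assumes the file has a header row; both options are part of the standard CSV reader.

# Treat the first row as column names and let Spark infer column types.
df = spark.read.csv("file.csv", header=True, inferSchema=True)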

And similarly, for a JSON file.

df = spark.read.json("file.json")
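
Note that Spark's JSON reader expects JSON Lines by default (one object per line). If your file is a single JSON array or a pretty-printed document, the multiLine option handles that; the file name here is just a placeholder.

# For a file containing one JSON array or pretty-printed objects rather than JSON Lines.
df = spark.read.json("file.json", multiLine=True)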

In the future, we will cover other import methods, as some data sources, such as PostgreSQL, require additional drivers or plugins.