There are many ways to create a data frame in Spark. You can supply the data yourself, use a pandas data frame, or read from a number of sources such as a database or even a Kafka stream. In this article, we will learn how to create DataFrames in PySpark.
The quickest way to get started with PySpark is to use the following Docker Compose file. Simply create a docker-compose.yml, paste in the following code, then run docker-compose up. You will then see a link in the console that you can open to access a Jupyter notebook.
version: '3'
services:
  spark:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks/
We begin by importing a few libraries and creating a Spark session.
from datetime import datetime, date
import pandas as pd
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()
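getOrCreate returns an existing session if one is already active in the process, or creates a new one. The builder can also be used to configure the session, for example to give it an application name; a small sketch (the name here is just an illustration):

spark = (SparkSession.builder
         .appName("dataframe-examples")  # hypothetical name, any string works
         .getOrCreate())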
We can create a DataFrame using the pyspark.sql Row class as follows:
df = spark.createDataFrame([
    Row(amount=20000, month='jan', date=datetime(2000, 1, 1, 12, 0)),
    Row(amount=40000, month='feb', date=datetime(2000, 2, 1, 12, 0)),
    Row(amount=50000, month='mar', date=datetime(2000, 3, 1, 12, 0))
])
df.show()
+------+-----+-------------------+
|amount|month| date|
+------+-----+-------------------+
| 20000| jan|2000-01-01 12:00:00|
| 40000| feb|2000-02-01 12:00:00|
| 50000| mar|2000-03-01 12:00:00|
+------+-----+-------------------+
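Spark infers the column types from the Python values. If you want to check what it inferred, printSchema displays the schema; for the DataFrame above it should look like this:

df.printSchema()

root
 |-- amount: long (nullable = true)
 |-- month: string (nullable = true)
 |-- date: timestamp (nullable = true)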
We can also start with a pandas data frame. This means you can import data using any of the normal pandas methods and then pass the data frame to Spark.
pd_df = pd.DataFrame({
    "amount": [20000, 40000, 50000],
    "month": ["jan", "feb", "mar"],
    "date": [datetime(2000, 1, 1, 12, 0), datetime(2000, 2, 1, 12, 0), datetime(2000, 3, 1, 12, 0)]
})
df = spark.createDataFrame(pd_df)
df.show()
+------+-----+-------------------+
|amount|month| date|
+------+-----+-------------------+
| 20000| jan|2000-01-01 12:00:00|
| 40000| feb|2000-02-01 12:00:00|
| 50000| mar|2000-03-01 12:00:00|
+------+-----+-------------------+
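Conversion also works in the other direction: toPandas collects the Spark DataFrame back to the driver as a pandas DataFrame, so it should only be used on results small enough to fit in memory. A quick sketch using the DataFrame above:

# Collects every row to the driver, so only use this on small DataFrames
pd_round_trip = df.toPandas()
print(pd_round_trip)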
We can also pass a list of tuples to the spark.sparkContext.parallelize method to create a Spark RDD. Then, we can create a data frame from this RDD.
rdd = spark.sparkContext.parallelize([
    (20000, 'jan', datetime(2000, 1, 1, 12, 0)),
    (40000, 'feb', datetime(2000, 2, 1, 12, 0)),
    (50000, 'mar', datetime(2000, 3, 1, 12, 0))
])
df = spark.createDataFrame(rdd, schema=["amount", "month", "date"])
df.show()
+------+-----+-------------------+
|amount|month| date|
+------+-----+-------------------+
| 20000| jan|2000-01-01 12:00:00|
| 40000| feb|2000-02-01 12:00:00|
| 50000| mar|2000-03-01 12:00:00|
+------+-----+-------------------+
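When the schema is given only as a list of column names, Spark infers the types from the data. If you want to pin the types down explicitly, you can pass a StructType instead; a minimal sketch using the RDD above:

from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

# Explicit schema: column name, type, and whether nulls are allowed
schema = StructType([
    StructField("amount", LongType(), True),
    StructField("month", StringType(), True),
    StructField("date", TimestampType(), True)
])

df = spark.createDataFrame(rdd, schema=schema)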
To read from files, we can use the corresponding reader methods. For example, here is how we would read from a CSV file.
df = spark.read.csv("file.csv")
And similarly, for a JSON file.
df = spark.read.json("file.json")
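Both readers accept options. For CSV files in particular, header and inferSchema are worth knowing about, since by default every row is treated as data and every column is read as a string. A short sketch (file.csv is just a placeholder name):

# Treat the first row as column names and infer column types
df = spark.read.csv("file.csv", header=True, inferSchema=True)

# Equivalent, using the option API
df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv("file.csv"))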
In the future, we will cover other import methods, as some data sources, such as PostgreSQL, require additional drivers.