Analyzing the S&P 500 with PySpark


Author :: Kevin Vecmanis


The Spark dataframe API is moving undeniably towards the look and feel of Pandas dataframes, but there are some key differences in the way these two libraries operate. In this post I walk through an analysis of the S&P 500 to illustrate common data analysis functionality in PySpark.

In this article you will learn:
  • How to create a Spark session in Python
  • How to import a CSV file into a Spark dataframe object
  • How to convert date strings to datetime objects
  • How to create new dataframe columns in PySpark
  • How to create and use a user defined function in PySpark
  • General methods for answering questions about an asset class's price history




Introduction

This article focuses on typical ad hoc data analysis functions using PySpark. Installing PySpark isn’t as straightforward as other installations, so if you haven’t installed it yet I recommend reading my tutorial: Complete Guide to Installing PySpark on MacOS.



Creating a Spark object and Importing our .CSV data file

Here we’re creating a SparkSession and using it to import a CSV file with historical S&P 500 price data. We’re loading this into a spark dataframe called spx.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spx_analysis').getOrCreate()
spx = spark.read.csv('spx.csv', inferSchema = True, header=True)
spx.columns
['    Date',
 '       Open',
 '       High',
 '        Low',
 '      Close',
 '        Volume ']

We can see that there’s a problem with our column names: they are padded with stray whitespace. So let’s go ahead and remove that. There are several ways to do this, but essentially we just want to rename these columns. We can do this in PySpark like this:

from pyspark.sql.functions import col

exprs = [col(column).alias(column.replace(' ', '')) for column in spx.columns]
spx = spx.select(*exprs)

Let’s walk through these two lines of code. Apache Spark has a list of built in functions available for dataframes. You can see the complete documentation here: pyspark.sql module

The col function returns a column object based on the given name. In this list comprehension we’re pulling each column object in the Spark dataframe and assigning it an alias that has all the whitespace removed. We then reassign the dataframe variable spx to a selection of the columns under their new aliases.

The new output will be:

spx.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
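The renaming logic itself is ordinary Python string handling, so it can be checked without a Spark session. The sketch below is a plain-Python illustration (the variable names are mine, not part of the original analysis). It applies the same replace call to the raw column names shown earlier and contrasts it with strip(), which trims only leading and trailing whitespace and may be the safer choice if a column name legitimately contains an interior space.

```python
# Raw column names as reported by spx.columns before cleaning.
raw_columns = ['    Date', '       Open', '       High',
               '        Low', '      Close', '        Volume ']

# Same transformation used in the alias expression: drop every space.
cleaned = [name.replace(' ', '') for name in raw_columns]

# Alternative: trim only the ends, leaving interior spaces untouched.
stripped = [name.strip() for name in raw_columns]

print(cleaned)
print(stripped)
```

For this file both approaches give the same names; they would differ only for a header such as 'Adj Close'.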

We can see a description of our dataframe schema like this:

spx.printSchema()
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)

One issue that we still have is that our Date column is a string type. We want to convert this to a date type so that we can create new date-based features to perform analysis.

For this we can leverage PySpark’s user defined function helper, udf. While we’re at it, we’ll also import the month and dayofweek functions. month returns an integer value from 1 to 12 (representing the month of the year), and dayofweek returns an integer value from 1 (Sunday) to 7 (Saturday). Because we’re dealing with stock market data, we should only see numbers from 2 to 6.

from datetime import datetime
from pyspark.sql.functions import udf, month, dayofweek
from pyspark.sql.types import DateType

# Setting a user defined function:
# This function converts the string cell into a date:
func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
spx = spx.withColumn('Date', func(col('Date')))
spx.printSchema()
root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
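To make the date handling concrete, here is a small plain-Python sketch of what the lambda inside the udf does to a single cell, together with the day numbering described above. The helper names parse_date and spark_style_dayofweek are hypothetical and exist only for this illustration; the second one simply re-creates the 1 (Sunday) to 7 (Saturday) convention from Python's own isoweekday().

```python
from datetime import datetime


def parse_date(text):
    """Parse an 'MM/DD/YYYY' string into a datetime.date."""
    return datetime.strptime(text, '%m/%d/%Y').date()


def spark_style_dayofweek(d):
    """Map a date to 1 (Sunday) through 7 (Saturday)."""
    # isoweekday() runs from 1 (Monday) to 7 (Sunday), so shift by one
    # and wrap Sunday around to 1.
    return d.isoweekday() % 7 + 1


d = parse_date('10/20/1987')
print(d, d.month, spark_style_dayofweek(d))
```

A Tuesday maps to 3 under this convention, so weekday-only market data should fall in the range 2 to 6.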

Our Date column is now a date type. We can see a summary of our dataframe by running the following:

spx.describe().show()
+-------+-----------------+-----------------+-----------------+-----------------+--------------------+
|summary|             Open|             High|              Low|            Close|              Volume|
+-------+-----------------+-----------------+-----------------+-----------------+--------------------+
|  count|            12465|            12465|            12465|            12465|               12465|
|   mean|792.2395531488153|796.9072996389889|787.2112162053718|792.3797866024889|1.0043954708126755E9|
| stddev|723.2147727715842|726.6513155603651|719.4868797613425|723.3173413540445|1.3674823710970588E9|
|    min|            62.28|            63.23|            60.96|            62.28|                 0.0|
|    max|          2952.33|          2954.13|          2939.35|          2945.83|       9.512567808E9|
+-------+-----------------+-----------------+-----------------+-----------------+--------------------+

Now that we have our Spark dataframe with tidy column names and appropriate data types, let’s use PySpark’s functionality to add new features and answer some questions about the S&P 500.


What day did the S&P 500 reach its highest peak price? What was the peak price?
print(spx.orderBy(spx["High"].desc()).head(1)[0][0]) # print date
print(spx.orderBy(spx["High"].desc()).head(1)[0][2]) # print price

'05/01/2019'
2954.13

Our output here is May 1st, 2019 and the corresponding all-time high was 2954.13.

What was the lowest daily return for the S&P 500?

For this, we need to create a new column to calculate the daily return. I’ll define this as the natural log of Close/Open, multiplied by 100 to express it as a percentage.

from pyspark.sql.functions import log 

spx = spx.withColumn("Return", log(spx["Close"]/spx["Open"]) * 100)
spx.select('Return').describe().show()
+-------+--------------------+
|summary|              return|
+-------+--------------------+
|  count|               12465|
|   mean|0.015515563029372223|
| stddev|  0.8899828007318079|
|    min|  -9.127249466015163|
|    max|  10.245732795933025|
+-------+--------------------+

We can see that the lowest daily return for the S&P 500 was -9.13%. But when was it?

print(spx.orderBy(spx["return"].asc()).head(1)[0][0]) # print date
print(spx.orderBy(spx["return"].asc()).head(1)[0][6]) # print return

10/15/2008
-9.127249466015163

There’s a better way to reference these data. To explain that, it helps to break down what’s happening in the code above:

  • spx.orderBy(spx["return"].asc()) sorts the dataframe by returns, lowest to highest.
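  • .head(1) returns the top row object of the dataframe as the sole item in a list. head(n) returns a list whenever n is passed explicitly, even for n = 1 (calling head() with no argument returns a single row object instead) - something worth remembering.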
  • Because we have a list, head(1)[0] returns the first row object in the list.
  • head(1)[0][6] returns the return column value because return is the 7th column.

PySpark offers us a way to convert row objects to dictionaries so that they can be accessed using the key names more intuitively. We can use asDict().

spx.orderBy(spx["return"].asc()).head(1)[0].asDict()
{'Date': '10/15/2008',
 'Open': 994.6,
 'High': 994.6,
 'Low': 903.99,
 'Close': 907.84,
 'Volume': 5391184384.0,
 'Return': -9.127249466015163}
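The Return value in this row can be reproduced by hand from its Open and Close, which is a useful sanity check on the column definition. The sketch below is plain Python with hypothetical helper names; it also computes the simple percentage change for comparison, since a log return and a simple return diverge noticeably on a move this large.

```python
import math


def log_return_pct(open_price, close_price):
    """Log return in percent, matching log(Close/Open) * 100."""
    return math.log(close_price / open_price) * 100


def simple_return_pct(open_price, close_price):
    """Simple percentage change from open to close, for comparison."""
    return (close_price - open_price) / open_price * 100


# Open and Close taken from the row shown above.
open_price, close_price = 994.6, 907.84
print(log_return_pct(open_price, close_price))
print(simple_return_pct(open_price, close_price))
```

The log return is always the more negative of the two on a down day, so the choice of definition matters when quoting a "worst day" figure.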



What month has had the highest average daily returns?
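# Add a Month column (1 to 12) derived from the Date column.
spx = spx.withColumn('Month', month(spx['Date']))
spx.groupBy('Month').agg({'Return': 'mean'}).orderBy('avg(Return)').collect()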
[Row(Month=9, avg(Return)=-0.02478001702557699),
 Row(Month=8, avg(Return)=-0.02003023613248328),
 Row(Month=6, avg(Return)=-0.011699020641305635),
 Row(Month=2, avg(Return)=0.009929214830250615),
 Row(Month=7, avg(Return)=0.0165204229244471),
 Row(Month=1, avg(Return)=0.017035074376419303),
 Row(Month=10, avg(Return)=0.017466736429507343),
 Row(Month=5, avg(Return)=0.01753874587884381),
 Row(Month=4, avg(Return)=0.026848571353080265),
 Row(Month=11, avg(Return)=0.04243340922116308),
 Row(Month=12, avg(Return)=0.046188427581179685),
 Row(Month=3, avg(Return)=0.04795851309791375)]

We can see that March has, historically, the highest daily average return at 0.048% and September has the lowest at -0.0248%.
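For readers newer to grouped aggregations, the following plain-Python sketch mirrors what the groupBy, mean, and orderBy chain computes. The function name mean_by_key and the sample pairs are made up for illustration and are not taken from the S&P 500 data.

```python
from collections import defaultdict


def mean_by_key(pairs):
    """Average the values for each key in an iterable of (key, value)."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [running sum, count]
    for key, value in pairs:
        totals[key][0] += value
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# Hypothetical (month, daily return) pairs, not real market data.
sample = [(1, 0.5), (1, -0.1), (3, 0.2), (3, 0.4), (9, -0.3)]

averages = mean_by_key(sample)
# Sort ascending by average, like orderBy('avg(Return)').
ranked = sorted(averages.items(), key=lambda item: item[1])
print(ranked)
```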



What day of the week, if any, exhibits the highest returns?

In order to generate these metrics we need to add another column to our Spark dataframe called DayOfWeek. Then we can run another groupBy call to retrieve something similar to the prior section.

spx = spx.withColumn("DayOfWeek", dayofweek(spx['Date']))
spx.groupBy('DayOfWeek').agg({'Return': 'mean'}).orderBy('avg(Return)').collect()
[Row(DayOfWeek=2, avg(Return)= -0.014361180650896898),
 Row(DayOfWeek=5, avg(Return)= 0.01024465844728082),
 Row(DayOfWeek=6, avg(Return)= 0.01606449592480873),
 Row(DayOfWeek=3, avg(Return)= 0.024974143676216223),
 Row(DayOfWeek=4, avg(Return)= 0.038366757107899954)]

Here we can see that Monday (day = 2) typically is the worst day for the S&P 500, and Wednesday (day = 4) is typically the best day.

Note: To determine if these differences are statistically significant you should conduct a t-test to account for sample size and standard deviation of these daily returns.
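As a rough illustration of that note, the sketch below computes Welch's t statistic, one common form of the two-sample t-test that does not assume equal variances. It uses only the standard library, the function name welch_t is my own, and the two samples are invented numbers rather than actual Monday and Wednesday returns. Turning the statistic into a p-value also requires the t distribution, for which a statistics library such as SciPy would normally be used.

```python
import math
import statistics


def welch_t(sample_a, sample_b):
    """Welch's t statistic for the difference in means of two samples."""
    mean_a, mean_b = statistics.mean(sample_a), statistics.mean(sample_b)
    # statistics.variance is the sample variance (n - 1 denominator).
    var_a, var_b = statistics.variance(sample_a), statistics.variance(sample_b)
    standard_error = math.sqrt(var_a / len(sample_a) + var_b / len(sample_b))
    return (mean_a - mean_b) / standard_error


# Hypothetical daily returns in percent, for illustration only.
monday = [-0.4, 0.1, -0.2, 0.3, -0.1]
wednesday = [0.2, 0.5, -0.1, 0.4, 0.1]

print(welch_t(wednesday, monday))
```

In practice the inputs would be the full lists of daily returns collected for each weekday.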


What is the biggest intraday price swing?

Intraday price swings are defined as ((High-Low) / Close) * 100 - we want this expressed as a percentage. To do this we’ll make a new column called Swing and return the row object with the maximum value.

swing_formula = 100 * (col('High') - col('Low')) / col('Close')
spx = spx.withColumn("Swing", swing_formula)

# To display our new column with the inputs...
spx.select(['High','Low','Close','Swing']).show()
+-----+-----+-----+------------------+
| High|  Low|Close|             Swing|
+-----+-----+-----+------------------+
|93.54|91.79| 93.0| 1.881720430107527|
|94.25|92.53|93.46| 1.840359512090733|
|93.81|92.13|92.82|1.8099547511312293|
|93.38|91.93|92.63|1.5653675914930245|
|93.47|91.99|92.68| 1.596892533448429|
|93.25|91.82| 92.4| 1.547619047619055|
|92.67| 91.2| 91.7|1.6030534351145025|
|92.61|90.99|91.92|1.7624020887728509|
|92.40|90.88|91.65|1.6584833606110312|
|92.35|90.73|91.68| 1.767015706806272|
|92.49|90.36|90.92|2.3427188737351465|
|90.72|89.14|89.65| 1.762409369771331|
|90.45|88.64|89.83|2.0149170655682984|
|90.61| 89.2|89.95|  1.56753752084491|
|90.80| 89.2|90.04|1.7769880053309577|
|90.45|88.74|89.37|1.9133937562940673|
|89.23|87.49|88.17|1.9734603606669037|
|88.54|86.92|87.62|1.8488929468158006|
|88.24|86.44|86.79|2.0739716557207015|
|87.09|85.02|85.69| 2.415684443925788|
+-----+-----+-----+------------------+
only showing top 20 rows
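The Swing values are easy to verify by hand. This short plain-Python sketch (the function name swing_pct is hypothetical) recomputes the first two rows of the table above with the same formula.

```python
def swing_pct(high, low, close):
    """Intraday range as a percentage of the close."""
    return 100 * (high - low) / close


# High, Low, Close from the first two rows of the table above.
print(swing_pct(93.54, 91.79, 93.0))
print(swing_pct(94.25, 92.53, 93.46))
```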


To get our highest intraday swing we can do the following - returning the row object as a dict this time so it’s easier to read.

spx.orderBy(spx["Swing"].desc()).head(1)[0].asDict()
{'Date': datetime.date(1987, 10, 20),
 'Open': 236.83,
 'High': 245.61,
 'Low': 216.46,
 'Close': 236.83,
 'Volume': 0.0,
 'Return': 0.0,
 'Month': 10,
 '7DMA': 285.3275,
 'DayOfWeek': 3,
 'Swing': 12.308406874129123}

We can see that our largest swing was on October 20th, 1987, the day after Black Monday, with an intraday swing of 12.31%.

What is the most that the S&P 500 has ever exceeded its 50-day moving average?

To calculate this, we’ll need to make two new columns:

  • A new column for the 50-day moving average
  • A column to calculate the rolling deviation from this moving average.


Step 1: Calculate 50-Day moving average

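To calculate moving averages in PySpark we need to leverage the Window class.

from pyspark.sql.functions import avg
from pyspark.sql.window import Window

# Create a window from 50 rows back up to and including the current row.
# Note: rowsBetween(-50, 0) spans 51 rows (trading days, not calendar days);
# use rowsBetween(-49, 0) for a strict 50-row window.
windowSpec = Window.orderBy(col("Date")).rowsBetween(-50, 0)

# Calculate new moving average column using 'avg' and the windowSpec
spx = spx.withColumn('50DMA', avg("Close").over(windowSpec))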


Step 2: Calculate deviation from 50-Day moving average
deviation = log(col('Close')/col('50DMA'))*100
spx = spx.withColumn("50_DMA_DEV", deviation)
spx.select(['Date','Close','50DMA','50_DMA_DEV']).show()
+----------+-----+-----------------+--------------------+
|      Date|Close|            50DMA|          50_DMA_DEV|
+----------+-----+-----------------+--------------------+
|1970-01-02| 93.0|             93.0|                 0.0|
|1970-01-05|93.46|93.22999999999999|  0.2463978963677856|
|1970-01-06|92.82|93.09333333333332|-0.29404403156411246|
|1970-01-07|92.63|92.97749999999999|-0.37444651323383776|
|1970-01-08|92.68|92.91799999999999| -0.2564684214870897|
|1970-01-09| 92.4|92.83166666666666|-0.46608385690632365|
|1970-01-12| 91.7|            92.67| -1.0522416336180849|
|1970-01-13|91.92|         92.57625| -0.7113995733504739|
|1970-01-14|91.65|92.47333333333333| -0.8943340381320259|
|1970-01-15|91.68|           92.394| -0.7757788507901322|
|1970-01-16|90.92|            92.26| -1.4630679145332042|
|1970-01-19|89.65|          92.0425|  -2.633722685057328|
|1970-01-20|89.83|91.87230769230769|  -2.248065783320994|
|1970-01-21|89.95|           91.735| -1.9650025434140774|
|1970-01-22|90.04|           91.622|  -1.741740147672272|
|1970-01-23|89.37|91.48124999999999| -2.3348977856819215|
|1970-01-26|88.17|91.28647058823529| -3.4735821193107745|
|1970-01-27|87.62|91.08277777777776| -3.8759456563629078|
|1970-01-28|86.79|90.85684210526314|  -4.579369646097863|
|1970-01-29|85.69|90.59849999999999|  -5.570152393945847|
+----------+-----+-----------------+--------------------+
only showing top 20 rows
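The first rows of this table show how a row-based window behaves near the start of the data: the average simply uses however many rows are available. The plain-Python sketch below (trailing_mean is a hypothetical helper, not a Spark function) reproduces the first three 50DMA values and the second deviation. It also makes the window size explicit: a frame of rowsBetween(-50, 0) holds the current row plus up to 50 preceding rows, so 51 values once the window is full.

```python
import math
from collections import deque


def trailing_mean(values, preceding):
    """Mean over the current value and up to `preceding` earlier values."""
    window = deque(maxlen=preceding + 1)  # current row + preceding rows
    means = []
    for value in values:
        window.append(value)
        means.append(sum(window) / len(window))
    return means


# First three closing prices from the table above.
closes = [93.0, 93.46, 92.82]
means = trailing_mean(closes, preceding=50)
print(means)

# Deviation of the second close from its moving average, in percent.
print(math.log(closes[1] / means[1]) * 100)
```

Passing preceding=49 instead would give a window of exactly 50 rows.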

Now to see our largest deviation ever, we use our familiar orderBy method.

spx.orderBy(spx["50_DMA_DEV"].desc()).head(1)[0].asDict()
{'Date': datetime.date(1982, 10, 13),
 'Open': 136.71,
 'High': 138.88,
 'Low': 133.42,
 'Close': 136.71,
 'Volume': 0.0,
 'Return': 0.0,
 'Month': 10,
 '7DMA': 129.36375,
 'DayOfWeek': 4,
 'Swing': 3.993855606758838,
 '50DMA': 118.25745098039216,
 '50_DMA_DEV': 14.499785815192304}

We can see that the most the S&P 500’s closing price has moved from the 50-Day moving average (to the upside) is 14.5%.



What is the most active month for trading volumes?
spx.groupBy('Month').agg({'Volume': 'mean'}).orderBy('avg(Volume)').collect()
[Row(Month=8, avg(Volume)=909247383.5138122),
 Row(Month=12, avg(Volume)=939309235.7947725),
 Row(Month=6, avg(Volume)=961227856.3961905),
 Row(Month=7, avg(Volume)=962498152.7272727),
 Row(Month=9, avg(Volume)=996474784.996985),
 Row(Month=5, avg(Volume)=997403773.401322),
 Row(Month=11, avg(Volume)=1010177471.5515516),
 Row(Month=4, avg(Volume)=1030796912.7736944),
 Row(Month=2, avg(Volume)=1049379894.8571428),
 Row(Month=3, avg(Volume)=1053430377.6322049),
 Row(Month=10, avg(Volume)=1072404461.9168975),
 Row(Month=1, avg(Volume)=1072697156.0615385)]

Here we can see January has the highest average daily volume, at roughly 1.07 billion. August, December, June, and July are typically the quietest months for trading volume - this makes sense because these are typically “holiday months”.


Summary

PySpark is an incredibly powerful Python framework that lets Python users leverage the power of Apache Spark. In this post we walked through the dataframe syntax in PySpark to illustrate some of the more common functions. This was by no means exhaustive, however. The method library in Spark is vast, powerful, and growing all the time.

I hope you enjoyed this post!

Kevin Vecmanis