This is the second blog post in the Spark tutorial series, written to help big data enthusiasts prepare for Apache Spark certifications from companies such as Cloudera, Hortonworks, and Databricks. The first one is here. If you want to learn or master Spark with Python, or if you are preparing for a Spark certification to show your skills in big data, these articles are for you. Previously, we visualized thefts in Chicago using the ggplot2 package in R.
In this tutorial, we will analyze the crimes dataset from the City of Chicago data portal. The dataset reflects reported incidents of crime (with the exception of murders, where data exists for each victim) that occurred in the City of Chicago since 2001.
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files. It is the entry point to programming Spark with the DataFrame API. We can create a SparkSession using the following builder pattern:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Chicago_crime_analysis").getOrCreate()
We can let Spark infer the schema of our csv data, but providing a pre-defined schema makes the reading process faster. It also lets us give the column names the format we want, for example, avoiding spaces in the names of the columns.
from pyspark.sql.types import (StructType, StructField, DateType, BooleanType,
                               DoubleType, IntegerType, StringType, TimestampType)

crimes_schema = StructType([StructField("ID", StringType(), True),
                            StructField("CaseNumber", StringType(), True),
                            StructField("Date", StringType(), True),
                            StructField("Block", StringType(), True),
                            StructField("IUCR", StringType(), True),
                            StructField("PrimaryType", StringType(), True),
                            StructField("Description", StringType(), True),
                            StructField("LocationDescription", StringType(), True),
                            StructField("Arrest", BooleanType(), True),
                            StructField("Domestic", BooleanType(), True),
                            StructField("Beat", StringType(), True),
                            StructField("District", StringType(), True),
                            StructField("Ward", StringType(), True),
                            StructField("CommunityArea", StringType(), True),
                            StructField("FBICode", StringType(), True),
                            StructField("XCoordinate", DoubleType(), True),
                            StructField("YCoordinate", DoubleType(), True),
                            StructField("Year", IntegerType(), True),
                            StructField("UpdatedOn", DateType(), True),
                            StructField("Latitude", DoubleType(), True),
                            StructField("Longitude", DoubleType(), True),
                            StructField("Location", StringType(), True)])
Create the crimes dataframe by providing the schema above:
crimes = spark.read.csv("Chicago_crimes_2001_to_present.csv", header = True, schema = crimes_schema)
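If we did not have a schema handy, we could also let Spark infer one. This is a minimal sketch of that alternative; it is typically slower on a file of this size because Spark has to scan the data to guess the types:

# let Spark infer the column types instead of supplying crimes_schema
crimes_inferred = spark.read.csv("Chicago_crimes_2001_to_present.csv",
                                 header = True, inferSchema = True)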
First, let’s see how many rows the crimes dataframe has:
print(" The crimes dataframe has {} records".format(crimes.count())) The crimes dataframe has 6481208 records
We can also see the columns, the data type of each column and the schema using the commands below.
crimes.columns

['ID', 'CaseNumber', 'Date', 'Block', 'IUCR', 'PrimaryType', 'Description',
 'LocationDescription', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward',
 'CommunityArea', 'FBICode', 'XCoordinate', 'YCoordinate', 'Year', 'UpdatedOn',
 'Latitude', 'Longitude', 'Location']
crimes.dtypes

[('ID', 'string'), ('CaseNumber', 'string'), ('Date', 'string'), ('Block', 'string'),
 ('IUCR', 'string'), ('PrimaryType', 'string'), ('Description', 'string'),
 ('LocationDescription', 'string'), ('Arrest', 'boolean'), ('Domestic', 'boolean'),
 ('Beat', 'string'), ('District', 'string'), ('Ward', 'string'),
 ('CommunityArea', 'string'), ('FBICode', 'string'), ('XCoordinate', 'double'),
 ('YCoordinate', 'double'), ('Year', 'int'), ('UpdatedOn', 'date'),
 ('Latitude', 'double'), ('Longitude', 'double'), ('Location', 'string')]
crimes.printSchema()

root
 |-- ID: string (nullable = true)
 |-- CaseNumber: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- PrimaryType: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- LocationDescription: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- CommunityArea: string (nullable = true)
 |-- FBICode: string (nullable = true)
 |-- XCoordinate: double (nullable = true)
 |-- YCoordinate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- UpdatedOn: date (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)
We can also quickly inspect some rows as below. We select one or more columns using select, and show prints the first n rows.
crimes.select("Date").show(10, truncate = False) +----------------------+ |Date | +----------------------+ |01/31/2006 12:13:05 PM| |03/21/2006 07:00:00 PM| |02/09/2006 01:44:41 AM| |03/21/2006 04:45:00 PM| |03/21/2006 10:00:00 PM| |03/20/2006 11:00:00 PM| |02/01/2006 11:25:00 PM| |03/21/2006 02:37:00 PM| |02/09/2006 05:38:07 AM| |11/29/2005 03:10:00 PM| +----------------------+ only showing top 10 rows
Change data type of a column
The Date column is in string format. Let’s change it to timestamp format using a user-defined function (udf).
withColumn creates a new column, and drop removes one or more columns.
from datetime import datetime
from pyspark.sql.functions import col, udf

myfunc = udf(lambda x: datetime.strptime(x, '%m/%d/%Y %I:%M:%S %p'), TimestampType())
df = crimes.withColumn('Date_time', myfunc(col('Date'))).drop("Date")
df.select(df["Date_time"]).show(5)

+-------------------+
|          Date_time|
+-------------------+
|2006-01-31 12:13:05|
|2006-03-21 19:00:00|
|2006-02-09 01:44:41|
|2006-03-21 16:45:00|
|2006-03-21 22:00:00|
+-------------------+
only showing top 5 rows
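A Python udf works, but it pushes every value through the Python interpreter. If your Spark version has the built-in to_timestamp function (Spark 2.2+), the same conversion can be done natively; a minimal sketch, assuming the same Date format:

from pyspark.sql.functions import col, to_timestamp

# same conversion without a Python udf; "MM/dd/yyyy hh:mm:ss a" mirrors '%m/%d/%Y %I:%M:%S %p'
df_native = crimes.withColumn("Date_time",
                              to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a")).drop("Date")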
Calculate statistics of numeric and string columns
We can calculate the statistics of string and numeric columns using describe. When we select more than one column, we have to pass the column names as a Python list.
crimes.select(["Latitude","Longitude","Year","XCoordinate","YCoordinate"]).describe().show() +-------+-------------------+-------------------+------------------+------------------+------------------+ |summary| Latitude| Longitude| Year| XCoordinate| YCoordinate| +-------+-------------------+-------------------+------------------+------------------+------------------+ | count| 6393147| 6393147| 6479775| 6393147| 6393147| | mean| 41.84186221474304| -87.67189839071902|2007.9269172154898|1164490.5803256205|1885665.2150490205| | stddev|0.09076954083441872|0.06277083346349299| 4.712584642906088|17364.095200290543|32982.572778759975| | min| 36.619446395| -91.686565684| 2001| 0.0| 0.0| | max| 42.022910333| -87.524529378| 2017| 1205119.0| 1951622.0| +-------+-------------------+-------------------+------------------+------------------+------------------+
The above numbers are ugly. Let’s round them using format_number from PySpark’s functions module.
from pyspark.sql.functions import format_number

result = crimes.select(["Latitude","Longitude","Year","XCoordinate","YCoordinate"]).describe()
result.select(result['summary'],
              format_number(result['Latitude'].cast('float'), 2).alias('Latitude'),
              format_number(result['Longitude'].cast('float'), 2).alias('Longitude'),
              result['Year'].cast('int').alias('year'),
              format_number(result['XCoordinate'].cast('float'), 2).alias('XCoordinate'),
              format_number(result['YCoordinate'].cast('float'), 2).alias('YCoordinate')
              ).show()

+-------+------------+------------+-------+------------+------------+
|summary|    Latitude|   Longitude|   year| XCoordinate| YCoordinate|
+-------+------------+------------+-------+------------+------------+
|  count|6,394,450.00|6,394,450.00|6481208|6,394,450.00|6,394,450.00|
|   mean|       41.84|      -87.67|   2007|1,164,490.62|1,885,665.88|
| stddev|        0.09|        0.06|      4|   17,363.81|   32,982.29|
|    min|       36.62|      -91.69|   2001|        0.00|        0.00|
|    max|       42.02|      -87.52|   2017|1,205,119.00|1,951,622.00|
+-------+------------+------------+-------+------------+------------+
How many primary crime types are there?
distinct returns distinct elements.
crimes.select("PrimaryType").distinct().count() 35
We can also see a list of the primary crime types.
crimes.select("PrimaryType").distinct().show(n = 35) +--------------------+ | PrimaryType| +--------------------+ |OFFENSE INVOLVING...| | STALKING| |PUBLIC PEACE VIOL...| | OBSCENITY| |NON-CRIMINAL (SUB...| | ARSON| | DOMESTIC VIOLENCE| | GAMBLING| | CRIMINAL TRESPASS| | ASSAULT| | NON - CRIMINAL| |LIQUOR LAW VIOLATION| | MOTOR VEHICLE THEFT| | THEFT| | BATTERY| | ROBBERY| | HOMICIDE| | RITUALISM| | PUBLIC INDECENCY| | CRIM SEXUAL ASSAULT| | HUMAN TRAFFICKING| | INTIMIDATION| | PROSTITUTION| | DECEPTIVE PRACTICE| |CONCEALED CARRY L...| | SEX OFFENSE| | CRIMINAL DAMAGE| | NARCOTICS| | NON-CRIMINAL| | OTHER OFFENSE| | KIDNAPPING| | BURGLARY| | WEAPONS VIOLATION| |OTHER NARCOTIC VI...| |INTERFERENCE WITH...| +--------------------+
How many homicides are there in the dataset?
crimes.where(crimes["PrimaryType"] == "HOMICIDE").count() 8847
How many domestic assaults are there?
Make sure to add the parentheses separating the two conditions!
crimes.filter((crimes["PrimaryType"] == "ASSAULT") & (crimes["Domestic"] == "True")).count() 86552
We can use filter or where to do filtering.
columns = ['PrimaryType', 'Description', 'Arrest', 'Domestic']

crimes.where((crimes["PrimaryType"] == "HOMICIDE") & (crimes["Arrest"] == "true"))\
      .select(columns).show(10)

+-----------+-------------------+------+--------+
|PrimaryType|        Description|Arrest|Domestic|
+-----------+-------------------+------+--------+
|   HOMICIDE|FIRST DEGREE MURDER|  true|    true|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|   false|
|   HOMICIDE|FIRST DEGREE MURDER|  true|    true|
+-----------+-------------------+------+--------+
only showing top 10 rows
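filter is an alias for where, and both also accept a SQL-style string expression. A small sketch of the same query written that way, which should return the same rows:

crimes.filter("PrimaryType = 'HOMICIDE' AND Arrest = true")\
      .select(columns).show(10)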
We can use limit to limit the number of rows we want to retrieve from a dataframe.
crimes.select(columns).limit(10).show(truncate = True)

+-------------------+--------------------+------+--------+
|        PrimaryType|         Description|Arrest|Domestic|
+-------------------+--------------------+------+--------+
|          NARCOTICS|POSS: CANNABIS 30...|  true|   false|
|  CRIMINAL TRESPASS|             TO LAND|  true|   false|
|          NARCOTICS|POSS: CANNABIS 30...|  true|   false|
|              THEFT|           OVER $500| false|   false|
|              THEFT|      $500 AND UNDER| false|   false|
|MOTOR VEHICLE THEFT|          AUTOMOBILE| false|   false|
|          NARCOTICS|         POSS: CRACK|  true|   false|
|    CRIMINAL DAMAGE|         TO PROPERTY| false|   false|
|       PROSTITUTION|SOLICIT FOR PROST...|  true|   false|
|    CRIMINAL DAMAGE|   TO STATE SUP PROP| false|   false|
+-------------------+--------------------+------+--------+
Create a new column with withColumn
lat_max = crimes.agg({"Latitude" : "max"}).collect()[0][0]
print("The maximum latitude value is {}".format(lat_max))

The maximum latitude value is 42.022910333
Let’s subtract each latitude value from the maximum latitude.
df = crimes.withColumn("difference_from_max_lat",lat_max - crimes["Latitude"]) df.select(["Latitude", "difference_from_max_lat"]).show(5) +------------+-----------------------+ | Latitude|difference_from_max_lat| +------------+-----------------------+ |42.002478396| 0.0204319369999979| |41.780595495| 0.24231483799999864| |41.787955143| 0.23495519000000087| |41.901774026| 0.12113630700000044| |41.748674558| 0.27423577500000107| +------------+-----------------------+ only showing top 5 rows
Rename a column with withColumnRenamed
Let’s rename Latitude to Lat.
df = crimes.withColumnRenamed("Latitude", "Lat")
columns = ['PrimaryType', 'Description', 'Arrest', 'Domestic', 'Lat']
df.orderBy(df["Lat"].desc()).select(columns).show(10)

+-------------------+--------------------+------+--------+------------+
|        PrimaryType|         Description|Arrest|Domestic|         Lat|
+-------------------+--------------------+------+--------+------------+
|              THEFT|      $500 AND UNDER| false|   false|42.022910333|
|MOTOR VEHICLE THEFT|          AUTOMOBILE| false|   false|42.022878225|
|           BURGLARY|      UNLAWFUL ENTRY| false|   false|42.022709624|
|              THEFT|      POCKET-PICKING| false|   false|42.022671246|
|              THEFT|      $500 AND UNDER| false|   false|42.022671246|
|      OTHER OFFENSE|    PAROLE VIOLATION|  true|   false|42.022671246|
|    CRIMINAL DAMAGE|          TO VEHICLE| false|   false|42.022653914|
|            BATTERY|              SIMPLE| false|    true|42.022644813|
|          NARCOTICS|POSS: CANNABIS 30...|  true|   false|42.022644813|
|      OTHER OFFENSE|    TELEPHONE THREAT| false|    true|42.022644369|
+-------------------+--------------------+------+--------+------------+
only showing top 10 rows
Use PySpark’s functions to calculate various statistics
Calculate average latitude value.
from pyspark.sql.functions import mean

df.select(mean("Lat").alias("Mean Latitude")).show()

+-----------------+
|    Mean Latitude|
+-----------------+
|41.84186221474304|
+-----------------+
We can also use the agg method to calculate the average.
df.agg({"Lat":"avg"}).show() +------------------+ | avg(Lat)| +------------------+ |41.841863914298415| +------------------+
We can also calculate maximum and minimum values using functions from PySpark.
from pyspark.sql.functions import max, min

df.select(max("Xcoordinate"), min("Xcoordinate")).show()

+----------------+----------------+
|max(Xcoordinate)|min(Xcoordinate)|
+----------------+----------------+
|       1205119.0|             0.0|
+----------------+----------------+
What percentage of the crimes are domestic?
df.filter(df["Domestic"]==True).count()/df.count() * 100 12.988412036768453
What is the Pearson correlation between Lat and Ycoordinate?
from pyspark.sql.functions import corr

df.select(corr("Lat", "Ycoordinate")).show()
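If we only need the scalar value rather than a one-row dataframe, DataFrame.stat.corr returns it directly (Pearson by default); a quick sketch:

# returns a Python float with the Pearson correlation coefficient
pearson = df.stat.corr("Lat", "Ycoordinate")
print(pearson)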
Find the number of crimes per year
df.groupBy("Year").count().show() df.groupBy("Year").count().collect()
We can also use matplotlib and Pandas to visualize the total number of crimes per year.
count = [item[1] for item in df.groupBy("Year").count().collect()]
year = [item[0] for item in df.groupBy("Year").count().collect()]
number_of_crimes_per_year = {"count": count, "year": year}

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

number_of_crimes_per_year = pd.DataFrame(number_of_crimes_per_year)
number_of_crimes_per_year.head()

    count  year
0  475921  2003
1  436966  2007
2  263495  2015
3  448066  2006
4  306847  2013
number_of_crimes_per_year = number_of_crimes_per_year.sort_values(by = "year")

number_of_crimes_per_year.plot(figsize = (20,10), kind = "bar", color = "red",
                               x = "year", y = "count", legend = False)

plt.xlabel("", fontsize = 18)
plt.ylabel("Number of Crimes", fontsize = 18)
plt.title("Number of Crimes Per Year", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
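Instead of building the dictionary by hand from collect(), the aggregated result could also be converted in one step with toPandas(); a small sketch, assuming the same df (note the columns come back named "Year" and "count"):

# the aggregated result is small, so it is safe to pull it to the driver as a pandas DataFrame
number_of_crimes_per_year = df.groupBy("Year").count().toPandas()
number_of_crimes_per_year = number_of_crimes_per_year.sort_values(by = "Year")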
Plot number of crimes by month
from pyspark.sql.functions import month

monthdf = df.withColumn("Month", month("Date_time"))
monthCounts = monthdf.select("Month").groupBy("Month").count()
monthCounts = monthCounts.collect()
months = [item[0] for item in monthCounts]
count = [item[1] for item in monthCounts]

crimes_per_month = {"month": months, "crime_count": count}
crimes_per_month = pd.DataFrame(crimes_per_month)
crimes_per_month = crimes_per_month.sort_values(by = "month")

crimes_per_month.plot(figsize = (20,10), kind = "line", x = "month", y = "crime_count",
                      color = "red", linewidth = 8, legend = False)

plt.xlabel("Month", fontsize = 18)
plt.ylabel("Number of Crimes", fontsize = 18)
plt.title("Number of Crimes Per Month", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
Where do most crimes take place?
crime_location = crimes.groupBy("LocationDescription").count().collect()
location = [item[0] for item in crime_location]
count = [item[1] for item in crime_location]

crime_location = {"location": location, "count": count}
crime_location = pd.DataFrame(crime_location)
crime_location = crime_location.sort_values(by = "count", ascending = False)
crime_location = crime_location.iloc[:20]

myplot = crime_location.plot(figsize = (20,20), kind = "barh", color = "#b35900", width = 0.8,
                             x = "location", y = "count", legend = False)
myplot.invert_yaxis()

plt.xlabel("Number of crimes", fontsize = 28)
plt.ylabel("Crime Location", fontsize = 28)
plt.title("Number of Crimes By Location", fontsize = 36)
plt.xticks(size = 24)
plt.yticks(size = 24)
plt.show()
Which days have the highest number of crimes?
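The snippets below group by DayOfWeek, DayOfWeek_number and HourOfDay columns that were not created in the code shown above. A minimal sketch of how they could be derived from the Date_time column created earlier (the exact expressions in the original notebook may differ):

from pyspark.sql.functions import date_format, hour

# hypothetical helper columns assumed by the following snippets:
#   DayOfWeek        - day name, e.g. "Monday"
#   DayOfWeek_number - day index used only for sorting ("u" = 1 for Monday ... 7 for Sunday)
#   HourOfDay        - hour of the day, 0-23
df = df.withColumn("DayOfWeek", date_format("Date_time", "EEEE"))\
       .withColumn("DayOfWeek_number", date_format("Date_time", "u"))\
       .withColumn("HourOfDay", hour("Date_time"))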
weekDaysCount = df.groupBy(["DayOfWeek", "DayOfWeek_number"]).count().collect()
days = [item[0] for item in weekDaysCount]
count = [item[2] for item in weekDaysCount]
day_number = [item[1] for item in weekDaysCount]

crime_byDay = {"days": days, "count": count, "day_number": day_number}
crime_byDay = pd.DataFrame(crime_byDay)
crime_byDay = crime_byDay.sort_values(by = "day_number", ascending = True)

crime_byDay.plot(figsize = (20,10), kind = "line", x = "days", y = "count",
                 color = "red", linewidth = 8, legend = False)

plt.ylabel("Number of Crimes", fontsize = 18)
plt.xlabel("")
plt.title("Number of Crimes by Day", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
We can also show only the number of domestic crimes by day.
weekDaysCount = df.filter(df["Domestic"] == "true").groupBy(["DayOfWeek", "DayOfWeek_number"]).count().collect()
days = [item[0] for item in weekDaysCount]
count = [item[2] for item in weekDaysCount]
day_number = [item[1] for item in weekDaysCount]

crime_byDay = {"days": days, "count": count, "day_number": day_number}
crime_byDay = pd.DataFrame(crime_byDay)
crime_byDay = crime_byDay.sort_values(by = "days", ascending = True)

crime_byDay.plot(figsize = (20,10), kind = "line", x = "days", y = "count",
                 color = "red", linewidth = 8, legend = False)

plt.ylabel("Number of Domestic Crimes", fontsize = 18)
plt.xlabel("")
plt.title("Number of domestic crimes by day", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
Number of domestic crimes by hour
temp = df.filter(df["Domestic"] == "true")
temp = temp.select(df['HourOfDay'].cast('int').alias('HourOfDay'))
hourlyCount = temp.groupBy(["HourOfDay"]).count().collect()
hours = [item[0] for item in hourlyCount]
count = [item[1] for item in hourlyCount]

crime_byHour = {"count": count, "hours": hours}
crime_byHour = pd.DataFrame(crime_byHour)
crime_byHour = crime_byHour.sort_values(by = "hours", ascending = True)

crime_byHour.plot(figsize = (20,10), kind = "line", x = "hours", y = "count",
                  color = "green", linewidth = 5, legend = False)

plt.ylabel("Number of Domestic Crimes", fontsize = 18)
plt.xlabel("Hour", fontsize = 18)
plt.title("Number of domestic crimes by hour", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
We can also show the number of domestic crimes by day and hour.
import seaborn as sns

temp = df.filter(df["Domestic"] == "true")
temp = temp.select("DayOfWeek", df['HourOfDay'].cast('int').alias('HourOfDay'))
hourlyCount = temp.groupBy(["DayOfWeek", "HourOfDay"]).count().collect()
days = [item[0] for item in hourlyCount]
hours = [item[1] for item in hourlyCount]
count = [item[2] for item in hourlyCount]

crime_byHour = {"count": count, "hours": hours, "days": days}
crime_byHour = pd.DataFrame(crime_byHour)
crime_byHour = crime_byHour.sort_values(by = "hours", ascending = True)

g = sns.FacetGrid(crime_byHour, hue = "days", size = 12)
g.map(plt.plot, "hours", "count", linewidth = 3)
g.add_legend()

plt.ylabel("Number of Domestic Crimes", fontsize = 18)
plt.xlabel("Hour", fontsize = 18)
plt.title("Number of domestic crimes by day and hour", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
Remarks
This is the second blog post on the Spark tutorial series to help big data enthusiasts prepare for Apache Spark Certifications from companies such as Cloudera, Hortonworks, Databricks, etc. The first one is here. If you want to learn/master Spark with Python or if you are preparing for a Spark Certification to show your skills in big data, these articles are for you.