This is the fifth tutorial on the Spark RDDs Vs DataFrames vs SparkSQL blog post series. The first one is available here. In the first part, we saw how to retrieve, sort and filter data using Spark RDDs, DataFrames and SparkSQL.

In the second part (here), we saw how to work with multiple tables in Spark the RDD way, the DataFrame way and with SparkSQL. In the third part (available here) of the blog post series, we performed web server log analysis using real-world text-based production logs. In the fourth part (available here), we saw set operators in Spark the RDD way, the DataFrame way and the SparkSQL way. In this part, we will see how to use functions (scalar, aggerage and window functions).

Also, check out my other recent blog posts on Spark on Analyzing the Bible and the Quran using Spark, Spark DataFrames: Exploring Chicago Crimes and Leveraging Hive with Spark using Python.
The data and the notebooks can be downloaded from my GitHub repository.

from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Changing a string to uppercase and rounding off a numeric field

Write a query to return the product name formatted as upper case and the weight of each product rounded to the nearest whole unit.

RDD way

Here, we use Python’s built-in functions round, int, float and the string method str.upper().

rdd = sc.textFile("SalesLTProduct.txt")
header = rdd.first()
content = rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\ 
          .map(lambda line : (line.split('\t')[1].upper(), int(round(float(line.split('\t')[7]), 0))))
content.takeOrdered(10, lambda x: -x[1])
[('TOURING-3000 BLUE, 62', 13608),
 ('TOURING-3000 YELLOW, 62', 13608),
 ('TOURING-3000 BLUE, 58', 13562),
 ('TOURING-3000 YELLOW, 58', 13512),
 ('TOURING-3000 BLUE, 54', 13463),
 ('TOURING-3000 YELLOW, 54', 13345),
 ('TOURING-3000 YELLOW, 50', 13213),
 ('TOURING-3000 BLUE, 50', 13213),
 ('TOURING-3000 YELLOW, 44', 13050),
 ('TOURING-3000 BLUE, 44', 13050)]

DataFrame way

The functions we need from pyspark.sql module are imported below. Row can be used to create a row object by using named arguments. With Row we can create a DataFrame from an RDD using toDF. col returns a column based on the given column name.

from pyspark.sql.functions import col, round, upper
from pyspark.sql.types import *
from pyspark.sql import Row
df = rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\
          .map(lambda line : Row(Name = line.split('\t')[1], Weight = line.split('\t')[7])).toDF()
        
        
df.withColumn("ApproxWeight", round(col("Weight").cast(DoubleType()), 0).cast(IntegerType()))\
.withColumn('Name',upper(col('Name'))).orderBy('ApproxWeight', ascending = False).show(10, truncate = False)
+-----------------------+--------+------------+
|Name                   |Weight  |ApproxWeight|
+-----------------------+--------+------------+
|TOURING-3000 YELLOW, 62|13607.70|13608       |
|TOURING-3000 BLUE, 62  |13607.70|13608       |
|TOURING-3000 BLUE, 58  |13562.34|13562       |
|TOURING-3000 YELLOW, 58|13512.45|13512       |
|TOURING-3000 BLUE, 54  |13462.55|13463       |
|TOURING-3000 YELLOW, 54|13344.62|13345       |
|TOURING-3000 BLUE, 50  |13213.08|13213       |
|TOURING-3000 YELLOW, 50|13213.08|13213       |
|TOURING-3000 BLUE, 44  |13049.78|13050       |
|TOURING-3000 YELLOW, 44|13049.78|13050       |
+-----------------------+--------+------------+
only showing top 10 rows

SQL way

To perform SQL operations, we have to first register our dataframe as a temporary table. We can use registerTempTable or createOrReplaceTempView to register our dataframe as a temporary table with the provided name.

df.createOrReplaceTempView('df_table')
spark.sql('select  upper(Name), int(round(Weight)) as ApproxWeight\
      from df_table order by ApproxWeight desc limit 10').show(10, truncate = False)
+-----------------------+------------+
|upper(Name)            |ApproxWeight|
+-----------------------+------------+
|TOURING-3000 BLUE, 62  |13608       |
|TOURING-3000 YELLOW, 62|13608       |
|TOURING-3000 BLUE, 58  |13562       |
|TOURING-3000 YELLOW, 58|13512       |
|TOURING-3000 BLUE, 54  |13463       |
|TOURING-3000 YELLOW, 54|13345       |
|TOURING-3000 BLUE, 50  |13213       |
|TOURING-3000 YELLOW, 50|13213       |
|TOURING-3000 YELLOW, 44|13050       |
|TOURING-3000 BLUE, 44  |13050       |
+-----------------------+------------+

Retrieve the year and month in which products were first sold

Get the year and month in which Adventure Works started selling each product.

RDD way

Let’s use the datetime Python package to parse the dates and times column. Since we do not need the time, let’s exclude tha and work with the string part that includes the year, month and day. Once we parse the date, we can use the year and month attributes to get the year and month in which each product started selling.

from datetime import datetime
def get_year(x):
     return datetime.strptime(x[:10], '%Y-%m-%d').year
def get_month(x):
        return datetime.strptime(x[:10], '%Y-%m-%d').month
content = rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\
          .map(lambda line : (line.split('\t')[1].upper(), int(round(float(line.split('\t')[7]), 0)),
                              line.split('\t')[10], get_year(line.split('\t')[10]),  get_month(line.split('\t')[10])))
content.takeOrdered(10, lambda x: -x[1])
[('TOURING-3000 BLUE, 62', 13608, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 62', 13608, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 58', 13562, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 58', 13512, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 54', 13463, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 54', 13345, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 50', 13213, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 50', 13213, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 44', 13050, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 44', 13050, '2003-07-01 00:00:00.000', 2003, 7)]

DataFrame way

We can use the above python function to use with DataFrames by using the udf pyspark.sql functions.

from pyspark.sql.functions import udf
year_udf  = udf(get_year, IntegerType())
month_udf  = udf(get_month, IntegerType())
df = rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\
          .map(lambda line : Row(Name = line.split('\t')[1], Weight = line.split('\t')[7],
                                SellStartDate = line.split('\t')[10])).toDF()  
df.withColumn("ApproxWeight", round(col("Weight").cast(DoubleType()), 0).cast(IntegerType()))\
.withColumn('Name',upper(col('Name'))).withColumn('SellStartYear',year_udf(col('SellStartDate')))\
.withColumn('SellStartMonth',month_udf(col('SellStartDate'))).orderBy('ApproxWeight', ascending = False).show(10, truncate = False)
+-----------------------+-----------------------+--------+------------+-------------+--------------+
|Name                   |SellStartDate          |Weight  |ApproxWeight|SellStartYear|SellStartMonth|
+-----------------------+-----------------------+--------+------------+-------------+--------------+
|TOURING-3000 YELLOW, 62|2003-07-01 00:00:00.000|13607.70|13608       |2003         |7             |
|TOURING-3000 BLUE, 62  |2003-07-01 00:00:00.000|13607.70|13608       |2003         |7             |
|TOURING-3000 BLUE, 58  |2003-07-01 00:00:00.000|13562.34|13562       |2003         |7             |
|TOURING-3000 YELLOW, 58|2003-07-01 00:00:00.000|13512.45|13512       |2003         |7             |
|TOURING-3000 BLUE, 54  |2003-07-01 00:00:00.000|13462.55|13463       |2003         |7             |
|TOURING-3000 YELLOW, 54|2003-07-01 00:00:00.000|13344.62|13345       |2003         |7             |
|TOURING-3000 BLUE, 50  |2003-07-01 00:00:00.000|13213.08|13213       |2003         |7             |
|TOURING-3000 YELLOW, 50|2003-07-01 00:00:00.000|13213.08|13213       |2003         |7             |
|TOURING-3000 BLUE, 44  |2003-07-01 00:00:00.000|13049.78|13050       |2003         |7             |
|TOURING-3000 YELLOW, 44|2003-07-01 00:00:00.000|13049.78|13050       |2003         |7             |
+-----------------------+-----------------------+--------+------------+-------------+--------------+
only showing top 10 rows

SQL way

Let’s register our table as a temporary table and then we can use year and month Hive functions to get the year and month in which the product was started selling.

df.createOrReplaceTempView('df_table2')
df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- SellStartDate: string (nullable = true)
 |-- Weight: string (nullable = true)
spark.sql('select  upper(Name) as Name, int(round(Weight)) as ApproxWeight, SellStartDate,\
      year(SellStartDate) as SellStartYear, month(SellStartDate) as SellStartMonth\
     from df_table2 order by ApproxWeight desc limit 10').show(10, truncate = False)
+-----------------------+------------+-----------------------+-------------+--------------+
|Name                   |ApproxWeight|SellStartDate          |SellStartYear|SellStartMonth|
+-----------------------+------------+-----------------------+-------------+--------------+
|TOURING-3000 YELLOW, 62|13608       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 62  |13608       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 58  |13562       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 58|13512       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 54  |13463       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 54|13345       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 50  |13213       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 50|13213       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 44  |13050       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 44|13050       |2003-07-01 00:00:00.000|2003         |7             |
+-----------------------+------------+-----------------------+-------------+--------------+

String Extraction

The leftmost two characters show the product type. Now, let’s extract the first two characters and include to our previous query.

RDD way

rdd = sc.textFile("SalesLTProduct.txt")
header = rdd.first()
content = rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\
          .map(lambda line : (line.split('\t')[1].upper(), line.split('\t')[2][:2], int(round(float(line.split('\t')[7]), 0)),
                              line.split('\t')[10], get_year(line.split('\t')[10]),  get_month(line.split('\t')[10])))
content.takeOrdered(10, lambda x: -x[2])
[('TOURING-3000 BLUE, 62', 'BK', 13608, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 62', 'BK', 13608, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 58', 'BK', 13562, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 58', 'BK', 13512, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 54', 'BK', 13463, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 54', 'BK', 13345, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 50', 'BK', 13213, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 50', 'BK', 13213, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 YELLOW, 44', 'BK', 13050, '2003-07-01 00:00:00.000', 2003, 7),
 ('TOURING-3000 BLUE, 44', 'BK', 13050, '2003-07-01 00:00:00.000', 2003, 7)]

DataFrame

The below simple udf helps to extract the first two letters.

left_two = udf(lambda x: x[:2])
df  =  rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\
          .map(lambda line : Row(Name = line.split('\t')[1], ProductNumber = line.split('\t')[2], 
                                 Weight = line.split('\t')[7],
                                SellStartDate = line.split('\t')[10])).toDF()  
df.withColumn("ApproxWeight", round(col("Weight").cast(DoubleType()), 0).cast(IntegerType()))\
.withColumn('ProductNumber', left_two(col('ProductNumber')))\
.withColumn('Name',upper(col('Name'))).withColumn('SellStartYear',year_udf(col('SellStartDate')))\
.withColumn('SellStartMonth', month_udf(col('SellStartDate'))).orderBy('ApproxWeight', ascending = False).show(10, truncate = False)
+-----------------------+-------------+-----------------------+--------+------------+-------------+--------------+
|Name                   |ProductNumber|SellStartDate          |Weight  |ApproxWeight|SellStartYear|SellStartMonth|
+-----------------------+-------------+-----------------------+--------+------------+-------------+--------------+
|TOURING-3000 BLUE, 62  |BK           |2003-07-01 00:00:00.000|13607.70|13608       |2003         |7             |
|TOURING-3000 YELLOW, 62|BK           |2003-07-01 00:00:00.000|13607.70|13608       |2003         |7             |
|TOURING-3000 BLUE, 58  |BK           |2003-07-01 00:00:00.000|13562.34|13562       |2003         |7             |
|TOURING-3000 YELLOW, 58|BK           |2003-07-01 00:00:00.000|13512.45|13512       |2003         |7             |
|TOURING-3000 BLUE, 54  |BK           |2003-07-01 00:00:00.000|13462.55|13463       |2003         |7             |
|TOURING-3000 YELLOW, 54|BK           |2003-07-01 00:00:00.000|13344.62|13345       |2003         |7             |
|TOURING-3000 YELLOW, 50|BK           |2003-07-01 00:00:00.000|13213.08|13213       |2003         |7             |
|TOURING-3000 BLUE, 50  |BK           |2003-07-01 00:00:00.000|13213.08|13213       |2003         |7             |
|TOURING-3000 BLUE, 44  |BK           |2003-07-01 00:00:00.000|13049.78|13050       |2003         |7             |
|TOURING-3000 YELLOW, 44|BK           |2003-07-01 00:00:00.000|13049.78|13050       |2003         |7             |
+-----------------------+-------------+-----------------------+--------+------------+-------------+--------------+
only showing top 10 rows

SQL

We can use the Hive function substr to extract the first two letters. Let’s first see what this function does.

spark.sql('describe function substr').show(truncate = False)
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|function_desc                                                                                                                                                                       |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Function: substr                                                                                                                                                                    |
|Class: org.apache.spark.sql.catalyst.expressions.Substring                                                                                                                          |
|Usage: substr(str, pos[, len]) - Returns the substring of `str` that starts at `pos` and is of length `len`, or the slice of byte array that starts at `pos` and is of length `len`.|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Now, let’s register our table as a temporary table and perform our SQL operations.

df.createOrReplaceTempView('df_table3')
spark.sql('select  upper(Name) as Name, substr(ProductNumber, 1, 2) as ProductNumber,  int(round(Weight)) as ApproxWeight, SellStartDate,\
      year(SellStartDate) as SellStartYear, month(SellStartDate) as SellStartMonth\
     from df_table3 order by ApproxWeight desc limit 10').show(10, truncate = False)
+-----------------------+-------------+------------+-----------------------+-------------+--------------+
|Name                   |ProductNumber|ApproxWeight|SellStartDate          |SellStartYear|SellStartMonth|
+-----------------------+-------------+------------+-----------------------+-------------+--------------+
|TOURING-3000 BLUE, 62  |BK           |13608       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 62|BK           |13608       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 58  |BK           |13562       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 58|BK           |13512       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 54  |BK           |13463       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 54|BK           |13345       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 50  |BK           |13213       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 50|BK           |13213       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 44|BK           |13050       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 44  |BK           |13050       |2003-07-01 00:00:00.000|2003         |7             |
+-----------------------+-------------+------------+-----------------------+-------------+--------------+

Retrieve only products with a numeric size

The size field can be numeric such as 48, 58, etc or it can be letters such as M for medium, S for short and L for Large. Now, let’s extend our query to filter the product returned so that only products with a numeric size are included.

RDD way

There could be different ways of doing this but I am using the re module to use regular expression matching. I personally prefer regular expressions because they help to solve a wider set of problems.

import re
def isNumeric(x):
    return bool(re.match('[0-9]', x))
content = rdd.filter(lambda line: line != header)\
          .filter(lambda line: line.split('\t')[7] != 'NULL')\
          .filter(lambda line: isNumeric(line.split('\t')[6]))\
          .map(lambda line : (line.split('\t')[1].upper(), line.split('\t')[2][:2], \
                              int(round(float(line.split('\t')[7]), 0)), line.split('\t')[6], \
                              get_year(line.split('\t')[10]),  get_month(line.split('\t')[10])))
content.takeOrdered(10, lambda x: -x[2])
[('TOURING-3000 BLUE, 62', 'BK', 13608, '62', 2003, 7),
 ('TOURING-3000 YELLOW, 62', 'BK', 13608, '62', 2003, 7),
 ('TOURING-3000 BLUE, 58', 'BK', 13562, '58', 2003, 7),
 ('TOURING-3000 YELLOW, 58', 'BK', 13512, '58', 2003, 7),
 ('TOURING-3000 BLUE, 54', 'BK', 13463, '54', 2003, 7),
 ('TOURING-3000 YELLOW, 54', 'BK', 13345, '54', 2003, 7),
 ('TOURING-3000 YELLOW, 50', 'BK', 13213, '50', 2003, 7),
 ('TOURING-3000 BLUE, 50', 'BK', 13213, '50', 2003, 7),
 ('TOURING-3000 YELLOW, 44', 'BK', 13050, '44', 2003, 7),
 ('TOURING-3000 BLUE, 44', 'BK', 13050, '44', 2003, 7)]

We change a Python function to a user defined function (UDF) by passing the output data type.

is_Numeric = udf(isNumeric, BooleanType() )
df  =  rdd.filter(lambda line: line != header)\
           .filter(lambda line: line.split('\t')[7] != 'NULL')\
           .map(lambda line : Row(Name = line.split('\t')[1], ProductNumber = line.split('\t')[2],  
                                  size = line.split('\t')[6], Weight = line.split('\t')[7],
                                SellStartDate = line.split('\t')[10])).toDF()
df.withColumn("ApproxWeight", round(col("Weight").cast(DoubleType()), 0).cast(IntegerType()))\
.filter(is_Numeric(col('size')))\
.withColumn('ProductNumber', left_two(col('ProductNumber')))\
.withColumn('Name',upper(col('Name'))).withColumn('SellStartYear',year_udf(col('SellStartDate')))\
.withColumn('SellStartMonth', month_udf(col('SellStartDate'))).orderBy('ApproxWeight', ascending = False).show(10, truncate = False)
+-----------------------+-------------+-----------------------+--------+----+------------+-------------+--------------+
|Name                   |ProductNumber|SellStartDate          |Weight  |size|ApproxWeight|SellStartYear|SellStartMonth|
+-----------------------+-------------+-----------------------+--------+----+------------+-------------+--------------+
|TOURING-3000 YELLOW, 62|BK           |2003-07-01 00:00:00.000|13607.70|62  |13608       |2003         |7             |
|TOURING-3000 BLUE, 62  |BK           |2003-07-01 00:00:00.000|13607.70|62  |13608       |2003         |7             |
|TOURING-3000 BLUE, 58  |BK           |2003-07-01 00:00:00.000|13562.34|58  |13562       |2003         |7             |
|TOURING-3000 YELLOW, 58|BK           |2003-07-01 00:00:00.000|13512.45|58  |13512       |2003         |7             |
|TOURING-3000 BLUE, 54  |BK           |2003-07-01 00:00:00.000|13462.55|54  |13463       |2003         |7             |
|TOURING-3000 YELLOW, 54|BK           |2003-07-01 00:00:00.000|13344.62|54  |13345       |2003         |7             |
|TOURING-3000 BLUE, 50  |BK           |2003-07-01 00:00:00.000|13213.08|50  |13213       |2003         |7             |
|TOURING-3000 YELLOW, 50|BK           |2003-07-01 00:00:00.000|13213.08|50  |13213       |2003         |7             |
|TOURING-3000 BLUE, 44  |BK           |2003-07-01 00:00:00.000|13049.78|44  |13050       |2003         |7             |
|TOURING-3000 YELLOW, 44|BK           |2003-07-01 00:00:00.000|13049.78|44  |13050       |2003         |7             |
+-----------------------+-------------+-----------------------+--------+----+------------+-------------+--------------+
only showing top 10 rows

SQL

df.createOrReplaceTempView('df_table4')

In Hive to check weather a field is numeric or not is to cast that column to numeric and see if the output is numeric. Non-numeric fields will be NULL and we can easily filter them aout as shown below.

spark.sql('select  upper(Name) as Name,  substr(ProductNumber, 1, 2) as ProductNumber, size, int(round(Weight)) as ApproxWeight, SellStartDate,\
      year(SellStartDate) as SellStartYear, month(SellStartDate) as SellStartMonth\
     from df_table4 where cast(size as double) is not null order by ApproxWeight desc limit 10').show(10, truncate = False)
+-----------------------+-------------+----+------------+-----------------------+-------------+--------------+
|Name                   |ProductNumber|size|ApproxWeight|SellStartDate          |SellStartYear|SellStartMonth|
+-----------------------+-------------+----+------------+-----------------------+-------------+--------------+
|TOURING-3000 YELLOW, 62|BK           |62  |13608       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 62  |BK           |62  |13608       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 58  |BK           |58  |13562       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 58|BK           |58  |13512       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 54  |BK           |54  |13463       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 54|BK           |54  |13345       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 50  |BK           |50  |13213       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 50|BK           |50  |13213       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 BLUE, 44  |BK           |44  |13050       |2003-07-01 00:00:00.000|2003         |7             |
|TOURING-3000 YELLOW, 44|BK           |44  |13050       |2003-07-01 00:00:00.000|2003         |7             |
+-----------------------+-------------+----+------------+-----------------------+-------------+--------------+

Window Functions

Let’s use Spark SQL and DataFrame APIs ro retrieve companies ranked by sales totals from the SalesOrderHeader and SalesLTCustomer tables. We will display the first 10 rows from the solution using each method to just compare our answers to make sure we are doing it right.

orderHeader = sc.textFile("SalesLTSalesOrderHeader.txt")
header = orderHeader.first()
orderHeader_rdd =   orderHeader.filter(lambda line: line != header)
orderHeader_df = orderHeader_rdd.map(lambda line: Row(CustomerID = line.split('\t')[10],
                                                     TotalDue = float(line.split('\t')[-4] ))).toDF()
customer = sc.textFile("SalesLTCustomer.txt")
header = customer.first()
customer_rdd =   customer.filter(lambda line: line != header)
customer_df = customer_rdd.map(lambda line: Row(CustomerID = line.split('\t')[0],
                                            CompanyName = line.split('\t')[7])).toDF()
customer_df.printSchema()
root
 |-- CompanyName: string (nullable = true)
 |-- CustomerID: string (nullable = true)

orderHeader_df.printSchema()
root
 |-- CustomerID: string (nullable = true)
 |-- TotalDue: double (nullable = true)

DataFrame way

rank returns the rank of rows within a window partition. We use Window to specify the columns for partitioning and ordering.

from pyspark.sql.functions import col
from pyspark.sql.functions import rank
from pyspark.sql.window import Window
windowspecs = Window.orderBy(-col('TotalDue'))
df = customer_df.join(orderHeader_df, 'CustomerID','inner')\
.select('CompanyName','TotalDue')
df.withColumn('rank', rank().over(windowspecs)).show(10, truncate = False)
+------------------------------+-----------+----+
|CompanyName                   |TotalDue   |rank|
+------------------------------+-----------+----+
|Action Bicycle Specialists    |119960.824 |1   |
|Metropolitan Bicycle Supply   |108597.9536|2   |
|Bulk Discount Store           |98138.2131 |3   |
|Eastside Department Store     |92663.5609 |4   |
|Riding Cycles                 |86222.8072 |5   |
|Many Bikes Store              |81834.9826 |6   |
|Instruments and Parts Company |70698.9922 |7   |
|Extreme Riding Supplies       |63686.2708 |8   |
|Trailblazing Sports           |45992.3665 |9   |
|Professional Sales and Service|43962.7901 |10  |
+------------------------------+-----------+----+
only showing top 10 rows

SQL

Here, we can use the normal SQL rank function.

customer_df.createOrReplaceTempView('customer_table')
orderHeader_df.createOrReplaceTempView('orderHeader_table')
spark.sql('select c.CompanyName, o.TotalDue,\
Rank() OVER (ORDER BY TotalDue DESC ) AS RankByRevenue FROM customer_table AS c \
INNER JOIN orderHeader_table AS o ON c.CustomerID = o.CustomerID').show(10, truncate = False)
+------------------------------+-----------+-------------+
|CompanyName                   |TotalDue   |RankByRevenue|
+------------------------------+-----------+-------------+
|Action Bicycle Specialists    |119960.824 |1            |
|Metropolitan Bicycle Supply   |108597.9536|2            |
|Bulk Discount Store           |98138.2131 |3            |
|Eastside Department Store     |92663.5609 |4            |
|Riding Cycles                 |86222.8072 |5            |
|Many Bikes Store              |81834.9826 |6            |
|Instruments and Parts Company |70698.9922 |7            |
|Extreme Riding Supplies       |63686.2708 |8            |
|Trailblazing Sports           |45992.3665 |9            |
|Professional Sales and Service|43962.7901 |10           |
+------------------------------+-----------+-------------+
only showing top 10 rows

Write a query to retrieve a list of the product names and the total revenue

calculated as the sum of the LineTotal from the SalesLT.SalesOrderDetail table, with the results sorted in descending order of total revenue.

Let’s display 10 records from each method to compare the answers.

RDD way

orderDetail = sc.textFile("SalesLTSalesOrderDetail.txt")
header1 = orderDetail.first()
orderDetail_rdd = orderDetail.filter(lambda line: line != header1 )\
                  .map(lambda line: (line.split('\t')[3], float(line.split('\t')[6])))
product = sc.textFile("SalesLTProduct.txt")
header2 = product.first()
product_rdd = product.filter(lambda line: line != header2)\
            .map(lambda line: (line.split('\t')[0], (line.split('\t')[1], float(line.split('\t')[5]))))
product_rdd.join(orderDetail_rdd)\
.map(lambda line: (line[1][0][0],line[1][1]))\
.reduceByKey(lambda a, b: a+b)\
.sortBy(lambda x: -x[1])\
.take(10)
[('Touring-1000 Blue, 60', 37191.492),
 ('Mountain-200 Black, 42', 37178.838),
 ('Road-350-W Yellow, 48', 36486.2355),
 ('Mountain-200 Black, 38', 35801.844),
 ('Touring-1000 Yellow, 60', 23413.474656),
 ('Touring-1000 Blue, 50', 22887.072),
 ('Mountain-200 Silver, 42', 20879.91),
 ('Road-350-W Yellow, 40', 20411.88),
 ('Mountain-200 Black, 46', 19277.916),
 ('Road-350-W Yellow, 42', 18692.519308)]

DataFrame way

orderDetail_df = orderDetail.filter(lambda line: line != header1 )\
                             .map(lambda line: Row(ProductID = line.split('\t')[3],
                                                 LineTotal = float(line.split('\t')[6]))).toDF()
product_df = product.filter(lambda line: line != header2)\
                     .map(lambda line: Row(ProductID = line.split('\t')[0], 
                                             Name = line.split('\t')[1],
                                             ListPrice = float(line.split('\t')[5]))).toDF()
orderDetail_df.printSchema()
root
 |-- LineTotal: double (nullable = true)
 |-- ProductID: string (nullable = true)
product_df.printSchema()
root
 |-- ListPrice: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductID: string (nullable = true)
product_df.join(orderDetail_df,'ProductID','inner')\
.groupBy('Name').agg({'LineTotal':'sum'}).orderBy('sum(LineTotal)', ascending = False)\
.show(10, truncate = False)
+-----------------------+--------------+
|Name                   |sum(LineTotal)|
+-----------------------+--------------+
|Touring-1000 Blue, 60  |37191.492     |
|Mountain-200 Black, 42 |37178.838     |
|Road-350-W Yellow, 48  |36486.2355    |
|Mountain-200 Black, 38 |35801.844     |
|Touring-1000 Yellow, 60|23413.474656  |
|Touring-1000 Blue, 50  |22887.072     |
|Mountain-200 Silver, 42|20879.91      |
|Road-350-W Yellow, 40  |20411.88      |
|Mountain-200 Black, 46 |19277.916     |
|Road-350-W Yellow, 42  |18692.519308  |
+-----------------------+--------------+
only showing top 10 rows

SQL

orderDetail_df.createOrReplaceTempView('orderDetail_table')
product_df.createOrReplaceTempView('product_df_table')
spark.sql('SELECT Name,SUM(LineTotal) AS TotalRevenue FROM product_df_table AS p \
INNER JOIN orderDetail_table AS o ON o.ProductID = p.ProductID \
GROUP BY Name ORDER BY TotalRevenue DESC').show(10, truncate = False)
+-----------------------+------------+
|Name                   |TotalRevenue|
+-----------------------+------------+
|Touring-1000 Blue, 60  |37191.492   |
|Mountain-200 Black, 42 |37178.838   |
|Road-350-W Yellow, 48  |36486.2355  |
|Mountain-200 Black, 38 |35801.844   |
|Touring-1000 Yellow, 60|23413.474656|
|Touring-1000 Blue, 50  |22887.072   |
|Mountain-200 Silver, 42|20879.91    |
|Road-350-W Yellow, 40  |20411.88    |
|Mountain-200 Black, 46 |19277.916   |
|Road-350-W Yellow, 42  |18692.519308|
+-----------------------+------------+
only showing top 10 rows

Modify the previous query to include sales totals for products that have a list price of more than $1000

RDD way

orderDetail = sc.textFile("SalesLTSalesOrderDetail.txt")
header1 = orderDetail.first()
orderDetail_rdd = orderDetail.filter(lambda line: line != header1 )\
                  .map(lambda line: (line.split('\t')[3], float(line.split('\t')[6])))
product = sc.textFile("SalesLTProduct.txt")
header2 = product.first()
product_rdd = product.filter(lambda line: line != header2)\
              .filter(lambda line: float(line.split('\t')[5]) > 1000)\
              .map(lambda line: (line.split('\t')[0], (line.split('\t')[1], float(line.split('\t')[5]))))
rdd_answer = product_rdd.join(orderDetail_rdd)\
.map(lambda line: (line[1][0][0],line[1][1]))\
.reduceByKey(lambda a, b: a + b)\
.sortBy(lambda x: -x[1])

DataFrame way

orderDetail_df = orderDetail.filter(lambda line: line != header1 )\
                             .map(lambda line: Row(ProductID = line.split('\t')[3],
                                                 LineTotal = float(line.split('\t')[6]))).toDF()
product_df = product.filter(lambda line: line != header2)\
                     .map(lambda line: Row(ProductID = line.split('\t')[0], 
                                             Name = line.split('\t')[1],
                                             ListPrice = float(line.split('\t')[5]))).toDF()
df_answer = product_df.filter(product_df.ListPrice > 1000)\
           .join(orderDetail_df,'ProductID','inner')\
           .groupBy('Name').agg({'LineTotal':'sum'}).orderBy('sum(LineTotal)', ascending = False)

SQL

sql_answer = spark.sql('SELECT Name,SUM(LineTotal) AS TotalRevenue FROM product_df_table AS p \
INNER JOIN orderDetail_table AS o ON o.ProductID = p.ProductID \
WHERE p.ListPrice > 1000 GROUP BY Name ORDER BY TotalRevenue DESC')

Filter the product sales groups to include only total sales over 20,000

Modify the previous query to only include only product groups with a total sales value greater than 20,000 dollars.

RDD way

for i in rdd_answer.filter(lambda line: line[1] > 20000).collect():
  print(i)
('Touring-1000 Blue, 60', 37191.492)
('Mountain-200 Black, 42', 37178.838)
('Road-350-W Yellow, 48', 36486.2355)
('Mountain-200 Black, 38', 35801.844)
('Touring-1000 Yellow, 60', 23413.474656)
('Touring-1000 Blue, 50', 22887.072)
('Mountain-200 Silver, 42', 20879.91)
('Road-350-W Yellow, 40', 20411.88)

DataFrame

for i in df_answer.filter(df_answer['sum(LineTotal)'] > 20000).collect():
    print((i[0],i[1]))
('Touring-1000 Blue, 60', 37191.492)
('Mountain-200 Black, 42', 37178.838)
('Road-350-W Yellow, 48', 36486.2355)
('Mountain-200 Black, 38', 35801.844)
('Touring-1000 Yellow, 60', 23413.474656)
('Touring-1000 Blue, 50', 22887.072)
('Mountain-200 Silver, 42', 20879.91)
('Road-350-W Yellow, 40', 20411.88)

SQL

spark.sql('SELECT Name,SUM(LineTotal) AS TotalRevenue FROM product_df_table AS p \
INNER JOIN orderDetail_table AS o ON o.ProductID = p.ProductID \
WHERE p.ListPrice > 1000 GROUP BY Name \
HAVING SUM(LineTotal) > 20000 ORDER BY TotalRevenue DESC').show(truncate = False)
+-----------------------+------------+
|Name                   |TotalRevenue|
+-----------------------+------------+
|Touring-1000 Blue, 60  |37191.492   |
|Mountain-200 Black, 42 |37178.838   |
|Road-350-W Yellow, 48  |36486.2355  |
|Mountain-200 Black, 38 |35801.844   |
|Touring-1000 Yellow, 60|23413.474656|
|Touring-1000 Blue, 50  |22887.072   |
|Mountain-200 Silver, 42|20879.91    |
|Road-350-W Yellow, 40  |20411.88    |
+-----------------------+------------+

We see that the answers from the three approaches are the same.
This is enough for today. See you in the next part of the Spark with Python tutorial series.