This is the third tutorial on the Spark RDDs Vs DataFrames vs SparkSQL blog post series. The first one is available here. In the first part, we saw how to retrieve, sort and filter data using Spark RDDs, DataFrames and SparkSQL. In the second part (here), we saw how to work with multiple tables in Spark the RDD way, the DataFrame way and with SparkSQL. In this third part of the blog post series, we will perform web server log analysis using real-world text-based production logs. Log data can be used monitoring servers, improving business and customer intelligence, building recommendation systems, fraud detection, and much more. Server log analysis is a good use case for Spark. It’s a very large, common data source and contains a rich set of information.

If you like this tutorial series, check also my other recent blos posts on Spark on Analyzing the Bible and the Quran using Spark and Spark DataFrames: Exploring Chicago Crimes. The data and the notebooks can be downloaded from my GitHub repository.

The log files that we use for this assignment are in the Apache Common Log Format (CLF). The log file entries produced in CLF will look something like this:

127.0.0.1 – – [01/Aug/1995:00:00:01 -0400] “GET /images/launch-logo.gif HTTP/1.0” 200 1839 Each part of this log entry is described below.
127.0.0.1 This is the IP address (or host name, if available) of the client (remote host) which made the request to the server.
– The “hyphen” in the output indicates that the requested piece of information (user identity from remote machine) is not available.
– The “hyphen” in the output indicates that the requested piece of information (user identity from local logon) is not available.
[01/Aug/1995:00:00:01 -0400] the time that the server finished processing the request. The format is: [day/month/year:hour:minute:second timezone]
day = 2 digits
month = 3 letters
year = 4 digits
hour = 2 digits
minute = 2 digits
second = 2 digits
zone = (+ | -) 4 digits

“GET /images/launch-logo.gif HTTP/1.0” This is the first line of the request string from the client. It consists of a three components: the request method (e.g., GET, POST, etc.), the endpoint (a Uniform Resource Identifier), and the client protocol version.
200 This is the status code that the server sends back to the client. This information is very valuable, because it reveals whether the request resulted in a successful response (codes beginning in 2), a redirection (codes beginning in 3), an error caused by the client (codes beginning in 4), or an error in the server (codes beginning in 5).
1839 The last entry indicates the size of the object returned to the client, not including the response headers. If no content was returned to the client, this value will be “-” (or sometimes 0).

We will use a data set from NASA Kennedy Space Center WWW server in Florida. The full data set is freely available here and contains two months of all HTTP requests.

Let’s download the data. Since I am using Jupyter Notebook, ! helps us to run a shell command

! wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
! wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gzCopy

Create Spark Context and SQL Context

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("Spark-RDD-DataFrame_SQL").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
sqlcontext = SQLContext(sc)Copy

Create RDD

rdd = sc.textFile("NASA_access*")Copy

Show sample logs

for line in rdd.sample(withReplacement = False, fraction = 0.000001, seed = 120).collect():
    print(line)
    print("\n")Copy

Use regular expressions to extract the logs

import re
def parse_log1(line):
    match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*(\S+)\s*(\S+)\s*([\w\.\s*]+)?\s*"*(\d{3}) (\S+)', line)
    if match is None:
        return 0
    else:
        return 1
n_logs = rdd.count()
failed = rdd.map(lambda line: parse_log1(line)).filter(lambda line: line == 0).count()
print('Out of {} logs, {} failed to parse'.format(n_logs,failed))
Out of 3461613 logs, 1685 failed to parseCopy

We see that 1685 out of the 3.5 million logs failed to parse. I took samples of the failed logs and tried to modify the above regular expression pattern as show below.

def parse_log2(line):
match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*(\S+)\s*(\S+)\s*([/\w\.\s*]+)?\s*"* (\d{3}) (\S+)',line)
    if match is None:
        match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*([/\w\.]+)>*([\w/\s\.]+)\s*(\S+)\s*(\d{3})\s*(\S+)',line)
    if match is None:
        return (line, 0)
    else:
        return (line, 1)
failed = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 0).count()
print('Out of {} logs, {} failed to parse'.format(n_logs,failed))
Out of 3738455 logs, 1253 failed to parseCopy

Still, 1253 of them failed to parse. However, since we have successfully parsed more than 99.9% of the data, we can work with what we have parsed. You can play with the regular expression pattern to match all of the data :).

Extract the 11 elements from each log

def map_log(line):
match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*(\S+)\s*(\S+)\s*([/\w\.\s*]+)?\s*"* (\d{3}) (\S+)',line)
    if match is None:
        match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*([/\w\.]+)>*([\w/\s\.]+)\s*(\S+)\s*(\d{3})\s*(\S+)',line)
return(match.groups())
parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map_log(line))Copy

Now, let’s try to answer some questions.

Find the 10 most common IP addresses (or host name, if available) of the client (remote host) which made the request to the server

RDD way

result = parsed_rdd2.map(lambda line: (line[0],1)).reduceByKey(lambda a, b: a + b).takeOrdered(10, lambda x: -x[1])
result
[('piweba3y.prodigy.com', 21988),
 ('piweba4y.prodigy.com', 16437),
 ('piweba1y.prodigy.com', 12825),
 ('edams.ksc.nasa.gov', 11964),
 ('163.206.89.4', 9697),
 ('news.ti.com', 8161),
 ('www-d1.proxy.aol.com', 8047),
 ('alyssa.prodigy.com', 8037),
 ('siltb10.orl.mmc.com', 7573),
 ('www-a2.proxy.aol.com', 7516)]Copy

We can also use Pandas and Matplotlib to creata a viz.

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
Host = [x[0] for x in result]
count = [x[1] for x in result]
Host_count_dct = {'Host':Host, 'count':count}
Host_count_df = pd.DataFrame(Host_count_dct )
myplot = Host_count_df.plot(figsize = (12,8), kind = "barh", color = "#7a7a52", width = 0.8,
                               x = "Host", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("Host", fontsize = 16)
plt.title("Most common hosts  ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

DataFrame way

parsed_df = sqlcontext.createDataFrame(parsed_rdd2,
                                       schema = ['host','identity_remote', 'identity_local','date_time',
                                                 'time_zone','request_method','endpoint','client_protocol','mis',
                                                 'status_code','size_returned'], samplingRatio = 0.1)
parsed_df.printSchema()
root
 |-- host: string (nullable = true)
 |-- identity_remote: string (nullable = true)
 |-- identity_local: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- time_zone: string (nullable = true)
 |-- request_method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- client_protocol: string (nullable = true)
 |-- mis: string (nullable = true)
 |-- status_code: string (nullable = true)
 |-- size_returned: string (nullable = true)Copy
parsed_df.groupBy('host').count().orderBy('count', ascending = False).show(10, truncate = False)
+--------------------+-----+
|host                |count|
+--------------------+-----+
|piweba3y.prodigy.com|21988|
|piweba4y.prodigy.com|16437|
|piweba1y.prodigy.com|12825|
|edams.ksc.nasa.gov  |11964|
|163.206.89.4        |9697 |
|news.ti.com         |8161 |
|www-d1.proxy.aol.com|8047 |
|alyssa.prodigy.com  |8037 |
|siltb10.orl.mmc.com |7573 |
|www-a2.proxy.aol.com|7516 |
+--------------------+-----+
only showing top 10 rowsCopy

Find statistics of the size of the object returned to the client

RDD way

def convert_long(x):
    x = re.sub('[^0-9]',"",x) 
    if x =="":
        return 0
    else:
        return int(x)
parsed_rdd2.map(lambda line: convert_long(line[-1])).stats()
(count: 3460360, mean: 18935.441238194595, stdev: 73043.8640344, max: 6823936.0, min: 0.0)Copy

DataFrame way
Here, we can use functions from pyspark.

from pyspark.sql.functions import mean, udf, col, min, max, stddev, count
from pyspark.sql.types import DoubleType, IntegerType
my_udf = udf(convert_long, IntegerType() )
(parsed_df.select(my_udf('size_returned').alias('size'))
 .select(mean('size').alias('Mean Size'),
  max('size').alias('Max Size'),
  min('size').alias('Min Size'),
  count('size').alias('Count'),
  stddev('size').alias('stddev Size')).show()
)
+-----------------+--------+--------+-------+-----------------+
|        Mean Size|Max Size|Min Size|  Count|      stddev Size|
+-----------------+--------+--------+-------+-----------------+
|18935.44123819487| 6823936|       0|3460360|73043.87458874052|
+-----------------+--------+--------+-------+-----------------+Copy

SQL way

parsed_df_cleaned = parsed_rdd2.map(lambda line: convert_long(line[-1])).toDF(IntegerType())
parsed_df_cleaned.createOrReplaceTempView("parsed_df_cleaned_table")
sqlcontext.sql("SELECT avg(value) AS Mean_size, max(value) AS Max_size, \
                min(value) AS Min_size, count(*) AS count, \
                std(value) AS stddev_size FROM parsed_df_cleaned_table").show()
+-----------------+--------+--------+-------+-----------------+
|        Mean_size|Max_size|Min_size|  count|      stddev_size|
+-----------------+--------+--------+-------+-----------------+
|18935.44123819487| 6823936|       0|3460360|73043.87458874052|
+-----------------+--------+--------+-------+-----------------+Copy

Find the number of logs with each response code

RDD way

n_codes = parsed_rdd2.map(lambda line: (line[-2], 1)).distinct().count()
codes_count = (parsed_rdd2.map(lambda line: (line[-2], 1))
          .reduceByKey(lambda a, b: a + b)
          .takeOrdered(n_codes, lambda x: -x[1]))
codes_count
[('200', 3099280),
 ('304', 266773),
 ('302', 73070),
 ('404', 20890),
 ('403', 225),
 ('500', 65),
 ('501', 41),
 ('400', 14),
 ('243', 1),
 ('199', 1)]Copy
codes =[x[0] for x in codes_count]
count =[x[1] for x in codes_count]
codes_dict = {'code':codes,'count':count}
codes_df = pd.DataFrame(codes_dict)
plot = codes_df.plot(figsize = (12, 6), kind = 'barh', y = 'count', x = 'code', legend = False)
plot.invert_yaxis()
plt.title('Number of requests by response code', fontsize = 20, color = 'b')
plt.xlabel('Count', fontsize = 16, color = 'b')
plt.ylabel('Response code', fontsize = 16, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

We can also create a pie chart as below.

def pie_pct_format(value):
    return '' if value < 7 else '%.0f%%' % value

fig = plt.figure(figsize =(10, 10), facecolor = 'white', edgecolor = 'white')
colors = ['yellowgreen', 'lightskyblue', 'gold', 'purple', 'lightcoral', 'yellow', 'black']
explode = (0.05, 0.05, 0.1, 0, 0, 0, 0,0,0,0)
patches, texts, autotexts = plt.pie(count, labels = codes, colors = colors,
                                    explode = explode, autopct = pie_pct_format,
                                    shadow = False,  startangle = 125)
for text, autotext in zip(texts, autotexts):
    if autotext.get_text() == '':
        text.set_text('')  
plt.legend(codes, loc = (0.80, -0.1), shadow=True)
passCopy

Gives this plot:

DataFrame way

parsed_df.groupBy('status_code').count().orderBy('count', ascending = False).show()
+-----------+-------+
|status_code|  count|
+-----------+-------+
|        200|3099280|
|        304| 266773|
|        302|  73070|
|        404|  20890|
|        403|    225|
|        500|     65|
|        501|     41|
|        400|     14|
|        199|      1|
|        243|      1|
+-----------+-------+Copy

SQL way

sqlcontext.sql("SELECT status_code, count(*) AS count FROM parsed_table \
GROUP BY status_code ORDER BY count DESC").show()
+-----------+-------+
|status_code|  count|
+-----------+-------+
|        200|3099280|
|        304| 266773|
|        302|  73070|
|        404|  20890|
|        403|    225|
|        500|     65|
|        501|     41|
|        400|     14|
|        199|      1|
|        243|      1|
+-----------+-------+Copy

What are the top ten endpoints?

RDD way

result = parsed_rdd2.map(lambda line: (line[6],1)).reduceByKey(lambda a, b: a + b).takeOrdered(10, lambda x: -x[1])
result
[('/images/NASA-logosmall.gif', 208798),
 ('/images/KSC-logosmall.gif', 164976),
 ('/images/MOSAIC-logosmall.gif', 127916),
 ('/images/USA-logosmall.gif', 127082),
 ('/images/WORLD-logosmall.gif', 125933),
 ('/images/ksclogo-medium.gif', 121580),
 ('/ksc.html', 83918),
 ('/images/launch-logo.gif', 76009),
 ('/history/apollo/images/apollo-logo1.gif', 68898),
 ('/shuttle/countdown/', 64740)]
endpoint = [x[0] for x in result]
count = [x[1] for x in result]
endpoint_count_dct = {'endpoint':endpoint, 'count':count}
endpoint_count_df = pd.DataFrame(endpoint_count_dct )
myplot = endpoint_count_df .plot(figsize = (12,8), kind = "barh", color = "#669999", width = 0.8,
                               x = "endpoint", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("End point", fontsize = 16)
plt.title("Top ten endpoints ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

DataFrame way

parsed_df.groupBy('endpoint').count().orderBy('count', ascending = False).show(10, truncate = False)
Copy

SQL way

sqlcontext.sql("SELECT endpoint, count(*) AS count FROM parsed_table \
GROUP BY endpoint ORDER BY count DESC LIMIT 10").show(truncate = False)
+---------------------------------------+------+
|endpoint                               |count |
+---------------------------------------+------+
|/images/NASA-logosmall.gif             |208798|
|/images/KSC-logosmall.gif              |164976|
|/images/MOSAIC-logosmall.gif           |127916|
|/images/USA-logosmall.gif              |127082|
|/images/WORLD-logosmall.gif            |125933|
|/images/ksclogo-medium.gif             |121580|
|/ksc.html                              |83918 |
|/images/launch-logo.gif                |76009 |
|/history/apollo/images/apollo-logo1.gif|68898 |
|/shuttle/countdown/                    |64740 |
+---------------------------------------+------+Copy

What are the top eight endpoints which did not have return code 200?

These are error endpoints
RDD way

result = (parsed_rdd2.filter(lambda line: line[9] != '200')
          .map(lambda line: (line[6], 1))
          .reduceByKey(lambda a, b: a+b)
          .takeOrdered(8, lambda x: -x[1]))
result
[('/images/NASA-logosmall.gif', 40090),
 ('/images/KSC-logosmall.gif', 23763),
 ('/images/MOSAIC-logosmall.gif', 15245),
 ('/images/USA-logosmall.gif', 15142),
 ('/images/WORLD-logosmall.gif', 14773),
 ('/images/ksclogo-medium.gif', 13559),
 ('/images/launch-logo.gif', 8806),
 ('/history/apollo/images/apollo-logo1.gif', 7489)]Copy
endpoint = [x[0] for x in result]
count = [x[1] for x in result]
endpoint_count_dct = {'endpoint':endpoint, 'count':count}
endpoint_count_df = pd.DataFrame(endpoint_count_dct )

myplot = endpoint_count_df .plot(figsize = (12,8), kind = "barh", color = "#ff4000", width = 0.8,
                               x = "endpoint", y = "count", legend = False)

myplot.invert_yaxis()

plt.xlabel("Count", fontsize = 16)
plt.ylabel("End Point", fontsize = 16)
plt.title("Top eight error endpoints ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

DataFrame way

(parsed_df.filter(parsed_df['status_code']!=200)
 .groupBy('endpoint').count().orderBy('count', ascending = False)
   .show(8, truncate = False))
+---------------------------------------+-----+
|endpoint                               |count|
+---------------------------------------+-----+
|/images/NASA-logosmall.gif             |40090|
|/images/KSC-logosmall.gif              |23763|
|/images/MOSAIC-logosmall.gif           |15245|
|/images/USA-logosmall.gif              |15142|
|/images/WORLD-logosmall.gif            |14773|
|/images/ksclogo-medium.gif             |13559|
|/images/launch-logo.gif                |8806 |
|/history/apollo/images/apollo-logo1.gif|7489 |
+---------------------------------------+-----+
only showing top 8 rowsCopy

SQL way

sqlcontext.sql("SELECT endpoint, count(*) AS count FROM parsed_table \
WHERE status_code != 200 GROUP BY endpoint ORDER BY count DESC LIMIT 8").show(truncate = False)
+---------------------------------------+-----+
|endpoint                               |count|
+---------------------------------------+-----+
|/images/NASA-logosmall.gif             |40090|
|/images/KSC-logosmall.gif              |23763|
|/images/MOSAIC-logosmall.gif           |15245|
|/images/USA-logosmall.gif              |15142|
|/images/WORLD-logosmall.gif            |14773|
|/images/ksclogo-medium.gif             |13559|
|/images/launch-logo.gif                |8806 |
|/history/apollo/images/apollo-logo1.gif|7489 |
+---------------------------------------+-----+Copy

How many unique hosts are there in the entire log?

RDD way

parsed_rdd2.map(lambda line: line[0]).distinct().count()
137978Copy

DataFrame way

parsed_df.select(parsed_df['host']).distinct().count()
137978Copy

SQL way

sqlcontext.sql("SELECT  count(distinct(host)) AS unique_host_count FROM parsed_table ").show(truncate = False)
+-----------------+
|unique_host_count|
+-----------------+
|137978           |
+-----------------+Copy

Get the number of daily hosts.

RDD way

from datetime import datetime
def day_month(line):
    date_time = line[3]
    return datetime.strptime(date_time[:11], "%d/%b/%Y")
result = parsed_rdd2.map(lambda line:  (day_month(line), 1)).reduceByKey(lambda a, b: a + b).collect()
day = [x[0] for x in result]
count = [x[1] for x in result]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )

myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#cc9900",
                               x = "day", y = "count", legend = False)


plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Number of hosts per day ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

Now, let's just display the first ten values to compare with results from the other methods.

parsed_rdd2.map(lambda line:  (day_month(line), 1)).reduceByKey(lambda a, b: a + b).takeOrdered(10, lambda x: x[0])
[(datetime.datetime(1995, 7, 1, 0, 0), 64714),
 (datetime.datetime(1995, 7, 2, 0, 0), 60265),
 (datetime.datetime(1995, 7, 3, 0, 0), 89584),
 (datetime.datetime(1995, 7, 4, 0, 0), 70452),
 (datetime.datetime(1995, 7, 5, 0, 0), 94575),
 (datetime.datetime(1995, 7, 6, 0, 0), 100960),
 (datetime.datetime(1995, 7, 7, 0, 0), 87233),
 (datetime.datetime(1995, 7, 8, 0, 0), 38866),
 (datetime.datetime(1995, 7, 9, 0, 0), 35272),
 (datetime.datetime(1995, 7, 10, 0, 0), 72860)]Copy

DataFrame way

from datetime import datetime
from pyspark.sql.functions import col,udf
from pyspark.sql.types import TimestampType

myfunc =  udf(lambda x: datetime.strptime(x, '%d/%b/%Y:%H:%M:%S'), TimestampType())
parsed_df2 = parsed_df.withColumn('date_time', myfunc(col('date_time')))
from pyspark.sql.functions import date_format, month,dayofmonth

parsed_df2 = parsed_df2.withColumn("month", month(col("date_time"))).\
                                   withColumn("DayOfmonth", dayofmonth(col("date_time")))
    
n_hosts_by_day  = parsed_df2.groupBy(["month", "DayOfmonth"]).count().orderBy((["month", "DayOfmonth"]))
n_hosts_by_day.show(n = 10)
+-----+----------+------+
|month|DayOfmonth| count|
+-----+----------+------+
|    7|         1| 64714|
|    7|         2| 60265|
|    7|         3| 89584|
|    7|         4| 70452|
|    7|         5| 94575|
|    7|         6|100960|
|    7|         7| 87233|
|    7|         8| 38866|
|    7|         9| 35272|
|    7|        10| 72860|
+-----+----------+------+
only showing top 10 rowsCopy

SQL way

parsed_df2.createOrReplaceTempView("parsed_df2_table")
sqlcontext.sql("SELECT month, DayOfmonth, count(*) As count FROM parsed_df2_table GROUP BY month, DayOfmonth\
                 ORDER BY  month, DayOfmonth LIMIT 10").show()
+-----+----------+------+
|month|DayOfmonth| count|
+-----+----------+------+
|    7|         1| 64714|
|    7|         2| 60265|
|    7|         3| 89584|
|    7|         4| 70452|
|    7|         5| 94575|
|    7|         6|100960|
|    7|         7| 87233|
|    7|         8| 38866|
|    7|         9| 35272|
|    7|        10| 72860|
+-----+----------+------+Copy

Number of unique hosts per day

RDD way

result = (parsed_rdd2.map(lambda line:  (day_month(line),line[0]))
          .groupByKey().mapValues(set)
          .map(lambda x: (x[0], len(x[1])))).collect()
day = [x[0] for x in result]
count = [x[1] for x in result]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )

myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#000099",
                               x = "day", y = "count", legend = False)


plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Number of unique hosts per day ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

Now, let's display 10 days with the highest values to compare with results from the other methods

(parsed_rdd2.map(lambda line:  (day_month(line),line[0]))
          .groupByKey().mapValues(set)
          .map(lambda x: (x[0], len(x[1])))).takeOrdered(10, lambda x: x[0])
[(datetime.datetime(1995, 7, 1, 0, 0), 5192),
 (datetime.datetime(1995, 7, 2, 0, 0), 4859),
 (datetime.datetime(1995, 7, 3, 0, 0), 7336),
 (datetime.datetime(1995, 7, 4, 0, 0), 5524),
 (datetime.datetime(1995, 7, 5, 0, 0), 7383),
 (datetime.datetime(1995, 7, 6, 0, 0), 7820),
 (datetime.datetime(1995, 7, 7, 0, 0), 6474),
 (datetime.datetime(1995, 7, 8, 0, 0), 2898),
 (datetime.datetime(1995, 7, 9, 0, 0), 2554),
 (datetime.datetime(1995, 7, 10, 0, 0), 4464)]Copy

SQL way

sqlcontext.sql("SELECT  DATE(date_time) Date, COUNT(DISTINCT host) AS totalUniqueHosts FROM\
                parsed_df2_table GROUP   BY  DATE(date_time) ORDER BY DATE(date_time) ASC").show(n = 10) 
+----------+----------------+
|      Date|totalUniqueHosts|
+----------+----------------+
|1995-07-01|            5192|
|1995-07-02|            4859|
|1995-07-03|            7336|
|1995-07-04|            5524|
|1995-07-05|            7383|
|1995-07-06|            7820|
|1995-07-07|            6474|
|1995-07-08|            2898|
|1995-07-09|            2554|
|1995-07-10|            4464|
+----------+----------------+
only showing top 10 rowsCopy

Average Number of Daily Requests per Hosts

RDD way

unique_result = (parsed_rdd2.map(lambda line:  (day_month(line),line[0]))
          .groupByKey().mapValues(set)
          .map(lambda x: (x[0], len(x[1]))))

length_result = (parsed_rdd2.map(lambda line:  (day_month(line),line[0]))
          .groupByKey().mapValues(len))

joined = length_result.join(unique_result).map(lambda a: (a[0], (a[1][0])/(a[1][1]))).collect()
day = [x[0] for x in joined]
count = [x[1] for x in joined]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )

myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#000099",
                               x = "day", y = "count", legend = False)


plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Average number of daily requests per host", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

sorted(joined)[:10]
[(datetime.datetime(1995, 7, 1, 0, 0), 12.464175654853621),
 (datetime.datetime(1995, 7, 2, 0, 0), 12.40275776908829),
 (datetime.datetime(1995, 7, 3, 0, 0), 12.211559432933479),
 (datetime.datetime(1995, 7, 4, 0, 0), 12.753801593048516),
 (datetime.datetime(1995, 7, 5, 0, 0), 12.809833401056482),
 (datetime.datetime(1995, 7, 6, 0, 0), 12.910485933503836),
 (datetime.datetime(1995, 7, 7, 0, 0), 13.474358974358974),
 (datetime.datetime(1995, 7, 8, 0, 0), 13.411318150448585),
 (datetime.datetime(1995, 7, 9, 0, 0), 13.810493343774471),
 (datetime.datetime(1995, 7, 10, 0, 0), 16.32168458781362)]Copy

SQL way

sqlcontext.sql("SELECT  DATE(date_time) Date, COUNT(host)/COUNT(DISTINCT host) AS daily_requests_per_host FROM\
                parsed_df2_table GROUP   BY  DATE(date_time) ORDER BY DATE(date_time) ASC").show(n = 10) 
+----------+-----------------------+
|      Date|daily_requests_per_host|
+----------+-----------------------+
|1995-07-01|     12.464175654853621|
|1995-07-02|      12.40275776908829|
|1995-07-03|     12.211559432933479|
|1995-07-04|     12.753801593048516|
|1995-07-05|     12.809833401056482|
|1995-07-06|     12.910485933503836|
|1995-07-07|     13.474358974358974|
|1995-07-08|     13.411318150448585|
|1995-07-09|     13.810493343774471|
|1995-07-10|      16.32168458781362|
+----------+-----------------------+
only showing top 10 rowsCopy

How many 404 records are in the log?

RDD way

parsed_rdd2.filter(lambda line: line[9] == '404').count()
20890Copy

DataFrame way

parsed_df2.filter(parsed_df2['status_code']=="404").count()
20890Copy

SQL way

sqlcontext.sql("SELECT  COUNT(*) AS logs_404_count FROM\
                parsed_df2_table WHERE status_code ==404").show(n = 10) 
+--------------+
|logs_404_count|
+--------------+
|         20890|
+--------------+Copy

Find the top five 404 response code endpoints

RDD way

result = (parsed_rdd2.filter(lambda line: line[9] == '404')
          .map(lambda line: (line[6], 1))
          .reduceByKey(lambda a, b: a+b)
          .takeOrdered(5, lambda x: -x[1]))
result
[('/pub/winvn/readme.txt', 2004),
 ('/pub/winvn/release.txt', 1732),
 ('/shuttle/missions/STS-69/mission-STS-69.html', 683),
 ('/shuttle/missions/sts-68/ksc-upclose.gif', 428),
 ('/history/apollo/a-001/a-001-patch-small.gif', 384)]Copy
endpoint = [x[0] for x in result]
count = [x[1] for x in result]
endpoint_count_dct = {'endpoint':endpoint, 'count':count}
endpoint_count_df = pd.DataFrame(endpoint_count_dct )

myplot = endpoint_count_df .plot(figsize = (12,8), kind = "barh", color = "#cc9900", width = 0.8,
                               x = "endpoint", y = "count", legend = False)

myplot.invert_yaxis()

plt.xlabel("Count", fontsize = 16)
plt.ylabel("End Point", fontsize = 16)
plt.title("Top ten endpoints with 404 response codes ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

DataFrame way

(parsed_df2.filter(parsed_df2['status_code']=="404")
.groupBy('endpoint').count().orderBy('count', ascending = False).show(5, truncate = False))
+--------------------------------------------+-----+
|endpoint                                    |count|
+--------------------------------------------+-----+
|/pub/winvn/readme.txt                       |2004 |
|/pub/winvn/release.txt                      |1732 |
|/shuttle/missions/STS-69/mission-STS-69.html|683  |
|/shuttle/missions/sts-68/ksc-upclose.gif    |428  |
|/history/apollo/a-001/a-001-patch-small.gif |384  |
+--------------------------------------------+-----+
only showing top 5 rowsCopy

SQL way

sqlcontext.sql("SELECT endpoint, COUNT(*) AS count FROM\
                parsed_df2_table WHERE status_code ==404 GROUP BY endpoint\
                ORDER BY count DESC LIMIT 5").show(truncate = False) 
+--------------------------------------------+-----+
|endpoint                                    |count|
+--------------------------------------------+-----+
|/pub/winvn/readme.txt                       |2004 |
|/pub/winvn/release.txt                      |1732 |
|/shuttle/missions/STS-69/mission-STS-69.html|683  |
|/shuttle/missions/sts-68/ksc-upclose.gif    |428  |
|/history/apollo/a-001/a-001-patch-small.gif |384  |
+--------------------------------------------+-----+Copy

Find the top five 404 response code hosts

RDD way

result = (parsed_rdd2.filter(lambda line: line[9] == '404')
          .map(lambda line: (line[0], 1))
          .reduceByKey(lambda a, b: a+b)
          .takeOrdered(5, lambda x: -x[1]))
result
[('hoohoo.ncsa.uiuc.edu', 251),
 ('piweba3y.prodigy.com', 157),
 ('jbiagioni.npt.nuwc.navy.mil', 132),
 ('piweba1y.prodigy.com', 114),
 ('www-d4.proxy.aol.com', 91)]Copy
host = [x[0] for x in result]
count = [x[1] for x in result]
host_count_dct = {'host':host, 'count':count}
host_count_df = pd.DataFrame(host_count_dct )

myplot = host_count_df .plot(figsize = (12,8), kind = "barh", color = "#cc9900", width = 0.8,
                               x = "host", y = "count", legend = False)

myplot.invert_yaxis()

plt.xlabel("Count", fontsize = 16)
plt.ylabel("Host", fontsize = 16)
plt.title("Top five hosts with 404 response codes ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

DataFrame way

(parsed_df2.filter(parsed_df2['status_code']=="404")
.groupBy('host').count().orderBy('count', ascending = False).show(5, truncate = False))
+---------------------------+-----+
|host                       |count|
+---------------------------+-----+
|hoohoo.ncsa.uiuc.edu       |251  |
|piweba3y.prodigy.com       |157  |
|jbiagioni.npt.nuwc.navy.mil|132  |
|piweba1y.prodigy.com       |114  |
|www-d4.proxy.aol.com       |91   |
+---------------------------+-----+
only showing top 5 rowsCopy

SQL way

sqlcontext.sql("SELECT host, COUNT(*) AS count FROM\
                parsed_df2_table WHERE status_code ==404 GROUP BY host\
                ORDER BY count DESC LIMIT 5").show(truncate = False) 
+---------------------------+-----+
|host                       |count|
+---------------------------+-----+
|hoohoo.ncsa.uiuc.edu       |251  |
|piweba3y.prodigy.com       |157  |
|jbiagioni.npt.nuwc.navy.mil|132  |
|piweba1y.prodigy.com       |114  |
|www-d4.proxy.aol.com       |91   |
+---------------------------+-----+Copy

Create a viz of 404 response codes per day

RDD way

result = (parsed_rdd2.filter(lambda line: line[9] == '404')
          .map(lambda line:  (day_month(line), 1))
          .reduceByKey(lambda a, b: a+b).collect())
day = [x[0] for x in result]
count = [x[1] for x in result]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )

myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#cc9900",
                               x = "day", y = "count", legend = False)


plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Number of 404 response codes per day ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

Now, let's display 10 days with the highest number of 404 errors to compare results from the other methods.

day_count_df.sort_values('count', ascending = False)[:10]
count	day
25	640	1995-07-06
30	639	1995-07-19
15	571	1995-08-30
34	570	1995-07-07
50	537	1995-08-07
17	530	1995-07-13
6	526	1995-08-31
4	497	1995-07-05
27	474	1995-07-03
56	471	1995-07-11Copy

DataFrame way

(parsed_df2.filter(parsed_df2['status_code']=="404")
.groupBy(["month", "DayOfmonth"]).count()
.orderBy('count', ascending = False).show(10)
)
+-----+----------+-----+
|month|DayOfmonth|count|
+-----+----------+-----+
|    7|         6|  640|
|    7|        19|  639|
|    8|        30|  571|
|    7|         7|  570|
|    8|         7|  537|
|    7|        13|  530|
|    8|        31|  526|
|    7|         5|  497|
|    7|         3|  474|
|    7|        11|  471|
+-----+----------+-----+
only showing top 10 rowsCopy

SQL way

sqlcontext.sql("SELECT  DATE(date_time) AS Date, COUNT(*) AS daily_404_erros FROM\
                parsed_df2_table WHERE status_code = 404 \
                GROUP   BY  DATE(date_time) ORDER BY daily_404_erros DESC LIMIT 10").show()
+----------+---------------+
|      Date|daily_404_erros|
+----------+---------------+
|1995-07-06|            640|
|1995-07-19|            639|
|1995-08-30|            571|
|1995-07-07|            570|
|1995-08-07|            537|
|1995-07-13|            530|
|1995-08-31|            526|
|1995-07-05|            497|
|1995-07-03|            474|
|1995-07-11|            471|
+----------+---------------+Copy

Top five days for 404 response codes

RDD way

(parsed_rdd2.filter(lambda line: line[9] == '404')
          .map(lambda line:  (day_month(line), 1))
          .reduceByKey(lambda a, b: a+b).takeOrdered(5, lambda x: -x[1]))
[(datetime.datetime(1995, 7, 6, 0, 0), 640),
 (datetime.datetime(1995, 7, 19, 0, 0), 639),
 (datetime.datetime(1995, 8, 30, 0, 0), 571),
 (datetime.datetime(1995, 7, 7, 0, 0), 570),
 (datetime.datetime(1995, 8, 7, 0, 0), 537)]Copy

This has been solved the SQL way and the RDD way in No. 13 above.

Create an hourly 404 response codes line chart

RDD way

def date_time(line):
    date_time = line[3]
    return datetime.strptime(date_time, "%d/%b/%Y:%H:%M:%S")
result = (parsed_rdd2.filter(lambda line: line[9] == '404').map(lambda line:  (date_time(line).hour, 1))
          .reduceByKey(lambda a, b: a + b)).collect()
result = sorted(result)
hour = [x[0] for x in result]
count = [x[1] for x in result]
hour_count_dct = {'hour': hour, 'count':count}
hour_count_df = pd.DataFrame(hour_count_dct )

myplot = hour_count_df.plot(figsize = (12,8), kind = "bar", color = "#000000",x ='hour',
                                y = "count", legend = False)


plt.ylabel("Count", fontsize = 16)
plt.xlabel("Hour of day", fontsize = 16)
plt.title("Number of 404 errors per hour ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()Copy

Gives this plot:

DataFrame way

from pyspark.sql.functions import hour
parsed_df3 = parsed_df2.withColumn('hour_of_day', hour(col('date_time')))

(parsed_df3.filter(parsed_df3['status_code']=="404")
.groupBy("hour_of_day").count()
.orderBy("hour_of_day", ascending = True).show(10))
+-----------+-----+
|hour_of_day|count|
+-----------+-----+
|          0|  774|
|          1|  648|
|          2|  868|
|          3|  603|
|          4|  351|
|          5|  306|
|          6|  269|
|          7|  458|
|          8|  705|
|          9|  840|
+-----------+-----+
only showing top 10 rowsCopy

SQL way

sqlcontext.sql("SELECT  HOUR(date_time) AS hour, COUNT(*) AS hourly_404_erros FROM\
                parsed_df2_table WHERE status_code = 404 \
                GROUP BY HOUR(date_time) ORDER BY HOUR(date_time) LIMIT 10").show(n = 100)
+----+----------------+
|hour|hourly_404_erros|
+----+----------------+
|   0|             774|
|   1|             648|
|   2|             868|
|   3|             603|
|   4|             351|
|   5|             306|
|   6|             269|
|   7|             458|
|   8|             705|
|   9|             840|
+----+----------------+Copy

This is enough for today. See you in the next part of the DataFrames Vs RDDs in Spark tutorial series.