We usually work with structured data in our machine learning applications. However, unstructured text data can also contain vital information for machine learning models. In this blog post, we will see how to use PySpark to build machine learning models with unstructured text data. The data is from the UCI Machine Learning Repository and can be downloaded from here.

According to the data description, the data is a set of SMS messages that have been collected for SMS spam research. It contains 5,574 SMS messages in English, each tagged as ham (legitimate) or spam. We will tokenize the messages and create TF-IDF features, then build various models using cross-validation and grid search and compare their performance.

Start a SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Read Data

df = spark.read.csv("SMSSpamCollection", sep = "\t", inferSchema=True, header = False)

Let’s see the first five rows. As shown below, the columns only have the default names (_c0 and _c1), so we will rename them.

df.show(5, truncate = False)
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0 |_c1                                                                                                                                                        |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham |Ok lar... Joking wif u oni...                                                                                                                              |
|spam|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham |U dun say so early hor... U c already then say...                                                                                                          |
|ham |Nah I don't think he goes to usf, he lives around here though                                                                                              |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Rename Columns

df = df.withColumnRenamed('_c0', 'status').withColumnRenamed('_c1', 'message')
df.show(5, truncate = False)
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|status|message                                                                                                                                                    |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham   |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham   |Ok lar... Joking wif u oni...                                                                                                                              |
|spam  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham   |U dun say so early hor... U c already then say...                                                                                                          |
|ham   |Nah I don't think he goes to usf, he lives around here though                                                                                              |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Change the status column to numeric

Now, let’s convert the status field to a numeric label: ham to 1.0 and spam to 0.0. All of our fields need to be numeric.

df.createOrReplaceTempView('temp')
df = spark.sql('select case status when "ham" then 1.0  else 0 end as label, message from temp')
df.show(5, truncate = False)
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|1.0  |Ok lar... Joking wif u oni...                                                                                                                              |
|0.0  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|1.0  |U dun say so early hor... U c already then say...                                                                                                          |
|1.0  |Nah I don't think he goes to usf, he lives around here though                                                                                              |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
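
The same conversion can also be done with the DataFrame API instead of SQL. Below is a minimal sketch using when/otherwise; it assumes it is applied to the dataframe that still has the status column (i.e., before the SQL step above), and df_alt is just an illustrative name.

from pyspark.sql import functions as F
# Equivalent of the SQL CASE expression above: ham -> 1.0, everything else -> 0.0
df_alt = df.select(
    F.when(F.col("status") == "ham", 1.0).otherwise(0.0).alias("label"),
    F.col("message"),
)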

Tokenize the messages

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). Let’s tokenize the messages and create a list of words for each message.

from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show()
+-----+--------------------+--------------------+
|label|             message|               words|
+-----+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|
|  1.0|U dun say so earl...|[u, dun, say, so,...|
|  1.0|Nah I don't think...|[nah, i, don't, t...|
|  0.0|FreeMsg Hey there...|[freemsg, hey, th...|
|  1.0|Even my brother i...|[even, my, brothe...|
|  1.0|As per your reque...|[as, per, your, r...|
|  0.0|WINNER!! As a val...|[winner!!, as, a,...|
|  0.0|Had your mobile 1...|[had, your, mobil...|
|  1.0|I'm gonna be home...|[i'm, gonna, be, ...|
|  0.0|SIX chances to wi...|[six, chances, to...|
|  0.0|URGENT! You have ...|[urgent!, you, ha...|
|  1.0|I've been searchi...|[i've, been, sear...|
|  1.0|I HAVE A DATE ON ...|[i, have, a, date...|
|  0.0|XXXMobileMovieClu...|[xxxmobilemoviecl...|
|  1.0|Oh k...i'm watchi...|[oh, k...i'm, wat...|
|  1.0|Eh u remember how...|[eh, u, remember,...|
|  1.0|Fine if that’s th...|[fine, if, that’s...|
|  0.0|England v Macedon...|[england, v, mace...|
+-----+--------------------+--------------------+
only showing top 20 rows
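
As a side note, Tokenizer only splits on whitespace, so punctuation stays attached to the tokens (e.g. "winner!!" above). If that is a concern, a RegexTokenizer could split on non-word characters instead; a quick sketch of that alternative (not used in the rest of this post):

from pyspark.ml.feature import RegexTokenizer
# Split on runs of non-word characters instead of just whitespace
regexTokenizer = RegexTokenizer(inputCol="message", outputCol="words", pattern="\\W+")
wordsDataRegex = regexTokenizer.transform(df)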

Apply CountVectorizer

CountVectorizer converts the lists of tokens above into vectors of token counts. See the documentation for details.

from pyspark.ml.feature import CountVectorizer
count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show()
+-----+--------------------+--------------------+--------------------+
|label|             message|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|(13587,[8,42,52,6...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|(13587,[5,75,411,...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|(13587,[0,3,8,20,...|
|  1.0|U dun say so earl...|[u, dun, say, so,...|(13587,[5,22,60,1...|
|  1.0|Nah I don't think...|[nah, i, don't, t...|(13587,[0,1,66,87...|
|  0.0|FreeMsg Hey there...|[freemsg, hey, th...|(13587,[0,2,6,10,...|
|  1.0|Even my brother i...|[even, my, brothe...|(13587,[0,7,9,13,...|
|  1.0|As per your reque...|[as, per, your, r...|(13587,[0,10,11,4...|
|  0.0|WINNER!! As a val...|[winner!!, as, a,...|(13587,[0,2,3,14,...|
|  0.0|Had your mobile 1...|[had, your, mobil...|(13587,[0,4,5,10,...|
|  1.0|I'm gonna be home...|[i'm, gonna, be, ...|(13587,[0,1,6,32,...|
|  0.0|SIX chances to wi...|[six, chances, to...|(13587,[0,6,40,46...|
|  0.0|URGENT! You have ...|[urgent!, you, ha...|(13587,[0,2,3,4,8...|
|  1.0|I've been searchi...|[i've, been, sear...|(13587,[0,1,2,3,4...|
|  1.0|I HAVE A DATE ON ...|[i, have, a, date...|(13587,[1,3,14,16...|
|  0.0|XXXMobileMovieClu...|[xxxmobilemoviecl...|(13587,[0,4,8,11,...|
|  1.0|Oh k...i'm watchi...|[oh, k...i'm, wat...|(13587,[158,314,3...|
|  1.0|Eh u remember how...|[eh, u, remember,...|(13587,[1,5,20,47...|
|  1.0|Fine if that’s th...|[fine, if, that’s...|(13587,[4,5,29,59...|
|  0.0|England v Macedon...|[england, v, mace...|(13587,[0,4,28,82...|
+-----+--------------------+--------------------+--------------------+
only showing top 20 rows
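
If you want to know which tokens the indices in these sparse vectors refer to, the fitted CountVectorizerModel exposes the learned vocabulary. A quick sketch:

# The vocabulary is ordered by corpus-wide frequency, so index 0 is the most common token
len(model.vocabulary)   # should match the vector size shown above (13587)
model.vocabulary[:10]   # the ten most frequent tokens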

Apply term frequency–inverse document frequency (TF-IDF)

IDF down-weights features that appear frequently in a corpus. This generally improves performance when using text as features, since the most frequent (and hence less informative) words are down-weighted.

from pyspark.ml.feature import IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()  # We want only the label and features columns for our machine learning models
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13587,[8,42,52,6...|
|  1.0|(13587,[5,75,411,...|
|  0.0|(13587,[0,3,8,20,...|
|  1.0|(13587,[5,22,60,1...|
|  1.0|(13587,[0,1,66,87...|
|  0.0|(13587,[0,2,6,10,...|
|  1.0|(13587,[0,7,9,13,...|
|  1.0|(13587,[0,10,11,4...|
|  0.0|(13587,[0,2,3,14,...|
|  0.0|(13587,[0,4,5,10,...|
|  1.0|(13587,[0,1,6,32,...|
|  0.0|(13587,[0,6,40,46...|
|  0.0|(13587,[0,2,3,4,8...|
|  1.0|(13587,[0,1,2,3,4...|
|  1.0|(13587,[1,3,14,16...|
|  0.0|(13587,[0,4,8,11,...|
|  1.0|(13587,[158,314,3...|
|  1.0|(13587,[1,5,20,47...|
|  1.0|(13587,[4,5,29,59...|
|  0.0|(13587,[0,4,28,82...|
+-----+--------------------+
only showing top 20 rows
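
For intuition, Spark's IDF uses the smoothed formula idf(t) = log((N + 1) / (df(t) + 1)), where N is the number of documents and df(t) is the number of documents containing the term. A tiny sketch of the effect (the document frequencies here are made up for illustration):

import math

def idf_weight(num_docs, doc_freq):
    # Smoothed IDF as used by Spark: log((N + 1) / (df + 1))
    return math.log((num_docs + 1) / (doc_freq + 1))

# A token appearing in most of the 5,574 messages gets a weight near zero,
# while a rare token gets a much larger weight
idf_weight(5574, 5000)   # ~0.11
idf_weight(5574, 10)     # ~6.23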

Split data into training (80%) and testing (20%)

We will split the dataframe into training and test sets, train on the first dataset, and then evaluate on the held-out test set.

seed = 0  # set seed for reproducibility
trainDF, testDF = rescaledData.randomSplit([0.8,0.2],seed)

Number of records in each dataframe

trainDF.count()
4450
testDF.count()
1124
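
Since spam is the minority class, it is worth confirming that both classes appear in each split; a quick check:

trainDF.groupBy('label').count().show()
testDF.groupBy('label').count().show()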

Now, let's fit different classifiers. We will use grid search with cross-validation to find the best parameter values among the provided ones. You can fine-tune the models further by providing a finer parameter grid and by including more of the important parameters for each algorithm.

Logistic Regression Classifier

Logistic regression is a popular method to predict a categorical response. It is a special case of Generalized Linear models that predicts the probability of the outcomes. In spark.ml logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
lr = LogisticRegression(maxIter = 10)

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, np.linspace(0.3, 0.01, 10)) \
    .addGrid(lr.elasticNetParam, np.linspace(0.3, 0.8, 6)) \
    .build()
crossval_lr = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds= 5)  
cvModel_lr = crossval_lr.fit(trainDF)
best_model_lr = cvModel_lr.bestModel.summary
best_model_lr.predictions.columns
['label',
 'message',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

As shown below, the model fits the training data perfectly. We will then check its performance on the test data. We should be cautious when a model shows extraordinary performance on the training data, since this can be a sign of overfitting, which prevents the model from generalizing to unseen data.

Area under the curve for the training data

from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_lr.evaluate(best_model_lr.predictions)
1.0

We can get the F1 score, accuracy, precision, and recall using MulticlassClassificationEvaluator, which can be used for binary classification as well. The F1 score and accuracy are computed below; a short sketch after the confusion matrix shows precision and recall.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(best_model_lr.predictions)
1.0
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(best_model_lr.predictions)
1.0
train_fit_lr = best_model_lr.predictions.select('label','prediction')
train_fit_lr.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0| 3848|
|  0.0|       0.0|  602|
+-----+----------+-----+
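
Precision and recall can be obtained the same way by changing metricName; a short sketch using the weighted variants supported by MulticlassClassificationEvaluator:

my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')
my_mc_lr.evaluate(best_model_lr.predictions)
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')
my_mc_lr.evaluate(best_model_lr.predictions)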

Predict using the test data and evaluate the predictions

predictions_lr = cvModel_lr.transform(testDF)
As you can see below, the predictions dataframe contains the original data and the predictions.
predictions_lr.columns
['label',
 'message',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']
predictions_lr.show(5)
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|             message|               words|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0.0|(Bank of Granite ...|[(bank, of, grani...|(13587,[3,7,10,12...|(13587,[3,7,10,12...|[1.46730111617655...|[0.81264682234974...|       0.0|
|  0.0|+123 Congratulati...|[+123, congratula...|(13587,[0,4,5,8,1...|(13587,[0,4,5,8,1...|[5.36211698537807...|[0.99533093752360...|       0.0|
|  0.0|+449071512431 URG...|[+449071512431, u...|(13587,[0,4,7,14,...|(13587,[0,4,7,14,...|[2.52752829905056...|[0.92604926446112...|       0.0|
|  0.0|3. You have recei...|[3., you, have, r...|(13587,[2,11,14,9...|(13587,[2,11,14,9...|[-0.6098888828551...|[0.35208454565894...|       1.0|
|  0.0|44 7732584351, Do...|[44, 7732584351,,...|(13587,[0,2,3,15,...|(13587,[0,2,3,15,...|[6.30912983467855...|[0.99818368907109...|       0.0|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

Show sample predictions:

predictions_lr.select('label', 'prediction').show(5)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows

It missed 21 spam messages, but it classified all of the ham messages correctly.

predictions_lr.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|   21|
|  1.0|       1.0|  979|
|  0.0|       0.0|  124|
+-----+----------+-----+
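
Since catching spam is the point of the model, we can work out the spam-class precision and recall directly from this confusion matrix:

# Spam is label 0.0: 124 spam caught, 21 spam missed, and no ham was flagged as spam
spam_precision = 124 / (124 + 0)    # 1.0
spam_recall = 124 / (124 + 21)      # ~0.855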

Area under the curve with the test data

my_eval_lr.evaluate(predictions_lr)
0.9275862068965517

F1-score with the test data

my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(predictions_lr)
0.9806865812338207

Accuracy with the test data

(979+124)/(979+124+21)
0.9813167259786477
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(predictions_lr)
0.9813167259786477

We see that the model has good performance with the test data.

Naive Bayes

Naive Bayes classifiers are a family of simple probabilistic classifiers based on applying Bayes’ theorem with strong (naive) independence assumptions between the features. The spark.ml implementation currently supports both multinomial naive Bayes and Bernoulli naive Bayes.

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
paramGrid_nb = ParamGridBuilder() \
    .addGrid(nb.smoothing, np.linspace(0.3, 10, 10)) \
    .build()
crossval_nb = CrossValidator(estimator=nb,
                          estimatorParamMaps=paramGrid_nb,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds= 5) 
cvModel_nb = crossval_nb.fit(trainDF)

Shown below are the average area under the curve values for the ten smoothing values. We see that the best value is 0.8932204672602355; the short sketch after the output shows which smoothing value it corresponds to.

cvModel_nb.avgMetrics
[0.8932204672602355,
 0.8845042603061242,
 0.8805366509217409,
 0.8777792239323049,
 0.8756333853762843,
 0.8740922267500476,
 0.8728146322696352,
 0.8716234788548282,
 0.8705026371694609,
 0.8694540174664303]
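
To see which smoothing value that best average AUC corresponds to, we can line up avgMetrics with the parameter grid, since they are in the same order; a sketch:

best_idx = int(np.argmax(cvModel_nb.avgMetrics))
best_params = paramGrid_nb[best_idx]
# Show the winning parameter values by name
{param.name: value for param, value in best_params.items()}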

Make predictions:

predictions_nb = cvModel_nb.transform(testDF)
predictions_nb.select('label', 'prediction').show(5)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows

We see that this model missed some ham messages, but it performs better at identifying spam messages than the logistic regression model we built above.

predictions_nb.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|    5|
|  1.0|       1.0|  921|
|  0.0|       0.0|  140|
|  1.0|       0.0|   58|
+-----+----------+-----+

From the accuracy, F1 score, and area under the curve values shown below, we see that the logistic regression model performs better overall than the Naive Bayes model.

my_eval_nb = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_nb.evaluate(predictions_nb)
0.9531365573597268
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_nb.evaluate(predictions_nb)
0.9475008620872061
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_nb.evaluate(predictions_nb)
0.943950177935943

We will see the other classification models in the next post.