Count words using distributed computing

We have seen in the hello world of NLP how to count words in a text or a file, but what happens if the file is very large? Sure, we can read it line by line to avoid overflowing the memory of our machine, but it can take a long time.

There are better ways: we can distribute the processing across multiple machines, even geographically distributed ones.

In this tutorial, we will write code that finds the most common words in the Complete Works of William Shakespeare. The same approach could also be scaled to larger applications, such as finding the most common words in Wikipedia.

This tutorial will use Apache Spark, a framework for large-scale data processing, within a notebook. Many traditional frameworks were designed to be run on a single computer. However, many datasets today are too large to be stored on a single computer, and even when a dataset can be stored on one computer (such as the datasets in this tutorial), the dataset can often be processed much more quickly using multiple computers. Spark has efficient implementations of a number of transformations and actions that can be composed together to perform data processing and analysis. Spark excels at distributing these operations across a cluster while abstracting away many of the underlying implementation details. Spark has been designed with a focus on scalability and efficiency. With Spark you can begin developing your solution on your laptop, using a small dataset, and then use that same code to process terabytes or even petabytes across a distributed cluster.

This notebook is also available on Databricks, the easiest way to run Apache Spark, and as input-only on GitHub.


A simple word count application

The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data.
We continue from the word counting example: in this notebook we will write code that calculates the most common words in the Complete Works of William Shakespeare, retrieved from Project Gutenberg. This could also be scaled to larger applications, such as finding the most common words in Wikipedia.

During this example we will cover:
Part 0: What is Apache Spark
Part 1: Creating a base DataFrame and performing operations
Part 2: Counting with Spark SQL and DataFrames
Part 3: Finding unique words and a mean value
Part 4: Apply word count to a file

Note that for reference, you can look up the details of the relevant methods in Spark’s Python API.

Part 0: Spark

An introduction to using Apache Spark with the PySpark SQL API running in a notebook

What is Spark

Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley’s AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since.
Spark is a fast and general engine for large-scale data processing.
Databricks is a company founded by the creators of Apache Spark that aims to help clients with cloud-based big data processing using Spark.

Traditional analysis tools like R and Python Pandas run on a single machine, but data is growing faster than computation speed.
The Opportunity: Cloud computing is a game-changer.
It provides access to low-cost computing and storage.
Distributing data over a cluster of machines means lots of hard drives and lots of CPUs, but also lots of memory!
Storage is getting cheaper, but stalling CPU speeds are the bottleneck.
A new Opportunity: Keep more data in-memory!
In-memory can make a big difference, up to 100x faster.
Spark is a new distributed execution engine that leverages the in-memory paradigm.

The challenge with cloud computing has always been programming the resources.
Spark is developed in Scala and – besides Scala itself – supports other languages such as Java and Python.
For this example, we are using the Python programming interface to Spark (pySpark).
pySpark provides an easy-to-use programming abstraction and parallel runtime: “Here’s an operation, run it on all of the data”.

Spark Context

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. Executor programs run on cluster nodes or in local threads. The results from these tasks are delivered back to the driver.
Where does code run?
– Locally, in the driver
– Distributed at the executors (Executors run in parallel and have much more memory)
– Both at the driver and the executors
Problems with cheap hardware are: failures, network speeds versus shared memory, much more latency, network slower than storage, uneven performance.
How do we deal with machine failures? We launch another task.
How do we deal with slow tasks? We launch another task.

When running Spark, you start a new Spark application by creating a SparkContext.
SparkContext tells Spark how and where to access a cluster.
The program next creates a SQLContext object which is used to create and manage the DataFrames.
When the `SparkContext` is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won’t be used for other applications.
When using Databricks, both a `SparkContext` and a `SQLContext` are created for you automatically. `sc` is your `SparkContext`, and `sqlContext` is your `SQLContext`.
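
If you are not running on Databricks, a minimal sketch of that setup might look like the following; the application name and the local master URL are assumptions for running on a laptop:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# assumed configuration: run locally using all available cores
conf = SparkConf().setAppName('word-count-tutorial').setMaster('local[*]')
sc = SparkContext(conf=conf)   # tells Spark how and where to access a cluster
sqlContext = SQLContext(sc)    # used to create and manage DataFrames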

# Display the type of the Spark sqlContext
type(sqlContext)

Out[1]: pyspark.sql.context.HiveContext

Note that the type is `HiveContext`. This means we’re working with a version of Spark that has Hive support. Compiling Spark with Hive support is a good idea, even if you don’t have a Hive metastore. As the
Spark Programming Guide states, a `HiveContext` “provides a superset of the functionality provided by the basic `SQLContext`. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs [user-defined functions], and the ability to read data from Hive tables.”

Part 1: Creating a base DataFrame and performing operations

DataFrames (DF) are the key concept, the primary abstraction in Spark.
Similar to a Python Pandas dataframe, they are immutable once constructed and enable operations on collections of elements in parallel.
You construct DataFrames by parallelizing existing Python collections (lists), by transforming an existing Spark or pandas DF, or from files in HDFS or any other storage system.
DataFrames support two types of operations: transformations and actions.
Transformations are lazy (not computed immediately). A transformed DF is executed when an action runs on it.
A transformation on a DataFrame is, for example, select. Actions on a DataFrame are, for example, show and count.

Spark Program Lifecycle

  1. Create DataFrames from external data or create DataFrame from a collection in driver program
  2. Lazily transform them into new DataFrames
  3. cache() some DataFrames for reuse (optional)
  4. Perform actions to execute parallel computation and produce results

Most Python code runs in the driver, except for code passed to transformations. Transformations run at the executors. Actions can run both at the executors and the driver.
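
As a quick, hedged sketch of this lifecycle (the DataFrame and its contents are made up for illustration):

# 1. create a DataFrame from a collection in the driver program
df = sqlContext.createDataFrame([('spark',), ('hello',), ('spark',)], ['word'])
# 2. lazily transform it -- nothing is computed yet
distinctDF = df.select('word').distinct()
# 3. optionally cache it for reuse
distinctDF.cache()
# 4. an action triggers the parallel computation and returns a result to the driver
print distinctDF.count()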

In this part, we will explore creating a base DataFrame with `sqlContext.createDataFrame` and using DataFrame operations to count words.

Create a DataFrame

We’ll start by generating a base DataFrame using a Python list of tuples and the `sqlContext.createDataFrame` method. Then we’ll print out the type and schema of the DataFrame. The Python API has several examples for using the `createDataFrame` method.

# create a silly test dataframe from Python collections (lists)
wordsDF = sqlContext.createDataFrame([('look',), ('spark',), 
          ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
wordsDF.show()
print type(wordsDF)
wordsDF.printSchema()

+--------+ 
|    word| 
+--------+ 
|    look| 
|   spark|
|tutorial| 
|   spark| 
|    look| 
|  python|
+--------+ 
<class 'pyspark.sql.dataframe.DataFrame'> 
root
  |-- word: string (nullable = true)

As you can see, DataFrame is a class of the pyspark.sql module.

Create a new DataFrame from an existing one

This uses lazy evaluation: results are not computed right away – Spark remembers the set of transformations applied to the base DataFrame.
Think of this as a recipe for creating a result.

Spark actions like show(), collect() or count() then cause Spark to execute the recipe to transform the source. They are the mechanism for getting results out of Spark.

Length of each word

You can create a new DataFrame from our base DF `wordsDF` by calling the `select` DataFrame function and passing in the appropriate recipe: we can use the SQL `length` function to find the number of characters in each word.

The `length` function is found in the `pyspark.sql.functions` module.

from pyspark.sql.functions import length
wordsLengthsDF = wordsDF.select(length('word').alias('lengths')) 
     # transformation
wordsLengthsDF.show() # action

+-------+ 
|lengths| 
+-------+ 
|      4| 
|      5| 
|      8| 
|      5| 
|      4| 
|      6| 
+-------+

Part 2: Counting with Spark SQL and DataFrames

Now, let’s count the number of times a particular word appears in the ‘word’ column. There are multiple ways to perform the counting, but some are much less efficient than others.

A naive approach would be to call `collect` on all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size dataset including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use data parallel operations.
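
For contrast, a naive driver-side version might look like the sketch below; it only works when the whole column fits in the driver's memory:

from collections import Counter

# Naive approach, for contrast only: pull every row back to the driver
# and count locally. This does not scale beyond small datasets.
localWords = [row.word for row in wordsDF.collect()]
print Counter(localWords)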

Using ‘groupBy’ and ‘count’

Using DataFrames, we can perform aggregations by grouping the data using the `groupBy` function on the DataFrame. Using `groupBy` returns a `GroupedData` object and we can use the functions available for `GroupedData` to aggregate the groups. For example, we can call `avg` or `count` on a `GroupedData` object to obtain the average of the values in the groups or the number of occurrences in the groups, respectively.

To find the counts of words, we group by the words and then use the `count` function to find the number of times that each word occurs.

wordCountsDF = (wordsDF
                 .groupBy('word').count())
wordCountsDF.show()

+--------+-----+ 
|    word|count| 
+--------+-----+ 
|tutorial|    1| 
|   spark|    2| 
|    look|    2| 
|  python|    1| 
+--------+-----+

You can see that without using alias(), the column simply gets the function name (e.g., “count” in this case).
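
If you want a different column name, a hedged sketch (the name 'occurrences' is just an illustration) is to aggregate explicitly and rename with alias():

from pyspark.sql import functions as func

# give the aggregate column an explicit name instead of the default 'count'
wordCountsRenamedDF = (wordsDF
                        .groupBy('word')
                        .agg(func.count('word').alias('occurrences')))
wordCountsRenamedDF.show()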
Let’s also add some unit testing.

# Load in the testing code 
# If incorrect it will report back '1 test failed' for each failed test

from databricks_test_helper import Test

# TEST groupBy and count
Test.assertEquals(wordCountsDF.collect(), [('tutorial', 1), ('spark', 2), 
       ('look', 2), ('python', 1)], 'incorrect counts for wordCountsDF')

1 test passed.
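
Since this part is about counting with Spark SQL as well as DataFrames, here is a hedged sketch of the same aggregation expressed as a SQL query; the temporary table name `words` is an arbitrary choice for this sketch:

# Register the DataFrame as a temporary table so it can be queried with SQL
wordsDF.registerTempTable('words')
sqlContext.sql('SELECT word, COUNT(*) AS count FROM words GROUP BY word').show()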

Part 3: Finding unique words and a mean value

Unique words

Calculate the number of unique words in `wordsDF`.

from spark_notebook_helpers import printDataFrames

# This function returns all the DataFrames in the notebook and their 
# corresponding column names.
printDataFrames(True)

uniqueWordsCount = wordCountsDF.count()
print uniqueWordsCount

Out[2]: 4
# TEST Unique words
Test.assertEquals(uniqueWordsCount, 4, 'incorrect count of unique words')

1 test passed.

Means of groups using DataFrames

Find the mean number of occurrences of words in `wordCountsDF`.

We can use the `mean` GroupedData method to accomplish this. Note that when you use `groupBy` you don’t need to pass in any columns. A call without columns just prepares the DataFrame so that aggregation functions like `mean` can be applied.

averageCount = (wordCountsDF
                    .groupBy().mean('count')).collect()[0][0]
print averageCount
Out[3]: 1.5
# TEST Means of groups using DataFrames
Test.assertEquals(round(averageCount, 2), 1.5,
                  'incorrect value of averageCount')
1 test passed.

Part 4: Apply word count to a file

In this section we will finish developing our word count application. We’ll have to build the `wordCount` function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.

The ‘wordCount’ function

First, we define a function for word counting.
This function takes in a DataFrame that is a list of words like `wordsDF` and returns a DataFrame that has all of the words and their associated counts.

# the words count function
def wordCount(wordListDF):
  """Creates a DataFrame with word counts.

  Args:
     wordListDF (DataFrame of str): A DataFrame consisting of one string 
                                    column called 'word'.

  Returns:
     DataFrame of (str, int): A DataFrame containing 'word' and 'count' 
                              columns.
  """
  return (wordListDF
               .groupBy('word').count())
 
# apply the new function to the words DataFrame, it should get the same 
# result
wordCount(wordsDF).show()

+--------+-----+ 
|    word|count| 
+--------+-----+ 
|tutorial|    1| 
|   spark|    2| 
|    look|    2| 
|  python|    1| 
+--------+-----+

Capitalization and punctuation

Real world files are more complicated than the data we have been using until now. Some of the issues we have to address are:

+ Words should be counted independently of their capitalization (e.g., Spark and spark should be counted as the same word).
+ All punctuation should be removed.
+ Any leading or trailing spaces on a line should be removed.

We now define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces. We use the `regexp_replace` function to remove any character that is not a letter, number, or space, together with the `trim` and `lower` functions, all found in pyspark.sql.functions.

from pyspark.sql.functions import regexp_replace, trim, col, lower

def removePunctuation(column):
  """Removes punctuation, changes to lower case, and strips leading and 
     trailing spaces.

  Note:
    Only spaces, letters, and numbers should be retained. Other characters
    should be eliminated (e.g. it's becomes its). Leading and
    trailing spaces should be removed after punctuation is removed.

  Args:
    column (Column): A Column containing a sentence.

  Returns:
    Column: A Column named 'sentence' with clean-up operations applied.
  """
  return (trim(lower(regexp_replace(column, '([^\s\w_]|_)+', '')))
          .alias('sentence'))


sentenceDF = sqlContext.createDataFrame([('Hi, you',),
                               (' Look! No under_score!',),
                               (' * Remove punctuation then spaces * ',)], 
                               ['sentence'])
  # display first original sentence
sentenceDF.show(truncate=False)
 # then sentence with punctuation removed
(sentenceDF
   .select(removePunctuation(col('sentence')))
   .show(truncate=False))

+------------------------------------------+ 
|sentence                                  | 
+------------------------------------------+ 
|Hi, you                                   | 
| Look! No under_score!                    | 
| * Remove punctuation then spaces *       | 
+------------------------------------------+ 

+------------------------------+ 
|sentence                      | 
+------------------------------+ 
|hi you                        | 
|look no underscore            | 
|remove punctuation then spaces| 
+------------------------------+

# TEST Capitalization and punctuation
testPunctDF = sqlContext.createDataFrame([(" The Elephant's 4 cats. ",)])
Test.assertEquals( testPunctDF.select( removePunctuation(col('_1')))
                    .first()[0],  'the elephants 4 cats',
                    'incorrect definition for removePunctuation function')

1 test passed.

Load a text file

For the next part, we will use the Complete Works of William Shakespeare from Project Gutenberg. To convert a text file into a DataFrame, we use the `sqlContext.read.text()` method. We also apply the recently defined `removePunctuation()` function using a `select()` transformation to strip out the punctuation and change all text to lower case. Since the file is large we use `show(15)`, so that we only print 15 lines.

fileName = "dbfs:/datasets/shakespeare.txt"

shakespeareDF = (sqlContext.read.text(fileName)
                           .select(removePunctuation(col('value'))))
shakespeareDF.show(15, truncate=False)

+-------------------------------------------------+ 
|sentence                                         | 
+-------------------------------------------------+ 
|1609                                             | 
|                                                 | 
|the sonnets                                      | 
|                                                 | 
|by william shakespeare                           | 
|                                                 | 
|                                                 | 
|                                                 | 
|1                                                | 
|from fairest creatures we desire increase        | 
|that thereby beautys rose might never die        | 
|but as the riper should by time decease          | 
|his tender heir might bear his memory            | 
|but thou contracted to thine own bright eyes     | 
|feedst thy lights flame with selfsubstantial fuel| 
+-------------------------------------------------+ 
only showing top 15 rows

Words from lines

Before we can use the `wordCount()` function, we have to address two issues with the format of the DataFrame:
+ The first issue is that we need to split each line by its spaces.
+ The second issue is that we need to filter out empty lines or words.

We apply a transformation that will split each ‘sentence’ in the DataFrame by its spaces, and then transform from a DataFrame that contains lists of words into a DataFrame with each word in its own row. To accomplish these two tasks you can use the `split` and `explode` functions found in pyspark.sql.functions.

Once we have a DataFrame with one word per row we can apply the DataFrame operation `where` to remove the rows that contain the empty string ''.

from pyspark.sql.functions import split, explode

shakeWordsSplitDF = (shakespeareDF
  .select(split(shakespeareDF.sentence, '\s+').alias('split')))
shakeWordsSingleDF = (shakeWordsSplitDF
  .select(explode(shakeWordsSplitDF.split).alias('word')))
shakeWordsDF = shakeWordsSingleDF.where(shakeWordsSingleDF.word != '')
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

+-----------+ 
| word      | 
+-----------+ 
| 1609      | 
| the       | 
| sonnets   | 
| by        | 
| william   | 
|shakespeare| 
| 1         | 
| from      | 
| fairest   | 
| creatures | 
| we        | 
| desire    | 
| increase  | 
| that      | 
| thereby   | 
| beautys   | 
| rose      | 
| might     | 
| never     | 
| die       | 
+-----------+ 
only showing top 20 rows 
882996
# TEST Remove empty elements
Test.assertEquals(shakeWordsDF.count(), 882996, 
                  'incorrect value for shakeWordCount')
Test.assertEquals(shakeWordsDF.columns, ['word'], 
                  "shakeWordsDF should only contain the Column 'word'")
1 test passed. 
1 test passed.

Count the words

We now have a DataFrame that is only words. Next, let’s apply the `wordCount()` function to produce a list of word counts. We can view the first 20 words by using the `show()` action; however, we’d like to see the words in descending order of count, so we’ll need to apply the `orderBy` DataFrame method to first sort the DataFrame that is returned from `wordCount()`.

You’ll notice that many of the words are common English words. These are called stopwords. We will see how to eliminate them from the results.

from pyspark.sql.functions import desc

WordsAndCountsDF = wordCount(shakeWordsDF)
topWordsAndCountsDF = WordsAndCountsDF.orderBy("count", ascending=0)

topWordsAndCountsDF.show()

+----+-----+ 
|word|count| 
+----+-----+ 
| the|27361| 
| and|26028| 
|   i|20681| 
|  to|19150| 
|  of|17463| 
|   a|14593| 
| you|13615| 
|  my|12481| 
|  in|10956| 
|that|10890| 
|  is| 9134| 
| not| 8497| 
|with| 7771| 
|  me| 7769| 
|  it| 7678| 
| for| 7558| 
| his| 6857| 
|  be| 6857| 
|your| 6655| 
|this| 6602| 
+----+-----+ 
only showing top 20 rows 
# TEST Count the words
Test.assertEquals(topWordsAndCountsDF.take(15),
 [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), 
 (u'of', 17463), (u'a', 14593), (u'you', 13615), (u'my', 12481), 
 (u'in', 10956), (u'that', 10890), (u'is', 9134), (u'not', 8497), 
 (u'with', 7771), (u'me', 7769), (u'it', 7678)],
 'incorrect value for top15WordsAndCountsDF')

1 test passed.

Removing stopwords

Stopwords are common (English) words that do not contribute much to the content or meaning of a document (e.g., “the”, “a”, “is”, “to”, etc.). Stopwords add noise to bag-of-words comparisons, so they are usually excluded.
Using the included file “stopwords.txt”, we implement an improved tokenizer that does not emit stopwords.

In Python, we can test membership in a set as follows:

my_set = set(['a', 'b', 'c'])
'a' in my_set      # returns True
'd' in my_set      # returns False
'a' not in my_set  # returns False
Within `tokeniseAndRemoveStopwords()`, we first tokenize the string using `simpleTokenise()` and then remove the stopwords. To remove stopwords, we can use a loop, a Python list comprehension, or the built-in Python filter() function.

import os

data_dir = 'datasets'
STOPWORDS_PATH = 'stopwords.txt'

stopfile = os.path.join(data_dir, STOPWORDS_PATH)
stopwords = set(sc.textFile(stopfile).collect())
print 'These are the stopwords: %s' % stopwords

type(stopwords)
Out[4]: set

We first test the approach on a simple string.

import re

quickbrownfox = 'A quick brown fox jumps over the lazy dog.'
split_regex = r'\W+'

def simpleTokenise(string):
  """ A simple implementation of input string tokenization
  Args:
    string (str): input string
  Returns:
    list: a list of tokens
  """
  return filter(None, re.split(split_regex, string.lower()))

print simpleTokenise(quickbrownfox) 
    # Should give ['a', 'quick', 'brown', ... ]

Out[5]:['a', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog']
def removeStopWords(listOfTokens):
  return [token for token in listOfTokens if token not in stopwords]

def tokeniseAndRemoveStopwords(string):
  """ An implementation of input string tokenization that excludes stopwords
  Args:
    string (str): input string
  Returns:
    list: a list of tokens without stopwords
  """
  tmpLista = simpleTokenise(string)
  return removeStopWords(tmpLista)

print tokeniseAndRemoveStopwords(quickbrownfox) 
    # Should give ['quick', 'brown', ... ]
Out[6]: ['quick', 'brown', 'fox', 'jumps', 'lazy', 'dog']

We use a User Defined Function (UDF) to remove the stopwords.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# wrap the plain Python removeStopWords function as a Spark UDF
# that returns an array of strings
removeStopWords_udf = udf(removeStopWords, ArrayType(StringType()))

shakeWordsNoStopDF = (shakeWordsSplitDF.select(removeStopWords_udf("split")
                                       .alias('wordsNoStop')))

shakeWordsNoStopDF.show()

+--------------------+ 
| wordsNoStop        | 
+--------------------+ 
| [1609]             | 
| []                 | 
| [sonnets]          | 
| []                 | 
|[william, shakesp...| 
| []                 | 
| []                 | 
| []                 | 
| [1]                | 
|[fairest, creatur...| 
|[thereby, beautys...| 
|[riper, time, dec...| 
|[tender, heir, mi...| 
|[thou, contracted...| 
|[feedst, thy, lig...| 
|[making, famine, ...| 
|[thy, self, thy, ...| 
|[thou, art, world...| 
|[herald, gaudy, s...| 
|[within, thine, b...| 
+--------------------+ 
only showing top 20 rows 
shakeWordsSingleDF = (shakeWordsNoStopDF
       .select(explode(shakeWordsNoStopDF.wordsNoStop).alias('word')))
shakeWordsDF = shakeWordsSingleDF.where(shakeWordsSingleDF.word != '')
shakeWordsDF.show()

print shakeWordsDF.count()
+-----------+
| word      |
+-----------+
| 1609      |
| sonnets   |
| william   |
|shakespeare|
| 1         |
| fairest   |
| creatures |
| desire    |
| increase  |
| thereby   |
| beautys   |
| rose      |
| might     |
| never     |
| die       |
| riper     |
| time      |
| decease   |
| tender    |
| heir      |
+-----------+
only showing top 20 rows

474432
WordsAndCountsDF = wordCount(shakeWordsDF)
topWordsAndCountsDF = WordsAndCountsDF.orderBy("count", ascending=0)

topWordsAndCountsDF.show()

+-----+-----+ 
| word|count| 
+-----+-----+ 
| thou| 5485| 
|  thy| 4032| 
|shall| 3591| 
| thee| 3178| 
| lord| 3059| 
| king| 2861| 
| good| 2812| 
|  sir| 2754| 
|    o| 2607| 
| come| 2507| 
| well| 2462| 
|would| 2293| 
|  let| 2099| 
|enter| 2098| 
| love| 2053| 
|  ill| 1972| 
| hath| 1941| 
|  man| 1835| 
|  one| 1779| 
|   go| 1733| 
+-----+-----+ 
only showing top 20 rows