Parts-of-Speech Tagging (POS)
In the first part we have seen how to build a simple tagger using the probability that a word belongs to a specific tag.
Part-of-speech refers to the category of words (Noun, Verb, Adjective…) in the language.
The part-of-speech (POS) tagging is the process of assigning a part-of-speech tag to each word in an input text.
Tagging is difficult because some words can represent more than one part of speech at different times. They are ambiguous.
We have seen that a simple tagger based on occurrence probabilities reached a good accuracy but failed at correctly tagging a sentence like “I work in Shanghai”, because it tagged 'work' as a noun instead of, in this case, a verb.
The goal
We will use now a different approach based on the Viterbi algorithm and the Hidden Markov model, that hopefully will improve the tagging accuracy.
The code and the datasets are also available on my GitHub repository.
Data Sources
This notebook will use the same tagged data sets used when building the first tagger, collected from the Wall Street Journal (WSJ):
- One data set (WSJ_02-21.pos) will be used for training.
- The other (WSJ_24.pos) will be used for testing.
- The tagged training data has been preprocessed to form a vocabulary (hmm_vocab.txt).
The words in the vocabulary are words from the training set that were used two or more times.
The vocabulary is augmented with a set of ‘unknown word tokens’.
They are the same as in the previous notebook, and the pre-processing is also the same, so I will go through it faster here. Please refer to the first part for more details.
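For reference, here is a hedged sketch (mine, not part of the original notebook) of how a vocabulary like hmm_vocab.txt could be rebuilt from the training file loaded in the next section: count the words, keep those seen at least twice, and append the unknown-word tokens used by the assign_unk helper shown later. The exact token set of the provided file may differ slightly.
from collections import Counter
# Sketch only: rebuild a vocabulary similar to hmm_vocab.txt from the training file
with open("../datasets/WSJ_02-21.pos", 'r') as f:
    words = [line.split('\t')[0] for line in f if line.split()]
counts = Counter(words)
vocab_words = sorted(w for w, c in counts.items() if c >= 2)  # words used two or more times
# augment with the unknown-word tokens (the tokens returned by assign_unk, plus '--n--')
vocab_words += ["--n--", "--unk--", "--unk_adj--", "--unk_adv--", "--unk_digit--",
                "--unk_noun--", "--unk_punct--", "--unk_upper--", "--unk_verb--"]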
Training data
# load in the training corpus
with open("../datasets/WSJ_02-21.pos", 'r') as f:
training_corpus = f.readlines() # list
print("A few items of the training corpus list: ")
print(training_corpus[0:5])
A few items of the training corpus list: ['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']
As you can see, the training_corpus is a list with all words extracted from English articles, together with their POS tag.
Almost one million of them!
Testing data
# load in the testing corpus
with open("../datasets/WSJ_24.pos", 'r') as f:
testing_corpus = f.readlines() # list
print("A sample of the testing corpus")
print(testing_corpus[0:10])
A sample of the testing corpus ['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n', 'be\tVB\n', 'taken\tVBN\n', 'from\tIN\n', 'several\tJJ\n', 'vantage\tNN\n']
len(testing_corpus)
34199
The testing corpus has the same format as the training one, just much smaller.
It will be used at the end for calculating the model’s accuracy.
Testing words and vocabulary
The testing set (WSJ_24.pos) has also been preprocessed to remove the tags, forming the file test.words. It is read into testing_words, a helper list for the algorithm later on.
with open("../datasets/test.words", 'r') as f:
testing_words = f.readlines() # list
print(testing_words[0:20])
['The\n', 'economy\n', "'s\n", 'temperature\n', 'will\n', 'be\n', 'taken\n', 'from\n', 'several\n', 'vantage\n', 'points\n', 'this\n', 'week\n', ',\n', 'with\n', 'readings\n', 'on\n', 'trade\n', ',\n', 'output\n']
The vocabulary is an indexed list of words; almost 24K of them.
The unique words have been extracted from the training corpus.
# read the vocabulary data, split by each line of text, and save the list
with open("../datasets/hmm_vocab.txt", 'r') as f:
voc_l = f.read().split('\n') # list
# vocab: dictionary that has the index of the corresponding words
vocabulary = {}
# Get the index of the corresponding words.
for i, word in enumerate(sorted(voc_l)):
vocabulary[word] = i
print("A few items at the end of the vocabulary list")
print(voc_l[-25:])
A few items at the end of the vocabulary list ['yields', 'you', 'young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeroing', 'zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']
Create the helper dictionaries
Before we start predicting the tags of each word, we will need to compute – from the training corpus – a few dictionaries that will help to generate the tables.
Again, this is the same as done in the first part, so I will go quickly through them.
- the emissionCounts dictionary will be used to compute the probability of a word given its tag.
- the transitionCounts dictionary counts the number of times each tag happened next to another tag.
- the tagCounts dictionary counts the number of times each tag appeared.
import string
# Punctuation characters
punct = set(string.punctuation)
# Morphology rules used to assign unknown word tokens
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]
def assign_unk(tok):
"""
Assign unknown word tokens
"""
# Digits
if any(char.isdigit() for char in tok):
return "--unk_digit--"
# Punctuation
elif any(char in punct for char in tok):
return "--unk_punct--"
# Upper-case
elif any(char.isupper() for char in tok):
return "--unk_upper--"
# Nouns
elif any(tok.endswith(suffix) for suffix in noun_suffix):
return "--unk_noun--"
# Verbs
elif any(tok.endswith(suffix) for suffix in verb_suffix):
return "--unk_verb--"
# Adjectives
elif any(tok.endswith(suffix) for suffix in adj_suffix):
return "--unk_adj--"
# Adverbs
elif any(tok.endswith(suffix) for suffix in adv_suffix):
return "--unk_adv--"
return "--unk--"
# Helper: substitutes a word not in the vocabulary with an "unknown" token
def get_word_tag(line, vocab):
if not line.split():
word = "--n--"
tag = "--s--"
return word, tag
else:
word, tag = line.split()
if word not in vocab:
# Handle unknown words
word = assign_unk(word)
return word, tag
from collections import defaultdict
def create_dictionaries(corpus, vocab):
"""
Input:
corpus: a corpus where each line has a word followed by its tag.
vocab: a dictionary where keys are words in vocabulary and value is an index
Output:
emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
transition_counts: a dictionary where the keys are (prev_tag, tag) and the values are the counts
tag_counts: a dictionary where the keys are the tags and the values are the counts
"""
# initialize the dictionaries using defaultdict
emission_counts = defaultdict(int)
transition_counts = defaultdict(int)
tag_counts = defaultdict(int)
# Initialize "prev_tag" (previous tag) with the start state, denoted by '--s--'
prev_tag = '--s--'
# use 'i' to track the line number in the corpus
i = 0
# Each item in the training corpus contains a word and its POS tag
# Go through each word and its tag in the training corpus
for word_tag in corpus:
# Increment the word_tag count
i += 1
# get the word and tag using the get_word_tag helper function
word, tag = get_word_tag(word_tag, vocab)
# Increment the transition count for the (previous tag, current tag) pair
transition_counts[(prev_tag, tag)] += 1
# Increment the emission count for the tag and word
emission_counts[(tag, word)] += 1
# Increment the tag count
tag_counts[tag] += 1
# Set the previous tag to this tag (for the next iteration of the loop)
prev_tag = tag
return emission_counts, transition_counts, tag_counts
emissionCounts, transitionCounts, tagCounts = create_dictionaries(training_corpus, vocabulary)
# get all the POS states
tags = sorted(tagCounts.keys())
print(f"Number of POS tags: {len(tags)}")
print("View these POS tags")
print(tags)
Number of POS tags: 46
View these POS tags
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']
We mapped a total of 46 POS tags, which is great: every tag mentioned in the Penn Treebank is here.
The ‘tags’ are the Parts-of-speech designations found in the training data.
- ‘NN’ is noun, singular.
- ‘NNS’ is noun, plural.
- In addition, there are helpful tags like ‘--s--’, which indicates the start of a sentence.
- You can get a more complete description at Penn Treebank II tag set.
We didn’t use the transition dictionary in the first model, but we will now.
print("transition examples: ")
for ex in list(transitionCounts.items())[:3]:
print(ex)
print()
transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)
The transition dictionary shows how often we move from one tag to another; for example, the transition from DT (a determiner, i.e. an article such as ‘the’ or ‘a’) to NNP (a proper noun) occurs 9044 times.
print("ambiguous word example: ")
for tup,cnt in emissionCounts.items():
if tup[1] == 'back': print (tup, cnt)
ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4
We can search for all the tags used for a specific word (for example ‘back’) in the emissionCounts dictionary and see that most of the time it was tagged as RB (adverb), but not always: it can also be a verb! It’s ambiguous.
Markov Models for POS
Now we will build something more context specific. Concretely, we will be implementing a Hidden Markov Model (HMM) with a Viterbi decoder.
The HMM is one of the most commonly used algorithms in Natural Language Processing, and is a foundation to many deep learning techniques.
In addition to parts-of-speech tagging, HMM is used in speech recognition, speech synthesis, etc.
A Markov chain is basically a directed graph: a data structure consisting of a set of objects called nodes (the circles in the image); in our case the nodes are the POS tags of our model, connected together by edges (the uni-directional lines in the image).
The arrow from state q1 to state q2 represents the transition probability of moving from q1 to q2.
The probability of the next event depends only on the current state.

An example of a Markov chain could be a graph predicting if a day is sunny (S) or rainy (R), i.e. it has only two states. When the Markov chain is in state “R”, it has a 0.9 probability of staying there (means the next day will also rain) and a 0.1 chance of leaving for the “S” state (means the next day will instead be sunny). Likewise, “S” state has 0.9 probability of staying sunny and a 0.1 chance of transitioning to the “R” state.

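As a small illustration (my own sketch, using the numbers above), this two-state chain can be written as a 2×2 transition matrix and used to score a sequence of days:
import numpy as np
# Transition matrix of the weather chain: rows = current state, columns = next state.
# State order: [R, S]
P = np.array([[0.9, 0.1],
              [0.1, 0.9]])
# Probability of the sequence R -> R -> S, starting from a rainy day
print(P[0, 0] * P[0, 1])   # 0.9 * 0.1 = 0.09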
A sentence is a sequence of words with associated parts of speech tags.
We can represent that sequence with a graph where the parts-of-speech tags (NN, VB, O) are events that can occur, depicted by the nodes of the model graph. The weights on the arrows between the states define the probability of going from one tag to another:

Hidden Markov Models for POS
The Hidden Markov Model (HMM) is a statistical model in which the system being modeled is indeed a Markov process, call it X, but with unobservable (“hidden”) states.
The goal is to learn about X by observing another process Y whose behavior “depends” on X.
Let’s see how the weather example above can have unobservable events.
Consider two friends, Alice and Bob, who live far apart from each other and who talk together daily over the telephone about what they did that day. Bob is only interested in three activities: walking in the park, shopping and cleaning his apartment.
The choice of what to do is determined exclusively by the weather on a given day.
Alice has no definite information about the weather but she knows general trends. Based on what Bob tells her he did each day, Alice tries to guess what the weather must have been like.
Alice believes that the weather operates as a discrete Markov chain.
There are two states: “Rainy” and “Sunny”, but she cannot observe them directly as they are “hidden” from her. On each day, there is a certain chance that Bob will perform one of the following activities, depending on the weather: “walk”, “shop”, or “clean”.
Since Bob tells Alice about his activities, those are the observations. The entire system is that of a hidden Markov model (HMM).
It’s the same when we are trying to tag the POS of a sentence: we cannot directly observe the POS tag of each word, but we can estimate the transition (and emission) probabilities from a model trained on a tagged dataset.
Describing Markov models with matrices
The Markov model graph can be represented as a matrix of dimension n+1 by n, where n is the number of tags:
- each element a(i,j) represents the probability to transition from tag i to tag j,
- when there is no previous state, we introduce an initial state π,
- the sum of all transitions from a state must always be 1.
To describe a HMM you will need a transition matrix A (for example with the probabilities of rainy and sunny states) and also an emission matrix B.
The second matrix contains the probabilities of the observable states based on the unobservable: how likely Bob is to perform a certain activity on each day. If it is rainy, there is a 50% chance that he is cleaning his apartment; if it is sunny, there is a 60% chance that he is outside for a walk.
For POS tagging, the hidden Markov model has an emission probabilities matrix B that describes the transition from the hidden states (the POS tags) to the observables (the words of your corpus).
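To make this concrete, here is a small sketch of the weather HMM (my own; the 0.5 and 0.6 values come from the text above, while the remaining emission probabilities are made up just so that each row sums to 1):
import numpy as np
hidden_states = ["Rainy", "Sunny"]
observables = ["walk", "shop", "clean"]
# Transition matrix A between the hidden states (from the weather chain above)
A_weather = np.array([[0.9, 0.1],
                      [0.1, 0.9]])
# Emission matrix B: P(activity | weather). Each row sums to 1.
B_weather = np.array([[0.1, 0.4, 0.5],    # Rainy: 50% chance of cleaning
                      [0.6, 0.3, 0.1]])   # Sunny: 60% chance of a walk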
Generating Matrices
Starting from the emissionCounts, transitionCounts and tagCounts dictionaries, we will now implement the Hidden Markov Model.
This will allow us to quickly construct:
- the A transition probabilities matrix,
- the B emission probabilities matrix.
Transition matrix A
To avoid division by zero, since a lot of entries in the transition matrix are zero, we apply smoothing to the probability formula.
The matrix A is computed with smoothing as follows:

(1)   P(t_i | t_{i-1}) = (C(t_{i-1}, t_i) + α) / (C(t_{i-1}) + α · N)

- N is the total number of tags.
- C(t_{i-1}, t_i) is the count of the tuple (previous POS, current POS) in the transition_counts dictionary.
- C(t_{i-1}) is the count of the previous POS in the tag_counts dictionary.
- α is a smoothing parameter.
The function create_transition_matrix below implements this for all tags: it outputs a matrix where each cell is computed with equation (1) above.
Its inputs are the transitionCounts and the tagCounts dictionaries.
import numpy as np
def create_transition_matrix(alpha, tag_counts, transition_counts):
'''
Input:
alpha: number used for smoothing
tag_counts: a dictionary mapping each tag to its respective count
transition_counts: transition count for the previous word and tag
Output:
A: matrix of dimension (num_tags,num_tags)
'''
# Get a sorted list of unique POS tags; e.g. all_tags[20]is 'NN'
all_tags = sorted(tag_counts.keys())
# Get the number of unique POS tags, this will be n, the matrix size
num_tags = len(all_tags)
# Initialize the transition matrix 'A' (size is n by n) to zero
A = np.zeros((num_tags, num_tags))
# Get the unique transition tuples (previous POS, current POS)
trans_keys = set(transition_counts.keys())
# Go through each row of the transition matrix A
for i in range(num_tags):
# Go through each column of the transition matrix A
for j in range(num_tags):
# Initialize the count of the (prev POS, current POS) to zero
count = 0
# Define the tuple (prev POS, current POS)
# Get the tag at position i and tag at position j (from the all_tags list)
key = (all_tags[i], all_tags[j]) # this is a tuple (tag1, tag2)
# Check if the (prev POS, current POS) tuple
# exists in the transition counts dictionary
if key in transition_counts:
# If yes, get count from the transition_counts dictionary
# for the (prev POS, current POS) tuple
# we know that the transition dictionary contains key=tuple(pos1,pos2) and value=probability
# e.g. (('DT', 'NNP'), 9044)
count = transition_counts[key]
# Get the count of the previous tag (index position i): how many times it appeared in the corpus
count_prev_tag = tag_counts[all_tags[i]]
# Formula (1)
# Apply smoothing using count of the tuple, alpha,
# count of previous tag, alpha, and number of total tags
A[i,j] = (count + alpha) / (count_prev_tag + alpha * num_tags)
return A
import pandas as pd
alpha = 0.001
A = create_transition_matrix(alpha, tagCounts, transitionCounts)
print("A subset of transition matrix A:")
print(pd.DataFrame(A[30:35,30:35], index=tags[30:35], columns = tags[30:35] ))
A subset of transition matrix A:
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02
That is an example of what the A transition matrix looks like (it is reduced to 5 tags for viewing; in reality it is 46×46).
Each cell gives the probability to go from one part of speech to another.
- For example, there is a 4.47e-8 chance of going from the part-of-speech TO to RP.
- The sum of each row has to be equal to 1, because we assume that the next POS tag must be one of the available columns in the table.
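A quick sanity check (my addition): since each row of A is a smoothed probability distribution over the next tag, the row sums should all be very close to 1.
# Each row of A should sum to (approximately) 1
print(A.sum(axis=1)[:5])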
The emission probabilities matrix B
Now we will create the B emission matrix, which computes the emission probability by counting the co-occurrences of a part-of-speech tag with a specific word.
We will use smoothing as defined below:
(2)   P(word_i | t_i) = (C(t_i, word_i) + α) / (C(t_i) + α · N)

- C(t_i, word_i) is the number of times word_i was associated with tag t_i in the training data (stored in the emission_counts dictionary).
- C(t_i) is the number of times tag t_i appeared in the training data (stored in the tag_counts dictionary).
- N is the number of words in the vocabulary.
- α is a smoothing parameter.
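Again a quick worked example (my addition, reusing the counts of the ambiguous word ‘back’ shown earlier), evaluating equation (2) for a single cell:
# Smoothed probability that the tag RB emits the word 'back', following equation (2)
alpha = 0.001
N = len(vocabulary)                            # size of the vocabulary (~24K words)
count_tuple = emissionCounts[('RB', 'back')]   # 304, as printed earlier
count_tag = tagCounts['RB']                    # how many times RB appears in the corpus
print((count_tuple + alpha) / (count_tag + alpha * N))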
The matrix B is of dimension (num_tags, N), where num_tags is the number of possible parts-of-speech tags and N is the size of the vocabulary.
Finally, the function create_emission_matrix below computes the B emission probabilities matrix. It takes in the smoothing parameter α, the tagCounts dictionary mapping each tag to its respective count and, this time, the emissionCounts dictionary where the keys are (tag, word) and the values are the counts. It outputs a matrix where each cell is computed with equation (2) above.
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
'''
Input:
alpha: tuning parameter used in smoothing
tag_counts: a dictionary mapping each tag to its respective count
emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
vocab: a dictionary where keys are words in vocabulary and value is an index
Output:
B: a matrix of dimension (num_tags, len(vocab))
'''
# get the number of POS tag
num_tags = len(tag_counts) # this is the number of rows of the B matrix
# Get a list of all POS tags
all_tags = sorted(tag_counts.keys())
# Get the total number of unique words in the vocabulary
num_words = len(vocab) # this is the number of columns of the B matrix
# Initialize the emission matrix B with places for
# tags in the rows and words in the columns
B = np.zeros((num_tags, num_words))
# Get a set of all (POS, word) tuples
# from the keys of the emission_counts dictionary
# e.g. ('NN', 'dog')
emis_keys = set(list(emission_counts.keys()))
# Go through each row (POS tags)
for i in range(num_tags):
# Go through each column (words)
for j in range(num_words):
# Initialize the emission count for the (POS tag, word) to zero
count = 0
# Define the (POS tag, word) tuple for this row and column
key = (all_tags[i], vocab[j]) # this is a tuple (tag, word) as ('NN', 'dog')
# check if the (POS tag, word) tuple exists as a key in emission counts
if key in emission_counts.keys():
# Get the count of (POS tag, word) from the emission_counts
# we know that the emission dictionary contains key=tuple(pos,word) and value=count
# e.g. ('NN','dog') = 10 (appears ten times in corpus)
count = emission_counts[key]
# Get the count of the POS tag (how many times it appeared in the corpus)
count_tag = tag_counts[all_tags[i]]
# Apply smoothing and store the smoothed value
# into the emission matrix B for this row and column
B[i,j] = (count + alpha) / (count_tag+ alpha*num_words)
return B
# creating the emission probability matrix. this takes a few minutes to run.
B = create_emission_matrix(alpha, tagCounts, emissionCounts, list(vocabulary))
# Try viewing emissions for a few words in a sample dataframe
cidx = ['725','well','dog', 'dogs', 'work']
# Get the integer ID for each word
cols = [vocabulary[a] for a in cidx]
# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']
# For each POS tag, get the row number from the 'states' list
rows = [tags.index(a) for a in rvals]
# Get the emissions for the sample of words, and the sample of POS tags
print(pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx ))
              725          well           dog          dogs          work
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  6.769767e-05  7.521880e-05  7.521128e-09  2.015670e-03
NNS  1.670013e-08  1.670013e-08  1.670013e-08  1.169176e-04  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  4.345929e-03
RB   3.226454e-08  1.319623e-02  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07
That is an example of the matrix; only a subset of tags and words is shown.
E.g. the probability for the tuple (NN, dog) is 7.5e-05, much higher than for (NNS, dog).
But note that ‘work’ has comparable probabilities of being a singular noun (NN) or a verb (VB).
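A single cell of B can also be read directly, with a small helper of my own that uses the tags list for the row and the vocabulary index for the column:
# Look up one emission probability P(word | tag) in the B matrix
def emission_prob(tag, word):
    return B[tags.index(tag), vocabulary[word]]

print(emission_prob('NN', 'work'))   # ~2.0e-03, as in the table above
print(emission_prob('VB', 'work'))   # ~4.3e-03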
Pre-process the corpus
Finally, the corpus needs to be pre-processed (as in the first part) to tag the unknown words and mark the end of sentence:
def preprocessCorpus(y):
orig = []
y_prepr = []
# we already read the words from testing dataset into 'y'
for cnt, word in enumerate(y):
# End of sentence
if not word.split():
orig.append(word.strip())
word = "--n--"
y_prepr.append(word)
continue
# Handle unknown words
elif word.strip() not in vocabulary:
orig.append(word.strip())
word = assign_unk(word)
y_prepr.append(word)
continue
else:
orig.append(word.strip())
y_prepr.append(word.strip())
assert(len(orig) == len(y)) # just to be sure
assert(len(y_prepr) == len(y))
return y_prepr
tw_prepr = preprocessCorpus(testing_words)
print('The length of the preprocessed test corpus: ', len(tw_prepr))
print('This is a sample of the test_corpus: ')
print(tw_prepr[0:20])
The length of the preprocessed test corpus:  34199
This is a sample of the test_corpus: 
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--', 'points', 'this', 'week', ',', 'with', 'readings', 'on', 'trade', ',', 'output']
Viterbi Algorithm
The Viterbi algorithm is a graph algorithm to find the most likely sequence of hidden states (or parts of speech tags in our case) — called the Viterbi path — that have the highest a posteriori probability for a sequence of observed events.
The Viterbi algorithm is named after Andrew Viterbi, who proposed it in 1967 as a decoding algorithm for convolutional codes over noisy digital communication links.
It was introduced to Natural Language Processing as a method of part-of-speech tagging as early as 1987.
I cannot give a better explanation than the one on Wikipedia, complete with pseudocode and an example in Python.
Now we will implement the Viterbi algorithm, which makes use of dynamic programming. Specifically, we will use the two matrices A and B to compute the Viterbi algorithm in three main steps:
- Initialization: initialize the best_probs and best_paths matrices that we will be populating in viterbi_forward.
- Feed forward: at each step, calculate the probability of each path happening and the best paths up to that point.
- Feed backward: this allows us to find the best path with the highest probabilities.
Initialization
Let’s start by initializing two matrices of the same dimension.
- best_probs: each cell contains the probability of going from one POS tag to a word in the corpus (its first column tells the probability that the first word belongs to each part of speech).
- best_paths: a matrix that helps you trace back the best possible path through the corpus (it stores the labels representing the different states we traverse when finding the most likely sequence of parts-of-speech tags for the given sequence of words w1 all the way to wk).
Here is how to initialize best_probs:
- The probability of the best path going from the start index to a given POS tag indexed by integer i is denoted by best_probs[i, 0].
- This is estimated as the probability that the start tag transitions to the POS denoted by index i, A[s_idx, i], AND that the POS tag denoted by i emits the first word of the given corpus, B[i, vocab[corpus[0]]].
- Note that vocab[corpus[0]] refers to the first word of the corpus (the word at position 0 of the corpus).
- vocab is a dictionary that returns the unique integer that refers to that particular word.
Conceptually, it looks like this:
best_probs[i,0] = A[s_idx, i] * B[i, vocab[corpus[0]]]
In order to avoid multiplying and storing small values on the computer, we’ll take the log of the product, which becomes the sum of two logs:
best_probs[i,0] = log(A[s_idx, i]) + log(B[i, vocab[corpus[0]]])
import math
def initialize(states, tag_counts, A, B, corpus, vocab):
'''
Scope: initializes the `best_probs` and the `best_paths` matrix
Input:
states: a list of all possible parts-of-speech
tag_counts: a dictionary mapping each tag to its respective count
A: Transition Matrix of dimension (num_tags, num_tags)
B: Emission Matrix of dimension (num_tags, len(vocab))
corpus: a sequence of words whose POS is to be identified in a list
vocab: a dictionary where keys are words in vocabulary and value is an index
Output:
best_probs: matrix of dimension (num_tags, len(corpus)) of floats
best_paths: matrix of dimension (num_tags, len(corpus)) of integers
'''
# Get the total number of unique POS tags
num_tags = len(tag_counts)
# Initialize best_probs matrix to zero
# POS tags as the rows, number of words in the corpus as the columns
best_probs = np.zeros((num_tags, len(corpus)))
# Initialize best_paths matrix to zero
# POS tags as the rows, number of words in the corpus as columns
# the two matrices have the same size
best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
# Define the start token
s_idx = states.index("--s--")
# Go through each of the POS tags (matrix rows)
for i in range(num_tags):
# Handle the special case when the transition from start token to POS tag i is zero
# Column zero of `best_probs` is initialized with the assumption that the first word
# of the corpus was preceded by a start token ("--s--").
# This allows to reference the A matrix for the transition probability
if A[s_idx,i] == 0:
# Initialize best_probs at POS tag 'i', column 0, to negative infinity (log 0)
best_probs[i,0] = float('-inf')
# For all other cases when transition from start token to POS tag i is non-zero:
else:
# Initialize best_probs at POS tag 'i', column 0
# Check the formula above
best_probs[i,0] = math.log(A[s_idx,i]) + math.log(B[i,vocab[corpus[0]]] )
return best_probs, best_paths
best_probs, best_paths = initialize(tags, tagCounts, A, B, tw_prepr, vocabulary)
Viterbi Forward
Now we will implement the viterbi_forward segment. In other words, we will populate the best_probs and best_paths matrices:
- Walk forward through the corpus.
- For each word, compute a probability for each possible tag.
- This will include the path up to that (word,tag) combination.
The formula to compute the probability and path for word i in the corpus, given the prior word i-1 in the corpus, the current POS tag j, and the previous POS tag k, is:
prob = best_probs[k, i-1] + log(A[k, j]) + log(B[j, vocab[corpus[i]]])
where corpus[i] is the word in the corpus at index i, and vocab is the dictionary that gets the unique integer that represents a given word.
path = k, where k is the integer representing the previous POS tag that produced the highest probability.
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
'''
Scope:
store the best_path and best_prob for every possible tag for each word
in the matrices `best_probs` and `best_tags`
Input:
A, B: The transiton and emission matrices respectively
test_corpus: a list containing a preprocessed corpus
best_probs: an initialized matrix of dimension (num_tags, len(corpus))
best_paths: an initialized matrix of dimension (num_tags, len(corpus))
vocab: a dictionary where keys are words in vocabulary and value is an index
Output:
best_probs: a completed matrix of dimension (num_tags, len(corpus))
best_paths: a completed matrix of dimension (num_tags, len(corpus))
'''
# Get the number of unique POS tags (which is the num of rows in best_probs)
num_tags = best_probs.shape[0]
# Go through every word in the corpus starting from word 1
# Recall that word 0 was specially initialized in `initialize()`
for i in range(1, len(test_corpus)):
# For PROGRESS: Print number of words processed, every 5000 words
if i % 5000 == 0:
print("Words processed: {:>8}".format(i))
# For each unique POS tag that the current word can be
for j in range(num_tags):
# Initialize best_prob for word i to negative infinity
best_prob_i = float("-inf")
# Initialize best_path for current word i to None
best_path_i = None
# For each POS tag that the previous word can be:
for k in range(num_tags):
# compute the probability that the previous word had a given POS tag,
# that the current word has a given POS tag,
# and that the POS tag would emit this current word.
# Probability =
# best probs of POS tag k, previous word i-1 +
# log(prob of transition from POS k to POS j) +
# log(prob that emission of POS j is word i)
prob = best_probs[k,i-1] + math.log(A[k,j]) + math.log(B[j,vocab[test_corpus[i]]])
# retain the highest probability computed for the current word
if prob > best_prob_i:
best_prob_i = prob
# set best_paths to the index 'k', keep track of the POS tag of the previous word
# that is part of the best path.
# K is representing the POS tag of the previous word which produced the highest probability
best_path_i = k
# Save the best probability for the
# given current word's POS tag
# and the position of the current word inside the corpus
best_probs[j,i] = best_prob_i
# Save the unique integer ID of the previous POS tag
# into best_paths matrix, for the POS tag of the current word
# and the position of the current word inside the corpus
best_paths[j,i] = best_path_i
return best_probs, best_paths
Run the `viterbi_forward` function to fill in the `best_probs` and `best_paths` matrices.
Note that this will take a few minutes to run. There are about 30,000 words to process.
# this will take a few minutes to run => processes ~ 30,000 words
best_probs, best_paths = viterbi_forward(A, B, tw_prepr, best_probs, best_paths, vocabulary)
Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000
Viterbi backward
The Viterbi backward algorithm gets the predictions of the POS tags for each word in the corpus using the best_paths and the best_probs matrices.
The backward pass helps retrieve the most likely sequence of parts-of-speech tags for the given sequence of words.
- First it finds the index of the entry with the highest probability in the last column of best_probs. It represents the most likely POS tag (hidden state) for the last observed word.
- This index is then used to traverse back through the best_paths matrix and reconstruct the whole sequence of parts-of-speech tags.
def viterbi_backward(best_probs, best_paths, corpus, states):
'''
Scope:
This function retrieves the best path.
Input:
corpus: a list containing a preprocessed corpus
best_probs: an initialized matrix of dimension (num_tags, len(corpus))
best_paths: an initialized matrix of dimension (num_tags, len(corpus))
states: dictionary with each POS tag and its index
Output:
pred: returns a list of predicted POS tags for each word in the corpus.
'''
#
# Initialization
#
# Get the number of words in the corpus
# which is also the number of columns in best_probs, best_paths
m = best_paths.shape[1]
# Get the number of unique POS tags
num_tags = best_probs.shape[0]
# Initialize the best probability for the last word
best_prob_for_last_word = float('-inf')
# Initialize array z, same length as the corpus
z = [None] * m
# Initialize pred array, same length as corpus
pred = [None] * m
#
# Step 1 : Loop through all the rows (POS tags) in the last entry of `best_probs`
# and find the row (POS tag) with the maximum value. This is the best POS tag for the last word
#
# Go through each POS tag for the *last* word (last column of best_probs)
# in order to find the row (POS tag integer ID)
# with highest probability for the last word
for k in range(num_tags):
# If the probability of POS tag at row k
# is better than the previously best probability for the last word:
if best_probs[k, m - 1] > best_prob_for_last_word:
# then store the new best probability for the last word
best_prob_for_last_word = best_probs[k, m - 1]
# and store the unique integer ID of the POS tag
# which is also the row number in best_probs
z[m - 1] = k
# Convert the last word's predicted POS tag
# from its unique integer ID into the string representation
# using the 'states' dictionary
# Store this in the 'pred' array for the last word
pred[m - 1] = states[z[m - 1]]
#
# Step 2: traverse back through the best_paths matrix and retrieve all POS tags
#
# Find all the best POS tags by walking backward through the best_paths
# From the last word in the corpus to the first word in the corpus
for i in range(m-1, -1, -1):
# Retrieve the unique integer ID of
# the POS tag for the word at position 'i' in the corpus
pos_tag_for_word_i = z[i]
# In best_paths, go to the row representing the POS tag of word i
# and the column representing the word's position in the corpus
# to retrieve the predicted POS for the word at position i-1 in the corpus
z[i - 1] = best_paths[pos_tag_for_word_i,i]
# Get the previous word's POS tag in string form
# again using the 'states' dictionary,
# where the key is the unique integer ID of the POS tag,
# and the value is the string representation of that POS tag
pred[i - 1] = states[z[i - 1]]
return pred
# Run and test the function
pred = viterbi_backward(best_probs, best_paths, tw_prepr, tags)
m=len(pred)
print(f"The prediction for {tw_prepr[-7:m-1]} is: \n {pred[-7:m-1]} \n")
print(f"The prediction for {tw_prepr[0:7]} is: \n {pred[0:7]}")
The prediction for ['see', 'them', 'here', 'with', 'us', '.'] is: 
 ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 
The prediction for ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken'] is: 
 ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN']
Predict on single words or sentences
Let’s see a couple of examples:
In general, the list pred just retrieved by running the Viterbi algorithm on the test corpus can be used to check a word’s POS, knowing the word’s index:
print('The third word is:', tw_prepr[3])
print('Its prediction for the POS tag is:', pred[3])
print('The corresponding true label is: ', testing_corpus[3])
The third word is: temperature
Its prediction for the POS tag is: NN
The corresponding true label is:  temperature	NN
Obviously you don’t know the word index in advance, so it can be retrieved from the preprocessed test words list (tw_prepr).
Let’s write a quick helper function:
def getWordPOS(word):
if word in tw_prepr:
return pred[tw_prepr.index(word)]
else:
return "unknown"
getWordPOS("cat")
'unknown'
getWordPOS("work")
'NN'
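Note that this helper only returns the tag predicted for the first occurrence of the word in the test corpus. Since the same word can be tagged differently depending on context, a small variation (mine) collects every distinct tag predicted for it:
# Collect every distinct POS tag predicted for a word across the whole test corpus
def getAllWordPOS(word):
    return {pred[i] for i, w in enumerate(tw_prepr) if w == word}

getAllWordPOS("work")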
The same can be done for an entire sentence. The following helper function splits a sentence (string) into words and runs the Viterbi algorithm on them:
def getSentencePOS(sentence):
"""
Input:
sentence: the sentence to be tagged. String. Needs a marker at the end (newline).
Output:
a list of predicted POS tags, one for each word
"""
words = sentence.split(" ") # split into words (a list)
corpus = preprocessCorpus(words)
best_probs, best_paths = initialize(tags, tagCounts, A, B, corpus, vocabulary)
best_probs, best_paths = viterbi_forward(A, B, corpus, best_probs, best_paths, vocabulary)
predictedPOS = viterbi_backward(best_probs, best_paths, corpus, tags)
# return a list of predicted POS tags, one per word
return predictedPOS
Examples:
print(getSentencePOS("I have a black cat \n"))
['PRP', 'VBP', 'DT', 'JJ', 'NNS', '#']
print(getSentencePOS("I work in Shanghai \n"))
['PRP', 'VBP', 'IN', 'NNP', '#']
And here you can see that the sentence that previously failed now works!
‘work’ is not only a noun: in this case it is a verb (VBP) and it is correctly identified.
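To read the result more easily, the words can be paired with their predicted tags (my addition):
# Pair each word of the sentence with its predicted POS tag
sentence = "I work in Shanghai \n"
print(list(zip(sentence.split(" "), getSentencePOS(sentence))))
# expected: [('I', 'PRP'), ('work', 'VBP'), ('in', 'IN'), ('Shanghai', 'NNP'), ('\n', '#')]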
Now we can check the overall accuracy on the testing dataset and see if it improved compared to the previous, simpler model (we measure it in the same way).
Predicting on a data set
Finally, we compute the accuracy of the Viterbi algorithm’s POS tag predictions by comparing them with the true labels. pred is the list of predicted POS tags corresponding to the words of the testing_corpus.
def compute_accuracy(pred, y):
'''
Input:
pred: a list of the predicted parts-of-speech
y: a list of lines where each word is separated by a '\t' (i.e. word \t tag)
Output:
    accuracy: the fraction of predictions that match their true labels
'''
num_correct = 0
total = 0
# Zip together the prediction and the labels
for prediction, y in zip(pred, y):
# Split the label into the word and the POS tag
word_tag_tuple = y.split()
# Check that there is actually a word and a tag
# no more and no less than 2 items
if len(word_tag_tuple) != 2:
continue
# store the word and tag separately
word, tag = word_tag_tuple
# Check if the POS tag label matches the prediction
if prediction == tag:
# count the number of times that the prediction
# and label match
num_correct += 1
# keep track of the total number of examples (that have valid labels)
total += 1
return (num_correct/total)
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, testing_corpus):.4f}")
Accuracy of the Viterbi algorithm is 0.9531
As you can see, we are able to classify the parts-of-speech with 95% accuracy, which is an improvement over the previous model.
References
Here we predicted POS tags by walking forward through a corpus, using only the previous word's tag (Viterbi algorithm and Hidden Markov Model).
There are other implementations that use bidirectional POS tagging.
Bidirectional POS tagging requires knowing the previous word and the next word in the corpus when predicting the current word’s POS tag.
Bidirectional POS tagging would tell you more about the POS and be more accurate. It can be implemented starting from the algorithm above.