PoS tagging using a Hidden Markov model

Parts-of-Speech Tagging (POS)

In the first part we saw how to build a simple tagger using the probability that a word belongs to a specific tag.
Part-of-speech refers to the category of a word (noun, verb, adjective…) in the language.
Part-of-speech (POS) tagging is the process of assigning a POS tag to each word in an input text.
Tagging is difficult because some words can represent more than one part of speech in different contexts: they are ambiguous.

We saw that a simple tagger based on occurrence probabilities reached good accuracy but failed at correctly tagging a sentence like “I work in Shanghai”, because it tagged work as a noun and not – in this case – as a verb.

The goal

We will now use a different approach, based on the Viterbi algorithm and a Hidden Markov model, that will hopefully improve the tagging accuracy.

The code and the datasets are also available on my GitHub repository.

Data Sources

This notebook will use the same tagged data sets used when building the first tagger, collected from the Wall Street Journal (WSJ):

  • One data set (WSJ_02-21.pos) will be used for training.
  • The other (WSJ_24.pos) for testing.
  • The tagged training data has been preprocessed to form a vocabulary (hmm_vocab.txt).
    The words in the vocabulary are words from the training set that were used two or more times.
    The vocabulary is augmented with a set of ‘unknown word tokens’.

They are the same as in the previous notebook, and the pre-processing is also the same, so I will go through it quickly here. Please refer to the first part for more details.

Training data


# load in the training corpus
with open("../datasets/WSJ_02-21.pos", 'r') as f:
    training_corpus = f.readlines()  # list

print("A few items of the training corpus list: ")
print(training_corpus[0:5])

    
A few items of the training corpus list: 
    ['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']

As you can see, the training_corpus is a list with all words extracted from English articles, together with their POS tag.
Almost one million of them!
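
As a quick sanity check, we can count the lines (the exact number depends on the dataset file, but it should be close to the one million mentioned above):

print(len(training_corpus))  # number of (word, tag) lines in the training corpus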

Testing data

# load in the testing corpus
with open("../datasets/WSJ_24.pos", 'r') as f:
    testing_corpus = f.readlines()  # list

print("A sample of the testing corpus")
print(testing_corpus[0:10])

    A sample of the testing corpus
    ['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n', 'be\tVB\n', 'taken\tVBN\n', 'from\tIN\n', 'several\tJJ\n', 'vantage\tNN\n']
len(testing_corpus)

    34199

The testing corpus has the same format; it is a separate, smaller set held out from training.
It will be used at the end to calculate the model’s accuracy.

Testing words and vocabulary

The testing set (WSJ_24.pos) has also been preprocessed to remove the tags, forming test.words. This file is read in to create testing_words, a helper list for the algorithm later on.

with open("../datasets/test.words", 'r') as f:
    testing_words = f.readlines()  # list

print(testing_words[0:20])

    ['The\n', 'economy\n', "'s\n", 'temperature\n', 'will\n', 'be\n', 'taken\n', 'from\n', 'several\n', 'vantage\n', 'points\n', 'this\n', 'week\n', ',\n', 'with\n', 'readings\n', 'on\n', 'trade\n', ',\n', 'output\n']

The vocabulary is an indexed list of words; almost 24K of them.
The unique words have been extracted from the training corpus.

# read the vocabulary data, split by each line of text, and save the list
with open("../datasets/hmm_vocab.txt", 'r') as f:
    voc_l = f.read().split('\n')  # list

# vocab: dictionary that has the index of the corresponding words
vocabulary = {} 

# Get the index of the corresponding words. 
for i, word in enumerate(sorted(voc_l)): 
    vocabulary[word] = i       
    

print("A few items at the end of the vocabulary list")
print(voc_l[-25:])

    A few items at the end of the vocabulary list
    ['yields', 'you', 'young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeroing', 'zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']
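
A few quick checks on the resulting mapping (the exact integer indices depend on the vocabulary file, so I am not reporting them here):

print(len(vocabulary))               # size of the vocabulary, almost 24K entries
print(vocabulary["zone"])            # the integer index assigned to a known word
print("--unk_noun--" in vocabulary)  # True: the 'unknown word' tokens are part of the vocabulary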

Create the helper dictionaries

Before we start predicting the tags of each word, we will need to compute – from the training corpus – a few dictionaries that will help to generate the tables.
Again, this is the same as done in the first part, so I will go quickly through them.

  • the emissionCounts dictionary will be used to compute the probability of a word given its tag.
  • the transitionCounts dictionary counts the number of times each tag appears next to another tag.
  • the tagCounts dictionary counts the number of times each tag appears.

import string


# Punctuation characters
punct = set(string.punctuation)

# Morphology rules used to assign unknown word tokens
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]



def assign_unk(tok):
    """
    Assign unknown word tokens
    """
    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    return "--unk--"

# Helper: substitutes words not in the vocabulary with an "unknown" token
def get_word_tag(line, vocab): 
    if not line.split():
        # Empty line: marks the end of a sentence
        word = "--n--"
        tag = "--s--"
        return word, tag
    else:
        word, tag = line.split()
        if word not in vocab: 
            # Handle unknown words
            word = assign_unk(word)
        return word, tag
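
As a quick check of the two helpers (the first two tokens are hypothetical words, not necessarily in the corpus; the expected results follow from the rules above):

print(assign_unk("scalability"))               # ends in 'ity' -> '--unk_noun--'
print(assign_unk("temporize"))                 # ends in 'ize' -> '--unk_verb--'
print(get_word_tag("zone\tNN\n", vocabulary))  # known word    -> ('zone', 'NN')
print(get_word_tag("\n", vocabulary))          # empty line    -> ('--n--', '--s--')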

from collections import defaultdict


def create_dictionaries(corpus, vocab):
    """
    Input: 
        corpus: a corpus where each line has a word followed by its tag.
        vocab: a dictionary where keys are words in vocabulary and value is an index
    Output: 
        emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
        transition_counts: a dictionary where the keys are (prev_tag, tag) and the values are the counts
        tag_counts: a dictionary where the keys are the tags and the values are the counts
    """
    
    # initialize the dictionaries using defaultdict
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    
    # Initialize "prev_tag" (previous tag) with the start state, denoted by '--s--'
    prev_tag = '--s--' 
    
    # use 'i' to track the line number in the corpus
    i = 0 
    
    # Each item in the training corpus contains a word and its POS tag
    # Go through each word and its tag in the training corpus
    for word_tag in corpus:
        
        # Increment the word_tag count
        i += 1
        
            
        # get the word and tag using the get_word_tag helper function 
        word, tag = get_word_tag(word_tag, vocab) 
        
        # Increment the transition count for the previous word and tag
        transition_counts[(prev_tag, tag)] += 1
        
        # Increment the emission count for the tag and word
        emission_counts[(tag, word)] += 1

        # Increment the tag count
        tag_counts[tag] += 1

        # Set the previous tag to this tag (for the next iteration of the loop)
        prev_tag = tag
        
        
    return emission_counts, transition_counts, tag_counts

emissionCounts, transitionCounts, tagCounts = create_dictionaries(training_corpus, vocabulary)

# get all the POS states
tags = sorted(tagCounts.keys())
print(f"Number of POS tags: {len(tags)}")
print("View these POS tags")
print(tags)

    Number of POS tags: 46
    View these POS tags
    ['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']

We mapped a total of 46 POS tags, which is great: every tag mentioned in the Penn Treebank is here.

The ‘tags’ are the Parts-of-speech designations found in the training data.

  • ‘NN’ is noun, singular,
  • ‘NNS’ is noun, plural.
  • In addition, there are helpful tags like ‘--s--’ which indicate the start of a sentence.
  • You can get a more complete description in the Penn Treebank II tag set.

We didn’t use the transition dictionary in the first model, but we will now.

print("transition examples: ")
for ex in list(transitionCounts.items())[:3]:
    print(ex)
print()

    transition examples: 
    (('--s--', 'IN'), 5050)
    (('IN', 'DT'), 32364)
    (('DT', 'NNP'), 9044)

The transition dictionary shows how often we go from one tag to another; for example, the transition from DT (a determiner, an article such as ‘the’ or ‘a’) to NNP (a proper noun) occurs 9,044 times.

print("ambiguous word example: ")
for tup,cnt in emissionCounts.items():
    if tup[1] == 'back': print (tup, cnt) 

    ambiguous word example: 
    ('RB', 'back') 304
    ('VB', 'back') 20
    ('RP', 'back') 84
    ('JJ', 'back') 25
    ('NN', 'back') 29
    ('VBP', 'back') 4

We can search all the tags used for a specific word (for example ‘back’) in the emissionCounts dictionary and see that most of the time it was tagged as RB (adverb), but not always: it can also be a verb! It’s ambiguous.
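
The same lookup can be done programmatically; for example, this small snippet picks the most frequent tag for ‘back’ (which, per the counts above, should be RB):

# collect all the tags seen for 'back' and keep the most frequent one
back_counts = {tag: cnt for (tag, word), cnt in emissionCounts.items() if word == 'back'}
print(max(back_counts, key=back_counts.get))  # 'RB' (304 occurrences)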

Markov Models for POS

Now we will build something more context specific. Concretely, we will be implementing a Hidden Markov Model (HMM) with a Viterbi decoder.
The HMM is one of the most commonly used algorithms in Natural Language Processing, and is a foundation to many deep learning techniques.
In addition to parts-of-speech tagging, HMM is used in speech recognition, speech synthesis, etc.

A Markov chain is basically a directed graph: a data structure consisting of a set of objects (called nodes, the circles in the image); in our case they are the POS tags of our model, connected together by edges (the uni-directional lines in the image).
The arrow from state q1 to q2 represents the transition probability of moving from q1 to q2.

The probability of the next event depends only on the current event.

A simple graph with 3 states q1, q2, q3

An example of a Markov chain could be a graph predicting if a day is sunny (S) or rainy (R), i.e. it has only two states. When the Markov chain is in state “R”, it has a 0.9 probability of staying there (means the next day will also rain) and a 0.1 chance of leaving for the “S” state (means the next day will instead be sunny). Likewise, “S” state has 0.9 probability of staying sunny and a 0.1 chance of transitioning to the “R” state.

A Rainy / Sunny graph
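
Here is a minimal numeric sketch of that chain (the state order R, S is my own choice):

import numpy as np

# Transition matrix of the toy Rainy/Sunny chain (rows: today, columns: tomorrow)
weather_A = np.array([[0.9, 0.1],   # R -> R, R -> S
                      [0.1, 0.9]])  # S -> R, S -> S

# each row is a probability distribution over the next state, so it sums to 1
print(weather_A.sum(axis=1))          # [1. 1.]

# starting from a rainy day, the distribution over (R, S) two days later:
start = np.array([1.0, 0.0])          # day 0 is rainy
print(start @ weather_A @ weather_A)  # [0.82 0.18]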

A sentence is a sequence of words with associated parts of speech tags.
We can represent that sequence with a graph where the parts of speech tags (NN, VB, O) are events that can occur depicted by the nodes of the model graph. The weights on the arrows between the states define the probability of going from one tag to another:

A graph where the states are Part of Speech

Hidden Markov Models for POS

The Hidden Markov Model (HMM) is a statistical model in which the system being modeled is assumed to be a Markov process – call it X – but with unobservable (“hidden”) states.
The goal is to learn about X by observing another process Y whose behavior “depends” on X.

Let’s see how the weather example above can have unobservable events.
Consider two friends, Alice and Bob, who live far apart from each other and who talk together daily over the telephone about what they did that day. Bob is only interested in three activities: walking in the park, shopping and cleaning his apartment.
The choice of what to do is determined exclusively by the weather on a given day.
Alice has no definite information about the weather but she knows general trends. Based on what Bob tells her he did each day, Alice tries to guess what the weather must have been like.

Alice believes that the weather operates as a discrete Markov chain.
There are two states: “Rainy” and “Sunny”, but she cannot observe them directly as they are “hidden” from her. On each day, there is a certain chance that Bob will perform one of the following activities, depending on the weather: “walk”, “shop”, or “clean”.
Since Bob tells Alice about his activities, those are the observations. The entire system is that of a hidden Markov model (HMM).

It’s the same when we are trying to tag the POS of a sentence: we cannot observe the POS of each word directly, but we can learn the transition probabilities from a model trained on a tagged dataset.

Describe Markov models with matrices

The Markov model graph can be represented as a matrix of dimension n+1 by n, where n is the number of tags:

  • each element a(i,j) represents the probability of transitioning from tag i to tag j
  • when there is no previous state, we introduce an initial state π (hence the extra row).
  • the sum of all transitions out of a state should always be 1.

To describe an HMM you will need a transition matrix A (for example with the probabilities of the rainy and sunny states) and also an emission matrix B.
The second matrix contains the probabilities of the observable states given the unobservable ones: how likely Bob is to perform a certain activity on each day. If it is rainy, there is a 50% chance that he is cleaning his apartment; if it is sunny, there is a 60% chance that he is outside for a walk.

For POS tagging, the hidden Markov model has an emission probabilities matrix B that describes the transition from the hidden states (the tags) to the observables (the words of your corpus).
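
Here is a sketch of the emission matrix B for the Alice and Bob example. Note that only two of the values are given in the text (rainy → clean = 0.5 and sunny → walk = 0.6); the remaining numbers are made up just to complete the probability rows:

import numpy as np

# rows: hidden states (Rainy, Sunny); columns: observations (walk, shop, clean)
weather_B = np.array([[0.1, 0.4, 0.5],   # Rainy: 0.5 for 'clean' is from the text, the rest is assumed
                      [0.6, 0.3, 0.1]])  # Sunny: 0.6 for 'walk' is from the text, the rest is assumed

# like the transition matrix, each row is a probability distribution
print(weather_B.sum(axis=1))  # [1. 1.]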

Generating Matrices

Starting from the emissionCounts, transitionCounts and tagCounts, we will now implement the Hidden Markov Model.

This will allow us to quickly construct:

  • A transition probabilities matrix.
  • B emission probabilities matrix.

Transition matrix A

Since many entries in the transition counts are zero, we apply smoothing to the probability formula so that no transition ends up with a probability of exactly zero (which would be a problem when we take logarithms later).
The matrix A is computed with smoothing as follows:

(1) P(t_i | t_{i-1}) = \frac{C(t_{i-1}, t_{i}) + \alpha }{C(t_{i-1}) +\alpha * N}

  • N is the total number of tags
  • C(t_{i-1}, t_{i}) is the count of the tuple (previous POS, current POS) in the transition_counts dictionary.
  • C(t_{i-1}) is the count of the previous POS in the tag_counts dictionary.
  • ɑ is a smoothing parameter.
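
A tiny worked example of formula (1), with made-up counts, shows what the smoothing does:

alpha, N = 0.001, 46

# a transition seen 9044 times, for a previous tag seen 100000 times (hypothetical counts)
print((9044 + alpha) / (100000 + alpha * N))  # ~0.09, essentially the unsmoothed ratio

# a transition never seen in the training data
print((0 + alpha) / (100000 + alpha * N))     # ~1e-08, small but no longer exactly zero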

The create_transition_matrix function below implements this for all tags: it outputs matrix A, where each cell is computed with equation (1) above.
Its inputs are alpha, the tagCounts and the transitionCounts dictionaries.

import numpy as np

def create_transition_matrix(alpha, tag_counts, transition_counts):
    ''' 
    Input: 
        alpha: number used for smoothing
        tag_counts: a dictionary mapping each tag to its respective count
        transition_counts: transition count for the previous word and tag
    Output:
        A: matrix of dimension (num_tags,num_tags)
    '''
    # Get a sorted list of unique POS tags; e.g. all_tags[20] is 'NN'
    all_tags = sorted(tag_counts.keys())
    
    # Get the number of unique POS tags, this will be n, the matrix size
    num_tags = len(all_tags)
    
    # Initialize the transition matrix 'A' (size is n by n) to zero
    A = np.zeros((num_tags, num_tags))
    
    # Get the unique transition tuples (previous POS, current POS)
    trans_keys = set(transition_counts.keys())
    
    
    # Go through each row of the transition matrix A
    for i in range(num_tags):
        
        # Go through each column of the transition matrix A
        for j in range(num_tags):

            # Initialize the count of the (prev POS, current POS) to zero
            count = 0
        
            # Define the tuple (prev POS, current POS)
            # Get the tag at position i and tag at position j (from the all_tags list)
            key = (all_tags[i], all_tags[j]) # this is a tuple (tag1, tag2)

            # Check if the (prev POS, current POS) tuple 
            # exists in the transition counts dictionary
            if key in transition_counts:
                
                # If yes, get count from the transition_counts dictionary 
                # for the (prev POS, current POS) tuple
                # we know that the transition dictionary contains key=tuple(pos1, pos2) and value=count
                # e.g. (('DT', 'NNP'), 9044)
                count = transition_counts[key]
                
            # Get the count of the previous tag (index position i): how many times it appeared in the corpus
            count_prev_tag = tag_counts[all_tags[i]]
            
            # Formula (1)
            # Apply smoothing using count of the tuple, alpha, 
            # count of previous tag, alpha, and number of total tags
            A[i,j] = (count + alpha) / (count_prev_tag + alpha * num_tags)

    
    return A

import pandas as pd


alpha = 0.001


A = create_transition_matrix(alpha, tagCounts, transitionCounts)

print("A subset of transition matrix A:")
print(pd.DataFrame(A[30:35,30:35], index=tags[30:35], columns = tags[30:35] ))
    A subset of transition matrix A:
                  RBS            RP           SYM        TO            UH
    RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
    RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
    SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
    TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
    UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02

That is an example of what the A transition matrix looks like (it is reduced to 5 tags for viewing; the full matrix is 46×46).

Each cell gives the probability to go from one part of speech to another.

  • In other words, there is a 4.47e-8 chance of going from parts-of-speech TO to RP.
  • The sum of each row has to be equal to 1, because we assume that the next POS tag must be one of the available columns in the table.
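
We can check this numerically (the sums are not exactly 1 because of small numerical and counting effects, but they should all be very close to it):

# each row of A should sum to (approximately) 1
print(A.sum(axis=1)[:5])  # first five rows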

The emission probabilities matrix B

Now we will create the B emission matrix, which computes the emission probability by counting the co-occurrences of a part-of-speech tag with a specific word.

We will use smoothing as defined below:

(2) P(w_i | t_i) = \frac{C(t_i, word_i)+ \alpha}{C(t_{i}) +\alpha * N}

  • C(t_i, word_i) is the number of times word_i was associated with tag_i in the training data (stored in the emission_counts dictionary).
  • C(t_i) is the number of times tag_i was in the training data (stored in tag_counts dictionary).
  • N is the number of words in the vocabulary
  • ɑ is a smoothing parameter.

The matrix B is of dimension (num_tags, N), where num_tags is the number of possible parts-of-speech tags and N is the size of the vocabulary.

Finally, the function create_emission_matrix below computes the B emission probabilities matrix. It takes in alpha (the smoothing parameter), tagCounts (the dictionary mapping each tag to its respective count) and – this time – the emissionCounts dictionary, where the keys are (tag, word) and the values are the counts, plus the list of vocabulary words. It outputs a matrix that computes equation (2) above for each cell of matrix B.

def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    '''
    Input: 
        alpha: tuning parameter used in smoothing 
        tag_counts: a dictionary mapping each tag to its respective count
        emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
        vocab: a list of the words in the vocabulary (position j -> word in column j of B)
    Output:
        B: a matrix of dimension (num_tags, len(vocab))
    '''
    
    # get the number of POS tags
    num_tags = len(tag_counts) # this is the number of rows of the B matrix
    
    # Get a list of all POS tags
    all_tags = sorted(tag_counts.keys())
    
    # Get the total number of unique words in the vocabulary
    num_words = len(vocab) # this is the number of columns of the B matrix
    
    # Initialize the emission matrix B with places for
    # tags in the rows and words in the columns
    B = np.zeros((num_tags, num_words)) 
    
    # Get a set of all (POS, word) tuples 
    # from the keys of the emission_counts dictionary
    # e.g. ('NN', 'dog')
    emis_keys = set(list(emission_counts.keys()))
        
    # Go through each row (POS tags)
    for i in range(num_tags):
        
        # Go through each column (words)
        for j in range(num_words):

            # Initialize the emission count for the (POS tag, word) to zero
            count = 0
                    
            # Define the (POS tag, word) tuple for this row and column
            key = (all_tags[i], vocab[j]) # this is a tuple (tag, word) as ('NN', 'dog')
            
            # check if the (POS tag, word) tuple exists as a key in emission counts
            if key in emission_counts.keys():
        
                # Get the count of (POS tag, word) from the emission_counts
                # we know that the emission dictionary contains key=tuple(pos,word) and value=count
                # e.g. ('NN','dog') = 10 (appears ten times in corpus)
                count = emission_counts[key]
                
            # Get the count of the POS tag (how many times it appeared in the corpus)
            count_tag = tag_counts[all_tags[i]]
                
            # Apply smoothing and store the smoothed value 
            # into the emission matrix B for this row and column
            B[i,j] = (count + alpha) / (count_tag+ alpha*num_words)

    return B

# creating the emission probability matrix. this takes a few minutes to run. 

B = create_emission_matrix(alpha, tagCounts, emissionCounts, list(vocabulary))


# Try viewing emissions for a few words in a sample dataframe
cidx  = ['725','well','dog', 'dogs', 'work']

# Get the integer ID for each word
cols = [vocabulary[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'tags' list
rows = [tags.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
print(pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx ))
                  725          well           dog          dogs          work
    CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
    NN   7.521128e-09  6.769767e-05  7.521880e-05  7.521128e-09  2.015670e-03
    NNS  1.670013e-08  1.670013e-08  1.670013e-08  1.169176e-04  1.670013e-08
    VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  4.345929e-03
    RB   3.226454e-08  1.319623e-02  3.226454e-08  3.226454e-08  3.226454e-08
    RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07

That is an example of the matrix; only a subset of tags and words is shown.

E.g. the probability for the tuple (NN, dog) is 7.5e-05, much higher than for (NNS, dog).

But note that ‘work’ has comparable probabilities of being a singular noun (NN) or a verb (VB).
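
We can read those two values directly from B, using the same indexing as in the sample dataframe above:

# smoothed emission probabilities for the word 'work'
work_col = vocabulary['work']
print(B[tags.index('NN'), work_col])  # P(work | NN), about 2e-03 in the table above
print(B[tags.index('VB'), work_col])  # P(work | VB), about 4e-03 in the table above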

Pre-process the corpus

Finally, the corpus needs to be pre-processed (as in the first part) to tag the unknown words and mark the end of sentence:

def preprocessCorpus(y):
    
    orig = []
    y_prepr = []
    
      # we already read the words from testing dataset into 'y'
    for cnt, word in enumerate(y):

          # End of sentence
        if not word.split():
            orig.append(word.strip())
            word = "--n--"
            y_prepr.append(word)
            continue

          # Handle unknown words
        elif word.strip() not in vocabulary:
            orig.append(word.strip())
            word = assign_unk(word)
            y_prepr.append(word)
            continue

        else:
            orig.append(word.strip())
            y_prepr.append(word.strip())

    assert(len(orig) == len(y)) # just to be sure
    assert(len(y_prepr) == len(y))
    
    return y_prepr

tw_prepr = preprocessCorpus(testing_words)

print('The length of the preprocessed test corpus: ', len(tw_prepr))
print('This is a sample of the test_corpus: ')
print(tw_prepr[0:20])
    The length of the preprocessed test corpus:  34199
    This is a sample of the test_corpus: 
    ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--', 'points', 'this', 'week', ',', 'with', 'readings', 'on', 'trade', ',', 'output']

Viterbi Algorithm

The Viterbi algorithm is a graph algorithm to find the most likely sequence of hidden states (parts-of-speech tags in our case), called the Viterbi path, that has the highest a posteriori probability for a sequence of observed events.

The Viterbi algorithm is named after Andrew Viterbi, who proposed it in 1967 as a decoding algorithm for convolutional codes over noisy digital communication links.
It was introduced to Natural Language Processing as a method of part-of-speech tagging as early as 1987.

I cannot give a better explanation than the one on Wikipedia, complete with pseudocode and an example in Python.
Now we will implement the Viterbi algorithm which makes use of dynamic programming. Specifically, we will use the two matrices A and B to compute the Viterbi algorithm in three main steps:

  • Initialization – initialize the best_probs and best_paths matrices that we will populate in the feed-forward step.
  • Feed forward – at each step, calculate the probability of each path happening and the best paths up to that point.
  • Feed backward – this allows us to find the best path with the highest probability.

Initialization

Let’s start by initializing two matrices of the same dimension.

  • best_probs: each cell contains the probability of going from one POS tag to a word in the corpus.
    (its first column is initialized with the probability that the first word of the corpus belongs to each part of speech)
  • best_paths: a matrix that helps you trace back the best possible path through the corpus.
    (it stores the states we traverse when finding the most likely sequence of parts of speech tags for the given sequence of words W1 all the way to Wk)

Here is how to initialize best_probs:

  • The probability of the best path going from the start token to the POS tag indexed by integer i is stored in the first column, best\_probs[i, 0].
  • This is estimated as the probability that the start tag transitions to the POS denoted by index i, which is \mathbf{A}[s_{idx}, i], AND that the POS tag denoted by i emits the first word of the given corpus, which is \mathbf{B}[i, vocab[corpus[0]]].
  • Note that corpus[0] refers to the first word of the corpus (the word at position 0 of the corpus).
  • vocab is a dictionary that returns the unique integer index of that particular word.

Conceptually, it looks like this:

best\_probs[i, 0] = \mathbf{A}[s_{idx}, i] \times \mathbf{B}[i, vocab[corpus[0]]]

In order to avoid multiplying and storing small values on the computer, we’ll take the log of the product, which becomes the sum of two logs:

best\_probs[i, 0] = \mathrm{log}(\mathbf{A}[s_{idx}, i]) + \mathrm{log}(\mathbf{B}[i, vocab[corpus[0]]])

import math

def initialize(states, tag_counts, A, B, corpus, vocab):
    '''
    Scope: initializes the `best_probs` and the `best_paths` matrix
    
    Input: 
        states: a list of all possible parts-of-speech
        tag_counts: a dictionary mapping each tag to its respective count
        A: Transition Matrix of dimension (num_tags, num_tags)
        B: Emission Matrix of dimension (num_tags, len(vocab))
        corpus: a sequence of words whose POS is to be identified in a list 
        vocab: a dictionary where keys are words in vocabulary and value is an index
    Output:
        best_probs: matrix of dimension (num_tags, len(corpus)) of floats
        best_paths: matrix of dimension (num_tags, len(corpus)) of integers
    '''
      # Get the total number of unique POS tags
    num_tags = len(tag_counts)
    
      # Initialize best_probs matrix to zero
      # POS tags as the rows, number of words in the corpus as the columns
    best_probs = np.zeros((num_tags, len(corpus)))
    
      # Initialize best_paths matrix to zero
      # POS tags as the rows, number of words in the corpus as columns
      # the two matrices have the same size
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
      # Define the start token
    s_idx = states.index("--s--")
    
      # Go through each of the POS tags (matrix rows)
    for i in range(num_tags): 
        
          # Handle the special case when the transition from start token to POS tag i is zero
          # Column zero of `best_probs` is initialized with the assumption that the first word 
          # of the corpus was preceded by a start token ("--s--"). 
          # This allows  to reference the A matrix for the transition probability
        if A[s_idx,i] == 0: 
            
              # Initialize best_probs at POS tag 'i', column 0, to negative infinity (log 0)
            best_probs[i,0] = float('-inf')
        
          # For all other cases when transition from start token to POS tag i is non-zero:
        else:
            
              # Initialize best_probs at POS tag 'i', column 0
              # Check the formula above
            best_probs[i,0] = math.log(A[s_idx,i]) + math.log(B[i,vocab[corpus[0]]] )
                        
    return best_probs, best_paths

best_probs, best_paths = initialize(tags, tagCounts, A, B, tw_prepr, vocabulary)
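
Both matrices have one row per POS tag and one column per word of the preprocessed test corpus:

print(best_probs.shape)  # (46, 34199) for this test set
print(best_paths.shape)  # same dimensions as best_probs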

Viterbi Forward

Now we will implement the viterbi_forward segment. In other words, we will populate the best_probs and best_paths matrices:

  • Walk forward through the corpus.
  • For each word, compute a probability for each possible tag.
  • This will include the path up to that (word,tag) combination.

The formula to compute the probability and path for the i^{th} word in the corpus, the prior word i-1 in the corpus, current POS tag j, and previous POS tag k is:

prob = best\_probs_{k, i-1} + \mathrm{log}(\mathbf{A}_{k, j}) + \mathrm{log}(\mathbf{B}_{j, vocab(corpus_{i})})

where corpus_{i}  is the word in the corpus at index i, and vocab is the dictionary that gets the unique integer that represents a given word.

path = k, where k is the integer representing the previous POS tag.

def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    '''
    Scope: 
        store the best_path and best_prob for every possible tag for each word 
        in the matrices `best_probs` and `best_paths`
    Input: 
        A, B: The transition and emission matrices respectively
        test_corpus: a list containing a preprocessed corpus
        best_probs: an initialized matrix of dimension (num_tags, len(corpus))
        best_paths: an initialized matrix of dimension (num_tags, len(corpus))
        vocab: a dictionary where keys are words in vocabulary and value is an index 
    Output: 
        best_probs: a completed matrix of dimension (num_tags, len(corpus))
        best_paths: a completed matrix of dimension (num_tags, len(corpus))
    '''
      # Get the number of unique POS tags (which is the num of rows in best_probs)
    num_tags = best_probs.shape[0]
    
      # Go through every word in the corpus starting from word 1
      # Recall that word 0 was specially initialized in `initialize()`
    for i in range(1, len(test_corpus)): 
        
          # For PROGRESS: Print number of words processed, every 5000 words
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))
            
          # For each unique POS tag that the current word can be
        for j in range(num_tags): 
            
              # Initialize best_prob for word i to negative infinity
            best_prob_i =  float("-inf")
            
              # Initialize best_path for current word i to None
            best_path_i = None

              # For each POS tag that the previous word can be:
            for k in range(num_tags): 
            
                  # compute the probability that the previous word had a given POS tag, 
                  # that the current word has a given POS tag, 
                  # and that the POS tag would emit this current word.
                
                  # Probability = 
                  # best probs of POS tag k, previous word i-1 + 
                  # log(prob of transition from POS k to POS j) + 
                  # log(prob that emission of POS j is word i)
                prob = best_probs[k,i-1] + math.log(A[k,j]) + math.log(B[j,vocab[test_corpus[i]]])
                
                  # retain the highest probability computed for the current word
                if prob > best_prob_i: 
                    
                    best_prob_i = prob
                    
                      # set best_paths to the index 'k', keep track of the POS tag of the previous word
                      # that is part of the best path.  
                      # k represents the POS tag of the previous word that produced the highest probability 
                    best_path_i = k

              # Save the best probability for the 
              # given current word's POS tag
              # and the position of the current word inside the corpus
            best_probs[j,i] = best_prob_i
            
              # Save the unique integer ID of the previous POS tag
              # into best_paths matrix, for the POS tag of the current word
              # and the position of the current word inside the corpus
            best_paths[j,i] = best_path_i

    return best_probs, best_paths

Run the `viterbi_forward` function to fill in the `best_probs` and `best_paths` matrices.

Note that this will take a few minutes to run. There are about 34,000 words to process.

# this will take a few minutes to run => processes ~ 34,000 words
best_probs, best_paths = viterbi_forward(A, B, tw_prepr, best_probs, best_paths, vocabulary)
    Words processed:     5000
    Words processed:    10000
    Words processed:    15000
    Words processed:    20000
    Words processed:    25000
    Words processed:    30000

Viterbi backward

The Viterbi backward algorithm gets the predictions of the POS tags for each word in the corpus using the best_paths and the best_probs matrices.

The backward pass helps retrieve the most likely sequence of parts of speech tags for the given sequence of words.

  • First it finds the index of the entry with the highest probability in the last column of best_probs.
    It represents the last hidden state we traversed when observing the last word.
  • This index is then used to traverse back through the best_paths matrix and reconstruct the sequence of parts of speech tags.

def viterbi_backward(best_probs, best_paths, corpus, states):
    '''
    Scope: 
        This function retrieves the best path.
    Input: 
        corpus: a list containing a preprocessed corpus
        best_probs: an initialized matrix of dimension (num_tags, len(corpus))
        best_paths: an initialized matrix of dimension (num_tags, len(corpus))
        states: a list of all possible parts-of-speech tags (index -> tag string)
    Output: 
        pred: returns a list of predicted POS tags for each word in the corpus.   
    '''
    
  #  
  # Initialization
  #

      # Get the number of words in the corpus
      # which is also the number of columns in best_probs, best_paths
    m = best_paths.shape[1] 
    
      # Get the number of unique POS tags
    num_tags = best_probs.shape[0]
    
      # Initialize the best probability for the last word
    best_prob_for_last_word = float('-inf')
    
      # Initialize array z, same length as the corpus
    z = [None] * m
    
      # Initialize pred array, same length as corpus
    pred = [None] * m
  
  #
  # Step 1 : Loop through all the rows (POS tags) in the last entry of `best_probs` 
  # and find the row (POS tag) with the maximum value. This is the best POS tag for the last word
  #
    
      # Go through each POS tag for the *last* word (last column of best_probs)
      # in order to find the row (POS tag integer ID) 
      # with highest probability for the last word
    for k in range(num_tags):
        

          # If the probability of POS tag at row k 
          # is better than the previously best probability for the last word:
        if best_probs[k, m - 1] > best_prob_for_last_word:
            
              # then store the new best probability for the last word
            best_prob_for_last_word = best_probs[k, m - 1]
    
              # and store the unique integer ID of the POS tag
              # which is also the row number in best_probs
            z[m - 1] = k
            
            
      # Convert the last word's predicted POS tag
      # from its unique integer ID into the string representation
      # using the 'states' dictionary
      # Store this in the 'pred' array for the last word
    pred[m - 1] = states[z[m - 1]]
    
  #
  # Step 2: traverse back through the best_paths matrix and retrieve all POS tags
  #
    
      # Find all the best POS tags by walking backward through the best_paths
      # From the last word in the corpus to the first word in the corpus
    for i in range(m-1, -1, -1): 
        
          # Retrieve the unique integer ID of
          # the POS tag for the word at position 'i' in the corpus
        pos_tag_for_word_i = z[i]
        
        
          # In best_paths, go to the row representing the POS tag of word i
          # and the column representing the word's position in the corpus
          # to retrieve the predicted POS for the word at position i-1 in the corpus
        z[i - 1] = best_paths[pos_tag_for_word_i,i]
        
          # Get the previous word's POS tag in string form
          # again using the 'states' dictionary, 
          # where the key is the unique integer ID of the POS tag,
          # and the value is the string representation of that POS tag
        pred[i - 1] = states[z[i - 1]]
        
    return pred

# Run and test the function
pred = viterbi_backward(best_probs, best_paths, tw_prepr, tags)


m=len(pred)
print(f"The prediction for {tw_prepr[-7:m-1]} is: \n {pred[-7:m-1]} \n")
print(f"The prediction for {tw_prepr[0:7]} is: \n {pred[0:7]}")
    The prediction for ['see', 'them', 'here', 'with', 'us', '.'] is: 
     ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 
    
    The prediction for ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken'] is: 
     ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN']

Predict on single words or sentences

Let’s see a couple of examples:
In general, the list pred we just retrieved from the Viterbi algorithm run on the testing corpus can be used to check a word’s POS, if we know the word’s index:

print('The third word is:', tw_prepr[3])
print('Its prediction for the POS tag is:', pred[3])
print('The corresponding true label is: ', testing_corpus[3])
    The third word is: temperature
    Its prediction for the POS tag is: NN
    The corresponding true label is:  temperature	NN

Obviously you don’t know the word index in advance, so it can be retrieved from the preprocessed testing words list (tw_prepr).
Let’s write a quick helper function:

def getWordPOS(word):
    if word in tw_prepr:
        return pred[tw_prepr.index(word)]
    else:
        return "unknown"

getWordPOS("cat")
    'unknown'
getWordPOS("work")
    'NN'

Same can be done for an entire sentence. The following helper function will split a sentence (string) into words and run the Viterbi algorithm on them:

def getSentencePOS(sentence):
    """
    Input: 
        sentence: the sentence to be tagged. String. Needs a marker at the end (newline).
    Output: 
        a list of predicted POS tags, one per word of the sentence
    """
    words = sentence.split(" ") # split into words (a list)
    corpus = preprocessCorpus(words)
    
    best_probs, best_paths = initialize(tags, tagCounts, A, B, corpus, vocabulary)
    best_probs, best_paths = viterbi_forward(A, B, corpus, best_probs, best_paths, vocabulary)
    predictedPOS = viterbi_backward(best_probs, best_paths, corpus, tags)
                
      # return the list of predicted POS tags
    return predictedPOS

Examples:

print(getSentencePOS("I have a black cat \n"))
    ['PRP', 'VBP', 'DT', 'JJ', 'NNS', '#']
print(getSentencePOS("I work in Shanghai \n"))
    ['PRP', 'VBP', 'IN', 'NNP', '#']

And here you can see that the sentence that previously failed is now tagged correctly!
‘work’ is not only a noun; in this case it is a verb (VBP) and is correctly identified.

Now we can check the overall accuracy on the testing dataset and see if it improved over the previous, simpler model (we check it in the same way).

Predicting on a data set

Finally, we compute the accuracy of the Viterbi algorithm’s POS tag predictions by comparing them with the true labels.
pred is the list of predicted POS tags corresponding to the words of the testing corpus.

def compute_accuracy(pred, y):
    '''
    Input: 
        pred: a list of the predicted parts-of-speech 
        y: a list of lines where each word is separated by a '\t' (i.e. word \t tag)
    Output: 
        accuracy: the fraction of predictions that match the true labels
    '''
    num_correct = 0
    total = 0
    
      # Zip together the prediction and the labels
    for prediction, label in zip(pred, y):

          # Split the label into the word and the POS tag
        word_tag_tuple = label.split()
        
          # Check that there is actually a word and a tag
          # no more and no less than 2 items
        if len(word_tag_tuple) != 2:
            continue 
        
          # store the word and tag separately
        word, tag = word_tag_tuple
          # Check if the POS tag label matches the prediction
        if prediction == tag: 
              # count the number of times that the prediction
              # and label match
            num_correct += 1
            
          # keep track of the total number of examples (that have valid labels)
        total += 1
        
    return (num_correct/total)

print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, testing_corpus):.4f}")
    Accuracy of the Viterbi algorithm is 0.9531

As you can see, we are able to classify the parts-of-speech with 95% accuracy, which is an improvement over the previous model.

References

Here we predicted POS tags by walking forward through a corpus and knowing the previous word (Viterbi algorithm and Hidden Markov Model).
There are other implementations that use bidirectional POS tagging.
Bidirectional POS tagging requires knowing the previous word and the next word in the corpus when predicting the current word’s POS tag.
Bidirectional POS tagging would tell you more about the POS and be more accurate. It can be implemented starting from the algorithm above.
