from collections import Counter
import numpy as np
from nltk import ngrams
[docs]def get_best_beam(beams, normalization_alpha=0):
best_index = 0
best_score = -1e10
for (i, beam) in enumerate(beams):
normalized_score = beam.normalized_score(normalization_alpha)
if normalized_score > best_score:
best_index = i
best_score = normalized_score
return beams[best_index]
[docs]def n_gram_repeats(sequence, n):
Returns true if sequence contains twice the same n-gram
:param sequence:
:param n:
counts = Counter(ngrams(sequence, n))
if len(counts) == 0:
return False
if counts.most_common()[0][1] > 1:
return True
return False
[docs]class Beam(object):
end_token = -1
[docs] def __init__(self, sequence, likelihood, mentioned_movies=None):
if mentioned_movies is None:
mentioned_movies = set()
self.finished = False
self.sequence = sequence
self.likelihood = likelihood
self.mentioned_movies = mentioned_movies.copy() if mentioned_movies else set()
def get_updated_beam(self, token, probability):
updated_beam = Beam(self.sequence + [token], self.likelihood * probability, self.mentioned_movies)
if token == self.end_token:
updated_beam.finished = True
return updated_beam
[docs] def __str__(self):
finished_str = "" if self.finished else "not"
return finished_str + " finished beam of likelihood {} : {}".format(self.likelihood, self.sequence)
[docs] def normalized_score(self, alpha):
Get score with a length penalty following
Wu et al 'Google's neural machine translation system: Bridging the gap between human and machine translation'
:param alpha:
if alpha == 0:
return np.log(self.likelihood)
penalty = ((5 + len(self.sequence)) / 6) ** alpha
return np.log(self.likelihood) / penalty
[docs]class BeamSearch(object):
def initial_beams(start_sentence):
return [Beam(sequence=start_sentence, likelihood=1)]
[docs] @staticmethod
def update_beams(beams, beam_size, probabilities, n_gram_block=None):
One step of beam search
:param n_gram_block:
:param probabilities: list of beam_size probability tensors (one for each beam)
:return: list of the new beams.
vocab_size = probabilities[0].data.shape[0]
# compute the likelihoods for the next token
# vector for finished beams. First dimension will be the likelihood of the finished beam, other dimensions are
# zeros so this beam is counted only once in the top k
finsished_beam_vec = np.zeros(vocab_size)
finsished_beam_vec[0] = 1
# (beam_size, vocab_size)
new_probabilities = np.array([beam.likelihood * if not beam.finished
else beam.likelihood * finsished_beam_vec
for beam, probability in zip(beams, probabilities)])
# get the top-k (beam_size) probabilities
ind = np.unravel_index(np.argsort(new_probabilities, axis=None), new_probabilities.shape)
# inspect hypothesis in descending order of likelihood
ind = (ind[0][::-1], ind[1][::-1])
# get the list of top-k updated beams
new_beams = []
for beam_index, token in zip(*ind):
# if finished, append the beam as is
if beams[beam_index].finished:
# otherwise, update the beam with the chosen token
# check n_gram blocking. Note that n_gram blocking is not used to produce the results in the article
if n_gram_block is None or not n_gram_repeats(beams[beam_index].sequence + [token], n_gram_block):
# add extended hypothesis to new_beam list
beams[beam_index].get_updated_beam(token, probabilities[beam_index][token].data.numpy()))
# return when beam_size valid beams found
if len(new_beams) >= beam_size:
return new_beams
return new_beams