Source code for xnmt.modelparts.scorers

from typing import List, Union, Optional
import numbers

import numpy as np
import dynet as dy

from xnmt import batchers, input_readers, param_collections, param_initializers, vocabs, logger
from xnmt.modelparts import transforms
from xnmt.persistence import Serializable, serializable_init, bare, Ref
from xnmt.events import handle_xnmt_event, register_xnmt_handler

def find_best_k(scores, k):
  k = min(len(scores), k)
  top_words = np.argpartition(scores, -k, axis=0)[-k:]

  if len(scores.shape) > 1:
    assert top_words.shape == (k, scores.shape[1]), \
      'top_words has shape %s, expected (%d, %d)' % (str(top_words.shape), k, scores.shape[1])
    # top_words is (k, batch_size)
    # scores is (#classes, batch_size)
    top_scores = []
    for i in range(top_words.shape[1]):
      top_scores.append(scores[top_words[:, i], i])
    top_scores = np.array(top_scores).T
  else:
    assert top_words.shape == (k,)
    top_scores = scores[top_words]
  return top_words, top_scores
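
A small, self-contained example of the helper above (illustrative only, not part of the original module), showing the shapes produced for a batched score matrix:

scores = np.array([[0.1, 2.0],
                   [3.0, 0.5],
                   [1.5, 1.0]])              # (#classes=3, batch_size=2)
top_words, top_scores = find_best_k(scores, 2)
# top_words has shape (2, 2): indices of the 2 highest-scoring classes per batch element
# top_scores has shape (2, 2): the corresponding scores, not necessarily sorted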

class Scorer(object):
  """
  A template class of things that take in a vector and produce a
  score over discrete output items.
  """

  def calc_scores(self, x: dy.Expression) -> dy.Expression:
    """
    Calculate the score of each discrete decision, where a higher score means the model
    thinks that decision is better. These often correspond to unnormalized log probabilities.

    Args:
      x: The vector used to make the prediction
    """
    raise NotImplementedError('calc_scores must be implemented by subclasses of Scorer')

  def best_k(self, x: dy.Expression, k: numbers.Integral, normalize_scores: bool = False):
    """
    Return a list of the k items with the highest scores. The items may not be in sorted order.

    Args:
      x: The vector used to make the prediction
      k: Number of items to return
      normalize_scores: whether to normalize the scores
    """
    raise NotImplementedError('best_k must be implemented by subclasses of Scorer')

  def sample(self, x: dy.Expression, n: numbers.Integral):
    """
    Return n samples from the scores, which are treated as probability distributions.
    """
    raise NotImplementedError('sample must be implemented by subclasses of Scorer')

  def calc_probs(self, x: dy.Expression) -> dy.Expression:
    """
    Calculate the normalized probability of each decision.

    Args:
      x: The vector used to make the prediction
    """
    raise NotImplementedError('calc_probs must be implemented by subclasses of Scorer')

  def calc_log_probs(self, x: dy.Expression) -> dy.Expression:
    """
    Calculate the log probability of each decision.

    log(calc_probs()) == calc_log_probs()

    Both functions exist because computing log probabilities directly might help save memory.

    Args:
      x: The vector used to make the prediction
    """
    raise NotImplementedError('calc_log_probs must be implemented by subclasses of Scorer')

  def calc_loss(self, x: dy.Expression, y: Union[int, List[int]]) -> dy.Expression:
    """
    Calculate the loss incurred by making a particular decision.

    Args:
      x: The vector used to make the prediction
      y: The correct label(s)
    """
    raise NotImplementedError('calc_loss must be implemented by subclasses of Scorer')

  def _choose_vocab_size(self,
                         vocab_size: Optional[int],
                         vocab: Optional[vocabs.Vocab],
                         trg_reader: Optional[input_readers.InputReader]) -> int:
    """
    Choose the vocab size for the scorer based on the passed arguments.

    This is done in order of priority: vocab_size, then vocab, then trg_reader.

    Args:
      vocab_size: vocab size or None
      vocab: vocab or None
      trg_reader: The model's trg_reader, if it exists and is unambiguous.

    Returns:
      chosen vocab size
    """
    if vocab_size is not None:
      return vocab_size
    elif vocab is not None:
      return len(vocab)
    elif trg_reader is None or trg_reader.vocab is None:
      raise ValueError(
        "Could not determine the scorer's output size. "
        "Please set its vocab_size or vocab member explicitly, or specify the vocabulary of trg_reader ahead of time.")
    else:
      return len(trg_reader.vocab)
class Softmax(Scorer, Serializable):
  """
  A class that performs an affine transform from the input to the vocabulary size and calculates a softmax.

  Note that all functions in this class rely on calc_scores(), so any class that has an alternative way of
  computing unnormalized log probabilities can subclass it and simply overload calc_scores().

  Args:
    input_dim: Size of the input vector
    vocab_size: Size of the vocab to predict
    vocab: A vocab object from which the vocab size can be derived automatically
    trg_reader: An input reader for the target, which can be used to derive the vocab size
    label_smoothing: Whether to apply label smoothing (a value of 0.1 is good if so)
    param_init: How to initialize the parameters
    bias_init: How to initialize the bias
    output_projector: The projection to be used before the output
  """
  yaml_tag = '!Softmax'

  @serializable_init
  def __init__(self,
               input_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               vocab_size: Optional[numbers.Integral] = None,
               vocab: Optional[vocabs.Vocab] = None,
               trg_reader: Optional[input_readers.InputReader] = Ref("model.trg_reader", default=None),
               label_smoothing: numbers.Real = 0.0,
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               bias_init: param_initializers.ParamInitializer = Ref("exp_global.bias_init",
                                                                    default=bare(param_initializers.ZeroInitializer)),
               output_projector: transforms.Linear = None) -> None:
    self.param_col = param_collections.ParamManager.my_params(self)
    self.input_dim = input_dim
    self.output_dim = self._choose_vocab_size(vocab_size, vocab, trg_reader)
    self.label_smoothing = label_smoothing
    self.output_projector = self.add_serializable_component(
      "output_projector", output_projector,
      lambda: output_projector or transforms.Linear(input_dim=self.input_dim,
                                                    output_dim=self.output_dim,
                                                    param_init=param_init,
                                                    bias_init=bias_init))

  def calc_scores(self, x: dy.Expression) -> dy.Expression:
    return self.output_projector.transform(x)

  def best_k(self, x: dy.Expression, k: numbers.Integral, normalize_scores: bool = False):
    scores_expr = self.calc_log_probs(x) if normalize_scores else self.calc_scores(x)
    scores = scores_expr.npvalue()
    return find_best_k(scores, k)

  def sample(self, x: dy.Expression, n: numbers.Integral, temperature: numbers.Real = 1.0):
    assert temperature != 0.0
    scores_expr = self.calc_log_probs(x)
    if temperature != 1.0:
      scores_expr *= 1.0 / temperature
      scores = dy.softmax(scores_expr).npvalue()
    else:
      scores = dy.exp(scores_expr).npvalue()
    # Numpy is very picky. If the sum is off even by 1e-8 it complains.
    scores /= sum(scores)
    a = range(scores.shape[0])
    samples = np.random.choice(a, (n,), replace=True, p=scores)
    r = []
    for word in samples:
      r.append((word, dy.pick(scores_expr, word)))
    return r
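  # Note on temperature: dividing the log probabilities by the temperature before re-applying the
  # softmax sharpens the distribution for temperature < 1 and flattens it for temperature > 1;
  # temperature == 1 leaves the distribution unchanged, which is why that branch simply
  # exponentiates the log probabilities.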
  def can_loss_be_derived_from_scores(self):
    """
    Return whether dy.pickneglogsoftmax can be used to quickly calculate the loss value.
    If False, then calc_loss should (1) compute the log softmax, (2) apply the necessary
    modifications, and (3) pick the loss.
    """
    return self.label_smoothing == 0.0

  def calc_loss(self, x: dy.Expression, y: Union[numbers.Integral, List[numbers.Integral]]) -> dy.Expression:
    if self.can_loss_be_derived_from_scores():
      scores = self.calc_scores(x)
      # single mode
      if not batchers.is_batched(y):
        loss = dy.pickneglogsoftmax(scores, y)
      # minibatch mode
      else:
        loss = dy.pickneglogsoftmax_batch(scores, y)
    else:
      log_prob = self.calc_log_probs(x)
      if not batchers.is_batched(y):
        loss = -dy.pick(log_prob, y)
      else:
        loss = -dy.pick_batch(log_prob, y)
      if self.label_smoothing > 0:
        ls_loss = -dy.mean_elems(log_prob)
        loss = ((1 - self.label_smoothing) * loss) + (self.label_smoothing * ls_loss)
    return loss

  def calc_probs(self, x: dy.Expression) -> dy.Expression:
    return dy.softmax(self.calc_scores(x))

  def calc_log_probs(self, x: dy.Expression) -> dy.Expression:
    return dy.log_softmax(self.calc_scores(x))
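
As a quick illustration of the label-smoothing branch in calc_loss() above, here is a minimal self-contained DyNet sketch with made-up scores (the values and the gold label are assumptions, not from the original module): the smoothed loss interpolates the negative log probability of the gold label with the mean negative log probability over the vocabulary.

import dynet as dy

dy.renew_cg()
scores = dy.inputVector([2.0, 0.5, -1.0])   # hypothetical unnormalized scores over 3 classes
log_prob = dy.log_softmax(scores)
y, eps = 0, 0.1                             # gold label and label-smoothing weight (both assumed)
ls_loss = -dy.mean_elems(log_prob)          # uniform smoothing term
loss = ((1 - eps) * -dy.pick(log_prob, y)) + (eps * ls_loss)
print(loss.value())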
class LexiconSoftmax(Softmax, Serializable):
  """
  A subclass of Softmax that can make use of an external lexicon probability, as described in:
  http://anthology.aclweb.org/D/D16/D16-1162.pdf

  Args:
    input_dim: Size of the input vector
    vocab_size: Size of the vocab to predict
    vocab: A vocab object from which the vocab size can be derived automatically
    trg_reader: An input reader for the target, which can be used to derive the vocab size
    label_smoothing: Whether to apply label smoothing (a value of 0.1 is good if so)
    param_init: How to initialize the parameters
    bias_init: How to initialize the bias
    output_projector: The projection to be used before the output
    lexicon_file: A file containing "trg src p(trg|src)"
    lexicon_alpha: smoothing constant for the bias method
    lexicon_type: Either the bias or the linear method
  """
  yaml_tag = '!LexiconSoftmax'

  @serializable_init
  @register_xnmt_handler
  def __init__(self,
               input_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               vocab_size: Optional[numbers.Integral] = None,
               vocab: Optional[vocabs.Vocab] = None,
               trg_reader: Optional[input_readers.InputReader] = Ref("model.trg_reader", default=None),
               attender=Ref("model.attender"),
               label_smoothing: numbers.Real = 0.0,
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               bias_init: param_initializers.ParamInitializer = Ref("exp_global.bias_init",
                                                                    default=bare(param_initializers.ZeroInitializer)),
               output_projector: transforms.Linear = None,
               lexicon_file=None,
               lexicon_alpha=0.001,
               lexicon_type='bias',
               coef_predictor: transforms.Linear = None,
               src_vocab=Ref("model.src_reader.vocab", default=None)) -> None:
    self.param_col = param_collections.ParamManager.my_params(self)
    self.input_dim = input_dim
    self.output_dim = self._choose_vocab_size(vocab_size, vocab, trg_reader)
    self.label_smoothing = label_smoothing
    self.output_projector = self.add_serializable_component(
      "output_projector", output_projector,
      lambda: output_projector or transforms.Linear(input_dim=self.input_dim,
                                                    output_dim=self.output_dim,
                                                    param_init=param_init,
                                                    bias_init=bias_init))
    self.coef_predictor = self.add_serializable_component(
      "coef_predictor", coef_predictor,
      lambda: coef_predictor or transforms.Linear(input_dim=self.input_dim,
                                                  output_dim=1,
                                                  param_init=param_init,
                                                  bias_init=bias_init))
    self.lexicon_file = lexicon_file
    self.lexicon_type = lexicon_type
    self.lexicon_alpha = lexicon_alpha
    assert lexicon_type in ["bias", "linear"], "Lexicon type can be either 'bias' or 'linear' only!"
    # References to other parts of the model
    self.src_vocab = src_vocab
    self.trg_vocab = vocab if vocab is not None else trg_reader.vocab
    self.attender = attender
    # Sparse data structure to store the external lexicon probabilities
    self.lexicon = None
    # State of the softmax
    self.lexicon_prob = None
    self.coeff = None
    self.dict_prob = None

  def load_lexicon(self):
    logger.info("Loading lexicon from file: " + self.lexicon_file)
    lexicon = [{} for _ in range(len(self.src_vocab))]
    with open(self.lexicon_file, encoding='utf-8') as fp:
      for line in fp:
        try:
          trg, src, prob = line.rstrip().split()
        except ValueError:
          logger.warning("Failed to parse 'trg src prob' from: " + line.strip())
          continue
        trg_id = self.trg_vocab.convert(trg)
        src_id = self.src_vocab.convert(src)
        lexicon[src_id][trg_id] = float(prob)
    # Assign the remaining probability mass to the unknown word
    for i in range(len(lexicon)):
      sum_prob = sum(lexicon[i].values())
      if sum_prob < 1.0:
        lexicon[i][self.trg_vocab.convert(self.trg_vocab.unk_token)] = 1.0 - sum_prob
    # Override special tokens
    src_unk_id = self.src_vocab.convert(self.src_vocab.unk_token)
    trg_unk_id = self.trg_vocab.convert(self.trg_vocab.unk_token)
    lexicon[self.src_vocab.SS] = {self.trg_vocab.SS: 1.0}
    lexicon[self.src_vocab.ES] = {self.trg_vocab.ES: 1.0}
    # TODO(philip30): Not sure if this is intended
    lexicon[src_unk_id] = {trg_unk_id: 1.0}
    return lexicon

  @handle_xnmt_event
  def on_new_epoch(self, *args, **kwargs):
    if self.lexicon is None:
      self.lexicon = self.load_lexicon()

  @handle_xnmt_event
  def on_start_sent(self, src):
    self.coeff = None
    self.dict_prob = None

    batch_size = src.batch_size()
    col_size = src.sent_len()

    idxs = [(x, j, i) for i in range(batch_size) for j in range(col_size) for x in self.lexicon[src[i][j]].keys()]
    idxs = tuple(map(list, list(zip(*idxs))))

    values = [x for i in range(batch_size) for j in range(col_size) for x in self.lexicon[src[i][j]].values()]
    dim = len(self.trg_vocab), col_size, batch_size
    self.lexicon_prob = dy.nobackprop(dy.sparse_inputTensor(idxs, values, dim, batched=True))
  def calc_scores(self, x: dy.Expression) -> dy.Expression:
    model_score = self.output_projector.transform(x)
    if self.lexicon_type == 'bias':
      model_score += dy.sum_dim(dy.log(self.calculate_dict_prob(x) + self.lexicon_alpha), [1])
    return model_score

  def calculate_coeff(self, x):
    if self.coeff is None:
      self.coeff = dy.logistic(self.coef_predictor.transform(x))
    return self.coeff

  def calculate_dict_prob(self, x):
    if self.dict_prob is None:
      self.dict_prob = self.lexicon_prob * self.attender.calc_attention(x)
    return self.dict_prob

  def calc_probs(self, x: dy.Expression) -> dy.Expression:
    model_score = dy.softmax(self.calc_scores(x))
    if self.lexicon_type == 'linear':
      coeff = self.calculate_coeff(x)
      return dy.sum_dim(dy.cmult(coeff, model_score) + dy.cmult((1 - coeff), self.calculate_dict_prob(x)), [1])
    else:
      return model_score

  def calc_log_probs(self, x: dy.Expression) -> dy.Expression:
    if self.lexicon_type == 'linear':
      return dy.log(self.calc_probs(x))
    else:
      return dy.log_softmax(self.calc_scores(x))

  def can_loss_be_derived_from_scores(self):
    return self.lexicon_type == 'bias' and super().can_loss_be_derived_from_scores()
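
To summarize the two lexicon-integration modes implemented above, here is a conceptual numpy sketch with made-up numbers (all values are assumptions, and it simplifies away the batching and attention handled by the DyNet code): the 'bias' method adds log(p_lex + lexicon_alpha) to the scores before the softmax, while the 'linear' method interpolates the softmax output with the lexicon probability using the gate produced by coef_predictor.

import numpy as np

def softmax(v):
  e = np.exp(v - v.max())
  return e / e.sum()

model_scores = np.array([1.0, 0.2, -0.5])  # hypothetical output of output_projector for one step
p_lex = np.array([0.7, 0.3, 0.0])          # hypothetical attention-weighted lexicon probabilities
alpha = 0.001                              # lexicon_alpha

p_bias = softmax(model_scores + np.log(p_lex + alpha))         # 'bias' mode
gate = 0.6                                                     # hypothetical output of coef_predictor
p_linear = gate * softmax(model_scores) + (1 - gate) * p_lex   # 'linear' mode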