Source code for xnmt.modelparts.embedders

import numbers
from typing import Any, Optional, Union
import io

import numpy as np
import dynet as dy

from xnmt import logger
from xnmt import batchers, events, expression_seqs, input_readers, param_collections, param_initializers, sent, vocabs
from xnmt.modelparts import transforms
from xnmt.persistence import bare, Path, Ref, Serializable, serializable_init


class Embedder(object):
  """
  An embedder takes in word IDs and outputs continuous vectors.

  This can be done on a word-by-word basis, or over a sequence.
  """

  def embed(self, word: Any) -> dy.Expression:
    """Embed a single word.

    Args:
      word: This will generally be an integer word ID, but could also be something like a string. It could also be
            batched, in which case the input will be a :class:`xnmt.batcher.Batch` of integers or other things.

    Returns:
      Expression corresponding to the embedding of the word(s).
    """
    raise NotImplementedError('embed must be implemented in Embedder subclasses')

  def embed_sent(self, x: Any) -> expression_seqs.ExpressionSequence:
    """Embed a full sentence worth of words. By default, just do a for loop.

    Args:
      x: This will generally be a list of word IDs, but could also be a list of strings or some other format.
         It could also be batched, in which case it will be a (possibly masked) :class:`xnmt.batcher.Batch` object.

    Returns:
      An expression sequence representing vectors of each word in the input.
    """
    # single mode
    if not batchers.is_batched(x):
      embeddings = [self.embed(word) for word in x]
    # minibatch mode
    else:
      embeddings = []
      seq_len = x.sent_len()
      for single_sent in x: assert single_sent.sent_len() == seq_len
      for word_i in range(seq_len):
        batch = batchers.mark_as_batch([single_sent[word_i] for single_sent in x])
        embeddings.append(self.embed(batch))

    return expression_seqs.ExpressionSequence(expr_list=embeddings, mask=x.mask if batchers.is_batched(x) else None)
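
  # Illustrative sketch (not part of the original module): the per-position batching that the
  # default embed_sent fallback above performs. The toy word IDs, and the assumption that the
  # sentences are already padded to equal length, are invented for this example; on a concrete
  # subclass such as SimpleWordEmbedder this mirrors the loop in embed_sent.
  def _example_per_position_batching(self) -> list:
    sents = [[4, 7, 2], [5, 9, 2]]  # two equal-length sentences of word IDs
    per_position = []
    for word_i in range(len(sents[0])):
      # group the words at position word_i across all sentences into one batch ...
      batch = batchers.mark_as_batch([s[word_i] for s in sents])
      # ... and embed them with a single batched lookup, exactly as embed_sent does above
      per_position.append(self.embed(batch))
    return per_position  # one batched dy.Expression per position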

  def choose_vocab(self,
                   vocab: vocabs.Vocab,
                   yaml_path: Path,
                   src_reader: input_readers.InputReader,
                   trg_reader: input_readers.InputReader) -> vocabs.Vocab:
    """Choose the vocab for the embedder based on the passed arguments.

    This is done in order of priority of vocab, model+yaml_path.

    Args:
      vocab: If None, try to obtain from ``src_reader`` or ``trg_reader``, depending on the ``yaml_path``
      yaml_path: Path of this embedder in the component hierarchy. Automatically determined when deserializing the YAML model.
      src_reader: Model's src_reader, if exists and unambiguous.
      trg_reader: Model's trg_reader, if exists and unambiguous.

    Returns:
      chosen vocab
    """
    if vocab is not None:
      return vocab
    elif "src_embedder" in yaml_path:
      if src_reader is None or src_reader.vocab is None:
        raise ValueError("Could not determine src_embedder's vocabulary. "
                         "Please set its vocab member explicitly, or specify the vocabulary of src_reader ahead of time.")
      return src_reader.vocab
    elif "embedder" in yaml_path or "output_projector" in yaml_path:
      if trg_reader is None or trg_reader.vocab is None:
        raise ValueError("Could not determine trg_embedder's vocabulary. "
                         "Please set its vocab member explicitly, or specify the vocabulary of trg_reader ahead of time.")
      return trg_reader.vocab
    else:
      raise ValueError("Attempted to determine vocab of {} (path: {}), but path was not src_embedder, trg_embedder, "
                       "or output_projector, so it could not determine what part of the model to use. "
                       "Please set vocab explicitly.".format(self.__class__, yaml_path))

  def choose_vocab_size(self,
                        vocab_size: numbers.Integral,
                        vocab: vocabs.Vocab,
                        yaml_path: Path,
                        src_reader: input_readers.InputReader,
                        trg_reader: input_readers.InputReader) -> int:
    """Choose the vocab size for the embedder based on the passed arguments.

    This is done in order of priority of vocab_size, vocab, model+yaml_path.

    Args:
      vocab_size: vocab size or None
      vocab: vocab or None
      yaml_path: Path of this embedder in the component hierarchy. Automatically determined when YAML-deserializing.
      src_reader: Model's src_reader, if exists and unambiguous.
      trg_reader: Model's trg_reader, if exists and unambiguous.

    Returns:
      chosen vocab size
    """
    if vocab_size is not None:
      return vocab_size
    elif vocab is not None:
      return len(vocab)
    elif "src_embedder" in yaml_path:
      if src_reader is None or getattr(src_reader, "vocab", None) is None:
        raise ValueError("Could not determine src_embedder's size. "
                         "Please set its vocab_size or vocab member explicitly, or specify the vocabulary of src_reader ahead of time.")
      return len(src_reader.vocab)
    elif "embedder" in yaml_path or "output_projector" in yaml_path:
      if trg_reader is None or trg_reader.vocab is None:
        raise ValueError("Could not determine target embedder's size. "
                         "Please set its vocab_size or vocab member explicitly, or specify the vocabulary of trg_reader ahead of time.")
      return len(trg_reader.vocab)
    else:
      raise ValueError(f"Attempted to determine vocab size of {self.__class__} (path: {yaml_path}), "
                       f"but path was not src_embedder, decoder.embedder, or output_projector, so it could not determine "
                       f"what part of the model to use. Please set vocab_size or vocab explicitly.")
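
# Illustrative sketch (not part of the original module): the resolution order that
# choose_vocab_size implements. The sizes and the commented vocab file name are invented;
# assumes param_collections.ParamManager.init_param_col() has been called beforehand.
def _example_vocab_size_resolution():
  # 1) an explicit vocab_size always wins:
  e1 = SimpleWordEmbedder(emb_dim=512, vocab_size=32000)
  # 2) otherwise an explicit Vocab determines the size via len(vocab), e.g.
  #    SimpleWordEmbedder(emb_dim=512, vocab=vocabs.Vocab(vocab_file="my.vocab"))
  # 3) otherwise (only when deserialized from YAML) the size is taken from src_reader.vocab
  #    or trg_reader.vocab, depending on whether yaml_path contains "src_embedder" or
  #    "embedder"/"output_projector".
  return e1.vocab_size  # -> 32000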

class DenseWordEmbedder(Embedder, transforms.Linear, Serializable):
  """
  Word embeddings via full matrix.

  Args:
    emb_dim: embedding dimension
    weight_noise: apply Gaussian noise with given standard deviation to embeddings
    word_dropout: drop out word types with a certain probability, sampling word types on a per-sentence level,
                  see https://arxiv.org/abs/1512.05287
    fix_norm: fix the norm of word vectors to be radius r, see https://arxiv.org/abs/1710.01329
    param_init: how to initialize weight matrices
    bias_init: how to initialize bias vectors
    vocab_size: vocab size or None
    vocab: vocab or None
    yaml_path: Path of this embedder in the component hierarchy. Automatically set by the YAML deserializer.
    src_reader: A reader for the source side. Automatically set by the YAML deserializer.
    trg_reader: A reader for the target side. Automatically set by the YAML deserializer.
  """
  yaml_tag = "!DenseWordEmbedder"

  @events.register_xnmt_handler
  @serializable_init
  def __init__(self,
               emb_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               weight_noise: numbers.Real = Ref("exp_global.weight_noise", default=0.0),
               word_dropout: numbers.Real = 0.0,
               fix_norm: Optional[numbers.Real] = None,
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               bias_init: param_initializers.ParamInitializer = Ref("exp_global.bias_init",
                                                                    default=bare(param_initializers.ZeroInitializer)),
               vocab_size: Optional[numbers.Integral] = None,
               vocab: Optional[vocabs.Vocab] = None,
               yaml_path: Path = Path(),
               src_reader: Optional[input_readers.InputReader] = Ref("model.src_reader", default=None),
               trg_reader: Optional[input_readers.InputReader] = Ref("model.trg_reader", default=None)) -> None:
    self.fix_norm = fix_norm
    self.weight_noise = weight_noise
    self.word_dropout = word_dropout
    self.emb_dim = emb_dim
    self.word_id_mask = None
    self.train = False
    param_collection = param_collections.ParamManager.my_params(self)
    self.vocab_size = self.choose_vocab_size(vocab_size, vocab, yaml_path, src_reader, trg_reader)
    self.save_processed_arg("vocab_size", self.vocab_size)
    self.embeddings = param_collection.add_parameters((self.vocab_size, self.emb_dim),
                                                      init=param_init.initializer((self.vocab_size, self.emb_dim),
                                                                                  is_lookup=True))
    self.bias = param_collection.add_parameters((self.vocab_size,), init=bias_init.initializer((self.vocab_size,)))

  @events.handle_xnmt_event
  def on_start_sent(self, *args, **kwargs) -> None:
    self.word_id_mask = None

  @events.handle_xnmt_event
  def on_set_train(self, val: bool) -> None:
    self.train = val

  def embed(self, x: Union[batchers.Batch, numbers.Integral]) -> dy.Expression:
    if self.train and self.word_dropout > 0.0 and self.word_id_mask is None:
      batch_size = x.batch_size() if batchers.is_batched(x) else 1
      self.word_id_mask = [set(np.random.choice(self.vocab_size, int(self.vocab_size * self.word_dropout), replace=False))
                           for _ in range(batch_size)]
    emb_e = dy.parameter(self.embeddings)
    # single mode
    if not batchers.is_batched(x):
      if self.train and self.word_id_mask and x in self.word_id_mask[0]:
        ret = dy.zeros((self.emb_dim,))
      else:
        ret = dy.pick(emb_e, index=x)
        if self.fix_norm is not None:
          ret = dy.cdiv(ret, dy.l2_norm(ret))
          if self.fix_norm != 1:
            ret *= self.fix_norm
    # minibatch mode
    else:
      ret = dy.pick_batch(emb_e, x)
      if self.fix_norm is not None:
        ret = dy.cdiv(ret, dy.l2_norm(ret))
        if self.fix_norm != 1:
          ret *= self.fix_norm
      if self.train and self.word_id_mask and any(x[i] in self.word_id_mask[i] for i in range(x.batch_size())):
        dropout_mask = dy.inputTensor(np.transpose([[0.0]*self.emb_dim if x[i] in self.word_id_mask[i] else [1.0]*self.emb_dim
                                                    for i in range(x.batch_size())]),
                                      batched=True)
        ret = dy.cmult(ret, dropout_mask)
    if self.train and self.weight_noise > 0.0:
      ret = dy.noise(ret, self.weight_noise)
    return ret
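
  # Illustrative sketch (not part of the original module): what the word-type dropout in
  # embed above amounts to, in plain numpy. One set of dropped word types is sampled per
  # sentence (the mask is reset in on_start_sent), so all occurrences of a dropped type in
  # that sentence are zeroed out together. The word IDs and dropout rate are invented.
  def _example_word_type_dropout(self, word_dropout: float = 0.1) -> list:
    dropped = set(np.random.choice(self.vocab_size, int(self.vocab_size * word_dropout), replace=False))
    sentence = [4, 17, 4, 23]  # word type 4 occurs twice
    # both occurrences of word type 4 are either kept or dropped together:
    return [0.0 if w in dropped else 1.0 for w in sentence]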

  def transform(self, input_expr: dy.Expression) -> dy.Expression:
    W1 = dy.parameter(self.embeddings)
    b1 = dy.parameter(self.bias)
    return dy.affine_transform([b1, W1, input_expr])
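
# Illustrative sketch (not part of the original module): because DenseWordEmbedder is both an
# Embedder and a transforms.Linear, the same (vocab_size x emb_dim) matrix can embed input
# words and project hidden states back to vocabulary scores (weight tying). The sizes, the
# word id, and the zero stand-in for a decoder state are invented; assumes
# param_collections.ParamManager.init_param_col() has been called and a DyNet computation
# graph is active.
def _example_tied_embedder_usage():
  tied = DenseWordEmbedder(emb_dim=512, vocab_size=32000)
  word_vec = tied.embed(42)               # (512,) embedding of word id 42
  hidden = dy.inputTensor(np.zeros(512))  # stand-in for a real decoder state
  logits = tied.transform(hidden)         # (32000,) scores over the vocabulary
  return word_vec, logits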

class SimpleWordEmbedder(Embedder, Serializable):
  """
  Simple word embeddings via lookup.

  Args:
    emb_dim: embedding dimension
    weight_noise: apply Gaussian noise with given standard deviation to embeddings
    word_dropout: drop out word types with a certain probability, sampling word types on a per-sentence level,
                  see https://arxiv.org/abs/1512.05287
    fix_norm: fix the norm of word vectors to be radius r, see https://arxiv.org/abs/1710.01329
    param_init: how to initialize lookup matrices
    vocab_size: vocab size or None
    vocab: vocab or None
    yaml_path: Path of this embedder in the component hierarchy. Automatically set by the YAML deserializer.
    src_reader: A reader for the source side. Automatically set by the YAML deserializer.
    trg_reader: A reader for the target side. Automatically set by the YAML deserializer.
  """
  yaml_tag = '!SimpleWordEmbedder'

  @events.register_xnmt_handler
  @serializable_init
  def __init__(self,
               emb_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               weight_noise: numbers.Real = Ref("exp_global.weight_noise", default=0.0),
               word_dropout: numbers.Real = 0.0,
               fix_norm: Optional[numbers.Real] = None,
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               vocab_size: Optional[numbers.Integral] = None,
               vocab: Optional[vocabs.Vocab] = None,
               yaml_path: Path = Path(),
               src_reader: Optional[input_readers.InputReader] = Ref("model.src_reader", default=None),
               trg_reader: Optional[input_readers.InputReader] = Ref("model.trg_reader", default=None)) -> None:
    self.emb_dim = emb_dim
    self.weight_noise = weight_noise
    self.word_dropout = word_dropout
    self.fix_norm = fix_norm
    self.word_id_mask = None
    self.train = False
    param_collection = param_collections.ParamManager.my_params(self)
    self.vocab_size = self.choose_vocab_size(vocab_size, vocab, yaml_path, src_reader, trg_reader)
    self.save_processed_arg("vocab_size", self.vocab_size)
    self.embeddings = param_collection.add_lookup_parameters((self.vocab_size, self.emb_dim),
                                                             init=param_init.initializer((self.vocab_size, self.emb_dim),
                                                                                         is_lookup=True))

  @events.handle_xnmt_event
  def on_set_train(self, val: bool) -> None:
    self.train = val

  @events.handle_xnmt_event
  def on_start_sent(self, *args, **kwargs) -> None:
    self.word_id_mask = None

  def embed(self, x: Union[numbers.Integral, batchers.Batch]) -> dy.Expression:
    if self.train and self.word_dropout > 0.0 and self.word_id_mask is None:
      batch_size = x.batch_size() if batchers.is_batched(x) else 1
      self.word_id_mask = [set(np.random.choice(self.vocab_size, int(self.vocab_size * self.word_dropout), replace=False))
                           for _ in range(batch_size)]
    # single mode
    if not batchers.is_batched(x):
      if self.train and self.word_id_mask and x in self.word_id_mask[0]:
        ret = dy.zeros((self.emb_dim,))
      else:
        ret = self.embeddings[x]
        if self.fix_norm is not None:
          ret = dy.cdiv(ret, dy.l2_norm(ret))
          if self.fix_norm != 1:
            ret *= self.fix_norm
    # minibatch mode
    else:
      ret = self.embeddings.batch(x)
      if self.fix_norm is not None:
        ret = dy.cdiv(ret, dy.l2_norm(ret))
        if self.fix_norm != 1:
          ret *= self.fix_norm
      if self.train and self.word_id_mask and any(x[i] in self.word_id_mask[i] for i in range(x.batch_size())):
        dropout_mask = dy.inputTensor(np.transpose([[0.0]*self.emb_dim if x[i] in self.word_id_mask[i] else [1.0]*self.emb_dim
                                                    for i in range(x.batch_size())]),
                                      batched=True)
        ret = dy.cmult(ret, dropout_mask)
    if self.train and self.weight_noise > 0.0:
      ret = dy.noise(ret, self.weight_noise)
    return ret
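
# Illustrative sketch (not part of the original module): basic use of SimpleWordEmbedder.embed
# in single and minibatch mode. The sizes and word ids are invented; assumes
# param_collections.ParamManager.init_param_col() has been called beforehand.
def _example_simple_embedder_usage():
  embedder = SimpleWordEmbedder(emb_dim=128, vocab_size=1000)
  single = embedder.embed(7)                                    # one (128,) lookup expression
  batched = embedder.embed(batchers.mark_as_batch([7, 12, 3]))  # one (128,) expression with batch size 3
  return single, batched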

class NoopEmbedder(Embedder, Serializable):
  """
  This embedder performs no lookups but only passes through the inputs.

  Normally, the input is a Sentence object, which is converted to an expression.

  Args:
    emb_dim: Size of the inputs
  """
  yaml_tag = '!NoopEmbedder'

  @serializable_init
  def __init__(self, emb_dim: Optional[numbers.Integral]) -> None:
    self.emb_dim = emb_dim

  def embed(self, x: Union[np.ndarray, list]) -> dy.Expression:
    return dy.inputTensor(x, batched=batchers.is_batched(x))

  def embed_sent(self, x: sent.Sentence) -> expression_seqs.ExpressionSequence:
    # TODO refactor: seems a bit too many special cases that need to be distinguished
    batched = batchers.is_batched(x)
    first_sent = x[0] if batched else x
    if hasattr(first_sent, "get_array"):
      if not batched:
        return expression_seqs.LazyNumpyExpressionSequence(lazy_data=x.get_array())
      else:
        return expression_seqs.LazyNumpyExpressionSequence(lazy_data=batchers.mark_as_batch([s for s in x]),
                                                           mask=x.mask)
    else:
      if not batched:
        embeddings = [self.embed(word) for word in x]
      else:
        embeddings = []
        for word_i in range(x.sent_len()):
          embeddings.append(self.embed(batchers.mark_as_batch([single_sent[word_i] for single_sent in x])))
      return expression_seqs.ExpressionSequence(expr_list=embeddings, mask=x.mask)
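
# Illustrative sketch (not part of the original module): NoopEmbedder simply wraps
# already-continuous inputs (e.g. speech feature frames) as DyNet expressions instead of
# looking anything up. The 40-dimensional random frame is invented for this example; assumes
# a DyNet computation graph is active.
def _example_noop_embedder_usage():
  noop = NoopEmbedder(emb_dim=40)
  frame = np.random.rand(40).tolist()  # one 40-dimensional feature vector
  return noop.embed(frame)             # passed through as a dy.Expression of dim (40,)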

class PretrainedSimpleWordEmbedder(SimpleWordEmbedder, Serializable):
  """
  Simple word embeddings via lookup. Initial pretrained embeddings must be supplied in FastText text format.

  Args:
    filename: Filename for the pretrained embeddings
    emb_dim: embedding dimension; if None, use exp_global.default_layer_dim
    weight_noise: apply Gaussian noise with given standard deviation to embeddings; if ``None``, use exp_global.weight_noise
    word_dropout: drop out word types with a certain probability, sampling word types on a per-sentence level,
                  see https://arxiv.org/abs/1512.05287
    fix_norm: fix the norm of word vectors to be radius r, see https://arxiv.org/abs/1710.01329
    vocab: vocab or None
    yaml_path: Path of this embedder in the component hierarchy. Automatically set by the YAML deserializer.
    src_reader: A reader for the source side. Automatically set by the YAML deserializer.
    trg_reader: A reader for the target side. Automatically set by the YAML deserializer.
  """
  yaml_tag = '!PretrainedSimpleWordEmbedder'

  @events.register_xnmt_handler
  @serializable_init
  def __init__(self,
               filename: str,
               emb_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               weight_noise: numbers.Real = Ref("exp_global.weight_noise", default=0.0),
               word_dropout: numbers.Real = 0.0,
               fix_norm: Optional[numbers.Real] = None,
               vocab: Optional[vocabs.Vocab] = None,
               yaml_path: Path = Path(),
               src_reader: Optional[input_readers.InputReader] = Ref("model.src_reader", default=None),
               trg_reader: Optional[input_readers.InputReader] = Ref("model.trg_reader", default=None)) -> None:
    self.emb_dim = emb_dim
    self.weight_noise = weight_noise
    self.word_dropout = word_dropout
    self.word_id_mask = None
    self.train = False
    self.fix_norm = fix_norm
    self.pretrained_filename = filename
    param_collection = param_collections.ParamManager.my_params(self)
    self.vocab = self.choose_vocab(vocab, yaml_path, src_reader, trg_reader)
    self.vocab_size = len(self.vocab)
    self.save_processed_arg("vocab", self.vocab)
    with open(self.pretrained_filename, encoding='utf-8') as embeddings_file:
      total_embs, in_vocab, missing, initial_embeddings = self._read_fasttext_embeddings(self.vocab, embeddings_file)
    self.embeddings = param_collection.lookup_parameters_from_numpy(initial_embeddings)

    logger.info(f"{in_vocab} vocabulary matches out of {total_embs} total embeddings; "
                f"{missing} vocabulary words without a pretrained embedding out of {self.vocab_size}")

  def _read_fasttext_embeddings(self, vocab: vocabs.Vocab, embeddings_file_handle: io.IOBase) -> tuple:
    """
    Reads FastText embeddings from a file. Also prints stats about the loaded embeddings for sanity checking.

    Args:
      vocab: a `Vocab` object containing the vocabulary for the experiment
      embeddings_file_handle: A file handle on the embeddings file. The embeddings must be in FastText text format.

    Returns:
      tuple: A tuple of (total number of embeddings read, # embeddings that match vocabulary words,
             # vocabulary words without a matching embedding, embeddings array).
    """
    _, dimension = next(embeddings_file_handle).split()
    if int(dimension) != self.emb_dim:
      raise Exception(f"An embedding size of {self.emb_dim} was specified, but the pretrained embeddings have size {dimension}")

    # Poor man's Glorot initializer for missing embeddings
    bound = np.sqrt(6/(self.vocab_size + self.emb_dim))

    total_embs = 0
    in_vocab = 0
    missing = 0

    embeddings = np.empty((self.vocab_size, self.emb_dim), dtype='float')
    found = np.zeros(self.vocab_size, dtype='bool_')

    for line in embeddings_file_handle:
      total_embs += 1
      word, vals = line.strip().split(' ', 1)
      if word in vocab.w2i:
        in_vocab += 1
        index = vocab.w2i[word]
        embeddings[index] = np.fromstring(vals, sep=" ")
        found[index] = True

    for i in range(self.vocab_size):
      if not found[i]:
        missing += 1
        embeddings[i] = np.random.uniform(-bound, bound, self.emb_dim)

    return total_embs, in_vocab, missing, embeddings

class PositionEmbedder(Embedder, Serializable):
  yaml_tag = '!PositionEmbedder'

  @serializable_init
  def __init__(self,
               max_pos: numbers.Integral,
               emb_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer))) -> None:
    """
    Args:
      max_pos: largest embedded position
      emb_dim: embedding size
      param_init: how to initialize embedding matrix
    """
    self.max_pos = max_pos
    self.emb_dim = emb_dim
    param_collection = param_collections.ParamManager.my_params(self)
    dim = (self.emb_dim, max_pos)
    self.embeddings = param_collection.add_parameters(dim, init=param_init.initializer(dim, is_lookup=True))

  def embed(self, word):
    raise NotImplementedError("Position-embedding for individual words not implemented yet.")

  def embed_sent(self, sent_len: numbers.Integral) -> expression_seqs.ExpressionSequence:
    embeddings = dy.strided_select(dy.parameter(self.embeddings), [1, 1], [0, 0], [self.emb_dim, sent_len])
    return expression_seqs.ExpressionSequence(expr_tensor=embeddings, mask=None)
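
# Illustrative sketch (not part of the original module): PositionEmbedder.embed_sent returns
# the first sent_len columns of its (emb_dim x max_pos) parameter matrix as one tensor-backed
# expression sequence. The sizes are invented; assumes
# param_collections.ParamManager.init_param_col() has been called beforehand.
def _example_position_embedder_usage():
  pos_emb = PositionEmbedder(max_pos=500, emb_dim=128)
  es = pos_emb.embed_sent(sent_len=10)  # ExpressionSequence backed by a (128, 10) tensor
  return es.as_tensor()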