Source code for xnmt.transducers.recurrent

import numbers
import collections.abc
from typing import List, Optional, Sequence, Tuple, Union

import numpy as np
import dynet as dy

from xnmt import expression_seqs, param_collections, param_initializers
from xnmt.events import register_xnmt_handler, handle_xnmt_event
from xnmt.transducers import base as transducers
from xnmt.persistence import bare, Ref, Serializable, serializable_init, Path

class UniLSTMState(object):
  """
  State object for UniLSTMSeqTransducer.
  """
  def __init__(self,
               network: 'UniLSTMSeqTransducer',
               prev: Optional['UniLSTMState'] = None,
               c: Optional[Sequence[dy.Expression]] = None,
               h: Optional[Sequence[dy.Expression]] = None) -> None:
    self._network = network
    if c is None:
      c = [dy.zeroes(dim=(network.hidden_dim,)) for _ in range(network.num_layers)]
    if h is None:
      h = [dy.zeroes(dim=(network.hidden_dim,)) for _ in range(network.num_layers)]
    self._c = tuple(c)
    self._h = tuple(h)
    self._prev = prev

  def add_input(self, x: Union[dy.Expression, Sequence[dy.Expression]]):
    new_c, new_h = self._network.add_input_to_prev(self, x)
    return UniLSTMState(self._network, prev=self, c=new_c, h=new_h)

  def b(self) -> 'UniLSTMSeqTransducer':
    return self._network

  def h(self) -> Sequence[dy.Expression]:
    return self._h

  def c(self) -> Sequence[dy.Expression]:
    return self._c

  def s(self) -> Sequence[dy.Expression]:
    return self._c + self._h

  def prev(self) -> 'UniLSTMState':
    return self._prev

  def set_h(self, es: Optional[Sequence[dy.Expression]] = None) -> 'UniLSTMState':
    if es is not None:
      assert len(es) == self._network.num_layers
      self._h = tuple(es)
    return self

  def set_s(self, es: Optional[Sequence[dy.Expression]] = None) -> 'UniLSTMState':
    if es is not None:
      assert len(es) == 2 * self._network.num_layers
      self._c = tuple(es[:self._network.num_layers])
      self._h = tuple(es[self._network.num_layers:])
    return self

  def output(self) -> dy.Expression:
    return self._h[-1]

  def __getitem__(self, item):
    return UniLSTMState(network=self._network,
                        prev=self._prev,
                        c=[ci[item] for ci in self._c],
                        h=[hi[item] for hi in self._h])
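
# Usage sketch (illustrative, not part of the original module): stepping a
# UniLSTMSeqTransducer through time by hand via UniLSTMState, as one would in a
# decoding loop. Assumes `lstm` is an already-constructed UniLSTMSeqTransducer
# and `x1`, `x2` are dy.Expression inputs of dimension lstm.total_input_dim.
#
#   state = lstm.initial_state()   # zero h and c vectors for every layer
#   state = state.add_input(x1)    # returns a fresh state; history stays reachable via .prev()
#   state = state.add_input(x2)
#   y = state.output()             # top-layer hidden state, i.e. state.h()[-1]
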
class UniLSTMSeqTransducer(transducers.SeqTransducer, Serializable):
  """
  This implements a single LSTM layer based on the memory-friendly dedicated DyNet nodes.
  It works similarly to DyNet's CompactVanillaLSTMBuilder, but in addition supports
  taking multiple inputs that are concatenated on-the-fly.

  Args:
    layers (int): number of layers
    input_dim (int): input dimension
    hidden_dim (int): hidden dimension
    dropout (float): dropout probability
    weightnoise_std (float): weight noise standard deviation
    param_init (ParamInitializer): how to initialize weight matrices
    bias_init (ParamInitializer): how to initialize bias vectors
    yaml_path (Path): path of this component in the YAML config; used to detect whether this
                      transducer is part of a decoder
    decoder_input_dim (int): input dimension of the decoder; if ``yaml_path`` contains 'decoder' and
                             ``decoder_input_feeding`` is True, this will be added to ``input_dim``
    decoder_input_feeding (bool): whether this transducer is part of an input-feeding decoder;
                                  cf. ``decoder_input_dim``
  """
  yaml_tag = '!UniLSTMSeqTransducer'

  @register_xnmt_handler
  @serializable_init
  def __init__(self,
               layers: numbers.Integral = 1,
               input_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               hidden_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               dropout: numbers.Real = Ref("exp_global.dropout", default=0.0),
               weightnoise_std: numbers.Real = Ref("exp_global.weight_noise", default=0.0),
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               bias_init: param_initializers.ParamInitializer = Ref("exp_global.bias_init",
                                                                    default=bare(param_initializers.ZeroInitializer)),
               yaml_path: Path = Path(),
               decoder_input_dim: Optional[numbers.Integral] = Ref("exp_global.default_layer_dim", default=None),
               decoder_input_feeding: bool = True) -> None:
    self.num_layers = layers
    model = param_collections.ParamManager.my_params(self)
    self.hidden_dim = hidden_dim
    self.dropout_rate = dropout
    self.weightnoise_std = weightnoise_std
    self.input_dim = input_dim
    self.total_input_dim = input_dim
    if yaml_path is not None and "decoder" in yaml_path:
      if decoder_input_feeding:
        self.total_input_dim += decoder_input_dim

    if not isinstance(param_init, collections.abc.Sequence):
      param_init = [param_init] * layers
    if not isinstance(bias_init, collections.abc.Sequence):
      bias_init = [bias_init] * layers

    # [i; f; o; g]
    self.p_Wx = [model.add_parameters(dim=(hidden_dim * 4, self.total_input_dim),
                                      init=param_init[0].initializer((hidden_dim * 4, self.total_input_dim), num_shared=4))]
    self.p_Wx += [model.add_parameters(dim=(hidden_dim * 4, hidden_dim),
                                       init=param_init[i].initializer((hidden_dim * 4, hidden_dim), num_shared=4))
                  for i in range(1, layers)]
    self.p_Wh = [model.add_parameters(dim=(hidden_dim * 4, hidden_dim),
                                      init=param_init[i].initializer((hidden_dim * 4, hidden_dim), num_shared=4))
                 for i in range(layers)]
    self.p_b = [model.add_parameters(dim=(hidden_dim * 4,),
                                     init=bias_init[i].initializer((hidden_dim * 4,), num_shared=4))
                for i in range(layers)]

    self.dropout_mask_x = None
    self.dropout_mask_h = None

  @handle_xnmt_event
  def on_set_train(self, val):
    self.train = val

  @handle_xnmt_event
  def on_start_sent(self, src):
    self._final_states = None
    self.Wx = [dy.parameter(Wx) for Wx in self.p_Wx]
    self.Wh = [dy.parameter(Wh) for Wh in self.p_Wh]
    self.b = [dy.parameter(b) for b in self.p_b]
    self.dropout_mask_x = None
    self.dropout_mask_h = None
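
  # Shape sketch (illustrative numbers, not from the original source): with
  # layers=2, hidden_dim=512 and total_input_dim=512, the packed [i; f; o; g]
  # parameters created above have shapes
  #   p_Wx[0]: (2048, 512)  input-to-hidden of layer 0
  #   p_Wx[1]: (2048, 512)  layer 1 reads layer 0's h, so its input size is hidden_dim
  #   p_Wh[l]: (2048, 512)  hidden-to-hidden of each layer
  #   p_b[l]:  (2048,)      bias of each layer
  # num_shared=4 tells the initializer that the matrix stacks four gate blocks,
  # so it can compute its scale per 512-row block rather than for the full matrix.
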
  def get_final_states(self) -> List[transducers.FinalTransducerState]:
    return self._final_states
  def initial_state(self) -> UniLSTMState:
    return UniLSTMState(self)

  def set_dropout(self, dropout: numbers.Real) -> None:
    self.dropout_rate = dropout

  def set_dropout_masks(self, batch_size: numbers.Integral = 1) -> None:
    if self.dropout_rate > 0.0 and self.train:
      retention_rate = 1.0 - self.dropout_rate
      scale = 1.0 / retention_rate
      self.dropout_mask_x = [dy.random_bernoulli((self.total_input_dim,), retention_rate, scale, batch_size=batch_size)]
      self.dropout_mask_x += [dy.random_bernoulli((self.hidden_dim,), retention_rate, scale, batch_size=batch_size)
                              for _ in range(1, self.num_layers)]
      self.dropout_mask_h = [dy.random_bernoulli((self.hidden_dim,), retention_rate, scale, batch_size=batch_size)
                             for _ in range(self.num_layers)]

  def add_input_to_prev(self, prev_state: UniLSTMState, x: Union[dy.Expression, Sequence[dy.Expression]]) \
          -> Tuple[Sequence[dy.Expression], Sequence[dy.Expression]]:
    if isinstance(x, dy.Expression):
      x = [x]
    elif type(x) != list:
      x = list(x)
    if self.dropout_rate > 0.0 and self.train and self.dropout_mask_x is None:
      self.set_dropout_masks()

    new_c, new_h = [], []
    for layer_i in range(self.num_layers):
      if self.dropout_rate > 0.0 and self.train:
        # apply dropout according to https://arxiv.org/abs/1512.05287 (tied weights)
        gates = dy.vanilla_lstm_gates_dropout_concat(
          x, prev_state._h[layer_i],
          self.Wx[layer_i], self.Wh[layer_i], self.b[layer_i],
          self.dropout_mask_x[layer_i], self.dropout_mask_h[layer_i],
          self.weightnoise_std if self.train else 0.0)
      else:
        gates = dy.vanilla_lstm_gates_concat(
          x, prev_state._h[layer_i],
          self.Wx[layer_i], self.Wh[layer_i], self.b[layer_i],
          self.weightnoise_std if self.train else 0.0)
      new_c.append(dy.vanilla_lstm_c(prev_state._c[layer_i], gates))
      new_h.append(dy.vanilla_lstm_h(new_c[-1], gates))
      x = [new_h[-1]]
    return new_c, new_h
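
  # Worked example for set_dropout_masks (illustrative): with dropout_rate=0.2,
  # retention_rate=0.8 and scale=1.25, each mask entry is 1.25 with probability
  # 0.8 and 0 otherwise, keeping the masked input's expectation unchanged
  # (0.8 * 1.25 = 1.0). Since the masks are sampled once per sequence and reused
  # at every time step, this realizes the tied-weights ("variational") dropout
  # of https://arxiv.org/abs/1512.05287 rather than independent per-step dropout.
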
  def transduce(self, expr_seq: 'expression_seqs.ExpressionSequence') -> 'expression_seqs.ExpressionSequence':
    """
    Transduce the sequence, applying masks if given (masked timesteps simply copy previous h / c).

    Args:
      expr_seq: expression sequence or list of expression sequences (where each inner list will be concatenated)
    Returns:
      expression sequence
    """
    if isinstance(expr_seq, expression_seqs.ExpressionSequence):
      expr_seq = [expr_seq]
    batch_size = expr_seq[0][0].dim()[1]
    seq_len = len(expr_seq[0])

    if self.dropout_rate > 0.0 and self.train:
      self.set_dropout_masks(batch_size=batch_size)

    cur_input = expr_seq
    self._final_states = []
    for layer_i in range(self.num_layers):
      h = [dy.zeroes(dim=(self.hidden_dim,), batch_size=batch_size)]
      c = [dy.zeroes(dim=(self.hidden_dim,), batch_size=batch_size)]
      for pos_i in range(seq_len):
        x_t = [cur_input[j][pos_i] for j in range(len(cur_input))]
        if isinstance(x_t, dy.Expression):
          x_t = [x_t]
        elif type(x_t) != list:
          x_t = list(x_t)
        if sum([x_t_i.dim()[0][0] for x_t_i in x_t]) != self.total_input_dim:
          found_dim = sum([x_t_i.dim()[0][0] for x_t_i in x_t])
          raise ValueError(f"VanillaLSTMGates: x_t has inconsistent dimension {found_dim}, "
                           f"expecting {self.total_input_dim}")
        if self.dropout_rate > 0.0 and self.train:
          # apply dropout according to https://arxiv.org/abs/1512.05287 (tied weights)
          gates_t = dy.vanilla_lstm_gates_dropout_concat(x_t, h[-1],
                                                         self.Wx[layer_i], self.Wh[layer_i], self.b[layer_i],
                                                         self.dropout_mask_x[layer_i], self.dropout_mask_h[layer_i],
                                                         self.weightnoise_std if self.train else 0.0)
        else:
          gates_t = dy.vanilla_lstm_gates_concat(x_t, h[-1],
                                                 self.Wx[layer_i], self.Wh[layer_i], self.b[layer_i],
                                                 self.weightnoise_std if self.train else 0.0)
        c_t = dy.vanilla_lstm_c(c[-1], gates_t)
        h_t = dy.vanilla_lstm_h(c_t, gates_t)
        if expr_seq[0].mask is None or np.isclose(np.sum(expr_seq[0].mask.np_arr[:, pos_i:pos_i + 1]), 0.0):
          c.append(c_t)
          h.append(h_t)
        else:
          c.append(expr_seq[0].mask.cmult_by_timestep_expr(c_t, pos_i, True)
                   + expr_seq[0].mask.cmult_by_timestep_expr(c[-1], pos_i, False))
          h.append(expr_seq[0].mask.cmult_by_timestep_expr(h_t, pos_i, True)
                   + expr_seq[0].mask.cmult_by_timestep_expr(h[-1], pos_i, False))
      self._final_states.append(transducers.FinalTransducerState(h[-1], c[-1]))
      cur_input = [h[1:]]

    return expression_seqs.ExpressionSequence(expr_list=h[1:], mask=expr_seq[0].mask)
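
# Usage sketch (illustrative, not part of the original module): transducing a
# full sequence at once. Assumes `embedded_seq` is an
# expression_seqs.ExpressionSequence, e.g. produced by an embedder.
#
#   lstm = UniLSTMSeqTransducer(layers=2, input_dim=512, hidden_dim=512)
#   outputs = lstm.transduce(embedded_seq)   # ExpressionSequence of top-layer h_t
#   finals = lstm.get_final_states()         # one FinalTransducerState per layer
#   h_final = finals[-1].main_expr()         # final h of the top layer; .cell_expr() gives c
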
class BiLSTMSeqTransducer(transducers.SeqTransducer, Serializable):
  """
  This implements a bidirectional LSTM and requires about 8.5% less memory per timestep
  than DyNet's CompactVanillaLSTMBuilder due to avoiding concat operations.
  It uses 2 :class:`xnmt.lstm.UniLSTMSeqTransducer` objects in each layer.

  Args:
    layers (int): number of layers
    input_dim (int): input dimension
    hidden_dim (int): hidden dimension
    dropout (float): dropout probability
    weightnoise_std (float): weight noise standard deviation
    param_init: a :class:`xnmt.param_init.ParamInitializer` or list of :class:`xnmt.param_init.ParamInitializer` objects
                specifying how to initialize weight matrices. If a list is given, each entry denotes one layer.
    bias_init: a :class:`xnmt.param_init.ParamInitializer` or list of :class:`xnmt.param_init.ParamInitializer` objects
               specifying how to initialize bias vectors. If a list is given, each entry denotes one layer.
    forward_layers: set automatically
    backward_layers: set automatically
  """
  yaml_tag = '!BiLSTMSeqTransducer'

  @register_xnmt_handler
  @serializable_init
  def __init__(self,
               layers: numbers.Integral = 1,
               input_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               hidden_dim: numbers.Integral = Ref("exp_global.default_layer_dim"),
               dropout: numbers.Real = Ref("exp_global.dropout", default=0.0),
               weightnoise_std: numbers.Real = Ref("exp_global.weight_noise", default=0.0),
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               bias_init: param_initializers.ParamInitializer = Ref("exp_global.bias_init",
                                                                    default=bare(param_initializers.ZeroInitializer)),
               forward_layers: Optional[Sequence[UniLSTMSeqTransducer]] = None,
               backward_layers: Optional[Sequence[UniLSTMSeqTransducer]] = None) -> None:
    self.num_layers = layers
    self.hidden_dim = hidden_dim
    self.dropout_rate = dropout
    self.weightnoise_std = weightnoise_std
    assert hidden_dim % 2 == 0
    self.forward_layers = self.add_serializable_component(
      "forward_layers", forward_layers,
      lambda: [UniLSTMSeqTransducer(input_dim=input_dim if i == 0 else hidden_dim,
                                    hidden_dim=hidden_dim // 2,
                                    dropout=dropout,
                                    weightnoise_std=weightnoise_std,
                                    param_init=param_init[i] if isinstance(param_init, collections.abc.Sequence) else param_init,
                                    bias_init=bias_init[i] if isinstance(bias_init, collections.abc.Sequence) else bias_init)
               for i in range(layers)])
    self.backward_layers = self.add_serializable_component(
      "backward_layers", backward_layers,
      lambda: [UniLSTMSeqTransducer(input_dim=input_dim if i == 0 else hidden_dim,
                                    hidden_dim=hidden_dim // 2,
                                    dropout=dropout,
                                    weightnoise_std=weightnoise_std,
                                    param_init=param_init[i] if isinstance(param_init, collections.abc.Sequence) else param_init,
                                    bias_init=bias_init[i] if isinstance(bias_init, collections.abc.Sequence) else bias_init)
               for i in range(layers)])

  @handle_xnmt_event
  def on_start_sent(self, src):
    self._final_states = None
  def get_final_states(self) -> List[transducers.FinalTransducerState]:
    return self._final_states
  def transduce(self, es: 'expression_seqs.ExpressionSequence') -> 'expression_seqs.ExpressionSequence':
    mask = es.mask
    # first layer
    forward_es = self.forward_layers[0].transduce(es)
    rev_backward_es = self.backward_layers[0].transduce(expression_seqs.ReversedExpressionSequence(es))

    # higher layers read the concatenation of both directions of the layer below
    for layer_i in range(1, len(self.forward_layers)):
      new_forward_es = self.forward_layers[layer_i].transduce(
        [forward_es, expression_seqs.ReversedExpressionSequence(rev_backward_es)])
      rev_backward_es = expression_seqs.ExpressionSequence(
        self.backward_layers[layer_i].transduce(
          [expression_seqs.ReversedExpressionSequence(forward_es), rev_backward_es]).as_list(),
        mask=mask)
      forward_es = new_forward_es

    self._final_states = [
      transducers.FinalTransducerState(
        dy.concatenate([self.forward_layers[layer_i].get_final_states()[0].main_expr(),
                        self.backward_layers[layer_i].get_final_states()[0].main_expr()]),
        dy.concatenate([self.forward_layers[layer_i].get_final_states()[0].cell_expr(),
                        self.backward_layers[layer_i].get_final_states()[0].cell_expr()]))
      for layer_i in range(len(self.forward_layers))]
    return expression_seqs.ExpressionSequence(
      expr_list=[dy.concatenate([forward_es[i], rev_backward_es[-i - 1]]) for i in range(len(forward_es))],
      mask=mask)
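
# Usage sketch (illustrative, not part of the original module): each direction
# runs a UniLSTMSeqTransducer of size hidden_dim // 2, and forward/backward
# outputs are concatenated per timestep, so outputs keep dimension hidden_dim.
#
#   bilstm = BiLSTMSeqTransducer(layers=1, input_dim=512, hidden_dim=512)
#   outputs = bilstm.transduce(embedded_seq)  # each outputs[i]: [forward 256; backward 256]
#   finals = bilstm.get_final_states()        # per layer, concatenated final h and c
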
class CustomLSTMSeqTransducer(transducers.SeqTransducer, Serializable):
  """
  This implements an LSTM builder based on elementary DyNet operations.
  It is more memory-hungry than the compact LSTM, but can be extended more easily.
  It currently does not support dropout or multiple layers and is mostly meant as a
  starting point for LSTM extensions.

  Args:
    layers (int): number of layers
    input_dim (int): input dimension; if None, use exp_global.default_layer_dim
    hidden_dim (int): hidden dimension; if None, use exp_global.default_layer_dim
    param_init: a :class:`xnmt.param_init.ParamInitializer` or list of :class:`xnmt.param_init.ParamInitializer` objects
                specifying how to initialize weight matrices. If a list is given, each entry denotes one layer.
                If None, use ``exp_global.param_init``
    bias_init: a :class:`xnmt.param_init.ParamInitializer` or list of :class:`xnmt.param_init.ParamInitializer` objects
               specifying how to initialize bias vectors. If a list is given, each entry denotes one layer.
               If None, use ``exp_global.bias_init``
  """
  yaml_tag = "!CustomLSTMSeqTransducer"

  @serializable_init
  def __init__(self,
               layers: numbers.Integral,
               input_dim: numbers.Integral,
               hidden_dim: numbers.Integral,
               param_init: param_initializers.ParamInitializer = Ref("exp_global.param_init",
                                                                     default=bare(param_initializers.GlorotInitializer)),
               bias_init: param_initializers.ParamInitializer = Ref("exp_global.bias_init",
                                                                    default=bare(param_initializers.ZeroInitializer))) -> None:
    if layers != 1:
      raise RuntimeError("CustomLSTMSeqTransducer supports only exactly one layer")
    self.input_dim = input_dim
    self.hidden_dim = hidden_dim
    model = param_collections.ParamManager.my_params(self)

    # [i; f; o; g]
    self.p_Wx = model.add_parameters(dim=(hidden_dim * 4, input_dim),
                                     init=param_init.initializer((hidden_dim * 4, input_dim)))
    self.p_Wh = model.add_parameters(dim=(hidden_dim * 4, hidden_dim),
                                     init=param_init.initializer((hidden_dim * 4, hidden_dim)))
    self.p_b = model.add_parameters(dim=(hidden_dim * 4,),
                                    init=bias_init.initializer((hidden_dim * 4,)))
  def transduce(self, xs: 'expression_seqs.ExpressionSequence') -> Sequence[dy.Expression]:
    Wx = dy.parameter(self.p_Wx)
    Wh = dy.parameter(self.p_Wh)
    b = dy.parameter(self.p_b)
    h = []
    c = []
    for i, x_t in enumerate(xs):
      if i == 0:
        tmp = dy.affine_transform([b, Wx, x_t])
      else:
        tmp = dy.affine_transform([b, Wx, x_t, Wh, h[-1]])
      # slice the packed pre-activations into the four gates
      i_ait = dy.pick_range(tmp, 0, self.hidden_dim)
      i_aft = dy.pick_range(tmp, self.hidden_dim, self.hidden_dim * 2)
      i_aot = dy.pick_range(tmp, self.hidden_dim * 2, self.hidden_dim * 3)
      i_agt = dy.pick_range(tmp, self.hidden_dim * 3, self.hidden_dim * 4)
      i_it = dy.logistic(i_ait)
      i_ft = dy.logistic(i_aft + 1.0)  # forget-gate bias of +1 encourages remembering early in training
      i_ot = dy.logistic(i_aot)
      i_gt = dy.tanh(i_agt)
      if i == 0:
        c.append(dy.cmult(i_it, i_gt))
      else:
        c.append(dy.cmult(i_ft, c[-1]) + dy.cmult(i_it, i_gt))
      h.append(dy.cmult(i_ot, dy.tanh(c[-1])))
    return h
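
# For reference, the loop above computes the standard LSTM recurrence, with the
# four gate pre-activations packed into a single affine transform (order [i; f; o; g]):
#
#   i_t = sigmoid(W_xi x_t + W_hi h_{t-1} + b_i)
#   f_t = sigmoid(W_xf x_t + W_hf h_{t-1} + b_f + 1)  # +1 is the forget-gate bias above
#   o_t = sigmoid(W_xo x_t + W_ho h_{t-1} + b_o)
#   g_t = tanh(W_xg x_t + W_hg h_{t-1} + b_g)
#   c_t = f_t * c_{t-1} + i_t * g_t                   # elementwise products (cmult)
#   h_t = o_t * tanh(c_t)
#
# where each W_x* / W_h* is one hidden_dim-row slice of p_Wx / p_Wh.
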