# Source code for recwizard.modules.kgsf.transformer_utils
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import math
import pickle as pkl
from collections import OrderedDict
import numpy as np
from .utils import neginf
def _normalize(tensor, norm_layer):
"""Broadcast layer norm"""
size = tensor.size()
return norm_layer(tensor.view(-1, size[-1])).view(size)
def _build_encoder(opt, dictionary, embedding=None, padding_idx=None, reduction=True,
n_positions=1024):
return TransformerEncoder(
n_heads=opt['n_heads'],
n_layers=opt['n_layers'],
embedding_size=opt['embedding_size'],
ffn_size=opt['ffn_size'],
vocabulary_size=len(dictionary)+4,
embedding=embedding,
dropout=opt['dropout'],
attention_dropout=opt['attention_dropout'],
relu_dropout=opt['relu_dropout'],
padding_idx=padding_idx,
learn_positional_embeddings=opt.get('learn_positional_embeddings', False),
embeddings_scale=opt['embeddings_scale'],
reduction=reduction,
n_positions=n_positions,
)
def _build_encoder4kg(opt, padding_idx=None, reduction=True,
n_positions=1024):
return TransformerEncoder4kg(
        n_heads=1,  # opt['n_heads']
        n_layers=1,  # opt['n_layers']
        embedding_size=opt['dim'],  # opt['embedding_size']
        ffn_size=opt['dim'],  # opt['ffn_size']
dropout=opt['dropout'],
attention_dropout=opt['attention_dropout'],
relu_dropout=opt['relu_dropout'],
padding_idx=padding_idx,
learn_positional_embeddings=opt.get('learn_positional_embeddings', False),
embeddings_scale=opt['embeddings_scale'],
reduction=reduction,
n_positions=n_positions,
)
def _build_encoder_mask(opt, dictionary, embedding=None, padding_idx=None, reduction=True,
n_positions=1024):
return TransformerEncoder_mask(
n_heads=opt['n_heads'],
n_layers=opt['n_layers'],
embedding_size=opt['embedding_size'],
ffn_size=opt['ffn_size'],
vocabulary_size=len(dictionary)+4,
embedding=embedding,
dropout=opt['dropout'],
attention_dropout=opt['attention_dropout'],
relu_dropout=opt['relu_dropout'],
padding_idx=padding_idx,
learn_positional_embeddings=opt.get('learn_positional_embeddings', False),
embeddings_scale=opt['embeddings_scale'],
reduction=reduction,
n_positions=n_positions,
)
def _build_decoder(opt, dictionary, embedding=None, padding_idx=None,
n_positions=1024):
return TransformerDecoder(
n_heads=opt['n_heads'],
n_layers=opt['n_layers'],
embedding_size=opt['embedding_size'],
ffn_size=opt['ffn_size'],
vocabulary_size=len(dictionary)+4,
embedding=embedding,
dropout=opt['dropout'],
attention_dropout=opt['attention_dropout'],
relu_dropout=opt['relu_dropout'],
padding_idx=padding_idx,
learn_positional_embeddings=opt.get('learn_positional_embeddings', False),
embeddings_scale=opt['embeddings_scale'],
n_positions=n_positions,
)
def _build_decoder4kg(opt, dictionary, embedding=None, padding_idx=None,
n_positions=1024):
return TransformerDecoderKG(
        n_heads=opt['n_heads'],
        n_layers=opt['n_layers'],
        embedding_size=opt['embedding_size'],
        ffn_size=opt['ffn_size'],
        vocabulary_size=len(dictionary)+4,
        embedding=embedding,
        dropout=opt['dropout'],
        attention_dropout=opt['attention_dropout'],
        relu_dropout=opt['relu_dropout'],
        padding_idx=padding_idx,
        learn_positional_embeddings=opt.get('learn_positional_embeddings', False),
        embeddings_scale=opt['embeddings_scale'],
n_positions=n_positions,
)
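
# Example configuration consumed by the builders above. The key names are taken
# from the dictionary accesses in this file; the concrete values below are
# illustrative placeholders, not KGSF defaults.
#
#   opt = {
#       'n_heads': 2, 'n_layers': 2, 'embedding_size': 300, 'ffn_size': 300,
#       'dim': 128, 'dropout': 0.1, 'attention_dropout': 0.0,
#       'relu_dropout': 0.1, 'embeddings_scale': True,
#       'learn_positional_embeddings': False,
#   }
#   encoder = _build_encoder(opt, dictionary, embedding, padding_idx=0)
#   decoder = _build_decoder(opt, dictionary, embedding, padding_idx=0)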
def create_position_codes(n_pos, dim, out):
    """Fill ``out`` in-place with fixed sinusoidal position codes."""
    position_enc = np.array([
        [pos / np.power(10000, 2 * j / dim) for j in range(dim // 2)]
        for pos in range(n_pos)
    ])
    # Write the codes directly into the embedding weight; no_grad avoids
    # autograd errors from modifying a leaf parameter in place.
    with torch.no_grad():
        out[:, 0::2] = torch.FloatTensor(np.sin(position_enc)).type_as(out)
        out[:, 1::2] = torch.FloatTensor(np.cos(position_enc)).type_as(out)
    out.detach_()
    out.requires_grad = False
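
# Example of the codes produced by create_position_codes: with dim=4, the code
# for position ``pos`` is
#   [sin(pos / 10000**0), cos(pos / 10000**0),
#    sin(pos / 10000**0.5), cos(pos / 10000**0.5)],
# i.e. interleaved sine/cosine pairs over geometrically increasing wavelengths.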
class BasicAttention(nn.Module):
    def __init__(self, dim=1, attn='cosine'):
super().__init__()
self.softmax = nn.Softmax(dim=dim)
if attn == 'cosine':
self.cosine = nn.CosineSimilarity(dim=dim)
self.attn = attn
self.dim = dim
    def forward(self, xs, ys):
if self.attn == 'cosine':
l1 = self.cosine(xs, ys).unsqueeze(self.dim - 1)
else:
l1 = torch.bmm(xs, ys.transpose(1, 2))
if self.attn == 'sqrt':
d_k = ys.size(-1)
l1 = l1 / math.sqrt(d_k)
l2 = self.softmax(l1)
lhs_emb = torch.bmm(l2, ys)
# add back the query
lhs_emb = lhs_emb.add(xs)
return lhs_emb.squeeze(self.dim - 1), l2
class MultiHeadAttention(nn.Module):
    def __init__(self, n_heads, dim, dropout=0):
super(MultiHeadAttention, self).__init__()
self.n_heads = n_heads
self.dim = dim
self.attn_dropout = nn.Dropout(p=dropout) # --attention-dropout
self.q_lin = nn.Linear(dim, dim)
self.k_lin = nn.Linear(dim, dim)
self.v_lin = nn.Linear(dim, dim)
# TODO: merge for the initialization step
nn.init.xavier_normal_(self.q_lin.weight)
nn.init.xavier_normal_(self.k_lin.weight)
nn.init.xavier_normal_(self.v_lin.weight)
# and set biases to 0
self.out_lin = nn.Linear(dim, dim)
nn.init.xavier_normal_(self.out_lin.weight)
    def forward(self, query, key=None, value=None, mask=None):
# Input is [B, query_len, dim]
# Mask is [B, key_len] (selfattn) or [B, key_len, key_len] (enc attn)
batch_size, query_len, dim = query.size()
assert dim == self.dim, \
f'Dimensions do not match: {dim} query vs {self.dim} configured'
assert mask is not None, 'Mask is None, please specify a mask'
n_heads = self.n_heads
dim_per_head = dim // n_heads
scale = math.sqrt(dim_per_head)
def prepare_head(tensor):
# input is [batch_size, seq_len, n_heads * dim_per_head]
# output is [batch_size * n_heads, seq_len, dim_per_head]
bsz, seq_len, _ = tensor.size()
tensor = tensor.view(batch_size, tensor.size(1), n_heads, dim_per_head)
tensor = tensor.transpose(1, 2).contiguous().view(
batch_size * n_heads,
seq_len,
dim_per_head
)
return tensor
# q, k, v are the transformed values
if key is None and value is None:
# self attention
key = value = query
elif value is None:
# key and value are the same, but query differs
# self attention
value = key
_, key_len, dim = key.size()
q = prepare_head(self.q_lin(query))
k = prepare_head(self.k_lin(key))
v = prepare_head(self.v_lin(value))
dot_prod = q.div_(scale).bmm(k.transpose(1, 2))
# [B * n_heads, query_len, key_len]
attn_mask = (
(mask == 0)
.view(batch_size, 1, -1, key_len)
.repeat(1, n_heads, 1, 1)
.expand(batch_size, n_heads, query_len, key_len)
.view(batch_size * n_heads, query_len, key_len)
)
assert attn_mask.shape == dot_prod.shape
dot_prod.masked_fill_(attn_mask, neginf(dot_prod.dtype))
attn_weights = F.softmax(dot_prod, dim=-1).type_as(query)
attn_weights = self.attn_dropout(attn_weights) # --attention-dropout
attentioned = attn_weights.bmm(v)
attentioned = (
attentioned.type_as(query)
.view(batch_size, n_heads, query_len, dim_per_head)
.transpose(1, 2).contiguous()
.view(batch_size, query_len, dim)
)
out = self.out_lin(attentioned)
return out
class TransformerFFN(nn.Module):
    def __init__(self, dim, dim_hidden, relu_dropout=0):
super(TransformerFFN, self).__init__()
self.relu_dropout = nn.Dropout(p=relu_dropout)
self.lin1 = nn.Linear(dim, dim_hidden)
self.lin2 = nn.Linear(dim_hidden, dim)
nn.init.xavier_uniform_(self.lin1.weight)
nn.init.xavier_uniform_(self.lin2.weight)
# TODO: initialize biases to 0
    def forward(self, x):
x = F.relu(self.lin1(x))
x = self.relu_dropout(x) # --relu-dropout
x = self.lin2(x)
return x
class TransformerResponseWrapper(nn.Module):
    """Transformer response wrapper. Pushes input through transformer and MLP."""
    def __init__(self, transformer, hdim):
super(TransformerResponseWrapper, self).__init__()
dim = transformer.out_dim
self.transformer = transformer
self.mlp = nn.Sequential(
nn.Linear(dim, hdim),
nn.ReLU(),
nn.Linear(hdim, dim)
)
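    def forward(self, *args):
        # Assumed behavior, following the class docstring: encode with the
        # wrapped transformer, then project the result through the MLP head.
        return self.mlp(self.transformer(*args))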
class TransformerEncoder4kg(nn.Module):
    """
    Transformer encoder module that operates on pre-embedded inputs.
:param int n_heads: the number of multihead attention heads.
:param int n_layers: number of transformer layers.
    :param int embedding_size: the embedding size. Must be a multiple of n_heads.
    :param int ffn_size: the size of the hidden layer in the FFN.
    :param float dropout: Dropout used around embeddings and before layer
        normalizations. This is used in Vaswani 2017 and works well on
        large datasets.
    :param float attention_dropout: Dropout performed after the multihead attention
        softmax. This is not used in Vaswani 2017.
    :param float relu_dropout: Dropout used after the ReLU in the FFN. Not used
        in Vaswani 2017, but used in Tensor2Tensor.
:param int padding_idx: Reserved padding index in the embeddings matrix.
:param bool learn_positional_embeddings: If off, sinusoidal embeddings are
used. If on, position embeddings are learned from scratch.
:param bool embeddings_scale: Scale embeddings relative to their dimensionality.
Found useful in fairseq.
:param bool reduction: If true, returns the mean vector for the entire encoding
sequence.
:param int n_positions: Size of the position embeddings matrix.
"""
    def __init__(
self,
n_heads,
n_layers,
embedding_size,
ffn_size,
dropout=0.0,
attention_dropout=0.0,
relu_dropout=0.0,
padding_idx=0,
learn_positional_embeddings=False,
embeddings_scale=False,
reduction=True,
n_positions=1024
):
super(TransformerEncoder4kg, self).__init__()
self.embedding_size = embedding_size
self.ffn_size = ffn_size
self.n_layers = n_layers
self.n_heads = n_heads
self.dim = embedding_size
self.embeddings_scale = embeddings_scale
self.reduction = reduction
self.padding_idx = padding_idx
# this is --dropout, not --relu-dropout or --attention-dropout
self.dropout = nn.Dropout(p=dropout)
self.out_dim = embedding_size
assert embedding_size % n_heads == 0, \
'Transformer embedding size must be a multiple of n_heads'
# create the positional embeddings
self.position_embeddings = nn.Embedding(n_positions, embedding_size)
if not learn_positional_embeddings:
create_position_codes(
n_positions, embedding_size, out=self.position_embeddings.weight
)
else:
nn.init.normal_(self.position_embeddings.weight, 0, embedding_size ** -0.5)
# build the model
self.layers = nn.ModuleList()
for _ in range(self.n_layers):
self.layers.append(TransformerEncoderLayer(
n_heads, embedding_size, ffn_size,
attention_dropout=attention_dropout,
relu_dropout=relu_dropout,
dropout=dropout,
))
    def forward(self, input, mask):
"""
input data is a FloatTensor of shape [batch, seq_len, dim]
mask is a ByteTensor of shape [batch, seq_len], filled with 1 when
inside the sequence and 0 outside.
"""
positions = (mask.cumsum(dim=1, dtype=torch.int64) - 1).clamp_(min=0)
tensor = input
if self.embeddings_scale:
tensor = tensor * np.sqrt(self.dim)
tensor = tensor + self.position_embeddings(positions).expand_as(tensor)
# --dropout on the embeddings
tensor = self.dropout(tensor)
tensor *= mask.unsqueeze(-1).type_as(tensor)
for i in range(self.n_layers):
tensor = self.layers[i](tensor, mask)
if self.reduction:
divisor = mask.type_as(tensor).sum(dim=1).unsqueeze(-1).clamp(min=1e-7)
output = tensor.sum(dim=1) / divisor
return output
else:
output = tensor
return output, mask
class TransformerEncoderLayer(nn.Module):
    def __init__(
self,
n_heads,
embedding_size,
ffn_size,
attention_dropout=0.0,
relu_dropout=0.0,
dropout=0.0,
):
super().__init__()
self.dim = embedding_size
self.ffn_dim = ffn_size
self.attention = MultiHeadAttention(
n_heads, embedding_size,
dropout=attention_dropout, # --attention-dropout
)
self.norm1 = nn.LayerNorm(embedding_size)
self.ffn = TransformerFFN(embedding_size, ffn_size, relu_dropout=relu_dropout)
self.norm2 = nn.LayerNorm(embedding_size)
self.dropout = nn.Dropout(p=dropout)
    def forward(self, tensor, mask):
tensor = tensor + self.dropout(self.attention(tensor, mask=mask))
tensor = _normalize(tensor, self.norm1)
tensor = tensor + self.dropout(self.ffn(tensor))
tensor = _normalize(tensor, self.norm2)
tensor *= mask.unsqueeze(-1).type_as(tensor)
return tensor
class TransformerEncoder(nn.Module):
"""
Transformer encoder module.
:param int n_heads: the number of multihead attention heads.
:param int n_layers: number of transformer layers.
    :param int embedding_size: the embedding size. Must be a multiple of n_heads.
    :param int ffn_size: the size of the hidden layer in the FFN.
    :param embedding: an embedding matrix for the bottom layer of the transformer.
        If none, one is created for this encoder.
    :param float dropout: Dropout used around embeddings and before layer
        normalizations. This is used in Vaswani 2017 and works well on
        large datasets.
    :param float attention_dropout: Dropout performed after the multihead attention
        softmax. This is not used in Vaswani 2017.
    :param float relu_dropout: Dropout used after the ReLU in the FFN. Not used
        in Vaswani 2017, but used in Tensor2Tensor.
:param int padding_idx: Reserved padding index in the embeddings matrix.
:param bool learn_positional_embeddings: If off, sinusoidal embeddings are
used. If on, position embeddings are learned from scratch.
:param bool embeddings_scale: Scale embeddings relative to their dimensionality.
Found useful in fairseq.
:param bool reduction: If true, returns the mean vector for the entire encoding
sequence.
:param int n_positions: Size of the position embeddings matrix.
"""
    def __init__(
self,
n_heads,
n_layers,
embedding_size,
ffn_size,
vocabulary_size,
embedding=None,
dropout=0.0,
attention_dropout=0.0,
relu_dropout=0.0,
padding_idx=0,
learn_positional_embeddings=False,
embeddings_scale=False,
reduction=True,
n_positions=1024
):
super(TransformerEncoder, self).__init__()
self.embedding_size = embedding_size
self.ffn_size = ffn_size
self.n_layers = n_layers
self.n_heads = n_heads
self.dim = embedding_size
self.embeddings_scale = embeddings_scale
self.reduction = reduction
self.padding_idx = padding_idx
# this is --dropout, not --relu-dropout or --attention-dropout
self.dropout = nn.Dropout(p=dropout)
self.out_dim = embedding_size
assert embedding_size % n_heads == 0, \
'Transformer embedding size must be a multiple of n_heads'
        # check input formats:
        if embedding is not None:
            assert (
                embedding_size is None or embedding_size == embedding.weight.shape[1]
            ), "Embedding dim must match the embedding size."
            self.embeddings = embedding
        else:
            # fall back to a randomly initialized embedding matrix
            assert padding_idx is not None
            self.embeddings = nn.Embedding(
                vocabulary_size, embedding_size, padding_idx=padding_idx
            )
            nn.init.normal_(self.embeddings.weight, 0, embedding_size ** -0.5)
# create the positional embeddings
self.position_embeddings = nn.Embedding(n_positions, embedding_size)
if not learn_positional_embeddings:
create_position_codes(
n_positions, embedding_size, out=self.position_embeddings.weight
)
else:
nn.init.normal_(self.position_embeddings.weight, 0, embedding_size ** -0.5)
# build the model
self.layers = nn.ModuleList()
for _ in range(self.n_layers):
self.layers.append(TransformerEncoderLayer(
n_heads, embedding_size, ffn_size,
attention_dropout=attention_dropout,
relu_dropout=relu_dropout,
dropout=dropout,
))
    def forward(self, input):
        """
        input is a LongTensor of token indices with shape [batch, seq_len].
        The padding mask is derived from ``padding_idx``: 1 inside the
        sequence and 0 at padding positions.
"""
mask = input != self.padding_idx
positions = (mask.cumsum(dim=1, dtype=torch.int64) - 1).clamp_(min=0)
tensor = self.embeddings(input)
if self.embeddings_scale:
tensor = tensor * np.sqrt(self.dim)
tensor = tensor + self.position_embeddings(positions).expand_as(tensor)
# --dropout on the embeddings
tensor = self.dropout(tensor)
tensor *= mask.unsqueeze(-1).type_as(tensor)
for i in range(self.n_layers):
tensor = self.layers[i](tensor, mask)
if self.reduction:
divisor = mask.type_as(tensor).sum(dim=1).unsqueeze(-1).clamp(min=1e-7)
output = tensor.sum(dim=1) / divisor
return output
else:
output = tensor
return output, mask
class TransformerEncoder_mask(nn.Module):
    """
    Transformer encoder module that concatenates an extra per-example embedding
    to every token position before the first layer.
:param int n_heads: the number of multihead attention heads.
:param int n_layers: number of transformer layers.
    :param int embedding_size: the embedding size. Must be a multiple of n_heads.
    :param int ffn_size: the size of the hidden layer in the FFN.
    :param embedding: an embedding matrix for the bottom layer of the transformer.
        If none, one is created for this encoder.
    :param float dropout: Dropout used around embeddings and before layer
        normalizations. This is used in Vaswani 2017 and works well on
        large datasets.
    :param float attention_dropout: Dropout performed after the multihead attention
        softmax. This is not used in Vaswani 2017.
    :param float relu_dropout: Dropout used after the ReLU in the FFN. Not used
        in Vaswani 2017, but used in Tensor2Tensor.
:param int padding_idx: Reserved padding index in the embeddings matrix.
:param bool learn_positional_embeddings: If off, sinusoidal embeddings are
used. If on, position embeddings are learned from scratch.
:param bool embeddings_scale: Scale embeddings relative to their dimensionality.
Found useful in fairseq.
:param bool reduction: If true, returns the mean vector for the entire encoding
sequence.
:param int n_positions: Size of the position embeddings matrix.
"""
    def __init__(
self,
n_heads,
n_layers,
embedding_size,
ffn_size,
vocabulary_size,
embedding=None,
dropout=0.0,
attention_dropout=0.0,
relu_dropout=0.0,
padding_idx=0,
learn_positional_embeddings=False,
embeddings_scale=False,
reduction=True,
n_positions=1024
):
super(TransformerEncoder_mask, self).__init__()
self.embedding_size = embedding_size
self.ffn_size = ffn_size
self.n_layers = n_layers
self.n_heads = n_heads
self.dim = embedding_size
self.embeddings_scale = embeddings_scale
self.reduction = reduction
self.padding_idx = padding_idx
# this is --dropout, not --relu-dropout or --attention-dropout
self.dropout = nn.Dropout(p=dropout)
self.out_dim = embedding_size
assert embedding_size % n_heads == 0, \
'Transformer embedding size must be a multiple of n_heads'
        # check input formats:
        if embedding is not None:
            assert (
                embedding_size is None or embedding_size == embedding.weight.shape[1]
            ), "Embedding dim must match the embedding size."
            self.embeddings = embedding
        else:
            # fall back to a randomly initialized embedding matrix
            assert padding_idx is not None
            self.embeddings = nn.Embedding(
                vocabulary_size, embedding_size, padding_idx=padding_idx
            )
            nn.init.normal_(self.embeddings.weight, 0, embedding_size ** -0.5)
# create the positional embeddings
self.position_embeddings = nn.Embedding(n_positions, embedding_size)
if not learn_positional_embeddings:
create_position_codes(
n_positions, embedding_size, out=self.position_embeddings.weight
)
else:
nn.init.normal_(self.position_embeddings.weight, 0, embedding_size ** -0.5)
# build the model
self.layers = nn.ModuleList()
for _ in range(self.n_layers):
self.layers.append(TransformerEncoderLayer(
n_heads, embedding_size+128, ffn_size+128,
attention_dropout=attention_dropout,
relu_dropout=relu_dropout,
dropout=dropout,
))
    def forward(self, input, m_emb):
        """
        input is a LongTensor of token indices with shape [batch, seq_len];
        m_emb is a FloatTensor of shape [batch, 128] that is concatenated to
        every position. The padding mask is derived from ``padding_idx``.
"""
mask = input != self.padding_idx
positions = (mask.cumsum(dim=1, dtype=torch.int64) - 1).clamp_(min=0)
tensor = self.embeddings(input)
if self.embeddings_scale:
tensor = tensor * np.sqrt(self.dim)
        p_length = tensor.size(1)
        tensor = tensor + self.position_embeddings(positions).expand_as(tensor)
        tensor = torch.cat([tensor, m_emb.unsqueeze(1).repeat(1, p_length, 1)], dim=-1)
# --dropout on the embeddings
tensor = self.dropout(tensor)
tensor *= mask.unsqueeze(-1).type_as(tensor)
for i in range(self.n_layers):
tensor = self.layers[i](tensor, mask)
if self.reduction:
divisor = mask.type_as(tensor).sum(dim=1).unsqueeze(-1).clamp(min=1e-7)
output = tensor.sum(dim=1) / divisor
return output
else:
output = tensor
return output, mask
class TransformerDecoderLayer(nn.Module):
    def __init__(
self,
n_heads,
embedding_size,
ffn_size,
attention_dropout=0.0,
relu_dropout=0.0,
dropout=0.0,
):
super().__init__()
self.dim = embedding_size
self.ffn_dim = ffn_size
self.dropout = nn.Dropout(p=dropout)
self.self_attention = MultiHeadAttention(
n_heads, embedding_size, dropout=attention_dropout
)
self.norm1 = nn.LayerNorm(embedding_size)
self.encoder_attention = MultiHeadAttention(
n_heads, embedding_size, dropout=attention_dropout
)
self.norm2 = nn.LayerNorm(embedding_size)
self.ffn = TransformerFFN(embedding_size, ffn_size, relu_dropout=relu_dropout)
self.norm3 = nn.LayerNorm(embedding_size)
    def forward(self, x, encoder_output, encoder_mask):
decoder_mask = self._create_selfattn_mask(x)
# first self attn
residual = x
        # don't peek into the future!
x = self.self_attention(query=x, mask=decoder_mask)
x = self.dropout(x) # --dropout
x = x + residual
x = _normalize(x, self.norm1)
residual = x
x = self.encoder_attention(
query=x,
key=encoder_output,
value=encoder_output,
mask=encoder_mask
)
x = self.dropout(x) # --dropout
x = residual + x
x = _normalize(x, self.norm2)
# finally the ffn
residual = x
x = self.ffn(x)
x = self.dropout(x) # --dropout
x = residual + x
x = _normalize(x, self.norm3)
return x
def _create_selfattn_mask(self, x):
# figure out how many timestamps we need
bsz = x.size(0)
time = x.size(1)
# make sure that we don't look into the future
mask = torch.tril(x.new(time, time).fill_(1))
# broadcast across batch
mask = mask.unsqueeze(0).expand(bsz, -1, -1)
return mask
class TransformerDecoder(nn.Module):
    """
    Transformer decoder module.
:param int n_heads: the number of multihead attention heads.
:param int n_layers: number of transformer layers.
    :param int embedding_size: the embedding size. Must be a multiple of n_heads.
    :param int ffn_size: the size of the hidden layer in the FFN.
    :param embedding: an embedding matrix for the bottom layer of the transformer.
        The decoder expects a pre-built embedding to be passed in.
    :param float dropout: Dropout used around embeddings and before layer
        normalizations. This is used in Vaswani 2017 and works well on
        large datasets.
    :param float attention_dropout: Dropout performed after the multihead attention
        softmax. This is not used in Vaswani 2017.
    :param float relu_dropout: Dropout used after the ReLU in the FFN. Not used
        in Vaswani 2017, but used in Tensor2Tensor.
:param int padding_idx: Reserved padding index in the embeddings matrix.
:param bool learn_positional_embeddings: If off, sinusoidal embeddings are
used. If on, position embeddings are learned from scratch.
:param bool embeddings_scale: Scale embeddings relative to their dimensionality.
Found useful in fairseq.
:param int n_positions: Size of the position embeddings matrix.
"""
    def __init__(
self,
n_heads,
n_layers,
embedding_size,
ffn_size,
vocabulary_size,
embedding=None,
dropout=0.0,
attention_dropout=0.0,
relu_dropout=0.0,
embeddings_scale=True,
learn_positional_embeddings=False,
padding_idx=None,
n_positions=1024,
):
super().__init__()
self.embedding_size = embedding_size
self.ffn_size = ffn_size
self.n_layers = n_layers
self.n_heads = n_heads
self.dim = embedding_size
self.embeddings_scale = embeddings_scale
self.dropout = nn.Dropout(p=dropout) # --dropout
self.out_dim = embedding_size
assert embedding_size % n_heads == 0, \
'Transformer embedding size must be a multiple of n_heads'
self.embeddings = embedding
# create the positional embeddings
self.position_embeddings = nn.Embedding(n_positions, embedding_size)
if not learn_positional_embeddings:
create_position_codes(
n_positions, embedding_size, out=self.position_embeddings.weight
)
else:
nn.init.normal_(self.position_embeddings.weight, 0, embedding_size ** -0.5)
# build the model
self.layers = nn.ModuleList()
for _ in range(self.n_layers):
self.layers.append(TransformerDecoderLayer(
n_heads, embedding_size, ffn_size,
attention_dropout=attention_dropout,
relu_dropout=relu_dropout,
dropout=dropout,
))
    def forward(self, input, encoder_state, incr_state=None):
encoder_output, encoder_mask = encoder_state
seq_len = input.size(1)
positions = input.new(seq_len).long()
positions = torch.arange(seq_len, out=positions).unsqueeze(0)
tensor = self.embeddings(input)
if self.embeddings_scale:
tensor = tensor * np.sqrt(self.dim)
tensor = tensor + self.position_embeddings(positions).expand_as(tensor)
tensor = self.dropout(tensor) # --dropout
for layer in self.layers:
tensor = layer(tensor, encoder_output, encoder_mask)
return tensor, None
class TransformerDecoderLayerKG(nn.Module):
    def __init__(
self,
n_heads,
embedding_size,
ffn_size,
attention_dropout=0.0,
relu_dropout=0.0,
dropout=0.0,
):
super().__init__()
self.dim = embedding_size
self.ffn_dim = ffn_size
self.dropout = nn.Dropout(p=dropout)
self.self_attention = MultiHeadAttention(
n_heads, embedding_size, dropout=attention_dropout
)
self.norm1 = nn.LayerNorm(embedding_size)
self.encoder_attention = MultiHeadAttention(
n_heads, embedding_size, dropout=attention_dropout
)
self.norm2 = nn.LayerNorm(embedding_size)
self.encoder_db_attention = MultiHeadAttention(
n_heads, embedding_size, dropout=attention_dropout
)
self.norm2_db = nn.LayerNorm(embedding_size)
self.encoder_kg_attention = MultiHeadAttention(
n_heads, embedding_size, dropout=attention_dropout
)
self.norm2_kg = nn.LayerNorm(embedding_size)
self.ffn = TransformerFFN(embedding_size, ffn_size, relu_dropout=relu_dropout)
self.norm3 = nn.LayerNorm(embedding_size)
    def forward(self, x, encoder_output, encoder_mask, kg_encoder_output, kg_encoder_mask, db_encoder_output, db_encoder_mask):
decoder_mask = self._create_selfattn_mask(x)
# first self attn
residual = x
        # don't peek into the future!
x = self.self_attention(query=x, mask=decoder_mask)
x = self.dropout(x) # --dropout
x = x + residual
x = _normalize(x, self.norm1)
residual = x
x = self.encoder_db_attention(
query=x,
key=db_encoder_output,
value=db_encoder_output,
mask=db_encoder_mask
)
x = self.dropout(x) # --dropout
x = residual + x
x = _normalize(x, self.norm2_db)
residual = x
x = self.encoder_kg_attention(
query=x,
key=kg_encoder_output,
value=kg_encoder_output,
mask=kg_encoder_mask
)
x = self.dropout(x) # --dropout
x = residual + x
x = _normalize(x, self.norm2_kg)
residual = x
x = self.encoder_attention(
query=x,
key=encoder_output,
value=encoder_output,
mask=encoder_mask
)
x = self.dropout(x) # --dropout
x = residual + x
x = _normalize(x, self.norm2)
# finally the ffn
residual = x
x = self.ffn(x)
x = self.dropout(x) # --dropout
x = residual + x
x = _normalize(x, self.norm3)
return x
def _create_selfattn_mask(self, x):
# figure out how many timestamps we need
bsz = x.size(0)
time = x.size(1)
# make sure that we don't look into the future
mask = torch.tril(x.new(time, time).fill_(1))
# broadcast across batch
mask = mask.unsqueeze(0).expand(bsz, -1, -1)
return mask
class TransformerDecoderKG(nn.Module):
"""
    Transformer decoder module with additional knowledge-graph and DB cross-attention.
:param int n_heads: the number of multihead attention heads.
:param int n_layers: number of transformer layers.
    :param int embedding_size: the embedding size. Must be a multiple of n_heads.
    :param int ffn_size: the size of the hidden layer in the FFN.
    :param embedding: an embedding matrix for the bottom layer of the transformer.
        The decoder expects a pre-built embedding to be passed in.
    :param float dropout: Dropout used around embeddings and before layer
        normalizations. This is used in Vaswani 2017 and works well on
        large datasets.
    :param float attention_dropout: Dropout performed after the multihead attention
        softmax. This is not used in Vaswani 2017.
    :param float relu_dropout: Dropout used after the ReLU in the FFN. Not used
        in Vaswani 2017, but used in Tensor2Tensor.
:param int padding_idx: Reserved padding index in the embeddings matrix.
:param bool learn_positional_embeddings: If off, sinusoidal embeddings are
used. If on, position embeddings are learned from scratch.
:param bool embeddings_scale: Scale embeddings relative to their dimensionality.
Found useful in fairseq.
:param int n_positions: Size of the position embeddings matrix.
"""
def __init__(
self,
n_heads,
n_layers,
embedding_size,
ffn_size,
vocabulary_size,
embedding=None,
dropout=0.0,
attention_dropout=0.0,
relu_dropout=0.0,
embeddings_scale=True,
learn_positional_embeddings=False,
padding_idx=None,
n_positions=1024,
):
super().__init__()
self.embedding_size = embedding_size
self.ffn_size = ffn_size
self.n_layers = n_layers
self.n_heads = n_heads
self.dim = embedding_size
self.embeddings_scale = embeddings_scale
self.dropout = nn.Dropout(p=dropout) # --dropout
self.out_dim = embedding_size
assert embedding_size % n_heads == 0, \
'Transformer embedding size must be a multiple of n_heads'
self.embeddings = embedding
# create the positional embeddings
self.position_embeddings = nn.Embedding(n_positions, embedding_size)
if not learn_positional_embeddings:
create_position_codes(
n_positions, embedding_size, out=self.position_embeddings.weight
)
else:
nn.init.normal_(self.position_embeddings.weight, 0, embedding_size ** -0.5)
# build the model
self.layers = nn.ModuleList()
for _ in range(self.n_layers):
self.layers.append(TransformerDecoderLayerKG(
n_heads, embedding_size, ffn_size,
attention_dropout=attention_dropout,
relu_dropout=relu_dropout,
dropout=dropout,
))
def forward(self, input, encoder_state, encoder_kg_state, encoder_db_state, incr_state=None):
encoder_output, encoder_mask = encoder_state
kg_encoder_output, kg_encoder_mask = encoder_kg_state
db_encoder_output, db_encoder_mask = encoder_db_state
seq_len = input.size(1)
positions = input.new(seq_len).long()
positions = torch.arange(seq_len, out=positions).unsqueeze(0)
tensor = self.embeddings(input)
if self.embeddings_scale:
tensor = tensor * np.sqrt(self.dim)
tensor = tensor + self.position_embeddings(positions).expand_as(tensor)
tensor = self.dropout(tensor) # --dropout
for layer in self.layers:
tensor = layer(tensor, encoder_output, encoder_mask, kg_encoder_output, kg_encoder_mask, db_encoder_output, db_encoder_mask)
return tensor, None
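

def _create_embeddings(dictionary, embedding_size, padding_idx):
    """Create and initialize word embeddings.

    This helper is referenced by ``TransformerMemNetModel`` below; the sketch
    here follows the usual ParlAI-style initialization and should be treated
    as an assumption rather than verified KGSF code.
    """
    e = nn.Embedding(len(dictionary), embedding_size, padding_idx)
    nn.init.normal_(e.weight, mean=0, std=embedding_size ** -0.5)
    nn.init.constant_(e.weight[padding_idx], 0)
    return e
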
class TransformerMemNetModel(nn.Module):
"""Model which takes context, memories, candidates and encodes them"""
    def __init__(self, opt, dictionary):
super().__init__()
self.opt = opt
self.pad_idx = dictionary[dictionary.null_token]
# set up embeddings
self.embeddings = _create_embeddings(
dictionary, opt['embedding_size'], self.pad_idx
)
if not opt.get('learn_embeddings'):
self.embeddings.weight.requires_grad = False
if opt.get('n_positions'):
# if the number of positions is explicitly provided, use that
n_positions = opt['n_positions']
else:
# else, use the worst case from truncate
n_positions = max(
opt.get('truncate') or 0,
opt.get('text_truncate') or 0,
opt.get('label_truncate') or 0
)
if n_positions == 0:
# default to 1024
n_positions = 1024
if n_positions < 0:
raise ValueError('n_positions must be positive')
self.context_encoder = _build_encoder(
opt, dictionary, self.embeddings, self.pad_idx,
n_positions=n_positions,
)
if opt.get('share_encoders'):
self.cand_encoder = TransformerResponseWrapper(
self.context_encoder, self.context_encoder.out_dim,
)
else:
self.cand_encoder = _build_encoder(
opt, dictionary, self.embeddings, self.pad_idx, reduction=True,
n_positions=n_positions,
)
# build memory encoder
if opt.get('wrap_memory_encoder', False):
self.memory_transformer = TransformerResponseWrapper(
self.context_encoder, self.context_encoder.out_dim
)
else:
self.memory_transformer = self.context_encoder
self.attender = BasicAttention(dim=2, attn=opt['memory_attention'])
def encode_cand(self, words):
if words is None:
return None
# flatten if there are many candidates
if words.dim() == 3:
oldshape = words.shape
words = words.reshape(oldshape[0] * oldshape[1], oldshape[2])
else:
oldshape = None
encoded = self.cand_encoder(words)
if oldshape is not None:
encoded = encoded.reshape(oldshape[0], oldshape[1], -1)
return encoded
def encode_context_memory(self, context_w, memories_w):
# [batch, d]
if context_w is None:
# it's possible that only candidates were passed into the
# forward function, return None here for LHS representation
return None, None
context_h = self.context_encoder(context_w)
if memories_w is None:
return [], context_h
bsz = memories_w.size(0)
memories_w = memories_w.view(-1, memories_w.size(-1))
memories_h = self.memory_transformer(memories_w)
memories_h = memories_h.view(bsz, -1, memories_h.size(-1))
context_h = context_h.unsqueeze(1)
context_h, weights = self.attender(context_h, memories_h)
return weights, context_h
    def forward(self, xs, mems, cands):
weights, context_h = self.encode_context_memory(xs, mems)
cands_h = self.encode_cand(cands)
if self.opt['normalize_sent_emb']:
context_h = context_h / context_h.norm(2, dim=1, keepdim=True)
cands_h = cands_h / cands_h.norm(2, dim=1, keepdim=True)
return context_h, cands_h
class TorchGeneratorModel(nn.Module):
"""
    This interface expects subclasses to provide a model with the following attributes:
    :attribute model.encoder:
        takes input and returns a tuple (enc_out, enc_hidden, attn_mask)
:attribute model.decoder:
takes decoder params and returns decoder outputs after attn
:attribute model.output:
takes decoder outputs and returns distr over dictionary
"""
    def __init__(
self,
padding_idx=0,
start_idx=1,
end_idx=2,
unknown_idx=3,
input_dropout=0,
longest_label=1,
):
super().__init__()
self.NULL_IDX = padding_idx
self.END_IDX = end_idx
self.register_buffer('START', torch.LongTensor([start_idx]))
self.longest_label = longest_label
    def _starts(self, bsz):
"""Return bsz start tokens."""
return self.START.detach().expand(bsz, 1)
    def decode_greedy(self, encoder_states, bsz, maxlen):
"""
Greedy search
:param int bsz:
            Batch size. Because encoder_states is model-specific, the batch
            size cannot be inferred automatically.
:param encoder_states:
Output of the encoder model.
:type encoder_states:
Model specific
:param int maxlen:
Maximum decoding length
:return:
pair (logits, choices) of the greedy decode
:rtype:
(FloatTensor[bsz, maxlen, vocab], LongTensor[bsz, maxlen])
"""
xs = self._starts(bsz)
incr_state = None
logits = []
for i in range(maxlen):
# todo, break early if all beams saw EOS
scores, incr_state = self.decoder(xs, encoder_states, incr_state)
scores = scores[:, -1:, :]
scores = self.output(scores)
_, preds = scores.max(dim=-1)
logits.append(scores)
xs = torch.cat([xs, preds], dim=1)
# check if everyone has generated an end token
all_finished = ((xs == self.END_IDX).sum(dim=1) > 0).sum().item() == bsz
if all_finished:
break
logits = torch.cat(logits, 1)
return logits, xs
    def decode_forced(self, encoder_states, ys):
"""
Decode with a fixed, true sequence, computing loss. Useful for
training, or ranking fixed candidates.
:param ys:
the prediction targets. Contains both the start and end tokens.
:type ys:
LongTensor[bsz, time]
:param encoder_states:
Output of the encoder. Model specific types.
:type encoder_states:
model specific
:return:
pair (logits, choices) containing the logits and MLE predictions
:rtype:
(FloatTensor[bsz, ys, vocab], LongTensor[bsz, ys])
"""
bsz = ys.size(0)
seqlen = ys.size(1)
inputs = ys.narrow(1, 0, seqlen - 1)
inputs = torch.cat([self._starts(bsz), inputs], 1)
latent, _ = self.decoder(inputs, encoder_states)
logits = self.output(latent)
_, preds = logits.max(dim=2)
return logits, preds
    def reorder_encoder_states(self, encoder_states, indices):
"""
Reorder encoder states according to a new set of indices.
This is an abstract method, and *must* be implemented by the user.
        Its purpose is to provide a model-agnostic interface for beam search:
        it is used to sort hypotheses, expand beams, etc.
        For example, assume that encoder_states is a bsz x 1 tensor of values
.. code-block:: python
indices = [0, 2, 2]
encoder_states = [[0.1]
[0.2]
[0.3]]
then the output will be
.. code-block:: python
output = [[0.1]
[0.3]
[0.3]]
:param encoder_states:
output from encoder. type is model specific.
:type encoder_states:
model specific
:param indices:
the indices to select over. The user must support non-tensor
inputs.
:type indices: list[int]
:return:
The re-ordered encoder states. It should be of the same type as
encoder states, and it must be a valid input to the decoder.
:rtype:
model specific
"""
raise NotImplementedError(
"reorder_encoder_states must be implemented by the model"
)
    def reorder_decoder_incremental_state(self, incremental_state, inds):
"""
Reorder incremental state for the decoder.
Used to expand selected beams in beam_search. Unlike reorder_encoder_states,
implementing this method is optional. However, without incremental decoding,
decoding a single beam becomes O(n^2) instead of O(n), which can make
beam search impractically slow.
In order to fall back to non-incremental decoding, just return None from this
method.
:param incremental_state:
second output of model.decoder
:type incremental_state:
model specific
:param inds:
indices to select and reorder over.
:type inds:
LongTensor[n]
:return:
The re-ordered decoder incremental states. It should be the same
type as incremental_state, and usable as an input to the decoder.
This method should return None if the model does not support
incremental decoding.
:rtype:
model specific
"""
raise NotImplementedError(
"reorder_decoder_incremental_state must be implemented by model"
)
    def forward(self, *xs, ys=None, cand_params=None, prev_enc=None, maxlen=None,
bsz=None):
"""
Get output predictions from the model.
:param xs:
input to the encoder
:type xs:
LongTensor[bsz, seqlen]
:param ys:
Expected output from the decoder. Used
for teacher forcing to calculate loss.
:type ys:
LongTensor[bsz, outlen]
:param prev_enc:
if you know you'll pass in the same xs multiple times, you can pass
in the encoder output from the last forward pass to skip
            recalculating the same encoder output.
:param maxlen:
max number of tokens to decode. if not set, will use the length of
the longest label this model has seen. ignored when ys is not None.
:param bsz:
if ys is not provided, then you must specify the bsz for greedy
decoding.
:return:
(scores, candidate_scores, encoder_states) tuple
- scores contains the model's predicted token scores.
(FloatTensor[bsz, seqlen, num_features])
- candidate_scores are the score the model assigned to each candidate.
(FloatTensor[bsz, num_cands])
- encoder_states are the output of model.encoder. Model specific types.
Feed this back in to skip encoding on the next call.
"""
if ys is not None:
# TODO: get rid of longest_label
# keep track of longest label we've ever seen
# we'll never produce longer ones than that during prediction
self.longest_label = max(self.longest_label, ys.size(1))
# use cached encoding if available
encoder_states = prev_enc if prev_enc is not None else self.encoder(*xs)
if ys is not None:
# use teacher forcing
scores, preds = self.decode_forced(encoder_states, ys)
else:
scores, preds = self.decode_greedy(
encoder_states,
bsz,
maxlen or self.longest_label
)
return scores, preds, encoder_states
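

if __name__ == "__main__":
    # Minimal smoke test. The shapes and hyper-parameters below are arbitrary
    # illustrations, not the settings used by the KGSF recommender itself.
    torch.manual_seed(0)
    batch, seq_len, dim = 2, 5, 16

    # Multi-head self-attention over a fully unmasked batch.
    attn = MultiHeadAttention(n_heads=4, dim=dim)
    x = torch.randn(batch, seq_len, dim)
    mask = torch.ones(batch, seq_len, dtype=torch.bool)
    print(attn(x, mask=mask).shape)   # torch.Size([2, 5, 16])

    # Encoder over pre-embedded inputs, as used for the knowledge-graph side.
    enc = TransformerEncoder4kg(
        n_heads=4, n_layers=2, embedding_size=dim, ffn_size=2 * dim,
        reduction=True,
    )
    print(enc(x, mask).shape)         # torch.Size([2, 16])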