Source code for recwizard.modules.kgsf.graph_utils

import math

import networkx as nx
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
[docs]def kaiming_reset_parameters(linear_module): nn.init.kaiming_uniform_(linear_module.weight, a=math.sqrt(5)) if linear_module.bias is not None: fan_in, _ = nn.init._calculate_fan_in_and_fan_out(linear_module.weight) bound = 1 / math.sqrt(fan_in) nn.init.uniform_(linear_module.bias, -bound, bound)
[docs]class GraphConvolution(nn.Module): """ Simple GCN layer, similar to https://arxiv.org/abs/1609.02907 """
[docs] def __init__(self, in_features, out_features, bias=True): super(GraphConvolution, self).__init__() self.in_features = in_features self.out_features = out_features self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features)) if bias: self.bias = nn.Parameter(torch.FloatTensor(out_features)) else: self.register_parameter('bias', None) self.reset_parameters()
def reset_parameters(self): # stdv = 1. / math.sqrt(self.weight.size(1)) # self.weight.data.uniform_(-stdv, stdv) # if self.bias is not None: # self.bias.data.uniform_(-stdv, stdv) kaiming_reset_parameters(self)
[docs] def forward(self, input, adj): support = torch.mm(input, self.weight) output = torch.spmm(adj, support) if self.bias is not None: return output + self.bias else: return output
[docs] def __repr__(self): return self.__class__.__name__ + ' (' \ + str(self.in_features) + ' -> ' \ + str(self.out_features) + ')'
[docs]class GCN(nn.Module):
[docs] def __init__(self, ninp, nhid, dropout=0.5): super(GCN, self).__init__() # self.gc1 = GraphConvolution(ninp, nhid) self.gc2 = GraphConvolution(ninp, nhid) self.dropout = dropout
[docs] def forward(self, x, adj): """x: shape (|V|, |D|); adj: shape(|V|, |V|)""" # x = F.relu(self.gc1(x, adj)) # x = F.dropout(x, self.dropout, training=self.training) x = self.gc2(x, adj) return x
# return F.log_softmax(x, dim=1)
[docs]class GraphAttentionLayer(nn.Module): """ Simple GAT layer, similar to https://arxiv.org/abs/1710.10903 """
[docs] def __init__(self, in_features, out_features, dropout, alpha, concat=True): super(GraphAttentionLayer, self).__init__() self.dropout = dropout self.in_features = in_features self.out_features = out_features self.alpha = alpha self.concat = concat self.W = nn.Parameter(torch.zeros(size=(in_features, out_features))) nn.init.xavier_uniform_(self.W.data, gain=1.414) self.a = nn.Parameter(torch.zeros(size=(2*out_features, 1))) nn.init.xavier_uniform_(self.a.data, gain=1.414) self.leakyrelu = nn.LeakyReLU(self.alpha)
[docs] def forward(self, input, adj): h = torch.mm(input, self.W) N = h.size()[0] a_input = torch.cat([h.repeat(1, N).view(N * N, -1), h.repeat(N, 1)], dim=1).view(N, -1, 2 * self.out_features) e = self.leakyrelu(torch.matmul(a_input, self.a).squeeze(2)) zero_vec = -9e15*torch.ones_like(e) attention = torch.where(adj > 0, e, zero_vec) attention = F.softmax(attention, dim=1) attention = F.dropout(attention, self.dropout, training=self.training) h_prime = torch.matmul(attention, h) if self.concat: return F.elu(h_prime) else: return h_prime
[docs] def __repr__(self): return self.__class__.__name__ + ' (' + str(self.in_features) + ' -> ' + str(self.out_features) + ')'
[docs]class SelfAttentionLayer(nn.Module):
[docs] def __init__(self, dim, da, alpha=0.2, dropout=0.5): super(SelfAttentionLayer, self).__init__() self.dim = dim self.da = da self.alpha = alpha self.dropout = dropout # self.a = nn.Parameter(torch.zeros(size=(2*self.dim, 1))) # nn.init.xavier_uniform_(self.a.data, gain=1.414) self.a = nn.Parameter(torch.zeros(size=(self.dim, self.da))) self.b = nn.Parameter(torch.zeros(size=(self.da, 1))) nn.init.xavier_uniform_(self.a.data, gain=1.414) nn.init.xavier_uniform_(self.b.data, gain=1.414)
# self.leakyrelu = nn.LeakyReLU(self.alpha)
[docs] def forward(self, h): N = h.shape[0] assert self.dim == h.shape[1] # a_input = torch.cat([h.repeat(1, N).view(N * N, -1), h.repeat(N, 1)], dim=1).view(N, -1, 2 * self.dim) # e = self.leakyrelu(torch.matmul(a_input, self.a).squeeze(2)) # attention = F.softmax(e, dim=1) e = torch.matmul(torch.tanh(torch.matmul(h, self.a)), self.b).squeeze(dim=1) attention = F.softmax(e) # attention = F.dropout(attention, self.dropout, training=self.training) return torch.matmul(attention, h)
[docs]class SelfAttentionLayer_batch(nn.Module):
[docs] def __init__(self, dim, da, alpha=0.2, dropout=0.5): super(SelfAttentionLayer_batch, self).__init__() self.dim = dim self.da = da self.alpha = alpha self.dropout = dropout # self.a = nn.Parameter(torch.zeros(size=(2*self.dim, 1))) # nn.init.xavier_uniform_(self.a.data, gain=1.414) self.a = nn.Parameter(torch.zeros(size=(self.dim, self.da))) self.b = nn.Parameter(torch.zeros(size=(self.da, 1))) nn.init.xavier_uniform_(self.a.data, gain=1.414) nn.init.xavier_uniform_(self.b.data, gain=1.414)
# self.leakyrelu = nn.LeakyReLU(self.alpha)
[docs] def forward(self, h, mask): N = h.shape[0] assert self.dim == h.shape[2] # a_input = torch.cat([h.repeat(1, N).view(N * N, -1), h.repeat(N, 1)], dim=1).view(N, -1, 2 * self.dim) # e = self.leakyrelu(torch.matmul(a_input, self.a).squeeze(2)) # attention = F.softmax(e, dim=1) mask=1e-30*mask.float() e = torch.matmul(torch.tanh(torch.matmul(h, self.a)), self.b) #print(e.size()) #print(mask.size()) attention = F.softmax(e+mask.unsqueeze(-1),dim=1) # attention = F.dropout(attention, self.dropout, training=self.training) return torch.matmul(torch.transpose(attention,1,2), h).squeeze(1),attention
[docs]class SelfAttentionLayer2(nn.Module):
[docs] def __init__(self, dim, da): super(SelfAttentionLayer2, self).__init__() self.dim = dim self.Wq = nn.Parameter(torch.zeros(self.dim, self.dim)) self.Wk = nn.Parameter(torch.zeros(self.dim, self.dim)) nn.init.xavier_uniform_(self.Wq.data, gain=1.414) nn.init.xavier_uniform_(self.Wk.data, gain=1.414)
# self.leakyrelu = nn.LeakyReLU(self.alpha)
[docs] def forward(self, h): N = h.shape[0] assert self.dim == h.shape[1] q = torch.matmul(h, self.Wq) k = torch.matmul(h, self.Wk) e = torch.matmul(q, k.t()) / math.sqrt(self.dim) attention = F.softmax(e, dim=1) attention = attention.mean(dim=0) x = torch.matmul(attention, h) return x
[docs]class BiAttention(nn.Module):
[docs] def __init__(self, input_size, dropout): super().__init__() self.dropout = nn.Dropout(p=dropout) self.input_linear = nn.Linear(input_size, 1, bias=False) self.memory_linear = nn.Linear(input_size, 1, bias=False) self.dot_scale = nn.Parameter(torch.Tensor(input_size).uniform_(1.0 / (input_size ** 0.5))) def forward(self, input, memory, mask=None): bsz, input_len, memory_len = input.size(0), input.size(1), memory.size(1) input = self.dropout(input) memory = self.dropout(memory) input_dot = self.input_linear(input) memory_dot = self.memory_linear(memory).view(bsz, 1, memory_len) cross_dot = torch.bmm(input * self.dot_scale, memory.permute(0, 2, 1).contiguous()) att = input_dot + memory_dot + cross_dot if mask is not None: att = att - 1e30 * (1 - mask[:,None]) weight_one = F.softmax(att, dim=-1) output_one = torch.bmm(weight_one, memory) weight_two = F.softmax(att.max(dim=-1)[0], dim=-1).view(bsz, 1, input_len) output_two = torch.bmm(weight_two, input) return torch.cat([input, output_one, input*output_one, output_two*output_one], dim=-1)
[docs]class GAT(nn.Module):
[docs] def __init__(self, nfeat, nhid, nclass, dropout, alpha, nheads): """Dense version of GAT.""" super(GAT, self).__init__() self.dropout = dropout self.attentions = [GraphAttentionLayer(nfeat, nhid, dropout=dropout, alpha=alpha, concat=True) for _ in range(nheads)] for i, attention in enumerate(self.attentions): self.add_module('attention_{}'.format(i), attention) self.out_att = GraphAttentionLayer(nhid * nheads, nclass, dropout=dropout, alpha=alpha, concat=False)
[docs] def forward(self, x, adj): x = F.dropout(x, self.dropout, training=self.training) x = torch.cat([att(x, adj) for att in self.attentions], dim=1) x = F.dropout(x, self.dropout, training=self.training) x = F.elu(self.out_att(x, adj)) return F.log_softmax(x, dim=1)
[docs]class SpecialSpmmFunction(torch.autograd.Function): """Special function for only sparse region backpropataion layer."""
[docs] @staticmethod def forward(ctx, indices, values, shape, b): assert indices.requires_grad == False a = torch.sparse_coo_tensor(indices, values, shape) ctx.save_for_backward(a, b) ctx.N = shape[0] return torch.matmul(a, b)
[docs] @staticmethod def backward(ctx, grad_output): a, b = ctx.saved_tensors grad_values = grad_b = None if ctx.needs_input_grad[1]: grad_a_dense = grad_output.matmul(b.t()) edge_idx = a._indices()[0, :] * ctx.N + a._indices()[1, :] grad_values = grad_a_dense.view(-1)[edge_idx] if ctx.needs_input_grad[3]: grad_b = a.t().matmul(grad_output) return None, grad_values, None, grad_b
[docs]class SpecialSpmm(nn.Module):
[docs] def forward(self, indices, values, shape, b): return SpecialSpmmFunction.apply(indices, values, shape, b)
[docs]class SpGraphAttentionLayer(nn.Module): """ Sparse version GAT layer, similar to https://arxiv.org/abs/1710.10903 """
[docs] def __init__(self, in_features, out_features, dropout, alpha, concat=True): super(SpGraphAttentionLayer, self).__init__() self.in_features = in_features self.out_features = out_features self.alpha = alpha self.concat = concat self.W = nn.Parameter(torch.zeros(size=(in_features, out_features))) nn.init.xavier_normal_(self.W.data, gain=1.414) # self.a = nn.Parameter(torch.zeros(size=(1, 2*out_features))) self.a = nn.Parameter(torch.zeros(size=(1, out_features))) nn.init.xavier_normal_(self.a.data, gain=1.414) # self.dropout = nn.Dropout(dropout) self.leakyrelu = nn.LeakyReLU(self.alpha) self.special_spmm = SpecialSpmm()
[docs] def forward(self, input, adj): N = input.size()[0] # edge = adj.nonzero().t() edge = adj._indices() h = torch.mm(input, self.W) # h: N x out assert not torch.isnan(h).any() # Self-attention on the nodes - Shared attention mechanism # edge_h = torch.cat((h[edge[0, :], :], h[edge[1, :], :]), dim=1).t() edge_h = h[edge[1, :], :].t() # edge: 2*D x E edge_e = torch.exp(-self.leakyrelu(self.a.mm(edge_h).squeeze())) assert not torch.isnan(edge_e).any() # edge_e: E e_rowsum = self.special_spmm(edge, edge_e, torch.Size([N, N]), torch.ones(size=(N,1)).to(device)) # e_rowsum: N x 1 # edge_e = self.dropout(edge_e) # edge_e: E h_prime = self.special_spmm(edge, edge_e, torch.Size([N, N]), h) assert not torch.isnan(h_prime).any() # h_prime: N x out h_prime = h_prime.div(e_rowsum) # h_prime: N x out assert not torch.isnan(h_prime).any() if self.concat: # if this layer is not last layer, return F.elu(h_prime) else: # if this layer is last layer, return h_prime
[docs] def __repr__(self): return self.__class__.__name__ + ' (' + str(self.in_features) + ' -> ' + str(self.out_features) + ')'
[docs]class SpGAT(nn.Module):
[docs] def __init__(self, nfeat, nhid, nclass, dropout, alpha, nheads): """Sparse version of GAT.""" super(SpGAT, self).__init__() self.dropout = dropout self.out_att = SpGraphAttentionLayer(nhid, nclass, dropout=dropout, alpha=alpha, concat=False)
[docs] def forward(self, x, adj): x = self.out_att(x, adj) return x
[docs]def _add_neighbors(kg, g, seed_set, hop): tails_of_last_hop = seed_set for h in range(hop): next_tails_of_last_hop = [] for entity in tails_of_last_hop: if entity not in kg: continue for tail_and_relation in kg[entity]: g.add_edge(entity, tail_and_relation[1]) if entity != tail_and_relation[1]: next_tails_of_last_hop.append(tail_and_relation[1]) tails_of_last_hop = next_tails_of_last_hop
# http://dbpedia.org/ontology/director