! pip install bpe
import math
import random
import re
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

from bpe import Encoder
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SOS_token = 0
# NOTE: EOS_token is never fed through an embedding in the active code paths
# (all of its uses below are commented out; decoding stops on the BPE
# end-of-word marker instead), so the -1 placeholder is harmless here, even
# though it would be an invalid embedding index if it were used.
EOS_token = -1
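# The model below is a plain sequence-to-sequence GRU pair (no attention):
# the encoder consumes the source BPE codes one token at a time, and its
# final hidden state seeds the decoder, which then emits target BPE codes.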
class EncoderRNN(nn.Module):
def __init__(self, input_size, hidden_size):
super(EncoderRNN, self).__init__()
self.hidden_size = hidden_size
self.embedding = nn.Embedding(input_size, hidden_size)
self.gru = nn.GRU(hidden_size, hidden_size)
def forward(self, input, hidden):
embedded = self.embedding(input).view(1, 1, -1)
output = embedded
output, hidden = self.gru(output, hidden)
return output, hidden
def initHidden(self):
return torch.zeros(1, 1, self.hidden_size, device=device)
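# The decoder mirrors the encoder but projects each hidden state onto the
# target vocabulary with a log-softmax, which pairs with the NLLLoss
# criterion used in trainIters below.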
class DecoderRNN(nn.Module):
def __init__(self, hidden_size, output_size):
super(DecoderRNN, self).__init__()
self.hidden_size = hidden_size
self.embedding = nn.Embedding(output_size, hidden_size)
self.gru = nn.GRU(hidden_size, hidden_size)
self.out = nn.Linear(hidden_size, output_size)
self.softmax = nn.LogSoftmax(dim=1)
def forward(self, input, hidden):
output = self.embedding(input).view(1, 1, -1)
output = F.relu(output)
output, hidden = self.gru(output, hidden)
output = self.softmax(self.out(output[0]))
return output, hidden
def initHidden(self):
return torch.zeros(1, 1, self.hidden_size, device=device)
vocab_size = 1500
bpe_encoder_pl = Encoder(vocab_size=vocab_size, pct_bpe=0.5)
bpe_encoder_en = Encoder(vocab_size=vocab_size, pct_bpe=0.5)
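# Quick sanity check of the BPE round trip, using only the Encoder methods
# already relied on below (a sketch; the sample sentences are made up):
# demo = Encoder(vocab_size=100, pct_bpe=0.5)
# demo.fit(["this is a test sentence", "this is another sentence"])
# codes = next(demo.transform(["this is a test sentence"]))
# print(codes)                                  # list of integer BPE codes
# print(next(demo.inverse_transform([codes])))  # text rebuilt from the codes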
MAX_LENGTH = 80
def filter_pair(p):
    # Keep a pair only if both sides are non-empty and shorter than
    # MAX_LENGTH, measured in BPE codes.
    return 0 < len(p[0]) < MAX_LENGTH and 0 < len(p[1]) < MAX_LENGTH
def filter_pairs(pairs):
return [pair for pair in pairs if filter_pair(pair)]
def normalize_string(s):
    # Lowercase, trim, and split sentence-final punctuation off as its own token.
    s = s.lower().strip()
    s = re.sub(r"([.!?~])", r" \1", s)
    return s
def sentence_to_codes(s, bpe_coder):
s = normalize_string(s)
#s += " ___"
c = next(bpe_coder.transform([s]))
#c.append(EOS_token)
return c
def read_langs(in_f, exp_f, lines=150):
print("Reading lines...")
# Read the file and split into lines
    with open(in_f) as f:
        linesIn = f.read().strip().split('\n')[:lines]
    with open(exp_f) as f:
        linesOut = f.read().strip().split('\n')[:lines]
#for i, (line_in, line_out) in enumerate(zip(linesIn, linesOut)):
# linesIn[i] += normalize_string(line_in)
# linesOut[i] += normalize_string(line_out) + " ~"
bpe_encoder_pl.fit(linesIn)
bpe_encoder_en.fit(linesOut)
# Split every line into pairs and normalize
    pairs = [[sentence_to_codes(a, bpe_encoder_pl), sentence_to_codes(b, bpe_encoder_en)]
             for a, b in zip(linesIn, linesOut)]
pairs = filter_pairs(pairs)
print("Pairs created")
return pairs
code_pairs = read_langs('train/in.tsv', 'train/expected.tsv', 2500)
#code_pairs[0]
#bpe_encoder_en.bpe_vocab
teacher_forcing_ratio = 0.95
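# With probability teacher_forcing_ratio the decoder receives the ground-truth
# token as its next input rather than its own prediction; this stabilizes and
# speeds up training, at the risk of a train/inference mismatch.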
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
encoder_hidden = encoder.initHidden()
encoder_optimizer.zero_grad()
decoder_optimizer.zero_grad()
input_length = input_tensor.size(0)
target_length = target_tensor.size(0)
encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
loss = 0
for ei in range(input_length):
encoder_output, encoder_hidden = encoder(
input_tensor[ei], encoder_hidden)
encoder_outputs[ei] = encoder_output[0, 0]
decoder_input = torch.tensor([[SOS_token]], device=device)
decoder_hidden = encoder_hidden
    use_teacher_forcing = random.random() < teacher_forcing_ratio
if use_teacher_forcing:
# Teacher forcing: Feed the target as the next input
for di in range(target_length):
decoder_output, decoder_hidden = decoder(
decoder_input, decoder_hidden)
loss += criterion(decoder_output, target_tensor[di])
decoder_input = target_tensor[di] # Teacher forcing
else:
# Without teacher forcing: use its own predictions as the next input
for di in range(target_length):
decoder_output, decoder_hidden = decoder(
decoder_input, decoder_hidden)
topv, topi = decoder_output.topk(1)
decoder_input = topi.squeeze().detach() # detach from history as input
loss += criterion(decoder_output, target_tensor[di])
#if decoder_input.item() == EOS_token:
# break
    try:
        loss.backward()
    except AttributeError:
        # If no criterion term was ever added, loss is still the int 0 and has
        # no .backward(); log the offending pair and skip this update entirely
        # rather than stepping the optimizers on a stale graph.
        print(f"loss: {loss}")
        print(f"input_tensor: {input_tensor}")
        print(f"target_tensor: {target_tensor}")
        return 0
    encoder_optimizer.step()
    decoder_optimizer.step()
    return loss.item() / target_length
def list_to_tensor(l):
return torch.tensor(l, dtype=torch.long, device=device).view(-1, 1)
def pairs_to_tensor(pair):
in_tensor = list_to_tensor(pair[0])
out_tensor = list_to_tensor(pair[1])
return (in_tensor, out_tensor)
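# Each pair becomes a pair of (seq_len, 1) LongTensors of BPE codes, matching
# the one-token-at-a-time loops in train(). A quick shape check (a sketch):
# in_t, out_t = pairs_to_tensor(code_pairs[0])
# print(in_t.shape, out_t.shape)  # e.g. torch.Size([12, 1]) torch.Size([9, 1])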
def asMinutes(s):
m = math.floor(s / 60)
s -= m * 60
return '%dm %ds' % (m, s)
def timeSince(since, percent):
now = time.time()
s = now - since
es = s / (percent)
rs = es - s
return '%s (- %s)' % (asMinutes(s), asMinutes(rs))
def trainIters(encoder, decoder, n_iters, print_every=1000, learning_rate=0.01):
start = time.time()
    print_loss_total = 0  # Reset every print_every
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
training_pairs = [pairs_to_tensor(random.choice(code_pairs))
for i in range(n_iters)]
criterion = nn.NLLLoss()
    for it in range(1, n_iters + 1):
        input_tensor, target_tensor = training_pairs[it - 1]
        loss = train(input_tensor, target_tensor, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        if it % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, it / n_iters),
                                         it, it / n_iters * 100, print_loss_avg))
hidden_size = 256
encoder1 = EncoderRNN(vocab_size, hidden_size).to(device)
decoder1 = DecoderRNN(hidden_size, vocab_size).to(device)
trainIters(encoder1, decoder1, 35000, print_every=5)
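# Optionally persist the trained weights so evaluation can be rerun without
# retraining (the file names are placeholders, not part of the original run):
# torch.save(encoder1.state_dict(), 'encoder1.pt')
# torch.save(decoder1.state_dict(), 'decoder1.pt')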
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
with torch.no_grad():
        input_tensor = list_to_tensor(sentence_to_codes(sentence, bpe_encoder_pl))
input_length = input_tensor.size()[0]
encoder_hidden = encoder.initHidden()
encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
for ei in range(input_length):
encoder_output, encoder_hidden = encoder(input_tensor[ei],
encoder_hidden)
encoder_outputs[ei] += encoder_output[0, 0]
decoder_input = torch.tensor([[SOS_token]], device=device) # SOS
decoder_hidden = encoder_hidden
decoded_words = []
        # Heuristic stop: code 501 is assumed here to be the BPE end-of-word
        # marker; decoding halts once it is emitted twice in a row.
        eow_token = 501
        last_word = -1
for di in range(max_length):
decoder_output, decoder_hidden = decoder(
decoder_input, decoder_hidden)
topv, topi = decoder_output.data.topk(1)
if topi.item() == last_word and topi.item() == eow_token:
# decoded_words.append('<EOS>')
break
else:
decoded_words.append(topi.item())
last_word = topi.item()
decoder_input = topi.squeeze().detach()
decoded_tokens = bpe_encoder_en.inverse_transform([decoded_words])
return decoded_tokens
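# Decoding above is greedy: at each step only the single most likely code is
# kept (topk(1)); a beam search would be a drop-in refinement but is not used.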
def evaluateAndShow(input_sentence):
output_words = evaluate(
encoder1, decoder1, input_sentence)
return next(output_words)
def translate_file(in_path, out_path):
    # Translate every line of in_path and write one output line per input line.
    with open(in_path, 'r') as f_in:
        data = [line.rstrip('\n') for line in f_in]
    with open(out_path, 'w') as f_out:
        for sent in data:
            f_out.write(evaluateAndShow(sent) + '\n')
translate_file('test-A/in.tsv', 'test-A/out.tsv')
translate_file('dev-0/in.tsv', 'dev-0/out.tsv')