#!/usr/bin/env python
# coding: utf-8
# In[233]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import random
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn import preprocessing
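# NOTE: train_test_split, preprocessing and random_split are imported but not used below.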
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        # self.fc1 = nn.Linear(2, 60)
        # self.fc2 = nn.Linear(60, 30)
        # self.out = nn.Linear(30, 1)
        self.linear = nn.Linear(2, 616)

    def forward(self, x):
        out = torch.sigmoid(self.linear(x))
        return out

    def training_step(self, batch):
        inputs, targets = batch
        # Generate predictions
        out = self(inputs)
        # Calculate loss
        loss = F.l1_loss(out, targets)
        return loss

    def validation_step(self, batch):
        inputs, targets = batch
        # Generate predictions
        out = self(inputs)
        # Calculate loss
        loss = F.l1_loss(out, targets)
        return {'val_loss': loss.detach()}

    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()
        return {'val_loss': epoch_loss.item()}

    def epoch_end(self, epoch, result, num_epochs):
        # Print the result every 100th epoch and at the last epoch
        if (epoch+1) % 100 == 0 or epoch == num_epochs-1:
            print("Epoch [{}], val_loss: {:.4f}".format(epoch+1, result['val_loss']))
# In[234]:
data = pd.read_csv('understat.csv')
# In[235]:
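# Split the data: 90% for training, 10% for testing (fixed random_state for reproducibility)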
training_data = data.sample(frac=0.9, random_state=25)
testing_data = data.drop(training_data.index)
# In[236]:
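# Keep the feature columns (matches, wins) and the target column (position)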
train_set = training_data[['matches', 'wins', 'position']]
test_set = testing_data[['matches', 'wins', 'position']]
# In[237]:
# Convert the data to tensors
X_train = train_set[['matches', 'wins']].to_numpy()
X_test = test_set[['matches', 'wins']].to_numpy()
y_train = train_set['position'].to_numpy()
y_test = test_set['position'].to_numpy()
X_train = torch.FloatTensor(X_train)
X_test = torch.FloatTensor(X_test)
y_train = torch.LongTensor(y_train)
y_test = torch.LongTensor(y_test)
# In[238]:
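# Wrap the feature and target tensors in datasets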
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
# In[239]:
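# DataLoaders: shuffle the training batches each epoch, keep the test set order fixed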
batch_size=50
train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size)
# In[240]:
# Hyperparameters: model, cross-entropy loss and Adam optimizer (lr=0.01)
model = Model()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# In[241]:
def evaluate(model, val_loader):
    outputs = [model.validation_step(batch) for batch in val_loader]
    return model.validation_epoch_end(outputs)

def fit(epochs, lr, model, train_loader, val_loader, opt_func=torch.optim.SGD):
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        for batch in train_loader:
            loss = model.training_step(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        # Validate at the end of each epoch
        result = evaluate(model, val_loader)
        model.epoch_end(epoch, result, epochs)
        history.append(result)
    return history
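# NOTE: fit() and evaluate() are not called below; the script trains with the manual loop in the next cell.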
# In[242]:
epochs = 1000
def print_(loss):
    print("The loss calculated: ", loss)
# In[243]:
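# Manual training loop: full-batch gradient descent on the whole training tensor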
for epoch in range(1, epochs+1):
    y_pred = model(X_train)
    loss = criterion(y_pred, y_train)
    if epoch % 100 == 0:
        print("Epoch #", epoch)
        print_(loss.item())
    # Zero gradients
    optimizer.zero_grad()
    loss.backward()   # Gradients
    optimizer.step()  # Update
# In[244]:
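# Run the model on a single sample and format the target vs. the raw prediction vector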
def predict_single(input, target, model):
    inputs = input.unsqueeze(0)
    predictions = model(inputs)
    prediction = predictions[0].detach()
    return "Target: " + str(target) + "----- Prediction: " + str(prediction) + "\n"
# In[245]:
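# Spot-check predictions on 10 randomly chosen test samples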
for i in random.sample(range(0, len(test_dataset)), 10):
    input_, target = test_dataset[i]
    print(predict_single(input_, target, model), end="")
# In[246]:
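# Write a prediction line for every test sample to result.txt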
with open("result.txt", "w+") as file:
    for i in range(0, len(test_dataset), 1):
        input_, target = test_dataset[i]
        file.write(str(predict_single(input_, target, model)))