import torch
import jovian
import torchvision
import matplotlib
import torch.nn as nn
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch.nn.functional as F
from torchvision.datasets.utils import download_url
from torch.utils.data import DataLoader, TensorDataset, random_split
import random
import os
import sys
dataframe_raw = pd.read_csv("winequality-red.csv")
dataframe_raw.head()
|   | fixed acidity | volatile acidity | citric acid | residual sugar | chlorides | free sulfur dioxide | total sulfur dioxide | density | pH | sulphates | alcohol | quality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7.4 | 0.70 | 0.00 | 1.9 | 0.076 | 11.0 | 34.0 | 0.9978 | 3.51 | 0.56 | 9.4 | 5 |
| 1 | 7.8 | 0.88 | 0.00 | 2.6 | 0.098 | 25.0 | 67.0 | 0.9968 | 3.20 | 0.68 | 9.8 | 5 |
| 2 | 7.8 | 0.76 | 0.04 | 2.3 | 0.092 | 15.0 | 54.0 | 0.9970 | 3.26 | 0.65 | 9.8 | 5 |
| 3 | 11.2 | 0.28 | 0.56 | 1.9 | 0.075 | 17.0 | 60.0 | 0.9980 | 3.16 | 0.58 | 9.8 | 6 |
| 4 | 7.4 | 0.70 | 0.00 | 1.9 | 0.076 | 11.0 | 34.0 | 0.9978 | 3.51 | 0.56 | 9.4 | 5 |
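Before converting anything to tensors, a quick sanity check on the raw dataframe confirms its size and that no values are missing (a small added check, not in the original notebook; the expected shape assumes the standard red-wine CSV):
# Sanity-check the raw data: 1599 rows x 12 columns, no missing values expected
print(dataframe_raw.shape)
print(dataframe_raw.isna().sum())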
input_cols = list(dataframe_raw.columns)[:-1]
output_cols = ['quality']
input_cols, output_cols
(['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol'], ['quality'])
def dataframe_to_arrays(dataframe):
    # Work on a copy of the dataframe passed in (the original used the global dataframe_raw by mistake)
    dataframe1 = dataframe.copy(deep=True)
    inputs_array = dataframe1[input_cols].to_numpy()
    targets_array = dataframe1[output_cols].to_numpy()
    return inputs_array, targets_array
inputs_array, targets_array = dataframe_to_arrays(dataframe_raw)
inputs_array, targets_array
(array([[ 7.4  ,  0.7  ,  0.   , ...,  3.51 ,  0.56 ,  9.4  ],
        [ 7.8  ,  0.88 ,  0.   , ...,  3.2  ,  0.68 ,  9.8  ],
        [ 7.8  ,  0.76 ,  0.04 , ...,  3.26 ,  0.65 ,  9.8  ],
        ...,
        [ 6.3  ,  0.51 ,  0.13 , ...,  3.42 ,  0.75 , 11.   ],
        [ 5.9  ,  0.645,  0.12 , ...,  3.57 ,  0.71 , 10.2  ],
        [ 6.   ,  0.31 ,  0.47 , ...,  3.39 ,  0.66 , 11.   ]]),
 array([[5],
        [5],
        [5],
        ...,
        [6],
        [5],
        [6]], dtype=int64))
inputs = torch.from_numpy(inputs_array).type(torch.float)
targets = torch.from_numpy(targets_array).type(torch.float)
inputs, targets
(tensor([[ 7.4000,  0.7000,  0.0000,  ...,  3.5100,  0.5600,  9.4000],
         [ 7.8000,  0.8800,  0.0000,  ...,  3.2000,  0.6800,  9.8000],
         [ 7.8000,  0.7600,  0.0400,  ...,  3.2600,  0.6500,  9.8000],
         ...,
         [ 6.3000,  0.5100,  0.1300,  ...,  3.4200,  0.7500, 11.0000],
         [ 5.9000,  0.6450,  0.1200,  ...,  3.5700,  0.7100, 10.2000],
         [ 6.0000,  0.3100,  0.4700,  ...,  3.3900,  0.6600, 11.0000]]),
 tensor([[5.],
         [5.],
         [5.],
         ...,
         [6.],
         [5.],
         [6.]]))
dataset = TensorDataset(inputs, targets)
dataset
<torch.utils.data.dataset.TensorDataset at 0x1f334183760>
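A TensorDataset pairs each input row with its target, so indexing it returns an (input, target) tuple. A minimal check, added here as a sketch (the names x0 and y0 are illustrative):
# Inspect one sample to confirm input/target pairing and shapes
x0, y0 = dataset[0]
print(x0.shape, y0)   # expect torch.Size([11]) and a 1-element target tensor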
train_ds, val_ds = random_split(dataset, [1300, 299])
batch_size = 50
train_loader = DataLoader(train_ds, batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size)
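Each iteration of the DataLoader yields a batch of inputs and targets. A quick shape check on a single training batch (a sketch, not in the original; xb and yb are illustrative names):
# Peek at one batch: expect [50, 11] inputs and [50, 1] targets
for xb, yb in train_loader:
    print(xb.shape, yb.shape)
    break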
class WineQuality(nn.Module):
    def __init__(self, input_size, output_size):
        # Pass the layer sizes in explicitly instead of relying on globals defined in a later cell
        super().__init__()
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, xb):
        out = self.linear(xb)
        return out

    def training_step(self, batch):
        inputs, targets = batch
        # Generate predictions
        out = self(inputs)
        # Calculate loss
        loss = F.l1_loss(out, targets)
        return loss

    def validation_step(self, batch):
        inputs, targets = batch
        # Generate predictions
        out = self(inputs)
        # Calculate loss
        loss = F.l1_loss(out, targets)
        return {'val_loss': loss.detach()}

    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()
        return {'val_loss': epoch_loss.item()}

    def epoch_end(self, epoch, result, num_epochs):
        # Print the result every 100th epoch and on the last epoch
        if (epoch + 1) % 100 == 0 or epoch == num_epochs - 1:
            print("Epoch [{}], val_loss: {:.4f}".format(epoch + 1, result['val_loss']))
input_size = len(input_cols)
output_size = len(output_cols)
model = WineQuality(input_size, output_size)
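Since the model is a single linear layer, its learnable parameters are a 1x11 weight matrix and a one-element bias; they can be inspected directly (an added sketch, not in the original notebook):
# Inspect the randomly initialised parameters of the linear layer
print(model.linear.weight.shape, model.linear.bias.shape)   # torch.Size([1, 11]), torch.Size([1])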
def evaluate(model, val_loader):
outputs = [model.validation_step(batch) for batch in val_loader]
return model.validation_epoch_end(outputs)
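Evaluating the untrained model gives a baseline validation loss to compare the training run against (an added sketch; result0 is an illustrative name and the printed value depends on the random initialisation):
# Baseline validation loss before any training
result0 = evaluate(model, val_loader)
print(result0)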
def fit(epochs, lr, model, train_loader, val_loader, opt_func=torch.optim.SGD):
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        # Training phase: one pass over the training batches
        for batch in train_loader:
            loss = model.training_step(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        # Validation phase: evaluate on the held-out set and record the result
        result = evaluate(model, val_loader)
        model.epoch_end(epoch, result, epochs)
        history.append(result)
    return history
epochs = 1500
lr = 1e-6
history5 = fit(epochs, lr, model, train_loader, val_loader)
Epoch [100], val_loss: 4.1732
Epoch [200], val_loss: 1.6444
Epoch [300], val_loss: 1.4860
Epoch [400], val_loss: 1.4119
Epoch [500], val_loss: 1.3407
Epoch [600], val_loss: 1.2709
Epoch [700], val_loss: 1.2045
Epoch [800], val_loss: 1.1401
Epoch [900], val_loss: 1.0783
Epoch [1000], val_loss: 1.0213
Epoch [1100], val_loss: 0.9678
Epoch [1200], val_loss: 0.9186
Epoch [1300], val_loss: 0.8729
Epoch [1400], val_loss: 0.8320
Epoch [1500], val_loss: 0.7959
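Because fit returns the per-epoch validation results and matplotlib is already imported, the loss curve can be plotted (an added sketch, not in the original notebook):
# Plot the recorded validation loss against the epoch number
val_losses = [r['val_loss'] for r in history5]
plt.plot(val_losses)
plt.xlabel('epoch')
plt.ylabel('val_loss')
plt.title('Validation loss during training')
plt.show()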
def predict_single(input_, target, model):
    # Add a batch dimension, run the model, then take the single prediction back out
    inputs = input_.unsqueeze(0)
    predictions = model(inputs)
    prediction = predictions[0].detach()
    return "Target: " + str(target) + "----- Prediction: " + str(prediction) + "\n"
# Print predictions for 10 randomly sampled validation examples
for i in random.sample(range(0, len(val_ds)), 10):
    input_, target = val_ds[i]
    print(predict_single(input_, target, model), end="")
Target: tensor([5.])----- Prediction: tensor([4.9765])
Target: tensor([5.])----- Prediction: tensor([6.6649])
Target: tensor([5.])----- Prediction: tensor([5.2627])
Target: tensor([7.])----- Prediction: tensor([5.7054])
Target: tensor([5.])----- Prediction: tensor([5.1168])
Target: tensor([7.])----- Prediction: tensor([5.3928])
Target: tensor([5.])----- Prediction: tensor([4.8501])
Target: tensor([4.])----- Prediction: tensor([5.4210])
Target: tensor([5.])----- Prediction: tensor([4.6719])
Target: tensor([5.])----- Prediction: tensor([7.8635])
with open("result.txt", "w+") as file:
for i in range(0, len(val_ds), 1):
input_, target = val_ds[i]
file.write(str(predict_single(input_, target, model)))