from urllib.parse import urlparse

import argparse

import mlflow
import mlflow.pytorch as model_logger
import numpy as np
import pandas as pd
import torch
from sklearn.metrics import mean_absolute_error, mean_squared_error
from torch import nn
from torch.utils import data as t_u_data

# mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_tracking_uri("http://172.17.0.1:5000")
mlflow.set_experiment("s478841")


# * Customized Dataset class (base provided by PyTorch)
class AvocadoDataset(t_u_data.Dataset):
    def __init__(self, path: str, target: str = 'AveragePrice'):
        data = pd.read_csv(path)
        y = data[target].values.astype('float32')
        self.y = y.reshape((len(y), 1))
        self.x_data = data.drop([target], axis=1).values.astype('float32')
        self.x_shape = data.drop([target], axis=1).shape

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, idx):
        return [self.x_data[idx], self.y[idx]]

    def get_shape(self):
        return self.x_shape

    def get_splits(self, n_test=0.33):
        test_size = round(n_test * len(self.x_data))
        train_size = len(self.x_data) - test_size
        return t_u_data.random_split(self, [train_size, test_size])


# * Simple 3-layer MLP regressor with Xavier-initialized weights
class AvocadoRegressor(nn.Module):
    def __init__(self, input_dim):
        super(AvocadoRegressor, self).__init__()
        self.hidden1 = nn.Linear(input_dim, 32)
        nn.init.xavier_uniform_(self.hidden1.weight)
        self.act1 = nn.ReLU()
        self.hidden2 = nn.Linear(32, 8)
        nn.init.xavier_uniform_(self.hidden2.weight)
        self.act2 = nn.ReLU()
        self.hidden3 = nn.Linear(8, 1)
        nn.init.xavier_uniform_(self.hidden3.weight)

    def forward(self, x):
        x = self.hidden1(x)
        x = self.act1(x)
        x = self.hidden2(x)
        x = self.act2(x)
        x = self.hidden3(x)
        return x


def prepare_data(paths):
    train_dl = t_u_data.DataLoader(
        AvocadoDataset(paths[0]), batch_size=32, shuffle=True)
    validate_dl = t_u_data.DataLoader(
        AvocadoDataset(paths[1]), batch_size=128, shuffle=True)
    test_dl = t_u_data.DataLoader(
        AvocadoDataset(paths[2]), batch_size=1, shuffle=False)
    return train_dl, validate_dl, test_dl


def train_model(train_dl, model, epochs, log_step):
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    to_compare = None
    metrics = None
    for epoch in range(1, epochs + 1):
        for inputs, targets in train_dl:
            optimizer.zero_grad()
            yhat = model(inputs)
            # * Keep the last batch around for loss value inspection
            to_compare = (yhat, targets)
            loss = criterion(yhat, targets)
            loss.backward()
            optimizer.step()
        # * Log metrics on the first epoch and every log_step epochs after that
        if epoch == 1 or epoch % log_step == 0:
            result = to_compare[0].detach().numpy()
            target = to_compare[1].detach().numpy()
            metrics = {'train.mse': mean_squared_error(target, result),
                       'train.mae': mean_absolute_error(target, result),
                       'train.rmse': mean_squared_error(target, result, squared=False)}
            print(f"Epoch {epoch}\t→\tMSE: {metrics['train.mse']},"
                  f"\tRMSE: {metrics['train.rmse']},\tMAE: {metrics['train.mae']}")
    return metrics
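
# A quick smoke test for train_model (an illustrative sketch, not part of the
# pipeline): random tensors stand in for the avocado CSVs, so the training
# loop above can be exercised without any data files. All shapes here are
# assumptions matching the 235-feature model used below.
#
#   dummy = t_u_data.TensorDataset(torch.rand(64, 235), torch.rand(64, 1))
#   dummy_dl = t_u_data.DataLoader(dummy, batch_size=32)
#   train_model(dummy_dl, AvocadoRegressor(235), epochs=2, log_step=1)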


def evaluate_model(test_dl, model):
    predictions, actuals = list(), list()
    for inputs, targets in test_dl:
        yhat = model(inputs)
        # * Retrieve numpy arrays
        yhat = yhat.detach().numpy()
        actual = targets.numpy()
        actual = actual.reshape((len(actual), 1))
        # * Store predictions
        predictions.append(yhat)
        actuals.append(actual)
    predictions, actuals = np.vstack(predictions), np.vstack(actuals)
    # * Return MSE, RMSE and MAE values
    mse = mean_squared_error(actuals, predictions)
    rmse = mean_squared_error(actuals, predictions, squared=False)
    mae = mean_absolute_error(actuals, predictions)
    return mse, rmse, mae


def predict(row, model):
    # * row is an (inputs, targets) pair; flatten the single input row
    row = row[0].flatten()
    yhat = model(row)
    yhat = yhat.detach().numpy()
    return yhat


def main(epochs, save_model, log_step):
    print(f"Your model will be trained for {epochs} epochs, "
          f"logging every {log_step} epochs. "
          f"Trained model will {'' if save_model else 'not '}be saved.")
    # * Paths to data
    avocado_data = ['./data/avocado.data.train',
                    './data/avocado.data.valid',
                    './data/avocado.data.test']
    # * Data preparation
    train_dl, validate_dl, test_dl = prepare_data(paths=avocado_data)
    print(f"""
    Train set size: {len(train_dl.dataset)},
    Validate set size: {len(validate_dl.dataset)},
    Test set size: {len(test_dl.dataset)}
    """)
    # * Model definition
    # ! 66 - in case only region and type are used (among all the categorical values)
    model = AvocadoRegressor(235)
    # * Train model
    print("Let's start the training, mate!")
    with mlflow.start_run() as run:
        print("MLflow run experiment_id: {0}".format(run.info.experiment_id))
        print("MLflow run artifact_uri: {0}".format(run.info.artifact_uri))
        metrics = train_model(train_dl=train_dl, model=model,
                              epochs=epochs, log_step=log_step)
        mlflow.log_param('epochs', epochs)
        mlflow.log_metrics(metrics)
        # * Evaluate model on the validation and test sets
        val_metrics = dict(zip(['validate.mse', 'validate.rmse', 'validate.mae'],
                               evaluate_model(validate_dl, model)))
        print(f"\nEvaluation on VALIDATION set\t→\tMSE: {val_metrics['validate.mse']}, "
              f"RMSE: {val_metrics['validate.rmse']}, MAE: {val_metrics['validate.mae']}")
        mlflow.log_metrics(val_metrics)
        test_metrics = dict(zip(['test.mse', 'test.rmse', 'test.mae'],
                                evaluate_model(test_dl, model)))
        print(f"\nEvaluation on TEST set\t→\tMSE: {test_metrics['test.mse']}, "
              f"RMSE: {test_metrics['test.rmse']}, MAE: {test_metrics['test.mae']}")
        mlflow.log_metrics(test_metrics)

        # tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
        # if tracking_url_type_store != 'file':
        #     model_logger.log_model(
        #         model, "avocados-model",
        #         registered_model_name="AvocadoModel_478841")
        # else:
        #     model_logger.log_model(model, "model")

    # * Save the trained model as TorchScript
    if save_model:
        scripted_model = torch.jit.script(model)
        scripted_model.save('./data/model_scripted.pt')
        print("Your model has been saved - have a nice day!")


if __name__ == '__main__':
    # * Model parameters
    parser = argparse.ArgumentParser(
        description="Script performing regression model training",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-e", "--epochs", default=100,
                        help="Number of epochs the model will be trained for")
    parser.add_argument("-s", "--step", default=10,
                        help="Number of epochs between logging loss values")
    parser.add_argument("--save", action="store_true",
                        help="Save the trained model to './data/model_scripted.pt'")
    args = vars(parser.parse_args())

    epochs = int(args['epochs'])
    save_model = args['save']
    log_step = int(args['step'])
    main(epochs, save_model, log_step)
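
# Example invocation (the script filename, data paths, and MLflow server
# address above are environment-specific assumptions):
#
#   python train_avocado.py --epochs 50 --step 5 --save
#
# A minimal sketch of reloading the scripted model saved with --save:
#
#   loaded = torch.jit.load('./data/model_scripted.pt')
#   loaded.eval()
#   with torch.no_grad():
#       price = loaded(torch.rand(1, 235))  # one illustrative feature row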