diff --git a/jenkins/training.Jenkinsfile b/jenkins/training.Jenkinsfile
index a7d9659..d8567f5 100644
--- a/jenkins/training.Jenkinsfile
+++ b/jenkins/training.Jenkinsfile
@@ -36,6 +36,7 @@ pipeline {
             steps {
                 sh "chmod +x -R ${env.WORKSPACE}"
                 sh 'python3 scripts/sacred_train.py -e $epochs -s $step $save_model'
+                sh 'python3 scripts/mlflow_train.py -e $epochs -s $step $save_model'
             }
         }
         stage('Archive artifacts') {
diff --git a/scripts/mlflow_train.py b/scripts/mlflow_train.py
new file mode 100644
index 0000000..bd0b9e7
--- /dev/null
+++ b/scripts/mlflow_train.py
@@ -0,0 +1,231 @@
+"""Train the avocado price regressor and log the run to MLflow."""
+from urllib.parse import urlparse
+import mlflow
+import mlflow.pytorch as model_logger
+
+import argparse
+import pandas as pd
+import numpy as np
+from sklearn.metrics import mean_squared_error, mean_absolute_error
+
+import torch
+from torch import nn
+from torch.utils import data as t_u_data
+
+
+mlflow.set_tracking_uri("http://localhost:5000")
+mlflow.set_experiment("s478841")
+
+
+# * Customized Dataset class (base provided by PyTorch)
+class AvocadoDataset(t_u_data.Dataset):
+    """CSV-backed dataset split into float32 features and one target column."""
+
+    def __init__(self, path: str, target: str = 'AveragePrice'):
+        data = pd.read_csv(path)
+        # Drop the target once and reuse the frame for values and shape.
+        features = data.drop([target], axis=1)
+        y = data[target].values.astype('float32')
+        # Targets are stored as an (n, 1) column to match the model output.
+        self.y = y.reshape((len(y), 1))
+        self.x_data = features.values.astype('float32')
+        self.x_shape = features.shape
+
+    def __len__(self):
+        return len(self.x_data)
+
+    def __getitem__(self, idx):
+        return [self.x_data[idx], self.y[idx]]
+
+    def get_shape(self):
+        """Return the (rows, columns) shape of the feature matrix."""
+        return self.x_shape
+
+    def get_splits(self, n_test=0.33):
+        """Randomly split into train/test subsets; n_test is the test fraction."""
+        test_size = round(n_test * len(self.x_data))
+        train_size = len(self.x_data) - test_size
+        return t_u_data.random_split(self, [train_size, test_size])
+
+
+class AvocadoRegressor(nn.Module):
+    """Feed-forward regressor: input_dim -> 32 -> 8 -> 1 with ReLU between."""
+
+    def __init__(self, input_dim):
+        super().__init__()
+        self.hidden1 = nn.Linear(input_dim, 32)
+        nn.init.xavier_uniform_(self.hidden1.weight)
+        self.act1 = nn.ReLU()
+        self.hidden2 = nn.Linear(32, 8)
+        nn.init.xavier_uniform_(self.hidden2.weight)
+        self.act2 = nn.ReLU()
+        self.hidden3 = nn.Linear(8, 1)
+        nn.init.xavier_uniform_(self.hidden3.weight)
+
+    def forward(self, x):
+        x = self.hidden1(x)
+        x = self.act1(x)
+        x = self.hidden2(x)
+        x = self.act2(x)
+        x = self.hidden3(x)
+        return x
+
+
+def _regression_metrics(actual, predicted):
+    """Return (MSE, RMSE, MAE) for single-output targets and predictions."""
+    mse = mean_squared_error(actual, predicted)
+    # RMSE is derived from MSE rather than via ``squared=False``, which
+    # recent scikit-learn releases no longer accept.
+    rmse = float(np.sqrt(mse))
+    mae = mean_absolute_error(actual, predicted)
+    return mse, rmse, mae
+
+
+def prepare_data(paths):
+    """Build the train, validation and test loaders from three CSV paths."""
+    train_dl = t_u_data.DataLoader(
+        AvocadoDataset(paths[0]), batch_size=32, shuffle=True)
+    validate_dl = t_u_data.DataLoader(
+        AvocadoDataset(paths[1]), batch_size=128, shuffle=True)
+    test_dl = t_u_data.DataLoader(
+        AvocadoDataset(paths[2]), batch_size=1, shuffle=False)
+    return train_dl, validate_dl, test_dl
+
+
+def train_model(train_dl, model, epochs, log_step):
+    """Train model with SGD on MSE loss and return the last logged metrics.
+
+    Metrics come from the final batch of epoch 1 and of every log_step-th
+    epoch. The returned dict is empty when nothing was logged (no epochs or
+    an empty loader).
+    """
+    criterion = nn.MSELoss()
+    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
+    metrics = {}
+    for epoch in range(1, epochs + 1):
+        # * For loss value inspection: remember the last batch of the epoch
+        last_batch = None
+        for inputs, targets in train_dl:
+            optimizer.zero_grad()
+            yhat = model(inputs)
+            last_batch = (yhat, targets)
+            loss = criterion(yhat, targets)
+            loss.backward()
+            optimizer.step()
+        # A log_step of 0 would divide by zero; then only epoch 1 is logged.
+        should_log = epoch == 1 or (log_step and epoch % log_step == 0)
+        if should_log and last_batch is not None:
+            result = last_batch[0].detach().numpy()
+            target = last_batch[1].detach().numpy()
+            mse, rmse, mae = _regression_metrics(target, result)
+            metrics = {'train.mse': mse, 'train.mae': mae, 'train.rmse': rmse}
+            print(
+                f"Epoch {epoch}\t→\tMSE: {mse},\tRMSE: {rmse},\tMAE: {mae}")
+    return metrics
+
+
+def evaluate_model(test_dl, model):
+    """Return (MSE, RMSE, MAE) of model over every batch in test_dl."""
+    predictions, actuals = [], []
+    # Evaluation needs no gradients, so skip building the autograd graph.
+    with torch.no_grad():
+        for inputs, targets in test_dl:
+            yhat = model(inputs).numpy()
+            actual = targets.numpy()
+            actual = actual.reshape((len(actual), 1))
+            predictions.append(yhat)
+            actuals.append(actual)
+    predictions, actuals = np.vstack(predictions), np.vstack(actuals)
+    return _regression_metrics(actuals, predictions)
+
+
+def predict(row, model):
+    """Return the model prediction for the features in row[0] as an array."""
+    # The dataset yields NumPy arrays, which the model cannot consume as-is.
+    features = torch.as_tensor(row[0], dtype=torch.float32).flatten()
+    with torch.no_grad():
+        yhat = model(features)
+    return yhat.numpy()
+
+
+def main(epochs, save_model, log_step):
+    """Train, evaluate and optionally save the model within one MLflow run."""
+    print(
+        f"Your model will be trained for {epochs} epochs, logging every {log_step} steps. Trained model will {'' if save_model else 'not '}be saved.")
+
+    # * Paths to data
+    avocado_data = ['./data/avocado.data.train',
+                    './data/avocado.data.valid',
+                    './data/avocado.data.test']
+
+    # * Data preparation
+    train_dl, validate_dl, test_dl = prepare_data(paths=avocado_data)
+    print(f"""
+    Train set size: {len(train_dl.dataset)},
+    Validate set size: {len(validate_dl.dataset)}
+    Test set size: {len(test_dl.dataset)}
+    """)
+
+    # * Model definition
+    # The input width follows the feature columns of the training data so
+    # it stays correct when the set of encoded columns changes.
+    model = AvocadoRegressor(train_dl.dataset.get_shape()[1])
+
+    # * Train model
+    print("Let's start the training, mate!")
+    with mlflow.start_run() as run:
+        print("MLflow run experiment_id: {0}".format(run.info.experiment_id))
+        print("MLflow run artifact_uri: {0}".format(run.info.artifact_uri))
+        metrics = train_model(train_dl=train_dl, model=model,
+                              epochs=epochs, log_step=log_step)
+        mlflow.log_param('epochs', epochs)
+        mlflow.log_metrics(metrics)
+
+        # * Evaluate model
+        val_metrics = dict(zip(
+            ['validate.mse', 'validate.rmse', 'validate.mae'],
+            evaluate_model(validate_dl, model)))
+        print(
+            f"\nEvaluation on VALIDATION set\t→\tMSE: {val_metrics['validate.mse']}, RMSE: {val_metrics['validate.rmse']}, MAE: {val_metrics['validate.mae']}")
+        mlflow.log_metrics(val_metrics)
+
+        test_loss = dict(zip(
+            ['test.mse', 'test.rmse', 'test.mae'],
+            evaluate_model(test_dl, model)))
+        print(
+            f"\nEvaluation on TEST set\t→\tMSE: {test_loss['test.mse']}, RMSE: {test_loss['test.rmse']}, MAE: {test_loss['test.mae']}")
+        mlflow.log_metrics(test_loss)
+
+        # NOTE: logging the model itself to MLflow is left disabled.
+        # tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
+        # if tracking_url_type_store != 'file':
+        #     model_logger.log_model(
+        #         model, "avocados-model", registered_model_name="AvocadoModel_478841")
+        # else:
+        #     model_logger.log_model(model, "model")
+
+    # * Save the trained model
+    if save_model:
+        scripted_model = torch.jit.script(model)
+        scripted_model.save('./data/model_scripted.pt')
+        print("Your model has been saved - have a nice day!")
+
+
+if __name__ == '__main__':
+    # * Model parameters
+    parser = argparse.ArgumentParser(
+        description="Script training the avocado price regression model",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        "-e", "--epochs", type=int, default=100,
+        help="Number of epochs the model will be trained for")
+    parser.add_argument(
+        "-s", "--step", type=int, default=10,
+        help="Number of steps to repeat logging loss values on")
+    parser.add_argument(
+        "--save", action="store_true",
+        help="Save trained model to file './data/model_scripted.pt'")
+
+    args = parser.parse_args()
+
+    main(args.epochs, args.save, args.step)
diff --git a/scripts/sacred_train.py b/scripts/sacred_train.py
index d2c25b5..b8d81f5 100644
--- a/scripts/sacred_train.py
+++ b/scripts/sacred_train.py
@@ -1,7 +1,3 @@
-from audioop import rms
-from cgi import test
-from multiprocessing.spawn import prepare
-from xml.etree.ElementPath import prepare_star
 from sacred import Experiment
 from sacred.observers import FileStorageObserver, MongoObserver
 