MLFlow logging

This commit is contained in:
MatOgr 2022-05-15 10:54:30 +02:00
parent 84e4b8bccc
commit 97bdf3be9b
3 changed files with 212 additions and 4 deletions

View File

@ -36,6 +36,7 @@ pipeline {
steps {
sh "chmod +x -R ${env.WORKSPACE}"
sh 'python3 scripts/sacred_train.py -e $epochs -s $step $save_model'
sh 'python3 scripts/mlflow_train.py -e $epochs -s $step $save_model'
}
}
stage('Archive artifacts') {

211
scripts/mlflow_train.py Normal file
View File

@ -0,0 +1,211 @@
from urllib.parse import urlparse
import mlflow
import mlflow.pytorch as model_logger
import argparse
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
from torch import nn
from torch.utils import data as t_u_data
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("s478841")
# * Customized Dataset class (base provided by PyTorch)
class AvocadoDataset(t_u_data.Dataset):
def __init__(self, path: str, target: str = 'AveragePrice'):
data = pd.read_csv(path)
y = data[target].values.astype('float32')
self.y = y.reshape((len(y), 1))
self.x_data = data.drop(
[target], axis=1).values.astype('float32')
self.x_shape = data.drop([target], axis=1).shape
# print("Data shape is: ", self.x_data.shape)
def __len__(self):
return len(self.x_data)
def __getitem__(self, idx):
return [self.x_data[idx], self.y[idx]]
def get_shape(self):
return self.x_shape
def get_splits(self, n_test=0.33):
test_size = round(n_test * len(self.x_data))
train_size = len(self.x_data) - test_size
return t_u_data.random_split(self, [train_size, test_size])
class AvocadoRegressor(nn.Module):
def __init__(self, input_dim):
super(AvocadoRegressor, self).__init__()
self.hidden1 = nn.Linear(input_dim, 32)
nn.init.xavier_uniform_(self.hidden1.weight)
self.act1 = nn.ReLU()
self.hidden2 = nn.Linear(32, 8)
nn.init.xavier_uniform_(self.hidden2.weight)
self.act2 = nn.ReLU()
self.hidden3 = nn.Linear(8, 1)
nn.init.xavier_uniform_(self.hidden3.weight)
def forward(self, x):
x = self.hidden1(x)
x = self.act1(x)
x = self.hidden2(x)
x = self.act2(x)
x = self.hidden3(x)
return x
def prepare_data(paths):
train_dl = t_u_data.DataLoader(AvocadoDataset(
paths[0]), batch_size=32, shuffle=True)
validate_dl = t_u_data.DataLoader(AvocadoDataset(
paths[1]), batch_size=128, shuffle=True)
test_dl = t_u_data.DataLoader(AvocadoDataset(
paths[2]), batch_size=1, shuffle=False)
return train_dl, validate_dl, test_dl
def train_model(train_dl, model, epochs, log_step):
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
to_compare = None
metrics = None
for epoch in range(1, epochs+1):
for _, (inputs, targets) in enumerate(train_dl):
optimizer.zero_grad()
yhat = model(inputs)
# * For loss value inspection
to_compare = (yhat, targets)
loss = criterion(yhat, targets)
loss.backward()
optimizer.step()
if epoch == 1 or (epoch) % log_step == 0:
result, target = to_compare[0].detach(
).numpy(), to_compare[1].detach().numpy()
metrics = {'train.mse': mean_squared_error(target, result),
'train.mae': mean_absolute_error(target, result),
'train.rmse': mean_squared_error(target, result, squared=False)}
# _run.log_scalar("training.RMSE", np.sqrt(mse), epoch)
# _run.log_scalar("training.MAE", mae, epoch)
# _run.log_scalar('training.MSE', mse, epoch)
print(
f"Epoch {epoch}\t\tMSE: {metrics['train.mse']},\tRMSE: {metrics['train.rmse']},\tMAE: {metrics['train.mae']}")
return metrics
def evaluate_model(test_dl, model):
predictions, actuals = list(), list()
for _, (inputs, targets) in enumerate(test_dl):
yhat = model(inputs)
# * retrieve numpy array
yhat = yhat.detach().numpy()
actual = targets.numpy()
actual = actual.reshape((len(actual), 1))
# * store predictions
predictions.append(yhat)
actuals.append(actual)
predictions, actuals = np.vstack(predictions), np.vstack(actuals)
# * return MSE value
mse = mean_squared_error(actuals, predictions)
rmse = mean_squared_error(actuals, predictions, squared=False)
mae = mean_absolute_error(actuals, predictions)
return mse, rmse, mae
def predict(row, model):
row = row[0].flatten()
yhat = model(row)
yhat = yhat.detach().numpy()
return yhat
def main(epochs, save_model, log_step):
print(
f"Your model will be trained for {epochs} epochs, logging every {log_step} steps. Trained model will {'not ' if save_model else ''}be saved.")
# * Paths to data
avocado_data = ['./data/avocado.data.train',
'./data/avocado.data.valid',
'./data/avocado.data.test']
# * Data preparation
train_dl, validate_dl, test_dl = prepare_data(paths=avocado_data)
print(f"""
Train set size: {len(train_dl.dataset)},
Validate set size: {len(validate_dl.dataset)}
Test set size: {len(test_dl.dataset)}
""")
# * Model definition
# ! 66 - in case only regions and type are used (among all the categorical vals)
model = AvocadoRegressor(235)
# * Train model
print("Let's start the training, mate!")
with mlflow.start_run() as run:
print("MLflow run experiment_id: {0}".format(run.info.experiment_id))
print("MLflow run artifact_uri: {0}".format(run.info.artifact_uri))
metrics = train_model(train_dl=train_dl, model=model,
epochs=epochs, log_step=log_step)
mlflow.log_param('epochs', epochs)
mlflow.log_metrics(metrics)
# * Evaluate model
val_metrics = {key: val for key, val in zip(
['validate.mse', 'validate.rmse', 'validate.mae'], evaluate_model(validate_dl, model))}
print(
f"\nEvaluation on VALIDATION set\t\tMSE: {val_metrics['validate.mse']}, RMSE: {val_metrics['validate.rmse']}, MAE: {val_metrics['validate.mae']}")
mlflow.log_metrics(val_metrics)
test_loss = {key: val for key, val in zip(
['test.mse', 'test.rmse', 'test.mae'], evaluate_model(test_dl, model))}
print(
f"\nEvaluation on TEST set\t\tMSE: {test_loss['test.mse']}, RMSE: {test_loss['test.rmse']}, MAE: {test_loss['test.mae']}")
mlflow.log_metrics(test_loss)
# tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
# if tracking_url_type_store != 'file':
# print('First option')
# model_logger.log_model(
# model, "avocados-model", registered_model_name="AvocadoModel_478841")
# else:
# print('Second option')
# model_logger.log_model(model, "model")
# * Save the trained model
if save_model:
print("Your model has been saved - have a nice day!")
scripted_model = torch.jit.script(model)
scripted_model.save('./data/model_scripted.pt')
# ex.add_artifact('./data/model_scripted.pt')
# ex.run()
if __name__ == '__main__':
# * Model parameters
parser = argparse.ArgumentParser(description="Script performing logistic regression model training",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"-e", "--epochs", default=100, help="Number of epochs the model will be trained for")
parser.add_argument(
"-s", "--step", default=10, help="Number of steps to repeat logging loss values on")
parser.add_argument("--save", action="store_true",
help="Save trained model to file 'trained_model.h5'")
args = vars(parser.parse_args())
epochs = int(args['epochs'])
save_model = args['save']
log_step = int(args['step'])
main(epochs, save_model, log_step)

View File

@ -1,7 +1,3 @@
from audioop import rms
from cgi import test
from multiprocessing.spawn import prepare
from xml.etree.ElementPath import prepare_star
from sacred import Experiment
from sacred.observers import FileStorageObserver, MongoObserver