import os
import random

import torch
from torch import nn, optim
import torch.nn.functional as functional


class LinearQNetwork(nn.Module):
    # Activation names that live on the ``torch`` module itself;
    # anything else is looked up on ``torch.nn.functional``.
    TORCH_ACTIVATIONS = ('tanh',)

    def __init__(self, nn_params, input_size, output_size, randomize=True, params=None):
        super().__init__()
        self.id = 0
        if params is None:
            params = {}
        self.params_choice = nn_params
        self.scores = []
        self.network_params = params
        if randomize:
            self.randomize()

        # Build the stack of fully connected layers.
        self.layers = nn.ModuleList()
        if self.network_params['layers'] == 0:
            # No hidden layers: a single linear map from input to output.
            self.layers.append(nn.Linear(input_size, output_size))
        else:
            self.layers.append(nn.Linear(input_size, self.network_params['neurons']))
            for _ in range(self.network_params['layers'] - 1):
                self.layers.append(
                    nn.Linear(self.network_params['neurons'], self.network_params['neurons'])
                )
        if self.network_params['layers'] > 0:
            self.ending_linear = nn.Linear(self.network_params['neurons'], output_size)
            self.layers.append(self.ending_linear)

        # Resolve the activation function by name.
        if self.network_params['activation'] in self.TORCH_ACTIVATIONS:
            self.forward_func = getattr(torch, self.network_params['activation'])
        else:
            self.forward_func = getattr(functional, self.network_params['activation'])

    def randomize(self):
        """Set random network parameters, drawn from ``self.params_choice``."""
        for key in self.params_choice:
            self.network_params[key] = random.choice(self.params_choice[key])

    def forward(self, x):
        # Apply the activation after every layer except the output layer.
        for i in range(len(self.layers) - 1):
            x = self.forward_func(self.layers[i](x))
        x = self.layers[-1](x)
        return x

    def save(self, file_name='model.pth'):
        model_directory = 'model'
        if not os.path.exists(model_directory):
            os.makedirs(model_directory)
        file_path = os.path.join(model_directory, file_name)
        # Save the architecture parameters alongside the weights so that
        # ``load`` can rebuild an identical network.
        torch.save(
            {'network_params': self.network_params, 'state_dict': self.state_dict()},
            file_path,
        )

    @staticmethod
    def load(params, input_size, output_size, file_name='model.pth'):
        model_directory = 'model'
        file_path = os.path.join(model_directory, file_name)
        if os.path.isfile(file_path):
            checkpoint = torch.load(file_path)
            # Rebuild with the saved architecture rather than re-randomizing;
            # a freshly randomized architecture would not match the saved weights.
            model = LinearQNetwork(params, input_size, output_size,
                                   randomize=False, params=checkpoint['network_params'])
            model.load_state_dict(checkpoint['state_dict'])
            model.eval()
            return model
        raise FileNotFoundError(f'Could not find file {file_path}.')


class QTrainer:
    def __init__(self, model, lr, gamma, optimizer):
        self.model = model
        self.lr = lr
        self.gamma = gamma
        # Look the optimizer class up by name, e.g. 'Adam' or 'SGD'.
        self.optimizer = getattr(optim, optimizer)(model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()  # Mean squared error

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        reward = torch.tensor(reward, dtype=torch.float)

        if len(state.shape) == 1:
            # Add a batch dimension so a single sample has shape (1, x).
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done,)

        # Predict Q-values for the current state, then build the training
        # target from the simplified Bellman equation:
        #   Q = r                               if the episode ended
        #   Q = r + gamma * max_a' Q(s', a')    otherwise
        prediction = self.model(state)
        target = prediction.clone()
        for idx in range(len(done)):
            Q = reward[idx]
            if not done[idx]:
                Q = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))
            # Set the target of this sample's chosen action to Q
            # (index into the batch; argmax over the whole batch would be a bug).
            target[idx][torch.argmax(action[idx]).item()] = Q

        # Apply the loss function and update the weights.
        self.optimizer.zero_grad()
        loss = self.criterion(target, prediction)
        loss.backward()
        self.optimizer.step()
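

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of wiring LinearQNetwork and QTrainer together.
# The parameter-choice dictionary, the state size (11), and the action
# size (3) below are assumptions for demonstration only; adapt them to
# your agent's observation and action spaces.
if __name__ == '__main__':
    nn_params = {
        'layers': [1, 2],                 # hypothetical choices for hidden layer count
        'neurons': [64, 128],             # hypothetical choices for neurons per layer
        'activation': ['relu', 'tanh'],   # names resolvable on torch / torch.nn.functional
    }
    model = LinearQNetwork(nn_params, input_size=11, output_size=3)
    trainer = QTrainer(model, lr=0.001, gamma=0.9, optimizer='Adam')

    # One training step on a single, made-up transition.
    state = [0.0] * 11
    action = [1, 0, 0]                    # one-hot encoding of the chosen action
    reward = 1.0
    next_state = [0.0] * 11
    trainer.train_step(state, action, reward, next_state, done=False)

    model.save('model.pth')               # writes to ./model/model.pth
    restored = LinearQNetwork.load(nn_params, 11, 3, 'model.pth')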