111 lines
4.1 KiB
Python
111 lines
4.1 KiB
Python
import os
|
|
import random
|
|
|
|
import torch
|
|
from torch import nn, optim
|
|
import torch.nn.functional as functional
|
|
|
|
|
|
class LinearQNetwork(nn.Module):
|
|
TORCH_ACTiVATIONS = 'tanh'
|
|
|
|
def __init__(self, nn_params, input_size, output_size, randomize=True, params=None):
|
|
super().__init__()
|
|
self.id = 0
|
|
if params is None:
|
|
params = {}
|
|
self.params_choice = nn_params
|
|
self.scores = []
|
|
self.network_params = params
|
|
if randomize:
|
|
self.randomize()
|
|
self.layers = nn.ModuleList()
|
|
if self.network_params['layers'] == 0:
|
|
self.layers.append(nn.Linear(input_size, output_size))
|
|
else:
|
|
self.layers.append(nn.Linear(input_size, self.network_params['neurons']))
|
|
|
|
for i in range(self.network_params['layers'] - 1):
|
|
self.layers.append(nn.Linear(self.network_params['neurons'], self.network_params['neurons']))
|
|
if self.network_params['layers'] > 0:
|
|
self.ending_linear = nn.Linear(self.network_params['neurons'], output_size)
|
|
self.layers.append(self.ending_linear)
|
|
|
|
if self.network_params['activation'] in self.TORCH_ACTiVATIONS:
|
|
self.forward_func = getattr(torch, self.network_params['activation'])
|
|
else:
|
|
self.forward_func = getattr(functional, self.network_params['activation'])
|
|
|
|
def randomize(self):
|
|
"""
|
|
Sets random parameters for network.
|
|
"""
|
|
for key in self.params_choice:
|
|
self.network_params[key] = random.choice(self.params_choice[key])
|
|
|
|
def forward(self, x):
|
|
for i in range(len(self.layers) - 1):
|
|
x = self.forward_func(self.layers[i](x))
|
|
x = self.layers[-1](x)
|
|
return x
|
|
|
|
def save(self, file_name='model.pth'):
|
|
model_directory = 'model'
|
|
if not os.path.exists(model_directory):
|
|
os.makedirs(model_directory)
|
|
|
|
file_path = os.path.join(model_directory, file_name)
|
|
torch.save(self.state_dict(), file_path)
|
|
|
|
@staticmethod
|
|
def load(params, input_size, output_size, file_name='model.pth'):
|
|
model_directory = 'model'
|
|
file_path = os.path.join(model_directory, file_name)
|
|
if os.path.isfile(file_path):
|
|
model = LinearQNetwork(params, input_size, output_size, True)
|
|
model.load_state_dict(torch.load(file_path))
|
|
model.eval()
|
|
return model
|
|
raise Exception(f'Could not find file {file_path}.')
|
|
|
|
|
|
class QTrainer:
|
|
def __init__(self, model, lr, gamma, optimizer):
|
|
self.model = model
|
|
self.lr = lr
|
|
self.gamma = gamma
|
|
self.optimizer = getattr(optim, optimizer)(model.parameters(), lr=self.lr)
|
|
# self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
|
|
self.criterion = nn.MSELoss() # Mean squared error
|
|
|
|
def train_step(self, state, action, reward, next_state, done):
|
|
state = torch.tensor(state, dtype=torch.float)
|
|
next_state = torch.tensor(next_state, dtype=torch.float)
|
|
action = torch.tensor(action, dtype=torch.long)
|
|
reward = torch.tensor(reward, dtype=torch.float)
|
|
|
|
if len(state.shape) == 1:
|
|
# reshape the state to make its values an (n, x) tuple
|
|
state = torch.unsqueeze(state, 0)
|
|
next_state = torch.unsqueeze(next_state, 0)
|
|
action = torch.unsqueeze(action, 0)
|
|
reward = torch.unsqueeze(reward, 0)
|
|
done = (done,)
|
|
|
|
# Prediction based on simplified Bellman's equation
|
|
# Predict Q values for current state
|
|
prediction = self.model(state)
|
|
target = prediction.clone()
|
|
for idx in range(len(done)):
|
|
Q = reward[idx]
|
|
if not done[idx]:
|
|
Q = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))
|
|
# set the target of the maximum value of the action to Q
|
|
target[idx][torch.argmax(action).item()] = Q
|
|
# Apply the loss function
|
|
self.optimizer.zero_grad()
|
|
loss = self.criterion(target, prediction)
|
|
loss.backward()
|
|
|
|
self.optimizer.step()
|