genetic algorithm

This commit is contained in:
kamil 2023-06-16 08:25:12 +02:00
parent ca23054983
commit 57bc7f081f
4 changed files with 163 additions and 24 deletions

View File

@ -12,16 +12,14 @@ store = ImageController(SQUARE_SIZE)
waiter = Waiter([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE, store)
kitchen = Kitchen([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE, store)
engine = Engine(SCREEN_SIZE, SQUARE_SIZE, kitchen, waiter, ACTION_DURATION)
layout = LayoutController(engine, store).create_and_subscribe(
radius=2, neighbors_count=3, block_chance=30)
layout = LayoutController(engine, store)
params = layout.train_loop(10)
layout.create_and_subscribe(
params['radius'],
params['neighbors_count'],
params['block_probability']
)
engine.loop()
'''
def example_stop(action_clock: int) -> bool:
return action_clock < 1000
print(engine.train_loop(example_stop))
'''

View File

@ -1,4 +1,3 @@
import copy
import time
import pygame
from queue import PriorityQueue
@ -80,6 +79,10 @@ class Engine:
def train_loop(self, stop_condition):
print(colored("Simulation started", "green"))
print(colored("Objects count: ", "blue"), len(self.objects))
if not any([type(o) == Table for o in self.objects]):
return 0
real_action_duration = self.action_duration
real_waiter_state = self.waiter.makeCopy()
@ -91,18 +94,18 @@ class Engine:
while stop_condition(self.action_clock):
self.action()
self.redraw()
result = self.serviced_tables
self.action_duration = real_action_duration
self.action_clock = 0
self.serviced_tables = 0
self.is_simulation = False
self.predictor_c.is_simulation = False
self.waiter.applyState(real_waiter_state)
self.kitchen.restoreDefaultState()
self.unsubscribe_all()
return result
def unsubscribe_all(self):
@ -140,15 +143,15 @@ class Engine:
self.state_c.reset()
return
# STEP 4. Follow the path
# STEP 5. Follow the path
state = self.waiter.changeState(self.state_c.path.pop())
self.clock_increment(state.cost)
# STEP 5. Log state details
# STEP 6. Log state details
self.log_state(state)
# STEP 5. Interactions with the waiter
# STEP 7. Interactions with the waiter
for o in self.objects:
if type(o) is Table:
@ -159,11 +162,11 @@ class Engine:
self.kitchen.action(self)
# STEP 6. Update kitchen state
# STEP 8. Update kitchen state
self.kitchen.updateMark()
# STEP 7. Wait
# STEP 9. Wait
time.sleep(self.action_duration)
@ -176,7 +179,7 @@ class Engine:
def unattainable_goal(self):
if not self.is_simulation:
print(colored("Object unattainable", "red"))
self.objects.remove(self.goal.parent)
self.clock_increment(1000)
self.revoke_goal()
def log_state(self, state: TemporaryState):
@ -239,6 +242,7 @@ class Engine:
pygame.display.flip()
def predict_goal(self):
self.revoke_goal()
# STEP 1. Prepare priority queue for potential goals
goal_queue = PriorityQueue()
@ -256,7 +260,7 @@ class Engine:
# STEP 3. Collecting data about the current state of the tables
if not type(o) is Table or o.on(self.waiter.position):
if not type(o) is Table:
continue
medium_dist = (self.screen_size[0] // self.square_size) // 2

View File

@ -1,8 +1,10 @@
import random
import time
import numpy as np
from src.obj.Block import Block
from src.obj.Table import Table
from src.obj.PriorityItem import PriorityItem
from queue import PriorityQueue
from termcolor import colored
class LayoutController():
@ -16,6 +18,129 @@ class LayoutController():
self.enginie = engine
self.store = store
def load_params(self) -> bool:
pass
def store_params(self):
pass
def get_random_params(self):
params = {
'radius': self.get_random_radius(),
'neighbors_count': self.get_random_neighbors_count(),
'block_probability': self.get_random_block_probability()
}
return params
def get_random_radius(self):
return random.randint(1, (self.enginie.num_squares // 4))
def get_random_neighbors_count(self):
return random.randint(1, self.enginie.num_squares // 3)
def get_random_block_probability(self):
return random.randint(25, 75)
def simulation(self, child_params=None):
unit_number = 0
def stop(action_clock: int) -> bool:
return action_clock < 1000
population = PriorityQueue()
for i in range(20):
params = (
child_params[i]
if child_params
else self.get_random_params()
)
self.create_and_subscribe(
params['radius'],
params['neighbors_count'],
params['block_probability'])
priority = self.enginie.train_loop(stop)
population.put(
PriorityItem(
self.get_random_params(),
priority=priority,
reversed=True
)
)
unit_number += 1
print(
colored(f"unit #{unit_number}", "yellow"),
colored("radius:", "blue"),
f"{params['radius']}",
colored("neighbors_count:", "blue"),
f"{params['neighbors_count']}",
colored("block_probability:", "blue"),
f"{params['block_probability']}",
colored("fit rate:", "blue"),
f"{priority}"
)
return population
def reproduction(self, parents):
parents.queue = parents.queue[:5]
children = []
while len(children) < 20:
p1 = random.choice(parents.queue)
p2 = random.choice(parents.queue)
child = {
'radius': random.choice([p1.o()['radius'], p2.o()['radius']]),
'neighbors_count': random.choice([p1.o()['neighbors_count'], p2.o()['neighbors_count']]),
'block_probability': random.choice([p1.o()['block_probability'], p2.o()['block_probability']])
}
children.append(child)
return children
def mutation(self, params):
for _ in range(5):
unit = random.choice(params)
mutation_type = random.choice(
[
'radius',
'neighbors_count',
'block_probability'
]
)
if mutation_type == 'radius':
unit[mutation_type] = self.get_random_radius()
elif mutation_type == 'neighbors_count':
unit[mutation_type] = self.get_random_neighbors_count()
elif mutation_type == 'block_probability':
unit[mutation_type] = self.get_random_block_probability()
return params
def train_loop(self, goal):
best = 0
generation = 0
params = []
population = []
while best < goal:
generation += 1
population = self.simulation(params)
print(
colored(f"generation #{generation}", "yellow"),
colored("best fit:", "blue"),
population.queue[0].p()
)
best = population.queue[0].p()
params = self.reproduction(population)
params = self.mutation(params)
return population.queue[0].o()
def create_and_subscribe(self, radius: int, neighbors_count: int, block_probability: int):
'''
That function generate a location and a type of objects
@ -30,6 +155,8 @@ class LayoutController():
square_size = self.enginie.square_size
screen_size = self.enginie.screen_size
self.enginie.unsubscribe_all()
pos_matrix = np.full((num_squares, num_squares), -1)
def getFreeSquare(m):
@ -101,7 +228,7 @@ class LayoutController():
for i in range(num_squares):
for j in range(num_squares):
if pos_matrix[i][j] == 1:
if (random.randint(0, 100) >= block_probability):
if (random.randint(0, 100) <= block_probability):
objects.append(
Block([i, j], 0, square_size, screen_size, store))
else:

View File

@ -1,7 +1,17 @@
class PriorityItem():
def __init__(self, obj, priority):
def __init__(self, obj, priority, reversed=False):
self.obj = obj
self.priority = priority
self.reversed = reversed
self.priority = -priority if reversed else priority
def set_priority(self, priority):
self.priority = -priority if self.reversed else priority
def p(self):
return -self.priority if self.reversed else self.priority
def o(self):
return self.obj
def __eq__(self, __value) -> bool:
return self.priority == __value.priority