genetic route algorithm
0
Andrzej_Preibisz.md
Normal file → Executable file
0
Drzewo.png
Normal file → Executable file
Before Width: | Height: | Size: 338 KiB After Width: | Height: | Size: 338 KiB |
0
Test_Results.png
Normal file → Executable file
Before Width: | Height: | Size: 74 KiB After Width: | Height: | Size: 74 KiB |
210
agent.py
Normal file → Executable file
@ -1,11 +1,16 @@
|
||||
from pandas.tests.io.json.test_ujson import numpy
|
||||
|
||||
from warehouse import Coordinates, Tile, Pack
|
||||
from queue import PriorityQueue
|
||||
from math import sqrt
|
||||
from math import sqrt, ceil
|
||||
from attributes import TURN_LEFT_DIRECTIONS, TURN_RIGHT_DIRECTIONS, PackStatus
|
||||
import pygame
|
||||
import sys
|
||||
import pdb
|
||||
import numpy as np
|
||||
from package_location_classifier.classifier import PackageLocationClassifier
|
||||
from genetic_route import genetic_trace_route
|
||||
|
||||
|
||||
class Node:
|
||||
def __init__(self, coord_x, coord_y, package=None, is_rack=False):
|
||||
@ -16,9 +21,11 @@ class Node:
|
||||
self.is_rack = is_rack
|
||||
self.g_cost = 0
|
||||
self.h_cost = 0
|
||||
self.graph_map = None
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Node):
|
||||
return self.x == other.x and self.y == self.y
|
||||
return self.x == other.x and self.y == other.y
|
||||
return False
|
||||
|
||||
def __lt__(self, other):
|
||||
@ -27,50 +34,61 @@ class Node:
|
||||
def __repr__(self):
|
||||
return "Node:{}x{}".format(self.x, self.y)
|
||||
|
||||
|
||||
class Agent:
|
||||
def __init__(self, start_x, start_y, assigned_warehouse, radius=5):
|
||||
self.x = start_x
|
||||
self.y = start_y
|
||||
self.radius = radius
|
||||
self.warehouse = assigned_warehouse
|
||||
self.graph_map = np.zeros(((self.warehouse.no_of_packages * 2) + 1, (self.warehouse.no_of_packages * 2) + 1))
|
||||
self.is_loaded = False
|
||||
self.transported_package = None
|
||||
self.direction = "up"
|
||||
self.dest = Node(1, 1)
|
||||
self.closed = list()
|
||||
self.open = PriorityQueue()
|
||||
self.path = list()
|
||||
self.location_classifier = PackageLocationClassifier()
|
||||
self.check_packages_locations()
|
||||
self.route = []
|
||||
self.destination = None
|
||||
|
||||
def check_packages_locations(self):
|
||||
for pack in self.warehouse.packages:
|
||||
if pack.lays_on_field.category.name in self.warehouse.storage_types:
|
||||
can_place = self.location_classifier.check_if_can_place(pack, pack.lays_on_field)
|
||||
pack.status = PackStatus.STORED if (can_place and pack.lays_on_field.capacity >= 0) else PackStatus.STORED_BAD_LOCATION
|
||||
pack.status = PackStatus.STORED if (
|
||||
can_place and pack.lays_on_field.capacity >= 0) else PackStatus.STORED_BAD_LOCATION
|
||||
|
||||
|
||||
def find_path(self):
|
||||
def find_path(self, *args, **kwargs):
|
||||
self.closed = []
|
||||
self.path = []
|
||||
self.open = PriorityQueue()
|
||||
packages_to_go = [p for p in self.warehouse.packages if p.status != PackStatus.STORED]
|
||||
if not packages_to_go and not self.transported_package:
|
||||
return
|
||||
if self.is_loaded:
|
||||
rack = self.find_nearest_rack_for(self.transported_package)
|
||||
self.dest = Node(rack.x_position, rack.y_position, is_rack=True)
|
||||
if len(args) != 0:
|
||||
if len(args) == 1:
|
||||
start_node = Node(self.x, self.y)
|
||||
else:
|
||||
start_node = args[1]
|
||||
self.destination = args[0]
|
||||
else:
|
||||
package = self.find_nearest_package()
|
||||
self.dest = Node(package.lays_on_field.x_position, package.lays_on_field.y_position, package=package)
|
||||
packages_to_go = [p for p in self.warehouse.packages if p.status != PackStatus.STORED]
|
||||
if not packages_to_go and not self.transported_package:
|
||||
return
|
||||
if self.is_loaded:
|
||||
print(self.transported_package)
|
||||
rack = self.find_nearest_rack_for(self.transported_package)
|
||||
self.destination = Node(rack.x_position, rack.y_position, is_rack=True)
|
||||
else:
|
||||
package = self.find_package()
|
||||
self.destination = Node(package.lays_on_field.x_position, package.lays_on_field.y_position, package=package)
|
||||
|
||||
start_node = Node(self.x, self.y)
|
||||
start_node = Node(self.x, self.y)
|
||||
|
||||
self.open.put((0, start_node))
|
||||
while self.open:
|
||||
_, current_node = self.open.get()
|
||||
self.closed.append(current_node)
|
||||
if current_node.x == self.dest.x and current_node.y == self.dest.y:
|
||||
if current_node.x == self.destination.x and current_node.y == self.destination.y:
|
||||
while current_node.x != start_node.x or current_node.y != start_node.y:
|
||||
self.path.append(current_node)
|
||||
current_node = current_node.parent
|
||||
@ -86,7 +104,7 @@ class Agent:
|
||||
neighbour.parent = current_node
|
||||
else:
|
||||
neighbour.g_cost = cost
|
||||
neighbour.h_cost = self.heuristic(neighbour, self.dest)
|
||||
neighbour.h_cost = self.heuristic(neighbour, self.destination)
|
||||
neighbour.parent = current_node
|
||||
self.open.put((neighbour.g_cost, neighbour))
|
||||
return False
|
||||
@ -103,10 +121,10 @@ class Agent:
|
||||
diff_x = pow(goal.x - start.x, 2)
|
||||
diff_y = pow(goal.y - start.y, 2)
|
||||
additional_cost = 0
|
||||
return round(sqrt(diff_x + diff_y), 3) + float(10*additional_cost)
|
||||
|
||||
return round(sqrt(diff_x + diff_y), 3) + float(10 * additional_cost)
|
||||
|
||||
def check_if_open(self, node: Node):
|
||||
return (node.x, node.y) in [(n.x, n.y) for (_,n) in self.open.queue]
|
||||
return (node.x, node.y) in [(n.x, n.y) for (_, n) in self.open.queue]
|
||||
|
||||
def check_if_closed(self, node: Node):
|
||||
return (node.x, node.y) in [(n.x, n.y) for n in self.closed]
|
||||
@ -122,80 +140,72 @@ class Agent:
|
||||
if self.check_if_can_move(Coordinates(node.x, node.y - 1)):
|
||||
neighbours.append(Node(node.x, node.y - 1))
|
||||
return neighbours
|
||||
|
||||
|
||||
def move(self):
|
||||
dest_coords = (self.dest.x, self.dest.y)
|
||||
if not self.path:
|
||||
if not self.find_path():
|
||||
return
|
||||
else:
|
||||
next = self.path.pop()
|
||||
star_dir = self.direction
|
||||
if self.x > next.x and not self.direction == 'left':
|
||||
if self.direction == 'down':
|
||||
self.turn_right()
|
||||
else:
|
||||
self.turn_left()
|
||||
elif self.x < next.x and not self.direction == 'right':
|
||||
if self.direction == 'down':
|
||||
self.turn_left()
|
||||
else:
|
||||
self.turn_right()
|
||||
elif self.y > next.y and not self.direction == 'up':
|
||||
if self.direction == 'left':
|
||||
self.turn_right()
|
||||
else:
|
||||
self.turn_left()
|
||||
elif self.y < next.y and not self.direction == 'down':
|
||||
if self.direction == 'right':
|
||||
self.turn_right()
|
||||
else:
|
||||
self.turn_left()
|
||||
|
||||
if (next.x, next.y) == dest_coords:
|
||||
if self.dest.package:
|
||||
self.pick_up_package(self.dest.package)
|
||||
return
|
||||
elif self.dest.is_rack:
|
||||
self.unload_package(self.dest)
|
||||
return
|
||||
|
||||
if star_dir == self.direction:
|
||||
self.x = next.x
|
||||
self.y = next.y
|
||||
if len(self.path) == 0:
|
||||
self.find_path()
|
||||
next_coord = self.path.pop()
|
||||
star_dir = self.direction
|
||||
if self.x > next_coord.x and not self.direction == 'left':
|
||||
if self.direction == 'down':
|
||||
self.turn_right()
|
||||
else:
|
||||
self.path.append(next)
|
||||
self.turn_left()
|
||||
elif self.x < next_coord.x and not self.direction == 'right':
|
||||
if self.direction == 'down':
|
||||
self.turn_left()
|
||||
else:
|
||||
self.turn_right()
|
||||
elif self.y > next_coord.y and not self.direction == 'up':
|
||||
if self.direction == 'left':
|
||||
self.turn_right()
|
||||
else:
|
||||
self.turn_left()
|
||||
elif self.y < next_coord.y and not self.direction == 'down':
|
||||
if self.direction == 'right':
|
||||
self.turn_right()
|
||||
else:
|
||||
self.turn_left()
|
||||
if Node(next_coord.x, next_coord.y) == self.destination or Node(self.x, self.y) == self.destination:
|
||||
if self.destination.package:
|
||||
self.pick_up_package(self.destination.package)
|
||||
return
|
||||
elif self.destination.is_rack:
|
||||
self.unload_package(self.destination)
|
||||
return
|
||||
|
||||
if star_dir == self.direction:
|
||||
self.x = next_coord.x
|
||||
self.y = next_coord.y
|
||||
else:
|
||||
self.path.append(next_coord)
|
||||
self.closed = []
|
||||
|
||||
|
||||
def check_if_can_move(self, next_coords: Coordinates):
|
||||
tile_on_map = 0 <= next_coords.x < self.warehouse.width and 0 <= next_coords.y < self.warehouse.height
|
||||
tile_passable = True
|
||||
if not tile_on_map:
|
||||
return False
|
||||
next_tile = self.warehouse.tiles[next_coords.x][next_coords.y]
|
||||
if (next_coords.x, next_coords.y) != (self.dest.x, self.dest.y):
|
||||
if (next_coords.x, next_coords.y) != (self.destination.x, self.destination.y):
|
||||
tile_passable = isinstance(next_tile, Tile) and next_tile.category.passable
|
||||
return tile_passable
|
||||
|
||||
def find_nearest_package(self):
|
||||
packages_costs = []
|
||||
start_node = Node(self.x, self.y)
|
||||
if not self.warehouse.packages:
|
||||
return None
|
||||
for package in self.warehouse.packages:
|
||||
if package.status == PackStatus.STORED:
|
||||
continue
|
||||
new_node = Node(package.lays_on_field.x_position, package.lays_on_field.y_position)
|
||||
cost = self.heuristic(start_node, new_node)
|
||||
if cost > 0:
|
||||
packages_costs.append((package, cost))
|
||||
if not packages_costs:
|
||||
return
|
||||
# pygame.quit()
|
||||
# sys.exit()
|
||||
|
||||
package = min(packages_costs, key=lambda l: l[1])[0]
|
||||
return package
|
||||
def find_package(self):
|
||||
if len(self.route) == 0:
|
||||
pygame.quit()
|
||||
sys.exit()
|
||||
pack_id = self.route[0]
|
||||
self.route = self.route[1:]
|
||||
print("Next package ID:")
|
||||
print(pack_id)
|
||||
dst_package = None
|
||||
for pack in self.warehouse.packages:
|
||||
if pack.id + 1 == pack_id:
|
||||
dst_package = pack
|
||||
break
|
||||
if dst_package is not None:
|
||||
return dst_package
|
||||
|
||||
def rack_heuristics(self, start, goal, can_place):
|
||||
heur_can_place = not can_place
|
||||
@ -210,8 +220,8 @@ class Agent:
|
||||
if package.category == "freezed":
|
||||
storage = "Fridge"
|
||||
start_node = Node(self.x, self.y)
|
||||
quarter_x = int(self.warehouse.width/4) + expand_box
|
||||
quarter_y = int(self.warehouse.height/4) + expand_box
|
||||
quarter_x = int(self.warehouse.width / 4) + expand_box
|
||||
quarter_y = int(self.warehouse.height / 4) + expand_box
|
||||
start_quarter_x = self.x - quarter_x if self.x - quarter_x > 0 else 0
|
||||
end_quarter_x = self.x + quarter_x if self.x + quarter_x < self.warehouse.width else self.warehouse.width - 1
|
||||
start_quarter_y = self.y - quarter_y if self.y - quarter_y > 0 else 0
|
||||
@ -227,17 +237,17 @@ class Agent:
|
||||
if cost > 0:
|
||||
racks_costs.append((rack, cost))
|
||||
|
||||
rack = self.find_nearest_rack_for(package, expand_box + 1) if not racks_costs else min(racks_costs, key=lambda l: l[1])[0]
|
||||
rack = self.find_nearest_rack_for(package, expand_box + 1) if not racks_costs else \
|
||||
min(racks_costs, key=lambda l: l[1])[0]
|
||||
return rack
|
||||
|
||||
|
||||
def pick_up_package(self, pack):
|
||||
self.warehouse.packages.remove(pack)
|
||||
self.is_loaded = True
|
||||
if pack.lays_on_field.category.name in ['Rack', 'Fridge']:
|
||||
pack.lays_on_field.capacity += pack.size
|
||||
|
||||
self.dest.package = None
|
||||
self.destination.package = None
|
||||
pack.lays_on_field = None
|
||||
self.transported_package = pack
|
||||
|
||||
@ -250,4 +260,28 @@ class Agent:
|
||||
pack.lays_on_field.capacity -= pack.size
|
||||
pack.status = PackStatus.STORED
|
||||
self.warehouse.packages.append(pack)
|
||||
# print(tile.air_temperature, tile.humidity)
|
||||
|
||||
# print(tile.air_temperature, tile.humidity)
|
||||
|
||||
def create_graph_map(self):
|
||||
for package1 in self.warehouse.packages:
|
||||
rack = self.find_nearest_rack_for(package1)
|
||||
for package2 in self.warehouse.packages:
|
||||
self.find_path(Node(package2.x, package2.y), Node(rack.x_position, rack.y_position))
|
||||
self.graph_map[package2.id + 1][package1.id + ceil((len(self.warehouse.packages) / 2)) + 1] = len(self.path)
|
||||
self.graph_map[package1.id + ceil((len(self.warehouse.packages) / 2)) + 1][package2.id + 1] = len(self.path)
|
||||
if package1 == package2:
|
||||
continue
|
||||
self.find_path(Node(package1.x, package1.y), Node(package2.x, package2.y))
|
||||
self.graph_map[package1.id + 1][package2.id + 1] = len(self.path)
|
||||
self.find_path(Node(package1.x, package1.y))
|
||||
self.graph_map[package1.id + 1][0] = len(self.path)
|
||||
self.graph_map[0][package1.id + 1] = len(self.path)
|
||||
print(self.graph_map)
|
||||
|
||||
def trace_route(self):
|
||||
for packs in self.warehouse.packages:
|
||||
print(packs.id)
|
||||
self.route = genetic_trace_route(self.graph_map, len(self.warehouse.packages))
|
||||
print("best route")
|
||||
print(self.route)
|
||||
|
0
attributes.py
Normal file → Executable file
0
environment.md
Normal file → Executable file
BIN
forklift_loaded.png
Executable file
After Width: | Height: | Size: 1.2 KiB |
0
genetic_algorithm.md
Normal file → Executable file
0
genetic_algorithm.py
Normal file → Executable file
103
genetic_route.py
Executable file
@ -0,0 +1,103 @@
|
||||
import numpy as np
|
||||
from random import randrange, random
|
||||
from math import floor
|
||||
import copy
|
||||
|
||||
num_of_surviving = 6
|
||||
num_of_couples = 12
|
||||
mutation_probability = 0.07
|
||||
max_population = 20
|
||||
iterations = 50
|
||||
|
||||
|
||||
def create_new_route(points):
|
||||
route = np.random.permutation(points)
|
||||
route = [x + 1 for x in route]
|
||||
return route
|
||||
|
||||
|
||||
def create_population(points):
|
||||
population = []
|
||||
for i in range(max_population):
|
||||
population.append(create_new_route(points))
|
||||
return population
|
||||
|
||||
|
||||
def score_route(graph_map, route):
|
||||
score = graph_map[0][route[0]]
|
||||
for i in range(len(route) - 2):
|
||||
rack = len(route) + route[0]
|
||||
score = score + graph_map[rack][route[i + 1]]
|
||||
score = score + graph_map[route[i + 1]][route[i + 2]]
|
||||
return score
|
||||
|
||||
|
||||
def score_all(graph_map, population):
|
||||
scores = []
|
||||
for i in range(len(population)):
|
||||
tmp = [i, score_route(graph_map, population[i])]
|
||||
scores.append(tmp)
|
||||
return scores
|
||||
|
||||
|
||||
def crossover(a, b):
|
||||
new_a = copy.deepcopy(a)
|
||||
new_b = copy.deepcopy(b)
|
||||
for i in range(floor(len(a) / 2)):
|
||||
rel = randrange(len(a))
|
||||
tmp_a = new_a[rel]
|
||||
tmp_b = new_b[rel]
|
||||
if tmp_a == tmp_b:
|
||||
continue
|
||||
new_a[new_a.index(tmp_b)] = tmp_a
|
||||
new_b[new_b.index(tmp_a)] = tmp_b
|
||||
new_a[rel] = tmp_b
|
||||
new_b[rel] = tmp_a
|
||||
|
||||
return new_a, new_b
|
||||
|
||||
|
||||
def mutate(route):
|
||||
new_route = copy.deepcopy(route)
|
||||
for i in range(len(route) - 1):
|
||||
if random() < mutation_probability:
|
||||
tmp = new_route[i]
|
||||
new_route[i] = new_route[i + 1]
|
||||
new_route[i + 1] = tmp
|
||||
return new_route
|
||||
|
||||
|
||||
def genetic_trace_route(graph_map, packages):
|
||||
population = create_population(packages)
|
||||
for i in range(iterations):
|
||||
new_population = []
|
||||
scores = score_all(graph_map, population)
|
||||
scores.sort(key=lambda x: x[1])
|
||||
# breeding
|
||||
for j in range(0, num_of_couples, 2):
|
||||
a, b = crossover(population[scores[j][0]], population[scores[j+1][0]])
|
||||
new_population.append(a)
|
||||
new_population.append(b)
|
||||
# mutations
|
||||
for j in range(len(new_population)):
|
||||
mutate(new_population[j])
|
||||
# survival
|
||||
for j in range(0, num_of_surviving):
|
||||
new_population.append(population[scores[j][0]])
|
||||
# random new
|
||||
for j in range(max_population - (num_of_surviving + num_of_couples)):
|
||||
new_population.append(create_new_route(packages))
|
||||
population.clear()
|
||||
population = copy.deepcopy(new_population)
|
||||
scores = score_all(graph_map, population)
|
||||
scores.sort(key=lambda x: x[1])
|
||||
print(population)
|
||||
print(scores[0][1])
|
||||
|
||||
scores = score_all(graph_map, population)
|
||||
scores.sort(key=lambda x: x[1])
|
||||
print("best route lenght")
|
||||
print(scores[0][1])
|
||||
print("secand best route lenght")
|
||||
print(scores[1][1])
|
||||
return population[scores[0][0]]
|
13
main.py
Normal file → Executable file
@ -26,6 +26,7 @@ class MainGameFrame:
|
||||
pygame.display.set_caption("Smart ForkLift")
|
||||
agent_radius = int(TILE_WIDTH/2)
|
||||
self.agent_tex = pygame.image.load('forklift.png')
|
||||
self.agent_tex2 = pygame.image.load('forklift_loaded.png')
|
||||
self.font = pygame.font.Font('freesansbold.ttf', 16)
|
||||
self.warehouse_map = warehouse.Warehouse(20, 20, 150, 10)
|
||||
starting_x, starting_y = self.set_starting_agent_position()
|
||||
@ -41,6 +42,9 @@ class MainGameFrame:
|
||||
print("wagi paczek: ",packs_sizes)
|
||||
print("pojemności regałów: ",racks_capacities)
|
||||
gen_alg(packs_sizes, racks_capacities, number_of_generations, generation_size, mutation_prob, amount_of_promotion)
|
||||
self.agent.create_graph_map()
|
||||
self.agent.trace_route()
|
||||
self.agent.find_path()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
@ -101,9 +105,16 @@ class MainGameFrame:
|
||||
if cell.category.name in self.warehouse_map.storage_types:
|
||||
text_surface = self.font.render(str(cell.capacity), True, (0, 0, 0))
|
||||
self.display.blit(text_surface, ((cell.x_position * TILE_WIDTH) + 6, (cell.y_position * TILE_HEIGHT) + 6))
|
||||
for package in self.warehouse_map.packages:
|
||||
if package.status == PackStatus.LOOSE:
|
||||
text_surface = self.font.render(str(package.id + 1), True, (0, 0, 0))
|
||||
self.display.blit(text_surface, ((package.lays_on_field.x_position * TILE_WIDTH) + 6, (package.lays_on_field.y_position * TILE_HEIGHT) + 6))
|
||||
|
||||
def draw_agent(self):
|
||||
rotated = pygame.transform.rotate(self.agent_tex, DIRECTION_ANGLES.get(self.agent.direction))
|
||||
if self.agent.is_loaded:
|
||||
rotated = pygame.transform.rotate(self.agent_tex2, DIRECTION_ANGLES.get(self.agent.direction))
|
||||
else:
|
||||
rotated = pygame.transform.rotate(self.agent_tex, DIRECTION_ANGLES.get(self.agent.direction))
|
||||
self.display.blit(rotated, (self.agent.x*TILE_WIDTH, self.agent.y*TILE_WIDTH))
|
||||
|
||||
def set_starting_agent_position(self):
|
||||
|
0
products_types.py
Normal file → Executable file
0
route_planning.md
Normal file → Executable file
0
sieci_n.py
Normal file → Executable file
0
srodowisko_agenta.png
Normal file → Executable file
Before Width: | Height: | Size: 92 KiB After Width: | Height: | Size: 92 KiB |
3
warehouse.py
Normal file → Executable file
@ -242,6 +242,9 @@ class Warehouse:
|
||||
package_field = self.tiles[pack_x][pack_y]
|
||||
new_package = Pack(lays_on_field=package_field)
|
||||
new_package.size = new_package_size
|
||||
new_package.id = i
|
||||
new_package.x = pack_x
|
||||
new_package.y = pack_y
|
||||
if package_field.category.name in self.storage_types:
|
||||
package_field.capacity -= new_package.size
|
||||
packages.append(new_package)
|
||||
|