from pandas.tests.io.json.test_ujson import numpy from warehouse import Coordinates, Tile, Pack from queue import PriorityQueue from math import sqrt, ceil from attributes import TURN_LEFT_DIRECTIONS, TURN_RIGHT_DIRECTIONS, PackStatus import pygame import sys import pdb import numpy as np from package_location_classifier.classifier import PackageLocationClassifier from genetic_route import genetic_trace_route class Node: def __init__(self, coord_x, coord_y, package=None, is_rack=False): self.x = coord_x self.y = coord_y self.parent = None self.package = package self.is_rack = is_rack self.g_cost = 0 self.h_cost = 0 self.graph_map = None def __eq__(self, other): if isinstance(other, Node): return self.x == other.x and self.y == other.y return False def __lt__(self, other): return isinstance(other, Node) and self.g_cost < other.g_cost def __repr__(self): return "Node:{}x{}".format(self.x, self.y) class Agent: def __init__(self, start_x, start_y, assigned_warehouse, radius=5): self.x = start_x self.y = start_y self.radius = radius self.warehouse = assigned_warehouse self.graph_map = np.zeros(((self.warehouse.no_of_packages * 2) + 1, (self.warehouse.no_of_packages * 2) + 1)) self.is_loaded = False self.transported_package = None self.direction = "up" self.closed = list() self.open = PriorityQueue() self.path = list() self.location_classifier = PackageLocationClassifier() self.check_packages_locations() self.route = [] self.destination = None def check_packages_locations(self): for pack in self.warehouse.packages: if pack.lays_on_field.category.name in self.warehouse.storage_types: can_place = self.location_classifier.check_if_can_place(pack, pack.lays_on_field) pack.status = PackStatus.STORED if ( can_place and pack.lays_on_field.capacity >= 0) else PackStatus.STORED_BAD_LOCATION def find_path(self, *args, **kwargs): self.closed = [] self.path = [] self.open = PriorityQueue() if len(args) != 0: if len(args) == 1: start_node = Node(self.x, self.y) else: start_node = args[1] self.destination = args[0] else: packages_to_go = [p for p in self.warehouse.packages if p.status != PackStatus.STORED] if not packages_to_go and not self.transported_package: return if self.is_loaded: print(self.transported_package) rack = self.find_nearest_rack_for(self.transported_package) self.destination = Node(rack.x_position, rack.y_position, is_rack=True) else: package = self.find_package() self.destination = Node(package.lays_on_field.x_position, package.lays_on_field.y_position, package=package) start_node = Node(self.x, self.y) self.open.put((0, start_node)) while self.open: _, current_node = self.open.get() self.closed.append(current_node) if current_node.x == self.destination.x and current_node.y == self.destination.y: while current_node.x != start_node.x or current_node.y != start_node.y: self.path.append(current_node) current_node = current_node.parent return True neighbour_list = self.get_neighbours(current_node) for neighbour in neighbour_list: cost = current_node.g_cost + self.heuristic(current_node, neighbour) if self.check_if_closed(neighbour): continue if self.check_if_open(neighbour): if neighbour.g_cost > cost: neighbour.g_cost = cost neighbour.parent = current_node else: neighbour.g_cost = cost neighbour.h_cost = self.heuristic(neighbour, self.destination) neighbour.parent = current_node self.open.put((neighbour.g_cost, neighbour)) return False def turn_left(self): new_direction = TURN_LEFT_DIRECTIONS.get(self.direction, self.direction) self.direction = new_direction def turn_right(self): new_direction = TURN_RIGHT_DIRECTIONS.get(self.direction, self.direction) self.direction = new_direction def heuristic(self, start: Node, goal: Node): diff_x = pow(goal.x - start.x, 2) diff_y = pow(goal.y - start.y, 2) additional_cost = 0 return round(sqrt(diff_x + diff_y), 3) + float(10 * additional_cost) def check_if_open(self, node: Node): return (node.x, node.y) in [(n.x, n.y) for (_, n) in self.open.queue] def check_if_closed(self, node: Node): return (node.x, node.y) in [(n.x, n.y) for n in self.closed] def get_neighbours(self, node: Node): neighbours = [] if self.check_if_can_move(Coordinates(node.x + 1, node.y)): neighbours.append(Node(node.x + 1, node.y)) if self.check_if_can_move(Coordinates(node.x - 1, node.y)): neighbours.append(Node(node.x - 1, node.y)) if self.check_if_can_move(Coordinates(node.x, node.y + 1)): neighbours.append(Node(node.x, node.y + 1)) if self.check_if_can_move(Coordinates(node.x, node.y - 1)): neighbours.append(Node(node.x, node.y - 1)) return neighbours def move(self): if len(self.path) == 0: self.find_path() next_coord = self.path.pop() star_dir = self.direction if self.x > next_coord.x and not self.direction == 'left': if self.direction == 'down': self.turn_right() else: self.turn_left() elif self.x < next_coord.x and not self.direction == 'right': if self.direction == 'down': self.turn_left() else: self.turn_right() elif self.y > next_coord.y and not self.direction == 'up': if self.direction == 'left': self.turn_right() else: self.turn_left() elif self.y < next_coord.y and not self.direction == 'down': if self.direction == 'right': self.turn_right() else: self.turn_left() if Node(next_coord.x, next_coord.y) == self.destination or Node(self.x, self.y) == self.destination: if self.destination.package: self.pick_up_package(self.destination.package) return elif self.destination.is_rack: self.unload_package(self.destination) return if star_dir == self.direction: self.x = next_coord.x self.y = next_coord.y else: self.path.append(next_coord) self.closed = [] def check_if_can_move(self, next_coords: Coordinates): tile_on_map = 0 <= next_coords.x < self.warehouse.width and 0 <= next_coords.y < self.warehouse.height tile_passable = True if not tile_on_map: return False next_tile = self.warehouse.tiles[next_coords.x][next_coords.y] if (next_coords.x, next_coords.y) != (self.destination.x, self.destination.y): tile_passable = isinstance(next_tile, Tile) and next_tile.category.passable return tile_passable def find_package(self): if len(self.route) == 0: pygame.quit() sys.exit() pack_id = self.route[0] self.route = self.route[1:] print("Next package ID:") print(pack_id) dst_package = None for pack in self.warehouse.packages: if pack.id + 1 == pack_id: dst_package = pack break if dst_package is not None: return dst_package def rack_heuristics(self, start, goal, can_place): heur_can_place = not can_place diff_x = pow(goal.x - start.x, 2) diff_y = pow(goal.y - start.y, 2) place_cost = 100 * float(heur_can_place) return round(sqrt(diff_x + diff_y), 3) + float(place_cost) def find_nearest_rack_for(self, package, expand_box=0): weight = package.size storage = "Rack" if package.category == "freezed": storage = "Fridge" start_node = Node(self.x, self.y) quarter_x = int(self.warehouse.width / 4) + expand_box quarter_y = int(self.warehouse.height / 4) + expand_box start_quarter_x = self.x - quarter_x if self.x - quarter_x > 0 else 0 end_quarter_x = self.x + quarter_x if self.x + quarter_x < self.warehouse.width else self.warehouse.width - 1 start_quarter_y = self.y - quarter_y if self.y - quarter_y > 0 else 0 end_quarter_y = self.y + quarter_y if self.y + quarter_y < self.warehouse.height else self.warehouse.height - 1 quarter = [row[start_quarter_y:end_quarter_y] for row in self.warehouse.tiles[start_quarter_x:end_quarter_x]] quarter_racks = [[t for t in row if t.category.name == storage and t.capacity >= weight] for row in quarter] quarter_racks = [t for row in quarter_racks for t in row] racks_costs = [] for rack in quarter_racks: new_node = Node(rack.x_position, rack.y_position) can_place = self.location_classifier.check_if_can_place(package, rack) cost = self.rack_heuristics(start_node, new_node, can_place) if cost > 0: racks_costs.append((rack, cost)) rack = self.find_nearest_rack_for(package, expand_box + 1) if not racks_costs else \ min(racks_costs, key=lambda l: l[1])[0] return rack def pick_up_package(self, pack): self.warehouse.packages.remove(pack) self.is_loaded = True if pack.lays_on_field.category.name in ['Rack', 'Fridge']: pack.lays_on_field.capacity += pack.size self.destination.package = None pack.lays_on_field = None self.transported_package = pack def unload_package(self, rack): pack = self.transported_package tile = self.warehouse.tiles[rack.x][rack.y] self.transported_package = None self.is_loaded = False pack.lays_on_field = tile pack.lays_on_field.capacity -= pack.size pack.status = PackStatus.STORED self.warehouse.packages.append(pack) # print(tile.air_temperature, tile.humidity) def create_graph_map(self): for package1 in self.warehouse.packages: rack = self.find_nearest_rack_for(package1) for package2 in self.warehouse.packages: self.find_path(Node(package2.x, package2.y), Node(rack.x_position, rack.y_position)) self.graph_map[package2.id + 1][package1.id + ceil((len(self.warehouse.packages) / 2)) + 1] = len(self.path) self.graph_map[package1.id + ceil((len(self.warehouse.packages) / 2)) + 1][package2.id + 1] = len(self.path) if package1 == package2: continue self.find_path(Node(package1.x, package1.y), Node(package2.x, package2.y)) self.graph_map[package1.id + 1][package2.id + 1] = len(self.path) self.find_path(Node(package1.x, package1.y)) self.graph_map[package1.id + 1][0] = len(self.path) self.graph_map[0][package1.id + 1] = len(self.path) def trace_route(self): for packs in self.warehouse.packages: print(packs.id) self.route = genetic_trace_route(self.graph_map, len(self.warehouse.packages)) print("best route") print(self.route)