import copy
from enum import Enum
from typing import List, Optional, Tuple

from mesa import Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation

from AgentBase import AgentBase
from ForkliftAgent import ForkliftAgent
from InitialStateFactory import InitialStateFactory
from ItemDisplayAgent import ItemDisplayAgent
from PatchAgent import PatchAgent
from PatchType import PatchType
from PictureVisualizationAgent import PictureVisualizationAgent
from data.GameConstants import GameConstants
from data.Item import Item
from data.Order import Order
from data.enum.ItemType import ItemType
from decision.Action import Action
from decision.ActionType import ActionType
from genetic_order.GeneticOrder import GeneticOrder
from imageClasification.Classificator import image_classification
from pathfinding.PathfinderOnStates import PathFinderOnStates, PathFinderState
from tree.DecisionTree import DecisionTree
from util.PathByEnum import PathByEnum
from util.PathDefinitions import GridLocation, GridWithWeights


class Phase(Enum):
    """Stages the model moves through; ``GameModel.step`` branches on them."""

    INIT = 1
    ITEM_RECOGNITION = 2
    CLIENT_SORTING = 3
    PLAN_MOVEMENT = 4  # not referenced anywhere in this module
    EXECUTION = 5


class GameModel(Model):
    """Mesa model wiring a forklift agent, static patches and order handling.

    Construction builds the grid, generates orders and items through
    ``InitialStateFactory`` and queues the forklift's path to the drop-off
    patch.  ``step`` then classifies the items one per tick, sorts and
    filters the orders, and finally mirrors the forklift's current order.
    """

    # Number of item-display slots refreshed by ``update_item_display``.
    MAX_DISPLAYED_ITEMS = 4

    def __init__(self, width: int, height: int, graph: GridWithWeights, items: int, orders: int,
                 classificator, item_display_pos: List[GridLocation]):
        """Build the world and queue the forklift's first movement.

        Args:
            width: Forwarded to ``GameConstants`` and to the grid.
            height: Forwarded to ``GameConstants`` and to the grid.
            graph: Source of ``walls``, ``puddles`` and ``packingStations``.
            items: Argument for ``InitialStateFactory.generate_item_list``.
            orders: Argument for ``InitialStateFactory.generate_order_list``.
            classificator: Passed through to ``image_classification``.
            item_display_pos: Locations at which item-display agents are placed.
        """
        # Initialise Mesa's own bookkeeping first; everything assigned below
        # deliberately overrides whatever defaults the base class sets.
        super().__init__()

        self.first = True
        self.item_recognised = False
        self.running = True
        # NOTE(review): Mesa documents MultiGrid(width, height, torus); the
        # arguments are passed as (height, width) here.  Left unchanged because
        # callers may rely on it -- confirm which order is intended.
        self.grid = MultiGrid(height, width, True)
        self.schedule = RandomActivation(self)
        self.current_item_recognition = None
        self.current_item = None

        # Both patches are created later by place_logistics().
        self.client_delivery: Optional[PatchAgent] = None
        self.drop_off: Optional[PatchAgent] = None
        self.graph = graph
        self.cut_orders: List[Order] = []

        # Stock counters; filled by count_recognised_items() and consumed by
        # sort_orders().  Initialised so sort_orders() never hits a missing
        # attribute.
        self.count_door = 0
        self.count_shelf = 0
        self.count_refrige = 0

        self.game_constants = GameConstants(width, height, graph.walls, graph.puddles)

        # NOTE(review): newer Mesa releases appear to use ``Model.agents``
        # themselves -- confirm against the pinned Mesa version.
        self.agents: List[AgentBase] = []
        self.forklift_agent = ForkliftAgent(
            self,
            self.game_constants,
            self.client_delivery,
            self.drop_off,
            self.graph,
        )
        self.schedule.add(self.forklift_agent)
        self.agents.append(self.forklift_agent)
        self.item_display_agents: List[ItemDisplayAgent] = []

        # INITIALIZATION
        self.phase = Phase.INIT
        self.initialize_grid(graph, item_display_pos)
        self.orderList: List[Order] = InitialStateFactory.generate_order_list(orders)
        self.fulfilled_orders: List[Order] = []
        self.forklift_agent.fulfilled_orders = self.fulfilled_orders
        self.forklift_agent.set_base(self.drop_off)
        self.classificator = classificator

        print("############## RECOGNISE ITEMS ##############")
        self.phase = Phase.ITEM_RECOGNITION
        self.provided_items = InitialStateFactory.generate_item_list(items)
        # Recognition pops from its own copy so provided_items stays intact.
        self.items_for_recognization = copy.deepcopy(self.provided_items)
        self.recognised_items: List[Item] = []
        self.current_order_delivered_items = self.forklift_agent.current_order_delivered_items

        print("Relocate forklift agent to loading area for item recognition")
        path_finder = PathFinderOnStates(
            self.game_constants,
            self.drop_off.location,
            PathFinderState(
                self.forklift_agent.current_position,
                self.forklift_agent.current_rotation,
                0,
                Action(ActionType.NONE),
                [],
            ),
        )
        actions = path_finder.get_action_list()
        print("PATHFINDING")
        print(actions)
        self.forklift_agent.queue_movement_actions(actions)
        self.current_order = self.forklift_agent.current_order

    def initialize_grid(self, graph: GridWithWeights, item_display_pos):
        """Place the forklift, the picture agent and every static patch."""
        print("INITIALIZING GRID")
        # The forklift always starts at this fixed cell.
        start = (5, 5)
        self.grid.place_agent(self.forklift_agent, start)
        self.forklift_agent.current_position = start

        self.picture_visualization = PictureVisualizationAgent(self, (1, 11))
        self.schedule.add(self.picture_visualization)
        self.grid.place_agent(self.picture_visualization, self.picture_visualization.location)
        self.agents.append(self.picture_visualization)

        self.place_logistics()
        self.place_dividers()
        self.place_walls_agents(graph.walls)
        self.place_puddles(graph.puddles)
        self.place_packing_stations(graph.packingStations)
        self.place_order_items_display(item_display_pos)

    def _place_patch(self, location, patch_type) -> PatchAgent:
        """Create a PatchAgent, record it in ``self.agents`` and put it on the grid."""
        patch = PatchAgent(self, location, patch_type)
        self.agents.append(patch)
        self.grid.place_agent(patch, location)
        return patch

    def place_dividers(self):
        """Fill the block x in [0, 10), y in [10, 13) with divider patches."""
        for i in range(0, 10):
            for j in range(10, 13):
                self._place_patch((i, j), PatchType.divider)

    def place_logistics(self):
        """Create the pick-up and drop-off patches and hand them to the forklift.

        The pick-up patch sits on the last grid column and the drop-off patch
        on column 0, both at half the grid height.
        """
        middle_row = self.grid.height // 2

        pick_up = PatchAgent(self, (self.grid.width - 1, middle_row), PatchType.pickUp)
        # NOTE(review): only the pick-up patch is added to the scheduler, the
        # drop-off patch is not -- kept as found, confirm this is intended.
        self.schedule.add(pick_up)
        self.grid.place_agent(pick_up, pick_up.location)
        self.agents.append(pick_up)
        self.client_delivery = pick_up
        self.forklift_agent.client_delivery = self.client_delivery

        drop_off = PatchAgent(self, (0, middle_row), PatchType.dropOff)
        self.grid.place_agent(drop_off, drop_off.location)
        self.agents.append(drop_off)
        self.drop_off = drop_off
        self.forklift_agent.drop_off = self.drop_off

    def place_walls_agents(self, walls: List[GridLocation]):
        """Place one wall patch at every location in ``walls``."""
        for w in walls:
            self._place_patch(w, PatchType.wall)

    def place_puddles(self, puddles: List[GridLocation]):
        """Place one ``diffTerrain`` patch at every location in ``puddles``."""
        for p in puddles:
            self._place_patch(p, PatchType.diffTerrain)

    def place_packing_stations(self, packing_stations: List[Tuple[PatchType, GridLocation]]):
        """Place one patch per ``(patch_type, location)`` pair."""
        for patch_type, location in packing_stations:
            self._place_patch(location, patch_type)

    def place_order_items_display(self, item_positions: List[GridLocation]):
        """Create an ItemDisplayAgent at every given position."""
        for p in item_positions:
            agent = ItemDisplayAgent(self, p)
            self.item_display_agents.append(agent)
            self.grid.place_agent(agent, p)

    def update_item_display(self):
        """Mirror the forklift's delivered items onto the display agents.

        Slot ``i`` shows the image of the i-th delivered item, or ``None``
        when fewer items have been delivered.  At most
        ``MAX_DISPLAYED_ITEMS`` slots are refreshed, and fewer if fewer
        display agents exist.
        """
        self.current_item = self.forklift_agent.current_item
        delivered = self.forklift_agent.current_order_delivered_items
        slots = self.item_display_agents[:self.MAX_DISPLAYED_ITEMS]
        for index, display in enumerate(slots):
            display.image = delivered[index].image if index < len(delivered) else None

    def step(self):
        """Advance the schedule one tick and run the logic of the current phase.

        The phase checks are deliberately independent ``if`` statements: when
        one phase finishes it switches ``self.phase`` and the next phase's
        work runs within the same tick.
        """
        self.schedule.step()

        # Re-place the forklift so the grid reflects its updated position.
        self.grid.remove_agent(self.forklift_agent)
        self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position)
        self.update_item_display()

        if self.phase == Phase.ITEM_RECOGNITION:
            self._step_item_recognition()

        if self.phase == Phase.CLIENT_SORTING:
            self._step_client_sorting()

        if self.phase == Phase.EXECUTION:
            self.current_order = self.forklift_agent.current_order

    def _step_item_recognition(self):
        """Classify one pending item per tick while the forklift is at the drop-off."""
        at_drop_off = self.forklift_agent.current_position == self.drop_off.location
        if self.item_recognised or not at_drop_off:
            return

        if not self.items_for_recognization:
            print("FINISHED ITEM RECOGNITION")
            self.item_recognised = True
            self.phase = Phase.CLIENT_SORTING
            self.forklift_agent.ready_for_execution = True
            return

        print(f"BEGIN ITEM RECOGNITION, left: {len(self.items_for_recognization)}")
        item_to_recognise = self.items_for_recognization.pop()
        # recognise_item() classifies whatever picture is currently shown.
        self.picture_visualization.img = PathByEnum.get_random_path(item_to_recognise.real_type)
        self.recognised_items.append(self.recognise_item(item_to_recognise))

    def _step_client_sorting(self):
        """Prioritise and order the orders, then drop those that cannot be filled."""
        orders: List[Order] = self.orderList

        # CLIENT RECOGNITION
        orders_with_prio = DecisionTree().get_data_good(orders)

        # GENERICS SORTING
        # NOTE(review): the sorter is built from orders_with_prio but asked to
        # sort the original ``orders`` list -- kept as found, confirm intended.
        self.orderList = GeneticOrder(orders_with_prio).get_orders_sorted(orders)

        self.count_recognised_items()
        # sort_orders() also hands the filtered list to the forklift.
        self.sort_orders()
        print("FINISHED CLIENT ORDER SORTING")
        self.phase = Phase.EXECUTION

    def sort_orders(self):
        """Split ``orderList`` into fillable and cut orders based on stock.

        Orders are visited in their current sequence.  An order is kept when
        the remaining door, shelf and refrigerator counts cover it, and the
        counts are then reduced.  Otherwise it goes to ``cut_orders``.  The
        kept list replaces ``orderList`` on the model and on the forklift.
        """
        orders_to_fill: List[Order] = []
        cut_orders: List[Order] = []
        for order in self.orderList:
            refrige = self.count_item_type(order, ItemType.REFRIGERATOR)
            shelf = self.count_item_type(order, ItemType.SHELF)
            door = self.count_item_type(order, ItemType.DOOR)

            in_stock = (
                shelf <= self.count_shelf
                and refrige <= self.count_refrige
                and door <= self.count_door
            )
            if in_stock:
                self.count_shelf -= shelf
                self.count_door -= door
                self.count_refrige -= refrige
                orders_to_fill.append(order)
            else:
                cut_orders.append(order)

        self.cut_orders = cut_orders
        self.orderList = orders_to_fill
        self.forklift_agent.orderList = orders_to_fill

    def count_item_type(self, o: Order, itemType: ItemType) -> int:
        """Return how many items of ``o`` have ``guessed_type == itemType``."""
        return sum(1 for it in o.items if it.guessed_type == itemType)

    def count_recognised_items(self):
        """Tally recognised items per type into ``count_door/shelf/refrige``.

        Items whose ``guessed_type`` is none of the three known types are
        not counted, so an unclassified item never inflates the stock.
        """
        count_refrige = 0
        count_door = 0
        count_shelf = 0
        for item in self.recognised_items:
            if item.guessed_type == ItemType.DOOR:
                count_door += 1
            elif item.guessed_type == ItemType.SHELF:
                count_shelf += 1
            elif item.guessed_type == ItemType.REFRIGERATOR:
                count_refrige += 1

        self.count_door = count_door
        self.count_shelf = count_shelf
        self.count_refrige = count_refrige

    def recognise_item(self, item: Item):
        """Classify the currently displayed picture and store the guess on ``item``.

        ``item.guessed_type`` is only overwritten when the classifier result
        equals DOOR, REFRIGERATOR or SHELF.  The same ``item`` is returned.
        """
        val = image_classification(self.picture_visualization.img, self.classificator)
        print(f"VAL: {val}")

        for known_type in (ItemType.DOOR, ItemType.REFRIGERATOR, ItemType.SHELF):
            if val == known_type:
                item.guessed_type = known_type
                break
        return item