import copy from enum import Enum from typing import List, Tuple from mesa import Model from mesa.space import MultiGrid from mesa.time import RandomActivation from AgentBase import AgentBase from ForkliftAgent import ForkliftAgent from InitialStateFactory import InitialStateFactory from PatchAgent import PatchAgent from PatchType import PatchType from PictureVisualizationAgent import PictureVisualizationAgent from data.GameConstants import GameConstants from data.Item import Item from data.Order import Order from data.enum.ItemType import ItemType from decision.Action import Action from decision.ActionType import ActionType from imageClasification.Classificator import image_classification from pathfinding.PathfinderOnStates import PathFinderOnStates, PathFinderState from util.PathByEnum import PathByEnum from util.PathDefinitions import GridLocation, GridWithWeights class Phase(Enum): INIT = 1 ITEM_RECOGNITION = 2 CLIENT_SORTING = 3 PLAN_MOVEMENT = 4 EXECUTION = 5 class GameModel(Model): def __init__(self, width, height, graph: GridWithWeights, items: int, orders: int, classificator): # self.num_agents = 5 self.first = True self.item_recognised = False self.running = True self.grid = MultiGrid(height, width, True) self.schedule = RandomActivation(self) self.current_item_recognition = None self.client_delivery: PatchAgent = None self.drop_off: PatchAgent = None self.graph = graph self.game_constants = GameConstants( width, height, graph.walls, graph.puddles ) self.agents = [AgentBase] self.forklift_agent = ForkliftAgent( self, self.game_constants, self.client_delivery, self.drop_off, self.graph ) self.schedule.add(self.forklift_agent) self.agents.append(self.forklift_agent) # INITIALIZATION # print("############## INITIALIZATION ##############") self.phase = Phase.INIT self.initialize_grid(graph) self.orderList: List[Order] = InitialStateFactory.generate_order_list(orders) self.fulfilled_orders: List[Order] = [] self.forklift_agent.orderList = self.orderList self.forklift_agent.fulfilled_orders = self.fulfilled_orders self.classificator = classificator print("############## RECOGNISE ITEMS ##############") self.phase = Phase.ITEM_RECOGNITION self.provided_items = InitialStateFactory.generate_item_list(items) self.items_for_recognization = copy.deepcopy(self.provided_items) self.recognised_items: List[Item] = [] print("Relocate forklift agent to loading area for item recognition") pathFinder = PathFinderOnStates( self.game_constants, self.drop_off.location, PathFinderState(self.forklift_agent.current_position, self.forklift_agent.current_rotation, 0, Action(ActionType.NONE), []) ) actions = pathFinder.get_action_list() print("PATHFINDING") print(actions) self.forklift_agent.queue_movement_actions(actions) def initialize_grid(self, graph: GridWithWeights): print("INITIALIZING GRID") # Add the agent to a random grid cell x = 5 y = 5 self.grid.place_agent(self.forklift_agent, (x, y)) self.forklift_agent.current_position = (x, y) self.picture_visualization = PictureVisualizationAgent( self, (1, 11), ) self.schedule.add(self.picture_visualization) self.grid.place_agent(self.picture_visualization, self.picture_visualization.location) self.agents.append(self.picture_visualization) self.place_logistics() self.place_dividers() self.place_walls_agents(graph.walls) self.place_puddles(graph.puddles) self.place_packing_stations(graph.packingStations) def place_dividers(self): for i in range(0, 10): for j in range(10, 13): agent = PatchAgent(self, (i, j), PatchType.divider) self.agents.append(agent) self.grid.place_agent(agent, (i, j)) def place_logistics(self): agent = PatchAgent(self, (self.grid.width - 1, int(self.grid.height / 2)), PatchType.pickUp) self.schedule.add(agent) self.grid.place_agent(agent, agent.location) self.agents.append(agent) self.client_delivery = agent self.forklift_agent.client_delivery = self.client_delivery agent = PatchAgent(self, (0, int(self.grid.height / 2)), PatchType.dropOff) self.grid.place_agent(agent, agent.location) self.agents.append(agent) self.drop_off = agent self.forklift_agent.drop_off = self.drop_off def place_walls_agents(self, walls: List[GridLocation]): for w in walls: agent = PatchAgent(self, w, PatchType.wall) self.agents.append(agent) self.grid.place_agent(agent, w) def place_puddles(self, puddles: List[GridLocation]): for p in puddles: agent = PatchAgent(self, p, PatchType.diffTerrain) self.agents.append(agent) self.grid.place_agent(agent, p) def place_packing_stations(self, packing_stations: List[Tuple[PatchType, GridLocation]]): for p in packing_stations: agent = PatchAgent(self, p[1], p[0]) self.agents.append(agent) self.grid.place_agent(agent, p[1]) def step(self): self.schedule.step() self.grid.remove_agent(self.forklift_agent) self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position) if self.phase == Phase.ITEM_RECOGNITION: if not self.item_recognised and self.forklift_agent.current_position == self.drop_off.location: if len(self.items_for_recognization) == 0: print("FINISHED ITEM RECOGNITION") self.item_recognised = True self.phase = Phase.CLIENT_SORTING self.forklift_agent.ready_for_execution = True else: print("BEGIN ITEM RECOGNITION, left: {}".format(len(self.items_for_recognization))) item_to_recognise = self.items_for_recognization.pop() self.picture_visualization.img = PathByEnum.get_random_path(item_to_recognise.real_type) recognised = self.recognise_item(item_to_recognise) self.recognised_items.append(recognised) if self.phase == Phase.CLIENT_SORTING: # TODO GENERICS SORTING sorted(self.orderList, key=lambda x: len(x.items)) print("FINISHED CLIENT ORDER SORTING") self.phase = Phase.EXECUTION if self.phase == Phase.EXECUTION: print("Execution") def recognise_item(self, item: Item): # TODO IMAGE PROCESSING val = image_classification(self.picture_visualization.img, self.classificator) print("VAL: {}".format(val)) if val == ItemType.DOOR: item.guessed_type = ItemType.DOOR elif val == ItemType.REFRIGERATOR: item.guessed_type = ItemType.REFRIGERATOR elif val == ItemType.SHELF: item.guessed_type = ItemType.SHELF return item