from enum import Enum
from typing import List, Optional

from mesa import Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation

from AgentBase import AgentBase
from ForkliftAgent import ForkliftAgent
from InitialStateFactory import InitialStateFactory
from PatchAgent import PatchAgent
from PatchType import PatchType
from data.GameConstants import GameConstants
from data.Item import Item
from data.Order import Order
from data.enum.Direction import Direction
from data.enum.ItemType import ItemType
from decision.Action import Action
from decision.ActionType import ActionType
from pathfinding.PathfinderOnStates import PathFinderOnStates, PathFinderState
from util.PathDefinitions import GridLocation, GridWithWeights


class Phase(Enum):
    """Stages that ``GameModel`` moves through while it is stepped."""

    INIT = 1
    ITEM_RECOGNITION = 2
    CLIENT_SORTING = 3
    PLAN_MOVEMENT = 4
    EXECUTION = 5


class GameModel(Model):
    """Mesa model that drives one forklift agent over a grid of patch agents.

    The constructor builds the grid, generates orders and items, and queues
    the forklift's first route to the drop-off patch. ``step`` then advances
    the model through the stages listed in ``Phase``.
    """

    def __init__(self, width, height, graph: GridWithWeights):
        """Build the grid, the agents and the initial forklift route.

        Args:
            width: Map width, forwarded to ``GameConstants`` and ``MultiGrid``.
            height: Map height, forwarded to ``GameConstants`` and ``MultiGrid``.
            graph: Map description; its ``walls``, ``puddles`` and
                ``packingStations`` are read here and in ``plan_actions``.
        """
        # Let Mesa set up its own base-class state before ours is assigned.
        super().__init__()

        # self.num_agents = 5
        self.first = True
        self.item_recognised = False
        self.running = True
        # NOTE(review): Mesa's grid constructors take (width, height, torus),
        # but the arguments are passed as (height, width) here. That only
        # agrees with GameConstants(width, height, ...) for square maps --
        # confirm against the callers before changing it.
        self.grid = MultiGrid(height, width, True)
        self.schedule = RandomActivation(self)
        # Both patches are assigned by place_logistics().
        self.client_delivery: Optional[PatchAgent] = None
        self.drop_off: Optional[PatchAgent] = None

        # Starts empty: the list must hold agent instances only, not the
        # AgentBase class itself.
        self.agents: List[AgentBase] = []
        self.forklift_agent = ForkliftAgent(self)
        self.schedule.add(self.forklift_agent)
        self.agents.append(self.forklift_agent)

        self.graph = graph
        self.game_constants = GameConstants(
            width,
            height,
            graph.walls,
            graph.puddles,
        )

        # INITIALIZATION #
        print("############## INITIALIZATION ##############")
        self.phase = Phase.INIT
        self.initialize_grid(graph)
        self.orderList: List[Order] = InitialStateFactory.generate_order_list(3)

        print("############## RECOGNISE ITEMS ##############")
        self.phase = Phase.ITEM_RECOGNITION
        self.provided_items = InitialStateFactory.generate_item_list(3)
        self.recognised_items: List[Item] = []

        print("Relocate forklift agent to loading area for item recognition")
        actions = self._plan_route(
            self.forklift_agent.current_position,
            self.forklift_agent.current_rotation,
            self.drop_off.location,
            Action(ActionType.NONE),
        )
        print("PATHFINDING")
        print(actions)
        self.forklift_agent.queue_movement_actions(actions)

    def _plan_route(self, start, rotation, destination, first_action):
        """Return the pathfinder's action list from ``start`` to ``destination``.

        Args:
            start: Position used for the initial ``PathFinderState``.
            rotation: Rotation used for the initial ``PathFinderState``.
            destination: Target location handed to ``PathFinderOnStates``.
            first_action: ``Action`` stored in the initial ``PathFinderState``.
        """
        path_finder = PathFinderOnStates(
            self.game_constants,
            destination,
            PathFinderState(start, rotation, 0, first_action, []),
        )
        return path_finder.get_action_list()

    def initialize_grid(self, graph: GridWithWeights):
        """Place the forklift at (5, 5) and create every patch agent."""
        print("INITIALIZING GRID")
        # The forklift always starts on this fixed cell.
        x = 5
        y = 5
        self.grid.place_agent(self.forklift_agent, (x, y))
        self.forklift_agent.current_position = (x, y)

        self.place_logistics()
        self.place_walls_agents(graph.walls)
        self.place_puddles(graph.puddles)
        self.place_packing_stations(graph.packingStations)

    def place_logistics(self):
        """Create the pick-up and drop-off patches at mid-height of the grid.

        The pick-up patch goes in the last column and is stored as
        ``client_delivery``; the drop-off patch goes in column 0 and is
        stored as ``drop_off``. Only the pick-up patch is added to the
        schedule.
        """
        middle_row = self.grid.height // 2

        pick_up = PatchAgent(self, (self.grid.width - 1, middle_row), PatchType.pickUp)
        self.schedule.add(pick_up)
        self.grid.place_agent(pick_up, pick_up.location)
        self.agents.append(pick_up)
        self.client_delivery = pick_up

        drop_off = PatchAgent(self, (0, middle_row), PatchType.dropOff)
        self.grid.place_agent(drop_off, drop_off.location)
        self.agents.append(drop_off)
        self.drop_off = drop_off

    def place_walls_agents(self, walls: List[GridLocation]):
        """Create one ``PatchType.wall`` patch per location in ``walls``."""
        for location in walls:
            patch = PatchAgent(self, location, PatchType.wall)
            self.agents.append(patch)
            self.grid.place_agent(patch, location)

    def place_puddles(self, puddles: List[GridLocation]):
        """Create one ``PatchType.diffTerrain`` patch per location in ``puddles``."""
        for location in puddles:
            patch = PatchAgent(self, location, PatchType.diffTerrain)
            self.agents.append(patch)
            self.grid.place_agent(patch, location)

    def place_packing_stations(self, packing_stations: List[tuple[PatchType, GridLocation]]):
        """Create one patch per ``(patch_type, location)`` pair."""
        for station in packing_stations:
            patch = PatchAgent(self, station[1], station[0])
            self.agents.append(patch)
            self.grid.place_agent(patch, station[1])

    def step(self):
        """Advance the schedule once, then run the current phase.

        The phase checks are independent ``if`` statements, so one call can
        pass through several phases in a row.
        """
        self.schedule.step()

        # Re-place the forklift so the grid matches its current position.
        self.grid.remove_agent(self.forklift_agent)
        self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position)

        if self.phase == Phase.ITEM_RECOGNITION:
            at_drop_off = self.forklift_agent.current_position == self.drop_off.location
            if not self.item_recognised and at_drop_off:
                if not self.provided_items:
                    print("FINISHED ITEM RECOGNITION")
                    self.item_recognised = True
                    self.phase = Phase.CLIENT_SORTING
                else:
                    # One item is recognised per step.
                    print(f"BEGIN ITEM RECOGNITION, left: {len(self.provided_items)}")
                    item_to_recognise = self.provided_items.pop()
                    recognised = self.recognise_item(item_to_recognise)
                    self.recognised_items.append(recognised)

        if self.phase == Phase.CLIENT_SORTING:
            # TODO GENERICS SORTING
            # Sort in place: sorted() returns a new list, so calling it and
            # discarding the result would leave the orders unsorted.
            self.orderList.sort(key=lambda order: len(order.items))
            print("FINISHED CLIENT ORDER SORTING")
            self.phase = Phase.PLAN_MOVEMENT

        if self.phase == Phase.PLAN_MOVEMENT:
            if len(self.orderList) > 0:
                self.plan_actions()
                self.phase = Phase.EXECUTION
            else:
                print("NO MORE ORDERS")

        if self.phase == Phase.EXECUTION:
            print("Execution")
            # Plan the next order once the forklift has no queued actions.
            if len(self.forklift_agent.action_queue) == 0:
                self.phase = Phase.PLAN_MOVEMENT

    def plan_actions(self):
        """Pop the first order and queue the forklift actions for its items.

        Each item gets three routes: to the drop-off patch, from there to the
        packing station for the item's ``real_type``, and from the station to
        the client delivery patch. A ``DROP_ITEM`` action is queued after them.
        """
        order = self.orderList.pop(0)
        print("PLAN MOVEMENT")

        # The station lookup is the same for every item of the order.
        stations = dict(self.graph.packingStations)

        for item in order.items:
            # get item
            if self.first:
                # The very first route starts from the forklift's real pose.
                start = self.forklift_agent.current_position
                rotation = self.forklift_agent.current_rotation
            else:
                # Later routes start from the client delivery patch.
                start = self.client_delivery.location
                rotation = Direction.right
            self.first = False
            self.forklift_agent.queue_movement_actions(
                self._plan_route(
                    start,
                    rotation,
                    self.drop_off.location,
                    Action(ActionType.NONE),
                )
            )

            # go through station
            # Stays None when the item's type has no station mapped here.
            packing_station: Optional[GridLocation] = None
            if item.real_type == ItemType.PASTA:
                packing_station = stations[PatchType.packingA]
            elif item.real_type == ItemType.EGG:
                packing_station = stations[PatchType.packingB]
            elif item.real_type == ItemType.PIZZA:
                packing_station = stations[PatchType.packingC]

            self.forklift_agent.queue_movement_actions(
                self._plan_route(
                    self.drop_off.location,
                    Direction.left,
                    packing_station,
                    Action(desired_item=item, action_type=ActionType.PICK_ITEM),
                )
            )

            # go to client delivery area
            self.forklift_agent.queue_movement_actions(
                self._plan_route(
                    packing_station,
                    Direction.right,
                    self.client_delivery.location,
                    Action(ActionType.NONE),
                )
            )
            self.forklift_agent.queue_movement_actions(
                [Action(ActionType.DROP_ITEM)]
            )

    def recognise_item(self, item: Item) -> Item:
        """Copy ``item.real_type`` into ``item.guessed_type`` and return ``item``."""
        # TODO IMAGE PROCESSING
        item.guessed_type = item.real_type
        return item