215 lines
7.7 KiB
Python
215 lines
7.7 KiB
Python
import copy
|
|
from enum import Enum
|
|
from typing import List, Tuple
|
|
|
|
from mesa import Model
|
|
from mesa.space import MultiGrid
|
|
from mesa.time import RandomActivation
|
|
|
|
from AgentBase import AgentBase
|
|
from ForkliftAgent import ForkliftAgent
|
|
from InitialStateFactory import InitialStateFactory
|
|
from PatchAgent import PatchAgent
|
|
from PatchType import PatchType
|
|
from PictureVisualizationAgent import PictureVisualizationAgent
|
|
from data.GameConstants import GameConstants
|
|
from data.Item import Item
|
|
from data.Order import Order
|
|
from data.enum.ItemType import ItemType
|
|
from decision.Action import Action
|
|
from decision.ActionType import ActionType
|
|
from genetic_order.GeneticOrder import GeneticOrder
|
|
from imageClasification.Classificator import image_classification
|
|
from pathfinding.PathfinderOnStates import PathFinderOnStates, PathFinderState
|
|
from tree.DecisionTree import DecisionTree
|
|
from util.PathByEnum import PathByEnum
|
|
from util.PathDefinitions import GridLocation, GridWithWeights
|
|
|
|
|
|
class Phase(Enum):
|
|
INIT = 1
|
|
ITEM_RECOGNITION = 2
|
|
CLIENT_SORTING = 3
|
|
PLAN_MOVEMENT = 4
|
|
EXECUTION = 5
|
|
|
|
|
|
class GameModel(Model):
|
|
|
|
def __init__(self, width, height, graph: GridWithWeights, items: int, orders: int, classificator):
|
|
# self.num_agents = 5
|
|
self.first = True
|
|
self.item_recognised = False
|
|
self.running = True
|
|
self.grid = MultiGrid(height, width, True)
|
|
self.schedule = RandomActivation(self)
|
|
self.current_item_recognition = None
|
|
|
|
self.client_delivery: PatchAgent = None
|
|
self.drop_off: PatchAgent = None
|
|
self.graph = graph
|
|
|
|
self.game_constants = GameConstants(
|
|
width,
|
|
height,
|
|
graph.walls,
|
|
graph.puddles
|
|
)
|
|
|
|
self.agents = [AgentBase]
|
|
|
|
self.forklift_agent = ForkliftAgent(
|
|
self,
|
|
self.game_constants,
|
|
self.client_delivery,
|
|
self.drop_off,
|
|
self.graph
|
|
)
|
|
|
|
self.schedule.add(self.forklift_agent)
|
|
self.agents.append(self.forklift_agent)
|
|
|
|
# INITIALIZATION #
|
|
print("############## INITIALIZATION ##############")
|
|
self.phase = Phase.INIT
|
|
self.initialize_grid(graph)
|
|
self.orderList: List[Order] = InitialStateFactory.generate_order_list(orders)
|
|
self.fulfilled_orders: List[Order] = []
|
|
self.forklift_agent.orderList = self.orderList
|
|
self.forklift_agent.fulfilled_orders = self.fulfilled_orders
|
|
self.classificator = classificator
|
|
|
|
print("############## RECOGNISE ITEMS ##############")
|
|
self.phase = Phase.ITEM_RECOGNITION
|
|
self.provided_items = InitialStateFactory.generate_item_list(items)
|
|
self.items_for_recognization = copy.deepcopy(self.provided_items)
|
|
self.recognised_items: List[Item] = []
|
|
|
|
print("Relocate forklift agent to loading area for item recognition")
|
|
|
|
pathFinder = PathFinderOnStates(
|
|
self.game_constants,
|
|
self.drop_off.location,
|
|
PathFinderState(self.forklift_agent.current_position, self.forklift_agent.current_rotation, 0,
|
|
Action(ActionType.NONE), [])
|
|
)
|
|
|
|
actions = pathFinder.get_action_list()
|
|
print("PATHFINDING")
|
|
print(actions)
|
|
self.forklift_agent.queue_movement_actions(actions)
|
|
|
|
def initialize_grid(self, graph: GridWithWeights):
|
|
print("INITIALIZING GRID")
|
|
# Add the agent to a random grid cell
|
|
x = 5
|
|
y = 5
|
|
self.grid.place_agent(self.forklift_agent, (x, y))
|
|
self.forklift_agent.current_position = (x, y)
|
|
|
|
self.picture_visualization = PictureVisualizationAgent(
|
|
self,
|
|
(1, 11),
|
|
)
|
|
self.schedule.add(self.picture_visualization)
|
|
self.grid.place_agent(self.picture_visualization, self.picture_visualization.location)
|
|
self.agents.append(self.picture_visualization)
|
|
|
|
self.place_logistics()
|
|
self.place_dividers()
|
|
self.place_walls_agents(graph.walls)
|
|
self.place_puddles(graph.puddles)
|
|
self.place_packing_stations(graph.packingStations)
|
|
|
|
def place_dividers(self):
|
|
for i in range(0, 10):
|
|
for j in range(10, 13):
|
|
agent = PatchAgent(self, (i, j), PatchType.divider)
|
|
self.agents.append(agent)
|
|
self.grid.place_agent(agent, (i, j))
|
|
|
|
def place_logistics(self):
|
|
agent = PatchAgent(self, (self.grid.width - 1, int(self.grid.height / 2)), PatchType.pickUp)
|
|
self.schedule.add(agent)
|
|
self.grid.place_agent(agent, agent.location)
|
|
self.agents.append(agent)
|
|
self.client_delivery = agent
|
|
self.forklift_agent.client_delivery = self.client_delivery
|
|
|
|
agent = PatchAgent(self, (0, int(self.grid.height / 2)), PatchType.dropOff)
|
|
self.grid.place_agent(agent, agent.location)
|
|
self.agents.append(agent)
|
|
self.drop_off = agent
|
|
self.forklift_agent.drop_off = self.drop_off
|
|
|
|
def place_walls_agents(self, walls: List[GridLocation]):
|
|
for w in walls:
|
|
agent = PatchAgent(self, w, PatchType.wall)
|
|
self.agents.append(agent)
|
|
self.grid.place_agent(agent, w)
|
|
|
|
def place_puddles(self, puddles: List[GridLocation]):
|
|
for p in puddles:
|
|
agent = PatchAgent(self, p, PatchType.diffTerrain)
|
|
self.agents.append(agent)
|
|
self.grid.place_agent(agent, p)
|
|
|
|
def place_packing_stations(self, packing_stations: List[Tuple[PatchType, GridLocation]]):
|
|
for p in packing_stations:
|
|
agent = PatchAgent(self, p[1], p[0])
|
|
self.agents.append(agent)
|
|
self.grid.place_agent(agent, p[1])
|
|
|
|
def step(self):
|
|
self.schedule.step()
|
|
self.grid.remove_agent(self.forklift_agent)
|
|
self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position)
|
|
|
|
if self.phase == Phase.ITEM_RECOGNITION:
|
|
if not self.item_recognised and self.forklift_agent.current_position == self.drop_off.location:
|
|
|
|
if len(self.items_for_recognization) == 0:
|
|
print("FINISHED ITEM RECOGNITION")
|
|
self.item_recognised = True
|
|
self.phase = Phase.CLIENT_SORTING
|
|
self.forklift_agent.ready_for_execution = True
|
|
else:
|
|
print("BEGIN ITEM RECOGNITION, left: {}".format(len(self.items_for_recognization)))
|
|
item_to_recognise = self.items_for_recognization.pop()
|
|
self.picture_visualization.img = PathByEnum.get_random_path(item_to_recognise.real_type)
|
|
recognised = self.recognise_item(item_to_recognise)
|
|
self.recognised_items.append(recognised)
|
|
|
|
if self.phase == Phase.CLIENT_SORTING:
|
|
orders: [Order] = self.orderList
|
|
tree: DecisionTree = DecisionTree()
|
|
readyTree = tree.get_decision_tree()
|
|
|
|
|
|
# TODO CLIENT RECOGNITION
|
|
|
|
# GENERICS SORTING
|
|
genericOrder: GeneticOrder = GeneticOrder(orders)
|
|
self.orderList = genericOrder.get_orders_sorted(orders)
|
|
|
|
|
|
print("FINISHED CLIENT ORDER SORTING")
|
|
self.phase = Phase.EXECUTION
|
|
|
|
if self.phase == Phase.EXECUTION:
|
|
print("Execution")
|
|
|
|
def recognise_item(self, item: Item):
|
|
# TODO IMAGE PROCESSING
|
|
val = image_classification(self.picture_visualization.img, self.classificator)
|
|
print("VAL: {}".format(val))
|
|
|
|
if val == ItemType.DOOR:
|
|
item.guessed_type = ItemType.DOOR
|
|
elif val == ItemType.REFRIGERATOR:
|
|
item.guessed_type = ItemType.REFRIGERATOR
|
|
elif val == ItemType.SHELF:
|
|
item.guessed_type = ItemType.SHELF
|
|
|
|
return item
|