SI_InteligentnyWozekWidlowy/GameModel.py

302 lines
11 KiB
Python

import copy
from enum import Enum
from typing import List, Tuple
from mesa import Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation
from AgentBase import AgentBase
from ForkliftAgent import ForkliftAgent
from InitialStateFactory import InitialStateFactory
from ItemDisplayAgent import ItemDisplayAgent
from PatchAgent import PatchAgent
from PatchType import PatchType
from PictureVisualizationAgent import PictureVisualizationAgent
from data.GameConstants import GameConstants
from data.Item import Item
from data.Order import Order
from data.enum.ItemType import ItemType
from decision.Action import Action
from decision.ActionType import ActionType
from genetic_order.GeneticOrder import GeneticOrder
from imageClasification.Classificator import image_classification
from pathfinding.PathfinderOnStates import PathFinderOnStates, PathFinderState
from tree.DecisionTree import DecisionTree
from util.PathByEnum import PathByEnum
from util.PathDefinitions import GridLocation, GridWithWeights
class Phase(Enum):
INIT = 1
ITEM_RECOGNITION = 2
CLIENT_SORTING = 3
PLAN_MOVEMENT = 4
EXECUTION = 5
class GameModel(Model):
def __init__(self, width, height, graph: GridWithWeights, items: int, orders: int, classificator,
item_display_pos: List[GridLocation]):
# self.num_agents = 5
self.first = True
self.item_recognised = False
self.running = True
self.grid = MultiGrid(height, width, True)
self.schedule = RandomActivation(self)
self.current_item_recognition = None
self.current_item = None
self.client_delivery: PatchAgent = None
self.drop_off: PatchAgent = None
self.graph = graph
self.cut_orders : List[Order] = []
self.game_constants = GameConstants(
width,
height,
graph.walls,
graph.puddles
)
self.agents = [AgentBase]
self.forklift_agent = ForkliftAgent(
self,
self.game_constants,
self.client_delivery,
self.drop_off,
self.graph
)
self.schedule.add(self.forklift_agent)
self.agents.append(self.forklift_agent)
self.item_display_agents: List[ItemDisplayAgent] = []
# INITIALIZATION #
print("############## INITIALIZATION ##############")
self.phase = Phase.INIT
self.initialize_grid(graph, item_display_pos)
self.orderList: List[Order] = InitialStateFactory.generate_order_list(orders)
self.fulfilled_orders: List[Order] = []
self.forklift_agent.fulfilled_orders = self.fulfilled_orders
self.forklift_agent.set_base(self.drop_off)
self.classificator = classificator
print("############## RECOGNISE ITEMS ##############")
self.phase = Phase.ITEM_RECOGNITION
self.provided_items = InitialStateFactory.generate_item_list(items)
self.items_for_recognization = copy.deepcopy(self.provided_items)
self.recognised_items: List[Item] = []
self.current_order_delivered_items = self.forklift_agent.current_order_delivered_items
print("Relocate forklift agent to loading area for item recognition")
pathFinder = PathFinderOnStates(
self.game_constants,
self.drop_off.location,
PathFinderState(self.forklift_agent.current_position, self.forklift_agent.current_rotation, 0,
Action(ActionType.NONE), [])
)
actions = pathFinder.get_action_list()
print("PATHFINDING")
print(actions)
self.forklift_agent.queue_movement_actions(actions)
self.current_order = self.forklift_agent.current_order
def initialize_grid(self, graph: GridWithWeights, item_display_pos):
print("INITIALIZING GRID")
# Add the agent to a random grid cell
x = 5
y = 5
self.grid.place_agent(self.forklift_agent, (x, y))
self.forklift_agent.current_position = (x, y)
self.picture_visualization = PictureVisualizationAgent(
self,
(1, 11),
)
self.schedule.add(self.picture_visualization)
self.grid.place_agent(self.picture_visualization, self.picture_visualization.location)
self.agents.append(self.picture_visualization)
self.place_logistics()
self.place_dividers()
self.place_walls_agents(graph.walls)
self.place_puddles(graph.puddles)
self.place_packing_stations(graph.packingStations)
self.place_order_items_display(item_display_pos)
def place_dividers(self):
for i in range(0, 10):
for j in range(10, 13):
agent = PatchAgent(self, (i, j), PatchType.divider)
self.agents.append(agent)
self.grid.place_agent(agent, (i, j))
def place_logistics(self):
agent = PatchAgent(self, (self.grid.width - 1, int(self.grid.height / 2)), PatchType.pickUp)
self.schedule.add(agent)
self.grid.place_agent(agent, agent.location)
self.agents.append(agent)
self.client_delivery = agent
self.forklift_agent.client_delivery = self.client_delivery
agent = PatchAgent(self, (0, int(self.grid.height / 2)), PatchType.dropOff)
self.grid.place_agent(agent, agent.location)
self.agents.append(agent)
self.drop_off = agent
self.forklift_agent.drop_off = self.drop_off
def place_walls_agents(self, walls: List[GridLocation]):
for w in walls:
agent = PatchAgent(self, w, PatchType.wall)
self.agents.append(agent)
self.grid.place_agent(agent, w)
def place_puddles(self, puddles: List[GridLocation]):
for p in puddles:
agent = PatchAgent(self, p, PatchType.diffTerrain)
self.agents.append(agent)
self.grid.place_agent(agent, p)
def place_packing_stations(self, packing_stations: List[Tuple[PatchType, GridLocation]]):
for p in packing_stations:
agent = PatchAgent(self, p[1], p[0])
self.agents.append(agent)
self.grid.place_agent(agent, p[1])
def place_order_items_display(self, item_positions: List[GridLocation]):
for p in item_positions:
agent = ItemDisplayAgent(self, p)
self.item_display_agents.append(agent)
self.grid.place_agent(agent, p)
def update_item_display(self):
self.current_item = self.forklift_agent.current_item
for i in range(4):
self.item_display_agents[i].image = None
if len(self.forklift_agent.current_order_delivered_items) > i:
self.item_display_agents[i].image = self.forklift_agent.current_order_delivered_items[i].image
def step(self):
self.schedule.step()
self.grid.remove_agent(self.forklift_agent)
self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position)
self.update_item_display()
if self.phase == Phase.ITEM_RECOGNITION:
if not self.item_recognised and self.forklift_agent.current_position == self.drop_off.location:
if len(self.items_for_recognization) == 0:
print("FINISHED ITEM RECOGNITION")
self.item_recognised = True
self.phase = Phase.CLIENT_SORTING
self.forklift_agent.ready_for_execution = True
else:
print("BEGIN ITEM RECOGNITION, left: {}".format(len(self.items_for_recognization)))
item_to_recognise = self.items_for_recognization.pop()
self.picture_visualization.img = PathByEnum.get_random_path(item_to_recognise.real_type)
recognised = self.recognise_item(item_to_recognise)
self.recognised_items.append(recognised)
if self.phase == Phase.CLIENT_SORTING:
orders: [Order] = self.orderList
tree: DecisionTree = DecisionTree()
# CLIENT RECOGNITION
orders_with_prio = tree.get_data_good(orders)
# print("before:" )
# for i in range(len(orders_with_prio)):
# print("ORDER {}, PRIO: {}".format(orders_with_prio[i].id, orders_with_prio[i].priority))
# GENERICS SORTING
genericOrder: GeneticOrder = GeneticOrder(orders_with_prio)
new_orders = genericOrder.get_orders_sorted(orders)
# print("after:" )
# for i in range(len(new_orders)):
# print("ORDER {}, PRIO: {}".format(new_orders[i].id, new_orders[i].priority))
self.orderList = new_orders
self.count_recognised_items()
self.sort_orders()
self.forklift_agent.orderList = self.orderList
print("FINISHED CLIENT ORDER SORTING")
self.phase = Phase.EXECUTION
if self.phase == Phase.EXECUTION:
self.current_order = self.forklift_agent.current_order
pass
# print("Execution")
def sort_orders(self):
orders_to_fill: [Order] = []
cut_orders: [Order] = []
for i in range(len(self.orderList)):
o: Order = self.orderList[i]
refrige = self.count_item_type(o, ItemType.REFRIGERATOR)
shelf = self.count_item_type(o, ItemType.SHELF)
door = self.count_item_type(o, ItemType.DOOR)
if self.count_shelf - shelf >= 0 and self.count_refrige - refrige >= 0 and self.count_door - door >= 0:
self.count_shelf -= shelf
self.count_door -= door
self.count_refrige -= refrige
orders_to_fill.append(o)
else:
cut_orders.append(o)
self.cut_orders = cut_orders
self.orderList = orders_to_fill
self.forklift_agent.orderList = orders_to_fill
def count_item_type(self, o: Order, itemType: ItemType) -> int:
res = 0
for i in range(len(o.items)):
it: Item = o.items[i]
if it.guessed_type == itemType:
res += 1
return res
def count_recognised_items(self):
count_refrige: int = 0
count_door: int = 0
count_shelf: int = 0
for i in range(len(self.recognised_items)):
item: Item = self.recognised_items[i]
if item.guessed_type == ItemType.DOOR:
count_door += 1
elif item.guessed_type == ItemType.SHELF:
count_shelf += 1
else:
count_refrige += 1
self.count_door = count_door
self.count_shelf = count_shelf
self.count_refrige = count_refrige
def recognise_item(self, item: Item):
val = image_classification(self.picture_visualization.img, self.classificator)
print("VAL: {}".format(val))
if val == ItemType.DOOR:
item.guessed_type = ItemType.DOOR
elif val == ItemType.REFRIGERATOR:
item.guessed_type = ItemType.REFRIGERATOR
elif val == ItemType.SHELF:
item.guessed_type = ItemType.SHELF
return item