diff --git a/ForkliftAgent.py b/ForkliftAgent.py index d762986..f884155 100644 --- a/ForkliftAgent.py +++ b/ForkliftAgent.py @@ -1,8 +1,54 @@ -from mesa import Agent, Model +from typing import Tuple + +from mesa import Agent + +from data.Direction import Direction class ForkliftAgent(Agent): def __init__(self, unique_id, model): super().__init__(unique_id, model) - print("Created forklift Agent with ID: {}".format(unique_id)) \ No newline at end of file + self.movement_queue = [Tuple[int, int]] + self.current_position = Tuple[int, int] + self.current_rotation = Direction.right + print("Created forklift Agent with ID: {}".format(unique_id)) + + def assign_new_movement_task(self, movement_list): + self.movement_queue = [] + + for m in movement_list: + self.movement_queue.append(m) + + print("Assigned new movement queue to forklift agent") + + def get_proper_rotation(self, next_pos: Tuple[int, int]) -> Direction: + + if next_pos[0] < self.current_position[0]: + return Direction.left + elif next_pos[0] > self.current_position[0]: + return Direction.right + elif next_pos[1] > self.current_position[1]: + return Direction.top + elif next_pos[1] < self.current_position[1]: + return Direction.down + elif next_pos == self.current_position: + return self.current_rotation + + def move(self): + if len(self.movement_queue) > 0: + next_pos = self.movement_queue.pop(0) + + dir = self.get_proper_rotation(next_pos) + + if dir == self.current_rotation: + print("move {} --> {}".format(self.current_position, next_pos)) + self.current_position = next_pos + else: + print("rotate {} --> {}".format(self.current_rotation, dir)) + self.current_rotation = dir + self.movement_queue.insert(0, next_pos) + + def step(self) -> None: + print("forklift step") + self.move() diff --git a/ForkliftModel.py b/ForkliftModel.py deleted file mode 100644 index dbe2d44..0000000 --- a/ForkliftModel.py +++ /dev/null @@ -1,58 +0,0 @@ -import random - -from mesa import Agent, Model -from mesa.time import RandomActivation -from mesa.space import MultiGrid -from mesa.datacollection import DataCollector - -from PatchType import PatchType -from ForkliftAgent import ForkliftAgent -from PatchAgent import PatchAgent - - -class ForkliftModel(Model): - - def __init__(self, width, height): - # self.num_agents = 5 - self.running = True - self.grid = MultiGrid(height, width, True) - self.schedule = RandomActivation(self) - # self.datacollector = DataCollector( - # model_reporters={"Gini": compute_gini}, - # agent_reporters={"Wealth": lambda a: a.wealth} - # ) - - agent = ForkliftAgent(0, self) - self.schedule.add(agent) - # Add the agent to a random grid cell - x = 5 - y = 5 - self.grid.place_agent(agent, (x, y)) - - agent = PatchAgent(1, self, PatchType.pickUp) - self.schedule.add(agent) - self.grid.place_agent(agent, (self.grid.width-1, self.grid.height-1)) - - agent = PatchAgent(2, self, PatchType.dropOff) - self.schedule.add(agent) - self.grid.place_agent(agent, (0, self.grid.height-1)) - - for i in range(3): - a = PatchAgent(i+3, self, PatchType.item) - self.schedule.add(a) - self.grid.place_agent(a, (i, 0)) - - # Create patch agents - # for i in range(self.num_agents): - # a = PatchAgent(i, self) - # self.schedule.add(a) - # # Add the agent to a random grid cell - # x = random.randrange(self.grid.width) - # y = random.randrange(self.grid.height) - # self.grid.place_agent(a, (x, y)) - - - - def step(self): - # self.datacollector.collect(self) - self.schedule.step() diff --git a/GameModel.py b/GameModel.py new file mode 100644 index 0000000..fbc983f --- /dev/null +++ b/GameModel.py @@ -0,0 +1,66 @@ +from mesa import Model +from mesa.space import MultiGrid +from mesa.time import RandomActivation + +from ForkliftAgent import ForkliftAgent +from PatchAgent import PatchAgent +from PatchType import PatchType +from util.PathDefinitions import inverse_y +from util.PathVisualiser import draw_grid, reconstruct_path +from util.Pathfinder import a_star_search + + +class GameModel(Model): + + def __init__(self, width, height, graph): + # self.num_agents = 5 + self.running = True + self.grid = MultiGrid(height, width, True) + self.schedule = RandomActivation(self) + self.agents = [] + + self.forklift_agent = ForkliftAgent(0, self) + self.schedule.add(self.forklift_agent) + self.agents.append(self.forklift_agent) + + # Add the agent to a random grid cell + x = 5 + y = 5 + self.grid.place_agent(self.forklift_agent, (x, y)) + self.forklift_agent.current_position = (x, y) + + start, goal = (x, y), (2, 1) + came_from, cost_so_far = a_star_search(graph, start, goal) + draw_grid(graph, point_to=came_from, start=start, goal=goal) + + path = map(lambda t: (t[0], inverse_y(height, t[1])), + reconstruct_path(came_from=came_from, start=start, goal=goal)) + + print("cam from: {}".format(came_from)) + print("costerino: {}".format(cost_so_far)) + draw_grid(graph, path=reconstruct_path(came_from, start=start, goal=goal)) + + self.forklift_agent.assign_new_movement_task(path) + + agent = PatchAgent(1, self, PatchType.pickUp) + # self.schedule.add(agent) + self.grid.place_agent(agent, (self.grid.width - 1, self.grid.height - 1)) + self.agents.append(agent) + + agent = PatchAgent(2, self, PatchType.dropOff) + # self.schedule.add(agent) + self.grid.place_agent(agent, (0, self.grid.height - 1)) + self.agents.append(agent) + + for i in range(3): + a = PatchAgent(i + 3, self, PatchType.item) + self.agents.append(a) + self.grid.place_agent(a, (i, 0)) + + def step(self): + self.schedule.step() + + print("update multiGrid") + + self.grid.remove_agent(self.forklift_agent) + self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position) diff --git a/data/Direction.py b/data/Direction.py new file mode 100644 index 0000000..8dc03ee --- /dev/null +++ b/data/Direction.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class Direction(Enum): + top = 1 + right = 2 + down = 3 + left = 4 diff --git a/img/image.png b/img/image_left.png similarity index 100% rename from img/image.png rename to img/image_left.png diff --git a/main.py b/main.py index 92eb7fa..49ccdf4 100644 --- a/main.py +++ b/main.py @@ -1,50 +1,65 @@ import random -from mesa.visualization.modules import CanvasGrid from mesa.visualization.ModularVisualization import ModularServer +from mesa.visualization.modules import CanvasGrid -from ForkliftModel import ForkliftModel from ForkliftAgent import ForkliftAgent +from GameModel import GameModel from PatchAgent import PatchAgent from PatchType import PatchType +from data.Direction import Direction +from util.PathDefinitions import GridWithWeights colors = [ 'blue', 'cyan', 'orange', 'yellow', 'magenta', 'purple', '#103d3e', '#9fc86c', '#b4c2ed', '#31767d', '#31a5fa', '#ba96e0', '#fef3e4', '#6237ac', '#f9cacd', '#1e8123' ] + def agent_portrayal(agent): - - if isinstance(agent, ForkliftAgent): - portrayal = {"Shape": "image.png", "scale": 1.0, "Layer": 0} + shape = "" + if agent.current_rotation == Direction.top: + shape = "img/image_top.png" + elif agent.current_rotation == Direction.right: + shape = "img/image_right.png" + elif agent.current_rotation == Direction.down: + shape = "img/image_down.png" + elif agent.current_rotation == Direction.left: + shape = "img/image_left.png" + + portrayal = {"Shape": shape, "scale": 1.0, "Layer": 0} if isinstance(agent, PatchAgent): color = colors[0] if agent.type == PatchType.dropOff: - portrayal = {"Shape": "truck.png", "scale": 1.0, "Layer": 0} + portrayal = {"Shape": "img/truck.png", "scale": 1.0, "Layer": 0} elif agent.type == PatchType.pickUp: - portrayal = {"Shape": "okB00mer.png", "scale": 1.0, "Layer": 0} + portrayal = {"Shape": "img/okB00mer.png", "scale": 1.0, "Layer": 0} else: - color = colors[random.randrange(13)+3] + color = colors[random.randrange(13) + 3] portrayal = {"Shape": "rect", - "Filled": "true", - "Layer": 0, - "Color": color, - "w": 1, - "h": 1} + "Filled": "true", + "Layer": 0, + "Color": color, + "w": 1, + "h": 1} return portrayal + base = 512 gridWidth = 10 gridHeight = 10 -scale = base/gridWidth +scale = base / gridWidth -grid = CanvasGrid(agent_portrayal, gridWidth, gridHeight, scale*gridWidth, scale*gridHeight) +diagram4 = GridWithWeights(gridWidth, gridHeight) +diagram4.walls = [] -server = ModularServer(ForkliftModel, +grid = CanvasGrid(agent_portrayal, gridWidth, gridHeight, scale * gridWidth, scale * gridHeight) + +server = ModularServer(GameModel, [grid], "Automatyczny Wózek Widłowy", - {"width": gridHeight, "height": gridWidth}) + {"width": gridHeight, "height": gridWidth, "graph": diagram4}) server.port = 8888 server.launch() diff --git a/util/PathDefinitions.py b/util/PathDefinitions.py new file mode 100644 index 0000000..bf5c98b --- /dev/null +++ b/util/PathDefinitions.py @@ -0,0 +1,56 @@ +from typing import Iterator, Protocol, List, TypeVar, Tuple, Dict + +GridLocation = Tuple[int, int] +Location = TypeVar('Location') + + +class Graph(Protocol): + def neighbors(self, id: Location) -> List[Location]: pass + + +class SquareGrid: + def __init__(self, width: int, height: int): + self.width = width + self.height = height + self.walls: List[GridLocation] = [] + + def in_bounds(self, id: GridLocation) -> bool: + (x, y) = id + return 0 <= x < self.width and 0 <= y < self.height + + def passable(self, id: GridLocation) -> bool: + return id not in self.walls + + def neighbors(self, id: GridLocation) -> Iterator[GridLocation]: + (x, y) = id + neighbors = [(x + 1, y), (x - 1, y), (x, y - 1), (x, y + 1)] + if (x + y) % 2 == 0: neighbors.reverse() + results = filter(self.in_bounds, neighbors) + results = filter(self.passable, results) + return results + + +class WeightedGraph(Graph): + def cost(self, from_id: Location, to_id: Location) -> float: pass + + +class GridWithWeights(SquareGrid): + def __init__(self, width: int, height: int): + super().__init__(width, height) + self.weights: Dict[GridLocation, float] = {} + + def cost(self, from_node: GridLocation, to_node: GridLocation) -> float: + return self.weights.get(to_node, 1) + + +# utility functions for dealing with square grids +def from_id_width(id, width): + return (id % width, id // width) + + +def inverse_y(height, y): + return height - y + + +class Graph(Protocol): + def neighbors(self, id: Location) -> List[Location]: pass diff --git a/util/PathVisualiser.py b/util/PathVisualiser.py new file mode 100644 index 0000000..327fe35 --- /dev/null +++ b/util/PathVisualiser.py @@ -0,0 +1,43 @@ +# thanks to @m1sp for this simpler version of +# reconstruct_path that doesn't have duplicate entries +from typing import Dict, List + +from util.Pathfinder import Location + + +def reconstruct_path(came_from: Dict[Location, Location], + start: Location, goal: Location) -> List[Location]: + current: Location = goal + path: List[Location] = [] + while current != start: # note: this will fail if no path found + path.append(current) + current = came_from[current] + path.append(start) # optional + path.reverse() # optional + return path + + +def draw_grid(graph, **style): + print("___" * graph.width) + for y in range(graph.height): + for x in range(graph.width): + print("%s" % draw_tile(graph, (x, y), style), end="") + print() + print("~~~" * graph.width) + + +def draw_tile(graph, id, style): + r = " . " + if 'number' in style and id in style['number']: r = " %-2d" % style['number'][id] + if 'point_to' in style and style['point_to'].get(id, None) is not None: + (x1, y1) = id + (x2, y2) = style['point_to'][id] + if x2 == x1 + 1: r = " > " + if x2 == x1 - 1: r = " < " + if y2 == y1 + 1: r = " v " + if y2 == y1 - 1: r = " ^ " + if 'path' in style and id in style['path']: r = " @ " + if 'start' in style and id == style['start']: r = " A " + if 'goal' in style and id == style['goal']: r = " Z " + if id in graph.walls: r = "###" + return r diff --git a/util/Pathfinder.py b/util/Pathfinder.py new file mode 100644 index 0000000..51ec199 --- /dev/null +++ b/util/Pathfinder.py @@ -0,0 +1,36 @@ +from typing import Optional, Dict, Tuple + +from util.PathDefinitions import WeightedGraph, Location +from util.PriorityQueue import PriorityQueue + + +def heuristic(a: Tuple[int, int], b: Tuple[int, int]) -> float: + (x1, y1) = a + + (x2, y2) = b + return abs(x1 - x2) + abs(y1 - y2) + + +def a_star_search(graph: WeightedGraph, start: Tuple[int, int], goal: Tuple[int, int]): + frontier = PriorityQueue() + frontier.put(start, 0) + came_from: Dict[Location, Optional[Location]] = {} + cost_so_far: Dict[Location, float] = {} + came_from[start] = None + cost_so_far[start] = 0 + + while not frontier.empty(): + current: Location = frontier.get() + + if current == goal: + break + + for next in graph.neighbors(current): + new_cost = cost_so_far[current] + graph.cost(current, next) + if next not in cost_so_far or new_cost < cost_so_far[next]: + cost_so_far[next] = new_cost + priority = new_cost + heuristic(next, goal) + frontier.put(next, priority) + came_from[next] = current + + return came_from, cost_so_far.keys() diff --git a/util/PriorityQueue.py b/util/PriorityQueue.py new file mode 100644 index 0000000..ca866dd --- /dev/null +++ b/util/PriorityQueue.py @@ -0,0 +1,18 @@ +import heapq +from typing import List, Tuple, TypeVar + +T = TypeVar("T") + + +class PriorityQueue: + def __init__(self): + self.elements: List[Tuple[float, T]] = [] + + def empty(self) -> bool: + return not self.elements + + def put(self, item: T, priority: float): + heapq.heappush(self.elements, (priority, item)) + + def get(self) -> T: + return heapq.heappop(self.elements)[1]