Pathfinder
This commit is contained in:
parent
c4dd4561d4
commit
59676e176a
@ -1,8 +1,54 @@
|
||||
from mesa import Agent, Model
|
||||
from typing import Tuple
|
||||
|
||||
from mesa import Agent
|
||||
|
||||
from data.Direction import Direction
|
||||
|
||||
|
||||
class ForkliftAgent(Agent):
|
||||
|
||||
def __init__(self, unique_id, model):
|
||||
super().__init__(unique_id, model)
|
||||
print("Created forklift Agent with ID: {}".format(unique_id))
|
||||
self.movement_queue = [Tuple[int, int]]
|
||||
self.current_position = Tuple[int, int]
|
||||
self.current_rotation = Direction.right
|
||||
print("Created forklift Agent with ID: {}".format(unique_id))
|
||||
|
||||
def assign_new_movement_task(self, movement_list):
|
||||
self.movement_queue = []
|
||||
|
||||
for m in movement_list:
|
||||
self.movement_queue.append(m)
|
||||
|
||||
print("Assigned new movement queue to forklift agent")
|
||||
|
||||
def get_proper_rotation(self, next_pos: Tuple[int, int]) -> Direction:
|
||||
|
||||
if next_pos[0] < self.current_position[0]:
|
||||
return Direction.left
|
||||
elif next_pos[0] > self.current_position[0]:
|
||||
return Direction.right
|
||||
elif next_pos[1] > self.current_position[1]:
|
||||
return Direction.top
|
||||
elif next_pos[1] < self.current_position[1]:
|
||||
return Direction.down
|
||||
elif next_pos == self.current_position:
|
||||
return self.current_rotation
|
||||
|
||||
def move(self):
|
||||
if len(self.movement_queue) > 0:
|
||||
next_pos = self.movement_queue.pop(0)
|
||||
|
||||
dir = self.get_proper_rotation(next_pos)
|
||||
|
||||
if dir == self.current_rotation:
|
||||
print("move {} --> {}".format(self.current_position, next_pos))
|
||||
self.current_position = next_pos
|
||||
else:
|
||||
print("rotate {} --> {}".format(self.current_rotation, dir))
|
||||
self.current_rotation = dir
|
||||
self.movement_queue.insert(0, next_pos)
|
||||
|
||||
def step(self) -> None:
|
||||
print("forklift step")
|
||||
self.move()
|
||||
|
@ -1,58 +0,0 @@
|
||||
import random
|
||||
|
||||
from mesa import Agent, Model
|
||||
from mesa.time import RandomActivation
|
||||
from mesa.space import MultiGrid
|
||||
from mesa.datacollection import DataCollector
|
||||
|
||||
from PatchType import PatchType
|
||||
from ForkliftAgent import ForkliftAgent
|
||||
from PatchAgent import PatchAgent
|
||||
|
||||
|
||||
class ForkliftModel(Model):
|
||||
|
||||
def __init__(self, width, height):
|
||||
# self.num_agents = 5
|
||||
self.running = True
|
||||
self.grid = MultiGrid(height, width, True)
|
||||
self.schedule = RandomActivation(self)
|
||||
# self.datacollector = DataCollector(
|
||||
# model_reporters={"Gini": compute_gini},
|
||||
# agent_reporters={"Wealth": lambda a: a.wealth}
|
||||
# )
|
||||
|
||||
agent = ForkliftAgent(0, self)
|
||||
self.schedule.add(agent)
|
||||
# Add the agent to a random grid cell
|
||||
x = 5
|
||||
y = 5
|
||||
self.grid.place_agent(agent, (x, y))
|
||||
|
||||
agent = PatchAgent(1, self, PatchType.pickUp)
|
||||
self.schedule.add(agent)
|
||||
self.grid.place_agent(agent, (self.grid.width-1, self.grid.height-1))
|
||||
|
||||
agent = PatchAgent(2, self, PatchType.dropOff)
|
||||
self.schedule.add(agent)
|
||||
self.grid.place_agent(agent, (0, self.grid.height-1))
|
||||
|
||||
for i in range(3):
|
||||
a = PatchAgent(i+3, self, PatchType.item)
|
||||
self.schedule.add(a)
|
||||
self.grid.place_agent(a, (i, 0))
|
||||
|
||||
# Create patch agents
|
||||
# for i in range(self.num_agents):
|
||||
# a = PatchAgent(i, self)
|
||||
# self.schedule.add(a)
|
||||
# # Add the agent to a random grid cell
|
||||
# x = random.randrange(self.grid.width)
|
||||
# y = random.randrange(self.grid.height)
|
||||
# self.grid.place_agent(a, (x, y))
|
||||
|
||||
|
||||
|
||||
def step(self):
|
||||
# self.datacollector.collect(self)
|
||||
self.schedule.step()
|
66
GameModel.py
Normal file
66
GameModel.py
Normal file
@ -0,0 +1,66 @@
|
||||
from mesa import Model
|
||||
from mesa.space import MultiGrid
|
||||
from mesa.time import RandomActivation
|
||||
|
||||
from ForkliftAgent import ForkliftAgent
|
||||
from PatchAgent import PatchAgent
|
||||
from PatchType import PatchType
|
||||
from util.PathDefinitions import inverse_y
|
||||
from util.PathVisualiser import draw_grid, reconstruct_path
|
||||
from util.Pathfinder import a_star_search
|
||||
|
||||
|
||||
class GameModel(Model):
|
||||
|
||||
def __init__(self, width, height, graph):
|
||||
# self.num_agents = 5
|
||||
self.running = True
|
||||
self.grid = MultiGrid(height, width, True)
|
||||
self.schedule = RandomActivation(self)
|
||||
self.agents = []
|
||||
|
||||
self.forklift_agent = ForkliftAgent(0, self)
|
||||
self.schedule.add(self.forklift_agent)
|
||||
self.agents.append(self.forklift_agent)
|
||||
|
||||
# Add the agent to a random grid cell
|
||||
x = 5
|
||||
y = 5
|
||||
self.grid.place_agent(self.forklift_agent, (x, y))
|
||||
self.forklift_agent.current_position = (x, y)
|
||||
|
||||
start, goal = (x, y), (2, 1)
|
||||
came_from, cost_so_far = a_star_search(graph, start, goal)
|
||||
draw_grid(graph, point_to=came_from, start=start, goal=goal)
|
||||
|
||||
path = map(lambda t: (t[0], inverse_y(height, t[1])),
|
||||
reconstruct_path(came_from=came_from, start=start, goal=goal))
|
||||
|
||||
print("cam from: {}".format(came_from))
|
||||
print("costerino: {}".format(cost_so_far))
|
||||
draw_grid(graph, path=reconstruct_path(came_from, start=start, goal=goal))
|
||||
|
||||
self.forklift_agent.assign_new_movement_task(path)
|
||||
|
||||
agent = PatchAgent(1, self, PatchType.pickUp)
|
||||
# self.schedule.add(agent)
|
||||
self.grid.place_agent(agent, (self.grid.width - 1, self.grid.height - 1))
|
||||
self.agents.append(agent)
|
||||
|
||||
agent = PatchAgent(2, self, PatchType.dropOff)
|
||||
# self.schedule.add(agent)
|
||||
self.grid.place_agent(agent, (0, self.grid.height - 1))
|
||||
self.agents.append(agent)
|
||||
|
||||
for i in range(3):
|
||||
a = PatchAgent(i + 3, self, PatchType.item)
|
||||
self.agents.append(a)
|
||||
self.grid.place_agent(a, (i, 0))
|
||||
|
||||
def step(self):
|
||||
self.schedule.step()
|
||||
|
||||
print("update multiGrid")
|
||||
|
||||
self.grid.remove_agent(self.forklift_agent)
|
||||
self.grid.place_agent(self.forklift_agent, self.forklift_agent.current_position)
|
8
data/Direction.py
Normal file
8
data/Direction.py
Normal file
@ -0,0 +1,8 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class Direction(Enum):
|
||||
top = 1
|
||||
right = 2
|
||||
down = 3
|
||||
left = 4
|
Before Width: | Height: | Size: 317 KiB After Width: | Height: | Size: 317 KiB |
49
main.py
49
main.py
@ -1,50 +1,65 @@
|
||||
import random
|
||||
|
||||
from mesa.visualization.modules import CanvasGrid
|
||||
from mesa.visualization.ModularVisualization import ModularServer
|
||||
from mesa.visualization.modules import CanvasGrid
|
||||
|
||||
from ForkliftModel import ForkliftModel
|
||||
from ForkliftAgent import ForkliftAgent
|
||||
from GameModel import GameModel
|
||||
from PatchAgent import PatchAgent
|
||||
from PatchType import PatchType
|
||||
from data.Direction import Direction
|
||||
from util.PathDefinitions import GridWithWeights
|
||||
|
||||
colors = [
|
||||
'blue', 'cyan', 'orange', 'yellow', 'magenta', 'purple', '#103d3e', '#9fc86c',
|
||||
'#b4c2ed', '#31767d', '#31a5fa', '#ba96e0', '#fef3e4', '#6237ac', '#f9cacd', '#1e8123'
|
||||
]
|
||||
|
||||
|
||||
def agent_portrayal(agent):
|
||||
|
||||
|
||||
if isinstance(agent, ForkliftAgent):
|
||||
portrayal = {"Shape": "image.png", "scale": 1.0, "Layer": 0}
|
||||
shape = ""
|
||||
if agent.current_rotation == Direction.top:
|
||||
shape = "img/image_top.png"
|
||||
elif agent.current_rotation == Direction.right:
|
||||
shape = "img/image_right.png"
|
||||
elif agent.current_rotation == Direction.down:
|
||||
shape = "img/image_down.png"
|
||||
elif agent.current_rotation == Direction.left:
|
||||
shape = "img/image_left.png"
|
||||
|
||||
portrayal = {"Shape": shape, "scale": 1.0, "Layer": 0}
|
||||
|
||||
if isinstance(agent, PatchAgent):
|
||||
color = colors[0]
|
||||
if agent.type == PatchType.dropOff:
|
||||
portrayal = {"Shape": "truck.png", "scale": 1.0, "Layer": 0}
|
||||
portrayal = {"Shape": "img/truck.png", "scale": 1.0, "Layer": 0}
|
||||
elif agent.type == PatchType.pickUp:
|
||||
portrayal = {"Shape": "okB00mer.png", "scale": 1.0, "Layer": 0}
|
||||
portrayal = {"Shape": "img/okB00mer.png", "scale": 1.0, "Layer": 0}
|
||||
else:
|
||||
color = colors[random.randrange(13)+3]
|
||||
color = colors[random.randrange(13) + 3]
|
||||
portrayal = {"Shape": "rect",
|
||||
"Filled": "true",
|
||||
"Layer": 0,
|
||||
"Color": color,
|
||||
"w": 1,
|
||||
"h": 1}
|
||||
"Filled": "true",
|
||||
"Layer": 0,
|
||||
"Color": color,
|
||||
"w": 1,
|
||||
"h": 1}
|
||||
return portrayal
|
||||
|
||||
|
||||
base = 512
|
||||
gridWidth = 10
|
||||
gridHeight = 10
|
||||
scale = base/gridWidth
|
||||
scale = base / gridWidth
|
||||
|
||||
grid = CanvasGrid(agent_portrayal, gridWidth, gridHeight, scale*gridWidth, scale*gridHeight)
|
||||
diagram4 = GridWithWeights(gridWidth, gridHeight)
|
||||
diagram4.walls = []
|
||||
|
||||
server = ModularServer(ForkliftModel,
|
||||
grid = CanvasGrid(agent_portrayal, gridWidth, gridHeight, scale * gridWidth, scale * gridHeight)
|
||||
|
||||
server = ModularServer(GameModel,
|
||||
[grid],
|
||||
"Automatyczny Wózek Widłowy",
|
||||
{"width": gridHeight, "height": gridWidth})
|
||||
{"width": gridHeight, "height": gridWidth, "graph": diagram4})
|
||||
server.port = 8888
|
||||
server.launch()
|
||||
|
56
util/PathDefinitions.py
Normal file
56
util/PathDefinitions.py
Normal file
@ -0,0 +1,56 @@
|
||||
from typing import Iterator, Protocol, List, TypeVar, Tuple, Dict
|
||||
|
||||
GridLocation = Tuple[int, int]
|
||||
Location = TypeVar('Location')
|
||||
|
||||
|
||||
class Graph(Protocol):
|
||||
def neighbors(self, id: Location) -> List[Location]: pass
|
||||
|
||||
|
||||
class SquareGrid:
|
||||
def __init__(self, width: int, height: int):
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.walls: List[GridLocation] = []
|
||||
|
||||
def in_bounds(self, id: GridLocation) -> bool:
|
||||
(x, y) = id
|
||||
return 0 <= x < self.width and 0 <= y < self.height
|
||||
|
||||
def passable(self, id: GridLocation) -> bool:
|
||||
return id not in self.walls
|
||||
|
||||
def neighbors(self, id: GridLocation) -> Iterator[GridLocation]:
|
||||
(x, y) = id
|
||||
neighbors = [(x + 1, y), (x - 1, y), (x, y - 1), (x, y + 1)]
|
||||
if (x + y) % 2 == 0: neighbors.reverse()
|
||||
results = filter(self.in_bounds, neighbors)
|
||||
results = filter(self.passable, results)
|
||||
return results
|
||||
|
||||
|
||||
class WeightedGraph(Graph):
|
||||
def cost(self, from_id: Location, to_id: Location) -> float: pass
|
||||
|
||||
|
||||
class GridWithWeights(SquareGrid):
|
||||
def __init__(self, width: int, height: int):
|
||||
super().__init__(width, height)
|
||||
self.weights: Dict[GridLocation, float] = {}
|
||||
|
||||
def cost(self, from_node: GridLocation, to_node: GridLocation) -> float:
|
||||
return self.weights.get(to_node, 1)
|
||||
|
||||
|
||||
# utility functions for dealing with square grids
|
||||
def from_id_width(id, width):
|
||||
return (id % width, id // width)
|
||||
|
||||
|
||||
def inverse_y(height, y):
|
||||
return height - y
|
||||
|
||||
|
||||
class Graph(Protocol):
|
||||
def neighbors(self, id: Location) -> List[Location]: pass
|
43
util/PathVisualiser.py
Normal file
43
util/PathVisualiser.py
Normal file
@ -0,0 +1,43 @@
|
||||
# thanks to @m1sp <Jaiden Mispy> for this simpler version of
|
||||
# reconstruct_path that doesn't have duplicate entries
|
||||
from typing import Dict, List
|
||||
|
||||
from util.Pathfinder import Location
|
||||
|
||||
|
||||
def reconstruct_path(came_from: Dict[Location, Location],
|
||||
start: Location, goal: Location) -> List[Location]:
|
||||
current: Location = goal
|
||||
path: List[Location] = []
|
||||
while current != start: # note: this will fail if no path found
|
||||
path.append(current)
|
||||
current = came_from[current]
|
||||
path.append(start) # optional
|
||||
path.reverse() # optional
|
||||
return path
|
||||
|
||||
|
||||
def draw_grid(graph, **style):
|
||||
print("___" * graph.width)
|
||||
for y in range(graph.height):
|
||||
for x in range(graph.width):
|
||||
print("%s" % draw_tile(graph, (x, y), style), end="")
|
||||
print()
|
||||
print("~~~" * graph.width)
|
||||
|
||||
|
||||
def draw_tile(graph, id, style):
|
||||
r = " . "
|
||||
if 'number' in style and id in style['number']: r = " %-2d" % style['number'][id]
|
||||
if 'point_to' in style and style['point_to'].get(id, None) is not None:
|
||||
(x1, y1) = id
|
||||
(x2, y2) = style['point_to'][id]
|
||||
if x2 == x1 + 1: r = " > "
|
||||
if x2 == x1 - 1: r = " < "
|
||||
if y2 == y1 + 1: r = " v "
|
||||
if y2 == y1 - 1: r = " ^ "
|
||||
if 'path' in style and id in style['path']: r = " @ "
|
||||
if 'start' in style and id == style['start']: r = " A "
|
||||
if 'goal' in style and id == style['goal']: r = " Z "
|
||||
if id in graph.walls: r = "###"
|
||||
return r
|
36
util/Pathfinder.py
Normal file
36
util/Pathfinder.py
Normal file
@ -0,0 +1,36 @@
|
||||
from typing import Optional, Dict, Tuple
|
||||
|
||||
from util.PathDefinitions import WeightedGraph, Location
|
||||
from util.PriorityQueue import PriorityQueue
|
||||
|
||||
|
||||
def heuristic(a: Tuple[int, int], b: Tuple[int, int]) -> float:
|
||||
(x1, y1) = a
|
||||
|
||||
(x2, y2) = b
|
||||
return abs(x1 - x2) + abs(y1 - y2)
|
||||
|
||||
|
||||
def a_star_search(graph: WeightedGraph, start: Tuple[int, int], goal: Tuple[int, int]):
|
||||
frontier = PriorityQueue()
|
||||
frontier.put(start, 0)
|
||||
came_from: Dict[Location, Optional[Location]] = {}
|
||||
cost_so_far: Dict[Location, float] = {}
|
||||
came_from[start] = None
|
||||
cost_so_far[start] = 0
|
||||
|
||||
while not frontier.empty():
|
||||
current: Location = frontier.get()
|
||||
|
||||
if current == goal:
|
||||
break
|
||||
|
||||
for next in graph.neighbors(current):
|
||||
new_cost = cost_so_far[current] + graph.cost(current, next)
|
||||
if next not in cost_so_far or new_cost < cost_so_far[next]:
|
||||
cost_so_far[next] = new_cost
|
||||
priority = new_cost + heuristic(next, goal)
|
||||
frontier.put(next, priority)
|
||||
came_from[next] = current
|
||||
|
||||
return came_from, cost_so_far.keys()
|
18
util/PriorityQueue.py
Normal file
18
util/PriorityQueue.py
Normal file
@ -0,0 +1,18 @@
|
||||
import heapq
|
||||
from typing import List, Tuple, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class PriorityQueue:
|
||||
def __init__(self):
|
||||
self.elements: List[Tuple[float, T]] = []
|
||||
|
||||
def empty(self) -> bool:
|
||||
return not self.elements
|
||||
|
||||
def put(self, item: T, priority: float):
|
||||
heapq.heappush(self.elements, (priority, item))
|
||||
|
||||
def get(self) -> T:
|
||||
return heapq.heappop(self.elements)[1]
|
Loading…
Reference in New Issue
Block a user