From d72364e06905de1f57f1b9ae4f02e3639d9b6d4c Mon Sep 17 00:00:00 2001 From: Makrellka Date: Thu, 28 Apr 2022 21:22:19 +0200 Subject: [PATCH] clean-up, rotation weight fix --- GameModel.py | 6 +- decision/StateTree.py | 5 +- main.py | 2 +- pathfinding/PathFinderState.py | 17 ++++ pathfinding/Pathfinder.py | 147 ------------------------------ pathfinding/PathfinderOnStates.py | 138 ++++++++++++++++++++++++++++ pathfinding/PrioritizedItem.py | 8 ++ util/PathVisualiser.py | 43 --------- util/Pathfinder.py | 36 -------- 9 files changed, 168 insertions(+), 234 deletions(-) create mode 100644 pathfinding/PathFinderState.py delete mode 100644 pathfinding/Pathfinder.py create mode 100644 pathfinding/PathfinderOnStates.py create mode 100644 pathfinding/PrioritizedItem.py delete mode 100644 util/PathVisualiser.py delete mode 100644 util/Pathfinder.py diff --git a/GameModel.py b/GameModel.py index 3ee570d..01ae27e 100644 --- a/GameModel.py +++ b/GameModel.py @@ -11,7 +11,7 @@ from PatchAgent import PatchAgent from PatchType import PatchType from data.GameConstants import GameConstants from decision.ActionType import ActionType -from pathfinding.Pathfinder import PathFinderOnStates, PathFinderState +from pathfinding.PathfinderOnStates import PathFinderOnStates, PathFinderState from util.PathDefinitions import GridLocation, GridWithWeights @@ -43,7 +43,7 @@ class GameModel(Model): self.grid.place_agent(self.forklift_agent, (x, y)) self.forklift_agent.current_position = (x, y) - start, goal = (x, y), (2, 3) + start, goal = (x, y), (1, 3) pathFinder = PathFinderOnStates( self.game_constants, @@ -56,7 +56,7 @@ class GameModel(Model): self.place_walls_agents(graph.walls) self.place_puddles(graph2.puddles) - actions = pathFinder.getActionList() + actions = pathFinder.get_action_list() print("PATHFINDING") print(actions) self.forklift_agent.queue_movement_actions(actions) diff --git a/decision/StateTree.py b/decision/StateTree.py index 6fbec66..3dcb59f 100644 --- a/decision/StateTree.py +++ b/decision/StateTree.py @@ -22,7 +22,4 @@ class StateTree: ) def expansion(self, from_state: State) -> List[State]: - return [] - # a* na przestrzeni stanow - # heura -> np manhatan - # funkcja kosztu zroznicowany \ No newline at end of file + return [] \ No newline at end of file diff --git a/main.py b/main.py index 17cec37..ab7c146 100644 --- a/main.py +++ b/main.py @@ -61,7 +61,7 @@ if __name__ == '__main__': diagram4.walls = [(6, 5), (6, 6), (6, 7), (6, 8), (2, 3), (2, 4), (3, 4), (4, 4), (6, 4)] diagram5 = GridWithWeights(gridWidth, gridHeight) - diagram5.puddles = [(2, 2), (2, 5), (5, 4)] + diagram5.puddles = [(2, 2), (2, 5), (2, 6), (5, 4)] grid = CanvasGrid(agent_portrayal, gridWidth, gridHeight, scale * gridWidth, scale * gridHeight) diff --git a/pathfinding/PathFinderState.py b/pathfinding/PathFinderState.py new file mode 100644 index 0000000..4195016 --- /dev/null +++ b/pathfinding/PathFinderState.py @@ -0,0 +1,17 @@ +from typing import List + +from data.Direction import Direction +from decision.ActionType import ActionType +from util.PathDefinitions import GridLocation + + +class PathFinderState: + + def __init__(self, agent_position: GridLocation, agent_direction: Direction, cost: float, + last_action: ActionType, action_taken: List[ActionType]): + super().__init__() + self.agent_position = agent_position + self.agent_direction = agent_direction + self.cost = cost + self.last_action = last_action + self.action_taken = action_taken \ No newline at end of file diff --git a/pathfinding/Pathfinder.py b/pathfinding/Pathfinder.py deleted file mode 100644 index 77f7e57..0000000 --- a/pathfinding/Pathfinder.py +++ /dev/null @@ -1,147 +0,0 @@ -from dataclasses import dataclass, field -from typing import List, Any -from typing import Tuple - -from data.Direction import Direction -from data.GameConstants import GameConstants -from decision.ActionType import ActionType -from util.PathDefinitions import GridLocation -from util.PriorityQueue import PriorityQueue - - -@dataclass(order=True) -class PrioritizedItem: - priority: float - item: Any = field(compare=False) - - -class PathFinderState: - - def __init__(self, agent_position: GridLocation, agent_direction: Direction, cost: float, - last_action: ActionType, action_taken: List[ActionType]): - super().__init__() - self.agent_position = agent_position - self.agent_direction = agent_direction - self.cost = cost - self.last_action = last_action - self.action_taken = action_taken - - -class PathFinderOnStates: - def __init__(self, game_constants: GameConstants, goal: GridLocation, root_state: PathFinderState): - super().__init__() - self.game_constants = game_constants - self.goal = goal - self.queue = PriorityQueue() - self.queue.put(PrioritizedItem(root_state.cost, root_state), root_state.cost) - - def heuristic(self, a: Tuple[int, int], b: Tuple[int, int]) -> float: - # tutaj mozna uzyc heury np. manhatan distance (zmodyfikowany bo masz obroty a to zmienia oplacalnosc) - (x1, y1) = a - (x2, y2) = b - return abs(x1 - x2) + abs(y1 - y2) - - def evaluate(self, currState: PathFinderState) -> float: - # koszt dojscia do danego stanu+ heura - return currState.cost + self.heuristic(currState.agent_position, self.goal) - - def getPositionAfterMove(self, currState: PathFinderState) -> GridLocation: - if currState.agent_direction == Direction.top: - return currState.agent_position[0], currState.agent_position[1] + 1 - - elif currState.agent_direction == Direction.down: - return currState.agent_position[0], currState.agent_position[1] - 1 - - elif currState.agent_direction == Direction.right: - return currState.agent_position[0] + 1, currState.agent_position[1] - - elif currState.agent_direction == Direction.left: - return currState.agent_position[0] - 1, currState.agent_position[1] - - def isMovePossible(self, currState: PathFinderState) -> bool: - positionAfterMove = self.getPositionAfterMove(currState) - if positionAfterMove in self.game_constants.walls: - return False - elif positionAfterMove[0] < 0 or positionAfterMove[0] > self.game_constants.grid_width: - return False - elif positionAfterMove[1] < 0 or positionAfterMove[1] > self.game_constants.grid_height: - return False - else: - return True - - def createState(self, currState: PathFinderState, action: ActionType) -> PathFinderState: - if currState.agent_position in self.game_constants.diffTerrain: - cost = currState.cost + 5 - else: - cost = currState.cost + 1 - last_action = action - action_taken: List[ActionType] = [] - action_taken.extend(currState.action_taken) - action_taken.append(last_action) - agent_position = currState.agent_position - agent_direction = currState.agent_direction - - if action == ActionType.ROTATE_UP: - agent_direction = Direction.top - elif action == ActionType.ROTATE_DOWN: - agent_direction = Direction.down - elif action == ActionType.ROTATE_LEFT: - agent_direction = Direction.left - elif action == ActionType.ROTATE_RIGHT: - agent_direction = Direction.right - elif action == ActionType.MOVE: - agent_position = self.getPositionAfterMove(currState) - - return PathFinderState(agent_position, agent_direction, cost, last_action, action_taken) - - def expansion(self, currState: PathFinderState) -> List[PathFinderState]: - # dla stanu sprawdzamy jakie akcje z tego miejsca mozemy podjac (ActionType) - # reprezentacja kazdego stanu co moge podjac z tego miejsca - # generowanie stanu - # sprawdz w ktorym kierunku obrocony - possibleNextStates: List[PathFinderState] = [] - if self.isMovePossible(currState): - possibleNextStates.append(self.createState(currState, ActionType.MOVE)) - - if currState.agent_direction == Direction.top: - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN)) - - elif currState.agent_direction == Direction.down: - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP)) - - elif currState.agent_direction == Direction.left: - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN)) - - elif currState.agent_direction == Direction.right: - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT)) - possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN)) - - return possibleNextStates - - def getActionList(self) -> List[ActionType]: - already_visited = {} - - while not self.queue.empty(): - item: PrioritizedItem = self.queue.get() - best_state: PathFinderState = item.item - - if best_state.agent_position == self.goal or (self.heuristic(best_state.agent_position, self.goal) == 1 - and self.goal in self.game_constants.walls): - break - - for state in self.expansion(best_state): - s_tuple = (state.agent_position[0], state.agent_position[1], state.agent_direction) - - if s_tuple not in already_visited: - priority = self.evaluate(state) - self.queue.put(PrioritizedItem(priority, state), priority) - already_visited[s_tuple] = state - - return best_state.action_taken diff --git a/pathfinding/PathfinderOnStates.py b/pathfinding/PathfinderOnStates.py new file mode 100644 index 0000000..b2639c2 --- /dev/null +++ b/pathfinding/PathfinderOnStates.py @@ -0,0 +1,138 @@ +from typing import List +from typing import Tuple + +from data.Direction import Direction +from data.GameConstants import GameConstants +from decision.ActionType import ActionType +from pathfinding.PathFinderState import PathFinderState +from pathfinding.PrioritizedItem import PrioritizedItem +from util.PathDefinitions import GridLocation +from util.PriorityQueue import PriorityQueue + + +class PathFinderOnStates: + def __init__(self, game_constants: GameConstants, goal: GridLocation, root_state: PathFinderState): + super().__init__() + self.game_constants = game_constants + self.goal = goal + self.queue = PriorityQueue() + self.queue.put(PrioritizedItem(root_state.cost, root_state), root_state.cost) + + def heuristic(self, a: Tuple[int, int], b: Tuple[int, int]) -> float: + # tutaj mozna uzyc heury np. manhatan distance (zmodyfikowany bo masz obroty a to zmienia oplacalnosc) + (x1, y1) = a + (x2, y2) = b + return abs(x1 - x2) + abs(y1 - y2) + + def evaluate(self, curr_state: PathFinderState) -> float: + # koszt dojscia do danego stanu+ heura + return curr_state.cost + self.heuristic(curr_state.agent_position, self.goal) + + def get_position_after_move(self, curr_state: PathFinderState) -> GridLocation: + if curr_state.agent_direction == Direction.top: + return curr_state.agent_position[0], curr_state.agent_position[1] + 1 + + elif curr_state.agent_direction == Direction.down: + return curr_state.agent_position[0], curr_state.agent_position[1] - 1 + + elif curr_state.agent_direction == Direction.right: + return curr_state.agent_position[0] + 1, curr_state.agent_position[1] + + elif curr_state.agent_direction == Direction.left: + return curr_state.agent_position[0] - 1, curr_state.agent_position[1] + + def is_move_possible(self, curr_state: PathFinderState) -> bool: + position_after_move = self.get_position_after_move(curr_state) + if position_after_move in self.game_constants.walls: + return False + elif position_after_move[0] < 0 or position_after_move[0] > self.game_constants.grid_width: + return False + elif position_after_move[1] < 0 or position_after_move[1] > self.game_constants.grid_height: + return False + else: + return True + + def create_state(self, curr_state: PathFinderState, action: ActionType) -> PathFinderState: + + if action == action.MOVE: + if curr_state.agent_position in self.game_constants.diffTerrain: + cost = curr_state.cost + 20 + # tutaj koszt kaluzy + else: + cost = curr_state.cost + 1 + #TODO: jezeli bedziemy rozpatrywac rozne stany to nalezy rozbic na kazdy mozliwy obrot + else: + cost = curr_state.cost + 10 + + last_action = action + action_taken: List[ActionType] = [] + action_taken.extend(curr_state.action_taken) + action_taken.append(last_action) + agent_position = curr_state.agent_position + agent_direction = curr_state.agent_direction + + if action == ActionType.ROTATE_UP: + agent_direction = Direction.top + elif action == ActionType.ROTATE_DOWN: + agent_direction = Direction.down + elif action == ActionType.ROTATE_LEFT: + agent_direction = Direction.left + elif action == ActionType.ROTATE_RIGHT: + agent_direction = Direction.right + elif action == ActionType.MOVE: + agent_position = self.get_position_after_move(curr_state) + + return PathFinderState(agent_position, agent_direction, cost, last_action, action_taken) + + # Funkcja następnika + def expansion(self, curr_state: PathFinderState) -> List[PathFinderState]: + # dla stanu sprawdzamy jakie akcje z tego miejsca mozemy podjac (ActionType) + # reprezentacja kazdego stanu co moge podjac z tego miejsca + # generowanie stanu + # sprawdz w ktorym kierunku obrocony + possible_next_states: List[PathFinderState] = [] + if self.is_move_possible(curr_state): + possible_next_states.append(self.create_state(curr_state, ActionType.MOVE)) + + if curr_state.agent_direction == Direction.top: + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_RIGHT)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_LEFT)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_DOWN)) + + elif curr_state.agent_direction == Direction.down: + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_RIGHT)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_LEFT)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_UP)) + + elif curr_state.agent_direction == Direction.left: + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_RIGHT)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_UP)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_DOWN)) + + elif curr_state.agent_direction == Direction.right: + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_UP)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_LEFT)) + possible_next_states.append(self.create_state(curr_state, ActionType.ROTATE_DOWN)) + + return possible_next_states + + def get_action_list(self) -> List[ActionType]: + already_visited = {} + + while not self.queue.empty(): + item: PrioritizedItem = self.queue.get() + best_state: PathFinderState = item.item + + if best_state.agent_position == self.goal or (self.heuristic(best_state.agent_position, self.goal) == 1 + and self.goal in self.game_constants.walls): + break + + for state in self.expansion(best_state): + s_tuple = (state.agent_position[0], state.agent_position[1], state.agent_direction) + + if s_tuple not in already_visited: + priority = self.evaluate(state) + self.queue.put(PrioritizedItem(priority, state), priority) + already_visited[s_tuple] = state + + return best_state.action_taken diff --git a/pathfinding/PrioritizedItem.py b/pathfinding/PrioritizedItem.py new file mode 100644 index 0000000..e729849 --- /dev/null +++ b/pathfinding/PrioritizedItem.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(order=True) +class PrioritizedItem: + priority: float + item: Any = field(compare=False) \ No newline at end of file diff --git a/util/PathVisualiser.py b/util/PathVisualiser.py deleted file mode 100644 index 327fe35..0000000 --- a/util/PathVisualiser.py +++ /dev/null @@ -1,43 +0,0 @@ -# thanks to @m1sp for this simpler version of -# reconstruct_path that doesn't have duplicate entries -from typing import Dict, List - -from util.Pathfinder import Location - - -def reconstruct_path(came_from: Dict[Location, Location], - start: Location, goal: Location) -> List[Location]: - current: Location = goal - path: List[Location] = [] - while current != start: # note: this will fail if no path found - path.append(current) - current = came_from[current] - path.append(start) # optional - path.reverse() # optional - return path - - -def draw_grid(graph, **style): - print("___" * graph.width) - for y in range(graph.height): - for x in range(graph.width): - print("%s" % draw_tile(graph, (x, y), style), end="") - print() - print("~~~" * graph.width) - - -def draw_tile(graph, id, style): - r = " . " - if 'number' in style and id in style['number']: r = " %-2d" % style['number'][id] - if 'point_to' in style and style['point_to'].get(id, None) is not None: - (x1, y1) = id - (x2, y2) = style['point_to'][id] - if x2 == x1 + 1: r = " > " - if x2 == x1 - 1: r = " < " - if y2 == y1 + 1: r = " v " - if y2 == y1 - 1: r = " ^ " - if 'path' in style and id in style['path']: r = " @ " - if 'start' in style and id == style['start']: r = " A " - if 'goal' in style and id == style['goal']: r = " Z " - if id in graph.walls: r = "###" - return r diff --git a/util/Pathfinder.py b/util/Pathfinder.py deleted file mode 100644 index 51ec199..0000000 --- a/util/Pathfinder.py +++ /dev/null @@ -1,36 +0,0 @@ -from typing import Optional, Dict, Tuple - -from util.PathDefinitions import WeightedGraph, Location -from util.PriorityQueue import PriorityQueue - - -def heuristic(a: Tuple[int, int], b: Tuple[int, int]) -> float: - (x1, y1) = a - - (x2, y2) = b - return abs(x1 - x2) + abs(y1 - y2) - - -def a_star_search(graph: WeightedGraph, start: Tuple[int, int], goal: Tuple[int, int]): - frontier = PriorityQueue() - frontier.put(start, 0) - came_from: Dict[Location, Optional[Location]] = {} - cost_so_far: Dict[Location, float] = {} - came_from[start] = None - cost_so_far[start] = 0 - - while not frontier.empty(): - current: Location = frontier.get() - - if current == goal: - break - - for next in graph.neighbors(current): - new_cost = cost_so_far[current] + graph.cost(current, next) - if next not in cost_so_far or new_cost < cost_so_far[next]: - cost_so_far[next] = new_cost - priority = new_cost + heuristic(next, goal) - frontier.put(next, priority) - came_from[next] = current - - return came_from, cost_so_far.keys()