from dataclasses import dataclass, field from typing import List, Any from typing import Tuple from data.Direction import Direction from data.GameConstants import GameConstants from decision.ActionType import ActionType from util.PathDefinitions import GridLocation from util.PriorityQueue import PriorityQueue @dataclass(order=True) class PrioritizedItem: priority: float item: Any = field(compare=False) class PathFinderState: def __init__(self, agent_position: GridLocation, agent_direction: Direction, cost: float, last_action: ActionType, action_taken: List[ActionType]): super().__init__() self.agent_position = agent_position self.agent_direction = agent_direction self.cost = cost self.last_action = last_action self.action_taken = action_taken class PathFinderOnStates: def __init__(self, game_constants: GameConstants, goal: GridLocation, root_state: PathFinderState): super().__init__() self.game_constants = game_constants self.goal = goal self.queue = PriorityQueue() self.queue.put(PrioritizedItem(root_state.cost, root_state), root_state.cost) def heuristic(self, a: Tuple[int, int], b: Tuple[int, int]) -> float: # tutaj mozna uzyc heury np. manhatan distance (zmodyfikowany bo masz obroty a to zmienia oplacalnosc) (x1, y1) = a (x2, y2) = b return abs(x1 - x2) + abs(y1 - y2) def evaluate(self, currState: PathFinderState) -> float: # koszt dojscia do danego stanu+ heura return currState.cost + self.heuristic(currState.agent_position, self.goal) def getPositionAfterMove(self, currState: PathFinderState) -> GridLocation: if currState.agent_direction == Direction.top: return currState.agent_position[0], currState.agent_position[1] + 1 elif currState.agent_direction == Direction.down: return currState.agent_position[0], currState.agent_position[1] - 1 elif currState.agent_direction == Direction.right: return currState.agent_position[0] + 1, currState.agent_position[1] elif currState.agent_direction == Direction.left: return currState.agent_position[0] - 1, currState.agent_position[1] def isMovePossible(self, currState: PathFinderState) -> bool: positionAfterMove = self.getPositionAfterMove(currState) if positionAfterMove in self.game_constants.walls: return False elif positionAfterMove[0] < 0 or positionAfterMove[0] > self.game_constants.grid_width: return False elif positionAfterMove[1] < 0 or positionAfterMove[1] > self.game_constants.grid_height: return False else: return True def createState(self, currState: PathFinderState, action: ActionType) -> PathFinderState: cost = currState.cost + 1 last_action = action action_taken: List[ActionType] = [] action_taken.extend(currState.action_taken) action_taken.append(last_action) agent_position = currState.agent_position agent_direction = currState.agent_direction if action == ActionType.ROTATE_UP: agent_direction = Direction.top elif action == ActionType.ROTATE_DOWN: agent_direction = Direction.down elif action == ActionType.ROTATE_LEFT: agent_direction = Direction.left elif action == ActionType.ROTATE_RIGHT: agent_direction = Direction.right elif action == ActionType.MOVE: agent_position = self.getPositionAfterMove(currState) return PathFinderState(agent_position, agent_direction, cost, last_action, action_taken) def expansion(self, currState: PathFinderState) -> List[PathFinderState]: # dla stanu sprawdzamy jakie akcje z tego miejsca mozemy podjac (ActionType) # reprezentacja kazdego stanu co moge podjac z tego miejsca # generowanie stanu # sprawdz w ktorym kierunku obrocony possibleNextStates: List[PathFinderState] = [] if currState.agent_direction == Direction.top: possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN)) if self.isMovePossible(currState): possibleNextStates.append(self.createState(currState, ActionType.MOVE)) elif currState.agent_direction == Direction.down: possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP)) if self.isMovePossible(currState): possibleNextStates.append(self.createState(currState, ActionType.MOVE)) elif currState.agent_direction == Direction.left: possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN)) if self.isMovePossible(currState): possibleNextStates.append(self.createState(currState, ActionType.MOVE)) elif currState.agent_direction == Direction.right: possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT)) possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN)) if self.isMovePossible(currState): possibleNextStates.append(self.createState(currState, ActionType.MOVE)) return possibleNextStates def getActionList(self) -> List[ActionType]: already_visited = {} while not self.queue.empty(): item: PrioritizedItem = self.queue.get() best_state: PathFinderState = item.item if best_state.agent_position == self.goal: break # dodajesz do kolejki stany z expansion (po cost) for state in self.expansion(best_state): s_tuple = (state.agent_position[0], state.agent_position[1], state.agent_direction) if s_tuple not in already_visited: priority = self.evaluate(state) self.queue.put(PrioritizedItem(priority, state), priority) already_visited[s_tuple] = state return best_state.action_taken # do kosztu dokładam koszt starego stanu plus 1 # def a_star(self,stateTree:StateTree, start: Tuple[int, int], goal: Tuple[int, int]): # frontier = PriorityQueue() # frontier.put(start, 0) # came_from = dict() # cost_so_far = dict() # came_from[start] = None # cost_so_far[start] = 0 # # while not frontier.empty(): # current = frontier.get() # # if current == goal: # break # # for next in graph.neighbors(current): # new_cost = cost_so_far[current] + graph.cost(current, next) # if next not in cost_so_far or new_cost < cost_so_far[next]: # cost_so_far[next] = new_cost # priority = new_cost + heuristic(goal, next) # frontier.put(next, priority) # came_from[next] = current