# SI_InteligentnyWozekWidlowy/pathfinding/dlaMarcina.py
# Aleksander Szamałek 88d2ebe087 review
# 2022-04-28 01:50:56 +02:00
#
# 169 lines
# 7.6 KiB
# Python

from dataclasses import dataclass, field
from typing import List, Any
from typing import Tuple
from data.Direction import Direction
from data.GameConstants import GameConstants
from decision.ActionType import ActionType
from util.PathDefinitions import GridLocation
from util.PriorityQueue import PriorityQueue
@dataclass(order=True)
class PrioritizedItem:
    """Queue entry that pairs a numeric priority with an arbitrary payload.

    Ordering and equality are generated by ``dataclass(order=True)`` and take
    only ``priority`` into account; ``item`` is excluded from comparisons, so
    payloads that do not support ``<`` can still be stored side by side.
    """

    # Value used for every comparison between two entries.
    priority: float
    # Payload carried along with the priority; never compared.
    item: Any = field(compare=False)
class PathFinderState:
    """Snapshot of the agent at one node of the path search.

    Attributes:
        agent_position: Grid location of the agent in this state.
        agent_direction: Direction the agent is facing in this state.
        cost: Accumulated cost of reaching this state.
        last_action: Action that produced this state.
        action_taken: Actions performed so far, in order. The list passed in
            is stored as-is (not copied).
    """

    def __init__(self, agent_position: GridLocation, agent_direction: Direction, cost: float,
                 last_action: ActionType, action_taken: List[ActionType]):
        super().__init__()
        self.agent_position = agent_position
        self.agent_direction = agent_direction
        self.cost = cost
        self.last_action = last_action
        self.action_taken = action_taken
class PathFinderOnStates:
    """A* search over agent states (grid position plus facing direction).

    Every action (a rotation or a one-cell move forward) costs 1. The frontier
    is a ``PriorityQueue`` used through ``put(item, priority)``, ``get()`` and
    ``empty()``; entries are wrapped in ``PrioritizedItem`` so that states
    themselves never have to be compared.

    ``game_constants`` is read for ``walls``, ``grid_width`` and
    ``grid_height``.
    """

    def __init__(self, game_constants: GameConstants, goal: GridLocation, root_state: PathFinderState):
        """Store the search parameters and seed the frontier with ``root_state``."""
        super().__init__()
        self.game_constants = game_constants
        self.goal = goal
        self.queue = PriorityQueue()
        # The root is the only entry at this point, so its own cost is a
        # sufficient priority.
        self.queue.put(PrioritizedItem(root_state.cost, root_state), root_state.cost)

    def heuristic(self, a: Tuple[int, int], b: Tuple[int, int]) -> float:
        """Return the Manhattan distance between points ``a`` and ``b``."""
        # A variant that also accounts for the rotations needed could be
        # used here, since turning changes how cheap a route really is.
        (x1, y1) = a
        (x2, y2) = b
        return abs(x1 - x2) + abs(y1 - y2)

    def evaluate(self, currState: PathFinderState) -> float:
        """Return the A* priority of ``currState``: cost so far plus heuristic to the goal."""
        return currState.cost + self.heuristic(currState.agent_position, self.goal)

    def getPositionAfterMove(self, currState: PathFinderState) -> GridLocation:
        """Return the cell one step ahead of the agent in its facing direction.

        ``top`` increases y, ``down`` decreases y, ``right`` increases x and
        ``left`` decreases x.

        Raises:
            ValueError: If the state's direction is none of the four known
                directions (previously ``None`` was returned silently).
        """
        x = currState.agent_position[0]
        y = currState.agent_position[1]
        direction = currState.agent_direction
        if direction == Direction.top:
            return x, y + 1
        if direction == Direction.down:
            return x, y - 1
        if direction == Direction.right:
            return x + 1, y
        if direction == Direction.left:
            return x - 1, y
        raise ValueError(f"Unsupported agent direction: {direction!r}")

    def _is_inside_grid(self, position: GridLocation) -> bool:
        """Return True when ``position`` lies within the grid bounds."""
        # Coordinates are zero-based (negative values are rejected), so the
        # last valid index is size - 1.
        # NOTE(review): assumes grid_width/grid_height are cell counts - confirm.
        return (0 <= position[0] < self.game_constants.grid_width
                and 0 <= position[1] < self.game_constants.grid_height)

    def isMovePossible(self, currState: PathFinderState) -> bool:
        """Return True when the cell ahead of the agent is inside the grid and not a wall."""
        target = self.getPositionAfterMove(currState)
        if target in self.game_constants.walls:
            return False
        return self._is_inside_grid(target)

    def createState(self, currState: PathFinderState, action: ActionType) -> PathFinderState:
        """Return the state that results from applying ``action`` to ``currState``.

        The new state costs one more than ``currState`` and carries a fresh
        copy of the action history with ``action`` appended. Rotations change
        only the direction; ``MOVE`` changes only the position. Any other
        action leaves both unchanged.
        """
        agent_position = currState.agent_position
        agent_direction = currState.agent_direction
        if action == ActionType.ROTATE_UP:
            agent_direction = Direction.top
        elif action == ActionType.ROTATE_DOWN:
            agent_direction = Direction.down
        elif action == ActionType.ROTATE_LEFT:
            agent_direction = Direction.left
        elif action == ActionType.ROTATE_RIGHT:
            agent_direction = Direction.right
        elif action == ActionType.MOVE:
            agent_position = self.getPositionAfterMove(currState)
        # Build a new list so sibling states never share one history object.
        action_taken: List[ActionType] = [*currState.action_taken, action]
        return PathFinderState(agent_position, agent_direction, currState.cost + 1, action, action_taken)

    def expansion(self, currState: PathFinderState) -> List[PathFinderState]:
        """Return every state reachable from ``currState`` with a single action.

        The agent may rotate to each of the three directions it is not
        currently facing and, if the cell ahead is free, move forward. An
        unknown direction yields an empty list.
        """
        # Rotations are listed per facing direction in the order in which
        # successors are generated; MOVE is always appended last.
        rotations_by_direction = {
            Direction.top: (ActionType.ROTATE_RIGHT, ActionType.ROTATE_LEFT, ActionType.ROTATE_DOWN),
            Direction.down: (ActionType.ROTATE_RIGHT, ActionType.ROTATE_LEFT, ActionType.ROTATE_UP),
            Direction.left: (ActionType.ROTATE_RIGHT, ActionType.ROTATE_UP, ActionType.ROTATE_DOWN),
            Direction.right: (ActionType.ROTATE_UP, ActionType.ROTATE_LEFT, ActionType.ROTATE_DOWN),
        }
        rotations = rotations_by_direction.get(currState.agent_direction)
        if rotations is None:
            return []
        possibleNextStates: List[PathFinderState] = [
            self.createState(currState, rotation) for rotation in rotations
        ]
        if self.isMovePossible(currState):
            possibleNextStates.append(self.createState(currState, ActionType.MOVE))
        return possibleNextStates

    @staticmethod
    def _state_key(state: PathFinderState):
        """Return the hashable (x, y, direction) identity of ``state``."""
        return state.agent_position[0], state.agent_position[1], state.agent_direction

    def getActionList(self) -> List[ActionType]:
        """Run the search and return the actions leading to the goal.

        Returns:
            The ``action_taken`` list of the first expanded state whose
            position equals the goal, or an empty list when the frontier is
            exhausted without reaching the goal.
        """
        best_cost = {}  # cheapest cost at which each state key has been queued
        expanded = set()  # state keys that have already been expanded
        while not self.queue.empty():
            item: PrioritizedItem = self.queue.get()
            best_state: PathFinderState = item.item
            state_key = self._state_key(best_state)
            if state_key in expanded:
                # Stale entry: the same state was already expanded earlier.
                continue
            expanded.add(state_key)
            if best_state.agent_position == self.goal:
                return best_state.action_taken
            # Queue every successor that improves on what is known so far.
            for state in self.expansion(best_state):
                s_tuple = self._state_key(state)
                if s_tuple in expanded:
                    continue
                known_cost = best_cost.get(s_tuple)
                if known_cost is not None and known_cost <= state.cost:
                    continue
                best_cost[s_tuple] = state.cost
                priority = self.evaluate(state)
                self.queue.put(PrioritizedItem(priority, state), priority)
        # Goal not reachable: report "no plan" instead of the path to
        # whichever state happened to be expanded last.
        return []
# The cost of a new state is the cost of the previous state plus 1.
# def a_star(self,stateTree:StateTree, start: Tuple[int, int], goal: Tuple[int, int]):
# frontier = PriorityQueue()
# frontier.put(start, 0)
# came_from = dict()
# cost_so_far = dict()
# came_from[start] = None
# cost_so_far[start] = 0
#
# while not frontier.empty():
# current = frontier.get()
#
# if current == goal:
# break
#
# for next in graph.neighbors(current):
# new_cost = cost_so_far[current] + graph.cost(current, next)
# if next not in cost_so_far or new_cost < cost_so_far[next]:
# cost_so_far[next] = new_cost
# priority = new_cost + heuristic(goal, next)
# frontier.put(next, priority)
# came_from[next] = current