SI_InteligentnyWozekWidlowy/test/dlaMarcina.py
2022-04-28 00:29:29 +02:00

164 lines
7.2 KiB
Python

import queue
from typing import List
from typing import Optional, Dict, Tuple
from data.Direction import Direction
from data.GameConstants import GameConstants
from decision.ActionType import ActionType
from util.PathDefinitions import GridLocation
from util.PriorityQueue import PriorityQueue
from decision.StateTree import StateTree
class PathFinderState:
def __init__(self, agent_position: GridLocation, agent_direction: Direction, cost: float,
last_action: ActionType, action_taken: List[ActionType]):
super().__init__()
self.agent_position = agent_position
self.agent_direction = agent_direction
self.cost = cost
self.last_action = last_action
self.action_taken = action_taken
def getActionTaken(self):
return self.getActionTaken()
#
#
#
class PathFinderOnStates:
def __init__(self, game_constants: GameConstants, goal: GridLocation, root_state: PathFinderState):
super().__init__()
self.game_constants = game_constants
self.goal = goal
self.queue = PriorityQueue()
self.queue.put(root_state, root_state.cost)
def heuristic(self, a: Tuple[int, int], b: Tuple[int, int]) -> float:
# tutaj mozna uzyc heury np. manhatan distance (zmodyfikowany bo masz obroty a to zmienia oplacalnosc)
(x1, y1) = a
(x2, y2) = b
return abs(x1 - x2) + abs(y1 - y2)
def evaluate(self, currState: PathFinderState) -> float:
# koszt dojscia do danego stanu+ heura
return currState.cost + self.heuristic(currState.agent_position, self.goal)
def getPositionAfterMove(self, currState: PathFinderState) -> GridLocation:
result: GridLocation = None
if currState.agent_position == Direction.top:
currState.agent_position[1] += 1
result = currState.agent_position
elif currState.agent_position == Direction.down:
currState.agent_position[1] -= 1
result = currState.agent_position
elif currState.agent_position == Direction.right:
currState.agent_position[0] += 1
result = currState.agent_position
elif currState.agent_position == Direction.left:
currState.agent_position[0] -= 1
result = currState.agent_position
return result
def isMovePossible(self, currState: PathFinderState) -> bool:
positionAfterMove = self.getPositionAfterMove(currState)
if positionAfterMove in self.game_constants.walls:
return False
elif positionAfterMove[0] < 0 or positionAfterMove[0] > self.game_constants.grid_width:
return False
elif positionAfterMove[1] < 0 or positionAfterMove[1] > self.game_constants.grid_height:
return False
else:
return True
def createState(self, currState: PathFinderState, action: ActionType) -> PathFinderState:
cost = currState.cost + 1
last_action = action
action_taken: List[ActionType] = []
action_taken.extend(currState.action_taken)
agent_position = currState.agent_position
agent_direction = currState.agent_direction
if action == ActionType.ROTATE_UP:
agent_direction = Direction.top
elif action == ActionType.ROTATE_DOWN:
agent_direction = Direction.down
elif action == ActionType.ROTATE_LEFT:
agent_direction = Direction.left
elif action == ActionType.ROTATE_RIGHT:
agent_direction = Direction.right
elif action == ActionType.MOVE:
agent_position = self.getPositionAfterMove(currState)
return PathFinderState(agent_position, agent_direction, cost, last_action, action_taken)
def expansion(self, currState: PathFinderState) -> List[PathFinderState]:
# dla stanu sprawdzamy jakie akcje z tego miejsca mozemy podjac (ActionType)
# reprezentacja kazdego stanu co moge podjac z tego miejsca
# generowanie stanu
# sprawdz w ktorym kierunku obrocony
possibleNextStates: List[PathFinderState] = []
if currState.agent_direction == Direction.top:
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN))
if self.isMovePossible(currState):
possibleNextStates.append(self.createState(currState, ActionType.MOVE))
elif currState.agent_direction == Direction.down:
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP))
if self.isMovePossible(currState):
possibleNextStates.append(self.createState(currState, ActionType.MOVE))
elif currState.agent_direction == Direction.left:
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_RIGHT))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN))
if self.isMovePossible(currState):
possibleNextStates.append(self.createState(currState, ActionType.MOVE))
elif currState.agent_direction == Direction.right:
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_UP))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_LEFT))
possibleNextStates.append(self.createState(currState, ActionType.ROTATE_DOWN))
if self.isMovePossible(currState):
possibleNextStates.append(self.createState(currState, ActionType.MOVE))
return possibleNextStates
def getActionList(self) -> List[ActionType]:
best_state: PathFinderState = self.queue.get()
while best_state.agent_position != self.goal and not self.queue.empty():
# dodajesz do kolejki stany z expansion (po cost)
for state in self.expansion(best_state):
self.queue.put(state, self.evaluate(state))
best_state = self.queue.get()
return best_state.getActionTaken()
# do kosztu dokładam koszt starego stanu plus 1
# def a_star(self,stateTree:StateTree, start: Tuple[int, int], goal: Tuple[int, int]):
# frontier = PriorityQueue()
# frontier.put(start, 0)
# came_from = dict()
# cost_so_far = dict()
# came_from[start] = None
# cost_so_far[start] = 0
#
# while not frontier.empty():
# current = frontier.get()
#
# if current == goal:
# break
#
# for next in graph.neighbors(current):
# new_cost = cost_so_far[current] + graph.cost(current, next)
# if next not in cost_so_far or new_cost < cost_so_far[next]:
# cost_so_far[next] = new_cost
# priority = new_cost + heuristic(goal, next)
# frontier.put(next, priority)
# came_from[next] = current