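"""A* search over the city grid.

Finds the cheapest sequence of agent actions (turns and forward moves) that
leaves the agent facing the nearest garbage can that has not been visited yet.
"""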
from typing import Dict, List, Optional, Tuple
from queue import PriorityQueue

from agentState import AgentState
from city import City
from gridCellType import GridCellType
from agentActionType import AgentActionType
from agentOrientation import AgentOrientation
from turnCar import turn_left_orientation, turn_right_orientation


class Successor:
    """A search node: the state reached, the action that produced it, the
    accumulated path cost, and the predicted total cost (cost + heuristic)."""

    def __init__(self, state: AgentState, action: AgentActionType, cost: int, predicted_cost: int) -> None:
        self.state = state
        self.action = action
        self.cost = cost
        self.predicted_cost = predicted_cost


class SuccessorList:
    """A partial path through the search space. Instances compare by the
    predicted cost of their last node, so they can be ordered in a PriorityQueue."""

    succ_list: List[Successor]

    def __init__(self, succ_list: List[Successor]) -> None:
        self.succ_list = succ_list

    def __gt__(self, other: "SuccessorList") -> bool:
        return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost

    def __lt__(self, other: "SuccessorList") -> bool:
        return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost


def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType],
                             city: City) -> List[AgentActionType]:
    """A* search for the cheapest action sequence that leaves the agent facing
    the nearest non-visited garbage can. Returns an empty list if no path exists."""
    visited: List[AgentState] = []
    queue: PriorityQueue[SuccessorList] = PriorityQueue()
    queue.put(SuccessorList([Successor(startState, AgentActionType.UNKNOWN, 0,
                                       _heuristics(startState.position, city))]))

    while not queue.empty():
        # Expand the partial path with the lowest predicted cost.
        current = queue.get()
        previous = current.succ_list[-1]
        visited.append(previous.state)

        if is_state_success(previous.state, grid):
            return extract_actions(current)

        successors = get_successors(previous, grid, city)
        for s in successors:
            # Skip states (position + orientation) that were already expanded.
            already_visited = False
            for v in visited:
                if v.position == s.state.position and v.orientation == s.state.orientation:
                    already_visited = True
                    break
            if already_visited:
                continue
            if is_state_valid(s.state, grid):
                new_list = current.succ_list.copy()
                new_list.append(s)
                queue.put(SuccessorList(new_list))

    return []


def extract_actions(successors: SuccessorList) -> List[AgentActionType]:
    """Turn a finished path into a plain action list, dropping the UNKNOWN
    placeholder action carried by the start node."""
    output: List[AgentActionType] = []
    for s in successors.succ_list:
        if s.action != AgentActionType.UNKNOWN:
            output.append(s.action)
    return output


def get_successors(succ: Successor, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[Successor]:
    """Expand a node: turn left, turn right, and (when possible) move forward."""
    result: List[Successor] = []

    # Turning in place always costs 1 and does not change the position.
    turn_left_cost = 1 + succ.cost
    turn_left_state = AgentState(succ.state.position, turn_left_orientation(succ.state.orientation))
    turn_left_heuristics = _heuristics(succ.state.position, city)
    result.append(Successor(turn_left_state, AgentActionType.TURN_LEFT, turn_left_cost,
                            turn_left_cost + turn_left_heuristics))

    turn_right_cost = 1 + succ.cost
    turn_right_state = AgentState(succ.state.position, turn_right_orientation(succ.state.orientation))
    turn_right_heuristics = _heuristics(succ.state.position, city)
    result.append(Successor(turn_right_state, AgentActionType.TURN_RIGHT, turn_right_cost,
                            turn_right_cost + turn_right_heuristics))

    # Moving forward is only possible while inside the grid bounds.
    state_succ = move_forward_succ(succ, city, grid)
    if state_succ is not None:
        result.append(state_succ)

    return result


def move_forward_succ(succ: Successor, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Optional[Successor]:
    """Successor for moving one cell forward, or None when facing the grid edge."""
    position = get_next_cell(succ.state)
    if position is None:
        return None

    cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost
    predicted_cost = cost + _heuristics(position, city)
    new_state = AgentState(position, succ.state.orientation)
    return Successor(new_state, AgentActionType.MOVE_FORWARD, cost, predicted_cost)


def get_next_cell(state: AgentState) -> Optional[Tuple[int, int]]:
    """Cell directly in front of the agent, or None if it lies outside the 1..27 grid."""
    x, y = state.position
    orientation = state.orientation

    if orientation == AgentOrientation.UP:
        if y - 1 < 1:
            return None
        return x, y - 1
    elif orientation == AgentOrientation.DOWN:
        if y + 1 > 27:
            return None
        return x, y + 1
    elif orientation == AgentOrientation.LEFT:
        if x - 1 < 1:
            return None
        return x - 1, y
    elif x + 1 > 27:
        # Facing RIGHT at the eastern edge of the grid.
        return None
    else:
        return x + 1, y


def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
    """The goal is reached when the agent is directly facing a garbage can."""
    next_cell = get_next_cell(state)
    try:
        return grid[next_cell] == GridCellType.GARBAGE_CAN
    except KeyError:
        return False


def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int:
    """Turning costs 1; moving forward costs 10 over a speed bump and 3 otherwise."""
    if action in (AgentActionType.TURN_LEFT, AgentActionType.TURN_RIGHT):
        return 1
    if cell_type == GridCellType.SPEED_BUMP and action == AgentActionType.MOVE_FORWARD:
        return 10
    if action == AgentActionType.MOVE_FORWARD:
        return 3


def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
    """The agent may only occupy street cells (horizontal, vertical, or speed bump)."""
    try:
        return grid[state.position] in (GridCellType.STREET_HORIZONTAL,
                                        GridCellType.STREET_VERTICAL,
                                        GridCellType.SPEED_BUMP)
    except KeyError:
        return False


def _heuristics(position: Tuple[int, int], city: City) -> int:
    """Estimated remaining cost: 3 (the cheapest forward-move cost) times the
    Manhattan distance to the closest non-visited can, or -1 if every can is visited."""
    min_distance: int = 300
    found_nonvisited: bool = False
    for can in city.cans:
        if can.is_visited:
            continue
        found_nonvisited = True
        distance = 3 * (abs(position[0] - can.position[0]) + abs(position[1] - can.position[1]))
        if distance < min_distance:
            min_distance = distance
    if found_nonvisited:
        return min_distance
    return -1
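

# Usage sketch (illustrative only): how this search is meant to be driven.
# It assumes a City instance (`city`) and a grid mapping (x, y) cells to
# GridCellType values have already been built elsewhere; their construction
# is project specific and not shown here. The start position and orientation
# below are hypothetical values.
#
#     start = AgentState((3, 5), AgentOrientation.UP)
#     actions = find_path_to_nearest_can(start, grid, city)
#     for action in actions:
#         ...  # feed each AgentActionType to the agent / simulator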