From 271e3365f9e7406c446fee7600f1861cbbe4577d Mon Sep 17 00:00:00 2001
From: majkellll
Date: Thu, 25 May 2023 18:18:11 +0200
Subject: [PATCH] A* gawor done

---
 bfs.py      | 158 +++++++++++++++++++++++++++++++++-------------------
 city.py     |   2 +-
 movement.py |   5 +-
 3 files changed, 104 insertions(+), 61 deletions(-)

diff --git a/bfs.py b/bfs.py
index a559592..711ed8a 100644
--- a/bfs.py
+++ b/bfs.py
@@ -1,112 +1,155 @@
 from agentState import AgentState
-from typing import Dict, Tuple
+from typing import Dict, Tuple, List
 from city import City
 from gridCellType import GridCellType
 from agentActionType import AgentActionType
 from agentOrientation import AgentOrientation
-from queue import Queue
+from queue import Queue, PriorityQueue
 from turnCar import turn_left_orientation, turn_right_orientation
 
-class Succ:
-    state: AgentState
-    action: AgentActionType
-    ##cost: int
-    def __init__(self, state: AgentState, action: AgentActionType) -> None:
+
+class Successor:
+
+    # One search node: the state reached, the action that produced it,
+    # the path cost so far (g) and the predicted total cost (f = g + h).
+    def __init__(self, state: AgentState, action: AgentActionType, cost: int, predicted_cost: int) -> None:
         self.state = state
         self.action = action
-        ##self.cost = cost
+        self.cost = cost
+        self.predicted_cost = predicted_cost
 
-def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]:
-    q: Queue[list[Succ]] = Queue()
-    visited: list[AgentState] = []
-    startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)]
-    q.put(startStates)
-    while not q.empty():
-        currently_checked = q.get()
-        visited.append(currently_checked[-1].state)
-        if is_state_success(currently_checked[-1].state, grid):
-            return extract_actions(currently_checked)
-        successors = succ(currently_checked[-1].state)
+
+class SuccessorList:
+    succ_list: list[Successor]
+
+    def __init__(self, succ_list: list[Successor]) -> None:
+        self.succ_list = succ_list
+
+    # Partial paths are ordered by the predicted cost of their last node,
+    # so the PriorityQueue always expands the most promising path first.
+    def __gt__(self, other):
+        return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost
+
+    def __lt__(self, other):
+        return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost
+
+
+def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[
+        AgentActionType]:
+    visited: List[AgentState] = []
+    queue: PriorityQueue[SuccessorList] = PriorityQueue()
+    queue.put(SuccessorList([Successor(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))]))
+
+    while not queue.empty():
+        current = queue.get()
+        previous = current.succ_list[-1]
+        visited.append(previous.state)
+
+        if is_state_success(previous.state, grid):
+            return extract_actions(current)
+
+        successors = get_successors(previous, grid, city)
         for s in successors:
             already_visited = False
             for v in visited:
-                if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation:
+                if v.position == s.state.position and v.orientation == s.state.orientation:
                     already_visited = True
                     break
            if already_visited:
                 continue
             if is_state_valid(s.state, grid):
-                new_list = currently_checked.copy()
+                new_list = current.succ_list.copy()
                 new_list.append(s)
-                q.put(new_list)
+                queue.put(SuccessorList(new_list))
+
     return []
-        
-
-def extract_actions(successors: list[Succ]) -> list[AgentActionType]:
+
+def extract_actions(successors: SuccessorList) -> list[AgentActionType]:
     output: list[AgentActionType] = []
-    for s in successors:
+    for s in successors.succ_list:
         if s.action != AgentActionType.UNKNOWN:
             output.append(s.action)
     return output
 
-def succ(state: AgentState) -> list[Succ]:
-    result: list[Succ] = []
-    result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT))
-    result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT))
-    state_succ = move_forward_succ(state)
-    if state_succ != None:
-        result.append(move_forward_succ(state))
+
+def get_successors(succ: Successor, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[Successor]:
+    result: List[Successor] = []
+
+    turn_left_cost = 1 + succ.cost
+    turn_left_state = AgentState(succ.state.position, turn_left_orientation(succ.state.orientation))
+    turn_left_heuristics = _heuristics(succ.state.position, city)
+    result.append(
+        Successor(turn_left_state, AgentActionType.TURN_LEFT, turn_left_cost, turn_left_cost + turn_left_heuristics))
+
+    turn_right_cost = 1 + succ.cost
+    turn_right_state = AgentState(succ.state.position, turn_right_orientation(succ.state.orientation))
+    turn_right_heuristics = _heuristics(succ.state.position, city)
+    result.append(
+        Successor(turn_right_state, AgentActionType.TURN_RIGHT, turn_right_cost,
+                  turn_right_cost + turn_right_heuristics))
+
+    state_succ = move_forward_succ(succ, city, grid)
+    if state_succ is not None:
+        result.append(state_succ)
+
     return result
 
-def move_forward_succ(state: AgentState) -> Succ:
-    position = get_next_cell(state)
-    if position == None:
+
+def move_forward_succ(succ: Successor, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Successor:
+    position = get_next_cell(succ.state)
+    if position is None:
         return None
-    return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD)
+
+    cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost
+    predicted_cost = cost + _heuristics(position, city)
+    new_state = AgentState(position, succ.state.orientation)
+    return Successor(new_state, AgentActionType.MOVE_FORWARD, cost, predicted_cost)
 
 
 def get_next_cell(state: AgentState) -> Tuple[int, int]:
-    if state.orientation == AgentOrientation.UP:
-        if state.position[1] - 1 < 1:
+    x, y = state.position
+    orientation = state.orientation
+
+    if orientation == AgentOrientation.UP:
+        if y - 1 < 1:
             return None
-        return (state.position[0], state.position[1] - 1)
-    if state.orientation == AgentOrientation.DOWN:
-        if state.position[1] + 1 > 27:
+        return x, y - 1
+    elif orientation == AgentOrientation.DOWN:
+        if y + 1 > 27:
             return None
-        return (state.position[0], state.position[1] + 1)
-    if state.orientation == AgentOrientation.LEFT:
-        if state.position[0] - 1 < 1:
+        return x, y + 1
+    elif orientation == AgentOrientation.LEFT:
+        if x - 1 < 1:
             return None
-        return (state.position[0] - 1, state.position[1])
-    if state.position[0] + 1 > 27:
+        return x - 1, y
+    elif x + 1 > 27:
         return None
-    return (state.position[0] + 1, state.position[1])
+    else:
+        return x + 1, y
+
 
 def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
     next_cell = get_next_cell(state)
     try:
         return grid[next_cell] == GridCellType.GARBAGE_CAN
-    except:
+    except KeyError:
         return False
 
+
 def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int:
-    if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT:
+    if action in [AgentActionType.TURN_LEFT, AgentActionType.TURN_RIGHT]:
         return 1
-    if cell_type == GridCellType.SPEED_BUMP:
-        if action == AgentActionType.MOVE_FORWARD:
-            return 10
+    if cell_type == GridCellType.SPEED_BUMP and action == AgentActionType.MOVE_FORWARD:
+        return 10
     if action == AgentActionType.MOVE_FORWARD:
         return 3
 
 def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
-    try: 
-        return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP
-    except:
+    try:
+        return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[
+            state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP
+    except KeyError:
         return False
-    
+
+
 def _heuristics(position: Tuple[int, int], city: City):
     min_distance: int = 300
     found_nonvisited: bool = False
@@ -120,4 +163,3 @@ def _heuristics(position: Tuple[int, int], city: City):
     if found_nonvisited:
         return min_distance
     return -1
-    
\ No newline at end of file

diff --git a/city.py b/city.py
index f53305c..68ae600 100644
--- a/city.py
+++ b/city.py
@@ -41,4 +41,4 @@ class City:
 
     def _render_bumps(self, game_context: GameContext) -> None:
         for bump in self.bumps:
-            bump.render(game_context)
+            bump.render(game_context)
\ No newline at end of file

diff --git a/movement.py b/movement.py
index 308c64b..0060c0e 100644
--- a/movement.py
+++ b/movement.py
@@ -10,10 +10,11 @@ import pygame
 from bfs import find_path_to_nearest_can
 from agentState import AgentState
 
+
 def collect_garbage(game_context: GameContext) -> None:
     while True:
         start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation)
-        path = find_path_to_nearest_can(start_agent_state, game_context.grid)
+        path = find_path_to_nearest_can(start_agent_state, game_context.grid, game_context.city)
         if path == None or len(path) == 0:
             break
         move_dust_car(path, game_context)
@@ -22,6 +23,7 @@ def collect_garbage(game_context: GameContext) -> None:
         game_context.city.cans_dict[next_position].is_visited = True
     pass
 
+
 def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None:
     for action in actions:
         street_position = game_context.dust_car.position
@@ -44,7 +46,6 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) ->
         pygame.display.update()
         time.sleep(0.15)
 
-
 def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]:
     if car.orientation == AgentOrientation.UP: