From 5440626353e83c9d80f090f75c3556caf644da1c Mon Sep 17 00:00:00 2001 From: majkellll Date: Mon, 15 May 2023 08:07:08 +0200 Subject: [PATCH] =?UTF-8?q?dodany=20A*=20-=20co=C5=9B=20jeszcze=20nie=20dz?= =?UTF-8?q?ia=C5=82a?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bfs.py | 97 +++++++++++++++++++++++++++++++---------------------- city.py | 15 +++++---- main.py | 7 +++- movement.py | 17 +++++----- 4 files changed, 79 insertions(+), 57 deletions(-) diff --git a/bfs.py b/bfs.py index a559592..831c74a 100644 --- a/bfs.py +++ b/bfs.py @@ -1,49 +1,58 @@ from agentState import AgentState -from typing import Dict, Tuple +from typing import Dict, Tuple, List, Set from city import City from gridCellType import GridCellType from agentActionType import AgentActionType from agentOrientation import AgentOrientation -from queue import Queue +from queue import PriorityQueue from turnCar import turn_left_orientation, turn_right_orientation +import heapq + class Succ: state: AgentState action: AgentActionType - ##cost: int + cost: int - def __init__(self, state: AgentState, action: AgentActionType) -> None: + def __init__(self, state: AgentState, action: AgentActionType, cost: int) -> None: self.state = state self.action = action - ##self.cost = cost + self.cost = cost -def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]: - q: Queue[list[Succ]] = Queue() - visited: list[AgentState] = [] - startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)] - q.put(startStates) - while not q.empty(): - currently_checked = q.get() - visited.append(currently_checked[-1].state) - if is_state_success(currently_checked[-1].state, grid): + +def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[AgentActionType]: + pq: PriorityQueue[Tuple[int, List[Succ]]] = PriorityQueue() + visited: set[AgentState] = set() + startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN, 0)] + pq.put((0, startStates)) + + while not pq.empty(): + _, currently_checked = pq.get() + last_state = currently_checked[-1].state + if last_state in visited: + continue + visited.add(last_state) + + if is_state_success(last_state, grid): return extract_actions(currently_checked) - successors = succ(currently_checked[-1].state) + + successors = succ(last_state) for s in successors: - already_visited = False - for v in visited: - if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation: - already_visited = True - break - if already_visited: + if s.state in visited: continue - if is_state_valid(s.state, grid): - new_list = currently_checked.copy() - new_list.append(s) - q.put(new_list) + if not is_state_valid(s.state, grid): + continue + + g_cost = currently_checked[-1].cost + get_cost_for_action(s.action, grid.get(s.state.position, GridCellType.STREET_HORIZONTAL)) + h_cost = _heuristics(s.state.position, city) + f_cost = g_cost + h_cost + new_list = currently_checked.copy() + new_list.append(s) + pq.put((f_cost, new_list)) + return [] - - + def extract_actions(successors: list[Succ]) -> list[AgentActionType]: output: list[AgentActionType] = [] for s in successors: @@ -51,20 +60,23 @@ def extract_actions(successors: list[Succ]) -> list[AgentActionType]: output.append(s.action) return output + def succ(state: AgentState) -> list[Succ]: result: list[Succ] = [] - result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT)) - result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT)) + result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT, 0)) + result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT, 0)) state_succ = move_forward_succ(state) - if state_succ != None: - result.append(move_forward_succ(state)) + if state_succ is not None: + result.append(Succ(state_succ.state, AgentActionType.MOVE_FORWARD, state_succ.cost)) return result + def move_forward_succ(state: AgentState) -> Succ: position = get_next_cell(state) - if position == None: + if position is None: return None - return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD) + return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD, + get_cost_for_action(AgentActionType.MOVE_FORWARD, GridCellType.STREET_HORIZONTAL)) def get_next_cell(state: AgentState) -> Tuple[int, int]: @@ -84,13 +96,15 @@ def get_next_cell(state: AgentState) -> Tuple[int, int]: return None return (state.position[0] + 1, state.position[1]) + def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: next_cell = get_next_cell(state) try: return grid[next_cell] == GridCellType.GARBAGE_CAN - except: + except KeyError: return False + def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int: if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT: return 1 @@ -102,12 +116,14 @@ def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: - try: - return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP - except: + try: + return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[ + state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP + except KeyError: return False - -def _heuristics(position: Tuple[int, int], city: City): + + +def _heuristics(position: Tuple[int, int], city: City) -> int: min_distance: int = 300 found_nonvisited: bool = False for can in city.cans: @@ -120,4 +136,5 @@ def _heuristics(position: Tuple[int, int], city: City): if found_nonvisited: return min_distance return -1 - \ No newline at end of file + + diff --git a/city.py b/city.py index 431f582..01b8649 100644 --- a/city.py +++ b/city.py @@ -2,8 +2,9 @@ from typing import List, Dict, Tuple from garbageCan import GarbageCan from speedBump import SpeedBump from street import Street -from gameContext import GameContext - +from gameContext import GameContext + + class City: cans: List[GarbageCan] bumps: List[SpeedBump] @@ -18,10 +19,10 @@ class City: def add_can(self, can: GarbageCan) -> None: self.nodes.append(can) self.cans_dict[can.position] = can - + def add_street(self, street: Street) -> None: self.streets.append(street) - + def add_bump(self, bump: SpeedBump) -> None: self.streets.append(bump) @@ -33,11 +34,11 @@ class City: def _render_streets(self, game_context: GameContext) -> None: for street in self.streets: street.render(game_context) - + def _render_nodes(self, game_context: GameContext) -> None: for node in self.nodes: node.render(game_context) - + def _render_bumps(self, game_context: GameContext) -> None: for bump in self.bumps: - bump.render(game_context) \ No newline at end of file + bump.render(game_context) diff --git a/main.py b/main.py index e1dafa0..721211c 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,6 @@ import pygame + +from city import City from gameEventHandler import handle_game_event from gameContext import GameContext from startup import startup @@ -17,8 +19,11 @@ game_context = GameContext() game_context.dust_car_pil = dust_car_pil game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB') game_context.canvas = canvas + +city = City() + startup(game_context) -collect_garbage(game_context) +collect_garbage(game_context, city) exit = False diff --git a/movement.py b/movement.py index 308c64b..fb241f2 100644 --- a/movement.py +++ b/movement.py @@ -9,19 +9,21 @@ from agentOrientation import AgentOrientation import pygame from bfs import find_path_to_nearest_can from agentState import AgentState +from city import City -def collect_garbage(game_context: GameContext) -> None: + +def collect_garbage(game_context: GameContext, city: City) -> None: while True: start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation) - path = find_path_to_nearest_can(start_agent_state, game_context.grid) - if path == None or len(path) == 0: + path = find_path_to_nearest_can(start_agent_state, game_context.grid, city) + if path is None or len(path) == 0: break move_dust_car(path, game_context) next_position = calculate_next_position(game_context.dust_car) game_context.grid[next_position] = GridCellType.VISITED_GARBAGE_CAN - game_context.city.cans_dict[next_position].is_visited = True pass + def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None: for action in actions: street_position = game_context.dust_car.position @@ -39,12 +41,9 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> game_context.render_in_cell(street_position, "imgs/street_horizontal.png") elif game_context.grid[street_position] == GridCellType.STREET_VERTICAL: game_context.render_in_cell(street_position, "imgs/street_vertical.png") - elif game_context.grid[street_position] == GridCellType.SPEED_BUMP: - game_context.render_in_cell(street_position, "imgs/speed_bump.png") pygame.display.update() - time.sleep(0.15) + time.sleep(0.5) - def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]: if car.orientation == AgentOrientation.UP: @@ -61,4 +60,4 @@ def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]: return (car.position[0] - 1, car.position[1]) if car.position[0] + 1 > 27: return None - return (car.position[0] + 1, car.position[1]) \ No newline at end of file + return (car.position[0] + 1, car.position[1])