From 5440626353e83c9d80f090f75c3556caf644da1c Mon Sep 17 00:00:00 2001 From: majkellll Date: Mon, 15 May 2023 08:07:08 +0200 Subject: [PATCH 1/5] =?UTF-8?q?dodany=20A*=20-=20co=C5=9B=20jeszcze=20nie?= =?UTF-8?q?=20dzia=C5=82a?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bfs.py | 97 +++++++++++++++++++++++++++++++---------------------- city.py | 15 +++++---- main.py | 7 +++- movement.py | 17 +++++----- 4 files changed, 79 insertions(+), 57 deletions(-) diff --git a/bfs.py b/bfs.py index a559592..831c74a 100644 --- a/bfs.py +++ b/bfs.py @@ -1,49 +1,58 @@ from agentState import AgentState -from typing import Dict, Tuple +from typing import Dict, Tuple, List, Set from city import City from gridCellType import GridCellType from agentActionType import AgentActionType from agentOrientation import AgentOrientation -from queue import Queue +from queue import PriorityQueue from turnCar import turn_left_orientation, turn_right_orientation +import heapq + class Succ: state: AgentState action: AgentActionType - ##cost: int + cost: int - def __init__(self, state: AgentState, action: AgentActionType) -> None: + def __init__(self, state: AgentState, action: AgentActionType, cost: int) -> None: self.state = state self.action = action - ##self.cost = cost + self.cost = cost -def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]: - q: Queue[list[Succ]] = Queue() - visited: list[AgentState] = [] - startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)] - q.put(startStates) - while not q.empty(): - currently_checked = q.get() - visited.append(currently_checked[-1].state) - if is_state_success(currently_checked[-1].state, grid): + +def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[AgentActionType]: + pq: PriorityQueue[Tuple[int, List[Succ]]] = PriorityQueue() + visited: set[AgentState] = set() + startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN, 0)] + pq.put((0, startStates)) + + while not pq.empty(): + _, currently_checked = pq.get() + last_state = currently_checked[-1].state + if last_state in visited: + continue + visited.add(last_state) + + if is_state_success(last_state, grid): return extract_actions(currently_checked) - successors = succ(currently_checked[-1].state) + + successors = succ(last_state) for s in successors: - already_visited = False - for v in visited: - if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation: - already_visited = True - break - if already_visited: + if s.state in visited: continue - if is_state_valid(s.state, grid): - new_list = currently_checked.copy() - new_list.append(s) - q.put(new_list) + if not is_state_valid(s.state, grid): + continue + + g_cost = currently_checked[-1].cost + get_cost_for_action(s.action, grid.get(s.state.position, GridCellType.STREET_HORIZONTAL)) + h_cost = _heuristics(s.state.position, city) + f_cost = g_cost + h_cost + new_list = currently_checked.copy() + new_list.append(s) + pq.put((f_cost, new_list)) + return [] - - + def extract_actions(successors: list[Succ]) -> list[AgentActionType]: output: list[AgentActionType] = [] for s in successors: @@ -51,20 +60,23 @@ def extract_actions(successors: list[Succ]) -> list[AgentActionType]: output.append(s.action) return output + def succ(state: AgentState) -> list[Succ]: result: list[Succ] = [] - result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT)) - result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT)) + result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT, 0)) + result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT, 0)) state_succ = move_forward_succ(state) - if state_succ != None: - result.append(move_forward_succ(state)) + if state_succ is not None: + result.append(Succ(state_succ.state, AgentActionType.MOVE_FORWARD, state_succ.cost)) return result + def move_forward_succ(state: AgentState) -> Succ: position = get_next_cell(state) - if position == None: + if position is None: return None - return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD) + return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD, + get_cost_for_action(AgentActionType.MOVE_FORWARD, GridCellType.STREET_HORIZONTAL)) def get_next_cell(state: AgentState) -> Tuple[int, int]: @@ -84,13 +96,15 @@ def get_next_cell(state: AgentState) -> Tuple[int, int]: return None return (state.position[0] + 1, state.position[1]) + def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: next_cell = get_next_cell(state) try: return grid[next_cell] == GridCellType.GARBAGE_CAN - except: + except KeyError: return False + def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int: if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT: return 1 @@ -102,12 +116,14 @@ def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: - try: - return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP - except: + try: + return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[ + state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP + except KeyError: return False - -def _heuristics(position: Tuple[int, int], city: City): + + +def _heuristics(position: Tuple[int, int], city: City) -> int: min_distance: int = 300 found_nonvisited: bool = False for can in city.cans: @@ -120,4 +136,5 @@ def _heuristics(position: Tuple[int, int], city: City): if found_nonvisited: return min_distance return -1 - \ No newline at end of file + + diff --git a/city.py b/city.py index 431f582..01b8649 100644 --- a/city.py +++ b/city.py @@ -2,8 +2,9 @@ from typing import List, Dict, Tuple from garbageCan import GarbageCan from speedBump import SpeedBump from street import Street -from gameContext import GameContext - +from gameContext import GameContext + + class City: cans: List[GarbageCan] bumps: List[SpeedBump] @@ -18,10 +19,10 @@ class City: def add_can(self, can: GarbageCan) -> None: self.nodes.append(can) self.cans_dict[can.position] = can - + def add_street(self, street: Street) -> None: self.streets.append(street) - + def add_bump(self, bump: SpeedBump) -> None: self.streets.append(bump) @@ -33,11 +34,11 @@ class City: def _render_streets(self, game_context: GameContext) -> None: for street in self.streets: street.render(game_context) - + def _render_nodes(self, game_context: GameContext) -> None: for node in self.nodes: node.render(game_context) - + def _render_bumps(self, game_context: GameContext) -> None: for bump in self.bumps: - bump.render(game_context) \ No newline at end of file + bump.render(game_context) diff --git a/main.py b/main.py index e1dafa0..721211c 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,6 @@ import pygame + +from city import City from gameEventHandler import handle_game_event from gameContext import GameContext from startup import startup @@ -17,8 +19,11 @@ game_context = GameContext() game_context.dust_car_pil = dust_car_pil game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB') game_context.canvas = canvas + +city = City() + startup(game_context) -collect_garbage(game_context) +collect_garbage(game_context, city) exit = False diff --git a/movement.py b/movement.py index 308c64b..fb241f2 100644 --- a/movement.py +++ b/movement.py @@ -9,19 +9,21 @@ from agentOrientation import AgentOrientation import pygame from bfs import find_path_to_nearest_can from agentState import AgentState +from city import City -def collect_garbage(game_context: GameContext) -> None: + +def collect_garbage(game_context: GameContext, city: City) -> None: while True: start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation) - path = find_path_to_nearest_can(start_agent_state, game_context.grid) - if path == None or len(path) == 0: + path = find_path_to_nearest_can(start_agent_state, game_context.grid, city) + if path is None or len(path) == 0: break move_dust_car(path, game_context) next_position = calculate_next_position(game_context.dust_car) game_context.grid[next_position] = GridCellType.VISITED_GARBAGE_CAN - game_context.city.cans_dict[next_position].is_visited = True pass + def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None: for action in actions: street_position = game_context.dust_car.position @@ -39,12 +41,9 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> game_context.render_in_cell(street_position, "imgs/street_horizontal.png") elif game_context.grid[street_position] == GridCellType.STREET_VERTICAL: game_context.render_in_cell(street_position, "imgs/street_vertical.png") - elif game_context.grid[street_position] == GridCellType.SPEED_BUMP: - game_context.render_in_cell(street_position, "imgs/speed_bump.png") pygame.display.update() - time.sleep(0.15) + time.sleep(0.5) - def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]: if car.orientation == AgentOrientation.UP: @@ -61,4 +60,4 @@ def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]: return (car.position[0] - 1, car.position[1]) if car.position[0] + 1 > 27: return None - return (car.position[0] + 1, car.position[1]) \ No newline at end of file + return (car.position[0] + 1, car.position[1]) -- 2.20.1 From 311a2d075793bb5750f5e6e7e3aeb97eac3163a0 Mon Sep 17 00:00:00 2001 From: Pawel Felcyn Date: Mon, 15 May 2023 10:59:30 +0200 Subject: [PATCH 2/5] =?UTF-8?q?astar=20for=20MIko=C5=82aj=20Gawor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bfs.py | 62 ++++++++++++++++++++++++++++++++++------------------- city.py | 6 +++--- movement.py | 2 +- 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/bfs.py b/bfs.py index a559592..92556dd 100644 --- a/bfs.py +++ b/bfs.py @@ -4,30 +4,44 @@ from city import City from gridCellType import GridCellType from agentActionType import AgentActionType from agentOrientation import AgentOrientation -from queue import Queue +from queue import Queue, PriorityQueue from turnCar import turn_left_orientation, turn_right_orientation class Succ: state: AgentState action: AgentActionType - ##cost: int + cost: int + predicted_cost: int - def __init__(self, state: AgentState, action: AgentActionType) -> None: + def __init__(self, state: AgentState, action: AgentActionType, cost: int, predicted_cost: int) -> None: self.state = state self.action = action - ##self.cost = cost + self.cost = cost + self.predicted_cost = cost -def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]: - q: Queue[list[Succ]] = Queue() +class SuccList: + succ_list: list[Succ] + + def __init__(self, succ_list: list[Succ]) -> None: + self.succ_list = succ_list + + def __lt__(self, other): + return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost + + def __gt__(self, other): + return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost + +def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[AgentActionType]: + q: PriorityQueue[SuccList] = PriorityQueue() visited: list[AgentState] = [] - startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)] + startStates: SuccList = SuccList([Succ(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))]) q.put(startStates) while not q.empty(): currently_checked = q.get() - visited.append(currently_checked[-1].state) - if is_state_success(currently_checked[-1].state, grid): + visited.append(currently_checked.succ_list[-1].state) + if is_state_success(currently_checked.succ_list[-1].state, grid): return extract_actions(currently_checked) - successors = succ(currently_checked[-1].state) + successors = succ(currently_checked.succ_list[-1], grid, city) for s in successors: already_visited = False for v in visited: @@ -37,34 +51,38 @@ def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], if already_visited: continue if is_state_valid(s.state, grid): - new_list = currently_checked.copy() + new_list = currently_checked.succ_list.copy() new_list.append(s) - q.put(new_list) + q.put(SuccList(new_list)) return [] -def extract_actions(successors: list[Succ]) -> list[AgentActionType]: +def extract_actions(successors: SuccList) -> list[AgentActionType]: output: list[AgentActionType] = [] - for s in successors: + for s in successors.succ_list: if s.action != AgentActionType.UNKNOWN: output.append(s.action) return output -def succ(state: AgentState) -> list[Succ]: +def succ(succ: Succ, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[Succ]: result: list[Succ] = [] - result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT)) - result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT)) - state_succ = move_forward_succ(state) + turn_left_cost = 1 + succ.cost + result.append(Succ(AgentState(succ.state.position, turn_left_orientation(succ.state.orientation)), AgentActionType.TURN_LEFT, turn_left_cost, turn_left_cost + _heuristics(succ.state.position, city))) + turn_right_cost = 1 + succ.cost + result.append(Succ(AgentState(succ.state.position, turn_right_orientation(succ.state.orientation)), AgentActionType.TURN_RIGHT, turn_right_cost, turn_right_cost + _heuristics(succ.state.position, city))) + state_succ = move_forward_succ(succ, city, grid) if state_succ != None: - result.append(move_forward_succ(state)) + result.append(state_succ) return result -def move_forward_succ(state: AgentState) -> Succ: - position = get_next_cell(state) +def move_forward_succ(succ: Succ, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Succ: + position = get_next_cell(succ.state) if position == None: return None - return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD) + + cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost + return Succ(AgentState(position, succ.state.orientation), AgentActionType.MOVE_FORWARD, cost, cost + _heuristics(position, city)) def get_next_cell(state: AgentState) -> Tuple[int, int]: diff --git a/city.py b/city.py index 431f582..004e987 100644 --- a/city.py +++ b/city.py @@ -11,12 +11,12 @@ class City: cans_dict: Dict[Tuple[int, int], GarbageCan] = {} def __init__(self) -> None: - self.nodes = [] + self.cans = [] self.streets = [] self.bumps = [] def add_can(self, can: GarbageCan) -> None: - self.nodes.append(can) + self.cans.append(can) self.cans_dict[can.position] = can def add_street(self, street: Street) -> None: @@ -35,7 +35,7 @@ class City: street.render(game_context) def _render_nodes(self, game_context: GameContext) -> None: - for node in self.nodes: + for node in self.cans: node.render(game_context) def _render_bumps(self, game_context: GameContext) -> None: diff --git a/movement.py b/movement.py index 308c64b..a9990e7 100644 --- a/movement.py +++ b/movement.py @@ -13,7 +13,7 @@ from agentState import AgentState def collect_garbage(game_context: GameContext) -> None: while True: start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation) - path = find_path_to_nearest_can(start_agent_state, game_context.grid) + path = find_path_to_nearest_can(start_agent_state, game_context.grid, game_context.city) if path == None or len(path) == 0: break move_dust_car(path, game_context) -- 2.20.1 From af32df4474e3b854ae30fee22f0abf6cdfc70889 Mon Sep 17 00:00:00 2001 From: majkellll Date: Mon, 15 May 2023 08:07:08 +0200 Subject: [PATCH 3/5] =?UTF-8?q?dodany=20A*=20-=20co=C5=9B=20jeszcze=20nie?= =?UTF-8?q?=20dzia=C5=82a?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- city.py | 15 ++++++++------- main.py | 7 ++++++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/city.py b/city.py index 004e987..f53305c 100644 --- a/city.py +++ b/city.py @@ -2,8 +2,9 @@ from typing import List, Dict, Tuple from garbageCan import GarbageCan from speedBump import SpeedBump from street import Street -from gameContext import GameContext - +from gameContext import GameContext + + class City: cans: List[GarbageCan] bumps: List[SpeedBump] @@ -18,10 +19,10 @@ class City: def add_can(self, can: GarbageCan) -> None: self.cans.append(can) self.cans_dict[can.position] = can - + def add_street(self, street: Street) -> None: self.streets.append(street) - + def add_bump(self, bump: SpeedBump) -> None: self.streets.append(bump) @@ -33,11 +34,11 @@ class City: def _render_streets(self, game_context: GameContext) -> None: for street in self.streets: street.render(game_context) - + def _render_nodes(self, game_context: GameContext) -> None: for node in self.cans: node.render(game_context) - + def _render_bumps(self, game_context: GameContext) -> None: for bump in self.bumps: - bump.render(game_context) \ No newline at end of file + bump.render(game_context) diff --git a/main.py b/main.py index e1dafa0..721211c 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,6 @@ import pygame + +from city import City from gameEventHandler import handle_game_event from gameContext import GameContext from startup import startup @@ -17,8 +19,11 @@ game_context = GameContext() game_context.dust_car_pil = dust_car_pil game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB') game_context.canvas = canvas + +city = City() + startup(game_context) -collect_garbage(game_context) +collect_garbage(game_context, city) exit = False -- 2.20.1 From 15638862d363f98f5564c55130c56556551fe8d6 Mon Sep 17 00:00:00 2001 From: majkellll Date: Mon, 15 May 2023 11:49:58 +0200 Subject: [PATCH 4/5] A* working ok --- agentState.py | 2 +- bfs.py | 45 ++++++++++++++++++++++++++++++--------------- city.py | 2 +- main.py | 4 +--- movement.py | 3 ++- 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/agentState.py b/agentState.py index 1ac708d..ace580e 100644 --- a/agentState.py +++ b/agentState.py @@ -7,4 +7,4 @@ class AgentState: def __init__(self, position: Tuple[int, int], orientation: AgentOrientation) -> None: self.orientation = orientation - self.position = position \ No newline at end of file + self.position = position diff --git a/bfs.py b/bfs.py index 92556dd..02fd4a0 100644 --- a/bfs.py +++ b/bfs.py @@ -7,6 +7,7 @@ from agentOrientation import AgentOrientation from queue import Queue, PriorityQueue from turnCar import turn_left_orientation, turn_right_orientation + class Succ: state: AgentState action: AgentActionType @@ -19,6 +20,7 @@ class Succ: self.cost = cost self.predicted_cost = cost + class SuccList: succ_list: list[Succ] @@ -27,14 +29,17 @@ class SuccList: def __lt__(self, other): return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost - + def __gt__(self, other): return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost -def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[AgentActionType]: + +def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[ + AgentActionType]: q: PriorityQueue[SuccList] = PriorityQueue() visited: list[AgentState] = [] - startStates: SuccList = SuccList([Succ(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))]) + startStates: SuccList = SuccList( + [Succ(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))]) q.put(startStates) while not q.empty(): currently_checked = q.get() @@ -45,7 +50,8 @@ def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], for s in successors: already_visited = False for v in visited: - if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation: + if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[ + 1] and s.state.orientation == v.orientation: already_visited = True break if already_visited: @@ -56,8 +62,7 @@ def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], q.put(SuccList(new_list)) return [] - - + def extract_actions(successors: SuccList) -> list[AgentActionType]: output: list[AgentActionType] = [] for s in successors.succ_list: @@ -65,24 +70,31 @@ def extract_actions(successors: SuccList) -> list[AgentActionType]: output.append(s.action) return output + def succ(succ: Succ, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[Succ]: result: list[Succ] = [] turn_left_cost = 1 + succ.cost - result.append(Succ(AgentState(succ.state.position, turn_left_orientation(succ.state.orientation)), AgentActionType.TURN_LEFT, turn_left_cost, turn_left_cost + _heuristics(succ.state.position, city))) + result.append( + Succ(AgentState(succ.state.position, turn_left_orientation(succ.state.orientation)), AgentActionType.TURN_LEFT, + turn_left_cost, turn_left_cost + _heuristics(succ.state.position, city))) turn_right_cost = 1 + succ.cost - result.append(Succ(AgentState(succ.state.position, turn_right_orientation(succ.state.orientation)), AgentActionType.TURN_RIGHT, turn_right_cost, turn_right_cost + _heuristics(succ.state.position, city))) + result.append(Succ(AgentState(succ.state.position, turn_right_orientation(succ.state.orientation)), + AgentActionType.TURN_RIGHT, turn_right_cost, + turn_right_cost + _heuristics(succ.state.position, city))) state_succ = move_forward_succ(succ, city, grid) if state_succ != None: result.append(state_succ) return result + def move_forward_succ(succ: Succ, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Succ: position = get_next_cell(succ.state) if position == None: return None - + cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost - return Succ(AgentState(position, succ.state.orientation), AgentActionType.MOVE_FORWARD, cost, cost + _heuristics(position, city)) + return Succ(AgentState(position, succ.state.orientation), AgentActionType.MOVE_FORWARD, cost, + cost + _heuristics(position, city)) def get_next_cell(state: AgentState) -> Tuple[int, int]: @@ -102,6 +114,7 @@ def get_next_cell(state: AgentState) -> Tuple[int, int]: return None return (state.position[0] + 1, state.position[1]) + def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: next_cell = get_next_cell(state) try: @@ -109,6 +122,7 @@ def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType except: return False + def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int: if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT: return 1 @@ -120,11 +134,13 @@ def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: - try: - return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP + try: + return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[ + state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP except: return False - + + def _heuristics(position: Tuple[int, int], city: City): min_distance: int = 300 found_nonvisited: bool = False @@ -137,5 +153,4 @@ def _heuristics(position: Tuple[int, int], city: City): min_distance = distance if found_nonvisited: return min_distance - return -1 - \ No newline at end of file + return -1 \ No newline at end of file diff --git a/city.py b/city.py index f53305c..68ae600 100644 --- a/city.py +++ b/city.py @@ -41,4 +41,4 @@ class City: def _render_bumps(self, game_context: GameContext) -> None: for bump in self.bumps: - bump.render(game_context) + bump.render(game_context) \ No newline at end of file diff --git a/main.py b/main.py index 721211c..deb76bc 100644 --- a/main.py +++ b/main.py @@ -20,10 +20,8 @@ game_context.dust_car_pil = dust_car_pil game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB') game_context.canvas = canvas -city = City() - startup(game_context) -collect_garbage(game_context, city) +collect_garbage(game_context) exit = False diff --git a/movement.py b/movement.py index a9990e7..0060c0e 100644 --- a/movement.py +++ b/movement.py @@ -10,6 +10,7 @@ import pygame from bfs import find_path_to_nearest_can from agentState import AgentState + def collect_garbage(game_context: GameContext) -> None: while True: start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation) @@ -22,6 +23,7 @@ def collect_garbage(game_context: GameContext) -> None: game_context.city.cans_dict[next_position].is_visited = True pass + def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None: for action in actions: street_position = game_context.dust_car.position @@ -44,7 +46,6 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> pygame.display.update() time.sleep(0.15) - def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]: if car.orientation == AgentOrientation.UP: -- 2.20.1 From 271e3365f9e7406c446fee7600f1861cbbe4577d Mon Sep 17 00:00:00 2001 From: majkellll Date: Thu, 25 May 2023 18:18:11 +0200 Subject: [PATCH 5/5] A* gawor done --- bfs.py | 158 +++++++++++++++++++++++++++++++++------------------- city.py | 2 +- movement.py | 5 +- 3 files changed, 104 insertions(+), 61 deletions(-) diff --git a/bfs.py b/bfs.py index a559592..711ed8a 100644 --- a/bfs.py +++ b/bfs.py @@ -1,112 +1,155 @@ from agentState import AgentState -from typing import Dict, Tuple +from typing import Dict, Tuple, List from city import City from gridCellType import GridCellType from agentActionType import AgentActionType from agentOrientation import AgentOrientation -from queue import Queue +from queue import Queue, PriorityQueue from turnCar import turn_left_orientation, turn_right_orientation -class Succ: - state: AgentState - action: AgentActionType - ##cost: int - def __init__(self, state: AgentState, action: AgentActionType) -> None: +class Successor: + + def __init__(self, state: AgentState, action: AgentActionType, cost: int, predicted_cost: int) -> None: self.state = state self.action = action - ##self.cost = cost + self.cost = cost + self.predicted_cost = cost -def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]: - q: Queue[list[Succ]] = Queue() - visited: list[AgentState] = [] - startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)] - q.put(startStates) - while not q.empty(): - currently_checked = q.get() - visited.append(currently_checked[-1].state) - if is_state_success(currently_checked[-1].state, grid): - return extract_actions(currently_checked) - successors = succ(currently_checked[-1].state) + +class SuccessorList: + succ_list: list[Successor] + + def __init__(self, succ_list: list[Successor]) -> None: + self.succ_list = succ_list + + def __gt__(self, other): + return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost + + def __lt__(self, other): + return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost + + +def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[ + AgentActionType]: + visited: List[AgentState] = [] + queue: PriorityQueue[SuccessorList] = PriorityQueue() + queue.put(SuccessorList([Successor(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))])) + + while not queue.empty(): + current = queue.get() + previous = current.succ_list[-1] + visited.append(previous.state) + + if is_state_success(previous.state, grid): + return extract_actions(current) + + successors = get_successors(previous, grid, city) for s in successors: already_visited = False for v in visited: - if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation: + if v.position == s.state.position and v.orientation == s.state.orientation: already_visited = True break if already_visited: continue if is_state_valid(s.state, grid): - new_list = currently_checked.copy() + new_list = current.succ_list.copy() new_list.append(s) - q.put(new_list) + queue.put(SuccessorList(new_list)) + return [] - - -def extract_actions(successors: list[Succ]) -> list[AgentActionType]: + +def extract_actions(successors: SuccessorList) -> list[AgentActionType]: output: list[AgentActionType] = [] - for s in successors: + for s in successors.succ_list: if s.action != AgentActionType.UNKNOWN: output.append(s.action) return output -def succ(state: AgentState) -> list[Succ]: - result: list[Succ] = [] - result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT)) - result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT)) - state_succ = move_forward_succ(state) - if state_succ != None: - result.append(move_forward_succ(state)) + +def get_successors(succ: Successor, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[Successor]: + result: List[Successor] = [] + + turn_left_cost = 1 + succ.cost + turn_left_state = AgentState(succ.state.position, turn_left_orientation(succ.state.orientation)) + turn_left_heuristics = _heuristics(succ.state.position, city) + result.append( + Successor(turn_left_state, AgentActionType.TURN_LEFT, turn_left_cost, turn_left_cost + turn_left_heuristics)) + + turn_right_cost = 1 + succ.cost + turn_right_state = AgentState(succ.state.position, turn_right_orientation(succ.state.orientation)) + turn_right_heuristics = _heuristics(succ.state.position, city) + result.append( + Successor(turn_right_state, AgentActionType.TURN_RIGHT, turn_right_cost, + turn_right_cost + turn_right_heuristics)) + + state_succ = move_forward_succ(succ, city, grid) + if state_succ is not None: + result.append(state_succ) + return result -def move_forward_succ(state: AgentState) -> Succ: - position = get_next_cell(state) - if position == None: + +def move_forward_succ(succ: Successor, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Successor: + position = get_next_cell(succ.state) + if position is None: return None - return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD) + + cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost + predicted_cost = cost + _heuristics(position, city) + new_state = AgentState(position, succ.state.orientation) + return Successor(new_state, AgentActionType.MOVE_FORWARD, cost, predicted_cost) def get_next_cell(state: AgentState) -> Tuple[int, int]: - if state.orientation == AgentOrientation.UP: - if state.position[1] - 1 < 1: + x, y = state.position + orientation = state.orientation + + if orientation == AgentOrientation.UP: + if y - 1 < 1: return None - return (state.position[0], state.position[1] - 1) - if state.orientation == AgentOrientation.DOWN: - if state.position[1] + 1 > 27: + return x, y - 1 + elif orientation == AgentOrientation.DOWN: + if y + 1 > 27: return None - return (state.position[0], state.position[1] + 1) - if state.orientation == AgentOrientation.LEFT: - if state.position[0] - 1 < 1: + return x, y + 1 + elif orientation == AgentOrientation.LEFT: + if x - 1 < 1: return None - return (state.position[0] - 1, state.position[1]) - if state.position[0] + 1 > 27: + return x - 1, y + elif x + 1 > 27: return None - return (state.position[0] + 1, state.position[1]) + else: + return x + 1, y + def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: next_cell = get_next_cell(state) try: return grid[next_cell] == GridCellType.GARBAGE_CAN - except: + except KeyError: return False + def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int: - if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT: + if action in [AgentActionType.TURN_LEFT, AgentActionType.TURN_RIGHT]: return 1 - if cell_type == GridCellType.SPEED_BUMP: - if action == AgentActionType.MOVE_FORWARD: - return 10 + if cell_type == GridCellType.SPEED_BUMP and action == AgentActionType.MOVE_FORWARD: + return 10 if action == AgentActionType.MOVE_FORWARD: return 3 def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool: - try: - return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP - except: + try: + return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[ + state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP + except KeyError: return False - + + def _heuristics(position: Tuple[int, int], city: City): min_distance: int = 300 found_nonvisited: bool = False @@ -120,4 +163,3 @@ def _heuristics(position: Tuple[int, int], city: City): if found_nonvisited: return min_distance return -1 - \ No newline at end of file diff --git a/city.py b/city.py index f53305c..68ae600 100644 --- a/city.py +++ b/city.py @@ -41,4 +41,4 @@ class City: def _render_bumps(self, game_context: GameContext) -> None: for bump in self.bumps: - bump.render(game_context) + bump.render(game_context) \ No newline at end of file diff --git a/movement.py b/movement.py index 308c64b..0060c0e 100644 --- a/movement.py +++ b/movement.py @@ -10,10 +10,11 @@ import pygame from bfs import find_path_to_nearest_can from agentState import AgentState + def collect_garbage(game_context: GameContext) -> None: while True: start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation) - path = find_path_to_nearest_can(start_agent_state, game_context.grid) + path = find_path_to_nearest_can(start_agent_state, game_context.grid, game_context.city) if path == None or len(path) == 0: break move_dust_car(path, game_context) @@ -22,6 +23,7 @@ def collect_garbage(game_context: GameContext) -> None: game_context.city.cans_dict[next_position].is_visited = True pass + def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None: for action in actions: street_position = game_context.dust_car.position @@ -44,7 +46,6 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> pygame.display.update() time.sleep(0.15) - def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]: if car.orientation == AgentOrientation.UP: -- 2.20.1