Merge pull request 'A_star' (#25) from A_star into master

Reviewed-on: #25
Reviewed-by: Paweł Felcyn <pawfel1@st.amu.edu.pl>
Paweł Felcyn committed 2023-05-25 18:52:06 +02:00
commit 9287f76ea3
4 changed files with 114 additions and 70 deletions

agentState.py

@@ -7,4 +7,4 @@ class AgentState:
def __init__(self, position: Tuple[int, int], orientation: AgentOrientation) -> None:
self.orientation = orientation
self.position = position
self.position = position

bfs.py (158 changed lines)

@@ -1,112 +1,155 @@
from agentState import AgentState
from typing import Dict, Tuple
from typing import Dict, Tuple, List
from city import City
from gridCellType import GridCellType
from agentActionType import AgentActionType
from agentOrientation import AgentOrientation
from queue import Queue
from queue import Queue, PriorityQueue
from turnCar import turn_left_orientation, turn_right_orientation
class Succ:
state: AgentState
action: AgentActionType
##cost: int
def __init__(self, state: AgentState, action: AgentActionType) -> None:
class Successor:
def __init__(self, state: AgentState, action: AgentActionType, cost: int, predicted_cost: int) -> None:
self.state = state
self.action = action
##self.cost = cost
self.cost = cost
self.predicted_cost = predicted_cost
def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]:
q: Queue[list[Succ]] = Queue()
visited: list[AgentState] = []
startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)]
q.put(startStates)
while not q.empty():
currently_checked = q.get()
visited.append(currently_checked[-1].state)
if is_state_success(currently_checked[-1].state, grid):
return extract_actions(currently_checked)
successors = succ(currently_checked[-1].state)
class SuccessorList:
succ_list: list[Successor]
def __init__(self, succ_list: list[Successor]) -> None:
self.succ_list = succ_list
def __gt__(self, other):
return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost
def __lt__(self, other):
return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost
def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[AgentActionType]:
visited: List[AgentState] = []
queue: PriorityQueue[SuccessorList] = PriorityQueue()
queue.put(SuccessorList([Successor(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))]))
while not queue.empty():
current = queue.get()
previous = current.succ_list[-1]
visited.append(previous.state)
if is_state_success(previous.state, grid):
return extract_actions(current)
successors = get_successors(previous, grid, city)
for s in successors:
already_visited = False
for v in visited:
if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation:
if v.position == s.state.position and v.orientation == s.state.orientation:
already_visited = True
break
if already_visited:
continue
if is_state_valid(s.state, grid):
new_list = currently_checked.copy()
new_list = current.succ_list.copy()
new_list.append(s)
q.put(new_list)
queue.put(SuccessorList(new_list))
return []
def extract_actions(successors: list[Succ]) -> list[AgentActionType]:
def extract_actions(successors: SuccessorList) -> list[AgentActionType]:
output: list[AgentActionType] = []
for s in successors:
for s in successors.succ_list:
if s.action != AgentActionType.UNKNOWN:
output.append(s.action)
return output
def succ(state: AgentState) -> list[Succ]:
result: list[Succ] = []
result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT))
result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT))
state_succ = move_forward_succ(state)
if state_succ != None:
result.append(move_forward_succ(state))
def get_successors(succ: Successor, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[Successor]:
result: List[Successor] = []
turn_left_cost = 1 + succ.cost
turn_left_state = AgentState(succ.state.position, turn_left_orientation(succ.state.orientation))
turn_left_heuristics = _heuristics(succ.state.position, city)
result.append(Successor(turn_left_state, AgentActionType.TURN_LEFT, turn_left_cost, turn_left_cost + turn_left_heuristics))
turn_right_cost = 1 + succ.cost
turn_right_state = AgentState(succ.state.position, turn_right_orientation(succ.state.orientation))
turn_right_heuristics = _heuristics(succ.state.position, city)
result.append(Successor(turn_right_state, AgentActionType.TURN_RIGHT, turn_right_cost, turn_right_cost + turn_right_heuristics))
state_succ = move_forward_succ(succ, city, grid)
if state_succ is not None:
result.append(state_succ)
return result
def move_forward_succ(state: AgentState) -> Succ:
position = get_next_cell(state)
if position == None:
def move_forward_succ(succ: Successor, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Successor:
position = get_next_cell(succ.state)
if position is None:
return None
return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD)
cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost
predicted_cost = cost + _heuristics(position, city)
new_state = AgentState(position, succ.state.orientation)
return Successor(new_state, AgentActionType.MOVE_FORWARD, cost, predicted_cost)
def get_next_cell(state: AgentState) -> Tuple[int, int]:
if state.orientation == AgentOrientation.UP:
if state.position[1] - 1 < 1:
x, y = state.position
orientation = state.orientation
if orientation == AgentOrientation.UP:
if y - 1 < 1:
return None
return (state.position[0], state.position[1] - 1)
if state.orientation == AgentOrientation.DOWN:
if state.position[1] + 1 > 27:
return x, y - 1
elif orientation == AgentOrientation.DOWN:
if y + 1 > 27:
return None
return (state.position[0], state.position[1] + 1)
if state.orientation == AgentOrientation.LEFT:
if state.position[0] - 1 < 1:
return x, y + 1
elif orientation == AgentOrientation.LEFT:
if x - 1 < 1:
return None
return (state.position[0] - 1, state.position[1])
if state.position[0] + 1 > 27:
return x - 1, y
elif x + 1 > 27:
return None
return (state.position[0] + 1, state.position[1])
else:
return x + 1, y
def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
next_cell = get_next_cell(state)
try:
return grid[next_cell] == GridCellType.GARBAGE_CAN
except:
except KeyError:
return False
def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int:
if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT:
if action in [AgentActionType.TURN_LEFT, AgentActionType.TURN_RIGHT]:
return 1
if cell_type == GridCellType.SPEED_BUMP:
if action == AgentActionType.MOVE_FORWARD:
return 10
if cell_type == GridCellType.SPEED_BUMP and action == AgentActionType.MOVE_FORWARD:
return 10
if action == AgentActionType.MOVE_FORWARD:
return 3
def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
try:
return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP
except:
try:
return grid[state.position] in (GridCellType.STREET_HORIZONTAL, GridCellType.STREET_VERTICAL, GridCellType.SPEED_BUMP)
except KeyError:
return False
def _heuristics(position: Tuple[int, int], city: City):
min_distance: int = 300
found_nonvisited: bool = False
@@ -120,4 +163,3 @@ def _heuristics(position: Tuple[int, int], city: City):
if found_nonvisited:
return min_distance
return -1
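
Note on the new search: the frontier is now a PriorityQueue of SuccessorList objects, and the __lt__/__gt__ overloads make the queue always pop the partial path whose last Successor has the lowest predicted_cost (cost accumulated so far plus the _heuristics estimate), which is the A* ordering. A minimal, self-contained sketch of that ordering pattern is below; the Path class and the numeric costs are illustrative stand-ins, not code from this repository.

from queue import PriorityQueue


class Path:
    """Stand-in for SuccessorList: ordered by the predicted cost of its last step."""

    def __init__(self, actions: list, predicted_cost: int) -> None:
        self.actions = actions
        self.predicted_cost = predicted_cost

    def __lt__(self, other: "Path") -> bool:
        # Same idea as SuccessorList.__lt__: compare by predicted cost (g + h).
        return self.predicted_cost < other.predicted_cost


frontier = PriorityQueue()
frontier.put(Path(["MOVE_FORWARD"], 7))   # g + h = 7
frontier.put(Path(["TURN_LEFT"], 4))      # cheapest predicted cost
frontier.put(Path(["TURN_RIGHT"], 9))

print(frontier.get().actions)  # ['TURN_LEFT'] -- the lowest predicted cost is expanded first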

city.py (19 changed lines)

@@ -2,8 +2,9 @@ from typing import List, Dict, Tuple
from garbageCan import GarbageCan
from speedBump import SpeedBump
from street import Street
from gameContext import GameContext
from gameContext import GameContext
class City:
cans: List[GarbageCan]
bumps: List[SpeedBump]
@@ -11,17 +12,17 @@ class City:
cans_dict: Dict[Tuple[int, int], GarbageCan] = {}
def __init__(self) -> None:
self.nodes = []
self.cans = []
self.streets = []
self.bumps = []
def add_can(self, can: GarbageCan) -> None:
self.nodes.append(can)
self.cans.append(can)
self.cans_dict[can.position] = can
def add_street(self, street: Street) -> None:
self.streets.append(street)
def add_bump(self, bump: SpeedBump) -> None:
self.streets.append(bump)
@@ -33,11 +34,11 @@ class City:
def _render_streets(self, game_context: GameContext) -> None:
for street in self.streets:
street.render(game_context)
def _render_nodes(self, game_context: GameContext) -> None:
for node in self.nodes:
for node in self.cans:
node.render(game_context)
def _render_bumps(self, game_context: GameContext) -> None:
for bump in self.bumps:
bump.render(game_context)
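
The functional change in city.py is the rename from nodes to cans, alongside the position-keyed cans_dict; that dictionary is what the garbage-collection loop in the last file of this diff uses to flag a can as visited once the truck reaches it. A rough usage sketch follows; StubCan is a hypothetical stand-in, since GarbageCan's constructor is not shown in this diff.

from typing import Dict, Tuple


class StubCan:
    """Hypothetical stand-in for GarbageCan; only the attributes this diff relies on."""

    def __init__(self, position: Tuple[int, int]) -> None:
        self.position = position
        self.is_visited = False


cans_dict: Dict[Tuple[int, int], StubCan] = {}
can = StubCan((5, 12))
cans_dict[can.position] = can              # mirrors City.add_can filling cans_dict

next_position = (5, 12)                    # cell in front of the truck after following the path
if next_position in cans_dict:
    cans_dict[next_position].is_visited = True   # what collect_garbage does after moving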


@@ -10,10 +10,11 @@ import pygame
from bfs import find_path_to_nearest_can
from agentState import AgentState
def collect_garbage(game_context: GameContext) -> None:
while True:
start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation)
path = find_path_to_nearest_can(start_agent_state, game_context.grid)
path = find_path_to_nearest_can(start_agent_state, game_context.grid, game_context.city)
if path is None or len(path) == 0:
break
move_dust_car(path, game_context)
@@ -22,6 +23,7 @@ def collect_garbage(game_context: GameContext) -> None:
game_context.city.cans_dict[next_position].is_visited = True
pass
def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None:
for action in actions:
street_position = game_context.dust_car.position
@@ -44,7 +46,6 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) ->
pygame.display.update()
time.sleep(0.15)
def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]:
if car.orientation == AgentOrientation.UP: