A_star #25
97
bfs.py
97
bfs.py
@ -1,49 +1,58 @@
|
||||
from agentState import AgentState
|
||||
from typing import Dict, Tuple
|
||||
from typing import Dict, Tuple, List, Set
|
||||
from city import City
|
||||
from gridCellType import GridCellType
|
||||
from agentActionType import AgentActionType
|
||||
from agentOrientation import AgentOrientation
|
||||
from queue import Queue
|
||||
from queue import PriorityQueue
|
||||
from turnCar import turn_left_orientation, turn_right_orientation
|
||||
import heapq
|
||||
|
||||
|
||||
class Succ:
|
||||
state: AgentState
|
||||
action: AgentActionType
|
||||
##cost: int
|
||||
cost: int
|
||||
|
||||
def __init__(self, state: AgentState, action: AgentActionType) -> None:
|
||||
def __init__(self, state: AgentState, action: AgentActionType, cost: int) -> None:
|
||||
self.state = state
|
||||
self.action = action
|
||||
##self.cost = cost
|
||||
self.cost = cost
|
||||
|
||||
def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> list[AgentActionType]:
|
||||
q: Queue[list[Succ]] = Queue()
|
||||
visited: list[AgentState] = []
|
||||
startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN)]
|
||||
q.put(startStates)
|
||||
while not q.empty():
|
||||
currently_checked = q.get()
|
||||
visited.append(currently_checked[-1].state)
|
||||
if is_state_success(currently_checked[-1].state, grid):
|
||||
|
||||
def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[AgentActionType]:
|
||||
pq: PriorityQueue[Tuple[int, List[Succ]]] = PriorityQueue()
|
||||
visited: set[AgentState] = set()
|
||||
startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN, 0)]
|
||||
pq.put((0, startStates))
|
||||
|
||||
while not pq.empty():
|
||||
_, currently_checked = pq.get()
|
||||
last_state = currently_checked[-1].state
|
||||
if last_state in visited:
|
||||
continue
|
||||
visited.add(last_state)
|
||||
|
||||
if is_state_success(last_state, grid):
|
||||
return extract_actions(currently_checked)
|
||||
successors = succ(currently_checked[-1].state)
|
||||
|
||||
successors = succ(last_state)
|
||||
for s in successors:
|
||||
already_visited = False
|
||||
for v in visited:
|
||||
if v.position[0] == s.state.position[0] and v.position[1] == s.state.position[1] and s.state.orientation == v.orientation:
|
||||
already_visited = True
|
||||
break
|
||||
if already_visited:
|
||||
if s.state in visited:
|
||||
continue
|
||||
if is_state_valid(s.state, grid):
|
||||
new_list = currently_checked.copy()
|
||||
new_list.append(s)
|
||||
q.put(new_list)
|
||||
if not is_state_valid(s.state, grid):
|
||||
continue
|
||||
|
||||
g_cost = currently_checked[-1].cost + get_cost_for_action(s.action, grid.get(s.state.position, GridCellType.STREET_HORIZONTAL))
|
||||
h_cost = _heuristics(s.state.position, city)
|
||||
f_cost = g_cost + h_cost
|
||||
new_list = currently_checked.copy()
|
||||
new_list.append(s)
|
||||
pq.put((f_cost, new_list))
|
||||
|
||||
return []
|
||||
|
||||
|
||||
|
||||
|
||||
def extract_actions(successors: list[Succ]) -> list[AgentActionType]:
|
||||
output: list[AgentActionType] = []
|
||||
for s in successors:
|
||||
@ -51,20 +60,23 @@ def extract_actions(successors: list[Succ]) -> list[AgentActionType]:
|
||||
output.append(s.action)
|
||||
return output
|
||||
|
||||
|
||||
def succ(state: AgentState) -> list[Succ]:
|
||||
result: list[Succ] = []
|
||||
result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT))
|
||||
result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT))
|
||||
result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT, 0))
|
||||
result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT, 0))
|
||||
state_succ = move_forward_succ(state)
|
||||
if state_succ != None:
|
||||
result.append(move_forward_succ(state))
|
||||
if state_succ is not None:
|
||||
result.append(Succ(state_succ.state, AgentActionType.MOVE_FORWARD, state_succ.cost))
|
||||
return result
|
||||
|
||||
|
||||
def move_forward_succ(state: AgentState) -> Succ:
|
||||
position = get_next_cell(state)
|
||||
if position == None:
|
||||
if position is None:
|
||||
return None
|
||||
return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD)
|
||||
return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD,
|
||||
get_cost_for_action(AgentActionType.MOVE_FORWARD, GridCellType.STREET_HORIZONTAL))
|
||||
|
||||
|
||||
def get_next_cell(state: AgentState) -> Tuple[int, int]:
|
||||
@ -84,13 +96,15 @@ def get_next_cell(state: AgentState) -> Tuple[int, int]:
|
||||
return None
|
||||
return (state.position[0] + 1, state.position[1])
|
||||
|
||||
|
||||
def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
|
||||
next_cell = get_next_cell(state)
|
||||
try:
|
||||
return grid[next_cell] == GridCellType.GARBAGE_CAN
|
||||
except:
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
|
||||
def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int:
|
||||
if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT:
|
||||
return 1
|
||||
@ -102,12 +116,14 @@ def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int
|
||||
|
||||
|
||||
def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
|
||||
try:
|
||||
return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP
|
||||
except:
|
||||
try:
|
||||
return grid[state.position] == GridCellType.STREET_HORIZONTAL or grid[
|
||||
state.position] == GridCellType.STREET_VERTICAL or grid[state.position] == GridCellType.SPEED_BUMP
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def _heuristics(position: Tuple[int, int], city: City):
|
||||
|
||||
|
||||
def _heuristics(position: Tuple[int, int], city: City) -> int:
|
||||
min_distance: int = 300
|
||||
found_nonvisited: bool = False
|
||||
for can in city.cans:
|
||||
@ -120,4 +136,5 @@ def _heuristics(position: Tuple[int, int], city: City):
|
||||
if found_nonvisited:
|
||||
return min_distance
|
||||
return -1
|
||||
|
||||
|
||||
|
||||
|
15
city.py
15
city.py
@ -2,8 +2,9 @@ from typing import List, Dict, Tuple
|
||||
from garbageCan import GarbageCan
|
||||
from speedBump import SpeedBump
|
||||
from street import Street
|
||||
from gameContext import GameContext
|
||||
|
||||
from gameContext import GameContext
|
||||
|
||||
|
||||
class City:
|
||||
cans: List[GarbageCan]
|
||||
bumps: List[SpeedBump]
|
||||
@ -18,10 +19,10 @@ class City:
|
||||
def add_can(self, can: GarbageCan) -> None:
|
||||
self.nodes.append(can)
|
||||
self.cans_dict[can.position] = can
|
||||
|
||||
|
||||
def add_street(self, street: Street) -> None:
|
||||
self.streets.append(street)
|
||||
|
||||
|
||||
def add_bump(self, bump: SpeedBump) -> None:
|
||||
self.streets.append(bump)
|
||||
|
||||
@ -33,11 +34,11 @@ class City:
|
||||
def _render_streets(self, game_context: GameContext) -> None:
|
||||
for street in self.streets:
|
||||
street.render(game_context)
|
||||
|
||||
|
||||
def _render_nodes(self, game_context: GameContext) -> None:
|
||||
for node in self.nodes:
|
||||
node.render(game_context)
|
||||
|
||||
|
||||
def _render_bumps(self, game_context: GameContext) -> None:
|
||||
for bump in self.bumps:
|
||||
bump.render(game_context)
|
||||
bump.render(game_context)
|
||||
|
7
main.py
7
main.py
@ -1,4 +1,6 @@
|
||||
import pygame
|
||||
|
||||
from city import City
|
||||
from gameEventHandler import handle_game_event
|
||||
from gameContext import GameContext
|
||||
from startup import startup
|
||||
@ -17,8 +19,11 @@ game_context = GameContext()
|
||||
game_context.dust_car_pil = dust_car_pil
|
||||
game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB')
|
||||
game_context.canvas = canvas
|
||||
|
||||
city = City()
|
||||
|
||||
startup(game_context)
|
||||
collect_garbage(game_context)
|
||||
collect_garbage(game_context, city)
|
||||
|
||||
exit = False
|
||||
|
||||
|
17
movement.py
17
movement.py
@ -9,19 +9,21 @@ from agentOrientation import AgentOrientation
|
||||
import pygame
|
||||
from bfs import find_path_to_nearest_can
|
||||
from agentState import AgentState
|
||||
from city import City
|
||||
|
||||
def collect_garbage(game_context: GameContext) -> None:
|
||||
|
||||
def collect_garbage(game_context: GameContext, city: City) -> None:
|
||||
while True:
|
||||
start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation)
|
||||
path = find_path_to_nearest_can(start_agent_state, game_context.grid)
|
||||
if path == None or len(path) == 0:
|
||||
path = find_path_to_nearest_can(start_agent_state, game_context.grid, city)
|
||||
if path is None or len(path) == 0:
|
||||
break
|
||||
move_dust_car(path, game_context)
|
||||
next_position = calculate_next_position(game_context.dust_car)
|
||||
game_context.grid[next_position] = GridCellType.VISITED_GARBAGE_CAN
|
||||
game_context.city.cans_dict[next_position].is_visited = True
|
||||
pass
|
||||
|
||||
|
||||
def move_dust_car(actions: list[AgentActionType], game_context: GameContext) -> None:
|
||||
for action in actions:
|
||||
street_position = game_context.dust_car.position
|
||||
@ -39,12 +41,9 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) ->
|
||||
game_context.render_in_cell(street_position, "imgs/street_horizontal.png")
|
||||
elif game_context.grid[street_position] == GridCellType.STREET_VERTICAL:
|
||||
game_context.render_in_cell(street_position, "imgs/street_vertical.png")
|
||||
elif game_context.grid[street_position] == GridCellType.SPEED_BUMP:
|
||||
game_context.render_in_cell(street_position, "imgs/speed_bump.png")
|
||||
pygame.display.update()
|
||||
time.sleep(0.15)
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
|
||||
def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]:
|
||||
if car.orientation == AgentOrientation.UP:
|
||||
@ -61,4 +60,4 @@ def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]:
|
||||
return (car.position[0] - 1, car.position[1])
|
||||
if car.position[0] + 1 > 27:
|
||||
return None
|
||||
return (car.position[0] + 1, car.position[1])
|
||||
return (car.position[0] + 1, car.position[1])
|
||||
|
Loading…
Reference in New Issue
Block a user