Compare commits
5 Commits
5440626353
...
271e3365f9
Author | SHA1 | Date | |
---|---|---|---|
|
271e3365f9 | ||
|
aacee0e493 | ||
|
15638862d3 | ||
|
af32df4474 | ||
|
311a2d0757 |
149
bfs.py
149
bfs.py
@ -1,100 +1,128 @@
|
|||||||
from agentState import AgentState
|
from agentState import AgentState
|
||||||
from typing import Dict, Tuple, List, Set
|
from typing import Dict, Tuple, List
|
||||||
from city import City
|
from city import City
|
||||||
from gridCellType import GridCellType
|
from gridCellType import GridCellType
|
||||||
from agentActionType import AgentActionType
|
from agentActionType import AgentActionType
|
||||||
from agentOrientation import AgentOrientation
|
from agentOrientation import AgentOrientation
|
||||||
from queue import PriorityQueue
|
from queue import Queue, PriorityQueue
|
||||||
from turnCar import turn_left_orientation, turn_right_orientation
|
from turnCar import turn_left_orientation, turn_right_orientation
|
||||||
import heapq
|
|
||||||
|
|
||||||
|
|
||||||
class Succ:
|
class Successor:
|
||||||
state: AgentState
|
|
||||||
action: AgentActionType
|
|
||||||
cost: int
|
|
||||||
|
|
||||||
def __init__(self, state: AgentState, action: AgentActionType, cost: int) -> None:
|
def __init__(self, state: AgentState, action: AgentActionType, cost: int, predicted_cost: int) -> None:
|
||||||
self.state = state
|
self.state = state
|
||||||
self.action = action
|
self.action = action
|
||||||
self.cost = cost
|
self.cost = cost
|
||||||
|
self.predicted_cost = cost
|
||||||
|
|
||||||
|
|
||||||
def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> list[AgentActionType]:
|
class SuccessorList:
|
||||||
pq: PriorityQueue[Tuple[int, List[Succ]]] = PriorityQueue()
|
succ_list: list[Successor]
|
||||||
visited: set[AgentState] = set()
|
|
||||||
startStates: list[Succ] = [Succ(startState, AgentActionType.UNKNOWN, 0)]
|
|
||||||
pq.put((0, startStates))
|
|
||||||
|
|
||||||
while not pq.empty():
|
def __init__(self, succ_list: list[Successor]) -> None:
|
||||||
_, currently_checked = pq.get()
|
self.succ_list = succ_list
|
||||||
last_state = currently_checked[-1].state
|
|
||||||
if last_state in visited:
|
|
||||||
continue
|
|
||||||
visited.add(last_state)
|
|
||||||
|
|
||||||
if is_state_success(last_state, grid):
|
def __gt__(self, other):
|
||||||
return extract_actions(currently_checked)
|
return self.succ_list[-1].predicted_cost > other.succ_list[-1].predicted_cost
|
||||||
|
|
||||||
successors = succ(last_state)
|
def __lt__(self, other):
|
||||||
|
return self.succ_list[-1].predicted_cost < other.succ_list[-1].predicted_cost
|
||||||
|
|
||||||
|
|
||||||
|
def find_path_to_nearest_can(startState: AgentState, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[
|
||||||
|
AgentActionType]:
|
||||||
|
visited: List[AgentState] = []
|
||||||
|
queue: PriorityQueue[SuccessorList] = PriorityQueue()
|
||||||
|
queue.put(SuccessorList([Successor(startState, AgentActionType.UNKNOWN, 0, _heuristics(startState.position, city))]))
|
||||||
|
|
||||||
|
while not queue.empty():
|
||||||
|
current = queue.get()
|
||||||
|
previous = current.succ_list[-1]
|
||||||
|
visited.append(previous.state)
|
||||||
|
|
||||||
|
if is_state_success(previous.state, grid):
|
||||||
|
return extract_actions(current)
|
||||||
|
|
||||||
|
successors = get_successors(previous, grid, city)
|
||||||
for s in successors:
|
for s in successors:
|
||||||
if s.state in visited:
|
already_visited = False
|
||||||
|
for v in visited:
|
||||||
|
if v.position == s.state.position and v.orientation == s.state.orientation:
|
||||||
|
already_visited = True
|
||||||
|
break
|
||||||
|
if already_visited:
|
||||||
continue
|
continue
|
||||||
if not is_state_valid(s.state, grid):
|
if is_state_valid(s.state, grid):
|
||||||
continue
|
new_list = current.succ_list.copy()
|
||||||
|
new_list.append(s)
|
||||||
g_cost = currently_checked[-1].cost + get_cost_for_action(s.action, grid.get(s.state.position, GridCellType.STREET_HORIZONTAL))
|
queue.put(SuccessorList(new_list))
|
||||||
h_cost = _heuristics(s.state.position, city)
|
|
||||||
f_cost = g_cost + h_cost
|
|
||||||
new_list = currently_checked.copy()
|
|
||||||
new_list.append(s)
|
|
||||||
pq.put((f_cost, new_list))
|
|
||||||
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
def extract_actions(successors: list[Succ]) -> list[AgentActionType]:
|
def extract_actions(successors: SuccessorList) -> list[AgentActionType]:
|
||||||
output: list[AgentActionType] = []
|
output: list[AgentActionType] = []
|
||||||
for s in successors:
|
for s in successors.succ_list:
|
||||||
if s.action != AgentActionType.UNKNOWN:
|
if s.action != AgentActionType.UNKNOWN:
|
||||||
output.append(s.action)
|
output.append(s.action)
|
||||||
return output
|
return output
|
||||||
|
|
||||||
|
|
||||||
def succ(state: AgentState) -> list[Succ]:
|
def get_successors(succ: Successor, grid: Dict[Tuple[int, int], GridCellType], city: City) -> List[Successor]:
|
||||||
result: list[Succ] = []
|
result: List[Successor] = []
|
||||||
result.append(Succ(AgentState(state.position, turn_left_orientation(state.orientation)), AgentActionType.TURN_LEFT, 0))
|
|
||||||
result.append(Succ(AgentState(state.position, turn_right_orientation(state.orientation)), AgentActionType.TURN_RIGHT, 0))
|
turn_left_cost = 1 + succ.cost
|
||||||
state_succ = move_forward_succ(state)
|
turn_left_state = AgentState(succ.state.position, turn_left_orientation(succ.state.orientation))
|
||||||
|
turn_left_heuristics = _heuristics(succ.state.position, city)
|
||||||
|
result.append(
|
||||||
|
Successor(turn_left_state, AgentActionType.TURN_LEFT, turn_left_cost, turn_left_cost + turn_left_heuristics))
|
||||||
|
|
||||||
|
turn_right_cost = 1 + succ.cost
|
||||||
|
turn_right_state = AgentState(succ.state.position, turn_right_orientation(succ.state.orientation))
|
||||||
|
turn_right_heuristics = _heuristics(succ.state.position, city)
|
||||||
|
result.append(
|
||||||
|
Successor(turn_right_state, AgentActionType.TURN_RIGHT, turn_right_cost,
|
||||||
|
turn_right_cost + turn_right_heuristics))
|
||||||
|
|
||||||
|
state_succ = move_forward_succ(succ, city, grid)
|
||||||
if state_succ is not None:
|
if state_succ is not None:
|
||||||
result.append(Succ(state_succ.state, AgentActionType.MOVE_FORWARD, state_succ.cost))
|
result.append(state_succ)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def move_forward_succ(state: AgentState) -> Succ:
|
def move_forward_succ(succ: Successor, city: City, grid: Dict[Tuple[int, int], GridCellType]) -> Successor:
|
||||||
position = get_next_cell(state)
|
position = get_next_cell(succ.state)
|
||||||
if position is None:
|
if position is None:
|
||||||
return None
|
return None
|
||||||
return Succ(AgentState(position, state.orientation), AgentActionType.MOVE_FORWARD,
|
|
||||||
get_cost_for_action(AgentActionType.MOVE_FORWARD, GridCellType.STREET_HORIZONTAL))
|
cost = get_cost_for_action(AgentActionType.MOVE_FORWARD, grid[position]) + succ.cost
|
||||||
|
predicted_cost = cost + _heuristics(position, city)
|
||||||
|
new_state = AgentState(position, succ.state.orientation)
|
||||||
|
return Successor(new_state, AgentActionType.MOVE_FORWARD, cost, predicted_cost)
|
||||||
|
|
||||||
|
|
||||||
def get_next_cell(state: AgentState) -> Tuple[int, int]:
|
def get_next_cell(state: AgentState) -> Tuple[int, int]:
|
||||||
if state.orientation == AgentOrientation.UP:
|
x, y = state.position
|
||||||
if state.position[1] - 1 < 1:
|
orientation = state.orientation
|
||||||
|
|
||||||
|
if orientation == AgentOrientation.UP:
|
||||||
|
if y - 1 < 1:
|
||||||
return None
|
return None
|
||||||
return (state.position[0], state.position[1] - 1)
|
return x, y - 1
|
||||||
if state.orientation == AgentOrientation.DOWN:
|
elif orientation == AgentOrientation.DOWN:
|
||||||
if state.position[1] + 1 > 27:
|
if y + 1 > 27:
|
||||||
return None
|
return None
|
||||||
return (state.position[0], state.position[1] + 1)
|
return x, y + 1
|
||||||
if state.orientation == AgentOrientation.LEFT:
|
elif orientation == AgentOrientation.LEFT:
|
||||||
if state.position[0] - 1 < 1:
|
if x - 1 < 1:
|
||||||
return None
|
return None
|
||||||
return (state.position[0] - 1, state.position[1])
|
return x - 1, y
|
||||||
if state.position[0] + 1 > 27:
|
elif x + 1 > 27:
|
||||||
return None
|
return None
|
||||||
return (state.position[0] + 1, state.position[1])
|
else:
|
||||||
|
return x + 1, y
|
||||||
|
|
||||||
|
|
||||||
def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
|
def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType]) -> bool:
|
||||||
@ -106,11 +134,10 @@ def is_state_success(state: AgentState, grid: Dict[Tuple[int, int], GridCellType
|
|||||||
|
|
||||||
|
|
||||||
def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int:
|
def get_cost_for_action(action: AgentActionType, cell_type: GridCellType) -> int:
|
||||||
if action == AgentActionType.TURN_LEFT or action == AgentActionType.TURN_RIGHT:
|
if action in [AgentActionType.TURN_LEFT, AgentActionType.TURN_RIGHT]:
|
||||||
return 1
|
return 1
|
||||||
if cell_type == GridCellType.SPEED_BUMP:
|
if cell_type == GridCellType.SPEED_BUMP and action == AgentActionType.MOVE_FORWARD:
|
||||||
if action == AgentActionType.MOVE_FORWARD:
|
return 10
|
||||||
return 10
|
|
||||||
if action == AgentActionType.MOVE_FORWARD:
|
if action == AgentActionType.MOVE_FORWARD:
|
||||||
return 3
|
return 3
|
||||||
|
|
||||||
@ -123,7 +150,7 @@ def is_state_valid(state: AgentState, grid: Dict[Tuple[int, int], GridCellType])
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _heuristics(position: Tuple[int, int], city: City) -> int:
|
def _heuristics(position: Tuple[int, int], city: City):
|
||||||
min_distance: int = 300
|
min_distance: int = 300
|
||||||
found_nonvisited: bool = False
|
found_nonvisited: bool = False
|
||||||
for can in city.cans:
|
for can in city.cans:
|
||||||
@ -136,5 +163,3 @@ def _heuristics(position: Tuple[int, int], city: City) -> int:
|
|||||||
if found_nonvisited:
|
if found_nonvisited:
|
||||||
return min_distance
|
return min_distance
|
||||||
return -1
|
return -1
|
||||||
|
|
||||||
|
|
||||||
|
6
city.py
6
city.py
@ -12,12 +12,12 @@ class City:
|
|||||||
cans_dict: Dict[Tuple[int, int], GarbageCan] = {}
|
cans_dict: Dict[Tuple[int, int], GarbageCan] = {}
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.nodes = []
|
self.cans = []
|
||||||
self.streets = []
|
self.streets = []
|
||||||
self.bumps = []
|
self.bumps = []
|
||||||
|
|
||||||
def add_can(self, can: GarbageCan) -> None:
|
def add_can(self, can: GarbageCan) -> None:
|
||||||
self.nodes.append(can)
|
self.cans.append(can)
|
||||||
self.cans_dict[can.position] = can
|
self.cans_dict[can.position] = can
|
||||||
|
|
||||||
def add_street(self, street: Street) -> None:
|
def add_street(self, street: Street) -> None:
|
||||||
@ -36,7 +36,7 @@ class City:
|
|||||||
street.render(game_context)
|
street.render(game_context)
|
||||||
|
|
||||||
def _render_nodes(self, game_context: GameContext) -> None:
|
def _render_nodes(self, game_context: GameContext) -> None:
|
||||||
for node in self.nodes:
|
for node in self.cans:
|
||||||
node.render(game_context)
|
node.render(game_context)
|
||||||
|
|
||||||
def _render_bumps(self, game_context: GameContext) -> None:
|
def _render_bumps(self, game_context: GameContext) -> None:
|
||||||
|
7
main.py
7
main.py
@ -1,6 +1,4 @@
|
|||||||
import pygame
|
import pygame
|
||||||
|
|
||||||
from city import City
|
|
||||||
from gameEventHandler import handle_game_event
|
from gameEventHandler import handle_game_event
|
||||||
from gameContext import GameContext
|
from gameContext import GameContext
|
||||||
from startup import startup
|
from startup import startup
|
||||||
@ -19,11 +17,8 @@ game_context = GameContext()
|
|||||||
game_context.dust_car_pil = dust_car_pil
|
game_context.dust_car_pil = dust_car_pil
|
||||||
game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB')
|
game_context.dust_car_pygame = pygame.image.frombuffer(dust_car_pil.tobytes(), dust_car_pil.size, 'RGB')
|
||||||
game_context.canvas = canvas
|
game_context.canvas = canvas
|
||||||
|
|
||||||
city = City()
|
|
||||||
|
|
||||||
startup(game_context)
|
startup(game_context)
|
||||||
collect_garbage(game_context, city)
|
collect_garbage(game_context)
|
||||||
|
|
||||||
exit = False
|
exit = False
|
||||||
|
|
||||||
|
12
movement.py
12
movement.py
@ -9,18 +9,18 @@ from agentOrientation import AgentOrientation
|
|||||||
import pygame
|
import pygame
|
||||||
from bfs import find_path_to_nearest_can
|
from bfs import find_path_to_nearest_can
|
||||||
from agentState import AgentState
|
from agentState import AgentState
|
||||||
from city import City
|
|
||||||
|
|
||||||
|
|
||||||
def collect_garbage(game_context: GameContext, city: City) -> None:
|
def collect_garbage(game_context: GameContext) -> None:
|
||||||
while True:
|
while True:
|
||||||
start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation)
|
start_agent_state = AgentState(game_context.dust_car.position, game_context.dust_car.orientation)
|
||||||
path = find_path_to_nearest_can(start_agent_state, game_context.grid, city)
|
path = find_path_to_nearest_can(start_agent_state, game_context.grid, game_context.city)
|
||||||
if path is None or len(path) == 0:
|
if path == None or len(path) == 0:
|
||||||
break
|
break
|
||||||
move_dust_car(path, game_context)
|
move_dust_car(path, game_context)
|
||||||
next_position = calculate_next_position(game_context.dust_car)
|
next_position = calculate_next_position(game_context.dust_car)
|
||||||
game_context.grid[next_position] = GridCellType.VISITED_GARBAGE_CAN
|
game_context.grid[next_position] = GridCellType.VISITED_GARBAGE_CAN
|
||||||
|
game_context.city.cans_dict[next_position].is_visited = True
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@ -41,8 +41,10 @@ def move_dust_car(actions: list[AgentActionType], game_context: GameContext) ->
|
|||||||
game_context.render_in_cell(street_position, "imgs/street_horizontal.png")
|
game_context.render_in_cell(street_position, "imgs/street_horizontal.png")
|
||||||
elif game_context.grid[street_position] == GridCellType.STREET_VERTICAL:
|
elif game_context.grid[street_position] == GridCellType.STREET_VERTICAL:
|
||||||
game_context.render_in_cell(street_position, "imgs/street_vertical.png")
|
game_context.render_in_cell(street_position, "imgs/street_vertical.png")
|
||||||
|
elif game_context.grid[street_position] == GridCellType.SPEED_BUMP:
|
||||||
|
game_context.render_in_cell(street_position, "imgs/speed_bump.png")
|
||||||
pygame.display.update()
|
pygame.display.update()
|
||||||
time.sleep(0.5)
|
time.sleep(0.15)
|
||||||
|
|
||||||
|
|
||||||
def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]:
|
def calculate_next_position(car: GarbageTruck) -> Tuple[int, int]:
|
||||||
|
Loading…
Reference in New Issue
Block a user