Implement working A* algorithm without tile cost

This commit is contained in:
Adam Szpilkowski 2022-05-18 20:05:47 +02:00
parent 1d45722aa5
commit 81398cb15b
2 changed files with 149 additions and 122 deletions

50
main.py
View File

@ -3,28 +3,27 @@ import pygame
from src.world import World
from src.tractor import Tractor
from src.settings import Settings
from src.utils.bfs import BFSSearcher
from src.constants import Constants
from utils.astar import a_star
import numpy as np
from utils.astar import a_star_search
def main():
pygame.init()
settings = Settings()
world = World(settings)
tractor = Tractor("Spalinowy", "Nawóz 1", settings, 8 * settings.tile_size, 8 * settings.tile_size)
obstacles = [tile for tile in world.tiles if tile.type == 'rock']
settings = Settings() # ustawienia pygame
world = World(settings) # stworzenie mapy na bazie ustawień pygame
tractor = Tractor("Spalinowy", "Nawóz 1", settings, 8 * settings.tile_size, 8 * settings.tile_size) # stworzenie traktora z podanymi argumentami
obstacles = [tile for tile in world.tiles if tile.type == 'rock'] # stworzenie listy z przeszkodami, kamień = przeszkoda
clock = pygame.time.Clock() # FPS purpose
screen = pygame.display.set_mode((settings.screen_width, settings.screen_height))
pygame.display.set_caption('TRAKTOHOLIK')
screen = pygame.display.set_mode((settings.screen_width, settings.screen_height)) # tworzenie ekranu
pygame.display.set_caption('TRAKTOHOLIK') # nazwa okna
start_cords = (8, 1)
goals = [(3, 3), (7, 7), (0, 0)]
end_cords = goals[0]
start_dir = tractor.curr_direction
# path = BFSSearcher().search(start_cords, end_cords, start_dir)
start_dir = tractor.curr_direction # przypisanie początkowego ustawienia traktora do zmiennej
# path = BFSSearcher().search(start_cords, end_cords, start_dir) # wygenerowanie listy ruchów na bazie BFS
path = a_star_search(start_cords, end_cords, start_dir) # generowanie ścieżki za pomocą A*
run = True
while run:
@ -37,22 +36,17 @@ def main():
if event.type == pygame.QUIT:
run = False
# if path:
# action = path.pop(0)
# tractor.update(action)
# else:
# if len(goals) > 1:
# start_cord = goals.pop(0)
# end_cords = goals[0]
# start_dir = tractor.curr_direction
# path = BFSSearcher().search(start_cord, end_cords, start_dir)
while goals:
goal = goals.pop(0)
path = a_star(world, start_cords, goal)
start_cords = goal
print(path)
# iteracja przez listę ruchów
if path:
action = path.pop(0) # pobranie pierwszego ruchu z listy
tractor.update(action) # wykonanie ruchu przez traktor
else:
if len(goals) > 1: # sprawdzenie czy są inne cele
new_start = goals.pop(0) # pobierz współrzędne pierwszego celu i ustaw jako początkowe
end_cords = goals[0] # ustaw kolejny cel
start_dir = tractor.curr_direction # aktualizacja kierunku traktora
# path = BFSSearcher().search(start_cord, end_cords, start_dir) # generacja nowej ścieżki
path = a_star_search(new_start, end_cords, start_dir)
pygame.time.wait(settings.freeze_time)
pygame.display.update()

View File

@ -1,107 +1,140 @@
import world
from operator import itemgetter
import copy
import tractor
import world
from constants import Constants as C
# noinspection DuplicatedCode
class Node:
def __init__(self, x, y): # inicjalizacja węzła
def __init__(self, x, y, agent_direction, action=None, parent=None): # inicjalizacja węzła
self.x = x
self.y = y
self.neighbours = []
self.parent = None
self.g_cost = 0
self.f_cost = 0
self.h_cost = 0
self.state = tuple([self.x, self.y])
self.agent_direction = agent_direction
self.parent = parent
self.action = action
def create_neighbours(self, field: world): # tworzenie sąsiadów i sprawdzanie czy nie wykraczają poza macierz i czy nie są przeszkodą
if self.x - 1 >= 0 and field.world_data[self.x - 1][self.y] != 0:
self.neighbours.append([self.x - 1, self.y])
def get_parent(self):
return self.parent
if self.x + 1 < len(field.world_data) and field.world_data[self.x + 1][self.y] != 0:
self.neighbours.append([self.x + 1, self.y])
def get_direction(self):
return self.agent_direction
if self.y + 1 < len(field.world_data[0]) and field.world_data[self.x][self.y + 1] != 0:
self.neighbours.append([self.x, self.y + 1])
def get_x(self):
return self.x
if self.y - 1 >= 0 and field.world_data[self.x][self.y - 1] != 0:
self.neighbours.append([self.x, self.y - 1])
def get_y(self):
return self.y
def calculate_h_cost(self, goal):
self.h_cost = abs(self.x - goal.x) + abs(self.y - goal.y)
def get_action(self):
return self.action
def __repr__(self):
return " <Node x:%s y:%s state:%s agent_dir:%s parent:%s action:%s> " % (
self.x, self.y, self.state, self.agent_direction, self.parent, self.action)
def successor(self):
action_list = []
x = self.x
y = self.y
# vector rotation: https://stackoverflow.com/questions/4780119/2d-euclidean-vector-rotations
temp_direction = self.agent_direction
temp_direction = [-temp_direction[1], temp_direction[0]]
action_list.append((C.ROTATE_LEFT, ((x, y), temp_direction)))
temp_direction = self.agent_direction
temp_direction = [temp_direction[1], -temp_direction[0]]
action_list.append((C.ROTATE_RIGHT, ((x, y), temp_direction)))
if self.agent_direction == C.RIGHT and x < 9:
action_list.append((C.MOVE, ((x + 1, y), self.agent_direction)))
elif self.agent_direction == C.LEFT and x > 0:
action_list.append((C.MOVE, ((x - 1, y), self.agent_direction)))
elif self.agent_direction == C.UP and y < 9:
action_list.append((C.MOVE, ((x, y + 1), self.agent_direction)))
elif self.agent_direction == C.DOWN and y > 0:
action_list.append((C.MOVE, ((x, y - 1), self.agent_direction)))
return action_list
def a_star(field: world, start, goal): # algorytm zwraca ścieżkę którą ma przebyć agent
# tworzenie dictionary składającego się z nodeów
node_dict = {}
for i in range(len(field.world_data)):
for j in range(len(field.world_data[0])):
node_dict[i, j] = Node(i, j)
# przypisanie startu i celu do zmiennej
start = node_dict[start[0], start[1]]
goal = node_dict[goal[0], goal[1]]
# inicjalizacja listy open i closed
open_list = []
closed_list = []
# dodanie do listy punktu startowego
open_list.append(start)
while open_list:
# sortowanie listy open rosnąco
open_list.sort(key=lambda o: o.f_cost)
# pobranie z listy open elementu o najniższym koszcie
current = open_list.pop(0)
# dodanie do listy closed pobranego elementu
closed_list.append(current)
# stworzenie listy z możliwymi opcjami
current.create_neighbours(field)
# sprawdzenie celu
if current == goal:
break
# iteracja przez sąsiadów
for next in current.neighbours:
if node_dict[next[0], next[1]] in closed_list:
continue
if node_dict[next[0], next[1]] in open_list:
new_g_cost = current.g_cost + 1
if node_dict[next[0], next[1]].g_cost > new_g_cost:
node_dict[next[0], next[1]].g_cost = new_g_cost
node_dict[next[0], next[1]].parent = node_dict[current.x, current.y]
else:
node_dict[next[0], next[1]].g_cost = node_dict[current.x, current.y].g_cost + 1
node_dict[next[0], next[1]].calculate_h_cost(goal)
node_dict[next[0], next[1]].parent = node_dict[current.x, current.y]
open_list.append(node_dict[next[0], next[1]])
# inicjalizacja listy ze ścieżką
path = []
while current.parent:
path.append([current.x, current.y])
current = current.parent
# dodanie punktu początkowego oraz odwrócenie i zwrócenie listy
path.append([start.x, start.y])
path.reverse()
return path
def h_cost(goal: tuple, node: Node): # oblicznie heurystyki
return abs(node.x - goal[0]) + abs(node.y - goal[1])
# def path_to_action(path: list): # funkcja zmieniająca ścieżkę nodeów na instrukcję dla agenta
# vector_list = []
# action_list = []
#
# # tmp_direction = agent_direction
# # iteracja przez wszystkie pary elementów żeby obliczyć wektory ruchu
# for i in range(len(path) - 1):
# tmp_vector = [(path[i+1][0] - path[i][0]), (path[i+1][1] - path[i][1])]
# vector_list.append(tmp_vector)
#
# return vector_list
def g_cost(node: Node): # funkcja kosztu : ile kosztuje przejechanie przez dane pole
cost = 0
while node.get_parent() is not None:
cost += 10
node = node.get_parent()
return cost
def f_cost(goal: tuple, node: Node): # funkcja zwracająca sumę funkcji kosztu oraz heurestyki
return g_cost(node) + h_cost(goal, node)
def print_moves(node: Node): # zwraca listę ruchów jakie należy wykonać by dotrzeć do punktu docelowego
moves_list = []
while node.parent is not None:
moves_list.append(node.action)
node = node.parent
return moves_list[::-1]
def is_goal_reached(element: Node, goal: tuple):
return (element.x, element.y) == goal
def a_star_search(state: tuple, goal: tuple, agent_direction: list):
fringe = []
explored = []
starting_node = Node(state[0], state[1], agent_direction, None, None)
fringe.append((starting_node, 0))
while fringe:
element = fringe.pop(0)
if is_goal_reached(element[0], goal):
return print_moves(element[0])
explored.append(element)
for (action, elem_state) in element[0].successor():
fringe_tuple = []
fringe_tuple_prio = []
explored_tuple = []
for (node, cost) in fringe:
fringe_tuple.append(((node.get_x(), node.get_y()), node.get_direction()))
fringe_tuple_prio.append((((node.get_x(), node.get_y()), node.get_direction()), cost))
for (node, cost) in explored:
explored_tuple.append(((node.get_x(), node.get_y()), node.get_direction()))
new_node = Node(elem_state[0][0], elem_state[0][1], elem_state[1], action, element[0])
p = f_cost(goal, new_node)
if elem_state not in fringe_tuple and elem_state not in explored_tuple:
fringe.append((new_node, p))
fringe = sorted(fringe, key=itemgetter(1))
# print("fringe_loop_if: ", fringe)
elif elem_state in fringe_tuple:
i = 0
for (state_prio, r) in fringe_tuple_prio:
if str(state_prio) == str(elem_state):
if r > p:
fringe.insert(i, (new_node, p))
fringe.pop(i + 1)
fringe = sorted(fringe, key=itemgetter(1))
# print("fringe_loop_elif: ", fringe)
break
i += 1