refactor code and add a_star algorithm

This commit is contained in:
Dominik Cupał 2021-04-27 21:40:59 +02:00
parent c4a1c40f4a
commit 7098fa397e
9 changed files with 277 additions and 153 deletions

View File

@ -1,14 +1,16 @@
#!/usr/bin/python3
import copy
from queue import Queue
from queue import Queue, PriorityQueue
from threading import Event
import pygame
from config import *
from app.graphsearch import Node, Graphsearch
from app.graphsearch import Node
from app.board import Board
from app.tractor import Tractor
from app.a_star import AStar
from app.bfs import Bfs
class App:
@ -39,15 +41,6 @@ class App:
def keys_pressed_handler(self):
keys = pygame.key.get_pressed()
if keys[pygame.K_UP]:
self.__tractor.direction_up()
if keys[pygame.K_DOWN]:
self.__tractor.direction_down()
if keys[pygame.K_LEFT]:
self.__tractor.direction_left()
if keys[pygame.K_RIGHT]:
self.__tractor.direction_right()
if keys[pygame.K_m]:
self.__tractor.move()
print(self.__tractor)
@ -81,24 +74,40 @@ class App:
else:
print(f"Bfs is succeed")
self.__bot_is_running.set()
self.__tractor.move_by_bfs_handler(self.__moves, self.__bot_is_running)
self.__tractor.run_bot_handler(self.__moves, self.__bot_is_running)
if keys[pygame.K_c]:
self.get_moves_by_a_star()
if not self.__moves:
print(f"A Star is failed")
else:
print(f"A Star is succeed")
self.__bot_is_running.set()
self.__tractor.run_bot_handler(self.__moves, self.__bot_is_running)
def update_screen(self):
def update_screen(self) -> None:
pygame.display.flip()
def quit(self):
def quit(self) -> None:
pygame.quit()
def get_moves_by_bfs(self):
def get_moves_by_a_star(self) -> None:
x, y = self.__tractor.get_position()
node = Node(None, x, y, self.__tractor.get_direction(), 0, "movement", "initial state")
board = copy.deepcopy(self.__board)
self.__moves = Graphsearch.bfs(Queue(), Queue(), node,
lambda n=node, b=board: Graphsearch.succ(n, b),
lambda n=node: Graphsearch.goaltest(n), board)
self.__moves = AStar.search(PriorityQueue(), Queue(), node,
lambda n=node, b=board: AStar.succ(n, b),
lambda n=node: AStar.goaltest(n), board)
def run(self):
def get_moves_by_bfs(self) -> None:
x, y = self.__tractor.get_position()
node = Node(None, x, y, self.__tractor.get_direction(), 0, "movement", "initial state")
board = copy.deepcopy(self.__board)
self.__moves = Bfs.search(Queue(), Queue(), node,
lambda n=node, b=board: Bfs.succ(n, b),
lambda n=node: Bfs.goaltest(n), board)
def run(self) -> None:
self.initialize()
while self.__running:

110
app/a_star.py Normal file
View File

@ -0,0 +1,110 @@
from __future__ import annotations
import copy
from typing import Callable, Union
from queue import Queue, PriorityQueue
from app.board import Board
from config import *
from app.graphsearch import Node, Graphsearch
class PriorityItem:
def __init__(self, node: Node, priority: int):
self.node = node
self.priority = priority
def __lt__(self, other: PriorityItem):
return self.priority < other.priority
class AStar(Graphsearch):
@staticmethod
def convert_queue_of_priority_items_to_list(q: Queue[PriorityItem], *args) -> list:
items = []
[items.append((i.node.get_x(), i.node.get_y(), i.node.get_direction(), *args)) for i in q.queue]
return items
@staticmethod
def convert_queue_of_priority_items_with_priority_to_list(q: Queue[PriorityItem], *args) -> list:
items = []
[items.append((i.node.get_x(), i.node.get_y(), i.node.get_direction(), i.priority, *args)) for i in q.queue]
return items
@staticmethod
def replace_node_in_fringe_queue(fringe: PriorityQueue[PriorityItem], state) -> None:
tmp_queue = Queue()
while not fringe.empty():
item = fringe.get()
if state == item.node.transform_node_to_tuple():
break
else:
tmp_queue.put(item)
while not tmp_queue.empty():
fringe.put(tmp_queue.get())
@staticmethod
def replace_nodes(fringe: PriorityQueue[PriorityItem], priority: int, state: tuple[int, int, float]):
fringe_items = AStar.convert_queue_of_priority_items_with_priority_to_list(fringe)
for s in fringe_items:
if s[:-1] == state[:-1]:
if s[-1] > priority:
# s[-1] is priority (last element of state tuple)
# remove and add node to fringe - PriorityQueue
AStar.replace_node_in_fringe_queue(fringe, state[:-1])
break
@staticmethod
def g(board: Board, node: Node) -> int:
"""cost function"""
result = 0
while node.get_node() is not None:
field = board.get_field(node.get_x(), node.get_y())
result += field.get_value()
node = node.get_node()
return result
@staticmethod
def h(node: Node) -> int:
"""heuristic function"""
crops_to_harvested = AMOUNT_OF_CROPS - node.get_amount_of_harvested_crops()
return crops_to_harvested ** 2 + crops_to_harvested * 10
@staticmethod
def f(board: Board, node: Node) -> int:
"""evaluation function"""
return AStar.g(board, node) + AStar.h(node) + VALUE_OF_MOVEMENT
@staticmethod
def search(fringe: PriorityQueue, explored: Queue, istate: Node,
succ: Callable[[Node, Board], list],
goaltest: Callable[[Node], bool], board: Board) -> Union[bool, list]:
print(f"Start A*")
fringe.put(PriorityItem(istate, 0))
while True:
if fringe.empty():
return False
item = fringe.get()
if goaltest(item.node):
return AStar.get_all_moves(item.node)
copied_item = copy.deepcopy(item)
explored.put(item)
for (action, state) in succ(copied_item.node, board):
# print(state)
fringe_items = AStar.convert_queue_of_priority_items_to_list(fringe)
explored_items = AStar.convert_queue_of_priority_items_to_list(explored)
n = Node(item.node, *state, *action)
priority = AStar.f(board, n)
# print(priority)
if state[:-1] not in fringe_items and state[:-1] not in explored_items:
fringe.put(PriorityItem(n, priority))
elif state[:-1] in fringe_items:
AStar.replace_nodes(fringe, priority, state[:-1])

View File

@ -7,17 +7,12 @@ class BaseField:
def __init__(self, img_path: str):
self._img_path = img_path
def draw_field(self, screen: pygame.Surface, pos_x: int, pos_y: int,
is_centered: bool = False, size: tuple = None, angle: float = 0.0):
def draw_field(self, screen: pygame.Surface, pos_x: int,
pos_y: int, is_centered: bool = False,
size: tuple = None, angle: float = 0.0) -> None:
img = pygame.image.load(self._img_path)
img = pygame.transform.rotate(img, angle)
# def draw_field(self, screen: pygame.Surface, pos_x: int, pos_y: int, is_centered: bool = False, size: tuple = None,
# angle: int = 0):
# pre_img = pygame.image.load(self._img_path)
# if angle == 90:
# pre_img = pygame.transform.flip(pre_img, True, False)
# img = pygame.transform.rotate(pre_img, angle)
scale = pygame.transform.scale(img, (FIELD_SIZE, FIELD_SIZE))
rect = img.get_rect()

44
app/bfs.py Normal file
View File

@ -0,0 +1,44 @@
import copy
from queue import Queue
from typing import Callable, Union
from app import Board
from app.graphsearch import Graphsearch, Node
class Bfs(Graphsearch):
@staticmethod
def search(fringe: Queue, explored: Queue, istate: Node,
succ: Callable[[Node, Board], list],
goaltest: Callable[[Node], bool], board: Board) -> Union[bool, list]:
print(f"Start bfs")
fringe.put(istate)
while True:
if fringe.empty():
# print(list(explored.queue))
# return Graphsearch.get_all_moves(explored.get())
return False
item = fringe.get()
if goaltest(item):
# board.print_board()
return Graphsearch.get_all_moves(item)
copied_item = copy.deepcopy(item)
explored.put(item)
for (action, state) in succ(copied_item, board):
# print(state)
fringe_items = []
explored_items = []
[fringe_items.append((i.get_x(), i.get_y(), i.get_direction()))
for i in fringe.queue]
[explored_items.append((i.get_x(), i.get_y(), i.get_direction()))
for i in explored.queue]
if state[:-1] not in fringe_items and state[:-1] not in explored_items:
n = Node(item, *state, *action)
fringe.put(n)

View File

@ -21,21 +21,21 @@ class Board:
def get_fields(self) -> list:
return self.__fields
def get_field(self, x: int, y: int) -> BaseField:
def get_field(self, x: int, y: int) -> Field:
return self.__fields[x][y]
def create_board(self):
def create_board(self) -> None:
for i in range(HORIZONTAL_NUM_OF_FIELDS):
self.__fields.append([])
for j in range(VERTICAL_NUM_OF_FIELDS):
self.__fields[i].append(None)
def fill(self):
def fill(self) -> None:
for i in range(len(self.__fields)):
for j in range(len(self.__fields[i])):
self.__fields[i][j] = random.choice(FIELD_TYPES).capitalize()
def generate_board(self):
def generate_board(self) -> None:
for x in range(len(self.__fields)):
for y in range(len(self.__fields[x])):
field_type = self.__fields[x][y]
@ -43,7 +43,7 @@ class Board:
field = c()
self.__fields[x][y] = field
def draw(self, screen: pygame.Surface):
def draw(self, screen: pygame.Surface) -> None:
for x in range(len(self.__fields)):
for y in range(len(self.__fields[x])):
field = self.__fields[x][y]
@ -51,7 +51,7 @@ class Board:
pos_y = y * FIELD_SIZE
field.draw_field(screen, pos_x, pos_y)
def print_board(self):
def print_board(self) -> None:
for i in range(HORIZONTAL_NUM_OF_FIELDS):
for j in range(VERTICAL_NUM_OF_FIELDS):
print(f"{j} - {type(self.__fields[i][j]).__name__}", end=" | ")

View File

@ -5,29 +5,41 @@ from app.base_field import BaseField
from config import *
class Soil(BaseField):
class Field(BaseField):
def __init__(self, img_path: str):
super().__init__(img_path)
self._value = 0
def get_value(self) -> int:
return self._value
class Soil(Field):
def __init__(self, img_path: str):
super().__init__(img_path)
class Crops(BaseField):
class Crops(Field):
price = 0
def __init__(self, img_path: str):
super().__init__(img_path)
self.weight = 1.0
self._value = VALUE_OF_CROPS
class Plant(BaseField):
class Plant(Field):
def __init__(self, img_path: str):
super().__init__(img_path)
self.is_hydrated = False
self._value = VALUE_OF_PLANT
class Clay(Soil):
def __init__(self):
super().__init__(os.path.join(RESOURCE_DIR, f"{CLAY}.{PNG}"))
self.is_fertilized = False
self._value = VALUE_OF_CLAY
class Sand(Soil):
@ -35,6 +47,7 @@ class Sand(Soil):
super().__init__(os.path.join(RESOURCE_DIR, f"{SAND}.{PNG}"))
self.is_sowed = False
self.is_hydrated = False
self._value = VALUE_OF_SAND
class Grass(Plant):

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import copy
from abc import abstractmethod
from typing import Callable, Union
from queue import Queue
@ -44,8 +43,17 @@ class Node:
def get_amount_of_harvested_crops(self) -> int:
return self.__amount_of_harvested_crops
def transform_node_to_tuple(self) -> tuple[int, int, float]:
return self.__x, self.__y, self.__direction
class Graphsearch:
@staticmethod
def convert_queue_of_nodes_to_list(q: Queue[Node], *args) -> list:
items = []
[items.append((i.get_x(), i.get_y(), i.get_direction(), *args)) for i in q.queue]
return items
@staticmethod
def succ(item: Node, board: Board) -> list:
# list of tuples (movement,action),(x,y,direction, harvested_crops)
@ -71,9 +79,9 @@ class Graphsearch:
# action_name = A_HYDRATE
elif isinstance(field, Plant):
# hydrate
action_name = A_HYDRATE
field = Tractor.irrigate_plants_succ(field, board, x, y)
# hydrate
elif type(field).__name__ in CROPS:
# harvest
@ -109,7 +117,7 @@ class Graphsearch:
return False
@staticmethod
def get_all_moves(item: Node):
def get_all_moves(item: Node) -> list:
moves = []
str_moves = []
while item.get_node() is not None:
@ -121,35 +129,8 @@ class Graphsearch:
return moves[::-1]
@staticmethod
def bfs(fringe: Queue, explored: Queue, istate: Node,
@abstractmethod
def search(fringe: Queue, explored: Queue, istate: Node,
succ: Callable[[Node, Board], list],
goaltest: Callable[[Node], bool], board: Board):
print(f"Start bfs")
fringe.put(istate)
while True:
if fringe.empty():
# print(list(explored.queue))
# return Graphsearch.get_all_moves(explored.get())
return False
item = fringe.get()
if goaltest(item):
# board.print_board()
return Graphsearch.get_all_moves(item)
copied_item = copy.deepcopy(item)
explored.put(item)
for (action, state) in succ(copied_item, board):
# print(state)
fringe_items = []
explored_items = []
[fringe_items.append((i.get_x(), i.get_y(), i.get_direction()))
for i in fringe.queue]
[explored_items.append((i.get_x(), i.get_y(), i.get_direction()))
for i in explored.queue]
if state[:-1] not in fringe_items and state[:-1] not in explored_items:
n = Node(item, *state, *action)
fringe.put(n)
goaltest: Callable[[Node], bool], board: Board) -> Union[bool, list]:
raise NotImplementedError

View File

@ -2,7 +2,6 @@
from __future__ import annotations
import random
import threading
from queue import Queue
import pygame
import os
@ -13,7 +12,7 @@ from typing import Union
from app.base_field import BaseField
from app.board import Board
from app.utils import get_class
from app.fields import CROPS, PLANTS, SOILS, Sand, Clay
from app.fields import CROPS, PLANTS, Crops, Sand, Clay, Field
from config import *
from app.fields import Plant, Soil, Crops
@ -30,12 +29,12 @@ class Tractor(BaseField):
self.__harvested_corps = []
self.__fuel = 10
def draw(self, screen: pygame.Surface):
def draw(self, screen: pygame.Surface) -> None:
self.draw_field(screen, self.__pos_x + FIELD_SIZE / 2, self.__pos_y + FIELD_SIZE / 2,
is_centered=True, size=(FIELD_SIZE, FIELD_SIZE), angle=self.__direction)
# Key methods handlers
def move(self):
def move(self) -> None:
if self.__direction == D_EAST:
self.move_right()
elif self.__direction == D_NORTH:
@ -45,64 +44,29 @@ class Tractor(BaseField):
else:
self.move_down()
def move_up(self):
def move_up(self) -> None:
if self.__pos_y - self.__move >= 0:
self.__pos_y -= self.__move
def move_down(self):
def move_down(self) -> None:
if self.__pos_y + self.__move + FIELD_SIZE <= HEIGHT:
self.__pos_y += self.__move
def move_left(self):
def move_left(self) -> None:
if self.__pos_x - self.__move >= 0:
self.__pos_x -= self.__move
def move_right(self):
def move_right(self) -> None:
if self.__pos_x + self.__move + FIELD_SIZE <= WIDTH:
self.__pos_x += self.__move
def direction_up(self):
self.__direction = 0
def direction_right(self):
self.__direction = 270
def direction_down(self):
self.__direction = 180
def direction_left(self):
self.__direction = 90
# def move(self):
# if self.__fuel > 0:
# self.__fuel -= 1
#
# if self.__direction == 0:
# if self.__pos_y - self.__move >= 0:
# self.__pos_y -= self.__move
#
# elif self.__direction == 270:
# if self.__pos_x + self.__move + FIELD_SIZE <= WIDTH:
# self.__pos_x += self.__move
#
# elif self.__direction == 180:
# if self.__pos_y + self.__move + FIELD_SIZE <= HEIGHT:
# self.__pos_y += self.__move
#
# elif self.__direction == 90:
# if self.__pos_x - self.__move >= 0:
# self.__pos_x -= self.__move
# else:
# print("Run out of fuel!")
def rotate_left(self):
def rotate_left(self) -> None:
self.__direction = (self.__direction - 90.0) % 360.0
def rotate_right(self):
def rotate_right(self) -> None:
self.__direction = (self.__direction + 90.0) % 360.0
def hydrate(self):
def hydrate(self) -> None:
if self.check_field(Sand):
field = self.get_field_from_board()
if not field.is_hydrated and field.is_sowed:
@ -114,34 +78,34 @@ class Tractor(BaseField):
print("Hydrate plant")
self.irrigate_plants(field)
def hydrate_sand(self):
def hydrate_sand(self) -> None:
if self.check_field(Sand):
field = self.get_field_from_board()
if not field.is_hydrated and field.is_sowed:
print('Hydrate soil')
self.irrigate_sand(field)
def hydrate_plant(self):
def hydrate_plant(self) -> None:
if self.check_field(Plant):
field = self.get_field_from_board()
if not field.is_hydrated:
print("Hydrate plant")
self.irrigate_plants(field)
def sow(self):
def sow(self) -> None:
field = self.get_field_from_board()
if self.check_field(Sand) and not field.is_sowed:
print('Sow')
field.is_sowed = True
def harvest(self):
def harvest(self) -> None:
if self.check_field(Crops):
print('Harvest')
field = self.get_field_from_board()
self.harvest_crops(field)
self.get_result_of_harvesting()
def fertilize(self):
def fertilize(self) -> None:
if self.check_field(Clay):
print('Fertilize soil')
field = self.get_field_from_board()
@ -149,56 +113,56 @@ class Tractor(BaseField):
################################################################################
def fertilize_clay(self, field: Clay):
def fertilize_clay(self, field: Clay) -> None:
field.is_fertilized = True
self.do_action((Sand.__name__,))
def irrigate_plants(self, field: Plant):
def irrigate_plants(self, field: Plant) -> None:
field.is_hydrated = True
# self.do_time_action(CROPS)
self.do_action(CROPS)
def irrigate_sand(self, field: Sand):
def irrigate_sand(self, field: Sand) -> None:
field.is_hydrated = True
# self.do_time_action(PLANTS)
self.do_action(PLANTS)
def harvest_crops(self, field: Crops):
def harvest_crops(self, field: Crops) -> None:
self.__harvested_corps.append(type(field).__name__)
self.do_action(SOILS)
self.do_action((Clay.__name__,))
def do_action(self, TYPE: tuple):
def do_action(self, TYPE: tuple) -> None:
choosen_type = random.choice(TYPE)
obj = get_class("app.fields", choosen_type)
x, y = self.get_position()
self.__board.get_fields()[x][y] = obj()
def do_time_action(self, TYPE: tuple):
def do_time_action(self, TYPE: tuple) -> None:
thread = Thread(target=self.do_time_action_handler, args=(TYPE,), daemon=True)
thread.start()
def do_time_action_handler(self, TYPE: tuple):
def do_time_action_handler(self, TYPE: tuple) -> None:
time.sleep(TIME_OF_GROWING)
self.do_action(TYPE)
def check_field(self, class_name: Union[type(Plant), type(Crops), type(Soil)]):
def check_field(self, class_name: Union[type(Plant), type(Crops), type(Soil)]) -> bool:
if isinstance(self.get_field_from_board(), class_name):
return True
return False
def get_field_from_board(self):
def get_field_from_board(self) -> BaseField:
x, y = self.get_position()
return self.__board.get_fields()[x][y]
def get_field_from_board_by_positions(self, x, y):
def get_field_from_board_by_positions(self, x, y) -> BaseField:
return self.__board.get_fields()[x][y]
def get_position(self):
def get_position(self) -> tuple[int, int]:
x = self.__pos_x // FIELD_SIZE
y = self.__pos_y // FIELD_SIZE
return x, y
def get_result_of_harvesting(self):
def get_result_of_harvesting(self) -> None:
crops = set(self.__harvested_corps)
result = 0.0
for crop in crops:
@ -208,20 +172,20 @@ class Tractor(BaseField):
print(f"Price for collected crops: {result:.2f}")
def __str__(self):
def __str__(self) -> str:
x, y = self.get_position()
return f"Position: {x}:{y} - {type(self.__board.get_fields()[x][y]).__name__}"
def get_direction(self):
def get_direction(self) -> float:
return self.__direction
def move_by_bfs_handler(self, moves: list[tuple[str, str]], is_running: threading.Event):
thread = threading.Thread(target=self.move_by_bfs, args=(moves, is_running), daemon=True)
def run_bot_handler(self, moves: list[tuple[str, str]], is_running: threading.Event) -> None:
thread = threading.Thread(target=self.run_bot, args=(moves, is_running), daemon=True)
thread.start()
def move_by_bfs(self, moves: list[tuple[str, str]], is_running: threading.Event):
def run_bot(self, moves: list[tuple[str, str]], is_running: threading.Event) -> None:
print(moves)
print(f"Length of Moves {len(moves)} - {3 ** len(moves)}")
print(f"Length of Moves {len(moves)}") #- {3 ** len(moves)}")
while len(moves) > 0:
movement, action = moves.pop(0)
# do action
@ -267,34 +231,33 @@ class Tractor(BaseField):
return None
@staticmethod
def fertilize_clay_succ(field: Clay, board: Board, x: int, y: int):
def fertilize_clay_succ(field: Clay, board: Board, x: int, y: int) -> Sand:
field.is_fertilized = True
return Tractor.do_action_succ(board, x, y, (Sand.__name__,))
@staticmethod
def sow_succ(field: Sand):
def sow_succ(field: Sand) -> Sand:
field.is_sowed = True
return field
@staticmethod
def irrigate_plants_succ(field: Plant, board: Board, x: int, y: int):
def irrigate_plants_succ(field: Plant, board: Board, x: int, y: int) -> Crops:
field.is_hydrated = True
return Tractor.do_action_succ(board, x, y, CROPS)
@staticmethod
def irrigate_sand_succ(field: Sand, board: Board, x: int, y: int):
def irrigate_sand_succ(field: Sand, board: Board, x: int, y: int) -> Plant:
field.is_hydrated = True
return Tractor.do_action_succ(board, x, y, PLANTS)
@staticmethod
def harvest_crops_succ(board: Board, x: int, y: int):
def harvest_crops_succ(board: Board, x: int, y: int) -> Clay:
# Tractor.__harvested_corps.append(type(field).__name__)
return Tractor.do_action_succ(board, x, y, SOILS)
return Tractor.do_action_succ(board, x, y, (Clay.__name__,))
@staticmethod
def do_action_succ(board: Board, x: int, y: int, types: tuple):
def do_action_succ(board: Board, x: int, y: int, types: tuple) -> Union[Sand, Clay, Plant, Crops]:
choosen_type = random.choice(types)
obj = get_class("app.fields", choosen_type)
board.get_fields()[x][y] = obj()
return obj()
# return f"Position: {x}:{y} - {type(self.__board.get_fields()[x][y]).__name__}\nFuel: {self.__fuel}\n"

View File

@ -10,7 +10,9 @@ __all__ = (
'FIELD_TYPES', 'TIME_OF_GROWING', 'AMOUNT_OF_CROPS',
'M_GO_FORWARD', 'M_ROTATE_LEFT', 'M_ROTATE_RIGHT',
'A_SOW', 'A_HARVEST', 'A_HYDRATE', 'A_FERTILIZE', 'A_DO_NOTHING',
'D_NORTH', 'D_EAST', 'D_SOUTH', 'D_WEST'
'D_NORTH', 'D_EAST', 'D_SOUTH', 'D_WEST',
'VALUE_OF_MOVEMENT', 'VALUE_OF_CROPS', 'VALUE_OF_PLANT',
'VALUE_OF_SAND', 'VALUE_OF_CLAY'
)
# Board settings:
@ -64,6 +66,13 @@ A_HYDRATE = "hydrate"
A_FERTILIZE = "fertilize"
A_DO_NOTHING = "do nothing"
# Costs movement and fields:
VALUE_OF_MOVEMENT = 8
VALUE_OF_CROPS = 1
VALUE_OF_PLANT = 3
VALUE_OF_SAND = 7
VALUE_OF_CLAY = 10
# Times
TIME_OF_GROWING = 2
TIME_OF_MOVING = 2