from __future__ import annotations import copy from typing import Callable, Union from queue import Queue, PriorityQueue from app.board import Board from config import * from app.graphsearch import Node, Graphsearch class PriorityItem: def __init__(self, node: Node, priority: int): self.node = node self.priority = priority def __lt__(self, other: PriorityItem): return self.priority < other.priority class AStar(Graphsearch): @staticmethod def convert_queue_of_priority_items_to_list(q: Queue[PriorityItem], *args) -> list: items = [] [items.append((i.node.get_x(), i.node.get_y(), i.node.get_direction(), *args)) for i in q.queue] return items @staticmethod def convert_queue_of_priority_items_with_priority_to_list(q: Queue[PriorityItem], *args) -> list: items = [] [items.append((i.node.get_x(), i.node.get_y(), i.node.get_direction(), i.priority, *args)) for i in q.queue] return items @staticmethod def replace_node_in_fringe_queue(fringe: PriorityQueue[PriorityItem], state) -> None: tmp_queue = Queue() while not fringe.empty(): item = fringe.get() if state == item.node.transform_node_to_tuple(): break else: tmp_queue.put(item) while not tmp_queue.empty(): fringe.put(tmp_queue.get()) @staticmethod def replace_nodes(fringe: PriorityQueue[PriorityItem], priority: int, state: tuple[int, int, float]): fringe_items = AStar.convert_queue_of_priority_items_with_priority_to_list(fringe) for s in fringe_items: if s[:-1] == state[:-1]: if s[-1] > priority: # s[-1] is priority (last element of state tuple) # remove and add node to fringe - PriorityQueue AStar.replace_node_in_fringe_queue(fringe, state[:-1]) break @staticmethod def g(board: Board, node: Node) -> int: """cost function""" result = 0 while node.get_node() is not None: field = board.get_field(node.get_x(), node.get_y()) result += field.get_value() node = node.get_node() return result @staticmethod def h(node: Node) -> int: """heuristic function""" crops_to_harvested = AMOUNT_OF_CROPS - node.get_amount_of_harvested_crops() return crops_to_harvested ** 2 + crops_to_harvested * 10 @staticmethod def f(board: Board, node: Node) -> int: """evaluation function""" return AStar.g(board, node) + AStar.h(node) @staticmethod def search(fringe: PriorityQueue, explored: Queue, istate: Node, succ: Callable[[Node, Board], list], goaltest: Callable[[Node], bool], board: Board) -> Union[bool, list]: print(f"Start A*") fringe.put(PriorityItem(istate, 0)) while True: if fringe.empty(): return False item = fringe.get() if goaltest(item.node): return AStar.get_all_moves(item.node) copied_item = copy.deepcopy(item) explored.put(item) for (action, state) in succ(copied_item.node, board): # print(state) fringe_items = AStar.convert_queue_of_priority_items_to_list(fringe) explored_items = AStar.convert_queue_of_priority_items_to_list(explored) n = Node(item.node, *state, *action) priority = AStar.f(board, n) # print(priority) if state[:-1] not in fringe_items and state[:-1] not in explored_items: fringe.put(PriorityItem(n, priority)) elif state[:-1] in fringe_items: AStar.replace_nodes(fringe, priority, state[:-1])