3f4b73373a
Co-authored-by: Marcin Matoga <marmat35@st.amu.edu.pl>
627 lines
19 KiB
Python
627 lines
19 KiB
Python
from os import path
|
|
import heapq
|
|
import copy
|
|
|
|
from settings import *
|
|
from sprites import Direction
|
|
|
|
|
|
class PlanRoute():
    """The problem of moving the agent from one grid cell to another.

    States are position objects exposing ``get_location()``,
    ``set_location(x, y)``, ``get_orientation()`` and
    ``set_orientation(o)``. Orientations are the strings ``'UP'``,
    ``'DOWN'``, ``'LEFT'`` and ``'RIGHT'``; actions are ``'Forward'``,
    ``'Left'`` and ``'Right'``.
    """

    # Coordinate change applied by 'Forward' for each orientation.
    _FORWARD_DELTA = {
        'UP': (1, 0),
        'DOWN': (-1, 0),
        'LEFT': (0, -1),
        'RIGHT': (0, 1),
    }

    # New orientation after each turning action, keyed by current orientation.
    _TURN = {
        'Right': {'UP': 'LEFT', 'DOWN': 'RIGHT', 'LEFT': 'DOWN', 'RIGHT': 'UP'},
        'Left': {'UP': 'RIGHT', 'DOWN': 'LEFT', 'LEFT': 'UP', 'RIGHT': 'DOWN'},
    }

    def __init__(self, initial, goal, allowed, puddles=None, dimrow=None):
        """Define the goal state and initialize the problem.

        Args:
            initial: start state.
            goal: goal state; ``goal_test`` compares locations only.
            allowed: collection of ``(x, y)`` tuples the agent may enter.
            puddles: collection of ``(x, y)`` tuples that cost extra to be
                on; ``None`` is treated as "no puddles" by ``path_cost``.
            dimrow: largest coordinate value on either axis, used by
                ``actions`` to forbid stepping off the grid.
        """
        self.initial = initial
        self.goal = goal
        self.dimrow = dimrow
        self.allowed = allowed
        self.puddles = puddles

    def actions(self, state):
        """Return the actions available in ``state``.

        Turning is always possible; 'Forward' is dropped when the agent
        faces the edge of the grid it is standing on.
        """
        possible_actions = ['Forward', 'Left', 'Right']
        x, y = state.get_location()
        orientation = state.get_orientation()

        # Prevent bumps into the outer border.
        faces_border = (
            (y == 0 and orientation == 'LEFT')
            or (x == 0 and orientation == 'DOWN')
            or (y == self.dimrow and orientation == 'RIGHT')
            or (x == self.dimrow and orientation == 'UP')
        )
        if faces_border:
            possible_actions.remove('Forward')

        return possible_actions

    def result(self, state, action):
        """Apply ``action`` to ``state`` in place and return ``state``.

        'Forward' moves one cell in the facing direction, but only when
        the target cell is in ``self.allowed``; otherwise the location is
        left unchanged. 'Left' and 'Right' change the orientation only.
        Any other action leaves the state untouched.

        Raises:
            Exception: ``'InvalidOrientation'`` when a known action is
                applied to a state with an unknown orientation.
        """
        x, y = state.get_location()
        orientation = state.get_orientation()

        if action == 'Forward':
            if orientation not in self._FORWARD_DELTA:
                raise Exception('InvalidOrientation')
            dx, dy = self._FORWARD_DELTA[orientation]
            target = (x + dx, y + dy)
            if target in self.allowed:
                state.set_location(target[0], target[1])
        elif action in self._TURN:
            turned = self._TURN[action]
            if orientation not in turned:
                raise Exception('InvalidOrientation')
            state.set_orientation(turned[orientation])

        return state

    def goal_test(self, state):
        """Return True if ``state`` is at the goal location, else False."""
        return state.get_location() == self.goal.get_location()

    def path_cost(self, c, state1, action, state2):
        """Return the accumulated cost after taking ``action``.

        A step costs 2 when the agent ends up on a puddle cell (moving
        onto one, or turning / being blocked while standing on one) and 1
        otherwise.

        Args:
            c: cost accumulated up to ``state1``.
            state1: state before the action (kept for interface parity).
            action: the action taken.
            state2: state after the action.

        Returns:
            The new accumulated cost, or ``None`` for an action other
            than 'Forward', 'Left' or 'Right'.
        """
        if action not in ('Forward', 'Left', 'Right'):
            return None

        # The surcharge depends on where the agent ends up, so the lookup
        # must use state2's coordinates as a plain tuple.
        destination = tuple(state2.get_location())
        puddles = self.puddles if self.puddles is not None else ()

        if destination in puddles:
            return c + 2
        return c + 1

    def h(self, node):
        """Return the Manhattan distance from ``node.state`` to the goal."""
        x1, y1 = node.state.get_location()
        x2, y2 = self.goal.get_location()

        return abs(x2 - x1) + abs(y2 - y1)
|
|
|
|
|
|
class Node:
    """A node in the search tree.

    Holds the state it represents, the node it was generated from, the
    action that produced it and the accumulated path cost. Equality and
    hashing are delegated to the state.
    """

    def __init__(self, state, parent=None, action=None, path_cost=0):
        """Create a search tree Node, derived from a parent by an action."""
        self.state = state
        self.parent = parent
        self.action = action
        self.path_cost = path_cost

    def __repr__(self):
        return "<Node {}>".format(self.state)

    def solution(self):
        """Return the sequence of actions to go from the root to this node."""
        return [node.action for node in self.path()[1:]]

    def expand(self, problem):
        """List the nodes reachable in one step from this node.

        Each child is generated exactly once; ``child_node`` deep-copies
        the state, so building the list twice would double that work.
        """
        return [self.child_node(problem, action)
                for action in problem.actions(self.state)]

    def child_node(self, problem, action):
        """Return the node reached by applying ``action`` to this node.

        ``problem.result`` receives a deep copy of this node's state, so
        this node's own state is never modified by the problem.
        """
        next_state = problem.result(copy.deepcopy(self.state), action)
        step_cost = problem.path_cost(
            self.path_cost, self.state, action, next_state)
        return Node(next_state, self, action, step_cost)

    def __eq__(self, other):
        return isinstance(other, Node) and self.state == other.state

    def __lt__(self, other):
        # Mirrors __eq__ rather than defining a real ordering.
        # NOTE(review): presumably this only exists so that heapq can
        # compare (priority, node) tuples whose priorities tie - confirm
        # before relying on it for sorting.
        return isinstance(other, Node) and self.state == other.state

    def __hash__(self):
        # Hash by state so a node with the same state can be found
        # quickly in a hash table.
        return hash(self.state)

    def path(self):
        """Return a list of nodes forming the path from the root to this node."""
        node, path_back = self, []
        while node:
            path_back.append(node)
            node = node.parent
        return list(reversed(path_back))
|
|
|
|
|
|
class AgentPosition:
    """Location and orientation of the agent on the grid.

    Instances compare equal when both location and orientation match and
    hash on the same three values.
    """

    def __init__(self, x, y, orientation):
        self.X = x
        self.Y = y
        self.orientation = orientation

    def __repr__(self):
        return "AgentPosition({!r}, {!r}, {!r})".format(
            self.X, self.Y, self.orientation)

    def get_location(self):
        """Return the location as an ``(X, Y)`` tuple."""
        return self.X, self.Y

    def set_location(self, x, y):
        """Replace the stored location."""
        self.X = x
        self.Y = y

    def get_orientation(self):
        """Return the stored orientation."""
        return self.orientation

    def set_orientation(self, orientation):
        """Replace the stored orientation."""
        self.orientation = orientation

    def __eq__(self, other):
        # Comparing against a non-position (a plain tuple, None, ...)
        # must yield "not equal" rather than fail on a missing method.
        if not isinstance(other, AgentPosition):
            return NotImplemented
        return (other.get_location() == self.get_location() and
                other.get_orientation() == self.get_orientation())

    def __hash__(self):
        return hash((self.X, self.Y, self.orientation))
|
|
|
|
|
|
class SweeperAgent:
    """Reads grid maps from text files and plans routes over them with A*.

    Map symbols used by this class: ``.`` free cell, ``p`` puddle,
    ``x`` extra walkable cell in "genetic" maps, and ``>``, ``<``, ``^``,
    ``v`` for the agent facing RIGHT, LEFT, UP and DOWN. Only the first
    ``MAP_SIZE`` rows of a map file are scanned. Locations are
    ``(row, column)`` tuples.
    """

    # Agent marker symbol -> orientation name.
    _ORIENTATION_BY_MARKER = {
        '>': 'RIGHT',
        '<': 'LEFT',
        '^': 'UP',
        'v': 'DOWN',
    }

    def __init__(self, dimensions=None):
        self.dimrow = dimensions
        self.current_position = None
        self.orientation = ""
        self.initial = set()
        self.goal = set()
        self.allowed_points = set()
        self.puddle_points = set()

    @staticmethod
    def _cells(map_name):
        """Yield ``(row, column, symbol)`` for the first MAP_SIZE map rows."""
        grid = SweeperAgent.loadMap(map_name)
        for row in range(MAP_SIZE):
            for column, symbol in enumerate(grid[row]):
                yield row, column, symbol

    @staticmethod
    def _collect(points, map_name, symbols):
        """Add to ``points`` every cell of the map whose symbol is in ``symbols``."""
        for row, column, symbol in SweeperAgent._cells(map_name):
            if symbol in symbols:
                points.add((row, column))

    @staticmethod
    def _last_marker(map_name):
        """Return ``(row, column, symbol)`` of the last agent marker, or None."""
        found = None
        for row, column, symbol in SweeperAgent._cells(map_name):
            if symbol in SweeperAgent._ORIENTATION_BY_MARKER:
                found = (row, column, symbol)
        return found

    def _locate_marker(self, map_name):
        """Record and return the position of the first agent marker.

        Sets ``self.row`` / ``self.column`` as a side effect.

        Raises:
            ValueError: when the scanned rows contain no agent marker.
        """
        for row, column, symbol in SweeperAgent._cells(map_name):
            if symbol in SweeperAgent._ORIENTATION_BY_MARKER:
                self.row = row
                self.column = column
                # Return the marker's own coordinates, not whatever the
                # loop variables hold once scanning has finished.
                return row, column
        raise ValueError("no agent marker found in " + str(map_name))

    def where_am_i(self, map_name = "map.txt"):
        """Return ``(row, column)`` of the agent marker in the start map."""
        return self._locate_marker(map_name)

    def where_to_go(self, map_name = "goal_map.txt"):
        """Return ``(row, column)`` of the agent marker in the goal map."""
        return self._locate_marker(map_name)

    @staticmethod
    def set_allowed(allowed_points, map_name = 'map.txt'):
        """Add every walkable cell of the map to ``allowed_points``.

        Free cells, puddles and agent markers count as walkable.
        """
        SweeperAgent._collect(allowed_points, map_name, ('.', 'p', '>', '<', 'v', '^'))

    @staticmethod
    def set_allowed_for_genetic(allowed_points, map_location):
        """Add every ``.`` and ``x`` cell of the map to ``allowed_points``."""
        SweeperAgent._collect(allowed_points, map_location, ('.', 'x'))

    @staticmethod
    def set_puddles(puddle_points, map_name = 'map.txt'):
        """Add every puddle (``p``) cell of the map to ``puddle_points``."""
        SweeperAgent._collect(puddle_points, map_name, ('p',))

    @staticmethod
    def get_goal(map_name = 'goal_map.txt'):
        """Return ``(row, column)`` of the agent marker in the goal map.

        Falls back to ``(0, 0)`` when the map contains no marker.
        """
        marker = SweeperAgent._last_marker(map_name)
        if marker is None:
            return 0, 0
        return marker[0], marker[1]

    @staticmethod
    def set_initial(initial):
        """Add the agent marker position(s) found in 'map.txt' to ``initial``."""
        SweeperAgent._collect(initial, 'map.txt', ('>', '<', 'v', '^'))

    @staticmethod
    def set_orientation(map_name = 'map.txt'):
        """Return the orientation encoded by the marker in the start map.

        Returns an empty string when the map contains no marker.
        """
        marker = SweeperAgent._last_marker(map_name)
        if marker is None:
            return ""
        return SweeperAgent._ORIENTATION_BY_MARKER[marker[2]]

    @staticmethod
    def set_goal_orientation(map_name = 'goal_map.txt'):
        """Return the orientation encoded by the marker in the goal map.

        Returns an empty string when the map contains no marker.
        """
        marker = SweeperAgent._last_marker(map_name)
        if marker is None:
            return ""
        return SweeperAgent._ORIENTATION_BY_MARKER[marker[2]]

    @staticmethod
    def run_manual(self,
                   given_orientation,
                   given_goal_orientation,
                   x_my_location,
                   y_my_location,
                   x_goal_location,
                   y_goal_location,
                   map_location
                   ):
        """Plan between two explicit positions and return the plan length.

        Declared static with an explicit agent argument, so it is called
        as ``SweeperAgent.run_manual(agent, ...)``. Walkable cells are
        read from ``map_location`` into ``agent.allowed_points``; the
        agent's existing ``puddle_points`` are used unchanged.
        """
        self.orientation = given_orientation
        goal_orientation = given_goal_orientation

        # The genetic loader fills the set it is handed; it returns nothing.
        SweeperAgent.set_allowed_for_genetic(self.allowed_points, map_location)

        agent_position = AgentPosition(x_my_location, y_my_location, self.orientation)
        goal_position = AgentPosition(x_goal_location, y_goal_location, goal_orientation)

        return len(self.plan_route(agent_position, goal_position, self.allowed_points, self.puddle_points))

    @staticmethod
    def run_manual_map(self,start_map, goal_map):
        """Plan from the marker in ``start_map`` to the marker in ``goal_map``.

        Declared static with an explicit agent argument, so it is called
        as ``SweeperAgent.run_manual_map(agent, start_map, goal_map)``.
        Returns the list of actions found by A*.
        """
        self.orientation = SweeperAgent.set_orientation(start_map)
        goal_orientation = SweeperAgent.set_goal_orientation(goal_map)

        SweeperAgent.set_allowed(self.allowed_points, start_map)
        SweeperAgent.set_puddles(self.puddle_points, start_map)

        x, y = self.where_am_i(start_map)
        x1, y1 = SweeperAgent.get_goal(goal_map)

        agent_position = AgentPosition(x, y, self.orientation)
        goal_position = AgentPosition(x1, y1, goal_orientation)

        return self.plan_route(agent_position, goal_position, self.allowed_points, self.puddle_points)

    @staticmethod
    def run(self):
        """Plan from the marker in 'map.txt' to the marker in 'goal_map.txt'.

        Declared static with an explicit agent argument, so it is called
        as ``SweeperAgent.run(agent)``. Returns the list of actions found
        by A*.
        """
        self.orientation = SweeperAgent.set_orientation()
        goal_orientation = SweeperAgent.set_goal_orientation()

        SweeperAgent.set_allowed(self.allowed_points)
        SweeperAgent.set_puddles(self.puddle_points)

        x, y = self.where_am_i()
        x1, y1 = SweeperAgent.get_goal()

        agent_position = AgentPosition(x, y, self.orientation)
        goal_position = AgentPosition(x1, y1, goal_orientation)

        return self.plan_route(agent_position, goal_position, self.allowed_points, self.puddle_points)

    def plan_route(self, current, goals, allowed, puddles):
        """Run A* from ``current`` to ``goals`` and return the action list.

        The problem is built with ``MAP_SIZE - 1`` as the largest valid
        coordinate.
        """
        problem = PlanRoute(current, goals, allowed, puddles, MAP_SIZE-1)
        return SweeperAgent.astar_search(problem, problem.h)

    # TODO: cost computation

    @staticmethod
    def loadMap(map_name=''):
        """Return the lines of a map file, without trailing newlines.

        ``map_name`` is resolved relative to the directory of this file.
        """
        maze = []
        map_folder = path.dirname(__file__)
        with open(path.join(map_folder, map_name), 'rt') as f:
            for line in f:
                maze.append(line.rstrip('\n'))

        return maze

    @staticmethod
    def astar_search(problem, h=None):
        """A* search is best-first graph search with f(n) = g(n)+h(n).

        You need to specify the h function when you call astar_search, or
        else in your Problem subclass.
        """
        # Honour the documented fallback instead of calling None.
        h = h or problem.h
        return SweeperAgent.best_first_graph_search(problem, lambda n: n.path_cost + h(n))

    @staticmethod
    def best_first_graph_search(problem, f, display=True):
        """Search the nodes with the lowest ``f`` score first.

        Args:
            problem: object providing ``initial``, ``goal_test``,
                ``actions``, ``result`` and ``path_cost``.
            f: function mapping a Node to its priority.
            display: when true, print search statistics and the plan.

        Returns:
            The list of actions leading from the initial state to a goal
            state; an empty list when the start is already a goal or no
            route exists.
        """
        node = Node(problem.initial)
        frontier = PriorityQueue('min', f)
        frontier.append(node)
        explored = set()

        while frontier:
            node = frontier.pop()
            if problem.goal_test(node.state):
                history = node.solution()
                if display:
                    print(len(explored), "paths have been expanded and", len(frontier), "paths remain in the frontier")
                    print(history)
                return history

            # Node.child_node hands the problem a fresh copy of the state,
            # so states and nodes can be stored without copying them again.
            explored.add(node.state)
            for child in node.expand(problem):
                if child.state not in explored and child not in frontier:
                    frontier.append(child)
                elif child in frontier:
                    if f(child) < frontier[child]:
                        del frontier[child]
                        frontier.append(child)

        return []
|
|
|
|
|
|
|
|
class PriorityQueue:
    """A queue that returns the element with the best ``f`` score first.

    If order is 'min', the item with minimum f(x) is returned first; if
    order is 'max', then it is the item with maximum f(x). Entries are
    stored as ``(score, item)`` pairs in a binary heap, where the score
    is ``f(item)`` for 'min' and ``-f(item)`` for 'max'.
    Also supports dict-like lookup.
    """

    def __init__(self, order='min', f=lambda x: x):
        """Create an empty queue.

        Raises:
            ValueError: if ``order`` is neither 'min' nor 'max'.
        """
        self.heap = []
        if order == 'min':
            self.f = f
        elif order == 'max':
            # Negate the score so the largest f(x) is popped first.
            self.f = lambda x: -f(x)
        else:
            raise ValueError("Order must be either 'min' or 'max'.")

    def append(self, item):
        """Insert item at its correct position."""
        heapq.heappush(self.heap, (self.f(item), item))

    def extend(self, items):
        """Insert each item in items at its correct position."""
        for item in items:
            self.append(item)

    def pop(self):
        """Pop and return the item with the best score for this order.

        Raises:
            Exception: if the queue is empty.
        """
        if self.heap:
            return heapq.heappop(self.heap)[1]
        else:
            raise Exception('Trying to pop from empty PriorityQueue.')

    def __len__(self):
        """Return the number of items currently in the queue."""
        return len(self.heap)

    def __contains__(self, key):
        """Return True if the key is in PriorityQueue."""
        # A generator lets any() stop at the first match.
        return any(item == key for _, item in self.heap)

    def __getitem__(self, key):
        """Return the stored score of the first entry equal to key.

        Raises KeyError if key is not present.
        """
        for value, item in self.heap:
            if item == key:
                return value
        raise KeyError(str(key) + " is not in the priority queue")

    def __delitem__(self, key):
        """Delete the first occurrence of key.

        Raises KeyError if key is not present.
        """
        for index, (_, item) in enumerate(self.heap):
            if item == key:
                del self.heap[index]
                break
        else:
            raise KeyError(str(key) + " is not in the priority queue")
        # Removing from the middle breaks the heap invariant; restore it.
        heapq.heapify(self.heap)
|
|
|
|
|
|
class Test:
    """Manual smoke test that prints what is parsed from the default maps."""

    @staticmethod
    def run():
        """Load the default maps and print the parsed data.

        Prints the allowed cells, puddles, initial position, goal
        position and orientation, using the default file names of the
        ``SweeperAgent`` loaders.
        """
        allowed_points = set()
        puddle_points = set()

        initial = set()
        goal = set()

        orientation = SweeperAgent.set_orientation()

        SweeperAgent.set_initial(initial)
        # SweeperAgent has no set_goal(); get_goal() returns the goal
        # cell, which is stored in the set to mirror `initial`.
        goal.add(SweeperAgent.get_goal())

        SweeperAgent.set_allowed(allowed_points)
        SweeperAgent.set_puddles(puddle_points)

        print("allowed: ")
        print("(row, column)")
        print(sorted(allowed_points))
        print("puddles:")
        print(sorted(puddle_points))
        print("initial:")
        print(initial)
        print("goal:")
        print(goal)
        print("orientation:")
        print(orientation)
|
|
|
|
|