149 lines
4.2 KiB
Python
149 lines
4.2 KiB
Python
import heapq
|
|
|
|
from world import set_world
|
|
|
|
|
|
class PriorityQueue:
|
|
def __init__(self, order='min', f=lambda x: x):
|
|
self.queue = []
|
|
if order == 'min':
|
|
self.f = f
|
|
|
|
def insert(self, item):
|
|
heapq.heappush(self.queue, (self.f(item), item))
|
|
|
|
def pop(self):
|
|
if self.queue:
|
|
return heapq.heappop(self.queue)[1]
|
|
|
|
def __len__(self):
|
|
return len(self.queue)
|
|
|
|
|
|
def __contains__(self, item):
|
|
return (self.f(item), item) in self.queue
|
|
|
|
|
|
class Problem(object):
|
|
def __init__(self, initial, goal=None):
|
|
self.initial = initial
|
|
self.goal = goal
|
|
|
|
def path_cost(self, c, state1, action, state2):
|
|
world_map_file = set_world('worldmap.txt')
|
|
l = state2[1]
|
|
k = state2[0]
|
|
return int(world_map_file[l][k])
|
|
|
|
|
|
class Node:
|
|
def __init__(self, state, parent=None, action=None, path_cost=0):
|
|
self.state = state
|
|
self.parent = parent
|
|
self.action = action
|
|
self.path_cost = path_cost
|
|
self.depth = 0
|
|
if parent:
|
|
self.depth = parent.depth + 1
|
|
|
|
def __lt__(self, node):
|
|
return self.state < node.state
|
|
|
|
def child_node(self, problem, action):
|
|
next = problem.result(self.state, action)
|
|
return Node(next, self, action, problem.path_cost(self.path_cost, self.state, action, next))
|
|
|
|
def expand(self, route_planning):
|
|
return [self.child_node(route_planning, action) for action in route_planning.actions(self.state)]
|
|
|
|
def path(self):
|
|
node, parents_path = self, []
|
|
while node:
|
|
parents_path.append(node)
|
|
node = node.parent
|
|
return list(reversed(parents_path))
|
|
|
|
def reconstruct_path(self):
|
|
return [node.action for node in self.path()[1:]]
|
|
|
|
|
|
class RoutePlanning(Problem):
|
|
def __init__(self, initial, goal, paths):
|
|
Problem.__init__(self, initial, goal)
|
|
self.x_size = 20
|
|
self.y_size = 14
|
|
self.paths = paths
|
|
|
|
def goaltest(self, state):
|
|
return state == self.goal
|
|
|
|
def actions(self, state):
|
|
actions = ["Go up", "Go left", "Go down", "Go right"]
|
|
x, y = state
|
|
if y == 0:
|
|
if "Go up" in actions:
|
|
actions.remove("Go up")
|
|
if x == 0:
|
|
if "Go left" in actions:
|
|
actions.remove("Go left")
|
|
if y == (self.y_size - 1):
|
|
if "Go down" in actions:
|
|
actions.remove("Go down")
|
|
if x == (self.x_size - 1):
|
|
if "Go right" in actions:
|
|
actions.remove("Go right")
|
|
return actions
|
|
|
|
def result(self, state, action):
|
|
x, y = state
|
|
location = tuple()
|
|
|
|
if action == "Go up":
|
|
location = (x, y - 1)
|
|
elif action == "Go down":
|
|
location = (x, y + 1)
|
|
elif action == "Go left":
|
|
location = (x - 1, y)
|
|
elif action == "Go right":
|
|
location = (x + 1, y)
|
|
if location in self.paths:
|
|
state = location
|
|
return state
|
|
|
|
def heuristic(self, node):
|
|
pos_x1, pos_y1 = node.state
|
|
pos_x2, pos_y2 = self.goal
|
|
return abs(pos_x2 - pos_x1) + abs(pos_y2 - pos_y1)
|
|
|
|
|
|
def graphsearch(problem, f):
|
|
fringe = PriorityQueue('min', f)
|
|
explored = set()
|
|
|
|
node = Node(problem.initial)
|
|
fringe.insert(node)
|
|
while fringe:
|
|
node = fringe.pop()
|
|
if problem.goaltest(node.state):
|
|
return node
|
|
explored.add(node.state)
|
|
for child in node.expand(problem):
|
|
if child.state not in explored and child not in fringe: #cost2 to estimated cost
|
|
fringe.insert(child)
|
|
elif child in fringe:
|
|
current_path = fringe[child]
|
|
cost2 = f(current_path)
|
|
cost1 = f(child)
|
|
if cost2 >= cost1:
|
|
del fringe[current_path]
|
|
fringe.insert(child)
|
|
|
|
return None
|
|
|
|
|
|
def astar(route_planning):
|
|
heuristic = route_planning.heuristic
|
|
fun = lambda x: x.path_cost + heuristic(x)
|
|
return graphsearch(route_planning, fun)
|
|
|