#!/usr/bin/env python3 import asyncio import time from Graph import Graph class Tractor: sleep_time = 0.1 graph = Graph() moved_flag = False directions = ['N', 'W', 'S', 'E'] def __init__(self, current_location, current_rotation): self.current_location = current_location self.current_rotation = current_rotation self.last_node = current_rotation def heuristic(self, a, b): (x1, y1) = a (x2, y2) = b return abs(x1 - x2) + abs(y1 - y2) async def rotate(self, direction): reader, writer = await asyncio.open_connection('127.0.0.1', 8888) writer.write(("rotate " + direction + "\n").encode()) await reader.readline() time.sleep(self.sleep_time) writer.close() async def look_at_plants(self): reader, writer = await asyncio.open_connection('127.0.0.1', 8888) writer.write(("rotate " + direction + "\n").encode()) await reader.readline() time.sleep(self.sleep_time) writer.close() async def try_move(self): reader, writer = await asyncio.open_connection('127.0.0.1', 8888) writer.write("try\n".encode()) data = await reader.readline() result = data.decode() time.sleep(self.sleep_time) writer.close() return result async def move(self): reader, writer = await asyncio.open_connection('127.0.0.1', 8888) writer.write("move\n".encode()) await reader.readline() time.sleep(self.sleep_time) writer.close() async def move_to_neighbor(self, current, neighbor, direction): x = current[0] - neighbor[0] y = current[1] - neighbor[1] if x > 0: final_direction = 'N' elif x < 0: final_direction = 'S' elif y > 0: final_direction = 'W' else: final_direction = 'E' if final_direction != direction: await self.rotate(final_direction) await self.move() return final_direction def bfs_shortest_path(self, start, goal): explored = [] queue = [[start]] if start == goal: return [] while queue: path = queue.pop(0) node = path[-1] if node not in explored: neighbours = self.graph.nodes[node][0] for neighbour in neighbours: new_path = list(path) new_path.append(neighbour) queue.append(new_path) if neighbour == goal: return new_path explored.append(node) async def move_to_current_location(self, last_location): if last_location == self.current_location: return self.current_rotation path = self.bfs_shortest_path(last_location, self.current_location) path = path[1:] pos = last_location final_direction = self.current_rotation for node in path: final_direction = await self.move_to_neighbor(pos, node, final_direction) pos = node self.moved_flag = True return final_direction async def look_around(self): for i in range(4): if i == 2: continue direction = self.directions[(self.directions.index(self.current_rotation) + i) % 4] await self.rotate(direction) result = await self.try_move() if result == "OK\n": self.graph.add_neighbor(self.current_location, direction) self.current_rotation = self.directions[(self.directions.index(self.current_rotation) - 1) % 4] def find_optimal(self, frontier, last_location): index = 0 cost = frontier[0][1] + len(self.bfs_shortest_path(last_location, frontier[0][0])) for i, n in enumerate(frontier): tmp_cost = n[1] + len(self.bfs_shortest_path(last_location, n[0])) if tmp_cost < cost: cost = tmp_cost index = i return index def mark_as_visited(self): tmp_set = self.graph.nodes[self.current_location][0] self.graph.nodes[self.current_location] = (tmp_set, True) async def move_tractor(self, goal): start_position = self.current_location last_location = start_position frontier = [] frontier.append((start_position, 0)) cost_so_far = {} cost_so_far[start_position] = 0 await self.look_around() self.mark_as_visited() while frontier: optimal_index = self.find_optimal(frontier, last_location) self.current_location = frontier.pop(optimal_index)[0] self.current_rotation = await self.move_to_current_location(last_location) last_location = self.current_location if self.moved_flag and not self.graph.nodes[self.current_location][1]: await self.look_around() self.mark_as_visited() self.last_node = self.directions[(self.directions.index(self.current_rotation) - 2) % 4] if self.current_location == goal: break for next in self.graph.neighbors(self.current_location): new_cost = cost_so_far[self.current_location] + 1 if next not in cost_so_far or new_cost < cost_so_far[next]: cost_so_far[next] = new_cost priority = new_cost + self.heuristic(goal, next) frontier.append((next, priority)) async def run(self): start_position = self.current_location # await self.move_tractor(start_position) while True: visited, stack = set(), [start_position] while stack: vertex = stack.pop() if vertex not in visited: visited.add(vertex) await self.move_tractor(vertex) self.current_location = vertex stack.extend(self.graph.nodes[vertex][0] - visited) await self.service() await self.move_tractor(start_position) async def service(self): "tutaj trzeba zapytac env z jakimi roslinami sie styka traktor i je obsłuzyc" pass if __name__ == "__main__": start = (0,0) goal = (10,6) tractor = Tractor((0,0), 'N') # asyncio.run(tractor.move_tractor(start, goal)) asyncio.run(tractor.run())