Improvement on code + path creation
This commit is contained in:
parent
af7027a90f
commit
8b1e390e6b
74
AI_brain/genetic_algorytm.py
Normal file
74
AI_brain/genetic_algorytm.py
Normal file
@ -0,0 +1,74 @@
|
||||
import random
|
||||
import configparser
|
||||
import math
|
||||
from domain.entities.entity import Entity
|
||||
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read("config.ini")
|
||||
|
||||
from domain.world import World
|
||||
from AI_brain.rotate_and_go_aStar import RotateAndGoAStar, State
|
||||
|
||||
|
||||
steps_distance_cashed = {}
|
||||
|
||||
|
||||
class Path:
|
||||
def __init__(self):
|
||||
self.walk = []
|
||||
self.distance = 0
|
||||
|
||||
def random_walk(self, dusts: list[Entity]):
|
||||
random_permutation = generate_random_permutation(len(dusts))
|
||||
self.walk = addStopsForStopStation(
|
||||
random_permutation, config.getint("CONSTANT", "BananaFilling")
|
||||
)
|
||||
|
||||
def calculate_distance(self, world: World):
|
||||
distance = 0
|
||||
for i in range(len(self.walk) - 1):
|
||||
distance += self.step_distance(self.walk[i], self.walk[i + 1], world)
|
||||
self.distance = distance
|
||||
|
||||
def step_distance(self, from_id: int, to_id: int, world: World) -> int:
|
||||
if (from_id, to_id) in steps_distance_cashed:
|
||||
return steps_distance_cashed[(from_id, to_id)]
|
||||
|
||||
path_searcher = RotateAndGoAStar(
|
||||
world,
|
||||
self.getPosition(from_id, world.dustList, world.doc_station),
|
||||
self.getPosition(to_id, world.dustList, world.doc_station),
|
||||
)
|
||||
path_searcher.search()
|
||||
number_of_go = path_searcher.number_of_moves_forward()
|
||||
steps_distance_cashed[(from_id, to_id)] = path_searcher.cost
|
||||
steps_distance_cashed[(to_id, from_id)] = path_searcher.cost
|
||||
return path_searcher.cost
|
||||
|
||||
def getPosition(self, number: int, dusts: list[Entity], station: Entity) -> State:
|
||||
if number == -1:
|
||||
return State(station.x, station.y)
|
||||
|
||||
return State(dusts[number].x, dusts[number].y)
|
||||
|
||||
|
||||
def generate_random_permutation(n):
|
||||
# Create a list of numbers from 1 to n
|
||||
numbers = list(range(0, n))
|
||||
|
||||
# Shuffle the list using the random.shuffle function
|
||||
random.shuffle(numbers)
|
||||
|
||||
return numbers
|
||||
|
||||
|
||||
def addStopsForStopStation(permutation: list[int], bananaFilling: int):
|
||||
frequency = math.ceil(100 / bananaFilling)
|
||||
numer_of_stops = math.ceil(len(permutation) / frequency)
|
||||
|
||||
for i in range(1, numer_of_stops):
|
||||
permutation.insert((frequency + 1) * i - 1, -1)
|
||||
permutation.insert(len(permutation), -1)
|
||||
|
||||
return permutation
|
@ -3,11 +3,10 @@ from domain.world import World
|
||||
|
||||
|
||||
class State:
|
||||
def __init__(self, x, y, direction=(1, 0), entity=None):
|
||||
def __init__(self, x: int, y: int, direction=(1, 0), entity=None):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.direction = direction
|
||||
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.x, self.y))
|
||||
@ -19,7 +18,7 @@ class State:
|
||||
and self.direction == other.direction
|
||||
)
|
||||
|
||||
def heuristic(self, goal_state):
|
||||
def heuristic(self, goal_state) -> int:
|
||||
return abs(self.x - goal_state.x) + abs(self.y - goal_state.y)
|
||||
|
||||
|
||||
@ -53,19 +52,19 @@ class RotateAndGoAStar:
|
||||
self.enqueued_states = set()
|
||||
self.explored = set()
|
||||
self.actions = []
|
||||
self.cost = 0
|
||||
|
||||
def get_g_score(self, state):
|
||||
def get_g_score(self, state) -> int:
|
||||
return self.world.get_cost(state.x, state.y)
|
||||
|
||||
def search(self):
|
||||
heapq.heappush(
|
||||
self.fringe, Node(self.start_state, 0, self.goal_state)
|
||||
)
|
||||
heapq.heappush(self.fringe, Node(self.start_state, 0, self.goal_state))
|
||||
|
||||
while self.fringe:
|
||||
elem = heapq.heappop(self.fringe)
|
||||
elem: Node = heapq.heappop(self.fringe)
|
||||
if self.is_goal(elem.state):
|
||||
self.actions = action_sequence(elem)
|
||||
self.cost = elem.g_score
|
||||
return True
|
||||
self.explored.add(elem.state)
|
||||
|
||||
@ -73,7 +72,7 @@ class RotateAndGoAStar:
|
||||
if state in self.explored:
|
||||
continue
|
||||
|
||||
new_g_score = new_g_score = elem.g_score + self.world.get_cost(state.x, state.y)
|
||||
new_g_score = elem.g_score + self.world.get_cost(state.x, state.y)
|
||||
if state not in self.enqueued_states:
|
||||
next_node = Node(state, new_g_score, self.goal_state)
|
||||
next_node.action = action
|
||||
@ -84,12 +83,12 @@ class RotateAndGoAStar:
|
||||
for node in self.fringe:
|
||||
if node.state == state:
|
||||
node.g_score = new_g_score
|
||||
node.f_score = (
|
||||
new_g_score + node.state.heuristic(self.goal_state)
|
||||
node.f_score = new_g_score + node.state.heuristic(
|
||||
self.goal_state
|
||||
)
|
||||
node.parent = elem
|
||||
node.action = action
|
||||
heapq.heapify(self.fringe)
|
||||
heapq.heapify(self.fringe)
|
||||
break
|
||||
|
||||
return False
|
||||
@ -102,12 +101,12 @@ class RotateAndGoAStar:
|
||||
next_x = state.x + state.direction[0]
|
||||
next_y = state.y + state.direction[1]
|
||||
if self.world.accepted_move(next_x, next_y):
|
||||
new_successors.append(
|
||||
("GO", State(next_x, next_y, state.direction))
|
||||
)
|
||||
new_successors.append(("GO", State(next_x, next_y, state.direction)))
|
||||
return new_successors
|
||||
|
||||
|
||||
def is_goal(self, state: State) -> bool:
|
||||
return (
|
||||
state.x == self.goal_state.x
|
||||
and state.y == self.goal_state.y )
|
||||
return state.x == self.goal_state.x and state.y == self.goal_state.y
|
||||
|
||||
def number_of_moves_forward(self) -> int:
|
||||
go_count = self.actions.count("GO")
|
||||
return go_count
|
||||
|
@ -4,9 +4,14 @@ movement = robot
|
||||
#accept: human, robot
|
||||
|
||||
[CONSTANT]
|
||||
NumberOfBananas = 10
|
||||
NumberOfBananas = 5
|
||||
NumberOfEarrings = 3
|
||||
NumberOfPlants = 5
|
||||
BananaFilling = 25
|
||||
|
||||
[NEURAL_NETWORK]
|
||||
is_neural_network_off = True
|
||||
is_neural_network_off = True
|
||||
|
||||
[AI_BRAIN]
|
||||
mode = full_clean
|
||||
#accept: full_clean, to_station
|
@ -21,12 +21,12 @@ class VacuumMoveCommand(Command):
|
||||
if not self.world.accepted_move(end_x, end_y):
|
||||
return
|
||||
|
||||
tmp = self.world.is_garbage_at(end_x, end_y)
|
||||
if len(tmp) > 0:
|
||||
for t in tmp:
|
||||
garbage = self.world.garbage_at(end_x, end_y)
|
||||
if len(garbage) > 0:
|
||||
for item in garbage:
|
||||
if self.vacuum.get_container_filling() < 100:
|
||||
self.vacuum.increase_container_filling()
|
||||
self.world.dust[end_x][end_y].remove(t)
|
||||
self.world.delete_entities_at_Of_type(item.x, item.y, item.type)
|
||||
|
||||
if self.world.is_docking_station_at(end_x, end_y):
|
||||
self.vacuum.dump_trash()
|
||||
|
@ -1,5 +1,9 @@
|
||||
from domain.entities.entity import Entity
|
||||
from domain.world import World
|
||||
import configparser
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read("config.ini")
|
||||
|
||||
|
||||
class Vacuum(Entity):
|
||||
@ -11,7 +15,7 @@ class Vacuum(Entity):
|
||||
self.container_filling = 0
|
||||
|
||||
def increase_container_filling(self) -> None:
|
||||
self.container_filling += 5
|
||||
self.container_filling += config.getint("CONSTANT", "BananaFilling")
|
||||
|
||||
def dump_trash(self) -> None:
|
||||
self.container_filling = 0
|
||||
|
@ -8,7 +8,9 @@ class World:
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.dust = [[[] for j in range(height)] for i in range(width)]
|
||||
self.dustList = []
|
||||
self.obstacles = [[[] for j in range(height)] for i in range(width)]
|
||||
self.entity = [[[] for j in range(height)] for i in range(width)]
|
||||
|
||||
self.vacuum = None
|
||||
self.cat = None
|
||||
@ -19,8 +21,10 @@ class World:
|
||||
self.doc_station = entity
|
||||
elif entity.type == "PEEL":
|
||||
self.dust[entity.x][entity.y].append(entity)
|
||||
self.dustList.append(Entity(entity.x, entity.y, "PEEL"))
|
||||
elif entity.type == "EARRING":
|
||||
self.dust[entity.x][entity.y].append(entity)
|
||||
self.dustList.append(Entity(entity.x, entity.y, "EARRING"))
|
||||
elif entity.type == "VACUUM":
|
||||
self.vacuum = entity
|
||||
elif entity.type == "CAT":
|
||||
@ -29,10 +33,27 @@ class World:
|
||||
else:
|
||||
self.obstacles[entity.x][entity.y].append(entity)
|
||||
|
||||
self.entity[entity.x][entity.y].append(entity)
|
||||
|
||||
def is_entity_at(
|
||||
self,
|
||||
x: int,
|
||||
y: int,
|
||||
) -> bool:
|
||||
if len(self.entity[x][y]) > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_entities_at_Of_type(self, x: int, y: int, type: str):
|
||||
entities = self.entity[x][y]
|
||||
for entity in entities:
|
||||
if entity.type == type:
|
||||
entities.remove(entity)
|
||||
|
||||
def is_obstacle_at(self, x: int, y: int) -> bool:
|
||||
return bool(self.obstacles[x][y])
|
||||
|
||||
def is_garbage_at(self, x: int, y: int):
|
||||
def garbage_at(self, x: int, y: int) -> list[Entity]:
|
||||
if len(self.dust[x][y]) == 0:
|
||||
return []
|
||||
return [i for i in self.dust[x][y] if evaluate([i.properties])[0] == 1]
|
||||
@ -54,5 +75,5 @@ class World:
|
||||
|
||||
return True
|
||||
|
||||
def get_cost(self, x, y):
|
||||
def get_cost(self, x, y) -> float:
|
||||
return self.costs[x][y]
|
||||
|
50
main.py
50
main.py
@ -16,6 +16,7 @@ from domain.entities.earring import Earring
|
||||
from domain.entities.docking_station import Doc_Station
|
||||
from domain.world import World
|
||||
from view.renderer import Renderer
|
||||
from AI_brain.genetic_algorytm import Path
|
||||
|
||||
if not config.getboolean("NEURAL_NETWORK", "is_neural_network_off"):
|
||||
from AI_brain.image_recognition import VacuumRecognizer
|
||||
@ -52,24 +53,38 @@ class Main:
|
||||
def run_robot(self):
|
||||
self.renderer.render(self.world)
|
||||
|
||||
start_state = State(self.world.vacuum.x, self.world.vacuum.y)
|
||||
end_state = State(self.world.doc_station.x, self.world.doc_station.y)
|
||||
|
||||
# path_searcher = GoAnyDirectionBFS(self.world, start_state, end_state)
|
||||
# path_searcher = RotateAndGoBFS(self.world, start_state, end_state)
|
||||
path_searcher = RotateAndGoAStar(self.world, start_state, end_state)
|
||||
if not path_searcher.search():
|
||||
print("No solution")
|
||||
|
||||
if config["AI_BRAIN"]["mode"] == "to_station":
|
||||
start_state = State(self.world.vacuum.x, self.world.vacuum.y)
|
||||
end_state = State(self.world.doc_station.x, self.world.doc_station.y)
|
||||
|
||||
path_searcher = RotateAndGoAStar(self.world, start_state, end_state)
|
||||
|
||||
if not path_searcher.search():
|
||||
print("No solution")
|
||||
exit(0)
|
||||
print(path_searcher.actions)
|
||||
print(path_searcher.cost)
|
||||
elif config["AI_BRAIN"]["mode"] == "full_clean":
|
||||
x = Path()
|
||||
x.random_walk(self.world.dustList)
|
||||
x.calculate_distance(self.world)
|
||||
print(x.walk)
|
||||
print(x.distance)
|
||||
exit(0)
|
||||
else:
|
||||
print("Wrong mode")
|
||||
exit(0)
|
||||
|
||||
path_searcher.actions.reverse()
|
||||
while self.running:
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
self.running = False
|
||||
|
||||
if len(path_searcher.actions) > 0:
|
||||
action_direction = path_searcher.actions.pop()
|
||||
action_direction = path_searcher.actions.pop(0)
|
||||
# self.handle_action1(action_direction)
|
||||
self.handle_action2(action_direction)
|
||||
|
||||
@ -148,15 +163,14 @@ class Main:
|
||||
def generate_world(tiles_x: int, tiles_y: int) -> World:
|
||||
if config.getboolean("NEURAL_NETWORK", "is_neural_network_off"):
|
||||
world = World(tiles_x, tiles_y)
|
||||
for _ in range(config.getint("CONSTANT", "NumberOfBananas")):
|
||||
temp_x = randint(0, tiles_x - 1)
|
||||
temp_y = randint(0, tiles_y - 1)
|
||||
world.add_entity(Garbage(temp_x, temp_y))
|
||||
world.vacuum = Vacuum(1, 1)
|
||||
world.doc_station = Doc_Station(9, 8)
|
||||
if config.getboolean("APP", "cat"):
|
||||
world.cat = Cat(7, 8)
|
||||
world.add_entity(world.cat)
|
||||
|
||||
world.add_entity(world.doc_station)
|
||||
world.add_entity(world.vacuum)
|
||||
world.add_entity(Entity(2, 8, "PLANT1"))
|
||||
world.add_entity(Entity(4, 1, "PLANT1"))
|
||||
world.add_entity(Entity(3, 4, "PLANT2"))
|
||||
@ -165,6 +179,16 @@ def generate_world(tiles_x: int, tiles_y: int) -> World:
|
||||
world.add_entity(Earring(9, 7))
|
||||
world.add_entity(Earring(5, 5))
|
||||
world.add_entity(Earring(4, 6))
|
||||
|
||||
for _ in range(config.getint("CONSTANT", "NumberOfBananas")):
|
||||
temp_x = randint(0, tiles_x - 1)
|
||||
temp_y = randint(0, tiles_y - 1)
|
||||
|
||||
while world.is_entity_at(temp_x, temp_y):
|
||||
temp_x = randint(0, tiles_x - 1)
|
||||
temp_y = randint(0, tiles_y - 1)
|
||||
|
||||
world.add_entity(Garbage(temp_x, temp_y))
|
||||
else:
|
||||
|
||||
def world_adder(x, y, object, style=None):
|
||||
@ -207,7 +231,7 @@ def generate_world(tiles_x: int, tiles_y: int) -> World:
|
||||
|
||||
for x in range(world.width):
|
||||
for y in range(world.height):
|
||||
if world.is_garbage_at(x, y):
|
||||
if world.garbage_at(x, y):
|
||||
world.costs[x][y] = 1
|
||||
else:
|
||||
world.costs[x][y] = 10
|
||||
|
@ -94,7 +94,7 @@ class Renderer:
|
||||
self.tile_height + self.tile_height / 4,
|
||||
),
|
||||
),
|
||||
"EARRING": pygame.transform.scale(
|
||||
"EARRING": pygame.transform.scale(
|
||||
pygame.image.load("media/sprites/earrings.webp"),
|
||||
(
|
||||
self.tile_width + self.tile_width / 4,
|
||||
@ -115,16 +115,8 @@ class Renderer:
|
||||
self.render_board()
|
||||
for x in range(world.width):
|
||||
for y in range(world.height):
|
||||
for entity in world.dust[x][y]:
|
||||
for entity in world.entity[x][y]:
|
||||
self.draw_entity(entity)
|
||||
for x in range(world.width):
|
||||
for y in range(world.height):
|
||||
for entity in world.obstacles[x][y]:
|
||||
self.draw_entity(entity)
|
||||
self.draw_entity(world.vacuum)
|
||||
self.draw_entity(world.doc_station)
|
||||
if config.getboolean("APP", "cat"):
|
||||
self.draw_entity(world.cat)
|
||||
pygame.display.update()
|
||||
|
||||
def line(self, x_1, y_1, x_2, y_2, color=None):
|
||||
|
Loading…
Reference in New Issue
Block a user