335 lines
13 KiB
Python
335 lines
13 KiB
Python
import random
|
|
import tqdm
|
|
from copy import deepcopy
|
|
from functools import reduce
|
|
|
|
def genetic_algoritm(world_map: list, START: tuple, houses: list, pop_size = 200, rounds = 1000, tournament_size=20, verbose=0):
    """Evolve a population of routes and return the fittest one.

    Every round runs two tournaments to pick two parents, recombines them,
    mutates both children, strips cycles from them and lets each child
    replace the currently worst chromosome when the child's
    ``fitting_function()`` value is strictly lower.

    Args:
        world_map: Map handed unchanged to every ``Chromosome``.
        START: Starting cell handed unchanged to every ``Chromosome``.
        houses: Cells every route has to visit.
        pop_size: Number of chromosomes kept in the population.
        rounds: Number of evolution rounds.
        tournament_size: How many chromosomes are sampled per tournament.
        verbose: When greater than 0, print the best and worst fitness after
            each round and forward the flag to crossover and mutation.

    Returns:
        The chromosome with the lowest ``fitting_function()`` value in the
        final population.
    """
    population = [Chromosome(START, world_map, houses) for _ in range(pop_size)]

    for _ in tqdm.tqdm(range(rounds), position=0, leave=True):
        # Tournament selection: the best of a random sample becomes a parent.
        parent_1 = get_best_parent(random.sample(population, tournament_size))
        parent_2 = get_best_parent(random.sample(population, tournament_size))

        children = parent_1.cross_over(parent_2, verbose=verbose)
        offspring_1, offspring_2 = children

        # Mutate both children first, then clean both up (cycles or not).
        for child in (offspring_1, offspring_2):
            child.mutate(verbose=verbose)
        for child in (offspring_1, offspring_2):
            child.remove_cycles()

        # The worst member is looked up again for the second child because
        # the first child may just have replaced it.
        for child in (offspring_1, offspring_2):
            worst, worst_index = get_worst_chromosome(population)
            if child.fitting_function() < worst.fitting_function():
                population[worst_index] = child

        if verbose > 0:
            print( get_best_parent(population).fitting_function(), get_worst_chromosome(population)[0].fitting_function())

    # Optimal result: the shortest of the valid routes.
    return get_best_parent(population)
|
|
|
|
|
|
class Chromosome():
    """One candidate route used by the genetic algorithm.

    The route is kept in ``solution`` as a list of ``(x, y)`` cells, where
    ``x`` indexes ``world_map`` and ``y`` indexes inside ``world_map[x]``.
    A cell whose value is ``'0'`` (or the integer ``0``) is a wall.
    """

    # Step encoding shared by the random walk and by ``mutate``:
    # 0 -> x + 1, 1 -> x - 1, 2 -> y + 1, 3 -> y - 1.
    _STEPS = ((1, 0), (-1, 0), (0, 1), (0, -1))

    def __init__(self, START: tuple, world_map: list, houses: list):
        """Create a random route that starts at START and reaches every house.

        The route is a random walk over walkable cells that stops as soon as
        every distinct house has been stepped on; cycles are removed
        afterwards.  The walk does not terminate when some house cannot be
        reached from START.

        Args:
            START: ``(x, y)`` cell the route begins at.
            world_map: Indexable rows of cells; ``'0'`` marks a wall.
            houses: ``(x, y)`` tuples that have to be visited.
        """
        self.houses = houses
        self.max_x = len(world_map)
        self.max_y = len(world_map[0])
        self.solution = [START]
        self.world_map = world_map

        # Count distinct houses only: a duplicated entry can never add a
        # second element to the visited set and would keep the walk going
        # forever.
        targets = set(houses)
        visited_houses = set()
        curr_x, curr_y = START

        while len(visited_houses) < len(targets):
            moved = False
            while not moved:
                step_x, step_y = self._STEPS[random.randint(0, 3)]
                candidate = (curr_x + step_x, curr_y + step_y)
                if self._is_walkable(*candidate):
                    self.solution.append(candidate)
                    moved = True
            curr_x, curr_y = candidate
            if candidate in targets:
                visited_houses.add(candidate)

        self.remove_cycles()

    def _is_walkable(self, x, y):
        """Return True when ``(x, y)`` is on the map and is not a wall."""
        if not (0 <= x < self.max_x and 0 <= y < self.max_y):
            return False
        return self.world_map[x][y] not in ('0', 0)

    def remove_cycles(self):
        """Shorten ``solution`` in place by cutting loops between houses.

        The route is split at the first visit of every house and each piece
        is cleaned separately, so a loop is only removed when it contains no
        first visit of a house.  When some house is missing from the route
        the route is left untouched.
        """
        try:
            checkpoints = sorted(self.solution.index(house) for house in self.houses)
        except ValueError:
            # A house is not on the route (possible after crossover or
            # mutation); such a route is rejected by fitting_function anyway.
            return

        bounds = [0, *checkpoints, len(self.solution)]
        subsolutions = [self.solution[lo:hi] for lo, hi in zip(bounds, bounds[1:])]
        cleaned = self._remove_cycles(subsolutions)
        self.solution = [cell for part in cleaned for cell in part]

    def fitting_function(self, verbose=0):
        """Return the fitness of the route; lower is better.

        A valid route scores its length.  A route that misses a house, leaves
        the map, crosses a wall or contains a jump between non-adjacent cells
        scores the penalty ``100 * max_x * max_y``.

        Args:
            verbose: When greater than 0, print why a route was rejected.
        """
        penalty = 100 * self.max_x * self.max_y

        # Invalid: does not visit every house.
        for house in self.houses:
            if house not in self.solution:
                if verbose > 0:
                    print("Not visiting all the houses!")
                return penalty

        # Invalid: visits a cell outside the map or a wall.
        for x, y in self.solution:
            if not self._is_walkable(x, y):
                if verbose > 0:
                    print("Wrong!")
                return penalty

        # Invalid: the route cannot be walked one step at a time.
        for (x_1, y_1), (x_2, y_2) in zip(self.solution, self.solution[1:]):
            if abs(x_1 - x_2) + abs(y_1 - y_2) != 1:
                if verbose > 0:
                    print("Not continuous!")
                return penalty

        # Actual fitting value.
        return len(self.solution)

    def mutate(self, patience=10, verbose = 0):
        """Redirect one step of the route and try to rejoin the old path.

        A random position is chosen and the step that follows it is replaced
        by a step in another walkable direction.  If the new cell occurs
        later in the route, everything in between is cut out and the
        mutation is done.  Otherwise the walk goes on from the new cell
        (never stepping straight back) for at most ``patience`` steps.  When
        no rejoin is found the overwritten cells stay in place, which may
        leave the route discontinuous.

        Routes with fewer than four cells are left unchanged.

        Args:
            patience: Maximum number of redirected steps.
            verbose: When greater than 0, print progress information.
        """
        if len(self.solution) < 4:
            # random.randint(1, len - 3) needs a non-empty range.
            return

        mutation_point = random.randint(1, len(self.solution) - 3)

        # Direction of the step currently taken from the mutation point.
        curr_x, curr_y = self.solution[mutation_point]
        next_x, next_y = self.solution[mutation_point + 1]
        if curr_x < next_x:
            move = 0
        elif curr_x > next_x:
            move = 1
        elif curr_y < next_y:
            move = 2
        else:
            move = 3

        while patience > 0:
            curr_x, curr_y = self.solution[mutation_point]
            # Every direction except the excluded one is a candidate.
            options = [direction for direction in range(4) if direction != move]
            moved = False
            while not moved:
                move = random.choice(options)
                options.remove(move)
                step_x, step_y = self._STEPS[move]
                candidate = (curr_x + step_x, curr_y + step_y)
                if self._is_walkable(*candidate):
                    self.solution[mutation_point + 1] = candidate
                    moved = True
                elif not options:
                    # Dead end: no other direction is walkable.
                    return

            # Look for a place where the route can be joined again.
            if verbose > 0:
                print("\tMade one step in a different direction.")
            new_cell = self.solution[mutation_point + 1]
            for indx in range(mutation_point + 2, len(self.solution)):
                # The step just made already exists further down the route.
                if self.solution[indx] == new_cell:
                    new_solution = self.solution[:mutation_point + 1]
                    new_solution.extend(self.solution[indx:])
                    if verbose > 0:
                        print(self.solution[mutation_point:mutation_point + patience])
                    self.solution = new_solution
                    if verbose > 0:
                        print(self.solution[mutation_point:mutation_point+patience])
                    return

            if mutation_point >= len(self.solution) - 2:
                return
            mutation_point = mutation_point + 1
            patience = patience - 1
            # Exclude the opposite of the step just made (0 <-> 1, 2 <-> 3)
            # so the walk does not immediately turn back.
            move = move + 1 if move % 2 == 0 else move - 1

    @staticmethod
    def _last_index(route, cell):
        """Return the index of the last occurrence of ``cell`` in ``route``.

        Raises:
            ValueError: ``cell`` does not occur in ``route``.
        """
        return len(route) - 1 - route[::-1].index(cell)

    @staticmethod
    def _bridge(route, k_index, m_index):
        """Return the part of ``route`` from house k up to, excluding, house m.

        When house k comes after house m in ``route`` the part is walked
        backwards so that it still leads from k towards m.
        """
        if k_index < m_index:
            return route[k_index:m_index]
        return route[m_index + 1:k_index + 1][::-1]

    def cross_over(self, other, verbose=0):
        """Recombine this route with ``other`` and return two new chromosomes.

        Two distinct houses k and m are drawn at random.  Each child keeps
        its own parent's route before the first visit of k and from the last
        visit of m onwards, and takes the k-to-m connection from the other
        parent.  Neither parent is modified.

        When there are fewer than two houses, or a drawn house is missing
        from a parent's route, plain copies of the parents are returned.

        Args:
            other: The second parent.
            verbose: When greater than 0, print the drawn houses and indices.

        Returns:
            Tuple ``(offspring_a, offspring_b)``.
        """
        offspring_a = deepcopy(self)
        offspring_b = deepcopy(other)

        if len(self.houses) < 2:
            return offspring_a, offspring_b

        # Select 2 random houses.
        house_k_index, house_m_index = random.sample(range(len(self.houses)), 2)
        if verbose > 0:
            print(house_k_index, house_m_index)
        house_k = self.houses[house_k_index]
        house_m = self.houses[house_m_index]

        try:
            a_house_k_index = self.solution.index(house_k)
            a_house_m_index = self._last_index(self.solution, house_m)
            b_house_k_index = other.solution.index(house_k)
            b_house_m_index = self._last_index(other.solution, house_m)
        except ValueError:
            # A parent does not contain one of the houses; nothing sensible
            # can be exchanged.
            return offspring_a, offspring_b

        # Both children are built from the untouched parent routes, so the
        # indices computed above stay valid for every slice.
        offspring_a.solution = (
            self.solution[:a_house_k_index]
            + self._bridge(other.solution, b_house_k_index, b_house_m_index)
            + self.solution[a_house_m_index:]
        )
        offspring_b.solution = (
            other.solution[:b_house_k_index]
            + self._bridge(self.solution, a_house_k_index, a_house_m_index)
            + other.solution[b_house_m_index:]
        )

        if verbose > 0:
            print(a_house_k_index, a_house_m_index)
            print(b_house_k_index, b_house_m_index)

        return offspring_a, offspring_b

    def __str__(self):
        """Return the string form of the route."""
        return str(self.solution)

    def _remove_cycles(self, subsolutions):
        """Cut loops out of every piece except the last one.

        For each position, scanning from the front, the farthest later
        occurrence of the same cell is looked for; when found, everything
        between the two occurrences is dropped and scanning continues from
        the later one.  The last two positions of a piece are never used as
        the start of a loop.

        Args:
            subsolutions: List of route pieces; it is updated in place.

        Returns:
            The same ``subsolutions`` list.
        """
        for ind, subsolution in enumerate(subsolutions[:-1]):
            kept = []
            piece_start = 0  # first index of the piece currently being kept
            begin = 0
            while begin < len(subsolution) - 2:
                for end in range(len(subsolution) - 1, begin, -1):
                    if subsolution[begin] == subsolution[end]:
                        # Drop the loop begin..end-1 and go on from ``end``.
                        kept.extend(subsolution[piece_start:begin])
                        piece_start = end
                        begin = end
                        break
                else:
                    begin += 1
            kept.extend(subsolution[piece_start:])
            subsolutions[ind] = kept

        return subsolutions
|
|
|
|
|
|
def argmax(pairs):
    """Return the key of the pair whose value has the highest fitness.

    ``pairs`` is an iterable of ``(key, value)`` items; values are ranked by
    the number returned from ``value.fitting_function()``.
    """
    def score(pair):
        return pair[1].fitting_function()

    winner = max(pairs, key=score)
    return winner[0]
|
|
|
|
def argmin(pairs):
    """Return the key of the pair whose value has the lowest fitness.

    ``pairs`` is an iterable of ``(key, value)`` items; values are ranked by
    the number returned from ``value.fitting_function()``.
    """
    def score(pair):
        return pair[1].fitting_function()

    winner = min(pairs, key=score)
    return winner[0]
|
|
|
|
def argmax_index(values):
    """Return the position of the value with the highest fitness.

    Values are ranked by ``fitting_function()`` through ``argmax``.
    """
    indexed = enumerate(values)
    return argmax(indexed)
|
|
|
|
def argmin_index(values):
    """Return the position of the value with the lowest fitness.

    Values are ranked by ``fitting_function()`` through ``argmin``.
    """
    indexed = enumerate(values)
    return argmin(indexed)
|
|
|
|
def get_best_parent(candidates):
    """Return the candidate with the lowest ``fitting_function()`` value."""
    return candidates[argmin_index(candidates)]
|
|
|
|
def get_worst_chromosome(population):
    """Return ``(chromosome, index)`` of the highest ``fitting_function()`` value."""
    position = argmax_index(population)
    worst = population[position]
    return worst, position
|
|
|
|
|
|
|
|
|
|
def set_world(path):
    """Read the map file at ``path`` and return its lines as a list.

    Line terminators are stripped; every list element is one row of the map.
    """
    with open(path) as handle:
        rows = handle.read().splitlines()
    return rows
|
|
|
|
def k(START):
    """Plan a route from START through the fixed houses and return its moves.

    The map is loaded from ``worldmap.txt``, the genetic algorithm is run
    with a small population, and the resulting route is translated into a
    list of ``"Go down"`` / ``"Go up"`` / ``"Go right"`` / ``"Go left"``
    strings, one comparison per axis for every pair of consecutive cells.
    The route, its type, its fitness and the moves are printed.
    """
    world_map = set_world('worldmap.txt')
    houses = [
        (1, 5), (2, 11), (6, 6), (6, 12), (9, 3), (9, 7), (9, 17), (12, 6)
    ]

    solution = genetic_algoritm(world_map, START, houses, pop_size=20, rounds=200, tournament_size=2, verbose=0)

    route = solution.solution
    print(route)
    print(type(route))

    actions = []
    # Compare every cell with the one before it; the first cell has no
    # predecessor and therefore yields no action.
    for previous, current in zip(route, route[1:]):
        if current[0] > previous[0]:
            actions.append("Go down")
        if current[0] < previous[0]:
            actions.append("Go up")
        if current[1] > previous[1]:
            actions.append("Go right")
        if current[1] < previous[1]:
            actions.append("Go left")

    print(solution.fitting_function())
    print(actions)
    return actions