156 lines
5.2 KiB
Python
156 lines
5.2 KiB
Python
import re
|
|
from config import MAP_NAME, GRID_WIDTH, GRID_HEIGHT, GC_X, GC_Y
|
|
from VowpalWabbit.VowpalWrapper import wrapper
|
|
|
|
# Constants
# Radius (in cells) of the neighbourhood passed to get_gc_area() when
# building the |GC_Area features.
RADIUS = 1

# Module-level state: overwritten by parse_list() and read back by
# generate_input() and check_position().
COORDINATES_LIST = []
MOVES_LIST = []

# Load the map as a grid of single characters, indexed MAP_CONTENT[y][x].
# The first two lines of the map file are skipped and every space inside a
# row is dropped. The handle is deliberately not named `map`, which would
# rebind the builtin for the rest of the module.
with open(MAP_NAME, 'r') as map_file:
    MAP_CONTENT = [
        list(row.strip().replace(" ", ""))
        for row in map_file.readlines()[2:]
    ]
|
|
|
|
# Action name -> numeric label written at the start of each training line.
moves_mapping = {"pick_garbage": 1, "right": 2, "left": 3, "up": 4, "down": 5}

# Numeric label -> action name; the exact inverse of moves_mapping, used to
# turn a (rounded) prediction back into an action.
predictions_mapping = {
    label: action for action, label in moves_mapping.items()
}
|
|
|
|
def parse_list(whole_result,current_x,current_y):
    """Translate a path into move names and regenerate the training file.

    whole_result is a list whose entries are either the string
    "pick_garbage" or an indexable [x, y] position. Each position is turned
    into "up"/"down"/"left"/"right" according to its offset from the
    previous position (starting from current_x, current_y);
    "pick_garbage" entries are passed through unchanged.

    Side effects: stores copies of the path and of the resulting moves in
    the module-level COORDINATES_LIST / MOVES_LIST, prints debug output and
    calls generate_input(1.0).

    Returns the list of move names. Raises KeyError when two consecutive
    positions are not exactly one step apart along a single axis.
    """
    global COORDINATES_LIST, MOVES_LIST

    COORDINATES_LIST = whole_result.copy()
    print("x,y",current_x,current_y,"list",whole_result)

    # Offsets are looked up by their textual form "[dx,dy]".
    step_names = {'[0,1]':"down",'[0,-1]':"up",'[1,0]':"right",'[-1,0]':"left"}

    prev_x, prev_y = current_x, current_y
    moves = []
    for entry in whole_result:
        if entry == "pick_garbage":
            moves.append(entry)
            continue
        dx = entry[0] - prev_x
        dy = entry[1] - prev_y
        prev_x, prev_y = entry[0], entry[1]
        moves.append(step_names[f"[{dx},{dy}]"])

    print(moves)
    MOVES_LIST = moves.copy()
    generate_input(1.0)
    return moves
|
|
|
|
def generate_input(importance):
    """Write the training file for the path stored by parse_list().

    For every entry of COORDINATES_LIST except the last, one line is
    produced of the form

        <label> <importance> <tag>|GC_Position x,y |GC_Area c(x,y) c(x,y) ...

    where <label> is the numeric code of the *next* move in MOVES_LIST,
    <tag> is the map identifier extracted from MAP_NAME, and the GC_Area
    features come from get_gc_area() around the position.

    The lines are written to
    ./VowpalWabbit/VowPalInputData/input_<tag>.txt (overwriting it). With a
    path of fewer than two entries an empty file is written.

    Raises IndexError if MAP_NAME contains no recognisable map tag.
    """
    # The tag depends only on MAP_NAME, so compute it once, before the loop.
    # It is also needed for the file name below even when the loop body
    # never runs (path with 0 or 1 entries).
    tag = re.findall("(map_[0-9]+|map[0-9]+_auto)", MAP_NAME)[0]

    input_file_content = []
    # i is the index in MOVES_LIST of the move made *from* `position`,
    # i.e. the position's own index + 1.
    for i, position in enumerate(COORDINATES_LIST[:-1], start=1):
        # "pick_garbage" entries carry no coordinates; resolve them to the
        # most recent real position.
        coords = check_position(position, i)

        label = moves_mapping[MOVES_LIST[i]]
        gc_position = "|GC_Position " + str(coords[0]) + "," + str(coords[1])
        area = get_gc_area(coords, RADIUS)

        # Every area feature is followed by a single space, so the line
        # keeps its trailing space when the area is non-empty.
        input_line = (
            str(label) + " " + str(importance) + " " + tag + gc_position
            + " |GC_Area " + "".join(a + " " for a in area)
        )
        input_file_content.append(input_line)

    # Save to file; the context manager closes it even if a write fails.
    filename = "./VowpalWabbit/VowPalInputData/input_" + str(tag) + ".txt"
    with open(filename, "w") as input_file:
        input_file.writelines(line + "\n" for line in input_file_content)
|
|
|
|
def pass_input(position):
    """Write a single-example input file for `position` and return its path.

    The example has the same layout as the lines produced by
    generate_input(), with a fixed label of 0 and importance 1.0:

        0 1.0 <tag>|GC_Position x,y |GC_Area c(x,y) c(x,y) ...

    The line is also printed. The file
    ./VowpalWabbit/VowpalDataCache/constant_input.txt is overwritten on
    every call.

    Raises IndexError if MAP_NAME contains no recognisable map tag.
    """
    # Fixed label; get_predicted_move() feeds this file to vw with -t
    # (presumably the label is ignored there - confirm against vw docs).
    label = 0
    tag = re.findall("(map_[0-9]+|map[0-9]+_auto)", MAP_NAME)[0]
    gc_position = "|GC_Position " + str(position[0]) + "," + str(position[1])

    area = get_gc_area(position, RADIUS)
    # Every area feature is followed by a single space.
    input_line = (
        str(label) + " 1.0 " + tag + gc_position
        + " |GC_Area " + "".join(a + " " for a in area)
    )

    print(input_line)

    # Save to file; the context manager closes it even if the write fails.
    filename = "./VowpalWabbit/VowpalDataCache/constant_input.txt"
    with open(filename, "w") as input_file:
        input_file.write(input_line)
    return filename
|
|
|
|
|
|
def get_gc_area(position, radius):
    """Describe the map cells surrounding `position`.

    Scans the square of side 2*radius + 1 centred on position, clipped to
    the grid (0..GRID_WIDTH-1, 0..GRID_HEIGHT-1), column by column. Each
    visited cell yields the string "<cell char>(x,y)" taken from
    MAP_CONTENT[y][x]; the cell equal to `position` itself is left out.

    Returns the list of those strings.
    """
    # Clip the window so it never leaves the grid.
    x_start = max(0, position[0] - radius)
    x_stop = min(position[0] + radius + 1, GRID_WIDTH)
    y_start = max(0, position[1] - radius)
    y_stop = min(position[1] + radius + 1, GRID_HEIGHT)

    area = []
    for col in range(x_start, x_stop):
        for row in range(y_start, y_stop):
            # The collector's own cell carries no useful information.
            if [col, row] != position:
                area.append(MAP_CONTENT[row][col] + "(" + str(col) + "," + str(row) + ")")
    return area
|
|
|
|
def check_position(position, i):
    """Resolve a path entry to concrete [x, y] coordinates.

    position -- a path entry: either a list of coordinates or the string
                "pick_garbage".
    i        -- search start: for "pick_garbage", COORDINATES_LIST is
                scanned backwards from index i - 1 down to 0.

    Returns:
      * position itself when it is a list (including list subclasses);
      * for "pick_garbage", the closest earlier list entry of
        COORDINATES_LIST, or [GC_X, GC_Y] when there is none;
      * None, after printing an error message, for anything else.
    """
    # isinstance (rather than an exact type comparison) so that list
    # subclasses are accepted as valid positions too.
    if isinstance(position, list):
        return position

    if position == "pick_garbage":
        # No coordinates of its own: fall back to the most recent real
        # position, or to the initial coordinates if none precedes it.
        for j in range(i - 1, -1, -1):
            if isinstance(COORDINATES_LIST[j], list):
                return COORDINATES_LIST[j]
        return [GC_X, GC_Y]

    # Unrecognised entry: report it and return None, as before.
    print("An error has ocurred while processing GC position.")
    return None
|
|
|
|
def get_predicted_move(position):
    """Ask the trained model for the next move from `position`.

    Writes the example via pass_input(), runs the vw command line through
    wrapper.wrap_ex, reads the first value of the first line of the
    prediction file and converts it to a move name with
    make_move_from_prediction().

    Returns (move, new_position). For "pick_garbage" new_position is the
    move string itself; otherwise it is a copy of position shifted one cell
    in the chosen direction, or an unshifted copy (with a message printed)
    when the shift would leave the grid.
    """
    input_filename = pass_input(position)
    output_filename = "./VowpalWabbit/VowpalDataCache/constant_output.txt"
    command = (
        f"vw -i ./VowpalWabbit/VowpalModels/1k_001.model -t {input_filename}"
        f" -p {output_filename} --quiet"
    )
    wrapper.wrap_ex(command)

    with open( output_filename, 'r' ) as fout:
        first_line = fout.readline()
    prediction = float(first_line.split()[0])

    move = make_move_from_prediction(prediction)
    print(position, prediction, move)

    if move == "pick_garbage":
        return move, move

    # Vertical moves change index 1, horizontal moves index 0;
    # "up" and "left" decrease the coordinate.
    axis = 1 if move in ["up", "down"] else 0
    direction = -1 if move in ["up", "left"] else 1

    new_position = position.copy()
    new_position[axis] += direction

    if axis == 1:
        limit, warning = GRID_HEIGHT, "VIOLATED GRID HEIGHT"
    else:
        limit, warning = GRID_WIDTH, "VIOLATED GRID WIDTH"

    # Stay in place if the step would leave the grid.
    if new_position[axis] < 0 or new_position[axis] >= limit:
        new_position = position.copy()
        print(warning)

    return move, new_position
|
|
|
|
def make_move_from_prediction(prediction):
    """Map a numeric prediction onto a move name.

    The prediction is bucketed at 1.5, 2.5, 3.5 and 4.5 (strictly greater
    than a threshold selects the higher label); anything not above 1.5
    maps to label 1. The label is then translated through
    predictions_mapping.
    """
    # Checked from the highest threshold down; the first one exceeded wins.
    buckets = ((4.5, 5), (3.5, 4), (2.5, 3), (1.5, 2))
    for threshold, label in buckets:
        if prediction > threshold:
            return predictions_mapping[label]
    return predictions_mapping[1]