preparation to neural network implementation

This commit is contained in:
Vadzim Valchkovich 2023-06-01 17:45:01 +02:00
parent e9b3a685e6
commit 3651768e3e
10 changed files with 172 additions and 92 deletions

View File

@ -11,17 +11,16 @@ SCREEN_SIZE = [800, 800]
SQUARE_SIZE = 40
waiter = Waiter([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE)
objects = [
Kitchen([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE)
]
kitchen = Kitchen([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE)
objects = []
for i in range(150):
pos = [0, 0]
while any([o.compare_pos(pos) for o in objects]):
pos = [random.randint(1, SCREEN_SIZE[0]/SQUARE_SIZE),
random.randint(1, SCREEN_SIZE[0]/SQUARE_SIZE)]
while any([o.compare_pos(pos) for o in objects]) or pos == [0, 0]:
pos = [random.randint(1, SCREEN_SIZE[0]/SQUARE_SIZE - 1),
random.randint(1, SCREEN_SIZE[0]/SQUARE_SIZE - 1)]
if (random.randint(0, 1)):
objects.append(Block(pos, 0, SQUARE_SIZE, SCREEN_SIZE))
@ -30,7 +29,7 @@ for i in range(150):
user = UserController(waiter)
state = StateController(waiter)
engine = Engine(SCREEN_SIZE, SQUARE_SIZE, user, state)
engine = Engine(SCREEN_SIZE, SQUARE_SIZE, kitchen, user, state)
for o in objects:
engine.subscribe(o)

View File

@ -1,7 +1,9 @@
import time
import pygame
from .obj.Goal import Goal
from .obj.Object import Object
from .obj.Waiter import Waiter
from .obj.Kitchen import Kitchen
from .UserController import UserController
from .StateController import StateController
from .decisionTree.TreeConcept import TreeEngine
@ -10,12 +12,13 @@ from queue import PriorityQueue
class Engine:
def __init__(self, screen_size, square_size, user: UserController, state: StateController):
def __init__(self, screen_size, square_size, kitchen: Kitchen, user: UserController, state: StateController):
pygame.display.set_caption('Waiter Agent')
self.action_clock = 0
self.tree = TreeEngine()
self.kitchen: Kitchen = kitchen
self.user: Waiter = user
self.state: StateController = state
self.screen_size: list[int] = screen_size
@ -79,8 +82,13 @@ class Engine:
# waiter interaction
for o in self.objects:
if self.user.obj.chechNeighbor(o):
o.updateState(self.action_clock)
if o.compare_pos(self.user.obj.position):
o.action(self.user.obj)
o.action(self.user.obj, self.action_clock)
if self.kitchen.compare_pos(self.user.obj.position):
self.kitchen.action(self.user.obj, self.action_clock)
time.sleep(0.5)
@ -97,6 +105,7 @@ class Engine:
for o in self.objects:
o.blit(self.screen)
self.kitchen.blit(self.screen)
self.user.obj.blit(self.screen)
for f in self.state.fringe.queue:
@ -105,10 +114,13 @@ class Engine:
for s in self.state.path:
s.blit(self.screen)
if self.goals:
self.goals[-1].blit(self.screen)
pygame.display.flip()
def appendGoalPosition(self, position):
self.goals.append(position)
self.goals.append(Goal(position, self.square_size, self.screen_size))
def predict(self):
@ -117,6 +129,7 @@ class Engine:
for o in self.objects:
condition = o.agent_role in [
"table",
"order",
"wait",
"done"
@ -125,33 +138,48 @@ class Engine:
if not condition or o.compare_pos(self.user.obj.position):
continue
battery = self.user.obj.battary_status()[0]
distance = o.distance_to(
self.user.obj.position) // (self.num_squares // 2)
mood = o.get_mood(self.action_clock)[0] if condition else 0
memory = self.user.obj.memory_size
dishes_held = self.user.obj.basket_size
customers = o.customers if condition else 0
waiting_for_order = o.is_order() if condition else 0
waiting_for_dish = o.is_done() if condition else 0
medium_dist = (self.screen_size[0] // self.square_size) // 2
p = self.tree.make_predict(
battery,
distance,
mood,
memory,
dishes_held,
customers,
waiting_for_order,
waiting_for_dish
)
dataset = [
# battery
self.user.obj.battery_status(),
# high | low |
# distance between kitchen and object
0 if o.distance_to(self.kitchen.position) > medium_dist else 1,
# far | close |
# mood 
o.get_mood(self.action_clock),
# undefined | good | bad |
# basket is empty
1 if self.user.obj.basket_is_empty() else 0,
# yes | no |
# dish is ready
1 if o.dish_is_ready(self.action_clock) else 0,
# yes | no |
# dish in basket
1 if self.user.obj.dish_in_basket(o) else 0,
# yes | no |
# status
o.get_state_number(),
# empty | new order | waiting for dish | have a dish |
# is actual
1 if o.isActual() else 0,
# yes | no |
]
p = self.tree.make_predict(dataset)
goal_queue.put((p, o.position))
if len(goal_queue.queue):
priority, goal = goal_queue.queue.pop()
if priority:
print(goal, priority, end='\r')
self.appendGoalPosition(goal)
if goal_queue.queue:
priority, goal = goal_queue.queue[0]
if priority == 2:
self.appendGoalPosition(self.kitchen.position)
else:
print(goal, priority, end='\r')
self.appendGoalPosition(goal)

View File

@ -15,9 +15,10 @@ class StateController:
self.explored.clear()
self.fringe = PriorityQueue()
def build_path(self, goal_state):
def build_path(self, goal_state, engine):
total_cost = goal_state.cost
self.path.append(goal_state)
engine.goals.pop()
while self.path[-1].parent.agent_role not in ["blank", "waiter"]:
self.path.append(self.path[-1].parent)
total_cost += self.path[-1].cost
@ -28,7 +29,7 @@ class StateController:
def graphsearch(self, engine): # A*
print("Search path")
self.goal = list(engine.goals.pop())
self.goal = list(engine.goals[-1].position)
self.reset()
@ -36,13 +37,10 @@ class StateController:
self.fringe.put(start)
while self.fringe and not self.path:
while self.fringe.queue and not self.path:
self.explored.append(self.fringe.get())
if self.explored[-1].position == self.goal:
goal_state = self.explored[-1]
self.reset()
return self.build_path(goal_state)
if self.goal_test(engine):
return
self.succ(self.explored[-1].front(), engine)
self.succ(self.explored[-1].left(), engine)
@ -51,6 +49,10 @@ class StateController:
engine.redraw()
self.reset()
for o in engine.objects:
o.compare_pos(engine.goals[-1].position)
o.agent_role = "block"
engine.goals.pop()
print("Not found")
@ -90,3 +92,19 @@ class StateController:
if state.cost_so_far < fringe.cost_so_far:
fringe.replace(state)
def goal_test(self, engine) -> bool:
if self.explored[-1].position == self.goal:
self.__is_goal__(self.explored[-1], engine)
return True
for fringe in self.fringe.queue:
if fringe.position == self.goal:
self.__is_goal__(fringe, engine)
return True
return False
def __is_goal__(self, goal_state, engine):
self.reset()
self.build_path(goal_state, engine)

View File

@ -9,10 +9,3 @@ class UserController:
for event in pygame.event.get():
if event.type == pygame.QUIT:
engine.quit()
elif event.type == pygame.MOUSEBUTTONDOWN:
pos = pygame.mouse.get_pos()
pos = [pos[0] // engine.square_size,
pos[1] // engine.square_size]
for o in engine.objects:
if o.compare_pos(pos):
o.set_order(engine.action_clock)

BIN
src/img/goal.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

9
src/obj/Goal.py Normal file
View File

@ -0,0 +1,9 @@
from src.obj.Object import Object
class Goal(Object):
def __init__(self, position, square_size, screen_size):
super().__init__("goal", position, 0, square_size, screen_size)
def collide_test(self, waiter: Object) -> bool:
return waiter.position == self.position

View File

@ -5,6 +5,6 @@ class Kitchen(Object):
def __init__(self, position, orientation, square_size, screen_size):
super().__init__("kitchen", position, orientation, square_size, screen_size)
def action(self, waiter):
waiter.combine_orders()
def action(self, waiter, current_time):
waiter.combine_orders(current_time)
waiter.recharge()

View File

@ -57,3 +57,6 @@ class Object:
def action(self, obj):
pass
def updateState(self, current_time):
pass

View File

@ -4,29 +4,59 @@ from src.obj.Object import Object
class Table(Object):
def __init__(self, position, orientation, square_size, screen_size):
super().__init__("order", position, orientation, square_size, screen_size)
self.order_time = 0
self.customers = 0
# roles = ["table", "order", "wait", "done"]
super().__init__("table", position, orientation, square_size, screen_size)
self.waiting_time = 0
self.cooking_time = 0
self.is_actual = False
def reset(self):
self.change_role("table")
self.order_time = 0
self.customers = 0
def isActual(self):
return self.is_actual
def updateState(self, current_time):
if self.is_actual:
return
self.is_actual = True
# here must be neural network choise
new_role = random.choice(["table", "order", "wait", "done"])
self.change_role(new_role, current_time)
if self.agent_role == "table":
return
elif self.agent_role == "wait":
self.cooking_time = random.randint(0, 300)
def dish_is_ready(self, current_time):
return current_time - self.waiting_time > self.cooking_time
def get_state_number(self) -> int:
roles = {
"table": 0,
"order": 1,
"wait": 2,
"done": 3
}
return roles[self.agent_role]
def change_role(self, new_role, current_time):
self.waiting_time = current_time
return super().change_role(new_role)
def reset(self, current_time):
self.is_actual = False
self.change_role("table", current_time)
def set_order(self, current_time):
if self.agent_role == "table":
self.change_role("order")
self.customers = random.randint(1, 6)
self.order_time = current_time
self.change_role("order", current_time)
def set_wait(self):
def set_wait(self, current_time):
if self.agent_role == "order":
self.change_role("wait")
self.change_role("wait", current_time)
def set_done(self):
def set_done(self, current_time):
if self.agent_role == "wait":
self.change_role("done")
self.change_role("done", current_time)
def is_order(self):
return self.agent_role == "order"
@ -40,20 +70,15 @@ class Table(Object):
def get_customers_count(self) -> int:
return self.customers
def get_mood(self, current_time) -> str:
def get_mood(self, current_time) -> int: # перапісаць
if self.agent_role == "table":
return None
return 2 # undefined
diff = current_time - self.order_time
if diff < 100:
return (0, "good")
elif diff < 200:
return (1, "medium")
else:
return (2, "bad")
diff = current_time - self.waiting_time
return 0 if diff >= 300 else 1 # 0 - bad; 1 - good
def action(self, waiter):
def action(self, waiter, current_time):
if self.is_order():
waiter.collect_order(self)
waiter.collect_order(self, current_time)
elif self.is_done():
waiter.deliver_dish(self)
waiter.deliver_dish(self, current_time)

View File

@ -26,17 +26,20 @@ class Waiter(Object):
def basket_is_full(self) -> bool:
return self.basket_size == 0
def combine_orders(self):
def basket_is_empty(self) -> bool:
return self.basket_size == 4
def combine_orders(self, current_time):
while not self.basket_is_full() and not self.memory_is_empty():
dish = self.memory.pop()
dish.set_done()
dish.set_done(current_time)
self.basket.append(dish)
self.basket_size -= 1
self.memory_size += 1
def deliver_dish(self, table):
def deliver_dish(self, table, current_time):
if table in self.basket:
table.reset()
table.reset(current_time)
self.basket.remove(table)
self.basket_size += 1
@ -49,21 +52,16 @@ class Waiter(Object):
def memory_is_full(self) -> bool:
return self.memory_size == 0
def collect_order(self, table):
def collect_order(self, table, current_time):
if self.memory_is_full():
return
if table.agent_role == "order":
table.set_wait()
table.set_wait(current_time)
self.memory.append(table)
self.memory_size -= 1
def battary_status(self) -> str:
if self.battery >= 200:
return (2, "hight")
elif self.battery >= 100:
return (1, "medium")
else:
return (0, "low")
def battery_status(self) -> int:
return 1 if self.battery >= 100 else 0
def recharge(self):
self.battery = 300
@ -79,3 +77,10 @@ class Waiter(Object):
self.position[0] += self.orientation - 2 # x (-1 or +1)
else: # y (0 or 2)
self.position[1] += self.orientation - 1 # y (-1 or +1)
def chechNeighbor(self, n, r=1):
cond_x = abs(self.position[0] - n.position[0]) <= r
cond_y = abs(self.position[1] - n.position[1]) <= r
return cond_x and cond_y