code refactoring, comments added

This commit is contained in:
Vadzim Valchkovich 2023-06-15 16:51:27 +02:00
parent 4969ed0c9a
commit dc2feb07cf
14 changed files with 267 additions and 180 deletions

View File

@ -50,5 +50,10 @@
--- ---
- [x] **Sieci neuronowe: wymagania dot. czwartego przyrostu** - [x] **Sieci neuronowe: wymagania dot. czwartego przyrostu**
- [x] Należy przygotować zbiór uczący zawierający co najmniej 1000 przykładów dla każdej klasy. - [x] Należy przygotować zbiór uczący zawierający co najmniej 1000 przykładów dla każdej klasy.
- [x] Agent powinien wykorzystywać wyuczoną sieć w procesie podejmowania decyzji. - [x] Agent powinien wykorzystywać wyuczoną sieć w procesie podejmowania decyzji.
- [ ] **Algorytmy genetyczne: wymagania dot. piątego przyrostu**
- [ ] ???
- [ ] ???

View File

@ -6,13 +6,13 @@ from src.controller.ImageController import ImageController
SCREEN_SIZE = [800, 800] SCREEN_SIZE = [800, 800]
SQUARE_SIZE = 80 SQUARE_SIZE = 80
SLEEP_DURATION = 0.125 ACTION_DURATION = 0.1
COUNT_OF_OBJECTS = 25 COUNT_OF_OBJECTS = 25
store = ImageController(SQUARE_SIZE) store = ImageController(SQUARE_SIZE)
waiter = Waiter([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE, store) waiter = Waiter([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE, store)
kitchen = Kitchen([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE, store) kitchen = Kitchen([0, 0], 0, SQUARE_SIZE, SCREEN_SIZE, store)
engine = Engine(SCREEN_SIZE, SQUARE_SIZE, kitchen, waiter, SLEEP_DURATION) engine = Engine(SCREEN_SIZE, SQUARE_SIZE, kitchen, waiter, ACTION_DURATION)
layout = LayoutController(engine, store).create_and_subscribe(COUNT_OF_OBJECTS) layout = LayoutController(engine, store).create_and_subscribe(COUNT_OF_OBJECTS)
engine.loop() engine.loop()

View File

@ -1,59 +1,64 @@
import time import time
import pygame import pygame
from queue import PriorityQueue
from termcolor import colored
from src.obj.Goal import Goal from src.obj.Goal import Goal
from src.obj.Table import Table
from src.obj.Object import Object from src.obj.Object import Object
from src.obj.Waiter import Waiter from src.obj.Waiter import Waiter
from src.obj.Kitchen import Kitchen from src.obj.Kitchen import Kitchen
from src.obj.PriorityItem import PriorityItem
from src.obj.TemporaryState import TemporaryState
from src.controller.UserController import UserController from src.controller.UserController import UserController
from src.controller.StateController import StateController from src.controller.StateController import StateController
from src.controller.TreeController import TreeController from src.controller.TreeController import TreeController
from src.controller.NeuralNetworkController import NeuralNetworkController from src.controller.NeuralNetworkController import NeuralNetworkController
from queue import PriorityQueue
from termcolor import colored
from src.obj.PriorityItem import PriorityItem
class Engine: class Engine:
def __init__(self, screen_size, square_size, kitchen: Kitchen, waiter: Waiter, sleep_duration): def __init__(self, screen_size, square_size, kitchen: Kitchen, waiter: Waiter, action_duration):
print(colored("Initialization...", "green")) print(colored("Initialization...", "green"))
pygame.display.set_caption('Waiter Agent') pygame.display.set_caption('Waiter Agent')
self.action_clock = 0 self.action_clock = 0
self.sleep_duration = sleep_duration self.action_duration = action_duration
self.tree = TreeController() self.tree = TreeController()
self.kitchen: Kitchen = kitchen self.kitchen: Kitchen = kitchen
self.user: UserController = UserController(waiter) self.waiter: Waiter = waiter
self.state: StateController = StateController(waiter) self.user_c: UserController = UserController()
self.predictor: NeuralNetworkController = NeuralNetworkController() self.state_c: StateController = StateController(waiter)
self.predictor_c: NeuralNetworkController = NeuralNetworkController()
self.screen_size: list[int] = screen_size self.screen_size: list[int] = screen_size
self.screen = pygame.display.set_mode(self.screen_size) self.screen = pygame.display.set_mode(self.screen_size)
self.square_size: int = square_size self.square_size: int = square_size
self.num_squares = self.screen_size[0] // self.square_size self.num_squares = screen_size[0] // square_size
self.squares = self.__init_squares_field__(
self.num_squares, self.square_size)
self.objects: list[Object] = [] self.objects: list[Object] = []
self.goals: list = [] self.goal: Goal = None
self.build_field()
self.runnin: bool = False self.runnin: bool = False
self.paused: bool = False self.paused: bool = False
self.show_fringe: bool = False
def __init_squares_field__(self, num_squares, square_size): def build_field(self):
squares = [] s = self.square_size
for i in range(num_squares): n = self.num_squares
row = [] self.squares = [
for j in range(num_squares): pygame.Rect(
square_rect = pygame.Rect( j * s,
j * square_size, i * square_size, i * s,
square_size, square_size) s, s
row.append(square_rect) )
squares.append(row) for j in range(n) for i in range(n)
]
return squares
def subscribe(self, object: Object): def subscribe(self, object: Object):
self.objects.append(object) self.objects.append(object)
@ -64,119 +69,157 @@ class Engine:
self.running = True self.running = True
while self.running: while self.running:
self.user_interaction()
if not self.paused:
self.action() self.action()
self.redraw() self.redraw()
def user_interaction(self):
self.user_c.handler(engine=self)
def quit(self): def quit(self):
print(colored("Exiting...", "red"))
self.running = False self.running = False
def action(self): def action(self):
self.user.handler(self)
if self.paused:
return
self.predict() # STEP 1. Predict a goal
if not self.state.path: self.predict_goal()
if self.goals:
if not self.state.graphsearch(self) and self.goals: if not self.state_c.path:
self.objects.remove(self.goals.pop().parent)
# STEP 2. Search a path to the goal position
has_path = self.state_c.graphsearch(self)
# STEP 3. Action for unattainable goal
if not has_path and self.goal:
self.unattainable_goal()
else: else:
if not self.state.path[0].compare_pos(self.goals[-1].position):
self.state.reset() # STEP 4. Skip the empty path
if not self.state_c.path[0].on(self.goal.position):
self.state_c.reset()
return return
# went path # STEP 4. Follow the path
state = self.user.obj.changeState(self.state.path.pop()) state = self.waiter.changeState(self.state_c.path.pop())
self.clock_increment(state.cost) self.clock_increment(state.cost)
print(colored("Action:\t", "blue")+f"{state.agent_role}", end='\t') # STEP 5. Log state details
print(colored("Cost:\t", "blue")+f"{state.cost}", end='\t')
print(colored("Cost so far: ", "blue") +
f"{state.cost_so_far}", end='\t')
print(colored("Battery: ", "blue") +
f"{self.user.obj.battery}", end='\t')
print(colored("Basket capacity: ", "blue") +
f"{self.user.obj.basket_capacity}", end='\t')
print(colored("Memory capacity: ", "blue") +
f"{self.user.obj.memory_capacity}")
# waiter interaction self.log_state(state)
# STEP 5. Interactions with the waiter
for o in self.objects: for o in self.objects:
if self.user.obj.chechNeighbor(o): if type(o) is Table:
o.updateState(self.action_clock, self.predictor) if o.is_neighbor_of(self.waiter):
if o.compare_pos(self.user.obj.position): o.updateState(self.action_clock, self.predictor_c)
o.action(self.user.obj, self.action_clock) if self.waiter.on(o.position):
o.action(self.waiter, self.action_clock)
if self.kitchen.compare_pos(self.user.obj.position): if self.waiter.on(self.kitchen.position):
self.kitchen.action(self.user.obj, self.action_clock) self.kitchen.action(self.waiter, self.action_clock)
time.sleep(self.sleep_duration) # STEP 6. Wait
time.sleep(self.action_duration)
def set_goal(self, parent):
self.goal = Goal(parent)
def revoke_goal(self):
self.goal = None
def unattainable_goal(self):
print(colored("Object unattainable", "red"))
self.objects.remove(self.goal.parent)
self.revoke_goal()
def log_state(self, state: TemporaryState):
LOG_COLOR = "blue"
print(
colored("Action:", LOG_COLOR),
f"{state.agent_role:<5}",
colored("Cost:", LOG_COLOR),
f"{state.cost:<3}",
colored("Cost so far:", LOG_COLOR),
f"{state.cost_so_far:<3}",
colored("Battery:", LOG_COLOR),
f"{self.waiter.battery:<3}",
colored("Basket capacity:", LOG_COLOR),
f"{self.waiter.basket_capacity:<3}",
colored("Memory capacity:", LOG_COLOR),
f"{self.waiter.memory_capacity:<3}"
)
def clock_increment(self, action_time): def clock_increment(self, action_time):
self.action_clock += action_time self.action_clock += action_time
def redraw(self): def redraw(self):
if self.paused: # STEP 1. Clean up screen
return
self.screen.fill((255, 255, 255)) self.screen.fill((255, 255, 255))
for row in self.squares: # STEP 2. Draw grid
for square_rect in row: for square_rect in self.squares:
pygame.draw.rect(self.screen, (0, 0, 0), square_rect, 1) pygame.draw.rect(self.screen, (0, 0, 0), square_rect, 1)
# STEP 3. Draw registered objects
for o in self.objects: for o in self.objects:
o.blit(self.screen) o.blit(self.screen)
self.kitchen.blit(self.screen) self.kitchen.blit(self.screen)
self.user.obj.blit(self.screen) self.waiter.blit(self.screen)
for f in self.state.fringe.queue: # STEP 4. Draw visualization of pathfinding
for f in self.state_c.fringe.queue:
f.blit(self.screen) f.blit(self.screen)
for s in self.state.path: for s in self.state_c.path:
s.blit(self.screen) s.blit(self.screen)
if self.goals: # STEP 5. Draw goal object
self.goals[-1].blit(self.screen)
if self.goal:
self.goal.blit(self.screen)
# STEP 6. Apply change
pygame.display.flip() pygame.display.flip()
def appendGoal(self, parent): def predict_goal(self):
self.goals.append(Goal(parent))
def predict(self):
# STEP 1. Prepare priority queue for potential goals
goal_queue = PriorityQueue() goal_queue = PriorityQueue()
# sorting objects by distance to waiter # STEP 2. Sorting objects by distance to waiter
self.objects.sort( self.objects.sort(
key=lambda o: key=lambda o:
o.distance_to( o.distance_to(
self.user.obj.position, self.waiter.position,
mode="advanced" advanced_mode=True
) )
) )
for o in self.objects: for o in self.objects:
condition = o.agent_role in [ # STEP 3. Collecting data about the current state of the tables
"table",
"order",
"wait",
"done"
]
if not condition or o.compare_pos(self.user.obj.position): if not type(o) is Table or o.on(self.waiter.position):
continue continue
medium_dist = (self.screen_size[0] // self.square_size) // 2 medium_dist = (self.screen_size[0] // self.square_size) // 2
dataset = [ dataset = [
# battery # battery
self.user.obj.battery_status(), self.waiter.battery_status(),
# high | low | # high | low |
# distance between kitchen and object # distance between kitchen and object
@ -188,7 +231,7 @@ class Engine:
# undefined | good | bad | # undefined | good | bad |
# basket is empty # basket is empty
1 if self.user.obj.basket_is_empty() else 0, 1 if self.waiter.basket_is_empty() else 0,
# yes | no | # yes | no |
# dish is ready # dish is ready
@ -196,7 +239,7 @@ class Engine:
# yes | no | # yes | no |
# dish in basket # dish in basket
1 if self.user.obj.dish_in_basket(o) else 0, 1 if self.waiter.dish_in_basket(o) else 0,
# yes | no | # yes | no |
# status # status
@ -208,13 +251,21 @@ class Engine:
# yes | no | # yes | no |
] ]
# STEP 4. Make priority prediction
p = self.tree.predict_priority(dataset) p = self.tree.predict_priority(dataset)
# STEP 5. Writing an object to the priority queue
goal_queue.put(PriorityItem(o, p[0])) goal_queue.put(PriorityItem(o, p[0]))
self.goals.clear() # STEP 6. Goal selection
if goal_queue.queue:
# 0 - High priority
# 1 - Low priority
# 2 - Return to kitchen
item: PriorityItem = goal_queue.get() item: PriorityItem = goal_queue.get()
if item.priority == 2: if item.priority == 2:
self.appendGoal(self.kitchen) self.set_goal(self.kitchen)
else: else:
self.appendGoal(item.obj) self.set_goal(item.obj)

View File

@ -32,7 +32,7 @@ class LayoutController():
pos = [0, 0] pos = [0, 0]
while any([o.compare_pos(pos) for o in objects]) or pos == [0, 0]: while any([o.on(pos) for o in objects]) or pos == [0, 0]:
pos = [random.randint(1, num_squares - 1), pos = [random.randint(1, num_squares - 1),
random.randint(1, num_squares - 1)] random.randint(1, num_squares - 1)]

View File

@ -11,33 +11,35 @@ from termcolor import colored
class NeuralNetworkController: class NeuralNetworkController:
def __init__(self, learn: bool = False): def __init__(self, learn: bool = False):
# STEP 1. Model existence check
if learn or not os.path.isfile('model/model.h5'): if learn or not os.path.isfile('model/model.h5'):
self.learn_model() self.learn_model()
# Turn off interactive logging # STEP 2. Turn off interactive logging
tf.get_logger().setLevel(logging.ERROR) tf.get_logger().setLevel(logging.ERROR)
# Load the trained model # STEP 3. Load the trained model
self.model = keras.models.load_model('model/model.h5') self.model = keras.models.load_model('model/model.h5')
# Load the class names # STEP 4. Load the class names
self.class_names = ['table', 'table', 'order'] self.class_names = ['table', 'table', 'order']
# Path to the folder containing test images # STEP 5. Set up path to the folder containing test images
self.test_images_folder = 'dataset/testset' self.test_images_folder = 'dataset/testset'
def predict(self, image_path): def predict(self, image_path):
# Load and preprocess the test image # STEP 1. Load and preprocess the test image
test_image = keras.preprocessing.image.load_img( test_image = keras.preprocessing.image.load_img(
image_path, target_size=(100, 100)) image_path, target_size=(100, 100))
test_image = keras.preprocessing.image.img_to_array(test_image) test_image = keras.preprocessing.image.img_to_array(test_image)
test_image = np.expand_dims(test_image, axis=0) test_image = np.expand_dims(test_image, axis=0)
test_image = test_image / 255.0 # Normalize the image test_image = test_image / 255.0 # Normalize the image
# Reshape the image array to (1, height, width, channels) # STEP 2. Reshape the image array to (1, height, width, channels)
test_image = np.reshape(test_image, (1, 100, 100, 3)) test_image = np.reshape(test_image, (1, 100, 100, 3))
# Make predictions # STEP 3. Make predictions
predictions = self.model.predict(test_image, verbose=None) predictions = self.model.predict(test_image, verbose=None)
predicted_class_index = np.argmax(predictions[0]) predicted_class_index = np.argmax(predictions[0])
predicted_class = self.class_names[predicted_class_index] predicted_class = self.class_names[predicted_class_index]
@ -46,19 +48,25 @@ class NeuralNetworkController:
return predicted_class return predicted_class
def random_path_img(self) -> str: def random_path_img(self) -> str:
# STEP 1. Choise random class
folder_name = random.choice(os.listdir(self.test_images_folder)) folder_name = random.choice(os.listdir(self.test_images_folder))
folder_path = os.path.join(self.test_images_folder, folder_name) folder_path = os.path.join(self.test_images_folder, folder_name)
# STEP 2. Choise random image
filename = "" filename = ""
while not (filename.endswith('.jpg') or filename.endswith('.jpeg')): while not (filename.endswith('.jpg') or filename.endswith('.jpeg')):
filename = random.choice(os.listdir(folder_path)) filename = random.choice(os.listdir(folder_path))
# STEP 3. Join path
image_path = os.path.join(folder_path, filename) image_path = os.path.join(folder_path, filename)
return image_path return image_path
def learn_model(self): def learn_model(self):
print(colored("Model training... ", "green")) print(colored("Model training... ", "green"))
# Normalizes the pixel values of an image to the range [0, 1]. # STEP 1. Defining parameters
# Normalizes the pixel values of an image to the range [0, 1].
def normalize(image, label): def normalize(image, label):
return image / 255, label return image / 255, label
@ -70,7 +78,8 @@ class NeuralNetworkController:
# Set the image size and input shape # Set the image size and input shape
img_width, img_height = 100, 100 img_width, img_height = 100, 100
input_shape = (img_width, img_height, 3) input_shape = (img_width, img_height, 3)
# Load the training and validation data
# STEP 2. Load the training and validation data
train_ds = tf.keras.utils.image_dataset_from_directory( train_ds = tf.keras.utils.image_dataset_from_directory(
train_data_dir, train_data_dir,
validation_split=0.2, validation_split=0.2,
@ -88,13 +97,14 @@ class NeuralNetworkController:
seed=123, seed=123,
image_size=(img_height, img_width), image_size=(img_height, img_width),
batch_size=batch_size) batch_size=batch_size)
# Get the class names
# STEP 3. Get the class names
class_names = train_ds.class_names class_names = train_ds.class_names
print(class_names) print(class_names)
# Normalize the training and validation data # STEP 4. Normalize the training and validation data
train_ds = train_ds.map(normalize) train_ds = train_ds.map(normalize)
val_ds = val_ds.map(normalize) val_ds = val_ds.map(normalize)
# Define the model architecture # STEP 5. Define the model architecture
model = tf.keras.Sequential([ model = tf.keras.Sequential([
layers.Conv2D(16, 3, padding='same', activation='relu', layers.Conv2D(16, 3, padding='same', activation='relu',
input_shape=input_shape), input_shape=input_shape),
@ -107,19 +117,19 @@ class NeuralNetworkController:
layers.Dense(128, activation='relu'), layers.Dense(128, activation='relu'),
layers.Dense(num_classes, activation='softmax') layers.Dense(num_classes, activation='softmax')
]) ])
# Compile the model # STEP 6. Compile the model
model.compile(optimizer='adam', model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy( loss=tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True), from_logits=True),
metrics=['accuracy']) metrics=['accuracy'])
# Print the model summary # Print the model summary
model.summary() model.summary()
# Train the model # STEP 7. Train the model
epochs = 10 epochs = 10
model.fit(train_ds, model.fit(train_ds,
validation_data=val_ds, validation_data=val_ds,
epochs=epochs) epochs=epochs)
# Save the trained model # STEP 8. Save the trained model
Path("model/").mkdir(parents=True, exist_ok=True) Path("model/").mkdir(parents=True, exist_ok=True)
model.save('model/model.h5') model.save('model/model.h5')

View File

@ -17,10 +17,16 @@ class StateController:
self.fringe = PriorityQueue() self.fringe = PriorityQueue()
def build_path(self, goal_state, engine): def build_path(self, goal_state, engine):
# STEP 1. Checking the cost of the first step
# If we are already at the destination, we set a penalty cost
goal_state.cost = goal_state.cost if goal_state.agent_role != "blank" else 100 goal_state.cost = goal_state.cost if goal_state.agent_role != "blank" else 100
total_cost = goal_state.cost total_cost = goal_state.cost
self.path.append(goal_state) self.path.append(goal_state)
engine.goals.pop()
# STEP 2. Revoking the target
engine.revoke_goal()
# STEP 3. Restoring the path
while self.path[-1].parent.agent_role not in ["blank", "waiter"]: while self.path[-1].parent.agent_role not in ["blank", "waiter"]:
self.path.append(self.path[-1].parent) self.path.append(self.path[-1].parent)
total_cost += self.path[-1].cost total_cost += self.path[-1].cost
@ -29,27 +35,42 @@ class StateController:
return self.path return self.path
def graphsearch(self, engine): # A* def graphsearch(self, engine): # A*
self.goal = list(engine.goals[-1].position) if not engine.goal:
return False
# STEP 1. Store goal position
self.goal = engine.goal.position
print(colored(f"Search path to ", "yellow")+f"{self.goal}") print(colored(f"Search path to ", "yellow")+f"{self.goal}")
# STEP 2. Reset structures
self.reset() self.reset()
# STEP 3. Adding a start state to the queue
start = TemporaryState(self.istate, 0) start = TemporaryState(self.istate, 0)
self.fringe.put(start) self.fringe.put(start)
# STEP 4. Checking for states to explore
while self.fringe.queue and not self.path: while self.fringe.queue and not self.path:
self.explored.append(self.fringe.get())
if self.goal_test(engine):
return
# STEP 5. Taking a state to explore
self.explored.append(self.fringe.get())
# STEP 6. Goal achievement check
if self.goal_test(engine):
return True
# STEP 7. Handling children
self.succ(self.explored[-1].front(), engine) self.succ(self.explored[-1].front(), engine)
self.succ(self.explored[-1].left(), engine) self.succ(self.explored[-1].left(), engine)
self.succ(self.explored[-1].right(), engine) self.succ(self.explored[-1].right(), engine)
# STEP 8. Draw the fringes (optional)
if engine.show_fringe:
engine.redraw() engine.redraw()
# STEP 9. Reset structures
self.reset() self.reset()
print(colored("Not found", "red")) print(colored("Not found", "red"))
@ -58,28 +79,34 @@ class StateController:
def succ(self, state: TemporaryState, engine): def succ(self, state: TemporaryState, engine):
if state.collide_test(): # STEP 1. State capability check
if state.out_of_bounds_check():
return return
elif any(e.compare(state) for e in self.explored): elif any(e.compare(state) for e in self.explored):
return return
elif any([o.collide_test(state) for o in engine.objects]): elif any([o.collide_test(state) for o in engine.objects]):
return return
# STEP 2. Calculate the state cost
for o in engine.objects: for o in engine.objects:
if state.cost != 1: if state.cost != 1:
break break
if o.position == state.position: if o.position == state.position:
state.change_cost(o) state.change_cost(o)
# STEP 3. Calculate the state cost so far
state.cost_so_far = self.explored[-1].cost_so_far + state.cost state.cost_so_far = self.explored[-1].cost_so_far + state.cost
# STEP 4. Check if we have seen this state before
in_explored = any([state.compare(s) for s in self.explored]) in_explored = any([state.compare(s) for s in self.explored])
in_frige = any([state.compare(f) for f in self.fringe.queue]) in_frige = any([state.compare(f) for f in self.fringe.queue])
# If not seen - add to the queue for research
if not in_explored and not in_frige: if not in_explored and not in_frige:
state.heuristic(self.goal) state.heuristic(self.goal)
self.fringe.put(state) self.fringe.put(state)
# If seen - leave with the cheapest way
elif in_frige: elif in_frige:
fringe = state fringe = state
for f in self.fringe.queue: for f in self.fringe.queue:

View File

@ -12,20 +12,19 @@ class TreeController():
self.generateRawDataset() self.generateRawDataset()
self.convertDataset() self.convertDataset()
# importing the dataset from the disk # STEP 1. Importing the dataset from the disk
train_data_m = np.genfromtxt( train_data_m = np.genfromtxt(
"out/dataset.csv", delimiter=",", skip_header=1) "out/dataset.csv", delimiter=",", skip_header=1)
# Separate the attributes and labels # STEP 2. Separate the attributes and labels
self.X_train = [data[:-1] for data in train_data_m] self.X_train = [data[:-1] for data in train_data_m]
self.y_train = [data[-1] for data in train_data_m] self.y_train = [data[-1] for data in train_data_m]
# Create the decision tree classifier using the ID3 algorithm # STEP 3. Create the decision tree classifier using the ID3 algorithm
self.clf = tree.DecisionTreeClassifier( self.clf = tree.DecisionTreeClassifier(
criterion='entropy', splitter="best") criterion='entropy', splitter="best")
# clf = tree.DecisionTreeClassifier(criterion='gini')
# Train the decision tree on the training data # STEP 4. Train the decision tree on the training data
self.clf.fit(self.X_train, self.y_train) self.clf.fit(self.X_train, self.y_train)
if exportText: if exportText:
@ -64,7 +63,7 @@ class TreeController():
for status in status_values: for status in status_values:
for actual in other: for actual in other:
dataset = self.buildDataset( dataset = [
battery, battery,
distance, distance,
mood, mood,
@ -73,7 +72,10 @@ class TreeController():
dish, dish,
status, status,
actual actual
) ]
priority = self.getPriority(dataset)
dataset.append(priority)
training_data.append(dataset) training_data.append(dataset)
@ -82,22 +84,6 @@ class TreeController():
writer = csv.writer(file) writer = csv.writer(file)
writer.writerows(training_data) writer.writerows(training_data)
def buildDataset(self, b, d, m, e, r, i, s, a) -> list:
dataset = [
b, # battery
d, # distance
m, # mood
e, # basket is empty
r, # ready
i, # dish in basket
s, # status
a, # actual
]
dataset.append(self.getPriority(dataset))
return dataset
def getPriority(self, dataset) -> str: def getPriority(self, dataset) -> str:
PRIORITY = { PRIORITY = {
'high': 'high priority', 'high': 'high priority',
@ -154,7 +140,7 @@ class TreeController():
9: ["high priority", "low priority", "return to kitchen"] 9: ["high priority", "low priority", "return to kitchen"]
} }
# Create a mapping dictionary for attribute values # STEP 1. Create a mapping dictionary for attribute values
mapping = {} mapping = {}
for attr, values in attributes.items(): for attr, values in attributes.items():
mapping[attr] = {value: index for index, mapping[attr] = {value: index for index,
@ -162,7 +148,7 @@ class TreeController():
converted_dataset = [] converted_dataset = []
# Read the input CSV file # STEP 2. Read the input CSV file
Path("out/").mkdir(parents=True, exist_ok=True) Path("out/").mkdir(parents=True, exist_ok=True)
with open("out/rawDataset.csv", "r") as csvfile: with open("out/rawDataset.csv", "r") as csvfile:
reader = csv.reader(csvfile) reader = csv.reader(csvfile)
@ -170,11 +156,11 @@ class TreeController():
header = next(reader) # Skip the header row header = next(reader) # Skip the header row
converted_dataset.append(header) converted_dataset.append(header)
# Convert the data rows # STEP 3. Convert the data rows
for row in reader: for row in reader:
converted_row = [] converted_row = []
for i, value in enumerate(row): for i, value in enumerate(row):
# Convert the attribute values # STEP 4. Convert the attribute values
if i + 1 in mapping: if i + 1 in mapping:
converted_value = mapping[i + 1][value] converted_value = mapping[i + 1][value]
else: else:
@ -184,17 +170,17 @@ class TreeController():
converted_dataset.append(converted_row) converted_dataset.append(converted_row)
# Write the converted dataset to a new CSV file # STEP 5. Write the converted dataset to a new CSV file
Path("out/").mkdir(parents=True, exist_ok=True) Path("out/").mkdir(parents=True, exist_ok=True)
with open("out/dataset.csv", "w", newline="") as csvfile: with open("out/dataset.csv", "w", newline="") as csvfile:
writer = csv.writer(csvfile) writer = csv.writer(csvfile)
# Write the converted data rows # STEP 6. Write the converted data rows
for row in converted_dataset: for row in converted_dataset:
writer.writerow(row) writer.writerow(row)
def exportText(self): def exportText(self):
# Visualize the trained decision tree # STEP 1. Visualize the trained decision tree
tree_text = tree.export_text(self.clf, feature_names=[ tree_text = tree.export_text(self.clf, feature_names=[
"Battery level", "Battery level",
"Distance between kitchen and table", "Distance between kitchen and table",
@ -206,11 +192,13 @@ class TreeController():
"Is actual", "Is actual",
]) ])
# STEP 2. Save the visualization as a text file
Path("out/").mkdir(parents=True, exist_ok=True) Path("out/").mkdir(parents=True, exist_ok=True)
with open('out/decision_tree.txt', 'w') as f: with open('out/decision_tree.txt', 'w') as f:
f.write(tree_text) # Save the visualization as a text file f.write(tree_text)
def exportPdf(self): def exportPdf(self):
# STEP 1. Visualize the trained decision tree
dot_data = tree.export_graphviz(self.clf, out_file=None, feature_names=[ dot_data = tree.export_graphviz(self.clf, out_file=None, feature_names=[
"Battery level", "Battery level",
"Distance between kitchen and table", "Distance between kitchen and table",
@ -227,7 +215,7 @@ class TreeController():
], filled=True, rounded=True) ], filled=True, rounded=True)
graph = graphviz.Source(dot_data) graph = graphviz.Source(dot_data)
# Save the visualization as a PDF file # STEP 2. Save the visualization as a PDF file
Path("out/").mkdir(parents=True, exist_ok=True) Path("out/").mkdir(parents=True, exist_ok=True)
graph.render("out/decision_tree") graph.render("out/decision_tree")

View File

@ -3,15 +3,20 @@ from termcolor import colored
class UserController: class UserController:
def __init__(self, usrObj):
self.obj = usrObj
def handler(self, engine): def handler(self, engine):
for event in pygame.event.get(): for event in pygame.event.get():
if event.type == pygame.QUIT: if event.type == pygame.QUIT:
engine.quit() engine.quit()
elif event.type == pygame.KEYDOWN: elif event.type == pygame.KEYDOWN:
if event.key == pygame.K_SPACE: if event.key == pygame.K_ESCAPE:
engine.quit()
elif event.key == pygame.K_f:
engine.show_fringe = not engine.show_fringe
if engine.show_fringe:
print(colored("The fringes are shown", "green"))
else:
print(colored("The fringes are hidden", "green"))
elif event.key == pygame.K_SPACE:
engine.paused = not engine.paused engine.paused = not engine.paused
if engine.paused: if engine.paused:
print(colored("Paused", "red")) print(colored("Paused", "red"))

View File

@ -3,9 +3,11 @@ from src.obj.Object import Object
class Goal(Object): class Goal(Object):
def __init__(self, parent: Object): def __init__(self, parent: Object):
super().__init__("goal", parent.position, 0, super().__init__(
parent.square_size, parent.screen_size, parent.store) "goal",
parent.position, 0,
parent.square_size,
parent.screen_size,
parent.store
)
self.parent = parent self.parent = parent
def collide_test(self, waiter: Object) -> bool:
return waiter.position == self.position

View File

@ -1,11 +1,12 @@
from src.obj.Object import Object from src.obj.Object import Object
from src.obj.Table import Table
class Kitchen(Object): class Kitchen(Object):
def __init__(self, position, orientation, square_size, screen_size, store): def __init__(self, position, orientation, square_size, screen_size, store):
super().__init__("kitchen", position, orientation, square_size, screen_size, store) super().__init__("kitchen", position, orientation, square_size, screen_size, store)
self.cooking = [] self.cooking: list(Table) = []
self.done = [] self.done: list(Table) = []
def action(self, waiter, current_time): def action(self, waiter, current_time):
self.cook_dishes(current_time) self.cook_dishes(current_time)

View File

@ -44,23 +44,24 @@ class Object:
self.rect.y = self.position[1] * self.square_size self.rect.y = self.position[1] * self.square_size
screen.blit(image, self.rect) screen.blit(image, self.rect)
def compare_pos(self, pos) -> bool: def on(self, position) -> bool:
return self.position == pos return self.position == position
def distance_to(self, pos, mode="basic") -> int: def distance_to(self, pos, advanced_mode: bool = False) -> int:
x = abs(self.position[0] - pos[0]) x = abs(self.position[0] - pos[0])
y = abs(self.position[1] - pos[1]) y = abs(self.position[1] - pos[1])
self.h = ( self.h = (
math.sqrt(pow(x, 2) + pow(y, 2)) math.sqrt(pow(x, 2) + pow(y, 2))
if mode == "advanced" if advanced_mode
else x + y else x + y
) )
return self.h return self.h
def action(self, obj): def is_neighbor_of(self, obj, r=1):
pass
def updateState(self, current_time, predictor): cond_x = abs(self.position[0] - obj.position[0]) <= r
pass cond_y = abs(self.position[1] - obj.position[1]) <= r
return cond_x and cond_y

View File

@ -1,6 +1,7 @@
import random import random
from src.obj.Object import Object from src.obj.Object import Object
from src.obj.Mark import Mark from src.obj.Mark import Mark
from src.controller.NeuralNetworkController import NeuralNetworkController
class Table(Object): class Table(Object):
@ -30,7 +31,7 @@ class Table(Object):
if self.mark: if self.mark:
self.mark.blit(screen) self.mark.blit(screen)
def updateState(self, current_time, predictor): def updateState(self, current_time, predictor: NeuralNetworkController):
if self.is_actual: if self.is_actual:
if self.agent_role == "table": if self.agent_role == "table":
self.waiting_time = current_time self.waiting_time = current_time

View File

@ -46,7 +46,7 @@ class TemporaryState(Waiter):
def front(self): def front(self):
return TemporaryState(self, self.cost_so_far, "front", 1) return TemporaryState(self, self.cost_so_far, "front", 1)
def collide_test(self) -> bool: def out_of_bounds_check(self) -> bool:
out_of_range = [ out_of_range = [
self.position[0] >= self.square_count, self.position[0] >= self.square_count,
self.position[1] >= self.square_count, self.position[1] >= self.square_count,

View File

@ -1,5 +1,8 @@
import copy import copy
from src.obj.Table import Table
from src.obj.Object import Object from src.obj.Object import Object
from src.obj.Kitchen import Kitchen
class Waiter(Object): class Waiter(Object):
@ -22,7 +25,7 @@ class Waiter(Object):
self.battery -= state.cost self.battery -= state.cost
return state return state
def dish_in_basket(self, table) -> bool: def dish_in_basket(self, table: Table) -> bool:
return table in self.basket return table in self.basket
def basket_is_full(self) -> bool: def basket_is_full(self) -> bool:
@ -31,7 +34,7 @@ class Waiter(Object):
def basket_is_empty(self) -> bool: def basket_is_empty(self) -> bool:
return self.basket_capacity == 4 return self.basket_capacity == 4
def combine_orders(self, current_time, kitchen): def combine_orders(self, current_time, kitchen: Kitchen):
# take orders # take orders
for table in self.memory: for table in self.memory:
@ -47,7 +50,7 @@ class Waiter(Object):
self.basket.append(dish) self.basket.append(dish)
self.basket_capacity -= 1 self.basket_capacity -= 1
def deliver_dish(self, table, current_time): def deliver_dish(self, table: Table, current_time):
if table in self.basket: if table in self.basket:
table.reset_role(current_time) table.reset_role(current_time)
self.basket.remove(table) self.basket.remove(table)
@ -87,10 +90,3 @@ class Waiter(Object):
self.position[0] += self.orientation - 2 # x (-1 or +1) self.position[0] += self.orientation - 2 # x (-1 or +1)
else: # y (0 or 2) else: # y (0 or 2)
self.position[1] += self.orientation - 1 # y (-1 or +1) self.position[1] += self.orientation - 1 # y (-1 or +1)
def chechNeighbor(self, n, r=1):
cond_x = abs(self.position[0] - n.position[0]) <= r
cond_y = abs(self.position[1] - n.position[1]) <= r
return cond_x and cond_y