AI-Project/survival/esper.py
2021-03-28 18:06:22 +02:00

369 lines
15 KiB
Python

import time as _time
from functools import lru_cache as _lru_cache
from typing import List as _List
from typing import Type as _Type
from typing import TypeVar as _TypeVar
from typing import Any as _Any
from typing import Tuple as _Tuple
from typing import Iterable as _Iterable
version = '1.3'
C = _TypeVar('C')
P = _TypeVar('P')
class Processor:
"""Base class for all Processors to inherit from.
Processor instances must contain a `process` method. Other than that,
you are free to add any additional methods that are necessary. The process
method will be called by each call to `World.process`, so you will
generally want to iterate over entities with one (or more) calls to the
appropriate world methods there, such as
`for ent, (rend, vel) in self.world.get_components(Renderable, Velocity):`
"""
world = None
def process(self, *args, **kwargs):
raise NotImplementedError
class World:
"""A World object keeps track of all Entities, Components, and Processors.
A World contains a database of all Entity/Component assignments. The World
is also responsible for executing all Processors assigned to it for each
frame of your game.
"""
def __init__(self, timed=False):
self._processors = []
self._next_entity_id = 0
self._components = {}
self._entities = {}
self._dead_entities = set()
if timed:
self.process_times = {}
self._process = self._timed_process
def clear_cache(self) -> None:
self.get_component.cache_clear()
self.get_components.cache_clear()
def clear_database(self) -> None:
"""Remove all Entities and Components from the World."""
self._next_entity_id = 0
self._dead_entities.clear()
self._components.clear()
self._entities.clear()
self.clear_cache()
def add_processor(self, processor_instance: Processor, priority=0) -> None:
"""Add a Processor instance to the World.
:param processor_instance: An instance of a Processor,
subclassed from the Processor class
:param priority: A higher number is processed first.
"""
assert issubclass(processor_instance.__class__, Processor)
processor_instance.priority = priority
processor_instance.world = self
self._processors.append(processor_instance)
self._processors.sort(key=lambda proc: proc.priority, reverse=True)
def remove_processor(self, processor_type: Processor) -> None:
"""Remove a Processor from the World, by type.
:param processor_type: The class type of the Processor to remove.
"""
for processor in self._processors:
if type(processor) == processor_type:
processor.world = None
self._processors.remove(processor)
def get_processor(self, processor_type: _Type[P]) -> P:
"""Get a Processor instance, by type.
This method returns a Processor instance by type. This could be
useful in certain situations, such as wanting to call a method on a
Processor, from within another Processor.
:param processor_type: The type of the Processor you wish to retrieve.
:return: A Processor instance that has previously been added to the World.
"""
for processor in self._processors:
if type(processor) == processor_type:
return processor
def create_entity(self, *components) -> int:
"""Create a new Entity.
This method returns an Entity ID, which is just a plain integer.
You can optionally pass one or more Component instances to be
assigned to the Entity.
:param components: Optional components to be assigned to the
entity on creation.
:return: The next Entity ID in sequence.
"""
self._next_entity_id += 1
# TODO: duplicate add_component code here for performance
for component in components:
self.add_component(self._next_entity_id, component)
# self.clear_cache()
return self._next_entity_id
def delete_entity(self, entity: int, immediate=False) -> None:
"""Delete an Entity from the World.
Delete an Entity and all of it's assigned Component instances from
the world. By default, Entity deletion is delayed until the next call
to *World.process*. You can request immediate deletion, however, by
passing the "immediate=True" parameter. This should generally not be
done during Entity iteration (calls to World.get_component/s).
Raises a KeyError if the given entity does not exist in the database.
:param entity: The Entity ID you wish to delete.
:param immediate: If True, delete the Entity immediately.
"""
if immediate:
for component_type in self._entities[entity]:
self._components[component_type].discard(entity)
if not self._components[component_type]:
del self._components[component_type]
del self._entities[entity]
self.clear_cache()
else:
self._dead_entities.add(entity)
def entity_exists(self, entity: int) -> bool:
"""Check if a specific entity exists.
Empty entities(with no components) and dead entities(destroyed
by delete_entity) will not count as existent ones.
:param entity: The Entity ID to check existance for.
:return: True if the entity exists, False otherwise.
"""
return entity in self._entities and entity not in self._dead_entities
def component_for_entity(self, entity: int, component_type: _Type[C]) -> C:
"""Retrieve a Component instance for a specific Entity.
Retrieve a Component instance for a specific Entity. In some cases,
it may be necessary to access a specific Component instance.
For example: directly modifying a Component to handle user input.
Raises a KeyError if the given Entity and Component do not exist.
:param entity: The Entity ID to retrieve the Component for.
:param component_type: The Component instance you wish to retrieve.
:return: The Component instance requested for the given Entity ID.
"""
return self._entities[entity][component_type]
def components_for_entity(self, entity: int) -> _Tuple[C, ...]:
"""Retrieve all Components for a specific Entity, as a Tuple.
Retrieve all Components for a specific Entity. The method is probably
not appropriate to use in your Processors, but might be useful for
saving state, or passing specific Components between World instances.
Unlike most other methods, this returns all of the Components as a
Tuple in one batch, instead of returning a Generator for iteration.
Raises a KeyError if the given entity does not exist in the database.
:param entity: The Entity ID to retrieve the Components for.
:return: A tuple of all Component instances that have been
assigned to the passed Entity ID.
"""
return tuple(self._entities[entity].values())
def has_component(self, entity: int, component_type: _Any) -> bool:
"""Check if a specific Entity has a Component of a certain type.
:param entity: The Entity you are querying.
:param component_type: The type of Component to check for.
:return: True if the Entity has a Component of this type,
otherwise False
"""
return component_type in self._entities[entity]
def has_components(self, entity: int, *component_types: _Any) -> bool:
"""Check if an Entity has all of the specified Component types.
:param entity: The Entity you are querying.
:param component_types: Two or more Component types to check for.
:return: True if the Entity has all of the Components,
otherwise False
"""
return all(comp_type in self._entities[entity] for comp_type in component_types)
def add_component(self, entity: int, component_instance: _Any, type_alias: _Type = None) -> None:
"""Add a new Component instance to an Entity.
Add a Component instance to an Entiy. If a Component of the same type
is already assigned to the Entity, it will be replaced.
:param entity: The Entity to associate the Component with.
:param component_instance: A Component instance.
"""
component_type = type_alias or type(component_instance)
if component_type not in self._components:
self._components[component_type] = set()
self._components[component_type].add(entity)
if entity not in self._entities:
self._entities[entity] = {}
self._entities[entity][component_type] = component_instance
self.clear_cache()
def remove_component(self, entity: int, component_type: _Any) -> int:
"""Remove a Component instance from an Entity, by type.
A Component instance can be removed by providing it's type.
For example: world.delete_component(enemy_a, Velocity) will remove
the Velocity instance from the Entity enemy_a.
Raises a KeyError if either the given entity or Component type does
not exist in the database.
:param entity: The Entity to remove the Component from.
:param component_type: The type of the Component to remove.
"""
self._components[component_type].discard(entity)
if not self._components[component_type]:
del self._components[component_type]
del self._entities[entity][component_type]
if not self._entities[entity]:
del self._entities[entity]
self.clear_cache()
return entity
def _get_component(self, component_type: _Type[C]) -> _Iterable[_Tuple[int, C]]:
"""Get an iterator for Entity, Component pairs.
:param component_type: The Component type to retrieve.
:return: An iterator for (Entity, Component) tuples.
"""
entity_db = self._entities
for entity in self._components.get(component_type, []):
yield entity, entity_db[entity][component_type]
def _get_components(self, *component_types: _Type) -> _Iterable[_Tuple[int, ...]]:
"""Get an iterator for Entity and multiple Component sets.
:param component_types: Two or more Component types.
:return: An iterator for Entity, (Component1, Component2, etc)
tuples.
"""
entity_db = self._entities
comp_db = self._components
try:
for entity in set.intersection(*[comp_db[ct] for ct in component_types]):
yield entity, [entity_db[entity][ct] for ct in component_types]
except KeyError:
pass
@_lru_cache()
def get_component(self, component_type: _Type[C]) -> _List[_Tuple[int, C]]:
return [query for query in self._get_component(component_type)]
@_lru_cache()
def get_components(self, *component_types: _Type):
return [query for query in self._get_components(*component_types)]
def try_component(self, entity: int, component_type: _Type):
"""Try to get a single component type for an Entity.
This method will return the requested Component if it exists, but
will pass silently if it does not. This allows a way to access
optional Components that may or may not exist, without having to
first querty the Entity to see if it has the Component type.
:param entity: The Entity ID to retrieve the Component for.
:param component_type: The Component instance you wish to retrieve.
:return: A iterator containg the single Component instance requested,
which is empty if the component doesn't exist.
"""
if component_type in self._entities[entity]:
yield self._entities[entity][component_type]
else:
return None
def try_components(self, entity: int, *component_types: _Type):
"""Try to get a multiple component types for an Entity.
This method will return the requested Components if they exist, but
will pass silently if they do not. This allows a way to access
optional Components that may or may not exist, without first having
to query if the entity has the Component types.
:param entity: The Entity ID to retrieve the Component for.
:param component_types: The Components types you wish to retrieve.
:return: A iterator containg the multiple Component instances requested,
which is empty if the components do not exist.
"""
if all(comp_type in self._entities[entity] for comp_type in component_types):
yield [self._entities[entity][comp_type] for comp_type in component_types]
else:
return None
def _clear_dead_entities(self):
"""Finalize deletion of any Entities that are marked dead.
In the interest of performance, this method duplicates code from the
`delete_entity` method. If that method is changed, those changes should
be duplicated here as well.
"""
for entity in self._dead_entities:
for component_type in self._entities[entity]:
self._components[component_type].discard(entity)
if not self._components[component_type]:
del self._components[component_type]
del self._entities[entity]
self._dead_entities.clear()
self.clear_cache()
def _process(self, *args, **kwargs):
for processor in self._processors:
processor.process(*args, **kwargs)
def _timed_process(self, *args, **kwargs):
"""Track Processor execution time for benchmarking."""
for processor in self._processors:
start_time = _time.process_time()
processor.process(*args, **kwargs)
process_time = int(round((_time.process_time() - start_time) * 1000, 2))
self.process_times[processor.__class__.__name__] = process_time
def process(self, *args, **kwargs):
"""Call the process method on all Processors, in order of their priority.
Call the *process* method on all assigned Processors, respecting their
optional priority setting. In addition, any Entities that were marked
for deletion since the last call to *World.process*, will be deleted
at the start of this method call.
:param args: Optional arguments that will be passed through to the
*process* method of all Processors.
"""
self._clear_dead_entities()
self._process(*args, **kwargs)