"""
Native Python fallback implementation for EventEngine and EventEngineEx.
This module provides a pure Python implementation that mimics the behavior of the
Cython-based c_engine module. It is used as a fallback when the Cython extension
cannot be compiled (e.g., due to lack of Cython, GCC, or Clang).
The API is designed to match event_engine.capi.c_engine as closely as possible.
"""
from __future__ import annotations
import threading
from collections import deque
from datetime import datetime, timedelta
from logging import Logger
from threading import Thread
from time import sleep, time
from typing import Optional
from .event import EventHook, EventHookEx, PyMessagePayload
from .topic import PyTopic
# Resolve the package-wide logger. When the relative import cannot be resolved
# (e.g. the module is used outside its package), fall back to a module logger.
try:
    from ..base import LOGGER
except ImportError:
    import logging

    LOGGER = logging.getLogger(__name__)

# Every logger created by this module hangs below the "Event" child.
LOGGER = LOGGER.getChild('Event')

# Default constants (matching c_engine defaults)
DEFAULT_MQ_CAPACITY = 0x0fff  # 4095 pending messages
DEFAULT_MQ_SPIN_LIMIT = 0xffff  # 65535; accepted for API parity, unused by this implementation
DEFAULT_MQ_TIMEOUT_SECONDS = 1.0  # polling slice used by blocking waits
class Full(Exception):
    """Raised when attempting to publish to a full event queue."""
class Empty(Exception):
    """Raised when attempting to retrieve from an empty event queue."""
class EventEngine:
    """
    Topic-driven event dispatcher backed by threading primitives.

    The engine manages an internal message queue and dispatches events to
    registered handlers based on topic matching rules. In this native Python
    implementation:

    - A threading-based event loop consumes messages and triggers callbacks.
    - The message queue is a ``deque`` guarded by one ``threading.Lock`` shared
      by two ``Condition`` variables ("not empty" / "not full").
    - Two dicts, keyed by ``topic.value``, hold the exact and generic hooks.

    **Dispatch order**: the hook registered under the message topic's literal
    key is triggered first; afterwards every generic hook whose pattern matches
    the published topic is triggered as well.

    Attributes:
        capacity (int): Maximum number of messages the internal queue can hold.
        logger (Logger): Logger instance used for diagnostics.
        active (bool): Whether the engine is currently marked as running.
        engine (Optional[Thread]): Background thread created by ``start``.
    """

    __slots__ = (
        'logger', '_capacity', '_queue', '_lock', '_not_empty', '_not_full',
        '_exact_topic_hooks', '_generic_topic_hooks', '_seq_id', 'active', 'engine',
    )

    def __init__(self, capacity: int = DEFAULT_MQ_CAPACITY, logger: Optional[Logger] = None) -> None:
        """
        Initialize an ``EventEngine``.

        Args:
            capacity: Maximum number of pending messages.
            logger: Optional logger. If ``None``, a child of the module logger is used.
        """
        self.logger: Logger = LOGGER.getChild('EventEngine') if logger is None else logger

        # Message queue: a deque plus two conditions sharing a single lock.
        self._capacity: int = capacity
        self._queue: deque[PyMessagePayload] = deque()
        self._lock: threading.Lock = threading.Lock()
        self._not_empty: threading.Condition = threading.Condition(self._lock)
        self._not_full: threading.Condition = threading.Condition(self._lock)

        # Topic-to-hook mappings (key: topic.value string)
        self._exact_topic_hooks: dict[str, EventHook] = {}
        self._generic_topic_hooks: dict[str, EventHook] = {}

        # Sequence ID handed to the next enqueued payload
        self._seq_id: int = 0

        # Engine state
        self.active: bool = False
        self.engine: Optional[Thread] = None

    def __len__(self) -> int:
        """Return the total number of registered topics (both exact and generic)."""
        return len(self._exact_topic_hooks) + len(self._generic_topic_hooks)

    def __repr__(self) -> str:
        """String representation of the engine."""
        status = "active" if self.active else "idle"
        return f'<{self.__class__.__name__} {status}>(capacity={self.capacity})'

    # --- Internal helpers ---

    def _hook_map_for(self, topic: PyTopic) -> dict[str, EventHook]:
        """Return the hook registry (exact or generic) responsible for ``topic``."""
        return self._exact_topic_hooks if topic.is_exact else self._generic_topic_hooks

    def _wake_consumer(self) -> None:
        """Wake any thread blocked waiting for a message so it re-checks ``active``."""
        with self._not_empty:
            self._not_empty.notify_all()

    def _loop(self) -> None:
        """Main event loop - consumes and dispatches messages while the engine is active."""
        while self.active:
            msg = self._get_message(block=True, timeout=DEFAULT_MQ_TIMEOUT_SECONDS)
            if msg is not None:
                self._trigger(msg)

    def _get_message(self, block: bool = True, timeout: float = 0.0) -> Optional[PyMessagePayload]:
        """
        Get a message from the queue.

        Args:
            block: Whether to wait when the queue is empty.
            timeout: Maximum wait in seconds (``0`` or less means no deadline).

        Returns:
            The oldest queued ``PyMessagePayload``, or ``None`` when the queue is
            empty and the call is non-blocking, the timeout expired, or the
            engine is not active.
        """
        with self._not_empty:
            if block and not self._queue:
                deadline = time() + timeout if timeout > 0 else None
                # Waiting only makes sense while the engine is active. The wait is
                # sliced so that a change of ``active`` made without a notification
                # is still noticed within DEFAULT_MQ_TIMEOUT_SECONDS.
                while not self._queue and self.active:
                    wait_slice = DEFAULT_MQ_TIMEOUT_SECONDS
                    if deadline is not None:
                        remaining = deadline - time()
                        if remaining <= 0:
                            return None
                        wait_slice = min(remaining, wait_slice)
                    self._not_empty.wait(timeout=wait_slice)

            if not self._queue:
                return None

            msg = self._queue.popleft()
            self._not_full.notify()
            return msg

    def _publish(self, topic: PyTopic, args: tuple, kwargs: dict, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0) -> int:
        """
        Publish a message to the queue.

        Args:
            topic: Topic for the message (must be exact).
            args: Positional arguments.
            kwargs: Keyword arguments.
            block: Whether to wait if the queue is full.
            max_spin: Spin limit (ignored in pure Python implementation).
            timeout: Maximum wait in seconds (``0`` or less means wait indefinitely).

        Returns:
            0 on success, 1 if the queue stayed full (non-blocking call or timeout).

        Raises:
            ValueError: If ``topic`` is not an exact topic.
        """
        if not topic.is_exact:
            raise ValueError('Topic must be all of exact parts')

        payload = PyMessagePayload(alloc=True)
        payload.topic = topic
        payload.args = args
        payload.kwargs = kwargs

        with self._not_full:
            if len(self._queue) >= self._capacity:
                if not block:
                    return 1  # Queue full
                has_room = self._not_full.wait_for(
                    lambda: len(self._queue) < self._capacity,
                    timeout=timeout if timeout > 0 else None,
                )
                if not has_room:
                    return 1  # Timeout

            # The sequence id is assigned under the lock so that concurrent
            # publishers can never observe (and reuse) the same value.
            payload.seq_id = self._seq_id
            self._seq_id += 1
            self._queue.append(payload)
            self._not_empty.notify()
        return 0

    def _trigger(self, msg: PyMessagePayload) -> None:
        """
        Trigger event hooks matching the message topic.

        Args:
            msg: Message payload to dispatch.
        """
        msg_topic = msg.topic

        # Step 1: exact hook, looked up by the topic's literal key.
        exact_hook = self._exact_topic_hooks.get(msg_topic.value)
        if exact_hook is not None:
            exact_hook.trigger(msg)

        # Step 2: generic hooks (wildcards, patterns, etc.). Iterate a snapshot so
        # that a handler may register/unregister generic topics while dispatching.
        for generic_hook in tuple(self._generic_topic_hooks.values()):
            if generic_hook.topic.match(msg_topic).matched:
                generic_hook.trigger(msg)

    def _register_hook(self, hook: EventHook) -> None:
        """
        Register an event hook.

        Args:
            hook: EventHook to register.

        Raises:
            KeyError: If a hook is already registered for the same topic.
        """
        topic = hook.topic
        topic_str = topic.value
        hook_map = self._hook_map_for(topic)
        if topic_str in hook_map:
            raise KeyError(f'Another EventHook already registered for {topic_str}')
        hook_map[topic_str] = hook

    def _unregister_hook(self, topic: PyTopic) -> EventHook:
        """
        Unregister an event hook by topic.

        Args:
            topic: Topic of the hook to unregister.

        Returns:
            The unregistered EventHook.

        Raises:
            KeyError: If no hook is registered for the given topic.
        """
        topic_str = topic.value
        try:
            return self._hook_map_for(topic).pop(topic_str)
        except KeyError:
            raise KeyError(f'No EventHook registered for {topic_str}') from None

    def _register_handler(self, topic: PyTopic, handler, deduplicate: bool = False) -> None:
        """
        Register a handler for a topic (creates hook if needed).

        Args:
            topic: Topic to register handler for.
            handler: Callable handler.
            deduplicate: Forwarded to ``EventHook.add_handler``.
        """
        topic_str = topic.value
        hook_map = self._hook_map_for(topic)
        hook = hook_map.get(topic_str)
        if hook is None:
            hook = EventHook(topic, self.logger)
            hook_map[topic_str] = hook
        hook.add_handler(handler, deduplicate=deduplicate)

    def _unregister_handler(self, topic: PyTopic, handler) -> None:
        """
        Unregister a handler from a topic.

        Args:
            topic: Topic to unregister handler from.
            handler: Callable handler to remove.

        Raises:
            KeyError: If no hook is registered for the given topic.
        """
        topic_str = topic.value
        hook_map = self._hook_map_for(topic)
        if topic_str not in hook_map:
            raise KeyError(f'No EventHook registered for {topic_str}')

        hook = hook_map[topic_str]
        hook.remove_handler(handler)

        # Drop the hook once its last handler is gone.
        if len(hook) == 0:
            del hook_map[topic_str]

    def _clear(self) -> None:
        """Clear all hooks and handlers."""
        for hook_map in (self._exact_topic_hooks, self._generic_topic_hooks):
            for hook in hook_map.values():
                hook.clear()
            hook_map.clear()

    # --- Public API ---

    def activate(self) -> None:
        """
        Mark the engine as active.

        ``start`` sets the same flag itself; calling this manually prepares the
        engine for a blocking ``run`` in the current thread.
        """
        self.active = True

    def deactivate(self) -> None:
        """
        Mark the engine as inactive and wake the event loop so it can exit.

        ``stop`` clears the same flag itself; calling this manually halts a loop
        started with ``run`` without joining any thread.
        """
        self.active = False
        self._wake_consumer()

    def run(self) -> None:
        """
        Run the event loop in the current thread (blocking).

        The loop runs only while ``active`` is ``True``; it returns immediately
        if the engine has not been activated.
        """
        self._loop()

    def start(self) -> None:
        """
        Start the event loop in a dedicated background thread.

        If the engine is already active, a warning is logged and nothing else happens.
        """
        if self.active:
            self.logger.warning(f'{self} already started!')
            return

        self.active = True
        self.engine = Thread(target=self.run, name='EventEngine')
        self.engine.start()
        self.logger.info(f'{self} started.')

    def stop(self) -> None:
        """
        Stop the event loop and wait for the background thread to terminate.

        If the engine is already stopped, a warning is logged and nothing else
        happens. When called from the engine thread itself (e.g. from inside a
        handler) the thread is not joined, because a thread cannot join itself;
        the loop ends once the current handler returns.
        """
        if not self.active:
            self.logger.warning('EventEngine already stopped!')
            return

        self.active = False
        # Wake up the event loop if it's waiting
        self._wake_consumer()

        worker = self.engine
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        self.engine = None

    def clear(self) -> None:
        """
        Unregister all event hooks.

        Notes:
            This method only works when the engine is stopped. If called while
            running, an error is logged and no action is taken.
        """
        if self.active:
            self.logger.error('EventEngine must be stopped before cleared!')
            return
        self._clear()

    def get(self, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0) -> PyMessagePayload:
        """
        Retrieve an event from the internal queue.

        Args:
            block: If ``True``, wait until an event is available.
            max_spin: Maximum number of spin-loop iterations before blocking (ignored in pure Python).
            timeout: Maximum wait time in seconds when blocking (``0.0`` means no deadline).

        Returns:
            A ``PyMessagePayload`` instance.

        Raises:
            Empty: If no event could be retrieved: the queue is empty and the call
                is non-blocking, the timeout expired, or the engine is not active
                (a blocking call only waits while the engine is active).
        """
        msg = self._get_message(block=block, timeout=timeout)
        if msg is None:
            raise Empty()
        return msg

    def put(self, topic: PyTopic, *args, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0, **kwargs) -> None:
        """
        Publish an event to the queue (convenience alias for ``publish``).

        Args:
            topic: Must be an **exact** ``PyTopic`` (i.e., ``topic.is_exact`` must be ``True``).
            *args: Positional arguments for the event.
            block: If ``True``, wait if the queue is full.
            max_spin: Spin count before blocking (ignored in pure Python).
            timeout: Maximum wait time in seconds when blocking (``0.0`` = indefinite).
            **kwargs: Keyword arguments for the event.

        Raises:
            Full: If the queue is full and either ``block=False`` or the timeout expired.
            ValueError: If ``topic`` is not an exact topic.
        """
        if self._publish(topic, args, kwargs, block, max_spin, timeout):
            raise Full()

    def publish(self, topic: PyTopic, args: tuple, kwargs: dict, block: bool = True, timeout: float = 0.0) -> None:
        """
        Publish an event to the queue.

        Args:
            topic: Must be an **exact** ``PyTopic`` (i.e., ``topic.is_exact`` must be ``True``).
            args: Positional arguments for the event.
            kwargs: Keyword arguments for the event.
            block: If ``True``, wait if the queue is full.
            timeout: Maximum wait time in seconds when blocking (``0.0`` = indefinite).

        Raises:
            Full: If the queue is full and either ``block=False`` or the timeout expired.
            ValueError: If ``topic`` is not an exact topic.
        """
        if self._publish(topic, args, kwargs, block, DEFAULT_MQ_SPIN_LIMIT, timeout):
            raise Full()

    def register_hook(self, hook: EventHook) -> None:
        """
        Register an ``EventHook`` for its associated topic.

        Args:
            hook: The hook to register.

        Raises:
            KeyError: If a hook is already registered for the same topic (exact or generic).
        """
        self._register_hook(hook)

    def unregister_hook(self, topic: PyTopic) -> EventHook:
        """
        Unregister and return the ``EventHook`` associated with a topic.

        Args:
            topic: The topic to unregister.

        Returns:
            The unregistered ``EventHook``.

        Raises:
            KeyError: If no hook is registered for the given topic.
        """
        return self._unregister_hook(topic)

    def register_handler(self, topic: PyTopic, handler, deduplicate: bool = False) -> None:
        """
        Register a handler for a topic (creates hook if needed).

        Args:
            topic: Topic to register handler for.
            handler: Callable handler.
            deduplicate: Forwarded to ``EventHook.add_handler``.
        """
        self._register_handler(topic, handler, deduplicate)

    def unregister_handler(self, topic: PyTopic, handler) -> None:
        """
        Unregister a handler from a topic.

        The hook itself is removed once it has no handlers left.

        Args:
            topic: Topic to unregister handler from.
            handler: Callable handler to remove.

        Raises:
            KeyError: If no hook is registered for the given topic.
        """
        self._unregister_handler(topic, handler)

    def event_hooks(self):
        """Iterate over all registered event hooks (exact hooks first, then generic)."""
        yield from self._exact_topic_hooks.values()
        yield from self._generic_topic_hooks.values()

    def topics(self):
        """Iterate over all registered topics."""
        for hook in self.event_hooks():
            yield hook.topic

    def items(self):
        """Iterate over (topic, hook) pairs."""
        for hook in self.event_hooks():
            yield (hook.topic, hook)

    @property
    def capacity(self) -> int:
        """Get the queue capacity."""
        return self._capacity

    @property
    def occupied(self) -> int:
        """Get the current number of messages in the queue."""
        with self._lock:
            return len(self._queue)

    @property
    def exact_topic_hook_map(self) -> dict:
        """Get a copy of the exact topic hook mapping."""
        return self._exact_topic_hooks.copy()

    @property
    def generic_topic_hook_map(self) -> dict:
        """Get a copy of the generic topic hook mapping."""
        return self._generic_topic_hooks.copy()
class EventEngineEx(EventEngine):
    """
    Extended EventEngine with timer support and statistics tracking.

    Provides timer threads that periodically publish events on internal timer
    topics, and creates ``EventHookEx`` hooks for handlers registered through
    ``register_handler``.

    Attributes:
        timer (dict[float, Thread]): Timer threads keyed by their interval in seconds.
    """

    __slots__ = ('timer',)

    def __init__(self, capacity: int = DEFAULT_MQ_CAPACITY, logger: Optional[Logger] = None) -> None:
        """
        Initialize EventEngineEx.

        Args:
            capacity: Maximum capacity of the message queue.
            logger: Optional logger instance.
        """
        super().__init__(capacity, logger)
        self.timer: dict[float, Thread] = {}

    def __repr__(self) -> str:
        """String representation including the intervals of registered timers."""
        status = "active" if self.active else "idle"
        timer_intervals = list(self.timer.keys())
        return f'<{self.__class__.__name__} {status}>(capacity={self.capacity}, timers={timer_intervals})'

    def _register_handler(self, topic: PyTopic, handler, deduplicate: bool = False) -> None:
        """
        Register a handler for a topic, creating an ``EventHookEx`` if needed.

        Args:
            topic: Topic to register handler for.
            handler: Callable handler.
            deduplicate: Forwarded to the hook's ``add_handler``.
        """
        topic_str = topic.value
        hook_map = self._exact_topic_hooks if topic.is_exact else self._generic_topic_hooks
        hook = hook_map.get(topic_str)
        if hook is None:
            # NOTE(review): assumes EventHookEx accepts the same (topic, logger)
            # constructor arguments as EventHook - confirm against .event.
            hook = EventHookEx(topic, self.logger)
            hook_map[topic_str] = hook
        hook.add_handler(handler, deduplicate=deduplicate)

    def _sleep_until(self, deadline: float) -> bool:
        """
        Sleep until the wall-clock timestamp ``deadline`` while the engine is active.

        The sleep is sliced so that stopping the engine interrupts it promptly.

        Returns:
            ``True`` if the deadline was reached, ``False`` if the engine became inactive first.
        """
        while self.active:
            remaining = deadline - time()
            if remaining <= 0:
                return True
            sleep(min(remaining, DEFAULT_MQ_TIMEOUT_SECONDS))
        return False

    def _publish_timer_event(self, topic: PyTopic, kwargs: dict) -> bool:
        """
        Publish a timer event, waiting for queue space only while the engine is active.

        Returns:
            ``True`` if the event was enqueued, ``False`` if the engine became inactive first.
        """
        while self.active:
            # A bounded wait keeps the timer thread responsive to ``stop`` even
            # when the queue is full and nobody is consuming anymore.
            if self._publish(topic, (), kwargs, True, DEFAULT_MQ_SPIN_LIMIT, DEFAULT_MQ_TIMEOUT_SECONDS) == 0:
                return True
        return False

    def _timer_loop(self, interval: float, topic: PyTopic, activate_time: Optional[datetime]) -> None:
        """
        Timer loop for custom intervals.

        Each event carries ``interval`` and ``trigger_time`` (the scheduled time
        of that event) as keyword arguments.

        Args:
            interval: Interval in seconds (must be positive).
            topic: Topic to publish on.
            activate_time: Time of the first event; ``None`` means now.
        """
        step = timedelta(seconds=interval)
        scheduled_time = datetime.now() if activate_time is None else activate_time

        while self.active:
            delay = (scheduled_time - datetime.now()).total_seconds()
            if delay > 0:
                # Sleep in slices so a stop request is noticed quickly.
                sleep(min(delay, DEFAULT_MQ_TIMEOUT_SECONDS))
                continue

            # A fresh dict per event: the payload keeps a reference to it, so it
            # must not be mutated after publication.
            if not self._publish_timer_event(topic, {'interval': interval, 'trigger_time': scheduled_time}):
                break

            # Always move to the next slot, then skip any slots already missed.
            scheduled_time += step
            now = datetime.now()
            while scheduled_time < now:
                scheduled_time += step

    def _aligned_timer_loop(self, topic: PyTopic, period: int) -> None:
        """
        Timer loop aligned to wall-clock multiples of ``period`` seconds.

        Each event carries ``interval`` (the period) and ``timestamp``: the
        aligned boundary at which the wait began, i.e. one period before the
        boundary on which the event is published.

        Args:
            topic: Topic to publish on.
            period: Alignment period in seconds.
        """
        while self.active:
            now = time()
            boundary = now // period * period
            if not self._sleep_until(boundary + period):
                break
            if not self._publish_timer_event(topic, {'interval': period, 'timestamp': boundary}):
                break

    def _minute_timer_loop(self, topic: PyTopic) -> None:
        """
        Minute-aligned timer loop.

        Args:
            topic: Topic to publish on.
        """
        self._aligned_timer_loop(topic, 60)

    def _second_timer_loop(self, topic: PyTopic) -> None:
        """
        Second-aligned timer loop.

        Args:
            topic: Topic to publish on.
        """
        self._aligned_timer_loop(topic, 1)

    def run_timer(self, interval: float, topic: PyTopic, activate_time: Optional[datetime] = None) -> None:
        """
        Run a timer with custom interval (blocking call).

        Args:
            interval: Interval in seconds (must be positive).
            topic: Topic to publish on.
            activate_time: Optional time of the first event.

        Raises:
            RuntimeError: If the engine is not active.
            ValueError: If ``interval`` is not positive.
        """
        if not self.active:
            raise RuntimeError('EventEngine must be started before getting timer!')
        if interval <= 0:
            raise ValueError(f'Timer interval must be positive, got {interval}')
        self._timer_loop(interval, topic, activate_time)

    def minute_timer(self, topic: PyTopic) -> None:
        """
        Run minute-aligned timer (blocking call).

        Args:
            topic: Topic to publish on.

        Raises:
            RuntimeError: If the engine is not active.
        """
        if not self.active:
            raise RuntimeError('EventEngine must be started before getting timer!')
        self._minute_timer_loop(topic)

    def second_timer(self, topic: PyTopic) -> None:
        """
        Run second-aligned timer (blocking call).

        Args:
            topic: Topic to publish on.

        Raises:
            RuntimeError: If the engine is not active.
        """
        if not self.active:
            raise RuntimeError('EventEngine must be started before getting timer!')
        self._second_timer_loop(topic)

    def get_timer(self, interval: float, activate_time: Optional[datetime] = None) -> PyTopic:
        """
        Get or create a timer with the specified interval.

        A new timer thread is started when none exists for ``interval`` or when
        the registered one is no longer alive (e.g. after a stop/start cycle).

        Args:
            interval: Interval in seconds (must be positive). ``1`` and ``60``
                select the second- and minute-aligned timers.
            activate_time: Optional time of the first event; only used by
                custom-interval timers, and only when a new timer is started.

        Returns:
            PyTopic on which the timer publishes.

        Raises:
            RuntimeError: If the engine is not active.
            ValueError: If ``interval`` is not positive.
        """
        if not self.active:
            raise RuntimeError('EventEngine must be started before getting timer!')
        if interval <= 0:
            raise ValueError(f'Timer interval must be positive, got {interval}')

        if interval == 1:
            topic = PyTopic('EventEngine.Internal.Timer.Second')
            target = self.second_timer
            thread_kwargs = {'topic': topic}
        elif interval == 60:
            topic = PyTopic('EventEngine.Internal.Timer.Minute')
            target = self.minute_timer
            thread_kwargs = {'topic': topic}
        else:
            topic = PyTopic.join(['EventEngine', 'Internal', 'Timer', str(interval)])
            target = self.run_timer
            thread_kwargs = {'interval': interval, 'topic': topic, 'activate_time': activate_time}

        existing = self.timer.get(interval)
        if existing is not None and existing.is_alive():
            if activate_time is not None:
                self.logger.debug(
                    f'Timer thread with interval [{timedelta(seconds=interval)}] already initialized! '
                    f'Argument [activate_time] takes no effect!'
                )
            return topic

        timer = Thread(target=target, kwargs=thread_kwargs)
        self.timer[interval] = timer
        timer.start()
        return topic

    def stop(self) -> None:
        """Stop the engine and wait for all timer threads to finish."""
        super().stop()
        current = threading.current_thread()
        for timer in tuple(self.timer.values()):
            # A thread cannot join itself (stop() called from a timer thread).
            if timer is not current:
                timer.join()

    def clear(self) -> None:
        """
        Clear all hooks and timers.

        Notes:
            Like the base class, this only works when the engine is stopped. If
            called while running, an error is logged and nothing is cleared, so
            running timer threads stay registered.
        """
        if self.active:
            # The base class logs the error and refuses to clear.
            super().clear()
            return

        super().clear()
        for t in self.timer.values():
            t.join(timeout=0)
        self.timer.clear()