"""
Pure Python fallback implementation of EventEngine for cross-platform compatibility.
This module provides Python-native implementations of EventEngine and EventEngineEx
that work on Windows and other platforms without requiring Cython/C extensions.
Uses:
- threading.Lock, threading.Condition for synchronization
- dict for topic-to-hook mappings (instead of ByteMap)
- Topic, MessagePayload, EventHook from c_topic and c_event modules
"""
import threading
from collections import deque
from datetime import datetime, timedelta
from logging import Logger
from threading import Thread
from time import sleep, time
from typing import Optional
from .c_event import EventHook, EventHookEx, MessagePayload
from .c_topic import Topic
from ..base import LOGGER
LOGGER = LOGGER.getChild('Event')
# Default constants (matching c_engine defaults)
DEFAULT_MQ_CAPACITY = 0x0fff
DEFAULT_MQ_SPIN_LIMIT = 0xffff
DEFAULT_MQ_TIMEOUT_SECONDS = 1.0
[docs]
class Full(Exception):
"""Exception raised when the message queue is full."""
pass
[docs]
class Empty(Exception):
"""Exception raised when the message queue is empty."""
pass
[docs]
class EventEngine:
"""
Pure Python implementation of EventEngine.
Uses dict-based topic-to-hook mappings and threading primitives for synchronization.
Compatible with Windows and all platforms supporting the threading module.
"""
[docs]
def __init__(self, capacity: int = DEFAULT_MQ_CAPACITY, logger: Optional[Logger] = None):
"""
Initialize EventEngine.
Args:
capacity: Maximum capacity of the message queue
logger: Optional logger instance
"""
self.logger = LOGGER.getChild('EventEngine') if logger is None else logger
# Message queue using deque with threading primitives
self._capacity = capacity
self._queue = deque(maxlen=capacity)
self._lock = threading.Lock()
self._not_empty = threading.Condition(self._lock)
self._not_full = threading.Condition(self._lock)
# Topic-to-hook mappings using dict (key: topic.value string)
self._exact_topic_hooks: dict[str, EventHook] = {}
self._generic_topic_hooks: dict[str, EventHook] = {}
# Sequence ID counter
self._seq_id = 0
# Engine state
self.active = False
self.engine: Optional[Thread] = None
[docs]
def __len__(self) -> int:
"""Return the total number of registered hooks."""
return len(self._exact_topic_hooks) + len(self._generic_topic_hooks)
[docs]
def __repr__(self) -> str:
"""String representation of the engine."""
return f'<{self.__class__.__name__} {"active" if self.active else "idle"}>(capacity={self.capacity})'
def _loop(self):
"""Main event loop - consumes and processes messages."""
while self.active:
msg = self._get_message(block=True, timeout=DEFAULT_MQ_TIMEOUT_SECONDS)
if msg is not None:
self._trigger(msg)
def _get_message(self, block: bool = True, timeout: float = 0.0) -> Optional[MessagePayload]:
"""
Get a message from the queue.
Args:
block: Whether to block if queue is empty
timeout: Timeout in seconds (0 means no timeout)
Returns:
MessagePayload or None if timeout/non-blocking and queue is empty
"""
with self._not_empty:
if not block:
if len(self._queue) == 0:
return None
return self._queue.popleft()
# Blocking mode
if timeout > 0:
end_time = time() + timeout
while len(self._queue) == 0 and self.active:
remaining = end_time - time()
if remaining <= 0:
return None
self._not_empty.wait(timeout=remaining)
else:
# No timeout - wait indefinitely
while len(self._queue) == 0 and self.active:
self._not_empty.wait(timeout=DEFAULT_MQ_TIMEOUT_SECONDS)
if len(self._queue) == 0:
return None
msg = self._queue.popleft()
self._not_full.notify()
return msg
def _publish(self, topic: Topic, args: tuple, kwargs: dict, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0) -> int:
"""
Publish a message to the queue.
Args:
topic: Topic for the message
args: Positional arguments
kwargs: Keyword arguments
block: Whether to block if queue is full
max_spin: Spin limit (ignored in pure Python implementation)
timeout: Timeout in seconds
Returns:
0 on success, non-zero on failure
"""
if not topic.is_exact:
raise ValueError('Topic must be all of exact parts')
# Create payload
payload = MessagePayload(alloc=True)
payload.topic = topic
payload.args = args
payload.kwargs = kwargs
payload.seq_id = self._seq_id
# Note: args_owner and kwargs_owner must be False when publishing
payload.args_owner = False
payload.kwargs_owner = False
with self._not_full:
if not block:
if len(self._queue) >= self._capacity:
return 1 # Queue full
self._queue.append(payload)
self._seq_id += 1
self._not_empty.notify()
return 0
# Blocking mode
if timeout > 0:
end_time = time() + timeout
while len(self._queue) >= self._capacity:
remaining = end_time - time()
if remaining <= 0:
return 1 # Timeout
self._not_full.wait(timeout=remaining)
else:
# No timeout - wait indefinitely
while len(self._queue) >= self._capacity:
self._not_full.wait(timeout=DEFAULT_MQ_TIMEOUT_SECONDS)
if len(self._queue) >= self._capacity:
return 1 # Failed
self._queue.append(payload)
self._seq_id += 1
self._not_empty.notify()
return 0
def _trigger(self, msg: MessagePayload):
"""
Trigger event hooks matching the message topic.
Args:
msg: Message payload to dispatch
"""
msg_topic = msg.topic
# Step 1: Match exact topic hooks
hook = self._exact_topic_hooks.get(msg_topic.value)
if hook:
hook.trigger(msg)
# Step 2: Match generic topic hooks (wildcards, patterns, etc.)
for hook_topic_str, hook in self._generic_topic_hooks.items():
# Use topic matching from Topic
if hook.topic.match(msg_topic).matched:
hook.trigger(msg)
def _register_hook(self, hook: EventHook):
"""
Register an event hook.
Args:
hook: EventHook to register
"""
topic = hook.topic
topic_str = topic.value
if topic.is_exact:
if topic_str in self._exact_topic_hooks and self._exact_topic_hooks[topic_str] is not hook:
raise KeyError(f'Another EventHook already registered for {topic_str}')
self._exact_topic_hooks[topic_str] = hook
else:
if topic_str in self._generic_topic_hooks and self._generic_topic_hooks[topic_str] is not hook:
raise KeyError(f'Another EventHook already registered for {topic_str}')
self._generic_topic_hooks[topic_str] = hook
def _unregister_hook(self, topic: Topic) -> EventHook:
"""
Unregister an event hook by topic.
Args:
topic: Topic of the hook to unregister
Returns:
The unregistered EventHook
"""
topic_str = topic.value
if topic.is_exact:
if topic_str not in self._exact_topic_hooks:
raise KeyError(f'No EventHook registered for {topic_str}')
return self._exact_topic_hooks.pop(topic_str)
else:
if topic_str not in self._generic_topic_hooks:
raise KeyError(f'No EventHook registered for {topic_str}')
return self._generic_topic_hooks.pop(topic_str)
def _register_handler(self, topic: Topic, handler, deduplicate: bool = False):
"""
Register a handler for a topic (creates hook if needed).
Args:
topic: Topic to register handler for
handler: Callable handler
deduplicate: Whether to skip if handler already registered
"""
topic_str = topic.value
if topic.is_exact:
hook_map = self._exact_topic_hooks
else:
hook_map = self._generic_topic_hooks
if topic_str not in hook_map:
hook = EventHook(topic, self.logger)
hook_map[topic_str] = hook
else:
hook = hook_map[topic_str]
hook.add_handler(handler, deduplicate=deduplicate)
def _unregister_handler(self, topic: Topic, handler):
"""
Unregister a handler from a topic.
Args:
topic: Topic to unregister handler from
handler: Callable handler to remove
"""
topic_str = topic.value
if topic.is_exact:
hook_map = self._exact_topic_hooks
else:
hook_map = self._generic_topic_hooks
if topic_str not in hook_map:
raise KeyError(f'No EventHook registered for {topic_str}')
hook = hook_map[topic_str]
hook.remove_handler(handler)
# Remove hook if no handlers left
if len(hook) == 0:
del hook_map[topic_str]
def _clear(self):
"""Clear all hooks and handlers."""
# Clear all hooks
for hook in self._exact_topic_hooks.values():
hook.clear()
self._exact_topic_hooks.clear()
for hook in self._generic_topic_hooks.values():
hook.clear()
self._generic_topic_hooks.clear()
# --- Public API ---
[docs]
def activate(self):
"""Activate the engine (sets active flag to True)."""
self.active = True
[docs]
def deactivate(self):
"""Deactivate the engine (sets active flag to False)."""
self.active = False
[docs]
def run(self):
"""Run the event loop (blocking call)."""
self._loop()
[docs]
def start(self):
"""Start the engine in a background thread."""
if self.active:
self.logger.warning(f'{self} already started!')
return
self.active = True
self.engine = Thread(target=self.run, name='EventEngine')
self.engine.start()
self.logger.info(f'{self} started.')
[docs]
def stop(self):
"""Stop the engine and wait for the background thread to finish."""
if not self.active:
self.logger.warning('EventEngine already stopped!')
return
self.active = False
# Wake up the event loop if it's waiting
with self._not_empty:
self._not_empty.notify_all()
if self.engine:
self.engine.join()
[docs]
def clear(self):
"""Clear all hooks and handlers (engine must be stopped first)."""
if self.active:
self.logger.error('EventEngine must be stopped before cleared!')
return
self._clear()
[docs]
def get(self, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0) -> MessagePayload:
"""
Get a message from the queue.
Args:
block: Whether to block if queue is empty
max_spin: Spin limit (ignored in pure Python)
timeout: Timeout in seconds
Returns:
MessagePayload
Raises:
Empty: If no message available
"""
msg = self._get_message(block=block, timeout=timeout)
if msg is None:
raise Empty()
# When getting, payload must have args_owner = kwargs_owner = True
msg.args_owner = True
msg.kwargs_owner = True
return msg
[docs]
def put(self, topic: Topic, *args, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0, **kwargs):
"""
Put a message into the queue.
Args:
topic: Topic for the message
*args: Positional arguments
block: Whether to block if queue is full
max_spin: Spin limit (ignored)
timeout: Timeout in seconds
**kwargs: Keyword arguments
Raises:
Full: If queue is full
"""
ret_code = self._publish(topic, args, kwargs, block, max_spin, timeout)
if ret_code:
raise Full()
[docs]
def publish(self, topic: Topic, args: tuple, kwargs: dict, block: bool = True, max_spin: int = DEFAULT_MQ_SPIN_LIMIT, timeout: float = 0.0):
"""
Publish a message to the queue.
Args:
topic: Topic for the message
args: Positional arguments tuple
kwargs: Keyword arguments dict
block: Whether to block if queue is full
max_spin: Spin limit (ignored)
timeout: Timeout in seconds
Raises:
Full: If queue is full
"""
ret_code = self._publish(topic, args, kwargs, block, max_spin, timeout)
if ret_code:
raise Full()
[docs]
def register_hook(self, hook: EventHook):
"""
Register an event hook.
Args:
hook: EventHook instance to register
"""
self._register_hook(hook)
[docs]
def unregister_hook(self, topic: Topic) -> EventHook:
"""
Unregister a hook by topic.
Args:
topic: Topic of the hook to unregister
Returns:
The unregistered EventHook
"""
return self._unregister_hook(topic)
[docs]
def register_handler(self, topic: Topic, handler, deduplicate: bool = False):
"""
Register a handler for a topic.
Args:
topic: Topic to register handler for
handler: Callable handler
deduplicate: Skip if handler already registered
"""
self._register_handler(topic, handler, deduplicate)
[docs]
def unregister_handler(self, topic: Topic, handler):
"""
Unregister a handler from a topic.
Args:
topic: Topic to unregister handler from
handler: Callable handler to remove
"""
self._unregister_handler(topic, handler)
[docs]
def event_hooks(self):
"""Iterate over all registered event hooks."""
yield from self._exact_topic_hooks.values()
yield from self._generic_topic_hooks.values()
[docs]
def topics(self):
"""Iterate over all registered topics."""
for hook in self.event_hooks():
yield hook.topic
[docs]
def items(self):
"""Iterate over (topic, hook) pairs."""
for hook in self.event_hooks():
yield (hook.topic, hook)
@property
def capacity(self) -> int:
"""Get the queue capacity."""
return self._capacity
@property
def occupied(self) -> int:
"""Get the current number of messages in the queue."""
with self._lock:
return len(self._queue)
@property
def exact_topic_hook_map(self) -> dict:
"""Get the exact topic hook mapping."""
return self._exact_topic_hooks.copy()
@property
def generic_topic_hook_map(self) -> dict:
"""Get the generic topic hook mapping."""
return self._generic_topic_hooks.copy()
[docs]
class EventEngineEx(EventEngine):
"""
Extended EventEngine with timer support.
Provides timer functionality for periodic event triggering.
"""
[docs]
def __init__(self, capacity: int = DEFAULT_MQ_CAPACITY, logger: Optional[Logger] = None):
"""
Initialize EventEngineEx.
Args:
capacity: Maximum capacity of the message queue
logger: Optional logger instance
"""
super().__init__(capacity, logger)
self.timer: dict[float, Thread] = {}
[docs]
def __repr__(self) -> str:
"""String representation including active timers."""
return (f'<{self.__class__.__name__} {"active" if self.active else "idle"}>'
f'(capacity={self.capacity}, timers={list(self.timer.keys())})')
def _timer_loop(self, interval: float, topic: Topic, activate_time: Optional[datetime]):
"""
Timer loop for custom intervals.
Args:
interval: Interval in seconds
topic: Topic to publish on
activate_time: Optional activation time
"""
if activate_time is None:
scheduled_time = datetime.now()
else:
scheduled_time = activate_time
kwargs = {'interval': interval, 'trigger_time': scheduled_time}
while self.active:
sleep_time = (scheduled_time - datetime.now()).total_seconds()
if sleep_time > 0:
sleep(sleep_time)
self._publish(topic, (), kwargs, True, DEFAULT_MQ_SPIN_LIMIT, 0.0)
while scheduled_time < datetime.now():
scheduled_time += timedelta(seconds=interval)
kwargs['trigger_time'] = scheduled_time
def _minute_timer_loop(self, topic: Topic):
"""
Minute-aligned timer loop.
Args:
topic: Topic to publish on
"""
kwargs = {'interval': 60}
while self.active:
t = time()
scheduled_time = t // 60 * 60
next_time = scheduled_time + 60
sleep_time = next_time - t
sleep(sleep_time)
kwargs['timestamp'] = scheduled_time
self._publish(topic, (), kwargs, True, DEFAULT_MQ_SPIN_LIMIT, 0.0)
def _second_timer_loop(self, topic: Topic):
"""
Second-aligned timer loop.
Args:
topic: Topic to publish on
"""
kwargs = {'interval': 1}
while self.active:
t = time()
scheduled_time = t // 1
next_time = scheduled_time + 1
sleep_time = next_time - t
sleep(sleep_time)
kwargs['timestamp'] = scheduled_time
self._publish(topic, (), kwargs, True, DEFAULT_MQ_SPIN_LIMIT, 0.0)
[docs]
def run_timer(self, interval: float, topic: Topic, activate_time: Optional[datetime] = None):
"""
Run a timer with custom interval (blocking call).
Args:
interval: Interval in seconds
topic: Topic to publish on
activate_time: Optional activation time
"""
if not self.active:
raise RuntimeError('EventEngine must be started before getting timer!')
self._timer_loop(interval, topic, activate_time)
[docs]
def minute_timer(self, topic: Topic):
"""
Run minute-aligned timer (blocking call).
Args:
topic: Topic to publish on
"""
if not self.active:
raise RuntimeError('EventEngine must be started before getting timer!')
self._minute_timer_loop(topic)
[docs]
def second_timer(self, topic: Topic):
"""
Run second-aligned timer (blocking call).
Args:
topic: Topic to publish on
"""
if not self.active:
raise RuntimeError('EventEngine must be started before getting timer!')
self._second_timer_loop(topic)
[docs]
def get_timer(self, interval: float, activate_time: Optional[datetime] = None) -> Topic:
"""
Get or create a timer with the specified interval.
Args:
interval: Interval in seconds
activate_time: Optional activation time
Returns:
Topic for the timer
"""
if not self.active:
raise RuntimeError('EventEngine must be started before getting timer!')
if interval == 1:
topic = Topic('EventEngine.Internal.Timer.Second')
timer = Thread(target=self.second_timer, kwargs={'topic': topic})
elif interval == 60:
topic = Topic('EventEngine.Internal.Timer.Minute')
timer = Thread(target=self.minute_timer, kwargs={'topic': topic})
else:
topic = Topic.join(['EventEngine', 'Internal', 'Timer', str(interval)])
timer = Thread(target=self.run_timer,
kwargs={'interval': interval, 'topic': topic, 'activate_time': activate_time})
if interval not in self.timer:
self.timer[interval] = timer
timer.start()
else:
if activate_time is not None:
self.logger.debug(
f'Timer thread with interval [{timedelta(seconds=interval)}] already initialized! '
f'Argument [activate_time] takes no effect!'
)
return topic
[docs]
def stop(self):
"""Stop the engine and all timers."""
super().stop()
for timer in self.timer.values():
timer.join()
[docs]
def clear(self):
"""Clear all hooks and timers."""
super().clear()
for t in self.timer.values():
t.join(timeout=0)
self.timer.clear()