Source code for event_engine.native.event

"""
Native Python fallback implementation for PyMessagePayload and EventHook classes.

This module provides a pure Python implementation that mimics the behavior of the 
Cython-based c_event module. It is used as a fallback when the Cython extension 
cannot be compiled (e.g., due to lack of Cython, GCC, or Clang).

The API is designed to match event_engine.capi.c_event as closely as possible.
"""

from __future__ import annotations

import inspect
import time
import traceback
from collections.abc import Callable, Iterator
from logging import Logger
from typing import TypedDict

from .topic import PyTopic

# Get logger from base module
try:
    from ..base import LOGGER
except ImportError:
    import logging

    LOGGER = logging.getLogger(__name__)

LOGGER = LOGGER.getChild('Event')

# Internal constants
_TOPIC_FIELD_NAME = 'topic'
_TOPIC_UNEXPECTED_ERROR = f"an unexpected keyword argument '{_TOPIC_FIELD_NAME}'"


[docs] class PyMessagePayload: """ Python wrapper for a message payload structure. In native Python, all instances own their underlying data (owner, args_owner, kwargs_owner are always True). """ __slots__ = ('_topic', '_args', '_kwargs', '_seq_id')
[docs] def __init__(self, alloc: bool = False) -> None: """ Initialize a ``PyMessagePayload`` instance. Args: alloc: If ``True``, allocate a new message payload (always True in Python). """ self._topic: PyTopic | None = None self._args: tuple | None = None self._kwargs: dict | None = None self._seq_id: int = 0
[docs] def __repr__(self) -> str: """ Return a string representation of the payload. """ if self._topic: return f'<PyMessagePayload "{self.topic.value}">(seq_id={self.seq_id}, args={self.args}, kwargs={self.kwargs})' return f'<PyMessagePayload NO_TOPIC>(seq_id={self.seq_id}, args={self.args}, kwargs={self.kwargs})'
@property def owner(self) -> bool: """bool: Whether this instance owns the underlying payload (always True in native Python).""" return True @property def args_owner(self) -> bool: """bool: Whether this instance owns the positional arguments (always True in native Python).""" return True @property def kwargs_owner(self) -> bool: """bool: Whether this instance owns the keyword arguments (always True in native Python).""" return True @property def topic(self) -> PyTopic | None: """ The topic associated with this payload. """ return self._topic @topic.setter def topic(self, value: PyTopic) -> None: """Set the topic.""" self._topic = value @property def args(self) -> tuple | None: """ The positional arguments of the payload. """ return self._args @args.setter def args(self, value: tuple) -> None: """Set the positional arguments.""" self._args = value @property def kwargs(self) -> dict | None: """ The keyword arguments of the payload. """ return self._kwargs @kwargs.setter def kwargs(self, value: dict) -> None: """Set the keyword arguments.""" self._kwargs = value @property def seq_id(self) -> int: """ The sequence ID of the payload. """ return self._seq_id @seq_id.setter def seq_id(self, value: int) -> None: """Set the sequence ID.""" self._seq_id = value
[docs] class EventHook: """ Event dispatcher for registering and triggering handlers. Handlers are triggered with a ``PyMessagePayload``. The dispatcher supports two calling conventions: - **With-topic**: the handler receives the topic as a positional or keyword argument. - **No-topic**: the handler receives only ``args`` and ``kwargs`` from the payload. Handlers that accept ``**kwargs`` are recommended to ensure compatibility with both conventions. Attributes: topic (PyTopic): The topic associated with this hook. logger (Logger | None): Optional logger instance. retry_on_unexpected_topic (bool): If ``True``, retries with no-topic calling convention if a with-topic handler raises a ``TypeError`` and the error message indicates an unexpected topic argument. """ __slots__ = ('topic', 'logger', 'retry_on_unexpected_topic', '_handlers_no_topic', '_handlers_with_topic')
[docs] def __init__(self, topic: PyTopic, logger: Logger = None, retry_on_unexpected_topic: bool = False) -> None: """ Initialize an ``EventHook``. Args: topic: The topic associated with this hook. logger: Optional logger instance. retry_on_unexpected_topic: If ``True``, enables retrying on unexpected topic argument errors. """ self.topic: PyTopic = topic self.logger: Logger = LOGGER.getChild(f'EventHook.{topic}') if logger is None else logger self.retry_on_unexpected_topic: bool = retry_on_unexpected_topic self._handlers_no_topic: list[Callable] = [] self._handlers_with_topic: list[Callable] = []
[docs] def __call__(self, msg: PyMessagePayload) -> None: """ Trigger all registered handlers with the given message payload. Alias for method ``trigger``. Args: msg: The message payload to dispatch to handlers. """ self.trigger(msg)
[docs] def __iadd__(self, handler: Callable) -> EventHook: """ Add a handler using the ``+=`` operator. Args: handler: The callable to register. Returns: Self, for chaining. """ self.add_handler(handler) return self
[docs] def __isub__(self, handler: Callable) -> EventHook: """ Remove a handler using the ``-=`` operator. Args: handler: The callable to unregister. Returns: Self, for chaining. """ self.remove_handler(handler) return self
[docs] def __len__(self) -> int: """ Return the number of registered handlers. """ return len(self._handlers_no_topic) + len(self._handlers_with_topic)
[docs] def __repr__(self) -> str: """ Return a string representation of the ``EventHook``. """ return f'<EventHook topic="{self.topic}" handlers={len(self)}>'
[docs] def __iter__(self) -> Iterator[Callable]: """ Iterate over all registered handlers. """ yield from self._handlers_no_topic yield from self._handlers_with_topic
[docs] def __contains__(self, handler: Callable) -> bool: """ Check if a handler is registered. Args: handler: The callable to check. Returns: ``True`` if the handler is registered; ``False`` otherwise. """ return handler in self._handlers_no_topic or handler in self._handlers_with_topic
[docs] def trigger(self, msg: PyMessagePayload) -> None: """ Trigger all registered handlers with the given message payload. Handlers are executed in registration order: 1. All **no-topic** handlers (called with ``*args, **kwargs`` only). 2. All **with-topic** handlers (called with ``topic, *args, **kwargs``). In each group, handlers are invoked in the order they were added. If ``retry_on_unexpected_topic`` flag is on and a with-topic handler raises a ``TypeError`` and the error message indicates an unexpected topic argument, the dispatcher retries the call without the topic. Args: msg: The message payload to dispatch. """ args = msg.args if msg.args is not None else () kwargs = msg.kwargs if msg.kwargs is not None else {} topic = msg.topic # Trigger no-topic handlers first for handler in self._handlers_no_topic: try: handler(*args, **kwargs) except Exception: self.logger.error(traceback.format_exc()) # Trigger with-topic handlers # Create a new kwargs dict with topic added kwargs_with_topic = kwargs.copy() kwargs_with_topic[_TOPIC_FIELD_NAME] = topic for handler in self._handlers_with_topic: try: handler(*args, **kwargs_with_topic) except TypeError as e: # Check if this is an "unexpected keyword argument 'topic'" error if self.retry_on_unexpected_topic and _TOPIC_UNEXPECTED_ERROR in str(e): try: handler(*args, **kwargs) except Exception: self.logger.error(traceback.format_exc()) else: self.logger.error(traceback.format_exc()) except Exception: self.logger.error(traceback.format_exc())
[docs] def add_handler(self, handler: Callable, deduplicate: bool = False) -> None: """ Register a new handler. It is strongly recommended that handlers accept ``**kwargs`` to remain compatible with both with-topic and no-topic calling conventions. Args: handler: The callable to register. deduplicate: If ``True``, skip registration if the handler is already present. """ if not callable(handler): raise TypeError(f'Handler must be callable, got {type(handler)}') # Check if handler is already registered if deduplicate and handler in self: return # Inspect the handler signature to determine if it accepts 'topic' with_topic = False try: sig = inspect.signature(handler) for param in sig.parameters.values(): if param.name == _TOPIC_FIELD_NAME or param.kind == param.VAR_KEYWORD: with_topic = True break except (ValueError, TypeError): # Can't inspect signature, assume no topic pass if with_topic: self._handlers_with_topic.append(handler) else: self._handlers_no_topic.append(handler)
[docs] def remove_handler(self, handler: Callable) -> EventHook: """ Remove a handler from the hook. Only the first matching occurrence is removed. If the same callable was added multiple times, subsequent instances remain registered. Args: handler: The callable to remove. Returns: Self, for chaining. """ try: self._handlers_no_topic.remove(handler) except ValueError: try: self._handlers_with_topic.remove(handler) except ValueError: pass # Handler not found, silently ignore return self
[docs] def clear(self) -> None: """ Remove all registered handlers. """ self._handlers_no_topic.clear() self._handlers_with_topic.clear()
@property def handlers(self) -> list[Callable]: """ List all registered handlers. Handlers are ordered as follows: - First, all no-topic handlers (in registration order). - Then, all with-topic handlers (in registration order). """ return self._handlers_no_topic + self._handlers_with_topic
[docs] class HandlerStats(TypedDict): """Statistics for a handler.""" calls: int total_time: float
[docs] class EventHookEx(EventHook): """ Extended ``EventHook`` that tracks per-handler execution statistics. """ __slots__ = ('_stats',)
[docs] def __init__(self, topic: PyTopic, logger: Logger = None, retry_on_unexpected_topic: bool = False) -> None: """ Initialize an ``EventHookEx``. Args: topic: The topic associated with this hook. logger: Optional logger instance. retry_on_unexpected_topic: If ``True``, enables retrying on unexpected topic argument errors. """ super().__init__(topic, logger, retry_on_unexpected_topic) self._stats: dict[int, HandlerStats] = {}
[docs] def trigger(self, msg: PyMessagePayload) -> None: """ Trigger all registered handlers with the given message payload, tracking execution time. Args: msg: The message payload to dispatch. """ args = msg.args if msg.args is not None else () kwargs = msg.kwargs if msg.kwargs is not None else {} topic = msg.topic # Trigger no-topic handlers first for handler in self._handlers_no_topic: handler_id = id(handler) if handler_id not in self._stats: self._stats[handler_id] = {'calls': 0, 'total_time': 0.0} start_time = time.perf_counter() try: handler(*args, **kwargs) except Exception: self.logger.error(traceback.format_exc()) finally: elapsed = time.perf_counter() - start_time self._stats[handler_id]['calls'] += 1 self._stats[handler_id]['total_time'] += elapsed # Trigger with-topic handlers # Create a new kwargs dict with topic added kwargs_with_topic = kwargs.copy() kwargs_with_topic[_TOPIC_FIELD_NAME] = topic for handler in self._handlers_with_topic: handler_id = id(handler) if handler_id not in self._stats: self._stats[handler_id] = {'calls': 0, 'total_time': 0.0} start_time = time.perf_counter() try: handler(*args, **kwargs_with_topic) except TypeError as e: # Check if this is an "unexpected keyword argument 'topic'" error if self.retry_on_unexpected_topic and _TOPIC_UNEXPECTED_ERROR in str(e): try: handler(*args, **kwargs) except Exception: self.logger.error(traceback.format_exc()) else: self.logger.error(traceback.format_exc()) except Exception: self.logger.error(traceback.format_exc()) finally: elapsed = time.perf_counter() - start_time self._stats[handler_id]['calls'] += 1 self._stats[handler_id]['total_time'] += elapsed
[docs] def get_stats(self, py_callable: Callable) -> HandlerStats | None: """ Retrieve execution statistics for a specific handler. Args: py_callable: The handler to query. Returns: A dictionary with keys ``'calls'`` (number of invocations) and ``'total_time'`` (cumulative execution time in seconds), or ``None`` if the handler is not registered or the HandlerStats is not registered. """ handler_id = id(py_callable) return self._stats.get(handler_id)
@property def stats(self) -> Iterator[tuple[Callable, HandlerStats]]: """ Iterate over all registered handlers and their execution statistics. Returns: An iterator yielding ``(handler, stats_dict)`` pairs. """ for handler in self._handlers_no_topic: handler_id = id(handler) if handler_id in self._stats: yield handler, self._stats[handler_id] for handler in self._handlers_with_topic: handler_id = id(handler) if handler_id in self._stats: yield handler, self._stats[handler_id]
[docs] def clear(self) -> None: """ Remove all registered handlers and clear statistics. """ super().clear() self._stats.clear()