API Reference

This page provides auto-generated API documentation from docstrings and type stubs.

event_engine Module

Top-level exports and backend selection.

event_engine.get_include() list[str][source]

Topic Classes

event_engine.capi.c_topic

class event_engine.capi.c_topic.Topic

Bases: object

addr
append(topic_part)
format(**kwargs) Topic
format_map(mapping, internalized=True, strict=False)
classmethod from_parts(topic_parts: Iterable[TopicPart]) Topic
is_exact
classmethod join(topic_parts: Iterable[str]) Topic
match(other)
owner
update_literal() Topic
value
class event_engine.capi.c_topic.TopicMatchResult

Bases: object

length
matched
to_dict()
class event_engine.capi.c_topic.TopicPart

Bases: object

addr
next()
owner
ttype
class event_engine.capi.c_topic.TopicPartAny

Bases: TopicPart

name
class event_engine.capi.c_topic.TopicPartExact

Bases: TopicPart

part
class event_engine.capi.c_topic.TopicPartPattern

Bases: TopicPart

pattern
regex
class event_engine.capi.c_topic.TopicPartRange

Bases: TopicPart

options()
class event_engine.capi.c_topic.TopicType(*values)

Bases: IntEnum

TOPIC_PART_EXACT = 0
TOPIC_PART_ANY = 1
TOPIC_PART_RANGE = 2
TOPIC_PART_PATTERN = 3
event_engine.capi.c_topic.get_internal_map()
event_engine.capi.c_topic.get_internal_topic(key, owner=False)

event_engine.native.topic

Native Python fallback implementation for PyTopic and related classes.

This module provides a pure Python implementation that mimics the behavior of the Cython-based c_topic module. It is used as a fallback when the Cython extension cannot be compiled (e.g., due to lack of Cython, GCC, or Clang).

The API is designed to match event_engine.capi.c_topic as closely as possible.
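A fallback like the one described above is commonly wired up with a guarded import. The sketch below is a generic illustration of that pattern, not the package's actual selection code; `pick_backend` and the module names passed to it are hypothetical stand-ins.

```python
import importlib
from types import ModuleType


def pick_backend(candidates: list[str]) -> ModuleType:
    """Return the first module in `candidates` that imports successfully."""
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ImportError:
            continue  # this candidate is unavailable, try the next one
    raise ImportError(f"none of {candidates!r} could be imported")


# Stand-in names: the first module does not exist, so the second is used.
backend = pick_backend(["_hypothetical_compiled_backend", "json"])
print(backend.__name__)
```

Because both backends aim for the same API, calling code can use whichever module is returned without branching on the backend.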

event_engine.native.topic.init_internal_map(default_capacity: int = 1024) dict[str, PyTopic][source]

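Initialize the shared internal topic map, or return it if it already exists.

Parameters:

default_capacity – Default capacity (in bytes) used when creating the internal map.

Returns:

The shared internal map. Per the signature, the pure Python implementation returns a dict mapping topic literal strings to PyTopic instances rather than a ByteMap.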

event_engine.native.topic.clear_internal_map() None[source]

Clear and free the shared internal bytemap.

This releases the internal resources and resets any cached references.

event_engine.native.topic.get_internal_topic(key: str, owner: bool = False) PyTopic | None[source]

Return the topic registered under key in the internal map, if one exists.

Parameters:
  • key – The literal of the topic to look up.

  • owner – Whether to return a copy (True) or the original reference (False). In pure Python, ownership is always with the object itself.

Returns:

PyTopic instance if found or None.

event_engine.native.topic.get_internal_map() dict[str, PyTopic][source]

Return a dictionary view of the internal topic map.

Returns:

A dictionary mapping topic literal strings to PyTopic instances.
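The internal map behaves like an interning table: one shared object per topic literal. The following is a minimal sketch of that idea, assuming a plain dict as storage; `FakeTopic`, `intern_topic`, and `lookup` are hypothetical names, not part of event_engine.

```python
import copy
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTopic:
    """Stand-in for a topic object; only carries its literal."""
    literal: str


_internal_map: dict[str, FakeTopic] = {}


def intern_topic(literal: str) -> FakeTopic:
    """Register `literal` once and return the shared instance."""
    return _internal_map.setdefault(literal, FakeTopic(literal))


def lookup(key: str, owner: bool = False) -> Optional[FakeTopic]:
    """Return the registered topic, a copy of it if `owner` is True, or None."""
    found = _internal_map.get(key)
    if found is None:
        return None
    return copy.copy(found) if owner else found


shared = intern_topic("demo")
```

Here `lookup("demo")` returns the very same object as `shared`, while `lookup("demo", owner=True)` returns an equal but distinct copy.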

event_engine.native.topic.init_allocator(init_capacity: int = 4096, with_shm: bool = False) None[source]

Initialize or return the global Allocator.

Parameters:
  • init_capacity – Initial capacity (in bytes) for the allocator. (Ignored in pure Python implementation)

  • with_shm – If True, create an allocator backed by shared memory. (Ignored in pure Python implementation)

Returns:

None (allocator not used in pure Python implementation)

Note

This function is provided for API compatibility with the Cython version, but does nothing in the pure Python implementation.

class event_engine.native.topic.PyTopicType(*values)[source]

Bases: IntEnum

Enumeration of topic part types.

Maps to the underlying C-level TopicType constants.

TOPIC_PART_EXACT = 0
TOPIC_PART_ANY = 1
TOPIC_PART_RANGE = 2
TOPIC_PART_PATTERN = 3
class event_engine.native.topic.PyTopicPart(*args: Any, alloc: bool = False, **kwargs: Any)[source]

Bases: object

Base Python wrapper for a single topic part.

In the native Python implementation, all topic parts own their underlying memory.

__init__(*args: Any, alloc: bool = False, **kwargs: Any) None[source]

Create or attach to a topic part.

Parameters:

alloc – If True, allocate and initialize internal structures.

next() PyTopicPart[source]

Return the next topic part.

Returns:

The next PyTopicPart instance.

Raises:

StopIteration – If this is the last part.

property owner: bool

Whether this Python object owns the underlying memory (always True in native Python).

Type:

bool

property ttype: PyTopicType

The topic part type as a PyTopicType.

Type:

PyTopicType

property addr: int

Numeric address / id of the underlying C structure.

Type:

int

class event_engine.native.topic.PyTopicPartExact(part: str = None, *args: Any, alloc: bool = False, **kwargs: Any)[source]

Bases: PyTopicPart

Topic part representing an exact literal segment.

__init__(part: str = None, *args: Any, alloc: bool = False, **kwargs: Any) None[source]

Create an exact topic part.

Parameters:
  • part – Literal string to store. If omitted and alloc is True, an empty initialized part is created.

  • alloc – If True, allocate underlying structures.

__repr__() str[source]

Return human-readable representation.

__len__() int[source]

Return the length of the stored literal in bytes.

property part: str

The literal string value for this part.

class event_engine.native.topic.PyTopicPartAny(name: str = None, *args: Any, alloc: bool = False, **kwargs: Any)[source]

Bases: PyTopicPart

Topic part representing a named wildcard.

__init__(name: str = None, *args: Any, alloc: bool = False, **kwargs: Any) None[source]

Create an ‘any’ topic part.

Parameters:
  • name – Optional name for the wildcard.

  • alloc – If True, allocate underlying structures.

__repr__() str[source]

Return human-readable representation.

property name: str

The wildcard name (identifier).

class event_engine.native.topic.PyTopicPartRange(options: list[str] = None, *args: Any, alloc: bool = False, **kwargs: Any)[source]

Bases: PyTopicPart

Topic part representing a range (choice) among multiple literals.

__init__(options: list[str] = None, *args: Any, alloc: bool = False, **kwargs: Any) None[source]

Create a range part.

Parameters:
  • options – List of literal option strings.

  • alloc – If True, allocate underlying structures.

__repr__() str[source]

Return human-readable representation.

__len__() int[source]

Return the number of options.

__iter__() Iterator[str][source]

Iterate over option strings.

options() Iterator[str][source]

Yield option strings in order.

Yields:

Each option as a Python string.

class event_engine.native.topic.PyTopicPartPattern(regex: str = None, *args: Any, alloc: bool = False, **kwargs: Any)[source]

Bases: PyTopicPart

Topic part representing a regex pattern.

__init__(regex: str = None, *args: Any, alloc: bool = False, **kwargs: Any) None[source]

Create a pattern topic part.

Parameters:
  • regex – Regular expression string.

  • alloc – If True, allocate underlying structures.

__repr__() str[source]

Return human-readable representation.

property pattern: str

The raw regex pattern string.

Type:

str

property regex: Pattern

Compiled regex object for the pattern.

Type:

re.Pattern

class event_engine.native.topic.TopicMatchNode[source]

Bases: TypedDict

TypedDict describing a single match node returned by PyTopicMatchResult accessors.

Keys:

  • matched – Whether this node matched.

  • part_a – The left-side topic part (or None).

  • part_b – The right-side topic part (or None).

  • literal – The literal string associated with this node (if any).

matched: bool
part_a: PyTopicPart | None
part_b: PyTopicPart | None
literal: str | None
class event_engine.native.topic.PyTopicMatchResult(n_parts: int = 0, alloc: bool = False, allocator: Any = None, **kwargs: Any)[source]

Bases: object

Container for topic part match results (linked list-like).

Provides iteration and indexing over match nodes and utilities to convert results.

The public API yields TopicMatchNode entries for per-node accessors.

__init__(n_parts: int = 0, alloc: bool = False, allocator: Any = None, **kwargs: Any) None[source]

Allocate or attach a chain of match result nodes.

Parameters:
  • n_parts – Number of nodes to pre-create.

  • alloc – If True, allocate underlying structures.

  • allocator – Optional Allocator (ignored in pure Python implementation).

__repr__() str[source]

Return a compact representation with success/failure and length.

__bool__() bool[source]

True if all nodes matched.

__len__() int[source]

Return number of nodes in the result chain.

__getitem__(idx: int) TopicMatchNode[source]

Return a single node’s info as a TopicMatchNode.

Parameters:

idx – Index of the node (supports negative indexing).

Returns:

A TopicMatchNode TypedDict containing ‘matched’, ‘part_a’, ‘part_b’, ‘literal’.

Raises:

IndexError – If idx is out of range.

__iter__() Iterator[TopicMatchNode][source]

Iterate over node info dicts in sequence.

to_dict() dict[str, PyTopicPart][source]

Convert match results into a dictionary mapping literal -> matched part.

Returns:

A mapping from literal string to the matched PyTopicPart.

property owner: bool

Whether this Python object owns the underlying memory (always True in native Python).

Type:

bool

property length: int

Number of nodes in the result chain.

Type:

int

property matched: bool

True if every node in the chain reports matched == True.

Type:

bool
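The container semantics documented above (length, negative indexing, truthiness only when every node matched) can be sketched with a list of TypedDict nodes. This is an illustrative stand-in, not the library's class; `Node` and `MatchResult` are hypothetical names, and topic parts are represented as plain strings.

```python
from typing import Iterator, Optional, TypedDict


class Node(TypedDict):
    matched: bool
    part_a: Optional[str]
    part_b: Optional[str]
    literal: Optional[str]


class MatchResult:
    """Stand-in result chain backed by a Python list."""

    def __init__(self, nodes: list[Node]) -> None:
        self._nodes = list(nodes)

    def __len__(self) -> int:
        return len(self._nodes)

    def __getitem__(self, idx: int) -> Node:
        # List indexing already supports negatives and raises IndexError.
        return self._nodes[idx]

    def __iter__(self) -> Iterator[Node]:
        return iter(self._nodes)

    def __bool__(self) -> bool:
        # Note: all() of an empty chain is True here; the library may differ.
        return all(node["matched"] for node in self._nodes)


result = MatchResult([
    {"matched": True, "part_a": "a", "part_b": "a", "literal": "a"},
    {"matched": False, "part_a": "b", "part_b": "c", "literal": None},
])
```

With one unmatched node, `bool(result)` is False even though `len(result)` is 2.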

class event_engine.native.topic.PyTopic(topic: str = None, *args: Any, alloc: bool = True, allocator: Any = None, **kwargs: Any)[source]

Bases: object

High-level Python representation of a parsed topic.

PyTopic instances internalize their literal content into a shared internal dict. All topics created via the normal constructor are internalized into the global map and do not own the underlying character storage (the buffer is bound to the global map).

__init__(topic: str = None, *args: Any, alloc: bool = True, allocator: Any = None, **kwargs: Any)[source]

Create a PyTopic from a topic string.

Parameters:
  • topic – Topic string to parse.

  • alloc – If True, allocate and initialize (default for normal usage).

  • allocator – Optional allocator (ignored in pure Python implementation).

__len__() int[source]

Return the number of parts in the topic.

__iter__() Iterator[PyTopicPart][source]

Iterate over topic parts (yields PyTopicPart subclasses).

__getitem__(idx: int) PyTopicPart[source]

Return the topic part at index idx.

Supports negative indexing.

Raises:

IndexError – If index is out of range.

__add__(topic: PyTopic) PyTopic[source]
__add__(topic: PyTopicPart) PyTopic

Return a new PyTopic that aggregates this topic with another PyTopic or PyTopicPart.

Behavior:
  • __add__ creates and returns a copy; both operands remain unchanged.

  • Use append or __iadd__ for in-place modifications.

Parameters:

topic – Either a PyTopic or PyTopicPart.

Returns:

A new aggregated PyTopic.

Raises:

TypeError – When the operand type is unsupported.

__iadd__(topic: PyTopic) PyTopic[source]
__iadd__(topic: PyTopicPart) PyTopic

In-place append another PyTopic or PyTopicPart.

Behavior:
  • Modifies self in place and leaves the other operand unchanged.

  • This is equivalent to self.append(…) and returns self.
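The difference between `__add__` (returns a copy) and `__iadd__` (mutates and returns self) is easy to get wrong, so here is a small sketch of the contract. `Parts` is a hypothetical stand-in that stores string parts in a list; it is not the PyTopic implementation.

```python
from typing import Optional, Union


class Parts:
    """Stand-in sequence of parts with copy-on-add and in-place += semantics."""

    def __init__(self, parts: Optional[list[str]] = None) -> None:
        self.parts = list(parts or [])

    @staticmethod
    def _as_list(other: object) -> Optional[list[str]]:
        if isinstance(other, Parts):
            return list(other.parts)
        if isinstance(other, str):
            return [other]
        return None  # unsupported operand type

    def append(self, part: str) -> "Parts":
        self.parts.append(part)
        return self

    def __add__(self, other: Union["Parts", str]) -> "Parts":
        extra = self._as_list(other)
        if extra is None:
            return NotImplemented  # Python turns this into a TypeError
        return Parts(self.parts + extra)  # new object, operands untouched

    def __iadd__(self, other: Union["Parts", str]) -> "Parts":
        extra = self._as_list(other)
        if extra is None:
            return NotImplemented
        self.parts.extend(extra)  # mutate in place
        return self


base = Parts(["x"])
combined = base + "y"   # base is unchanged
alias = base
alias += "z"            # base itself is modified
```

After this runs, `combined` is a separate object, while `alias` and `base` still refer to the same, now longer, instance.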

__hash__()[source]

Return hash based on the topic’s literal value.

The hash is precomputed during initialization for efficiency.

Returns:

A uint64_t hash integer.

__eq__(other: PyTopic) bool[source]

Check equality between this topic and another topic.

Parameters:

other – The other PyTopic to compare against.

Returns:

True if both topics have the same literal value.

__repr__() str[source]

Return string representation.

__str__() str[source]

Return the topic literal value.

__call__(**kwargs) PyTopic[source]

Alias of format: formats the topic by replacing named wildcards with the provided values.

classmethod from_parts(topic_parts: Iterable[PyTopicPart]) PyTopic[source]

Build a PyTopic from an iterable of PyTopicPart instances.

classmethod join(topic_parts: Iterable[str]) PyTopic[source]

Build a PyTopic from an iterable of literal strings.

Each string is appended as an exact part.

Notes

  • This is a higher-level helper for simple literal-only topics.

  • For complex parts that need escaping or patterns, use from_parts.

append(topic_part: PyTopicPart) PyTopic[source]

Append a PyTopicPart to this topic (high-level API).

Parameters:

topic_part – The part to append.

Returns:

Self for chaining.

Raises:

RuntimeError – If either topic or part is uninitialized.

match(other: PyTopic) PyTopicMatchResult[source]

Match this topic against another topic.

Parameters:

other – The topic to match against.

Returns:

A PyTopicMatchResult describing per-part matches.

update_literal() PyTopic[source]

Update the internal literal buffer to reflect the current parts.

Call this after modifying the subordinate topic parts in place, so that the internal literal is regenerated and stays consistent with the parts.

Returns:

Self for chaining.

Raises:

RuntimeError – If the topic is uninitialized.

format(**kwargs) PyTopic[source]

Format the topic by replacing named wildcards with provided values.

Parameters:

**kwargs – Mapping from wildcard names to replacement strings.

Returns:

A new Exact PyTopic with wildcards replaced by the provided values.

Raises:
  • KeyError – If a required wildcard name is missing in kwargs.

  • ValueError – If the topic contains parts that are neither exact parts nor named wildcards.

format_map(mapping: dict[str, str], internalized: bool = True, strict: bool = False) PyTopic[source]

Format the topic by replacing named wildcards with provided values from a mapping.

Parameters:
  • mapping – Dictionary mapping wildcard names to replacement strings.

  • internalized – If True, the returned topic is internalized into the global map and does not own its underlying buffer. If False, the returned topic owns its internal buffer.

  • strict – If True, raises KeyError if a wildcard name is missing in mapping.

Returns:

A new PyTopic with wildcards replaced by the provided values. If strict is True, the new PyTopic is guaranteed to be an exact PyTopic.

Raises:
  • KeyError – If a required wildcard name is missing in mapping.

  • ValueError – If the topic contains parts that are neither exact parts nor named wildcards.
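The substitution rules can be sketched on a simplified model in which a topic is a list of `(kind, text)` tuples. This is an assumption-laden illustration: the tuple model and `format_parts` are hypothetical, and leaving unresolved wildcards in place when `strict` is False is inferred from the "guaranteed to be exact" remark rather than stated outright.

```python
from typing import Mapping

Part = tuple[str, str]  # (kind, text); kind is "exact", "any", or something else


def format_parts(parts: list[Part], mapping: Mapping[str, str],
                 strict: bool = False) -> list[Part]:
    """Replace named wildcards ("any" parts) with exact parts from `mapping`."""
    out: list[Part] = []
    for kind, text in parts:
        if kind == "exact":
            out.append((kind, text))
        elif kind == "any":
            if text in mapping:
                out.append(("exact", mapping[text]))
            elif strict:
                raise KeyError(text)
            else:
                out.append((kind, text))  # assumption: left unresolved
        else:
            raise ValueError(f"cannot format a {kind!r} part")
    return out


template = [("exact", "orders"), ("any", "symbol")]
filled = format_parts(template, {"symbol": "ABC"})
partial = format_parts(template, {})
```

In this model `filled` contains only exact parts, whereas `partial` still carries the `symbol` wildcard because `strict` was left at False.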

property value: str

The full topic literal value.

Getting returns the current literal. Setting attempts to assign and will raise ValueError on syntax errors. Setting mutates the internalized buffer and will re-register the topic in the internal mapping.

Type:

str

property owner: bool

Whether this Python object owns the underlying memory (always True in native Python).

Type:

bool

property is_exact: bool

True if the topic consists only of exact parts (no wildcards, ranges, or patterns).

Type:

bool

property addr: int

Numeric address / id of the underlying structure.

Type:

int

Event Classes

event_engine.capi.c_event

class event_engine.capi.c_event.EventHook

Bases: object

add_handler(py_callable, logger=None, deduplicate=False)
clear()
handlers
logger
remove_handler(py_callable)
topic
trigger(msg)
class event_engine.capi.c_event.EventHookEx

Bases: EventHook

stats
class event_engine.capi.c_event.MessagePayload

Bases: object

__init__(*args, **kwargs)
args
kwargs
kwargs_with_topic
owner
seq_id
topic

event_engine.native.event

Native Python fallback implementation for PyMessagePayload and EventHook classes.

This module provides a pure Python implementation that mimics the behavior of the Cython-based c_event module. It is used as a fallback when the Cython extension cannot be compiled (e.g., due to lack of Cython, GCC, or Clang).

The API is designed to match event_engine.capi.c_event as closely as possible.

class event_engine.native.event.PyMessagePayload(alloc: bool = False)[source]

Bases: object

Python wrapper for a message payload structure.

In native Python, all instances own their underlying data (owner, args_owner, kwargs_owner are always True).

__init__(alloc: bool = False) None[source]

Initialize a PyMessagePayload instance.

Parameters:

alloc – If True, allocate a new message payload (always True in Python).

__repr__() str[source]

Return a string representation of the payload.

property owner: bool

Whether this instance owns the underlying payload (always True in native Python).

Type:

bool

property args_owner: bool

Whether this instance owns the positional arguments (always True in native Python).

Type:

bool

property kwargs_owner: bool

Whether this instance owns the keyword arguments (always True in native Python).

Type:

bool

property topic: PyTopic | None

The topic associated with this payload.

property args: tuple | None

The positional arguments of the payload.

property kwargs: dict | None

The keyword arguments of the payload.

property seq_id: int

The sequence ID of the payload.

class event_engine.native.event.EventHook(topic: PyTopic, logger: Logger = None, retry_on_unexpected_topic: bool = False)[source]

Bases: object

Event dispatcher for registering and triggering handlers.

Handlers are triggered with a PyMessagePayload. The dispatcher supports two calling conventions:
  • With-topic: the handler receives the topic as a positional or keyword argument.

  • No-topic: the handler receives only args and kwargs from the payload.

Handlers that accept **kwargs are recommended to ensure compatibility with both conventions.

topic

The topic associated with this hook.

Type:

PyTopic

logger

Optional logger instance.

Type:

Logger | None

retry_on_unexpected_topic

If True, the hook retries with the no-topic calling convention when a with-topic handler raises a TypeError whose message indicates an unexpected topic argument.

Type:

bool

__init__(topic: PyTopic, logger: Logger = None, retry_on_unexpected_topic: bool = False) None[source]

Initialize an EventHook.

Parameters:
  • topic – The topic associated with this hook.

  • logger – Optional logger instance.

  • retry_on_unexpected_topic – If True, enables retrying on unexpected topic argument errors.

topic: PyTopic
logger: Logger
retry_on_unexpected_topic: bool
__call__(msg: PyMessagePayload) None[source]

Trigger all registered handlers with the given message payload.

Alias for method trigger.

Parameters:

msg – The message payload to dispatch to handlers.

__iadd__(handler: Callable) EventHook[source]

Add a handler using the += operator.

Parameters:

handler – The callable to register.

Returns:

Self, for chaining.

__isub__(handler: Callable) EventHook[source]

Remove a handler using the -= operator.

Parameters:

handler – The callable to unregister.

Returns:

Self, for chaining.

__len__() int[source]

Return the number of registered handlers.

__repr__() str[source]

Return a string representation of the EventHook.

__iter__() Iterator[Callable][source]

Iterate over all registered handlers.

__contains__(handler: Callable) bool[source]

Check if a handler is registered.

Parameters:

handler – The callable to check.

Returns:

True if the handler is registered; False otherwise.

trigger(msg: PyMessagePayload) None[source]

Trigger all registered handlers with the given message payload.

Handlers are executed in two groups:
  1. All no-topic handlers (called with *args, **kwargs only).

  2. All with-topic handlers (called with topic, *args, **kwargs).

Within each group, handlers are invoked in the order they were added.

If retry_on_unexpected_topic is True and a with-topic handler raises a TypeError whose message indicates an unexpected topic argument, the dispatcher retries the call without the topic.

Parameters:

msg – The message payload to dispatch.
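The dispatch order and the retry rule can be sketched as follows. This is a simplified stand-in, not the library's EventHook: `MiniHook` is hypothetical, the two handler groups are filled by hand, the topic is passed as a keyword argument (one of the two documented options), and the check on the TypeError message is an assumed way to detect an unexpected topic argument.

```python
from typing import Any, Callable


class MiniHook:
    """Stand-in dispatcher with no-topic and with-topic handler groups."""

    def __init__(self, topic: str, retry_on_unexpected_topic: bool = False) -> None:
        self.topic = topic
        self.retry_on_unexpected_topic = retry_on_unexpected_topic
        self.no_topic: list[Callable[..., Any]] = []
        self.with_topic: list[Callable[..., Any]] = []

    def trigger(self, *args: Any, **kwargs: Any) -> None:
        # Group 1: handlers that never see the topic.
        for handler in self.no_topic:
            handler(*args, **kwargs)
        # Group 2: handlers that receive the topic.
        for handler in self.with_topic:
            try:
                handler(*args, topic=self.topic, **kwargs)
            except TypeError as exc:
                unexpected = "unexpected keyword argument 'topic'" in str(exc)
                if not (self.retry_on_unexpected_topic and unexpected):
                    raise
                handler(*args, **kwargs)  # retry without the topic


calls: list[tuple] = []


def plain(x):
    calls.append(("plain", x))


def wants_topic(x, topic):
    calls.append(("with", x, topic))


def misfiled(x):  # sits in the with-topic group but cannot accept a topic
    calls.append(("retried", x))


hook = MiniHook("demo", retry_on_unexpected_topic=True)
hook.with_topic += [wants_topic, misfiled]
hook.no_topic.append(plain)
hook.trigger(1)
```

Handlers that accept `**kwargs` never hit the retry path, which is one reason that signature is recommended.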

add_handler(handler: Callable, deduplicate: bool = False) None[source]

Register a new handler.

It is strongly recommended that handlers accept **kwargs to remain compatible with both with-topic and no-topic calling conventions.

Parameters:
  • handler – The callable to register.

  • deduplicate – If True, skip registration if the handler is already present.

remove_handler(handler: Callable) EventHook[source]

Remove a handler from the hook.

Only the first matching occurrence is removed. If the same callable was added multiple times, subsequent instances remain registered.

Parameters:

handler – The callable to remove.

Returns:

Self, for chaining.
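The interaction between `deduplicate` and first-occurrence removal can be illustrated with a plain list. This is only a sketch of the documented behavior; the module-level `handlers` list and both functions are hypothetical, and what the library does when removing an unregistered handler is not covered here.

```python
from typing import Callable

handlers: list[Callable] = []


def add_handler(handler: Callable, deduplicate: bool = False) -> None:
    """Append `handler`, skipping it if already present and `deduplicate` is set."""
    if deduplicate and handler in handlers:
        return
    handlers.append(handler)


def remove_handler(handler: Callable) -> None:
    """Remove only the first matching occurrence."""
    handlers.remove(handler)


def on_event(**kwargs):
    pass


add_handler(on_event)
add_handler(on_event)                    # registered a second time
add_handler(on_event, deduplicate=True)  # skipped: already present
remove_handler(on_event)                 # one registration remains
```

After these calls the handler is still registered once, so it would keep firing until removed again.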

clear() None[source]

Remove all registered handlers.

property handlers: list[Callable]

List all registered handlers.

Handlers are ordered as follows:
  • First, all no-topic handlers (in registration order).

  • Then, all with-topic handlers (in registration order).

class event_engine.native.event.HandlerStats[source]

Bases: TypedDict

Statistics for a handler.

calls: int
total_time: float
class event_engine.native.event.EventHookEx(topic: PyTopic, logger: Logger = None, retry_on_unexpected_topic: bool = False)[source]

Bases: EventHook

Extended EventHook that tracks per-handler execution statistics.

__init__(topic: PyTopic, logger: Logger = None, retry_on_unexpected_topic: bool = False) None[source]

Initialize an EventHookEx.

Parameters:
  • topic – The topic associated with this hook.

  • logger – Optional logger instance.

  • retry_on_unexpected_topic – If True, enables retrying on unexpected topic argument errors.

trigger(msg: PyMessagePayload) None[source]

Trigger all registered handlers with the given message payload, tracking execution time.

Parameters:

msg – The message payload to dispatch.

get_stats(py_callable: Callable) HandlerStats | None[source]

Retrieve execution statistics for a specific handler.

Parameters:

py_callable – The handler to query.

Returns:

A dictionary with keys 'calls' (number of invocations) and 'total_time' (cumulative execution time in seconds), or None if the handler is not registered or has no statistics recorded.

property stats: Iterator[tuple[Callable, HandlerStats]]

Iterate over all registered handlers and their execution statistics.

Returns:

An iterator yielding (handler, stats_dict) pairs.
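Per-handler statistics of this shape can be collected by wrapping each call in a timer. The sketch below assumes `time.perf_counter` as the clock and a dict keyed by the handler; `Stats`, `stats_by_handler`, and `timed_call` are hypothetical names, and the library may measure time differently.

```python
import time
from typing import Any, Callable, TypedDict


class Stats(TypedDict):
    calls: int
    total_time: float


stats_by_handler: dict[Callable, Stats] = {}


def timed_call(handler: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Invoke `handler` and accumulate its call count and elapsed seconds."""
    entry = stats_by_handler.setdefault(handler, {"calls": 0, "total_time": 0.0})
    started = time.perf_counter()
    try:
        return handler(*args, **kwargs)
    finally:
        # Recorded even if the handler raises.
        entry["calls"] += 1
        entry["total_time"] += time.perf_counter() - started


def work(n: int) -> int:
    return sum(range(n))


timed_call(work, 1000)
timed_call(work, 1000)
```

Dividing `total_time` by `calls` gives a mean duration per invocation for spotting slow handlers.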

clear() None[source]

Remove all registered handlers and clear statistics.

Engine Classes

event_engine.capi.c_engine

exception event_engine.capi.c_engine.Empty

Bases: Exception

class event_engine.capi.c_engine.EventEngine

Bases: object

activate()
active
capacity
clear() None
deactivate()
engine
event_hooks()
exact_topic_hook_map
generic_topic_hook_map
get(block=True, max_spin=65535, timeout=0.0) MessagePayload
get_hook(topic)
items()
logger
occupied
publish(topic, args, kwargs, block=True, max_spin=65535, timeout=0.0)
put(topic, *args, block=True, max_spin=65535, timeout=0.0, **kwargs)
register_handler(topic, handler, deduplicate=False)
register_hook(hook)
run()
seq_id
start()
stop() None
topics()
unregister_handler(topic, handler)
unregister_hook(topic) EventHook
class event_engine.capi.c_engine.EventEngineEx

Bases: EventEngine

clear() None
get_timer(interval, activate_time=None) Topic
minute_timer(topic)
run_timer(interval, topic, activate_time=None)
second_timer(topic)
stop() None
timer
exception event_engine.capi.c_engine.Full

Bases: Exception

event_engine.native.engine

Native Python fallback implementation for EventEngine and EventEngineEx.

This module provides a pure Python implementation that mimics the behavior of the Cython-based c_engine module. It is used as a fallback when the Cython extension cannot be compiled (e.g., due to lack of Cython, GCC, or Clang).

The API is designed to match event_engine.capi.c_engine as closely as possible.

exception event_engine.native.engine.Full[source]

Bases: Exception

Raised when attempting to publish to a full event queue.

exception event_engine.native.engine.Empty[source]

Bases: Exception

Raised when attempting to retrieve from an empty event queue.

class event_engine.native.engine.EventEngine(capacity: int = 4095, logger: Logger | None = None)[source]

Bases: object

High-performance, topic-driven event dispatcher backed by threading primitives.

The engine manages an internal message queue and dispatches events to registered handlers based on topic matching rules. In this native Python implementation:

  • A threading-based event loop consumes messages and triggers callbacks

  • Message queue uses deque with threading.Lock and Condition variables

  • Two dict instances for exact and generic topic routing

Matching priority: exact topic matches take precedence over generic matches. Exact matches are based on the topic’s literal key. Generic matches are evaluated by testing whether the published topic matches a registered pattern.
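One way to read the matching priority is as a two-step lookup: try the exact map first, then scan the generic patterns. The sketch below follows that reading under several assumptions: regular expressions stand in for generic topic patterns, the dotted literals are invented, hooks are represented by strings, and "precedence" is taken to mean the first hit wins (whether generic hooks also fire after an exact hit is not stated above).

```python
import re
from typing import Optional

# Hypothetical registrations; strings stand in for hook objects.
exact_hooks: dict[str, str] = {"orders.ABC": "exact hook"}
generic_hooks: dict[str, str] = {r"orders\..+": "generic hook"}


def find_hook(literal: str) -> Optional[str]:
    """Return the hook for `literal`, preferring an exact registration."""
    hook = exact_hooks.get(literal)  # step 1: literal key lookup
    if hook is not None:
        return hook
    for pattern, candidate in generic_hooks.items():  # step 2: pattern scan
        if re.fullmatch(pattern, literal):
            return candidate
    return None


print(find_hook("orders.ABC"), "|", find_hook("orders.XYZ"))
```

The exact lookup is a single dict access, while the generic scan grows with the number of registered patterns.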

capacity

Maximum number of messages the internal queue can hold.

Type:

int

logger

Logger instance used for diagnostics.

Type:

Logger

__init__(capacity: int = 4095, logger: Logger | None = None) None[source]

Initialize an EventEngine.

Parameters:
  • capacity – Maximum number of pending messages.

  • logger – Optional logger. If None, a default logger is created.

logger: Logger
active: bool
engine: Thread | None
__len__() int[source]

Return the total number of registered topics (both exact and generic).

__repr__() str[source]

String representation of the engine.

activate() None[source]

Activate the event engine.

This method is called automatically when start is invoked. It can also be called manually to prepare the engine for operation.

deactivate() None[source]

Deactivate the event engine.

This method is called automatically when stop is invoked. It can also be called manually to halt the engine’s operation.

run() None[source]

Run the event loop in the current thread (blocking).

start() None[source]

Start the event loop in a dedicated background thread.

If the engine is already running, this method has no effect.

stop() None[source]

Stop the event loop and wait for the background thread to terminate.

If the engine is already stopped, this method has no effect.
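The idempotent start/stop behavior can be sketched with a background thread and a stop flag. `MiniLoop` is a hypothetical stand-in whose loop does no real work; it only illustrates that a second `start` or `stop` has no effect.

```python
import threading
from typing import Optional


class MiniLoop:
    """Stand-in engine lifecycle: start/stop are safe to call repeatedly."""

    def __init__(self) -> None:
        self.active = False
        self.engine: Optional[threading.Thread] = None
        self._stop_requested = threading.Event()

    def run(self) -> None:
        # Blocking loop; a real engine would fetch and dispatch messages here.
        while not self._stop_requested.wait(timeout=0.01):
            pass

    def start(self) -> None:
        if self.active:
            return  # already running: no effect
        self._stop_requested.clear()
        self.active = True
        self.engine = threading.Thread(target=self.run, daemon=True)
        self.engine.start()

    def stop(self) -> None:
        if not self.active:
            return  # already stopped: no effect
        self.active = False
        self._stop_requested.set()
        if self.engine is not None:
            self.engine.join()  # wait for the background thread to finish
            self.engine = None


loop = MiniLoop()
loop.start()
first_thread = loop.engine
loop.start()                       # no new thread is created
same_thread = loop.engine is first_thread
loop.stop()
loop.stop()                        # second stop is a no-op
```

Calling `run()` directly instead of `start()` would execute the same loop in the current thread and block until a stop is requested.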

clear() None[source]

Unregister all event hooks.

Notes

This method only works when the engine is stopped. If called while running, an error is logged and no action is taken.

get(block: bool = True, max_spin: int = 65535, timeout: float = 0.0) PyMessagePayload[source]

Retrieve an event from the internal queue.

Parameters:
  • block – If True, wait until an event is available.

  • max_spin – Maximum number of spin-loop iterations before blocking (ignored in pure Python).

  • timeout – Maximum wait time in seconds when blocking (0.0 means indefinite wait).

Returns:

A PyMessagePayload instance.

Raises:

Empty – If block=False and the queue is empty.
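The blocking semantics of a bounded queue built on a deque and a condition variable can be sketched as below. This is a generic illustration rather than the engine's code: `BoundedQueue` is hypothetical, and raising `Empty` or `Full` when a positive timeout expires is an assumption (the text above only specifies the non-blocking case).

```python
import threading
from collections import deque
from typing import Any, Callable


class Full(Exception):
    pass


class Empty(Exception):
    pass


class BoundedQueue:
    """Stand-in bounded FIFO guarded by a single Condition."""

    def __init__(self, capacity: int = 4095) -> None:
        self._items: deque = deque()
        self._capacity = capacity
        self._cond = threading.Condition()

    def _wait(self, ready: Callable[[], bool], timeout: float) -> bool:
        # Caller holds the condition. timeout <= 0 means wait indefinitely.
        return self._cond.wait_for(ready, timeout=None if timeout <= 0 else timeout)

    def put(self, item: Any, block: bool = True, timeout: float = 0.0) -> None:
        with self._cond:
            has_room = lambda: len(self._items) < self._capacity
            if not has_room() and not (block and self._wait(has_room, timeout)):
                raise Full
            self._items.append(item)
            self._cond.notify_all()

    def get(self, block: bool = True, timeout: float = 0.0) -> Any:
        with self._cond:
            has_item = lambda: len(self._items) > 0
            if not has_item() and not (block and self._wait(has_item, timeout)):
                raise Empty
            item = self._items.popleft()
            self._cond.notify_all()
            return item


queue = BoundedQueue(capacity=1)
queue.put("first")
```

With a capacity of one, a second non-blocking `put` raises `Full` until a consumer calls `get`.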

put(topic: PyTopic, *args, block: bool = True, max_spin: int = 65535, timeout: float = 0.0, **kwargs) None[source]

Publish an event to the queue (convenience alias for publish).

Parameters:
  • topic – Must be an exact PyTopic (i.e., topic.is_exact must be True).

  • *args – Positional arguments for the event.

  • block – If True, wait if the queue is full.

  • max_spin – Spin count before blocking (ignored in pure Python).

  • timeout – Maximum wait time in seconds when blocking (0.0 = indefinite).

  • **kwargs – Keyword arguments for the event.

Raises:
  • Full – If block=False and the queue is full.

  • ValueError – If topic is not an exact topic.
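Because `put` takes `*args` and `**kwargs`, the control parameters (`block`, `max_spin`, `timeout`) are keyword-only and are not forwarded as part of the event data. The sketch below shows one way such a wrapper can forward to `publish`; both functions are stand-ins that record calls in a list, and the exact-topic check is omitted.

```python
from typing import Any

published: list[tuple] = []


def publish(topic: str, args: tuple, kwargs: dict,
            block: bool = True, timeout: float = 0.0) -> None:
    """Stand-in publish: records what it was asked to enqueue."""
    published.append((topic, args, kwargs, block, timeout))


def put(topic: str, *args: Any, block: bool = True, max_spin: int = 65535,
        timeout: float = 0.0, **kwargs: Any) -> None:
    """Collect positional/keyword event data and forward it to publish."""
    # max_spin is accepted for signature compatibility and dropped here.
    publish(topic, args, kwargs, block=block, timeout=timeout)


put("demo", 1, 2, block=False, price=9.5)
```

Note that `block=False` ends up as a control flag, while `price=9.5` travels inside the event's keyword arguments.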

publish(topic: PyTopic, args: tuple, kwargs: dict, block: bool = True, timeout: float = 0.0) None[source]

Publish an event to the queue.

Parameters:
  • topic – Must be an exact PyTopic (i.e., topic.is_exact must be True).

  • args – Positional arguments for the event.

  • kwargs – Keyword arguments for the event.

  • block – If True, wait if the queue is full.

  • timeout – Maximum wait time in seconds when blocking (0.0 = indefinite).

Raises:
  • Full – If block=False and the queue is full.

  • ValueError – If topic is not an exact topic.

register_hook(hook: EventHook) None[source]

Register an EventHook for its associated topic.

Parameters:

hook – The hook to register.

Raises:

KeyError – If a hook is already registered for the same topic (exact or generic).

unregister_hook(topic: PyTopic) EventHook[source]

Unregister and return the EventHook associated with a topic.

Parameters:

topic – The topic to unregister.

Returns:

The unregistered EventHook.

Raises:

KeyError – If no hook is registered for the given topic.

register_handler(topic: PyTopic, handler, deduplicate: bool = False) None[source]

Register a handler for a topic (creates hook if needed).

Parameters:
  • topic – Topic to register handler for

  • handler – Callable handler

  • deduplicate – Skip if handler already registered

unregister_handler(topic: PyTopic, handler) None[source]

Unregister a handler from a topic.

Parameters:
  • topic – Topic to unregister handler from

  • handler – Callable handler to remove

event_hooks()[source]

Iterate over all registered event hooks.

topics()[source]

Iterate over all registered topics.

items()[source]

Iterate over (topic, hook) pairs.

property capacity: int

Get the queue capacity.

property occupied: int

Get the current number of messages in the queue.

property exact_topic_hook_map: dict

Get a copy of the exact topic hook mapping.

property generic_topic_hook_map: dict

Get a copy of the generic topic hook mapping.

class event_engine.native.engine.EventEngineEx(capacity: int = 4095, logger: Logger | None = None)[source]

Bases: EventEngine

Extended EventEngine with timer support and statistics tracking.

Provides timer functionality for periodic event triggering and uses EventHookEx for handler statistics.

__init__(capacity: int = 4095, logger: Logger | None = None) None[source]

Initialize EventEngineEx.

Parameters:
  • capacity – Maximum capacity of the message queue

  • logger – Optional logger instance

timer: dict[float, Thread]
__repr__() str[source]

String representation including active timers.

run_timer(interval: float, topic: PyTopic, activate_time: datetime | None = None) None[source]

Run a timer with custom interval (blocking call).

Parameters:
  • interval – Interval in seconds

  • topic – Topic to publish on

  • activate_time – Optional activation time

Raises:

RuntimeError – If the engine is not active

minute_timer(topic: PyTopic) None[source]

Run minute-aligned timer (blocking call).

Parameters:

topic – Topic to publish on

Raises:

RuntimeError – If the engine is not active

second_timer(topic: PyTopic) None[source]

Run second-aligned timer (blocking call).

Parameters:

topic – Topic to publish on

Raises:

RuntimeError – If the engine is not active
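If "aligned" is taken to mean firing on wall-clock boundaries (an assumption; the alignment rule is not spelled out above), the delay before the next tick can be computed from the current epoch time. `seconds_until_next_boundary` is a hypothetical helper, not a library function.

```python
import math


def seconds_until_next_boundary(now: float, interval: float) -> float:
    """Delay from `now` (epoch seconds) to the next multiple of `interval`."""
    next_boundary = (math.floor(now / interval) + 1) * interval
    return next_boundary - now


# 125 s past the epoch is 5 s into the third minute, so 55 s remain.
print(seconds_until_next_boundary(125.0, 60.0))
```

A timer loop built on this would sleep for the returned delay, publish to its topic, and then recompute, which avoids accumulating drift from handler latency.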

get_timer(interval: float, activate_time: datetime | None = None) PyTopic[source]

Get or create a timer with the specified interval.

Parameters:
  • interval – Interval in seconds

  • activate_time – Optional activation time

Returns:

PyTopic for the timer

Raises:

RuntimeError – If the engine is not active

stop() None[source]

Stop the engine and all timers.

clear() None[source]

Clear all hooks and timers.

Fallback Engine

event_engine.capi.fallback_engine

Pure Python fallback implementation of EventEngine for cross-platform compatibility.

This module provides Python-native implementations of EventEngine and EventEngineEx that work on Windows and other platforms without requiring Cython/C extensions.

Uses:
  • threading.Lock, threading.Condition for synchronization

  • dict for topic-to-hook mappings (instead of ByteMap)

  • Topic, MessagePayload, EventHook from c_topic and c_event modules

exception event_engine.capi.fallback_engine.Full[source]

Bases: Exception

Exception raised when the message queue is full.

exception event_engine.capi.fallback_engine.Empty[source]

Bases: Exception

Exception raised when the message queue is empty.

class event_engine.capi.fallback_engine.EventEngine(capacity: int = 4095, logger: Logger | None = None)[source]

Bases: object

Pure Python implementation of EventEngine.

Uses dict-based topic-to-hook mappings and threading primitives for synchronization. Compatible with Windows and all platforms supporting the threading module.

__init__(capacity: int = 4095, logger: Logger | None = None)[source]

Initialize EventEngine.

Parameters:
  • capacity – Maximum capacity of the message queue

  • logger – Optional logger instance

__len__() int[source]

Return the total number of registered hooks.

__repr__() str[source]

String representation of the engine.

activate()[source]

Activate the engine (sets active flag to True).

deactivate()[source]

Deactivate the engine (sets active flag to False).

run()[source]

Run the event loop (blocking call).

start()[source]

Start the engine in a background thread.

stop()[source]

Stop the engine and wait for the background thread to finish.
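
The relationship between run(), start(), and stop() can be sketched with a stand-in class. MiniEngine below is hypothetical and only mirrors the documented behavior (a blocking loop, a background thread, and a join on stop); it is not the library's implementation.

```python
import threading
import time


class MiniEngine:
    """Hypothetical stand-in that mimics the documented lifecycle."""

    def __init__(self):
        self.active = False
        self.ticks = 0
        self.started = threading.Event()
        self._thread = None

    def run(self):
        # Blocking loop: returns only after the active flag is cleared.
        while self.active:
            self.ticks += 1
            self.started.set()
            time.sleep(0.001)

    def start(self):
        if self.active:
            return  # already running
        self.active = True
        self._thread = threading.Thread(target=self.run, daemon=True)
        self._thread.start()

    def stop(self):
        self.active = False
        if self._thread is not None:
            self._thread.join()  # wait for the background loop to exit
            self._thread = None


engine = MiniEngine()
engine.start()                    # returns immediately; loop runs in a thread
engine.started.wait(timeout=5.0)  # wait until the loop has run at least once
engine.stop()                     # clears the flag and joins the thread
```

Calling run() directly would block the calling thread until another thread clears the active flag, which is why start() is usually the more convenient entry point.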

clear()[source]

Clear all hooks and handlers (engine must be stopped first).

get(block: bool = True, max_spin: int = 65535, timeout: float = 0.0) MessagePayload[source]

Get a message from the queue.

Parameters:
  • block – Whether to block if queue is empty

  • max_spin – Spin limit (ignored in pure Python)

  • timeout – Timeout in seconds

Returns:

MessagePayload

Raises:

Empty – If no message is available

put(topic: Topic, *args, block: bool = True, max_spin: int = 65535, timeout: float = 0.0, **kwargs)[source]

Put a message into the queue.

Parameters:
  • topic – Topic for the message

  • *args – Positional arguments

  • block – Whether to block if queue is full

  • max_spin – Spin limit (ignored in pure Python)

  • timeout – Timeout in seconds

  • **kwargs – Keyword arguments

Raises:

Full – If queue is full
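
The block and timeout parameters of get() and put() can be illustrated with a small bounded queue built on threading.Condition. This is a sketch, not the engine's code: BoundedQueue is a hypothetical class, the Full and Empty classes are local stand-ins, and treating timeout <= 0 as "wait without a deadline" is an assumption that should be checked against the source.

```python
import threading
from collections import deque


class Full(Exception):
    """Local stand-in for the documented Full exception."""


class Empty(Exception):
    """Local stand-in for the documented Empty exception."""


class BoundedQueue:
    """Hypothetical bounded queue guarded by a single condition variable."""

    def __init__(self, capacity=4095):
        self.capacity = capacity
        self._items = deque()
        self._cond = threading.Condition(threading.Lock())

    def put(self, item, block=True, timeout=0.0):
        with self._cond:
            if len(self._items) >= self.capacity:
                if not block:
                    raise Full()
                # Assumption: timeout <= 0 means wait without a deadline.
                has_room = self._cond.wait_for(
                    lambda: len(self._items) < self.capacity,
                    timeout if timeout > 0 else None,
                )
                if not has_room:
                    raise Full()
            self._items.append(item)
            self._cond.notify_all()

    def get(self, block=True, timeout=0.0):
        with self._cond:
            if not self._items:
                if not block:
                    raise Empty()
                has_item = self._cond.wait_for(
                    lambda: bool(self._items),
                    timeout if timeout > 0 else None,
                )
                if not has_item:
                    raise Empty()
            item = self._items.popleft()
            self._cond.notify_all()
            return item


q = BoundedQueue(capacity=2)
q.put("a", block=False)
q.put("b", block=False)
overflowed = False
try:
    q.put("c", block=False)  # queue already holds 2 items
except Full:
    overflowed = True
```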

publish(topic: Topic, args: tuple, kwargs: dict, block: bool = True, max_spin: int = 65535, timeout: float = 0.0)[source]

Publish a message to the queue.

Parameters:
  • topic – Topic for the message

  • args – Positional arguments tuple

  • kwargs – Keyword arguments dict

  • block – Whether to block if queue is full

  • max_spin – Spin limit (ignored in pure Python)

  • timeout – Timeout in seconds

Raises:

Full – If queue is full
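
The two signatures differ in how the payload is passed: put() collects *args and **kwargs, while publish() takes an explicit tuple and dict. Because block, max_spin, and timeout are keyword-only parameters of put(), a payload keyword with one of those names cannot be sent through put(). The sketch below uses a hypothetical Recorder class to show this; that put() forwards to publish() is an assumption based on the signatures, not something the docstrings confirm.

```python
class Recorder:
    """Hypothetical stand-in that records what would be enqueued."""

    def __init__(self):
        self.published = []

    def publish(self, topic, args, kwargs, block=True, max_spin=65535, timeout=0.0):
        self.published.append((topic, args, kwargs))

    def put(self, topic, *args, block=True, max_spin=65535, timeout=0.0, **kwargs):
        # Assumed relationship: pack the payload and delegate to publish().
        self.publish(topic, args, kwargs, block=block, max_spin=max_spin, timeout=timeout)


rec = Recorder()

# timeout=0.5 is consumed by put() itself; only price reaches the payload.
rec.put("md.tick", 1, 2, timeout=0.5, price=3)

# With publish(), a payload key named "timeout" is passed through untouched.
rec.publish("md.tick", (), {"timeout": 9})
```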

register_hook(hook: EventHook)[source]

Register an event hook.

Parameters:

hook – EventHook instance to register

unregister_hook(topic: Topic) EventHook[source]

Unregister a hook by topic.

Parameters:

topic – Topic of the hook to unregister

Returns:

The unregistered EventHook

register_handler(topic: Topic, handler, deduplicate: bool = False)[source]

Register a handler for a topic.

Parameters:
  • topic – Topic to register handler for

  • handler – Callable handler

  • deduplicate – Skip if handler already registered

unregister_handler(topic: Topic, handler)[source]

Unregister a handler from a topic.

Parameters:
  • topic – Topic to unregister handler from

  • handler – Callable handler to remove
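
The effect of the deduplicate flag can be sketched with a plain dict of handler lists. HandlerTable and its trigger() method are hypothetical names used for illustration, and raising ValueError when a handler is not registered is an assumption of this sketch; the engine's actual behavior in that case is not documented here.

```python
class HandlerTable:
    """Hypothetical topic-to-handlers table."""

    def __init__(self):
        self._handlers = {}

    def register_handler(self, topic, handler, deduplicate=False):
        handlers = self._handlers.setdefault(topic, [])
        if deduplicate and handler in handlers:
            return  # already registered, skip
        handlers.append(handler)

    def unregister_handler(self, topic, handler):
        # list.remove drops the first occurrence and raises ValueError if absent.
        self._handlers.get(topic, []).remove(handler)

    def trigger(self, topic, *args, **kwargs):
        for handler in list(self._handlers.get(topic, ())):
            handler(*args, **kwargs)


calls = []


def on_tick(price):
    calls.append(price)


table = HandlerTable()
table.register_handler("tick", on_tick)
table.register_handler("tick", on_tick)                    # duplicate is kept
table.register_handler("tick", on_tick, deduplicate=True)  # skipped
table.trigger("tick", 101.5)                               # on_tick runs twice
```

Without deduplicate=True, registering the same callable twice means it runs twice per message, which is rarely intended.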

event_hooks()[source]

Iterate over all registered event hooks.

topics()[source]

Iterate over all registered topics.

items()[source]

Iterate over (topic, hook) pairs.

property capacity: int

Get the queue capacity.

property occupied: int

Get the current number of messages in the queue.

property exact_topic_hook_map: dict

Get the exact topic hook mapping.

property generic_topic_hook_map: dict

Get the generic topic hook mapping.
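
Keeping exact and generic topics in separate dicts suggests a two-step dispatch: a direct key lookup for exact topics, then a scan of the generic ones. The sketch below illustrates that idea under stated assumptions. HookMaps is a hypothetical class, and regular expressions stand in for the library's own topic matching, which may work differently.

```python
import re


class HookMaps:
    """Hypothetical two-map registry: exact keys plus pattern entries."""

    def __init__(self):
        self.exact = {}    # topic string -> list of handlers
        self.generic = {}  # pattern string -> (compiled regex, list of handlers)

    def register(self, topic, handler, exact=True):
        if exact:
            self.exact.setdefault(topic, []).append(handler)
        else:
            entry = self.generic.setdefault(topic, (re.compile(topic), []))
            entry[1].append(handler)

    def handlers_for(self, topic):
        # Step 1: O(1) lookup for an exact match.
        found = list(self.exact.get(topic, ()))
        # Step 2: linear scan over generic patterns.
        for compiled, handlers in self.generic.values():
            if compiled.fullmatch(topic):
                found.extend(handlers)
        return found


def exact_handler(*args, **kwargs):
    pass


def pattern_handler(*args, **kwargs):
    pass


maps = HookMaps()
maps.register("md.tick.AAPL", exact_handler)
maps.register(r"md\.tick\..+", pattern_handler, exact=False)
```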

class event_engine.capi.fallback_engine.EventEngineEx(capacity: int = 4095, logger: Logger | None = None)[source]

Bases: EventEngine

Extended EventEngine with timer support.

Provides timer functionality for periodic event triggering.

__init__(capacity: int = 4095, logger: Logger | None = None)[source]

Initialize EventEngineEx.

Parameters:
  • capacity – Maximum capacity of the message queue

  • logger – Optional logger instance

__repr__() str[source]

String representation including active timers.

run_timer(interval: float, topic: Topic, activate_time: datetime | None = None)[source]

Run a timer with custom interval (blocking call).

Parameters:
  • interval – Interval in seconds

  • topic – Topic to publish on

  • activate_time – Optional activation time
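
A blocking interval timer with an optional activation time can be sketched as a loop around threading.Event.wait. This is an illustration only. The function below is a free-standing stand-in that takes a publish callable and a stop_event, the max_ticks parameter exists purely to keep the demo finite, and firing the first tick immediately after activation is an assumption.

```python
import threading
from datetime import datetime


def run_timer(interval, publish, stop_event, activate_time=None, max_ticks=None):
    """Call publish() every `interval` seconds until stop_event is set.

    Hypothetical stand-in; returns the number of ticks published.
    """
    if activate_time is not None:
        delay = (activate_time - datetime.now()).total_seconds()
        # Sleep until activation, but wake early if a stop is requested.
        if delay > 0 and stop_event.wait(delay):
            return 0
    ticks = 0
    while not stop_event.is_set():
        publish()
        ticks += 1
        if max_ticks is not None and ticks >= max_ticks:
            break
        stop_event.wait(interval)  # interruptible sleep
    return ticks


events = []
stop = threading.Event()
published = run_timer(0.001, lambda: events.append("tick"), stop, max_ticks=3)
```

Because the call blocks, a loop like this is normally run in its own thread, with stop_event set from the thread that shuts the engine down.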

minute_timer(topic: Topic)[source]

Run minute-aligned timer (blocking call).

Parameters:

topic – Topic to publish on

second_timer(topic: Topic)[source]

Run second-aligned timer (blocking call).

Parameters:

topic – Topic to publish on

get_timer(interval: float, activate_time: datetime | None = None) Topic[source]

Get or create a timer with the specified interval.

Parameters:
  • interval – Interval in seconds

  • activate_time – Optional activation time

Returns:

Topic for the timer
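
"Get or create" implies that repeated calls with the same interval return the same timer topic instead of creating a new timer each time. A minimal sketch of that caching pattern follows. TimerRegistry is hypothetical, strings stand in for Topic objects, and the "timer.<interval>" naming is invented for the example.

```python
import threading


class TimerRegistry:
    """Hypothetical get-or-create cache keyed by interval."""

    def __init__(self):
        self._lock = threading.Lock()
        self._topics = {}

    def get_timer(self, interval):
        with self._lock:
            topic = self._topics.get(interval)
            if topic is None:
                # A real engine would presumably also start the timer here.
                topic = f"timer.{interval}"  # invented topic name
                self._topics[interval] = topic
            return topic


registry = TimerRegistry()
first = registry.get_timer(1.0)
second = registry.get_timer(1.0)  # cached: same object as `first`
other = registry.get_timer(5.0)   # different interval, new entry
```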

stop()[source]

Stop the engine and all timers.

clear()[source]

Clear all hooks and timers.

Type Stubs

The package includes complete type stubs (.pyi files) for the Cython modules:

  • event_engine/capi/c_topic.pyi

  • event_engine/capi/c_event.pyi

  • event_engine/capi/c_engine.pyi

These provide full type information for IDEs and type checkers like mypy.

Example mypy usage:

    mypy my_app.py

All public APIs are fully typed.

See Also