Python API Reference (CAPI)¶
This page documents the high-level Python API provided by event_engine.capi. This is the recommended API for most users.
Note
The API is identical whether using the compiled Cython version or the pure Python fallback. See Native Python Fallback for fallback-specific details.
Quick Import¶
from event_engine import EventEngine, Topic, EventHook, MessagePayload
# Or import from capi explicitly
from event_engine.capi import EventEngine, Topic
All public classes and functions are also available at the top level (event_engine.*).
Topic Classes¶
Topic¶
The Topic alias points to PyTopic.
Constructor
Topic(topic_str: str) -> Topic
Parse a topic string into a structured topic.
Examples:
# Exact topic
exact = Topic('Market.Data.AAPL')
assert exact.is_exact
# Wildcard topic
pattern = Topic('Market.Data.{symbol}')
assert not pattern.is_exact
# Range topic
range_topic = Topic('Market.(Equity|Futures).Trade')
# Pattern topic
regex = Topic(r'Market.Data./^[A-Z]{4}$/')
Properties
value: str - Full topic string
is_exact: bool - True if all parts are exact (no wildcards/patterns)
hash_value: int - Hash of the topic for fast comparison
addr: int - Memory address of underlying C structure
Methods
__len__() -> int - Number of topic parts
__getitem__(index: int) -> TopicPart - Get part by index
__iter__() -> Iterator[TopicPart] - Iterate over parts
__eq__(other: Topic) -> bool - Equality comparison
__hash__() -> int - Hash for use in dicts/sets
__repr__() -> str - String representation
match(other: Topic) -> TopicMatchResult - Match against another topic
format(**kwargs) -> Topic - Replace wildcards with values
append(part: TopicPart) -> Topic - Append a part (returns new topic)
update_literal() -> Topic - Recalculate literal string after modification
Class Methods
Topic.join(parts: Iterable[str]) -> Topic - Join strings with default separator
Topic.from_parts(parts: Iterable[TopicPart]) -> Topic - Build from parts
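As a rough illustration of what format(**kwargs) does conceptually, the sketch below substitutes values into {name} wildcards on plain strings. It is not the library's implementation: format_topic is a hypothetical helper, the '.' separator is inferred from the examples above, and leaving unfilled wildcards untouched is an assumption rather than documented behavior.

```python
def format_topic(pattern: str, *, sep: str = '.', **values: object) -> str:
    """Replace {name} parts of a topic string with the given values."""
    out = []
    for part in pattern.split(sep):
        is_wildcard = len(part) >= 2 and part[0] == '{' and part[-1] == '}'
        if is_wildcard and part[1:-1] in values:
            # Substitute the wildcard with the supplied value.
            out.append(str(values[part[1:-1]]))
        else:
            # Literal parts and unfilled wildcards pass through unchanged.
            out.append(part)
    return sep.join(out)


formatted = format_topic('Market.Data.{symbol}', symbol='AAPL')
print(formatted)
```

With the real API the equivalent call would presumably be Topic('Market.Data.{symbol}').format(symbol='AAPL'), which returns a Topic rather than a string.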
TopicPart Classes¶
Base class for topic parts:
class TopicPart:
    @property
    def ttype(self) -> TopicType: ...  # Type of this part
    @property
    def addr(self) -> int: ...  # C memory address
Subclasses:
TopicPartExact - Exact literal match
    part: str - The literal string
TopicPartAny - Wildcard match ({name})
    name: str - The wildcard name
TopicPartRange - One of multiple options ((opt1|opt2))
    options() -> Iterator[str] - Iterate over options
TopicPartPattern - Regex match (/pattern/)
    pattern: str - The regex string
    regex: re.Pattern - Compiled regex
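The four part kinds can be told apart by their delimiters alone. The sketch below shows that classification on plain strings; classify_part and split_topic are hypothetical helpers, not part of event_engine, and the naive split assumes that no part (including a regex) contains the '.' separator.

```python
def classify_part(text: str) -> str:
    """Return a label for one topic part, based only on its delimiters."""
    if len(text) >= 2:
        if text[0] == '{' and text[-1] == '}':
            return 'any'      # wildcard such as {symbol}
        if text[0] == '(' and text[-1] == ')':
            return 'range'    # options such as (Equity|Futures)
        if text[0] == '/' and text[-1] == '/':
            return 'pattern'  # regex such as /^[A-Z]{4}$/
    return 'exact'


def split_topic(topic_str: str, sep: str = '.') -> list[str]:
    # Naive split: assumes no part contains the separator itself.
    return topic_str.split(sep)


labels = [classify_part(p) for p in split_topic('Market.(Equity|Futures).{symbol}')]
print(labels)
```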
TopicMatchResult¶
Result of matching two topics:
result = pattern.match(exact_topic)
if result.matched:  # All parts matched
    for node in result:
        print(node['part_a'], node['part_b'], node['literal'])
# Convert to dict of matched values
values = result.to_dict()  # {part_name: part_value}
Properties
matched: bool - True if all parts matched
length: int - Number of match nodes
Methods
__len__() -> int - Number of nodes
__getitem__(index: int) -> TopicMatchNode - Get node by index
__iter__() -> Iterator[TopicMatchNode] - Iterate nodes
__bool__() -> bool - Same as matched
to_dict() -> dict[str, TopicPart] - Convert to dict
TopicMatchNode TypedDict¶
class TopicMatchNode(TypedDict):
    matched: bool             # Did this node match?
    part_a: TopicPart | None  # Part from pattern topic
    part_b: TopicPart | None  # Part from target topic
    literal: str | None       # Matched literal value
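To make the node structure concrete, here is a minimal sketch of part-by-part matching on plain strings that produces dicts shaped like TopicMatchNode. match_parts is a hypothetical stand-in, not the library's matcher: it uses strings instead of TopicPart objects, treats a length mismatch as a failed node, and assumes no part contains the '.' separator.

```python
import re


def match_parts(pattern: str, target: str, sep: str = '.') -> list[dict]:
    """Compare two topic strings part by part and return one node per position."""
    a_parts, b_parts = pattern.split(sep), target.split(sep)
    nodes = []
    for i in range(max(len(a_parts), len(b_parts))):
        a = a_parts[i] if i < len(a_parts) else None
        b = b_parts[i] if i < len(b_parts) else None
        if a is None or b is None:
            ok = False                                   # one topic is shorter
        elif len(a) >= 2 and a[0] == '{' and a[-1] == '}':
            ok = True                                    # wildcard accepts anything
        elif len(a) >= 2 and a[0] == '(' and a[-1] == ')':
            ok = b in a[1:-1].split('|')                 # one of the listed options
        elif len(a) >= 2 and a[0] == '/' and a[-1] == '/':
            ok = re.search(a[1:-1], b) is not None       # regex between the slashes
        else:
            ok = a == b                                  # exact literal
        nodes.append({'matched': ok, 'part_a': a, 'part_b': b,
                      'literal': b if ok else None})
    return nodes


nodes = match_parts('Market.{symbol}.(Trade|Quote)', 'Market.AAPL.Trade')
all_matched = all(n['matched'] for n in nodes)
# Rough analogue of to_dict(): wildcard name -> matched literal.
captured = {n['part_a'][1:-1]: n['literal']
            for n in nodes if n['matched'] and n['part_a'].startswith('{')}
print(all_matched, captured)
```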
Topic Helper Functions¶
# Initialize internal topic map
init_internal_map(capacity: int = 1024) -> dict
# Clear internal map
clear_internal_map() -> None
# Get topic from internal map
get_internal_topic(key: str, owner: bool = False) -> Topic | None
# Get all internalized topics
get_internal_map() -> dict[str, Topic]
# Initialize allocator (no-op in pure Python)
init_allocator(capacity: int = 4096, with_shm: bool = False) -> None
Event Classes¶
MessagePayload¶
Container for event data:
msg = MessagePayload(alloc=True)
msg.topic = Topic('My.Topic')
msg.args = (1, 2, 3)
msg.kwargs = {'key': 'value'}
msg.seq_id = 42
Properties
topic: Topic - Associated topic
args: tuple | None - Positional arguments
kwargs: dict | None - Keyword arguments
seq_id: int - Sequence ID
All properties are read-write. In the pure Python version, owner, args_owner, and kwargs_owner are always True.
EventHook¶
Event dispatcher for a specific topic:
topic = Topic('App.Events')
hook = EventHook(topic, logger=my_logger, retry_on_unexpected_topic=False)
# Add handlers
hook.add_handler(my_handler, deduplicate=True)
hook += another_handler # Operator shorthand
# Trigger
msg = MessagePayload(alloc=True)
msg.topic = topic
msg.args = (123,)
hook.trigger(msg)
# Remove handlers
hook -= my_handler
hook.clear() # Remove all
Constructor
EventHook(topic: Topic,
logger: Logger = None,
retry_on_unexpected_topic: bool = False)
Attributes
topic: Topic - Associated topic
logger: Logger - Logger instance
retry_on_unexpected_topic: bool - Retry without topic on TypeError
Methods
trigger(msg: MessagePayload) - Dispatch to all handlers
__call__(msg: MessagePayload) - Alias for trigger
add_handler(handler: Callable, deduplicate: bool = False) - Register handler
remove_handler(handler: Callable) -> EventHook - Unregister handler
__iadd__(handler: Callable) -> EventHook - hook += handler
__isub__(handler: Callable) -> EventHook - hook -= handler
__len__() -> int - Number of handlers
__iter__() -> Iterator[Callable] - Iterate handlers
__contains__(handler: Callable) -> bool - Check if handler registered
clear() - Remove all handlers
Properties
handlers: list[Callable] - All registered handlers (ordered: no-topic first, then with-topic)
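The handler ordering and the retry_on_unexpected_topic flag are easier to picture with a small stand-in. The sketch below is not the library's EventHook. It assumes that "with-topic" handlers are those accepting a topic keyword (detected here with inspect.signature), that they receive it as topic=..., and that trigger takes args and kwargs directly instead of a MessagePayload. SketchHook and all handler names are hypothetical.

```python
import inspect


class SketchHook:
    """Stand-in dispatcher illustrating ordering, deduplication, and retry."""

    def __init__(self, topic, retry_on_unexpected_topic=False):
        self.topic = topic
        self.retry_on_unexpected_topic = retry_on_unexpected_topic
        self._no_topic = []    # called as handler(*args, **kwargs)
        self._with_topic = []  # called with an extra topic=... keyword

    @property
    def handlers(self):
        # No-topic handlers first, then with-topic handlers.
        return self._no_topic + self._with_topic

    def add_handler(self, handler, deduplicate=False):
        if deduplicate and handler in self.handlers:
            return
        params = inspect.signature(handler).parameters
        wants_topic = 'topic' in params or any(
            p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
        (self._with_topic if wants_topic else self._no_topic).append(handler)

    def trigger(self, args=(), kwargs=None):
        kwargs = kwargs or {}
        for handler in self._no_topic:
            handler(*args, **kwargs)
        for handler in self._with_topic:
            try:
                handler(*args, topic=self.topic, **kwargs)
            except TypeError:
                if not self.retry_on_unexpected_topic:
                    raise
                handler(*args, **kwargs)  # retry without the topic keyword


calls = []

def plain(x):
    calls.append(('plain', x))

def aware(x, topic):
    calls.append(('aware', x, topic))

def inner(x):
    calls.append(('inner', x))

def forwarding(*args, **kwargs):
    # Looks topic-aware because of **kwargs, but inner() rejects topic=...
    return inner(*args, **kwargs)

hook = SketchHook('App.Events', retry_on_unexpected_topic=True)
hook.add_handler(aware)
hook.add_handler(plain)
hook.add_handler(plain, deduplicate=True)  # ignored: already registered
hook.add_handler(forwarding)
hook.trigger((1,))
print(calls)
```

One caveat of this design: a TypeError raised inside a handler for unrelated reasons also triggers the retry, so the handler may run twice.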
EventHookEx¶
Extended hook with statistics tracking:
hook = EventHookEx(topic, logger=my_logger)
# ... register and trigger ...
# Get stats for a specific handler
stats = hook.get_stats(my_handler)
print(stats['calls']) # Number of calls
print(stats['total_time']) # Total execution time in seconds
# Iterate all stats
for handler, stats in hook.stats:
    print(f"{handler.__name__}: {stats}")
Additional Methods
get_stats(handler: Callable) -> HandlerStats | None - Get stats for handler
stats: Iterator[tuple[Callable, HandlerStats]] - Iterate (handler, stats) pairs
HandlerStats TypedDict
class HandlerStats(TypedDict):
    calls: int         # Number of times called
    total_time: float  # Total execution time (seconds)
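Per-handler statistics of this shape can be collected by timing each call. The sketch below shows one way to do it with time.perf_counter; TimedDispatcher is a hypothetical stand-in, and whether EventHookEx counts calls that raise, or which clock it uses, is an assumption here.

```python
import time


class TimedDispatcher:
    """Stand-in that records calls and total_time for each handler."""

    def __init__(self):
        self._handlers = []
        self._stats = {}

    def add_handler(self, handler):
        self._handlers.append(handler)
        self._stats.setdefault(handler, {'calls': 0, 'total_time': 0.0})

    def trigger(self, *args, **kwargs):
        for handler in self._handlers:
            started = time.perf_counter()
            try:
                handler(*args, **kwargs)
            finally:
                # Count the call even if the handler raised.
                entry = self._stats[handler]
                entry['calls'] += 1
                entry['total_time'] += time.perf_counter() - started

    def get_stats(self, handler):
        # None for handlers that were never registered.
        return self._stats.get(handler)


def on_event(x):
    return x * 2

dispatcher = TimedDispatcher()
dispatcher.add_handler(on_event)
for i in range(3):
    dispatcher.trigger(i)
stats = dispatcher.get_stats(on_event)
print(stats['calls'])
```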
Engine Classes¶
EventEngine¶
Main event engine with message queue and routing:
engine = EventEngine(capacity=8192, logger=my_logger)
# Register handlers
topic = Topic('My.Topic')
engine.register_handler(topic, my_handler)
# Start event loop
engine.start()
# Publish messages
engine.put(topic, 'arg1', 'arg2', key='value')
# Stop and clean up
engine.stop()
engine.clear()
Constructor
EventEngine(capacity: int = 4095, logger: Logger = None)
Attributes
capacity: int - Maximum queue size
logger: Logger - Logger instance
active: bool - Engine running state
occupied: int - Current queue size
Methods
start() - Start event loop in background thread
stop() - Stop event loop and join thread
run() - Run event loop in current thread (blocking)
activate() - Mark engine as active (called by start())
deactivate() - Mark engine as inactive (called by stop())
clear() - Remove all hooks (must be stopped first)
put(topic: Topic, *args, block: bool = True, max_spin: int = 65535, timeout: float = 0.0, **kwargs) - Publish event (raises Full if queue full and non-blocking)
publish(topic: Topic, args: tuple, kwargs: dict, block: bool = True, timeout: float = 0.0) - Publish with explicit args/kwargs
get(block: bool = True, max_spin: int = 65535, timeout: float = 0.0) -> MessagePayload - Get message from queue (raises Empty if empty and non-blocking)
register_hook(hook: EventHook) - Register an EventHook
unregister_hook(topic: Topic) -> EventHook - Unregister and return hook
register_handler(topic: Topic, handler: Callable, deduplicate: bool = False) - Register handler (creates hook if needed)
unregister_handler(topic: Topic, handler: Callable) - Unregister handler
__len__() -> int - Total number of registered topics
Properties
exact_topic_hook_map: dict - Copy of exact topic hooks
generic_topic_hook_map: dict - Copy of generic topic hooks
Iterators
event_hooks() - Iterate all hooks
topics() - Iterate all topics
items() - Iterate (topic, hook) pairs
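The start/put/stop lifecycle follows the familiar bounded-queue-plus-worker-thread pattern. The sketch below reproduces that pattern with the standard library only. SketchEngine is a hypothetical stand-in: it routes on exact string topics, uses queue.Full from the standard library rather than the library's own Full, and drains pending messages before stopping, which may differ from EventEngine.

```python
import queue
import threading
from collections import defaultdict


class SketchEngine:
    """Stand-in engine: bounded queue drained by a background thread."""

    _STOP = object()  # sentinel that ends the event loop

    def __init__(self, capacity=16):
        self._queue = queue.Queue(maxsize=capacity)
        self._handlers = defaultdict(list)
        self._thread = None

    def register_handler(self, topic, handler):
        self._handlers[topic].append(handler)

    def put(self, topic, *args, block=True, timeout=None, **kwargs):
        # Raises queue.Full when block=False and the queue is at capacity.
        self._queue.put((topic, args, kwargs), block=block, timeout=timeout)

    def run(self):
        # Blocking event loop; returns once the stop sentinel is dequeued.
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            topic, args, kwargs = item
            for handler in list(self._handlers.get(topic, ())):
                handler(*args, **kwargs)

    def start(self):
        self._thread = threading.Thread(target=self.run, daemon=True)
        self._thread.start()

    def stop(self):
        if self._thread is None:
            return
        self._queue.put(self._STOP)  # queued behind any pending messages
        self._thread.join()
        self._thread = None


received = []
engine = SketchEngine(capacity=4)
engine.register_handler('My.Topic', lambda *a, **k: received.append((a, k)))
engine.start()
engine.put('My.Topic', 'arg1', 'arg2', key='value')
engine.stop()
print(received)
```

Because the sentinel is enqueued after the published message, the handler is guaranteed to have run by the time stop() returns in this sketch.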
EventEngineEx¶
Extended engine with timer support:
engine = EventEngineEx(capacity=4096)
engine.start()
# Get a timer topic
timer_topic = engine.get_timer(interval=1.0)
engine.register_handler(timer_topic, on_timer)
# Timer fires every 1 second
Additional Methods
get_timer(interval: float, activate_time: datetime = None) -> Topic - Get or create timer topic
    Special intervals: 1 (second-aligned), 60 (minute-aligned)
run_timer(interval: float, topic: Topic, activate_time: datetime = None) - Run timer loop (blocking)
second_timer(topic: Topic) - Second-aligned timer loop (blocking)
minute_timer(topic: Topic) - Minute-aligned timer loop (blocking)
Timer Behavior
Timers publish events with interval and trigger_time (or timestamp) in kwargs
Multiple calls to get_timer() with the same interval return the same topic
Timers stop when the engine stops
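"Second-aligned" and "minute-aligned" suggest that the timer fires on whole-second or whole-minute boundaries instead of at arbitrary offsets. One plausible way to compute the next such boundary is sketched below; next_aligned is a hypothetical helper and the actual alignment logic in EventEngineEx may differ.

```python
import math


def next_aligned(now: float, interval: float) -> float:
    """Return the first multiple of `interval` strictly after `now` (epoch seconds)."""
    return (math.floor(now / interval) + 1) * interval


# A timer loop would sleep for next_aligned(now, interval) - now before firing.
second_fire = next_aligned(100.25, 1.0)
minute_fire = next_aligned(125.0, 60.0)
print(second_fire, minute_fire)
```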
Exceptions¶
class Full(Exception):
    """Raised when queue is full and non-blocking put"""
class Empty(Exception):
    """Raised when queue is empty and non-blocking get"""
Module Functions¶
# Check if using fallback
from event_engine import USING_FALLBACK
print(USING_FALLBACK) # True if pure Python, False if Cython
# Set logger
from event_engine import set_logger
import logging
set_logger(logging.getLogger('MyApp'))
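A flag like USING_FALLBACK is commonly set by trying the compiled module first and falling back on ImportError. The sketch below shows that general pattern only; how event_engine actually selects its backend is not documented here. load_backend and the module name no_such_compiled_ext are hypothetical, and json stands in for a pure Python fallback module.

```python
import importlib


def load_backend(preferred: str, fallback: str):
    """Import `preferred` if available, else `fallback`; report which was used."""
    try:
        return importlib.import_module(preferred), False
    except ImportError:
        # The compiled extension is missing, so use the pure Python module.
        return importlib.import_module(fallback), True


backend, using_fallback = load_backend('no_such_compiled_ext', 'json')
print(backend.__name__, using_fallback)
```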
See Also¶
Examples - Practical usage examples
Cython API Reference - Lower-level Cython API
Native Python Fallback - Pure Python implementation details
API Reference - Auto-generated API reference