Examples¶
This page provides practical examples of using PyEventEngine.
Basic Usage¶
Creating and Starting an Engine¶
from event_engine import EventEngine, Topic
# Create engine with queue capacity of 8192 messages
engine = EventEngine(capacity=8192)
# Start the event loop in a background thread
engine.start()
# ... use the engine ...
# Clean shutdown
engine.stop()
engine.clear()
Simple Publish/Subscribe¶
from event_engine import EventEngine, Topic
engine = EventEngine()
engine.start()
# Define a handler
def on_message(text: str, topic=None):
print(f"Received: {text} on {topic.value if topic else 'unknown'}")
# Register handler for exact topic
topic = Topic('App.Messages')
engine.register_handler(topic, on_message)
# Publish a message
engine.put(topic, 'Hello, World!')
import time
time.sleep(0.1) # Give handler time to process
engine.stop()
Topic Patterns¶
Wildcard Topics¶
Use {name} syntax for named wildcards:
from event_engine import EventEngine, Topic
engine = EventEngine()
engine.start()
# Register handler for wildcard topic
pattern = Topic('Market.Data.{symbol}')
results = []
def on_tick(price: float, symbol: str, topic=None):
results.append((symbol, price))
engine.register_handler(pattern, on_tick)
# Publish to specific topics (they match the pattern)
engine.put(Topic('Market.Data.AAPL'), 150.25, symbol='AAPL')
engine.put(Topic('Market.Data.TSLA'), 680.50, symbol='TSLA')
import time
time.sleep(0.1)
print(results) # [('AAPL', 150.25), ('TSLA', 680.50)]
engine.stop()
Range Topics¶
Use (option1|option2|...) for multiple choices:
from event_engine import Topic
# This topic matches either Equity or Futures
topic = Topic('Market.(Equity|Futures).Trade')
engine.register_handler(topic, lambda **kw: print(f"Trade: {kw}"))
engine.start()
engine.put(Topic('Market.Equity.Trade'), symbol='AAPL')
engine.put(Topic('Market.Futures.Trade'), symbol='ES')
# Does NOT match:
# engine.put(Topic('Market.Options.Trade'), symbol='AAPL')
import time
time.sleep(0.1)
engine.stop()
Pattern Topics (Regex)¶
Use /regex/ for complex matching:
from event_engine import Topic
# Match 4-letter stock symbols
pattern = Topic(r'Market.Data./^[A-Z]{4}$/')
engine.register_handler(pattern, lambda symbol, **kw: print(f"Symbol: {symbol}"))
engine.start()
engine.put(Topic('Market.Data.AAPL'), symbol='AAPL') # Matches
engine.put(Topic('Market.Data.TSLA'), symbol='TSLA') # Matches
engine.put(Topic('Market.Data.A'), symbol='A') # Does NOT match (too short)
import time
time.sleep(0.1)
engine.stop()
Topic Formatting¶
Format topics with wildcards by providing values:
from event_engine import Topic
template = Topic('Market.{market}.{symbol}')
# Format the template
specific = template.format(market='Equity', symbol='AAPL')
print(specific.value) # 'Market.Equity.AAPL'
print(specific.is_exact) # True
# Or use call syntax
specific2 = template(market='Futures', symbol='ES')
print(specific2.value) # 'Market.Futures.ES'
Event Hooks¶
Direct Hook Registration¶
For more control, register EventHook objects directly:
from event_engine import EventEngine, EventHook, Topic
engine = EventEngine()
# Create a hook
topic = Topic('App.Events')
hook = EventHook(topic)
# Add handlers to the hook
hook.add_handler(lambda msg: print(f"Handler 1: {msg}"))
hook.add_handler(lambda msg: print(f"Handler 2: {msg}"))
# Register the hook with the engine
engine.register_hook(hook)
engine.start()
engine.put(topic, 'Test message')
import time
time.sleep(0.1)
engine.stop()
Handler with Statistics¶
Use EventHookEx to track execution stats:
from event_engine import EventHookEx, Topic
topic = Topic('Perf.Test')
hook = EventHookEx(topic)
def slow_handler(x):
import time
time.sleep(0.01) # Simulate work
return x * 2
hook.add_handler(slow_handler)
# Trigger multiple times
from event_engine import PyMessagePayload
for i in range(10):
msg = PyMessagePayload(alloc=True)
msg.topic = topic
msg.args = (i,)
hook.trigger(msg)
# Check stats
stats = hook.get_stats(slow_handler)
print(f"Calls: {stats['calls']}")
print(f"Total time: {stats['total_time']:.4f}s")
Timers¶
Using Built-in Timers (EventEngineEx)¶
from event_engine import EventEngine, EventEngineEx
# Use EventEngineEx for timer support
engine = EventEngineEx(capacity=4096)
engine.start()
# Get a 1-second timer
timer_topic = engine.get_timer(interval=1.0)
tick_count = [0]
def on_tick(interval, trigger_time, **kwargs):
tick_count[0] += 1
print(f"Tick #{tick_count[0]} at {trigger_time}")
engine.register_handler(timer_topic, on_tick)
# Let it run for a few seconds
import time
time.sleep(3.5)
print(f"Total ticks: {tick_count[0]}") # Should be ~3
engine.stop()
Minute and Second Timers¶
from event_engine import EventEngineEx
engine = EventEngineEx()
engine.start()
# Second-aligned timer (fires exactly on the second)
sec_timer = engine.get_timer(interval=1)
engine.register_handler(sec_timer, lambda **kw: print(f"Second: {kw['timestamp']}"))
# Minute-aligned timer (fires exactly on the minute)
min_timer = engine.get_timer(interval=60)
engine.register_handler(min_timer, lambda **kw: print(f"Minute: {kw['timestamp']}"))
import time
time.sleep(65) # Wait for at least one minute tick
engine.stop()
Advanced Usage¶
Multiple Handlers per Topic¶
from event_engine import EventEngine, Topic
engine = EventEngine()
topic = Topic('Multi.Handler')
# Register multiple handlers
engine.register_handler(topic, lambda x: print(f"Handler A: {x}"))
engine.register_handler(topic, lambda x: print(f"Handler B: {x}"))
engine.register_handler(topic, lambda x: print(f"Handler C: {x}"))
engine.start()
engine.put(topic, 'broadcast')
# All three handlers will be called
import time
time.sleep(0.1)
engine.stop()
Deduplication¶
Prevent registering the same handler multiple times:
from event_engine import EventEngine, Topic
engine = EventEngine()
topic = Topic('Dedupe.Test')
def my_handler(x):
print(x)
# Without deduplication
engine.register_handler(topic, my_handler, deduplicate=False)
engine.register_handler(topic, my_handler, deduplicate=False)
# Handler is registered twice
# With deduplication
engine.register_handler(topic, my_handler, deduplicate=True)
engine.register_handler(topic, my_handler, deduplicate=True)
# Handler is only registered once
Custom Logging¶
Integrate with your application’s logger:
import logging
from event_engine import set_logger
# Create your logger
logger = logging.getLogger('MyApp')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(message)s'))
logger.addHandler(handler)
# Tell PyEventEngine to use it
set_logger(logger)
# Now all PyEventEngine logs go through your logger
from event_engine import EventEngine
engine = EventEngine()
engine.start() # Will log via your logger
engine.stop()
Performance Testing¶
Basic Throughput Test¶
import time
import threading
from event_engine import EventEngine, Topic
engine = EventEngine(capacity=8192)
topic = Topic('Perf.Test')
received = []
done = threading.Event()
def handler(msg_id: int):
received.append(msg_id)
if msg_id >= 99999:
done.set()
engine.register_handler(topic, handler)
engine.start()
# Send 100k messages
start = time.perf_counter()
for i in range(100_000):
engine.put(topic, i)
# Wait for all processed
done.wait(timeout=10)
elapsed = time.perf_counter() - start
print(f"Throughput: {len(received) / elapsed:.0f} msg/s")
print(f"Latency: {elapsed / len(received) * 1000:.3f} ms/msg")
engine.stop()
Latency Measurement¶
import time
from event_engine import EventEngine, Topic
engine = EventEngine()
topic = Topic('Latency.Test')
latencies = []
def handler(send_time: float):
latency = time.perf_counter() - send_time
latencies.append(latency * 1000) # Convert to ms
engine.register_handler(topic, handler)
engine.start()
# Send 1000 messages with timestamps
for _ in range(1000):
engine.put(topic, time.perf_counter())
time.sleep(0.001) # 1ms between sends
time.sleep(0.5) # Let queue drain
engine.stop()
# Calculate statistics
latencies.sort()
print(f"Min: {latencies[0]:.3f} ms")
print(f"P50: {latencies[len(latencies)//2]:.3f} ms")
print(f"P95: {latencies[int(len(latencies)*0.95)]:.3f} ms")
print(f"Max: {latencies[-1]:.3f} ms")
See Also¶
API Reference for complete API documentation
demo/folder in the repository for more examplesdemo/native_performance_test.pyfor comprehensive performance testsdemo/capi_performance_test.pyfor Cython performance comparison