Files
NegStation/negstation/event_bus.py
2025-08-01 21:35:09 +02:00

57 lines
2.1 KiB
Python

import threading
import queue
import logging
import inspect
import types
class EventBus:
def __init__(self, logger: logging.Logger):
self.logger = logger
self.subscribers = {}
self.event_queue = queue.Queue()
self.main_queue = queue.Queue()
threading.Thread(target=self._dispatch_loop, daemon=True).start()
def subscribe(self, event_type: str, callback: callable, main_thread: bool = False):
self.logger.debug(f"Subscribed to {event_type}")
self.subscribers.setdefault(event_type, []).append((callback, main_thread))
def publish_deferred(self, event_type: str, data=None):
self.logger.debug(f"publish {event_type}")
self.event_queue.put((event_type, data))
def _dispatch_loop(self):
while True:
event_type, data = self.event_queue.get()
self.logger.debug(f"Dispatching {event_type}")
for callback, main_thread in self.subscribers.get(event_type, []):
if main_thread:
self.main_queue.put((callback, data))
else:
try:
callback(data)
except Exception as e:
self.logger.error(
f"Error in background handler '{
event_type}': {e}"
)
def process_main_queue(self):
while True:
try:
callback, data = self.main_queue.get_nowait()
callback(data)
except queue.Empty:
break
def unsubscribe_instance(self, instance):
for event_type, subs in list(self.subscribers.items()):
new_subs = []
for callback, main_thread in subs:
# if it's a bound method to our instance, skip it
if inspect.ismethod(callback) and callback.__self__ is instance:
continue
new_subs.append((callback, main_thread))
if len(new_subs) != len(subs):
self.subscribers[event_type] = new_subs