New design

This commit is contained in:
2025-07-30 22:00:29 +02:00
parent 47a13d3083
commit d0cbc63859
22 changed files with 580 additions and 842 deletions

1
negstation/__init__.py Normal file
View File

@ -0,0 +1 @@
from .negstation import EditorManager as NegStation

44
negstation/event_bus.py Normal file
View File

@ -0,0 +1,44 @@
import threading
import queue
import logging
class EventBus:
def __init__(self, logger: logging.Logger):
self.logger = logger
self.subscribers = {}
self.event_queue = queue.Queue()
self.main_queue = queue.Queue()
threading.Thread(target=self._dispatch_loop, daemon=True).start()
def subscribe(self, event_type: str, callback: callable, main_thread: bool = False):
self.logger.debug(f"Subscribed to {event_type}")
self.subscribers.setdefault(event_type, []).append((callback, main_thread))
def publish_deferred(self, event_type: str, data=None):
self.logger.debug(f"publish {event_type}")
self.event_queue.put((event_type, data))
def _dispatch_loop(self):
while True:
event_type, data = self.event_queue.get()
self.logger.debug(f"Dispatching {event_type}")
for callback, main_thread in self.subscribers.get(event_type, []):
if main_thread:
self.main_queue.put((callback, data))
else:
try:
callback(data)
except Exception as e:
self.logger.error(
f"Error in background handler '{
event_type}': {e}"
)
def process_main_queue(self):
while True:
try:
callback, data = self.main_queue.get_nowait()
callback(data)
except queue.Empty:
break

View File

@ -0,0 +1,18 @@
import numpy as np
from .event_bus import EventBus
class ImagePipeline:
def __init__(self, bus: EventBus):
self.bus = bus
self.stages = {}
def add_stage(self, name: str, img: np.ndarray):
self.stages[name] = img.astype(np.float32)
# notify widgets of updated stage list and data
self.bus.publish_deferred("pipeline_stages", list(self.stages.keys()))
self.bus.publish_deferred("pipeline_stage", (name, self.stages[name]))
def get_stage(self, name: str):
return self.stages.get(name)

View File

@ -0,0 +1,43 @@
import dearpygui.dearpygui as dpg
import logging
import json
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..negstation import EditorManager
class LayoutManager:
INI_PATH = "negstation_layout.ini"
WIDGET_DATA_PATH = "negstation_widgets.json"
def __init__(self, manager: "EditorManager", logger: logging.Logger):
self.manager = manager
self.logger = logger
def save_layout(self):
self.logger.info("Saving layout...")
dpg.save_init_file(self.INI_PATH)
widget_data = [
{"widget_type": type(w).__name__, "config": w.get_config()}
for w in self.manager.widgets
]
with open(self.WIDGET_DATA_PATH, "w") as f:
json.dump(widget_data, f, indent=4)
self.logger.info("Layout saved successfully.")
def load_layout(self):
self.logger.info("Loading layout...")
if not os.path.exists(self.WIDGET_DATA_PATH):
return
with open(self.WIDGET_DATA_PATH, "r") as f:
widget_data = json.load(f)
for data in widget_data:
if data.get("widget_type") in self.manager.widget_classes:
self.manager._add_widget(widget_type=data.get("widget_type"))
if os.path.exists(self.INI_PATH):
dpg.configure_app(init_file=self.INI_PATH)
self.logger.info(f"Applied UI layout from {self.INI_PATH}")

110
negstation/negstation.py Normal file
View File

@ -0,0 +1,110 @@
import dearpygui.dearpygui as dpg
import logging
import os
import importlib
import inspect
import sys
from pathlib import Path
from importlib.machinery import SourceFileLoader
from .event_bus import EventBus
from .image_pipeline import ImagePipeline
from .layout_manager import LayoutManager
from .widgets.base_widget import BaseWidget
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
class EditorManager:
def __init__(self):
dpg.create_context()
self.bus = EventBus(logger)
self.pipeline = ImagePipeline(self.bus)
self.layout_manager = LayoutManager(self, logger)
self.widgets = []
self.widget_classes = {}
def _discover_and_register_widgets(self, directory="widgets"):
logging.info(f"Discovering widgets in '{directory}' directory...")
dir_path = Path(directory)
if not dir_path.is_dir():
logging.error(f"Path '{directory}' is not a directory")
return
parent = str(dir_path.parent.resolve())
if parent not in sys.path:
sys.path.insert(0, parent)
pkg_name = dir_path.name # e.g. 'widgets'
# 1) Load the packages own BaseWidget
try:
base_mod = importlib.import_module(f"{pkg_name}.base_widget")
ModuleBaseWidget = getattr(base_mod, "BaseWidget")
except Exception:
ModuleBaseWidget = None
for py_file in dir_path.glob("*.py"):
if py_file.name.startswith("__"):
continue
module_name = f"{pkg_name}.{py_file.stem}"
try:
module = importlib.import_module(module_name)
for name, cls in inspect.getmembers(module, inspect.isclass):
# 2) Use the BaseWidget defined *in* widgets/base_widget.py
if (
ModuleBaseWidget
and issubclass(cls, ModuleBaseWidget)
and cls is not ModuleBaseWidget
):
logging.info(f" -> Found and registered widget: {name}")
self._register_widget(name, cls)
except Exception as e:
logging.error(f"Failed to import widget '{py_file.name}': {e}")
def _register_widget(self, name: str, widget_class: object):
if name in self.widget_classes:
logging.warning(f"Widget '{name}' is already registered. Overwriting.")
self.widget_classes[name] = widget_class
def _add_widget(self, widget_type: str):
WidgetClass = self.widget_classes[widget_type]
instance = WidgetClass(self, logger)
self.widgets.append(instance)
instance.create()
def setup(self):
self._discover_and_register_widgets(
f"{os.path.dirname(os.path.realpath(__file__))}/widgets"
)
self.layout_manager.load_layout()
dpg.create_viewport(title="NegStation", width=960, height=400)
dpg.configure_app(docking=True, docking_space=True)
with dpg.viewport_menu_bar():
with dpg.menu(label="File"):
dpg.add_menu_item(
label="Save Layout", callback=self.layout_manager.save_layout
)
with dpg.menu(label="View"):
for widget_name in sorted(self.widget_classes.keys()):
dpg.add_menu_item(
label=self.widget_classes[widget_name].name,
callback=lambda s, a, ud: self._add_widget(ud),
user_data=widget_name,
)
def run(self):
self.setup()
dpg.setup_dearpygui()
dpg.show_viewport()
while dpg.is_dearpygui_running():
self.bus.process_main_queue()
for w in self.widgets:
w.update()
dpg.render_dearpygui_frame()
dpg.destroy_context()

View File

View File

@ -0,0 +1,33 @@
import dearpygui.dearpygui as dpg
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..negstation import EditorManager
class BaseWidget:
name: str = "BaseWidget"
def __init__(self, manager: "EditorManager", logger: logging.Logger):
self.manager = manager
self.logger = logger
self.window_tag = dpg.generate_uuid()
self.config = {}
def create(self):
raise NotImplementedError
def update(self):
pass
def get_config(self):
return self.config
def _on_window_close(self):
try:
dpg.delete_item(self.window_tag)
self.manager.widgets.remove(self)
except ValueError:
pass

View File

@ -0,0 +1,59 @@
import dearpygui.dearpygui as dpg
from PIL import Image
import numpy as np
from .base_widget import BaseWidget
class OpenImageWidget(BaseWidget):
name: str = "Open Image"
def __init__(self, manager, logger, stage_out="loaded_image"):
super().__init__(manager, logger)
self.stage_out = stage_out
self.dialog_tag = dpg.generate_uuid()
self.output_tag = dpg.generate_uuid()
def create(self):
with dpg.file_dialog(
directory_selector=False,
show=False,
callback=self._on_file_selected,
tag=self.dialog_tag,
height=300,
width=400,
):
dpg.add_file_extension(".*")
with dpg.window(
label="Open Image File",
tag=self.window_tag,
width=300,
height=150,
on_close=self._on_window_close,
):
dpg.add_input_text(label="Stage Output Name", tag=self.output_tag)
dpg.add_button(label="Open File...", callback=self._on_open_file)
dpg.set_value(self.output_tag, self.stage_out)
def _on_open_file(self):
dpg.configure_item(self.dialog_tag, show=True)
def _on_file_selected(self, sender, app_data):
# app_data[0] is dict with selected file paths
selection = (
f"{app_data['current_path']}/{list(app_data['selections'].keys())[0]}"
if isinstance(app_data, dict)
else None
)
if not selection:
return
self.logger.info(f"Selected file '{selection}'")
try:
img = Image.open(selection).convert("RGBA")
arr = np.asarray(img).astype(np.float32) / 255.0 # normalize to [0,1]
# Publish into pipeline
self.manager.pipeline.add_stage(dpg.get_value(self.output_tag), arr)
except Exception as e:
self.logger.error(f"Failed to load image {selection}: {e}")

View File

@ -0,0 +1,226 @@
import dearpygui.dearpygui as dpg
import numpy as np
from .base_widget import BaseWidget
# class StageViewerWidget(BaseWidget):
# name: str = "Stage Viewer"
# def __init__(self, manager, logger):
# super().__init__(manager, logger)
# # ensure texture registry
# if not hasattr(manager, "texture_registry"):
# manager.texture_registry = dpg.add_texture_registry(tag=dpg.generate_uuid())
# self.registry = manager.texture_registry
# self.stages = []
# self.current = None
# self.texture_tag = dpg.generate_uuid()
# self.image_item = None
# manager.bus.subscribe("pipeline_stages", self.on_stage_list, main_thread=True)
# manager.bus.subscribe("pipeline_stage", self.on_stage_data, main_thread=True)
# def create(self):
# with dpg.window(
# label="Stage Viewer",
# tag=self.window_tag,
# width=400,
# height=400,
# on_close=self._on_window_close,
# ):
# self.combo = dpg.add_combo(label="Stage", items=[], callback=self.on_select)
# # placeholder 1×1 texture
# dpg.add_dynamic_texture(
# width=1,
# height=1,
# default_value=[0.0, 0.0, 0.0, 0.0],
# tag=self.texture_tag,
# parent=self.registry,
# )
# self.image_item = dpg.add_image(self.texture_tag)
# def on_stage_list(self, stages):
# self.stages = stages
# dpg.configure_item(self.combo, items=stages)
# if not self.current and stages:
# self.current = stages[0]
# dpg.set_value(self.combo, self.current)
# def on_select(self, sender, stage_name):
# self.current = stage_name
# img = self.manager.pipeline.get_stage(stage_name)
# if img is not None:
# self.update_texture(img)
# def on_stage_data(self, data):
# name, img = data
# if name == self.current:
# self.update_texture(img)
# def update_texture(self, img: np.ndarray):
# h, w, _ = img.shape
# flat = img.flatten().tolist()
# # recreate texture at correct size
# if dpg.does_item_exist(self.texture_tag):
# dpg.delete_item(self.texture_tag)
# dpg.add_dynamic_texture(
# width=w,
# height=h,
# default_value=flat,
# tag=self.texture_tag,
# parent=self.registry,
# )
# # determine available window size
# win_w, win_h = dpg.get_item_rect_size(self.window_tag)
# # reserve space for combo box (approx 30px)
# available_h = max(win_h - 30, 1)
# # compute scale to fit
# scale = min(win_w / w, available_h / h)
# disp_w = int(w * scale)
# disp_h = int(h * scale)
# # update image widget
# dpg.configure_item(
# self.image_item, texture_tag=self.texture_tag, width=disp_w, height=disp_h
# )
class StageViewerWidget(BaseWidget):
"""
A robust, zoomable stage viewer using a Dear PyGui Plot to display
dynamic textures without ever deleting them—avoiding segfaults.
"""
name = "Stage Viewer"
def __init__(self, manager, logger):
super().__init__(manager, logger)
self.manager.bus.subscribe("pipeline_stages", self._on_stage_list, main_thread=True)
self.manager.bus.subscribe("pipeline_stage", self._on_stage_data, main_thread=True)
# onetime flags and tags
self._initialized = False
self.texture_tag = dpg.generate_uuid()
self.image_draw_tag = dpg.generate_uuid()
self.plot_tag = dpg.generate_uuid()
self.xaxis_tag = dpg.generate_uuid()
self.yaxis_tag = dpg.generate_uuid()
self.last_size = (1, 1)
self.current_stage = None
self.needs_fit = False
def create(self):
if dpg.does_item_exist(self.window_tag):
return
# ensure a texture registry exists
if not hasattr(self.manager, "texture_registry"):
self.manager.texture_registry = dpg.add_texture_registry(tag=dpg.generate_uuid())
with dpg.window(label="Stage Viewer",
tag=self.window_tag,
on_close=self._on_window_close,
width=600, height=600):
# stage selector
self.combo = dpg.add_combo(label="Stage", items=[], callback=self._on_select)
# plot container, equal_aspects ensures no distortion
with dpg.plot(label="Image Plot", tag=self.plot_tag, height=-1, width=-1, equal_aspects=True):
self.xaxis_tag = dpg.add_plot_axis(dpg.mvXAxis,
no_tick_labels=True, no_gridlines=True)
self.yaxis_tag = dpg.add_plot_axis(dpg.mvYAxis,
no_tick_labels=True, no_gridlines=True)
# resize handler to refit on window/plot size changes
with dpg.item_handler_registry() as handler:
dpg.add_item_resize_handler(callback=lambda s,a,u: self._fit_image(), user_data=None)
dpg.bind_item_handler_registry(self.window_tag, handler)
def _on_stage_list(self, stages):
dpg.configure_item(self.combo, items=stages)
if not self.current_stage and stages:
self.current_stage = stages[0]
dpg.set_value(self.combo, self.current_stage)
def _on_select(self, sender, stage_name):
self.current_stage = stage_name
img = self.manager.pipeline.get_stage(stage_name)
if img is not None:
self._update_image(img)
def _on_stage_data(self, data):
name, img = data
if name == self.current_stage:
self._update_image(img)
def _update_image(self, img: np.ndarray):
h, w, _ = img.shape
self.last_size = (w, h)
# First time: create texture & draw-image inside the plot
if not self._initialized:
dpg.add_dynamic_texture(w, h, img, tag=self.texture_tag,
parent=self.manager.texture_registry)
dpg.draw_image(self.texture_tag,
pmin=(0, h), pmax=(w, 0),
tag=self.image_draw_tag,
parent=self.plot_tag)
self._initialized = True
else:
# Subsequent updates: just set_value and adjust draw coords
dpg.set_value(self.texture_tag, img)
dpg.configure_item(self.image_draw_tag,
pmin=(0, self.last_size[1]),
pmax=(self.last_size[0], 0))
# show & focus window
dpg.configure_item(self.window_tag, show=True)
dpg.focus_item(self.window_tag)
# flag to refit axes
self.needs_fit = True
def _fit_image(self):
"""Adjust plot axes so the image fills the available space."""
if not self._initialized or not self.needs_fit:
return
# get plot area size
plot_w = dpg.get_item_width(self.window_tag)
plot_h = dpg.get_item_height(self.window_tag) - 30 # reserve combo height
if plot_w <= 0 or plot_h <= 0:
return
img_w, img_h = self.last_size
if img_w <= 0 or img_h <= 0:
return
plot_aspect = plot_w / plot_h
img_aspect = img_w / img_h
if img_aspect > plot_aspect:
x_min, x_max = 0, img_w
needed_h = img_w / plot_aspect
center_y = img_h / 2
y_min = center_y - needed_h / 2
y_max = center_y + needed_h / 2
else:
y_min, y_max = 0, img_h
needed_w = img_h * plot_aspect
center_x = img_w / 2
x_min = center_x - needed_w / 2
x_max = center_x + needed_w / 2
dpg.set_axis_limits(self.xaxis_tag, x_min, x_max)
dpg.set_axis_limits(self.yaxis_tag, y_min, y_max)
self.needs_fit = False
def update(self):
# If we flagged a refit, do it now
if self.needs_fit:
self._fit_image()