From f5f47c26fdd5cb6b4d637d13f10268e1ddaffbb1 Mon Sep 17 00:00:00 2001 From: Joppe Blondel Date: Thu, 19 Mar 2026 18:47:45 +0100 Subject: [PATCH] New graph plotter --- plot.py | 278 +++++++++++++++++++++++++++++++---------------- plot_config.json | 28 +++++ 2 files changed, 215 insertions(+), 91 deletions(-) create mode 100644 plot_config.json diff --git a/plot.py b/plot.py index 6191037..2fd46f8 100644 --- a/plot.py +++ b/plot.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 import argparse +import json import re -import sys import threading import time from collections import deque +from pathlib import Path from queue import Empty, Queue try: @@ -22,26 +23,46 @@ except ImportError as exc: ) from exc -IMU_RE = re.compile( - r"\[\]:\s*(-?\d+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)" -) - BACKGROUND = (18, 20, 24) GRID = (48, 54, 64) AXIS = (140, 148, 160) -ACC_X_TRACE = (90, 170, 255) -ACC_Y_TRACE = (80, 220, 140) -ACC_Z_TRACE = (255, 200, 60) -GYR_X_TRACE = (240, 80, 80) -GYR_Y_TRACE = (220, 80, 220) -GYR_Z_TRACE = (240, 240, 240) TEXT = (230, 235, 240) ERROR = (255, 110, 110) +LINE_RE = re.compile(r"\[<([^>]+)>\]:\s*(.*)") + +DEFAULT_CONFIG = { + "title": "IMU stream", + "history_seconds": 10.0, + "graphs": [ + { + "title": "Accelerometer", + "streams": [ + {"key": "acc_x", "label": "ACC X", "color": [90, 170, 255]}, + {"key": "acc_y", "label": "ACC Y", "color": [80, 220, 140]}, + {"key": "acc_z", "label": "ACC Z", "color": [255, 200, 60]}, + ], + }, + { + "title": "Gyroscope", + "streams": [ + {"key": "gyr_x", "label": "GYR X", "color": [240, 80, 80]}, + {"key": "gyr_y", "label": "GYR Y", "color": [220, 80, 220]}, + {"key": "gyr_z", "label": "GYR Z", "color": [240, 240, 240]}, + ], + }, + ], + "sources": [ + { + "tag": "IMU", + "fields": ["acc_x", "acc_y", "acc_z", "gyr_x", "gyr_y", "gyr_z"], + } + ], +} def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( - description="Read BMI160 IMU values from a serial port and draw them live." + description="Read tagged serial values and draw them live from a JSON config." ) parser.add_argument("port", help="Serial port, for example /dev/ttyUSB0 or COM3") parser.add_argument( @@ -51,11 +72,18 @@ def parse_args() -> argparse.Namespace: default=115200, help="Serial baudrate (default: 115200)", ) + parser.add_argument( + "-c", + "--config", + type=Path, + default=None, + help="Path to JSON plot config (default: built-in IMU config)", + ) parser.add_argument( "--history-seconds", type=float, - default=10.0, - help="Initial visible history window in seconds (default: 10)", + default=None, + help="Initial visible history window in seconds (overrides config)", ) parser.add_argument( "--max-samples", @@ -78,25 +106,83 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() -def serial_reader(port: str, baudrate: int, output: Queue) -> None: +def load_config(config_path: Path | None) -> dict: + if config_path is None: + return DEFAULT_CONFIG + + with config_path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def validate_config(config: dict) -> tuple[dict[str, list[str]], list[dict], list[str]]: + sources = config.get("sources") + graphs = config.get("graphs") + if not isinstance(sources, list) or not sources: + raise SystemExit("Config must contain a non-empty `sources` list.") + if not isinstance(graphs, list) or not graphs: + raise SystemExit("Config must contain a non-empty `graphs` list.") + + tag_to_fields: dict[str, list[str]] = {} + for source in sources: + tag = source.get("tag") + fields = source.get("fields") + if not isinstance(tag, str) or not tag: + raise SystemExit("Each source must have a non-empty string `tag`.") + if not isinstance(fields, list) or not fields or not all(isinstance(field, str) for field in fields): + raise SystemExit(f"Source `{tag}` must have a non-empty string list `fields`.") + tag_to_fields[tag] = fields + + stream_keys: list[str] = [] + for graph in graphs: + streams = graph.get("streams") + if not isinstance(streams, list) or not streams: + raise SystemExit("Each graph must have a non-empty `streams` list.") + for stream in streams: + key = stream.get("key") + if not isinstance(key, str) or not key: + raise SystemExit("Each stream must have a non-empty string `key`.") + stream_keys.append(key) + + return tag_to_fields, graphs, stream_keys + + +def serial_reader(port: str, baudrate: int, tag_to_fields: dict[str, list[str]], output: Queue) -> None: try: with serial.Serial(port, baudrate=baudrate, timeout=1) as ser: while True: raw_line = ser.readline() if not raw_line: continue + line = raw_line.decode("utf-8", errors="replace").strip() - timestamp = time.monotonic() - match = IMU_RE.search(line) + match = LINE_RE.search(line) if not match: continue - values = [int(group) for group in match.groups()] - for stream_name, value in zip( - ("acc_x", "acc_y", "acc_z", "gyr_x", "gyr_y", "gyr_z"), - values, - ): - output.put((stream_name, timestamp, value)) + tag, payload = match.groups() + fields = tag_to_fields.get(tag) + if fields is None: + continue + + parts = payload.split() + if len(parts) != len(fields): + output.put( + ( + "error", + f"Tag {tag} expected {len(fields)} values, got {len(parts)}: {payload}", + ) + ) + continue + + try: + values = [int(part) for part in parts] + except ValueError: + output.put(("error", f"Tag {tag} has non-integer payload: {payload}")) + continue + + timestamp = time.monotonic() + for key, value in zip(fields, values): + output.put((key, timestamp, value)) except serial.SerialException as exc: output.put(("error", f"Serial error: {exc}")) @@ -117,7 +203,8 @@ def drain_queue( continue stream_name, timestamp, value = item - sample_sets[stream_name].append((timestamp, value)) + if stream_name in sample_sets: + sample_sets[stream_name].append((timestamp, value)) def draw_grid( @@ -182,7 +269,10 @@ def draw_trace( def visible_range( - sample_sets: dict[str, deque[tuple[float, int]]], stream_names: tuple[str, ...], now: float, view_span: float + sample_sets: dict[str, deque[tuple[float, int]]], + stream_names: list[str], + now: float, + view_span: float, ) -> tuple[float, float]: max_abs = 1.0 @@ -201,31 +291,35 @@ def zoom(view_span: float, direction: int, initial_span: float) -> float: return min(max(initial_span * 20.0, 60.0), view_span * 1.2) +def color_tuple(color_value: list[int]) -> tuple[int, int, int]: + if not isinstance(color_value, list) or len(color_value) != 3: + raise SystemExit("Each stream color must be a 3-element RGB list.") + return tuple(int(component) for component in color_value) + + def main() -> int: args = parse_args() - sample_sets = { - "acc_x": deque(maxlen=args.max_samples), - "acc_y": deque(maxlen=args.max_samples), - "acc_z": deque(maxlen=args.max_samples), - "gyr_x": deque(maxlen=args.max_samples), - "gyr_y": deque(maxlen=args.max_samples), - "gyr_z": deque(maxlen=args.max_samples), - } + config = load_config(args.config) + tag_to_fields, graphs, stream_keys = validate_config(config) + sample_sets = {key: deque(maxlen=args.max_samples) for key in stream_keys} queue: Queue = Queue() thread = threading.Thread( - target=serial_reader, args=(args.port, args.baudrate, queue), daemon=True + target=serial_reader, + args=(args.port, args.baudrate, tag_to_fields, queue), + daemon=True, ) thread.start() pygame.init() - pygame.display.set_caption(f"IMU stream: {args.port} @ {args.baudrate}") + pygame.display.set_caption(f"{config.get('title', 'Serial plotter')}: {args.port} @ {args.baudrate}") screen = pygame.display.set_mode((args.width, args.height), pygame.RESIZABLE) font = pygame.font.SysFont("monospace", 18) small_font = pygame.font.SysFont("monospace", 14) clock = pygame.time.Clock() - view_span = max(args.history_seconds, 1.0) + initial_span = float(args.history_seconds or config.get("history_seconds", 10.0)) + view_span = max(initial_span, 1.0) error_message = None running = True @@ -234,12 +328,12 @@ def main() -> int: if event.type == pygame.QUIT: running = False elif event.type == pygame.MOUSEWHEEL: - view_span = zoom(view_span, event.y, args.history_seconds) + view_span = zoom(view_span, event.y, initial_span) elif event.type == pygame.MOUSEBUTTONDOWN: if event.button == 4: - view_span = zoom(view_span, 1, args.history_seconds) + view_span = zoom(view_span, 1, initial_span) elif event.button == 5: - view_span = zoom(view_span, -1, args.history_seconds) + view_span = zoom(view_span, -1, initial_span) queued_error = drain_queue(queue, sample_sets) if queued_error is not None: @@ -247,74 +341,76 @@ def main() -> int: width, height = screen.get_size() now = time.monotonic() + graph_count = len(graphs) plot_width = max(100, width - 100) - panel_height = max(100, (height - 140) // 2) - acc_rect = pygame.Rect(70, 40, plot_width, panel_height) - gyr_rect = pygame.Rect(70, acc_rect.bottom + 40, plot_width, panel_height) - acc_y_min, acc_y_max = visible_range( - sample_sets, ("acc_x", "acc_y", "acc_z"), now, view_span - ) - gyr_y_min, gyr_y_max = visible_range( - sample_sets, ("gyr_x", "gyr_y", "gyr_z"), now, view_span - ) + available_height = max(100, height - 120 - (graph_count - 1) * 30) + panel_height = max(100, available_height // graph_count) screen.fill(BACKGROUND) - draw_grid(screen, acc_rect, small_font, view_span, acc_y_min, acc_y_max, "Accelerometer") - draw_grid(screen, gyr_rect, small_font, view_span, gyr_y_min, gyr_y_max, "Gyroscope") - latest_acc_x = draw_trace( - screen, acc_rect, sample_sets["acc_x"], view_span, now, acc_y_min, acc_y_max, ACC_X_TRACE - ) - latest_acc_y = draw_trace( - screen, acc_rect, sample_sets["acc_y"], view_span, now, acc_y_min, acc_y_max, ACC_Y_TRACE - ) - latest_acc_z = draw_trace( - screen, acc_rect, sample_sets["acc_z"], view_span, now, acc_y_min, acc_y_max, ACC_Z_TRACE - ) - latest_gyr_x = draw_trace( - screen, gyr_rect, sample_sets["gyr_x"], view_span, now, gyr_y_min, gyr_y_max, GYR_X_TRACE - ) - latest_gyr_y = draw_trace( - screen, gyr_rect, sample_sets["gyr_y"], view_span, now, gyr_y_min, gyr_y_max, GYR_Y_TRACE - ) - latest_gyr_z = draw_trace( - screen, gyr_rect, sample_sets["gyr_z"], view_span, now, gyr_y_min, gyr_y_max, GYR_Z_TRACE - ) + latest_values: dict[str, int | None] = {} + top = 40 + for graph in graphs: + rect = pygame.Rect(70, top, plot_width, panel_height) + stream_defs = graph["streams"] + stream_names = [stream["key"] for stream in stream_defs] + y_min, y_max = visible_range(sample_sets, stream_names, now, view_span) + draw_grid( + screen, + rect, + small_font, + view_span, + y_min, + y_max, + graph.get("title", "Graph"), + ) + for stream in stream_defs: + latest_values[stream["key"]] = draw_trace( + screen, + rect, + sample_sets[stream["key"]], + view_span, + now, + y_min, + y_max, + color_tuple(stream["color"]), + ) + top = rect.bottom + 30 header = f"port={args.port} baud={args.baudrate} zoom={view_span:.1f}s" - if latest_acc_x is not None: - header += f" acc=({latest_acc_x}, {latest_acc_y}, {latest_acc_z})" - if latest_gyr_x is not None: - header += f" gyr=({latest_gyr_x}, {latest_gyr_y}, {latest_gyr_z})" + for graph in graphs: + graph_parts = [] + for stream in graph["streams"]: + latest_value = latest_values.get(stream["key"]) + if latest_value is not None: + graph_parts.append(f"{stream.get('label', stream['key'])}={latest_value}") + if graph_parts: + header += " | " + " ".join(graph_parts) screen.blit(font.render(header, True, TEXT), (10, 8)) screen.blit( small_font.render("mouse wheel: zoom horizontal axis", True, AXIS), (10, height - 24), ) - legend_items = ( - ("ACC X", ACC_X_TRACE), - ("ACC Y", ACC_Y_TRACE), - ("ACC Z", ACC_Z_TRACE), - ("GYR X", GYR_X_TRACE), - ("GYR Y", GYR_Y_TRACE), - ("GYR Z", GYR_Z_TRACE), - ) + legend_x = 10 legend_y = 34 - for label, color in legend_items: - pygame.draw.line( - screen, - color, - (legend_x, legend_y + 9), - (legend_x + 24, legend_y + 9), - 3, - ) - screen.blit(small_font.render(label, True, TEXT), (legend_x + 30, legend_y)) - legend_x += 105 + for graph in graphs: + for stream in graph["streams"]: + color = color_tuple(stream["color"]) + label = stream.get("label", stream["key"]) + pygame.draw.line( + screen, + color, + (legend_x, legend_y + 9), + (legend_x + 24, legend_y + 9), + 3, + ) + screen.blit(small_font.render(label, True, TEXT), (legend_x + 30, legend_y)) + legend_x += max(90, 42 + len(label) * 10) if error_message: error_surface = font.render(error_message, True, ERROR) - screen.blit(error_surface, (10, 56)) + screen.blit(error_surface, (10, height - 52)) pygame.display.flip() clock.tick(60) diff --git a/plot_config.json b/plot_config.json new file mode 100644 index 0000000..ad1ee35 --- /dev/null +++ b/plot_config.json @@ -0,0 +1,28 @@ +{ + "title": "BBot IMU", + "history_seconds": 10.0, + "sources": [ + { + "tag": "IMU", + "fields": ["acc_x", "acc_y", "acc_z", "gyr_x", "gyr_y", "gyr_z"] + } + ], + "graphs": [ + { + "title": "Accelerometer", + "streams": [ + { "key": "acc_x", "label": "ACC X", "color": [90, 170, 255] }, + { "key": "acc_y", "label": "ACC Y", "color": [80, 220, 140] }, + { "key": "acc_z", "label": "ACC Z", "color": [255, 200, 60] } + ] + }, + { + "title": "Gyroscope", + "streams": [ + { "key": "gyr_x", "label": "GYR X", "color": [240, 80, 80] }, + { "key": "gyr_y", "label": "GYR Y", "color": [220, 80, 220] }, + { "key": "gyr_z", "label": "GYR Z", "color": [240, 240, 240] } + ] + } + ] +}