#!/usr/bin/env python3 import argparse import json import re import threading import time from collections import deque from pathlib import Path from queue import Empty, Queue try: import pygame except ImportError as exc: raise SystemExit( "Missing dependency: pygame. Install with `pip install pygame pyserial`." ) from exc try: import serial except ImportError as exc: raise SystemExit( "Missing dependency: pyserial. Install with `pip install pygame pyserial`." ) from exc BACKGROUND = (18, 20, 24) GRID = (48, 54, 64) AXIS = (140, 148, 160) TEXT = (230, 235, 240) ERROR = (255, 110, 110) LINE_RE = re.compile(r"\[<([^>]+)>\]:\s*(.*)") DEFAULT_CONFIG = { "title": "IMU stream", "history_seconds": 10.0, "graphs": [ { "title": "Accelerometer", "streams": [ {"key": "acc_x", "label": "ACC X", "color": [90, 170, 255]}, {"key": "acc_y", "label": "ACC Y", "color": [80, 220, 140]}, {"key": "acc_z", "label": "ACC Z", "color": [255, 200, 60]}, ], }, { "title": "Gyroscope", "streams": [ {"key": "gyr_x", "label": "GYR X", "color": [240, 80, 80]}, {"key": "gyr_y", "label": "GYR Y", "color": [220, 80, 220]}, {"key": "gyr_z", "label": "GYR Z", "color": [240, 240, 240]}, ], }, ], "sources": [ { "tag": "IMU", "fields": ["acc_x", "acc_y", "acc_z", "gyr_x", "gyr_y", "gyr_z"], } ], } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Read tagged serial values and draw them live from a JSON config." ) parser.add_argument("port", help="Serial port, for example /dev/ttyUSB0 or COM3") parser.add_argument( "-b", "--baudrate", type=int, default=115200, help="Serial baudrate (default: 115200)", ) parser.add_argument( "-c", "--config", type=Path, default=None, help="Path to JSON plot config (default: built-in IMU config)", ) parser.add_argument( "--history-seconds", type=float, default=None, help="Initial visible history window in seconds (overrides config)", ) parser.add_argument( "--max-samples", type=int, default=30000, help="Maximum number of samples to keep in memory (default: 30000)", ) parser.add_argument( "--width", type=int, default=1200, help="Window width in pixels (default: 1200)", ) parser.add_argument( "--height", type=int, default=700, help="Window height in pixels (default: 700)", ) return parser.parse_args() def load_config(config_path: Path | None) -> dict: if config_path is None: return DEFAULT_CONFIG with config_path.open("r", encoding="utf-8") as handle: return json.load(handle) def validate_config(config: dict) -> tuple[dict[str, list[str]], list[dict], list[str]]: sources = config.get("sources") graphs = config.get("graphs") if not isinstance(sources, list) or not sources: raise SystemExit("Config must contain a non-empty `sources` list.") if not isinstance(graphs, list) or not graphs: raise SystemExit("Config must contain a non-empty `graphs` list.") tag_to_fields: dict[str, list[str]] = {} for source in sources: tag = source.get("tag") fields = source.get("fields") if not isinstance(tag, str) or not tag: raise SystemExit("Each source must have a non-empty string `tag`.") if not isinstance(fields, list) or not fields or not all(isinstance(field, str) for field in fields): raise SystemExit(f"Source `{tag}` must have a non-empty string list `fields`.") tag_to_fields[tag] = fields stream_keys: list[str] = [] for graph in graphs: streams = graph.get("streams") if not isinstance(streams, list) or not streams: raise SystemExit("Each graph must have a non-empty `streams` list.") for stream in streams: key = stream.get("key") if not isinstance(key, str) or not key: raise SystemExit("Each stream must have a non-empty string `key`.") stream_keys.append(key) return tag_to_fields, graphs, stream_keys def serial_reader(port: str, baudrate: int, tag_to_fields: dict[str, list[str]], output: Queue) -> None: try: with serial.Serial(port, baudrate=baudrate, timeout=1) as ser: while True: raw_line = ser.readline() if not raw_line: continue line = raw_line.decode("utf-8", errors="replace").strip() match = LINE_RE.search(line) if not match: continue tag, payload = match.groups() fields = tag_to_fields.get(tag) if fields is None: continue parts = payload.split() if len(parts) != len(fields): output.put( ( "error", f"Tag {tag} expected {len(fields)} values, got {len(parts)}: {payload}", ) ) continue try: values = [int(part) for part in parts] except ValueError: output.put(("error", f"Tag {tag} has non-integer payload: {payload}")) continue timestamp = time.monotonic() for key, value in zip(fields, values): output.put((key, timestamp, value)) except serial.SerialException as exc: output.put(("error", f"Serial error: {exc}")) def drain_queue( queue: Queue, sample_sets: dict[str, deque[tuple[float, int]]], ) -> str | None: error_message = None while True: try: item = queue.get_nowait() except Empty: return error_message if item[0] == "error": error_message = item[1] continue stream_name, timestamp, value = item if stream_name in sample_sets: sample_sets[stream_name].append((timestamp, value)) def draw_grid( surface: pygame.Surface, rect: pygame.Rect, font: pygame.font.Font, view_span: float, y_min: float, y_max: float, title: str, ) -> None: pygame.draw.rect(surface, GRID, rect, width=1) for fraction in (0.0, 0.25, 0.5, 0.75, 1.0): y = rect.top + round(fraction * rect.height) pygame.draw.line(surface, GRID, (rect.left, y), (rect.right, y), 1) value = y_max - fraction * (y_max - y_min) label = f"{value:.0f}" text = font.render(label, True, AXIS) surface.blit(text, (10, y - text.get_height() // 2)) for fraction in (0.0, 0.25, 0.5, 0.75, 1.0): x = rect.left + round(fraction * rect.width) pygame.draw.line(surface, GRID, (x, rect.top), (x, rect.bottom), 1) seconds_ago = view_span * (1.0 - fraction) label = f"-{seconds_ago:.1f}s" if seconds_ago > 0.05 else "now" text = font.render(label, True, AXIS) surface.blit(text, (x - text.get_width() // 2, rect.bottom + 8)) surface.blit(font.render(title, True, TEXT), (rect.left + 8, rect.top + 8)) def draw_trace( surface: pygame.Surface, rect: pygame.Rect, samples: deque[tuple[float, int]], view_span: float, now: float, y_min: float, y_max: float, color: tuple[int, int, int], ) -> int | None: visible_points = [] latest_value = None y_span = max(y_max - y_min, 1e-6) for timestamp, value in samples: age = now - timestamp if age < 0 or age > view_span: continue x = rect.right - (age / view_span) * rect.width y = rect.bottom - ((value - y_min) / y_span) * rect.height visible_points.append((round(x), round(y))) latest_value = value if len(visible_points) >= 2: pygame.draw.lines(surface, color, False, visible_points, 2) elif len(visible_points) == 1: pygame.draw.circle(surface, color, visible_points[0], 2) return latest_value def visible_range( sample_sets: dict[str, deque[tuple[float, int]]], stream_names: list[str], now: float, view_span: float, ) -> tuple[float, float]: max_abs = 1.0 for stream_name in stream_names: for timestamp, value in sample_sets[stream_name]: age = now - timestamp if 0 <= age <= view_span: max_abs = max(max_abs, abs(value)) return -max_abs, max_abs def zoom(view_span: float, direction: int, initial_span: float) -> float: if direction > 0: return max(1.0, view_span / 1.2) return min(max(initial_span * 20.0, 60.0), view_span * 1.2) def color_tuple(color_value: list[int]) -> tuple[int, int, int]: if not isinstance(color_value, list) or len(color_value) != 3: raise SystemExit("Each stream color must be a 3-element RGB list.") return tuple(int(component) for component in color_value) def main() -> int: args = parse_args() config = load_config(args.config) tag_to_fields, graphs, stream_keys = validate_config(config) sample_sets = {key: deque(maxlen=args.max_samples) for key in stream_keys} queue: Queue = Queue() thread = threading.Thread( target=serial_reader, args=(args.port, args.baudrate, tag_to_fields, queue), daemon=True, ) thread.start() pygame.init() pygame.display.set_caption(f"{config.get('title', 'Serial plotter')}: {args.port} @ {args.baudrate}") screen = pygame.display.set_mode((args.width, args.height), pygame.RESIZABLE) font = pygame.font.SysFont("monospace", 18) small_font = pygame.font.SysFont("monospace", 14) clock = pygame.time.Clock() initial_span = float(args.history_seconds or config.get("history_seconds", 10.0)) view_span = max(initial_span, 1.0) error_message = None running = True while running: for event in pygame.event.get(): if event.type == pygame.QUIT: running = False elif event.type == pygame.MOUSEWHEEL: view_span = zoom(view_span, event.y, initial_span) elif event.type == pygame.MOUSEBUTTONDOWN: if event.button == 4: view_span = zoom(view_span, 1, initial_span) elif event.button == 5: view_span = zoom(view_span, -1, initial_span) queued_error = drain_queue(queue, sample_sets) if queued_error is not None: error_message = queued_error width, height = screen.get_size() now = time.monotonic() graph_count = len(graphs) plot_width = max(100, width - 100) available_height = max(100, height - 120 - (graph_count - 1) * 30) panel_height = max(100, available_height // graph_count) screen.fill(BACKGROUND) latest_values: dict[str, int | None] = {} top = 40 for graph in graphs: rect = pygame.Rect(70, top, plot_width, panel_height) stream_defs = graph["streams"] stream_names = [stream["key"] for stream in stream_defs] y_min, y_max = visible_range(sample_sets, stream_names, now, view_span) draw_grid( screen, rect, small_font, view_span, y_min, y_max, graph.get("title", "Graph"), ) for stream in stream_defs: latest_values[stream["key"]] = draw_trace( screen, rect, sample_sets[stream["key"]], view_span, now, y_min, y_max, color_tuple(stream["color"]), ) top = rect.bottom + 30 header = f"port={args.port} baud={args.baudrate} zoom={view_span:.1f}s" for graph in graphs: graph_parts = [] for stream in graph["streams"]: latest_value = latest_values.get(stream["key"]) if latest_value is not None: graph_parts.append(f"{stream.get('label', stream['key'])}={latest_value}") if graph_parts: header += " | " + " ".join(graph_parts) screen.blit(font.render(header, True, TEXT), (10, 8)) screen.blit( small_font.render("mouse wheel: zoom horizontal axis", True, AXIS), (10, height - 24), ) legend_x = 10 legend_y = 34 for graph in graphs: for stream in graph["streams"]: color = color_tuple(stream["color"]) label = stream.get("label", stream["key"]) pygame.draw.line( screen, color, (legend_x, legend_y + 9), (legend_x + 24, legend_y + 9), 3, ) screen.blit(small_font.render(label, True, TEXT), (legend_x + 30, legend_y)) legend_x += max(90, 42 + len(label) * 10) if error_message: error_surface = font.render(error_message, True, ERROR) screen.blit(error_surface, (10, height - 52)) pygame.display.flip() clock.tick(60) pygame.quit() return 0 if __name__ == "__main__": raise SystemExit(main())