437 lines
14 KiB
Python
437 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import re
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from pathlib import Path
|
|
from queue import Empty, Queue
|
|
|
|
try:
|
|
import pygame
|
|
except ImportError as exc:
|
|
raise SystemExit(
|
|
"Missing dependency: pygame. Install with `pip install pygame pyserial`."
|
|
) from exc
|
|
|
|
try:
|
|
import serial
|
|
except ImportError as exc:
|
|
raise SystemExit(
|
|
"Missing dependency: pyserial. Install with `pip install pygame pyserial`."
|
|
) from exc
|
|
|
|
|
|
# Colour palette (RGB triples) shared by the drawing code.
BACKGROUND = (18, 20, 24)  # window fill
GRID = (48, 54, 64)  # plot border and grid lines
AXIS = (140, 148, 160)  # axis tick labels and the footer hint
TEXT = (230, 235, 240)  # titles, header and legend text
ERROR = (255, 110, 110)  # most recent error message

# Matches a tagged serial line such as "[<tag>]: 1 2 3". Group 1 captures the
# tag between "[<" and ">]", group 2 the remaining payload text.
LINE_RE = re.compile(r"\[<([^>]+)>\]:\s*(.*)")
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse the process arguments.

    Returns:
        Namespace with ``port``, ``baudrate``, ``config``,
        ``history_seconds``, ``max_samples``, ``width`` and ``height``.
    """
    cli = argparse.ArgumentParser(
        description="Read tagged serial values and draw them live from a JSON config."
    )
    cli.add_argument("port", help="Serial port, for example /dev/ttyUSB0 or COM3")

    # Optional arguments as (flags, value type, default, help text).
    option_table = (
        (("-b", "--baudrate"), int, 115200, "Serial baudrate (default: 115200)"),
        (("-c", "--config"), Path, None, "Path to JSON plot config (default: ./plot_config.json)"),
        (
            ("--history-seconds",),
            float,
            None,
            "Initial visible history window in seconds (overrides config)",
        ),
        (
            ("--max-samples",),
            int,
            30000,
            "Maximum number of samples to keep in memory (default: 30000)",
        ),
        (("--width",), int, 1200, "Window width in pixels (default: 1200)"),
        (("--height",), int, 700, "Window height in pixels (default: 700)"),
    )
    for flags, value_type, default, help_text in option_table:
        cli.add_argument(*flags, type=value_type, default=default, help=help_text)

    return cli.parse_args()
|
|
|
|
|
|
def load_config(config_path: Path | None) -> dict:
|
|
if config_path is None:
|
|
config_path = Path("plot_config.json")
|
|
|
|
if not config_path.exists():
|
|
raise SystemExit(f"Config file not found: {config_path}")
|
|
|
|
with config_path.open("r", encoding="utf-8") as handle:
|
|
return json.load(handle)
|
|
|
|
|
|
def validate_config(config: dict) -> tuple[dict[str, list[str]], list[dict], list[str]]:
    """Check the structure of a loaded plot config and extract lookup tables.

    Args:
        config: Decoded config with a ``sources`` list (each entry has a
            ``tag`` and a list of field names) and a ``graphs`` list (each
            entry has a ``streams`` list).

    Returns:
        A tuple ``(tag_to_fields, graphs, stream_keys)``: the mapping from
        source tag to its field names, the ``graphs`` list as found in
        ``config``, and every stream ``key`` in graph order.

    Raises:
        SystemExit: With a descriptive message on the first structural
            problem found.
    """
    sources = config.get("sources")
    graphs = config.get("graphs")
    if not isinstance(sources, list) or not sources:
        raise SystemExit("Config must contain a non-empty `sources` list.")
    if not isinstance(graphs, list) or not graphs:
        raise SystemExit("Config must contain a non-empty `graphs` list.")

    tag_to_fields: dict[str, list[str]] = {}
    for source in sources:
        # Guard before calling .get(): a bare string or number in the list
        # would otherwise surface as an AttributeError traceback.
        if not isinstance(source, dict):
            raise SystemExit("Each source must be a JSON object.")
        tag = source.get("tag")
        fields = source.get("fields")
        if not isinstance(tag, str) or not tag:
            raise SystemExit("Each source must have a non-empty string `tag`.")
        if not isinstance(fields, list) or not fields or not all(isinstance(field, str) for field in fields):
            raise SystemExit(f"Source `{tag}` must have a non-empty string list `fields`.")
        tag_to_fields[tag] = fields

    stream_keys: list[str] = []
    for graph in graphs:
        if not isinstance(graph, dict):
            raise SystemExit("Each graph must be a JSON object.")
        streams = graph.get("streams")
        if not isinstance(streams, list) or not streams:
            raise SystemExit("Each graph must have a non-empty `streams` list.")
        for stream in streams:
            if not isinstance(stream, dict):
                raise SystemExit("Each stream must be a JSON object.")
            key = stream.get("key")
            if not isinstance(key, str) or not key:
                raise SystemExit("Each stream must have a non-empty string `key`.")
            interpreter = stream.get("interpreter", "int")
            if interpreter not in ("int", "fixed"):
                raise SystemExit(f"Unsupported interpreter for stream `{key}`: {interpreter}")
            if interpreter == "fixed":
                frac_bits = stream.get("frac_bits")
                if not isinstance(frac_bits, int) or frac_bits < 0:
                    raise SystemExit(f"Fixed-point stream `{key}` requires integer `frac_bits`.")
            # The drawing code indexes stream["color"] for every stream on
            # every frame, so a missing or malformed colour is reported here
            # instead of failing after the window has opened.
            color = stream.get("color")
            if not isinstance(color, list) or len(color) != 3:
                raise SystemExit(f"Stream `{key}` requires a 3-element RGB list `color`.")
            stream_keys.append(key)

    return tag_to_fields, graphs, stream_keys
|
|
|
|
|
|
def interpret_value(raw_value: int, stream: dict) -> float:
    """Convert a raw integer sample into the float value that is plotted.

    Args:
        raw_value: Integer as received for the stream.
        stream: Stream definition. ``interpreter`` selects the conversion
            (``"int"`` when absent); ``"fixed"`` additionally needs a
            non-negative integer ``frac_bits``.

    Returns:
        ``float(raw_value)`` for ``"int"``; ``raw_value / 2**frac_bits`` for
        ``"fixed"``.

    Raises:
        SystemExit: For an unknown interpreter or an invalid ``frac_bits``.
    """
    mode = stream.get("interpreter", "int")

    if mode == "fixed":
        shift = stream.get("frac_bits")
        if isinstance(shift, int) and shift >= 0:
            # Dividing by 2**shift moves the binary point left by `shift` bits.
            return raw_value / float(1 << shift)
        raise SystemExit("Fixed-point streams require a non-negative integer `frac_bits`.")

    if mode != "int":
        raise SystemExit(f"Unsupported interpreter: {mode}")
    return float(raw_value)
|
|
|
|
|
|
def serial_reader(port: str, baudrate: int, tag_to_fields: dict[str, list[str]], output: Queue) -> None:
    """Read tagged lines from a serial port and push decoded samples to a queue.

    Runs until the serial layer raises. Each line matching ``LINE_RE`` whose
    tag is present in ``tag_to_fields`` is split on whitespace and parsed as
    integers; one ``(field, timestamp, value)`` tuple is queued per field,
    all sharing the same ``time.monotonic()`` timestamp. Problems are queued
    as ``("error", message)`` tuples.

    Args:
        port: Serial device name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
        tag_to_fields: Field names expected for each known tag, in payload order.
        output: Queue receiving sample and error tuples.
    """
    try:
        with serial.Serial(port, baudrate=baudrate, timeout=1) as ser:
            while True:
                raw_line = ser.readline()
                if not raw_line:
                    # Nothing was read (presumably the 1 s timeout elapsed); poll again.
                    continue

                text = raw_line.decode("utf-8", errors="replace").strip()
                found = LINE_RE.search(text)
                if found is None:
                    continue

                tag, payload = found.groups()
                names = tag_to_fields.get(tag)
                if names is None:
                    # Lines with tags that are not configured are ignored silently.
                    continue

                tokens = payload.split()
                if len(tokens) != len(names):
                    problem = f"Tag {tag} expected {len(names)} values, got {len(tokens)}: {payload}"
                    output.put(("error", problem))
                    continue

                try:
                    numbers = list(map(int, tokens))
                except ValueError:
                    output.put(("error", f"Tag {tag} has non-integer payload: {payload}"))
                    continue

                # One timestamp per line so all of its fields line up in time.
                stamp = time.monotonic()
                for name, number in zip(names, numbers):
                    output.put((name, stamp, number))
    except serial.SerialException as exc:
        output.put(("error", f"Serial error: {exc}"))
|
|
|
|
|
|
def drain_queue(
|
|
queue: Queue,
|
|
sample_sets: dict[str, deque[tuple[float, float]]],
|
|
stream_defs: dict[str, dict],
|
|
) -> str | None:
|
|
error_message = None
|
|
while True:
|
|
try:
|
|
item = queue.get_nowait()
|
|
except Empty:
|
|
return error_message
|
|
|
|
if item[0] == "error":
|
|
error_message = item[1]
|
|
continue
|
|
|
|
stream_name, timestamp, value = item
|
|
if stream_name in sample_sets:
|
|
sample_sets[stream_name].append(
|
|
(timestamp, interpret_value(value, stream_defs[stream_name]))
|
|
)
|
|
|
|
|
|
def draw_grid(
    surface: pygame.Surface,
    rect: pygame.Rect,
    font: pygame.font.Font,
    view_span: float,
    y_min: float,
    y_max: float,
    title: str,
) -> None:
    """Draw the frame, grid lines, axis labels and title of one plot panel.

    Args:
        surface: Target surface.
        rect: Plot area.
        font: Font used for the labels and the title.
        view_span: Visible time window in seconds (labels the x axis).
        y_min: Value at the bottom edge of ``rect``.
        y_max: Value at the top edge of ``rect``.
        title: Text drawn inside the top-left corner of ``rect``.
    """
    ticks = (0.0, 0.25, 0.5, 0.75, 1.0)
    pygame.draw.rect(surface, GRID, rect, width=1)

    # Label precision depends only on the overall range, so choose it once.
    value_range = y_max - y_min
    magnitude = abs(value_range)
    if magnitude <= 4.0:
        decimals = 2
    elif magnitude <= 40.0:
        decimals = 1
    else:
        decimals = 0

    # Horizontal grid lines with value labels in the left margin.
    for fraction in ticks:
        y = rect.top + round(fraction * rect.height)
        pygame.draw.line(surface, GRID, (rect.left, y), (rect.right, y), 1)
        value = y_max - fraction * value_range
        label = f"{value:.{decimals}f}"
        if decimals:
            # Drop trailing zeros and a dangling decimal point ("1.50" -> "1.5").
            label = label.rstrip("0").rstrip(".")
        rendered = font.render(label, True, AXIS)
        surface.blit(rendered, (10, y - rendered.get_height() // 2))

    # Vertical grid lines with "seconds ago" labels below the plot.
    for fraction in ticks:
        x = rect.left + round(fraction * rect.width)
        pygame.draw.line(surface, GRID, (x, rect.top), (x, rect.bottom), 1)
        seconds_ago = view_span * (1.0 - fraction)
        if seconds_ago > 0.05:
            label = f"-{seconds_ago:.1f}s"
        else:
            label = "now"
        rendered = font.render(label, True, AXIS)
        surface.blit(rendered, (x - rendered.get_width() // 2, rect.bottom + 8))

    title_surface = font.render(title, True, TEXT)
    surface.blit(title_surface, (rect.left + 8, rect.top + 8))
|
|
|
|
|
|
def draw_graph_info(
    surface: pygame.Surface,
    rect: pygame.Rect,
    font: pygame.font.Font,
    graph_streams: list[dict],
    latest_values: dict[str, float | None],
) -> None:
    """Draw the legend for one graph: a colour swatch, label and latest value per stream.

    Args:
        surface: Target surface.
        rect: Area reserved for the legend; rows start 8 px below its top.
        font: Font used for the legend text.
        graph_streams: Stream definitions of the graph (``key``, ``color``,
            optional ``label``).
        latest_values: Most recent value per stream key; a missing or
            ``None`` entry is shown as ``--``.
    """
    row_step = font.get_height() + 6
    first_row = rect.top + 8

    for row, stream in enumerate(graph_streams):
        y = first_row + row * row_step
        swatch_color = color_tuple(stream["color"])
        key = stream["key"]
        label = stream.get("label", key)
        latest = latest_values.get(key)
        if latest is None:
            value_text = "--"
        else:
            value_text = f"{latest:.3f}"

        swatch_start = (rect.left, y + 8)
        swatch_end = (rect.left + 24, y + 8)
        pygame.draw.line(surface, swatch_color, swatch_start, swatch_end, 3)

        caption = font.render(f"{label}: {value_text}", True, TEXT)
        surface.blit(caption, (rect.left + 32, y))
|
|
|
|
|
|
def draw_trace(
    surface: pygame.Surface,
    rect: pygame.Rect,
    samples: deque[tuple[float, float]],
    view_span: float,
    now: float,
    y_min: float,
    y_max: float,
    color: tuple[int, int, int],
) -> float | None:
    """Draw one stream's samples that fall inside the visible time window.

    Args:
        surface: Target surface.
        rect: Plot area; the right edge corresponds to ``now``.
        samples: ``(timestamp, value)`` pairs of the stream.
        view_span: Width of the visible window in seconds.
        now: Timestamp mapped to the right edge.
        y_min: Value mapped to the bottom edge.
        y_max: Value mapped to the top edge.
        color: RGB colour of the trace.

    Returns:
        The value of the last visible sample in iteration order, or ``None``
        when nothing is visible.
    """
    # Keep the vertical scale finite even if y_max == y_min.
    y_span = max(y_max - y_min, 1e-6)
    points = []
    newest = None

    for stamp, value in samples:
        age = now - stamp
        if age < 0 or view_span < age:
            continue
        px = rect.right - (age / view_span) * rect.width
        py = rect.bottom - ((value - y_min) / y_span) * rect.height
        points.append((round(px), round(py)))
        newest = value

    count = len(points)
    if count == 1:
        # A single sample cannot form a line; mark it with a dot instead.
        pygame.draw.circle(surface, color, points[0], 2)
    elif count > 1:
        pygame.draw.lines(surface, color, False, points, 2)

    return newest
|
|
|
|
|
|
def visible_range(
    sample_sets: dict[str, deque[tuple[float, float]]],
    stream_names: list[str],
    now: float,
    view_span: float,
) -> tuple[float, float]:
    """Compute a symmetric y range covering all visible samples of the given streams.

    Args:
        sample_sets: ``(timestamp, value)`` buffers keyed by stream name.
        stream_names: Streams that share the plot.
        now: Timestamp of the right edge of the window.
        view_span: Width of the visible window in seconds.

    Returns:
        ``(-m, m)`` where ``m`` is the largest absolute visible value, but
        never less than ``1.0``.
    """
    in_view = (
        abs(value)
        for name in stream_names
        for stamp, value in sample_sets[name]
        if 0 <= now - stamp <= view_span
    )
    # Seeding with 1.0 enforces the minimum half-range.
    limit = max([1.0, *in_view])
    return -limit, limit
|
|
|
|
|
|
def zoom(view_span: float, direction: int, initial_span: float) -> float:
    """Return the new visible time span after one zoom step.

    Args:
        view_span: Current visible window in seconds.
        direction: Positive to zoom in (shorter window), negative to zoom
            out (longer window), zero to leave the window unchanged.
        initial_span: Span the view started with; the zoom-out limit is
            ``max(initial_span * 20, 60)`` seconds.

    Returns:
        The new span: never below 1 second when zooming in, never above the
        zoom-out limit when zooming out.
    """
    step = 1.2
    if direction == 0:
        # A wheel event without vertical motion (e.g. horizontal scrolling)
        # must not be treated as "zoom out".
        return view_span
    if direction > 0:
        return max(1.0, view_span / step)
    widest = max(initial_span * 20.0, 60.0)
    return min(widest, view_span * step)
|
|
|
|
|
|
def color_tuple(color_value: list[int]) -> tuple[int, int, int]:
    """Convert a configured colour into an ``(r, g, b)`` tuple of ints.

    Args:
        color_value: List of three components, each convertible with
            ``int()`` and within 0-255 after conversion.

    Returns:
        The three components as integers.

    Raises:
        SystemExit: If ``color_value`` is not a 3-element list, a component
            cannot be converted to ``int``, or a component is outside 0-255.
    """
    shape_message = "Each stream color must be a 3-element RGB list."
    if not isinstance(color_value, list) or len(color_value) != 3:
        raise SystemExit(shape_message)

    try:
        red, green, blue = (int(component) for component in color_value)
    except (TypeError, ValueError, OverflowError) as exc:
        # Report unconvertible components the same way as a wrongly shaped
        # list instead of letting a raw conversion error escape.
        raise SystemExit(shape_message) from exc

    # Out-of-range components would otherwise only fail later, inside the
    # drawing calls.
    if not all(0 <= channel <= 255 for channel in (red, green, blue)):
        raise SystemExit("Each stream color component must be in the range 0-255.")
    return red, green, blue
|
|
|
|
|
|
def main() -> int:
    """Run the live plotter until the window is closed.

    Parses the command line, loads and validates the config, starts the
    serial reader in a daemon thread, then runs the pygame loop: handle
    input, drain queued samples, and redraw every graph panel with its
    legend at up to 60 frames per second.

    Returns:
        ``0`` after the window has been closed.

    Raises:
        SystemExit: For an invalid ``--max-samples`` value, or propagated
            from the config helpers.
    """
    args = parse_args()
    # deque(maxlen=...) rejects negative sizes and silently stores nothing
    # for 0, so refuse both up front with a readable message.
    if args.max_samples < 1:
        raise SystemExit("--max-samples must be a positive integer.")

    config = load_config(args.config)
    tag_to_fields, graphs, stream_keys = validate_config(config)
    stream_map = {
        stream["key"]: stream
        for graph in graphs
        for stream in graph["streams"]
    }
    sample_sets = {key: deque(maxlen=args.max_samples) for key in stream_keys}
    queue: Queue = Queue()

    # Daemon thread: it must not keep the process alive once the window closes.
    thread = threading.Thread(
        target=serial_reader,
        args=(args.port, args.baudrate, tag_to_fields, queue),
        daemon=True,
    )
    thread.start()

    pygame.init()
    pygame.display.set_caption(f"{config.get('title', 'Serial plotter')}: {args.port} @ {args.baudrate}")
    screen = pygame.display.set_mode((args.width, args.height), pygame.RESIZABLE)
    font = pygame.font.SysFont("monospace", 18)
    small_font = pygame.font.SysFont("monospace", 14)
    clock = pygame.time.Clock()

    # An explicit command-line value wins over the config; test against None
    # so that an explicit 0 is not mistaken for "not given".
    if args.history_seconds is not None:
        initial_span = float(args.history_seconds)
    else:
        initial_span = float(config.get("history_seconds", 10.0))
    view_span = max(initial_span, 1.0)
    error_message = None
    running = True

    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            elif event.type == pygame.MOUSEWHEEL and event.y:
                # Wheel motion is handled through MOUSEWHEEL only. pygame 2
                # also posts legacy MOUSEBUTTONDOWN events (buttons 4/5) for
                # the same motion, so reacting to both zoomed twice per
                # notch. Events with y == 0 (horizontal scroll) are skipped.
                view_span = zoom(view_span, event.y, initial_span)

        queued_error = drain_queue(queue, sample_sets, stream_map)
        if queued_error is not None:
            # Keep showing the last error until a newer one replaces it.
            error_message = queued_error

        width, height = screen.get_size()
        now = time.monotonic()
        graph_count = len(graphs)
        left_margin = 70
        right_margin = 20
        info_width = min(280, max(180, width // 4))
        plot_width = max(100, width - left_margin - right_margin - info_width - 16)
        # 120 px are reserved for header/footer, 30 px between panels.
        available_height = max(100, height - 120 - (graph_count - 1) * 30)
        panel_height = max(100, available_height // graph_count)

        screen.fill(BACKGROUND)

        latest_values: dict[str, float | None] = {}
        top = 40
        for graph in graphs:
            rect = pygame.Rect(left_margin, top, plot_width, panel_height)
            info_rect = pygame.Rect(rect.right + 16, top, info_width, panel_height)
            graph_streams = graph["streams"]
            stream_names = [stream["key"] for stream in graph_streams]
            y_min, y_max = visible_range(sample_sets, stream_names, now, view_span)
            draw_grid(
                screen,
                rect,
                small_font,
                view_span,
                y_min,
                y_max,
                graph.get("title", "Graph"),
            )
            for stream in graph_streams:
                latest_values[stream["key"]] = draw_trace(
                    screen,
                    rect,
                    sample_sets[stream["key"]],
                    view_span,
                    now,
                    y_min,
                    y_max,
                    color_tuple(stream["color"]),
                )
            draw_graph_info(screen, info_rect, small_font, graph_streams, latest_values)
            top = rect.bottom + 30

        header = f"port={args.port} baud={args.baudrate} zoom={view_span:.1f}s"
        screen.blit(font.render(header, True, TEXT), (10, 8))
        screen.blit(
            small_font.render("mouse wheel: zoom horizontal axis", True, AXIS),
            (10, height - 24),
        )

        if error_message:
            error_surface = font.render(error_message, True, ERROR)
            screen.blit(error_surface, (10, height - 52))

        pygame.display.flip()
        clock.tick(60)

    pygame.quit()
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the plotter and hand main()'s return value to the interpreter as
    # the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|