Files
BBOT/plot.py

328 lines
10 KiB
Python

#!/usr/bin/env python3
import argparse
import re
import sys
import threading
import time
from collections import deque
from queue import Empty, Queue
try:
import pygame
except ImportError as exc:
raise SystemExit(
"Missing dependency: pygame. Install with `pip install pygame pyserial`."
) from exc
try:
import serial
except ImportError as exc:
raise SystemExit(
"Missing dependency: pyserial. Install with `pip install pygame pyserial`."
) from exc
# Matches one IMU report line: the literal tag "[<IMU>]:" followed by six
# whitespace-separated signed integers.  serial_reader() maps the six groups,
# in order, to acc_x, acc_y, acc_z, gyr_x, gyr_y, gyr_z.
IMU_RE = re.compile(
r"\[<IMU>\]:\s*(-?\d+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)"
)
# Colour palette, as (red, green, blue) tuples passed to the pygame drawing
# and font-rendering calls.
BACKGROUND = (18, 20, 24)  # window fill
GRID = (48, 54, 64)  # panel border and grid lines
AXIS = (140, 148, 160)  # axis tick labels and the help text
# One colour per plotted stream; the same colours are used in the legend.
ACC_X_TRACE = (90, 170, 255)
ACC_Y_TRACE = (80, 220, 140)
ACC_Z_TRACE = (255, 200, 60)
GYR_X_TRACE = (240, 80, 80)
GYR_Y_TRACE = (220, 80, 220)
GYR_Z_TRACE = (240, 240, 240)
TEXT = (230, 235, 240)  # header, panel titles and legend labels
ERROR = (255, 110, 110)  # serial error message
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Read BMI160 IMU values from a serial port and draw them live."
)
parser.add_argument("port", help="Serial port, for example /dev/ttyUSB0 or COM3")
parser.add_argument(
"-b",
"--baudrate",
type=int,
default=115200,
help="Serial baudrate (default: 115200)",
)
parser.add_argument(
"--history-seconds",
type=float,
default=10.0,
help="Initial visible history window in seconds (default: 10)",
)
parser.add_argument(
"--max-samples",
type=int,
default=30000,
help="Maximum number of samples to keep in memory (default: 30000)",
)
parser.add_argument(
"--width",
type=int,
default=1200,
help="Window width in pixels (default: 1200)",
)
parser.add_argument(
"--height",
type=int,
default=700,
help="Window height in pixels (default: 700)",
)
return parser.parse_args()
def serial_reader(port: str, baudrate: int, output: Queue) -> None:
    """Read IMU lines from a serial port and push the samples onto a queue.

    Intended to run in a background thread.  Every line matching ``IMU_RE``
    yields six ``(stream_name, timestamp, value)`` tuples that share one
    ``time.monotonic()`` timestamp.  Lines that do not match are ignored.

    If the port cannot be opened or reading fails, a single
    ``("error", message)`` tuple is queued and the function returns.

    Args:
        port: Serial device name passed to ``serial.Serial``.
        baudrate: Serial speed passed to ``serial.Serial``.
        output: Queue receiving sample and error tuples.
    """
    stream_names = ("acc_x", "acc_y", "acc_z", "gyr_x", "gyr_y", "gyr_z")
    try:
        with serial.Serial(port, baudrate=baudrate, timeout=1) as ser:
            while True:
                raw_line = ser.readline()
                if not raw_line:
                    # Read timed out without data; keep polling.
                    continue
                # Stamp the sample as close to its arrival as possible.
                timestamp = time.monotonic()
                line = raw_line.decode("utf-8", errors="replace").strip()
                match = IMU_RE.search(line)
                if not match:
                    continue
                values = [int(group) for group in match.groups()]
                for stream_name, value in zip(stream_names, values):
                    output.put((stream_name, timestamp, value))
    except (serial.SerialException, OSError, ValueError) as exc:
        # pySerial raises ValueError for out-of-range port parameters, and a
        # failing device can surface as a plain OSError.  Report those too,
        # otherwise the thread dies without the UI ever showing a message.
        output.put(("error", f"Serial error: {exc}"))
def drain_queue(
queue: Queue,
sample_sets: dict[str, deque[tuple[float, int]]],
) -> str | None:
error_message = None
while True:
try:
item = queue.get_nowait()
except Empty:
return error_message
if item[0] == "error":
error_message = item[1]
continue
stream_name, timestamp, value = item
sample_sets[stream_name].append((timestamp, value))
def draw_grid(
    surface: pygame.Surface,
    rect: pygame.Rect,
    font: pygame.font.Font,
    view_span: float,
    y_min: float,
    y_max: float,
    title: str,
) -> None:
    """Draw one plot panel: border, 5x5 grid, axis labels and title.

    Args:
        surface: Surface to draw on.
        rect: Panel area.
        font: Font used for the tick labels and the title.
        view_span: Width of the time axis in seconds.
        y_min: Value shown at the bottom edge of the panel.
        y_max: Value shown at the top edge of the panel.
        title: Text drawn in the panel's top-left corner.
    """
    steps = (0.0, 0.25, 0.5, 0.75, 1.0)
    pygame.draw.rect(surface, GRID, rect, width=1)

    # Horizontal lines; value labels sit at a fixed x in the left margin.
    for step in steps:
        row = rect.top + round(step * rect.height)
        pygame.draw.line(surface, GRID, (rect.left, row), (rect.right, row), 1)
        tick_value = y_max - step * (y_max - y_min)
        tick = font.render(f"{tick_value:.0f}", True, AXIS)
        surface.blit(tick, (10, row - tick.get_height() // 2))

    # Vertical lines; the right edge is "now", the left edge is view_span ago.
    for step in steps:
        column = rect.left + round(step * rect.width)
        pygame.draw.line(surface, GRID, (column, rect.top), (column, rect.bottom), 1)
        seconds_ago = view_span * (1.0 - step)
        if seconds_ago > 0.05:
            caption = f"-{seconds_ago:.1f}s"
        else:
            caption = "now"
        tick = font.render(caption, True, AXIS)
        surface.blit(tick, (column - tick.get_width() // 2, rect.bottom + 8))

    heading = font.render(title, True, TEXT)
    surface.blit(heading, (rect.left + 8, rect.top + 8))
def draw_trace(
    surface: pygame.Surface,
    rect: pygame.Rect,
    samples: deque[tuple[float, int]],
    view_span: float,
    now: float,
    y_min: float,
    y_max: float,
    color: tuple[int, int, int],
) -> int | None:
    """Draw the visible part of one sample stream as a polyline.

    A sample is visible when its age ``now - timestamp`` lies in
    ``[0, view_span]``.  Age 0 maps to the right edge of ``rect`` and age
    ``view_span`` to the left edge; ``y_min`` maps to the bottom edge and
    ``y_max`` to the top edge.

    ``samples`` must be ordered oldest to newest.  The scan starts at the
    newest sample and stops at the first one older than the window, so the
    per-frame cost depends on the visible samples rather than on the whole
    stored history.

    Args:
        surface: Surface to draw on.
        rect: Panel area the trace is mapped into.
        samples: ``(timestamp, value)`` pairs, oldest first.
        view_span: Width of the time window in seconds.
        now: Current time on the same clock as the sample timestamps.
        y_min: Value at the bottom edge.
        y_max: Value at the top edge.
        color: RGB colour of the trace.

    Returns:
        The value of the newest visible sample, or ``None`` if no sample is
        visible.
    """
    # Guard both divisions against a degenerate (zero-width) axis.
    x_span = max(view_span, 1e-6)
    y_span = max(y_max - y_min, 1e-6)

    visible_points = []
    latest_value = None
    for timestamp, value in reversed(samples):
        age = now - timestamp
        if age > view_span:
            # Everything further left in the deque is older still.
            break
        if age < 0:
            continue
        if latest_value is None:
            # First visible sample in a newest-first walk is the newest one.
            latest_value = value
        x = rect.right - (age / x_span) * rect.width
        y = rect.bottom - ((value - y_min) / y_span) * rect.height
        visible_points.append((round(x), round(y)))

    # Restore chronological (left-to-right) order for drawing.
    visible_points.reverse()
    if len(visible_points) >= 2:
        pygame.draw.lines(surface, color, False, visible_points, 2)
    elif len(visible_points) == 1:
        pygame.draw.circle(surface, color, visible_points[0], 2)
    return latest_value
def visible_range(
    sample_sets: dict[str, deque[tuple[float, int]]],
    stream_names: tuple[str, ...],
    now: float,
    view_span: float,
) -> tuple[float, float]:
    """Return a symmetric y-axis range covering the visible samples.

    The range is ``(-m, m)`` where ``m`` is the largest absolute value among
    the samples of ``stream_names`` whose age ``now - timestamp`` lies in
    ``[0, view_span]``, and never less than 1.

    Each buffer must be ordered oldest to newest.  It is scanned from the
    newest sample and the scan stops at the first sample older than the
    window, so the whole stored history is not walked on every frame.

    Args:
        sample_sets: Buffers of ``(timestamp, value)`` pairs keyed by stream
            name.
        stream_names: Streams that share the axis.
        now: Current time on the same clock as the sample timestamps.
        view_span: Width of the visible time window in seconds.

    Returns:
        ``(y_min, y_max)`` with ``y_min == -y_max``.
    """
    max_abs = 1.0
    for stream_name in stream_names:
        for timestamp, value in reversed(sample_sets[stream_name]):
            age = now - timestamp
            if age > view_span:
                # Older samples follow; none of them can be visible.
                break
            if age >= 0:
                max_abs = max(max_abs, abs(value))
    return -max_abs, max_abs
def zoom(view_span: float, direction: int, initial_span: float) -> float:
    """Return the time-axis span after one zoom step.

    Args:
        view_span: Current visible span in seconds.
        direction: Positive zooms in (shorter span), negative zooms out
            (longer span), zero leaves the span unchanged.
        initial_span: Span the program started with; the zoom-out limit is
            20 times this value, but at least 60 seconds.

    Returns:
        The new span: ``view_span / 1.2`` clamped to at least 1 second when
        zooming in, ``view_span * 1.2`` clamped to the zoom-out limit when
        zooming out.
    """
    if direction == 0:
        # A wheel event with no vertical movement must not zoom out.
        return view_span
    if direction > 0:
        return max(1.0, view_span / 1.2)
    widest = max(initial_span * 20.0, 60.0)
    return min(widest, view_span * 1.2)
def main() -> int:
    """Run the live IMU plotter until the window is closed.

    Starts a daemon thread that reads the serial port, then runs the pygame
    loop at up to 60 frames per second.  Each frame drains the sample queue
    and redraws the accelerometer and gyroscope panels, the header, the
    legend and the most recent serial error, if any.  The mouse wheel changes
    the width of the time axis.

    Returns:
        Process exit status; always 0.
    """
    args = parse_args()

    acc_streams = (
        ("acc_x", ACC_X_TRACE),
        ("acc_y", ACC_Y_TRACE),
        ("acc_z", ACC_Z_TRACE),
    )
    gyr_streams = (
        ("gyr_x", GYR_X_TRACE),
        ("gyr_y", GYR_Y_TRACE),
        ("gyr_z", GYR_Z_TRACE),
    )
    acc_names = tuple(name for name, _ in acc_streams)
    gyr_names = tuple(name for name, _ in gyr_streams)
    legend_items = (
        ("ACC X", ACC_X_TRACE),
        ("ACC Y", ACC_Y_TRACE),
        ("ACC Z", ACC_Z_TRACE),
        ("GYR X", GYR_X_TRACE),
        ("GYR Y", GYR_Y_TRACE),
        ("GYR Z", GYR_Z_TRACE),
    )

    sample_sets = {
        name: deque(maxlen=args.max_samples) for name in acc_names + gyr_names
    }
    queue: Queue = Queue()
    thread = threading.Thread(
        target=serial_reader, args=(args.port, args.baudrate, queue), daemon=True
    )
    thread.start()

    pygame.init()
    try:
        pygame.display.set_caption(f"IMU stream: {args.port} @ {args.baudrate}")
        screen = pygame.display.set_mode(
            (args.width, args.height), pygame.RESIZABLE
        )
        font = pygame.font.SysFont("monospace", 18)
        small_font = pygame.font.SysFont("monospace", 14)
        clock = pygame.time.Clock()
        view_span = max(args.history_seconds, 1.0)
        error_message = None
        running = True

        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                elif event.type == pygame.MOUSEWHEEL and event.y:
                    # pygame 2 also emits legacy MOUSEBUTTONDOWN events with
                    # button 4/5 for every wheel step.  Handling both zoomed
                    # twice per notch, so only MOUSEWHEEL is used.  Events
                    # with y == 0 (horizontal scroll) are ignored.
                    view_span = zoom(view_span, event.y, args.history_seconds)

            queued_error = drain_queue(queue, sample_sets)
            if queued_error is not None:
                error_message = queued_error

            width, height = screen.get_size()
            now = time.monotonic()

            # Two stacked panels of equal size.
            plot_width = max(100, width - 100)
            panel_height = max(100, (height - 140) // 2)
            acc_rect = pygame.Rect(70, 40, plot_width, panel_height)
            gyr_rect = pygame.Rect(
                70, acc_rect.bottom + 40, plot_width, panel_height
            )
            acc_y_min, acc_y_max = visible_range(
                sample_sets, acc_names, now, view_span
            )
            gyr_y_min, gyr_y_max = visible_range(
                sample_sets, gyr_names, now, view_span
            )

            screen.fill(BACKGROUND)
            draw_grid(
                screen, acc_rect, small_font, view_span,
                acc_y_min, acc_y_max, "Accelerometer",
            )
            draw_grid(
                screen, gyr_rect, small_font, view_span,
                gyr_y_min, gyr_y_max, "Gyroscope",
            )
            latest_acc = [
                draw_trace(
                    screen, acc_rect, sample_sets[name], view_span, now,
                    acc_y_min, acc_y_max, color,
                )
                for name, color in acc_streams
            ]
            latest_gyr = [
                draw_trace(
                    screen, gyr_rect, sample_sets[name], view_span, now,
                    gyr_y_min, gyr_y_max, color,
                )
                for name, color in gyr_streams
            ]

            header = f"port={args.port} baud={args.baudrate} zoom={view_span:.1f}s"
            # As before, the x stream decides whether a group is shown.
            if latest_acc[0] is not None:
                header += " acc=({}, {}, {})".format(*latest_acc)
            if latest_gyr[0] is not None:
                header += " gyr=({}, {}, {})".format(*latest_gyr)
            screen.blit(font.render(header, True, TEXT), (10, 8))
            screen.blit(
                small_font.render("mouse wheel: zoom horizontal axis", True, AXIS),
                (10, height - 24),
            )

            legend_y = 34
            for index, (label, color) in enumerate(legend_items):
                legend_x = 10 + index * 105
                pygame.draw.line(
                    screen,
                    color,
                    (legend_x, legend_y + 9),
                    (legend_x + 24, legend_y + 9),
                    3,
                )
                screen.blit(
                    small_font.render(label, True, TEXT),
                    (legend_x + 30, legend_y),
                )

            if error_message:
                error_surface = font.render(error_message, True, ERROR)
                screen.blit(error_surface, (10, 56))

            pygame.display.flip()
            clock.tick(60)
    finally:
        # Shut pygame down even if setup or the loop raised.
        pygame.quit()
    return 0
if __name__ == "__main__":
raise SystemExit(main())