Normalized values in Q12

This commit is contained in:
2026-03-20 15:43:14 +01:00
parent 00e354c2b3
commit 1b6bd537ad
5 changed files with 123 additions and 26 deletions

55
plot.py
View File

@@ -114,11 +114,31 @@ def validate_config(config: dict) -> tuple[dict[str, list[str]], list[dict], lis
key = stream.get("key")
if not isinstance(key, str) or not key:
raise SystemExit("Each stream must have a non-empty string `key`.")
interpreter = stream.get("interpreter", "int")
if interpreter not in ("int", "fixed"):
raise SystemExit(f"Unsupported interpreter for stream `{key}`: {interpreter}")
if interpreter == "fixed":
frac_bits = stream.get("frac_bits")
if not isinstance(frac_bits, int) or frac_bits < 0:
raise SystemExit(f"Fixed-point stream `{key}` requires integer `frac_bits`.")
stream_keys.append(key)
return tag_to_fields, graphs, stream_keys
def interpret_value(raw_value: int, stream: dict) -> float:
interpreter = stream.get("interpreter", "int")
if interpreter == "int":
return float(raw_value)
if interpreter == "fixed":
frac_bits = stream.get("frac_bits")
if not isinstance(frac_bits, int) or frac_bits < 0:
raise SystemExit("Fixed-point streams require a non-negative integer `frac_bits`.")
return raw_value / float(1 << frac_bits)
raise SystemExit(f"Unsupported interpreter: {interpreter}")
def serial_reader(port: str, baudrate: int, tag_to_fields: dict[str, list[str]], output: Queue) -> None:
try:
with serial.Serial(port, baudrate=baudrate, timeout=1) as ser:
@@ -162,7 +182,8 @@ def serial_reader(port: str, baudrate: int, tag_to_fields: dict[str, list[str]],
def drain_queue(
queue: Queue,
sample_sets: dict[str, deque[tuple[float, int]]],
sample_sets: dict[str, deque[tuple[float, float]]],
stream_defs: dict[str, dict],
) -> str | None:
error_message = None
while True:
@@ -177,7 +198,9 @@ def drain_queue(
stream_name, timestamp, value = item
if stream_name in sample_sets:
sample_sets[stream_name].append((timestamp, value))
sample_sets[stream_name].append(
(timestamp, interpret_value(value, stream_defs[stream_name]))
)
def draw_grid(
@@ -195,7 +218,12 @@ def draw_grid(
y = rect.top + round(fraction * rect.height)
pygame.draw.line(surface, GRID, (rect.left, y), (rect.right, y), 1)
value = y_max - fraction * (y_max - y_min)
label = f"{value:.0f}"
if abs(y_max - y_min) <= 4.0:
label = f"{value:.2f}".rstrip("0").rstrip(".")
elif abs(y_max - y_min) <= 40.0:
label = f"{value:.1f}".rstrip("0").rstrip(".")
else:
label = f"{value:.0f}"
text = font.render(label, True, AXIS)
surface.blit(text, (10, y - text.get_height() // 2))
@@ -213,13 +241,13 @@ def draw_grid(
def draw_trace(
surface: pygame.Surface,
rect: pygame.Rect,
samples: deque[tuple[float, int]],
samples: deque[tuple[float, float]],
view_span: float,
now: float,
y_min: float,
y_max: float,
color: tuple[int, int, int],
) -> int | None:
) -> float | None:
visible_points = []
latest_value = None
y_span = max(y_max - y_min, 1e-6)
@@ -242,7 +270,7 @@ def draw_trace(
def visible_range(
sample_sets: dict[str, deque[tuple[float, int]]],
sample_sets: dict[str, deque[tuple[float, float]]],
stream_names: list[str],
now: float,
view_span: float,
@@ -274,6 +302,11 @@ def main() -> int:
args = parse_args()
config = load_config(args.config)
tag_to_fields, graphs, stream_keys = validate_config(config)
stream_map = {
stream["key"]: stream
for graph in graphs
for stream in graph["streams"]
}
sample_sets = {key: deque(maxlen=args.max_samples) for key in stream_keys}
queue: Queue = Queue()
@@ -308,7 +341,7 @@ def main() -> int:
elif event.button == 5:
view_span = zoom(view_span, -1, initial_span)
queued_error = drain_queue(queue, sample_sets)
queued_error = drain_queue(queue, sample_sets, stream_map)
if queued_error is not None:
error_message = queued_error
@@ -325,8 +358,8 @@ def main() -> int:
top = 40
for graph in graphs:
rect = pygame.Rect(70, top, plot_width, panel_height)
stream_defs = graph["streams"]
stream_names = [stream["key"] for stream in stream_defs]
graph_streams = graph["streams"]
stream_names = [stream["key"] for stream in graph_streams]
y_min, y_max = visible_range(sample_sets, stream_names, now, view_span)
draw_grid(
screen,
@@ -337,7 +370,7 @@ def main() -> int:
y_max,
graph.get("title", "Graph"),
)
for stream in stream_defs:
for stream in graph_streams:
latest_values[stream["key"]] = draw_trace(
screen,
rect,
@@ -356,7 +389,7 @@ def main() -> int:
for stream in graph["streams"]:
latest_value = latest_values.get(stream["key"])
if latest_value is not None:
graph_parts.append(f"{stream.get('label', stream['key'])}={latest_value}")
graph_parts.append(f"{stream.get('label', stream['key'])}={latest_value:.3f}")
if graph_parts:
header += " | " + " ".join(graph_parts)
screen.blit(font.render(header, True, TEXT), (10, 8))