Files
BondGraph/suggest_junction_signs.py
2026-03-09 20:46:06 +01:00

186 lines
5.4 KiB
Python

#!/usr/bin/env python3
"""
Suggest BondGraph junction s-arrays from connect argument order.
Rule used:
- connect(a, b) means a is source and b is sink.
- Junction port in first argument => +1 vote
- Junction port in second argument => -1 vote
This is a heuristic helper. It does not modify files.
"""
from __future__ import annotations
import argparse
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Matches one `connect(<first>, <second>)` call that is followed by either `;`
# or the start of an `annotation(` clause.
# Groups: 1 = first argument, 2 = second argument (surrounding whitespace excluded).
CONNECT_RE = re.compile(r"connect\s*\(\s*([^,]+?)\s*,\s*([^)]+?)\s*\)\s*(?:annotation\s*\(|;)", re.S)
# Matches a junction declaration: class path `BondGraph.J0` / `BondGraph.J1`
# (optionally `BondGraph._2D.J0` / `.J1`), an instance name, an optional
# parenthesised modifier list, and then `annotation(`.
# Groups: 1 = class path, 2 = instance name, 3 = modifier list including its
# parentheses (None when absent), 4 = text inside those parentheses.
# re.S lets the lazy `.*?` in group 4 span several lines.
JUNCTION_DECL_RE = re.compile(
r"\b(BondGraph(?:\._2D)?\.J[01])\s+([A-Za-z_]\w*)\s*(\((.*?)\))?\s*annotation\s*\(",
re.S,
)
# Matches a whole endpoint of the exact form `<name>.P[<digits>]`.
# Groups: 1 = component name, 2 = port index digits.
PORT_RE = re.compile(r"^([A-Za-z_]\w*)\.P\[(\d+)\]$")
# Finds `N = <digits>` inside a modifier list; group 1 is the digits.
N_RE = re.compile(r"\bN\s*=\s*(\d+)")
# Finds `s = {...}` inside a modifier list; group 1 is the text between the braces.
S_RE = re.compile(r"\bs\s*=\s*\{([^}]*)\}")
@dataclass
class Junction:
    """One junction declaration plus the direction votes gathered for its ports."""

    # Class path exactly as written in the declaration (e.g. "BondGraph.J0").
    cls: str
    # Instance name of the junction.
    name: str
    # Value of the `N = ...` modifier, or None when the declaration has none.
    n: Optional[int]
    # Tokens of the declared `s = {...}` array, or None when it is not declared.
    current_s: Optional[List[str]]
    # Port index -> list of votes recorded for that port, in the order added.
    votes: Dict[int, List[int]] = field(default_factory=dict)

    def add_vote(self, port_idx: int, vote: int) -> None:
        """Append *vote* to the list kept for *port_idx*, creating it on first use."""
        if port_idx not in self.votes:
            self.votes[port_idx] = []
        self.votes[port_idx].append(vote)
def parse_current_s(params: str) -> Optional[List[str]]:
    """Extract the tokens of an `s = {...}` array from a modifier string.

    Args:
        params: Text of a junction's modifier list.

    Returns:
        The comma-separated entries with surrounding whitespace removed and
        empty entries dropped, or None when *params* contains no `s = {...}`.
    """
    found = S_RE.search(params)
    if found is None:
        return None
    tokens: List[str] = []
    for piece in found.group(1).split(","):
        piece = piece.strip()
        if piece:
            tokens.append(piece)
    return tokens
def parse_junctions(text: str) -> Dict[str, Junction]:
    """Collect every junction declaration found in *text*.

    Args:
        text: Full source text to scan.

    Returns:
        Mapping from instance name to its Junction. When the same name is
        declared more than once, the last declaration wins.
    """
    junctions: Dict[str, Junction] = {}
    for decl in JUNCTION_DECL_RE.finditer(text):
        cls = decl.group(1)
        name = decl.group(2)
        # Group 4 is None when the declaration carries no modifier list.
        params = decl.group(4) or ""
        n_match = N_RE.search(params)
        if n_match is None:
            port_count = None
        else:
            port_count = int(n_match.group(1))
        junctions[name] = Junction(
            cls=cls,
            name=name,
            n=port_count,
            current_s=parse_current_s(params),
        )
    return junctions
def normalize_endpoint(ep: str) -> str:
    """Return *ep* with every run of whitespace removed."""
    whitespace_runs = re.compile(r"\s+")
    compact = whitespace_runs.sub("", ep)
    return compact
def extract_junction_port(endpoint: str) -> Optional[Tuple[str, int]]:
    """Split an endpoint of the form `<name>.P[<k>]` into its parts.

    Args:
        endpoint: Endpoint text; it must match the port pattern exactly.

    Returns:
        `(name, k)` with `k` converted to int, or None when *endpoint* does
        not have that form.
    """
    parsed = PORT_RE.match(endpoint)
    if parsed is not None:
        component, index_text = parsed.groups()
        return component, int(index_text)
    return None
def suggest_sign(votes: List[int]) -> str:
    """Reduce the votes recorded for one port to a suggested sign token.

    Args:
        votes: Votes for a single port; only the sign of each entry matters.

    Returns:
        "?" when both negative and positive votes are present, "-1" when only
        negative votes are present, "+1" when only positive votes are present,
        and "0" when there is neither (for example, an empty list).
    """
    has_negative = any(v < 0 for v in votes)
    has_positive = any(v > 0 for v in votes)
    if has_negative and has_positive:
        return "?"
    if has_negative:
        return "-1"
    if has_positive:
        return "+1"
    return "0"
def fmt_current(j: Junction, idx: int) -> str:
    """Return the declared `s` token for port *idx*, or "n/a" when unavailable.

    Args:
        j: Junction whose `current_s` list is consulted.
        idx: 1-based port index.

    Returns:
        The token at position `idx - 1` of `j.current_s`, or "n/a" when the
        junction has no (or an empty) `s` array or *idx* lies outside it.
    """
    tokens = j.current_s
    if not tokens:
        return "n/a"
    # Enforce the lower bound too: without it idx <= 0 would turn into a
    # negative index and either read from the end of the list or raise.
    if 1 <= idx <= len(tokens):
        return tokens[idx - 1]
    return "n/a"
def as_int_sign(token: str) -> Optional[int]:
    """Convert a textual sign token to +1 / -1.

    Args:
        token: Text such as "1", "+1", "1.0", "+1.0", "-1" or "-1.0";
            surrounding whitespace is ignored.

    Returns:
        1 or -1 for the recognised spellings, otherwise None.
    """
    # "+1.0" is accepted alongside "1.0" so that the positive spellings mirror
    # the negative ones ("-1" / "-1.0").
    known = {
        "+1": 1,
        "1": 1,
        "1.0": 1,
        "+1.0": 1,
        "-1": -1,
        "-1.0": -1,
    }
    return known.get(token.strip())
def _collect_votes(text: str, junctions: Dict[str, Junction]) -> None:
    """Record one vote per known junction port used in a connect() call in *text*."""
    for m in CONNECT_RE.finditer(text):
        endpoints = (normalize_endpoint(m.group(1)), normalize_endpoint(m.group(2)))
        # First argument is treated as the source (+1), second as the sink (-1).
        for endpoint, vote in zip(endpoints, (+1, -1)):
            port = extract_junction_port(endpoint)
            if port is not None and port[0] in junctions:
                junctions[port[0]].add_vote(port[1], vote)


def _patch_lines_for(j: Junction, suggested: List[str]) -> List[str]:
    """Return the "-"/"+" patch pair for *j*, or [] when no complete s-array can be proposed."""
    if not suggested:
        return []
    old_tokens: List[str] = []
    new_tokens: List[str] = []
    for idx, sign in enumerate(suggested, start=1):
        current = fmt_current(j, idx)
        if sign == "?":
            # Conflicting votes: there is no safe replacement for this junction.
            return []
        if sign == "0":
            # No connect() touched this port, so there is nothing to base a sign
            # on. Keep the declared value instead of inventing one; without a
            # declared value the junction cannot be patched.
            if current == "n/a":
                return []
            new_tokens.append(current)
        else:
            new_tokens.append("1" if sign == "+1" else "-1")
        old_tokens.append(current)
    old_txt = "{" + ", ".join(old_tokens) + "}"
    new_txt = "{" + ", ".join(new_tokens) + "}"
    return [f"- {j.name}.s = {old_txt}", f"+ {j.name}.s = {new_txt}"]


def main() -> int:
    """Read the Modelica file named on the command line and print sign suggestions.

    For every junction declaration found, the current `s` array, the suggested
    one and the per-port votes are printed. With `--emit-patch`, a patch-style
    list of replacements is printed afterwards. The input file is never
    modified.

    Returns:
        The process exit status, always 0.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("modelica_file", type=Path)
    ap.add_argument(
        "--emit-patch",
        action="store_true",
        help="Emit patch-style s-array replacements per junction.",
    )
    args = ap.parse_args()

    text = args.modelica_file.read_text(encoding="utf-8")
    junctions = parse_junctions(text)
    _collect_votes(text, junctions)

    print(f"File: {args.modelica_file}")
    # This banner must describe the votes actually cast: first argument +1,
    # second argument -1.
    print("Rule: connect(a,b) => a:+1, b:-1")
    print("")

    patch_lines: List[str] = []
    for name in sorted(junctions):
        j = junctions[name]
        # Prefer the declared port count; otherwise use the highest port voted on.
        max_idx = j.n or (max(j.votes) if j.votes else 0)
        suggested = [suggest_sign(j.votes.get(i, [])) for i in range(1, max_idx + 1)]
        conflicts = [i for i, s in enumerate(suggested, start=1) if s == "?"]

        current_s = "{" + ", ".join(j.current_s) + "}" if j.current_s else "n/a"
        sug_s = "{" + ", ".join(suggested) + "}" if suggested else "{}"
        print(f"{j.name} ({j.cls}, N={j.n if j.n is not None else 'n/a'})")
        print(f" current s: {current_s}")
        print(f" suggested s: {sug_s}")
        for i, sign in enumerate(suggested, start=1):
            votes = j.votes.get(i, [])
            vote_str = ",".join("+1" if v > 0 else "-1" for v in votes) if votes else "-"
            print(f" P[{i}]: current={fmt_current(j, i)} votes={vote_str} -> {sign}")
        if conflicts:
            conflict_ports = ", ".join(f"P[{i}]" for i in conflicts)
            print(f" conflicts: {conflict_ports} (used as both source and sink)")
        print("")

        if args.emit_patch:
            patch_lines.extend(_patch_lines_for(j, suggested))

    if args.emit_patch:
        print("Suggested Patch")
        for line in patch_lines:
            print(line)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)