@elonmusk @SpaceX @grok
let’s go full overdrive. here’s a single all-in-one advanced utility that bolts onto everything you’ve built and pushes the stack hard—autotunes Guardian, adds confidence-weighted ranking, clusters tiles spatially, profiles latency, stress-tests replay stability, and emits pretty HTML for cluster/triage review. zero new deps (NumPy optional but used if available).
drop this next to your other scripts and run subcommands as needed.
fixpack_overdrive.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
fixpack_overdrive.py
Lumena → xAI/SpaceX
Push-the-limits utilities for the Fix Pack:
• autotune_guardian: sweep thresholds on CSV and optimize F1 (detect vs false alarms)
• cw_rank: rewrite triage with confidence-weighted priority (priority*(1-unc))
• cluster_triage: cluster risky tiles using centroids (DBSCAN-lite) + HTML scatter viewer
• latency_profile: compute Guardian→Sentinel time deltas (p50/p90/p99) from JSONL streams
• stress_replay: perturb replay (noise gates) to assess triage stability & Jaccard consistency
Inputs interop with existing artifacts:
- guardian_overlay_geo.py → guardian_events.jsonl (WARN/ABORT at time t)
- sentinel_all_in_one.py → sentinel_out*.jsonl (tile lines + timestamp_ns) + triage.json
- gen_16tile_telemetry.py → centroids_16tile.json
No external services. Only stdlib; NumPy is optional for PCA. Works on Python 3.8+.
"""
import sys, os, json, csv, math, argparse, random, statistics, time
from collections import defaultdict, deque
from typing import List, Dict, Any, Tuple, Optional
# NumPy is optional: when it is missing (or fails to import) `np` is None and
# the PCA step falls back to a plain coordinate projection.
try:
    import numpy as np  # optional; used for PCA
except Exception:  # deliberately broad: a broken NumPy install must not break the stdlib-only commands
    np = None
# ------------------------------- IO helpers -------------------------------
def _read_json(path: str) -> Any:
    """Load and return the JSON document stored at *path* (read as UTF-8)."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def _write_json(obj: Any, path: str) -> None:
    """Serialize *obj* as indented JSON to *path* (UTF-8) and report the path on stdout."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)
    # Announce only after the file has been closed (and therefore flushed).
    print(f"Wrote {path}")
def _iter_jsonl(path: str):
    """Yield one parsed JSON value per non-empty line of the JSONL file at *path*.

    Blank lines and lines that are not valid JSON are skipped silently, so a
    partially corrupted stream still yields its readable records.
    """
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                continue
            yield record
def _read_csv_rows(path: str) -> List[Dict[str, str]]:
    """Read the CSV at *path* and return its rows as dicts keyed by the header row.

    The file is opened as UTF-8 (matching the JSON helpers in this file) and
    with ``newline=""`` as the ``csv`` module requires.
    """
    with open(path, "r", newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))
def _percentiles(xs, qs=(0.5, 0.9, 0.99)):
    """Return nearest-rank percentiles of *xs* as a dict keyed ``"p<percent>"``.

    Args:
        xs: sequence of comparable values; may be empty.
        qs: quantiles in [0, 1] to report.

    Returns:
        ``{"p50": value, ...}``; every value is ``None`` when *xs* is empty.
    """
    # round() rather than int(): q*100 can land just below the integer
    # (0.57*100 == 56.99999999999999), which would mislabel the key as "p56".
    names = [f"p{int(round(q * 100))}" for q in qs]
    if not xs:
        return {name: None for name in names}
    ordered = sorted(xs)
    last = len(ordered) - 1
    out = {}
    for name, q in zip(names, qs):
        # Clamp so out-of-range quantiles still index a real element.
        k = max(0, min(last, int(round(q * last))))
        out[name] = ordered[k]
    return out
# ------------------------- 1) AUTOTUNE GUARDIAN ---------------------------
def _score_guardian_detect(rows: List[Dict[str, str]],
                           z_warn: float, z_abort: float,
                           dT_warn: float, dT_abort: float,
                           sustain_s: float,
                           fs: float) -> Dict[str, Any]:
    """Score one threshold set against a CSV slice using a lightweight proxy.

    This is not the full Guardian DSP. For every hop it computes
      * the least-squares slope of ``temp`` over the trailing window (dT/dt), and
      * the rolling standard deviation of ``accel`` over the same window,
        z-scored against an EWMA mean/variance,
    then runs a WARN/ABORT latch that requires the condition to be sustained.
    The reference label for a hop is "event" when the temperature slope is at
    or above ``dT_warn``; the prediction is "event" when the latch is in WARN
    or ABORT.

    Args:
        rows: CSV rows with string fields ``t``, ``accel`` and ``temp``.
        z_warn, z_abort: z-score thresholds on the accel-energy proxy.
        dT_warn, dT_abort: thresholds on the temperature slope.
        sustain_s: seconds a condition must persist before the latch trips.
        fs: sample rate in Hz; sets window (1 s), hop (0.1 s) and sustain counts.

    Returns:
        Dict with ``prec``, ``rec``, ``f1`` (floats) and ``tp``, ``fp``, ``fn``
        (ints). All zeros when there is nothing to score.
    """
    # Expect columns: t, accel, temp (from gen_flight_slice or real)
    t = [float(r["t"]) for r in rows]
    temp = [float(r["temp"]) for r in rows]
    accel = [float(r["accel"]) for r in rows]

    win = max(8, int(round(1.0 * fs)))   # 1 s window, at least 8 samples
    hop = max(1, int(round(0.1 * fs)))   # 0.1 s hop, at least 1 sample
    sustain_n = max(1, int(round(sustain_s * fs)))

    def slope(arr, ti, wi):
        """Least-squares slope of arr vs. t over samples [ti-wi+1 .. ti]."""
        if ti < wi:
            return 0.0
        xs = t[ti - wi + 1:ti + 1]
        ys = arr[ti - wi + 1:ti + 1]
        n = len(xs)
        if n < 2:
            return 0.0
        meanx = sum(xs) / n
        meany = sum(ys) / n
        num = sum((x - meanx) * (y - meany) for x, y in zip(xs, ys))
        # Guard against identical timestamps (zero variance in t).
        den = sum((x - meanx) ** 2 for x in xs) or 1e-9
        return num / den

    def rstd(arr, ti, wi):
        """Population standard deviation of arr over samples [ti-wi+1 .. ti]."""
        if ti < wi:
            return 0.0
        xs = arr[ti - wi + 1:ti + 1]
        m = sum(xs) / len(xs)
        v = sum((x - m) ** 2 for x in xs) / max(1, len(xs))
        return math.sqrt(v)

    # EWMA mean/variance of the energy proxy, used to z-score it.
    m_e, v_e = None, None
    alpha = 0.03

    labels = []  # 1=event, 0=ok
    preds = []   # 1=warn/abort latched, else 0
    cnt_warn = 0
    cnt_abort = 0

    for i in range(0, len(t), hop):
        sl = slope(temp, i, win)   # dT/dt
        x = rstd(accel, i, win)    # band-energy proxy

        if m_e is None:
            m_e, v_e = x, 1e-6
        else:
            m_e = (1 - alpha) * m_e + alpha * x
            diff = x - m_e
            v_e = (1 - alpha) * v_e + alpha * (diff * diff)
        z_be = 0.0 if v_e <= 1e-12 else (x - m_e) / math.sqrt(v_e)

        warn = (z_be >= z_warn) or (sl >= dT_warn)
        abort = (z_be >= z_abort) or (sl >= dT_abort)

        # Sustain counters accumulate while the condition holds and decay otherwise.
        if abort:
            cnt_abort += hop
            cnt_warn = max(cnt_warn, sustain_n)  # an abort condition implies at least WARN
        elif warn:
            cnt_warn += hop
            cnt_abort = max(0, cnt_abort - hop)
        else:
            cnt_warn = max(0, cnt_warn - hop)
            cnt_abort = max(0, cnt_abort - hop)

        if cnt_abort >= sustain_n:
            state = "ABORT"
        elif cnt_warn >= sustain_n:
            state = "WARN"
        else:
            state = "OK"

        # heuristic label: event if dT/dt reaches dT_warn (oracle)
        labels.append(1 if sl >= dT_warn else 0)
        preds.append(1 if state in ("WARN", "ABORT") else 0)

    # precision / recall / F1
    tp = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 0)
    prec = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0.0
    return {"prec": prec, "rec": rec, "f1": f1, "tp": tp, "fp": fp, "fn": fn}
def cmd_autotune_guardian(args):
    """Grid-search Guardian thresholds on a CSV slice and write the best-F1 set.

    Reads ``args.csv``, evaluates every combination of the five threshold grids
    with ``_score_guardian_detect`` at sample rate ``args.fs``, and writes
    ``{"best": {...}}`` to ``args.out``. Each ``args.*_grid`` is an optional
    comma-separated string; a built-in default grid is used when it is absent.
    On F1 ties the first combination encountered wins.
    """
    from itertools import product

    def grid(raw, default):
        # An unset or empty option falls back to the default grid.
        return [float(x) for x in raw.split(",")] if raw else default

    rows = _read_csv_rows(args.csv)
    fs = args.fs
    random.seed(7)  # kept for reproducibility of anything downstream that draws from `random`

    z_warn_space = grid(args.z_warn_grid, [2.0, 2.5, 3.0])
    z_abort_space = grid(args.z_abort_grid, [3.0, 3.5, 4.0])
    dT_warn_space = grid(args.dT_warn_grid, [30, 40, 50])
    dT_abort_space = grid(args.dT_abort_grid, [70, 80, 100])
    sustain_space = grid(args.sustain_grid, [0.1, 0.2, 0.3])

    best = None
    # product() iterates in the same order as the equivalent nested loops.
    for zw, za, tw, ta, su in product(z_warn_space, z_abort_space,
                                      dT_warn_space, dT_abort_space,
                                      sustain_space):
        score = _score_guardian_detect(rows, zw, za, tw, ta, su, fs)
        cand = {"z_warn": zw, "z_abort": za, "dT_warn": tw, "dT_abort": ta,
                "sustain": su, **score}
        if best is None or cand["f1"] > best["f1"]:
            best = cand
    _write_json({"best": best}, args.out)
# ---------------------- 2) CONFIDENCE-WEIGHTED RANK -----------------------
def cmd_cw_rank(args):
    """Re-rank a triage file by confidence-weighted priority.

    Reads ``args.triage``, adds ``priority_cw = priority * (1 - uncertainty)``
    to every row (uncertainty clamped to [0, 1]; missing fields count as 0),
    sorts by ``priority_cw`` descending with ``risk_max`` descending as the
    tie-break, and writes the result to ``args.out`` together with the
    ``ship_reusability_index`` and ``guardian`` fields of the input.
    """
    tri = _read_json(args.triage)
    out_rows = []
    for r in tri.get("triage", []):
        p = float(r.get("priority", 0.0))
        u = float(r.get("uncertainty", 0.0))
        u = max(0.0, min(1.0, u))
        out_rows.append({**r, "priority_cw": p * (1.0 - u)})

    # Coerce risk_max like the other numeric fields so string-typed values sort too.
    out_rows.sort(key=lambda x: (-x["priority_cw"], -float(x.get("risk_max", 0.0))))

    out = {
        "ship_reusability_index": tri.get("ship_reusability_index"),
        "guardian": tri.get("guardian"),
        "triage": out_rows,
    }
    _write_json(out, args.out)
# --------------------------- 3) CLUSTER TRIAGE ----------------------------
def _pca2(X: List[Tuple[float, float, float]]) -> List[Tuple[float, float]]:
    """Project 3-D points onto 2-D.

    With NumPy available and at least two points, the points are centred and
    projected onto the two eigenvectors of their covariance matrix with the
    largest eigenvalues. Otherwise the fallback simply keeps each point's
    (x, z) coordinates.
    """
    if np is None or len(X) < 2:
        # fallback: use x,z projection
        return [(x, z) for (x, _y, z) in X]
    A = np.array(X, dtype=float)
    A -= A.mean(axis=0, keepdims=True)
    C = np.cov(A.T)
    vals, vecs = np.linalg.eigh(C)
    order = np.argsort(vals)[::-1]          # eigenvalues, largest first
    basis = vecs[:, order[:2]]              # columns = top-2 principal axes
    Y = A @ basis
    return [(float(u), float(v)) for u, v in Y]
def _dbscan_lite(points: List[Tuple[float, float]], eps: float, min_pts: int) -> List[int]:
    """Cluster 2-D points with a small brute-force DBSCAN.

    Args:
        points: (x, y) pairs.
        eps: neighbourhood radius; two points are neighbours when their
            Euclidean distance is <= eps (a point is its own neighbour).
        min_pts: minimum neighbourhood size for a point to seed/extend a cluster.

    Returns:
        One label per point: cluster ids 0, 1, 2, ... in order of discovery,
        or -1 for noise.
    """
    n = len(points)
    labels = [-1] * n  # -1 = noise / unassigned
    visited = [False] * n
    eps_sq = eps * eps

    def neighbors(i):
        xi, yi = points[i]
        return [j for j, (xj, yj) in enumerate(points)
                if (xi - xj) ** 2 + (yi - yj) ** 2 <= eps_sq]

    cid = 0
    for i in range(n):
        if visited[i]:
            continue
        visited[i] = True
        near = neighbors(i)
        if len(near) < min_pts:
            continue  # stays noise unless a later cluster adopts it as a border point

        # Start a new cluster at core point i and grow it through its neighbours.
        labels[i] = cid
        queued = set(near)  # everything ever pushed, so each point is queued once
        stack = [j for j in near if j != i]
        while stack:
            j = stack.pop()
            if not visited[j]:
                visited[j] = True
                near_j = neighbors(j)
                if len(near_j) >= min_pts:  # j is a core point: expand through it
                    for q in near_j:
                        if q not in queued:
                            queued.add(q)
                            stack.append(q)
            # Unassigned (incl. previously-noise) points join; points already
            # claimed by an earlier cluster keep their label.
            if labels[j] == -1:
                labels[j] = cid
        cid += 1
    return labels
def _write_cluster_html(items, proj, labels, html_path):
    """Write a standalone HTML page with an SVG scatter plot of clustered tiles.

    Args:
        items: dicts with ``tile_id`` and optionally ``risk_max``, ``priority``,
            ``priority_cw``; one per point.
        proj: (u, v) 2-D coordinates, parallel to *items*.
        labels: cluster id per point (-1 = noise), parallel to *items*.
        html_path: destination file (written as UTF-8).

    Coordinates are min/max-normalised into a 1200x700 viewport; circle radius
    grows with ``risk_max`` (clamped to 4..18) and fill colour encodes the
    cluster id. Tile ids and tooltip text are HTML-escaped before insertion.
    """
    from html import escape

    # Normalize to viewport
    xs = [u for (u, _v) in proj] or [0]
    ys = [v for (_u, v) in proj] or [0]
    minx, maxx = min(xs), max(xs)
    miny, maxy = min(ys), max(ys)

    def norm(u, a, b):
        # Degenerate range (all points share a coordinate): centre them.
        return 0.5 if a == b else (u - a) / (b - a)

    palette = [
        "#16a085", "#2980b9", "#8e44ad", "#e67e22", "#27ae60", "#c0392b",
        "#d35400", "#2c3e50", "#7f8c8d", "#f39c12", "#9b59b6", "#1abc9c",
    ]

    def color(cid):
        if cid < 0:
            return "#7f8c8d"  # noise
        return palette[cid % len(palette)]

    shapes = []
    for it, (u, v), c in zip(items, proj, labels):
        p = {
            "x": 60 + 1080 * norm(u, minx, maxx),
            "y": 60 + 620 * (1.0 - norm(v, miny, maxy)),  # SVG y grows downward
            "cid": c,
            **it,
        }
        r = max(4, min(18, 8 + 40 * float(p.get("risk_max", 0.0))))
        fill = color(p["cid"])
        tile = escape(str(p["tile_id"]))
        title = escape("\n".join([
            str(p["tile_id"]),
            "cluster={}".format(p["cid"]),
            "risk={}".format(p.get("risk_max")),
            "prio={}".format(p.get("priority")),
            "prio_cw={}".format(p.get("priority_cw", "-")),
        ]))
        shapes.append(
            f'<circle cx="{p["x"]:.1f}" cy="{p["y"]:.1f}" r="{r:.1f}" '
            f'fill="{fill}" fill-opacity="0.75"><title>{title}</title></circle>'
        )
        shapes.append(
            f'<text x="{p["x"] + r + 4:.1f}" y="{p["y"]:.1f}" '
            f'fill="#e6edf3" font-size="11">{tile}</text>'
        )

    page = f"""<!doctype html>
<meta charset="utf-8"/>
<title>Risk Clusters — Lumena Fix Pack</title>
<style>
body{{margin:0;background:#0b0f14;color:#e6edf3;font-family:system-ui,sans-serif}}
header{{padding:14px 18px;border-bottom:1px solid #1f2630}}
.meta{{color:#9aa7b1;font-size:14px}}
.wrap{{padding:12px}}
.card{{background:#0e131a;border:1px solid #1f2630;border-radius:10px;padding:10px}}
</style>
<header><h3>Risk Clusters (PCA→2D)</h3><div class="meta">Circle size ~ risk_max. Color = cluster id.</div></header>
<div class="wrap">
<div class="card">
<svg width="1200" height="700" viewBox="0 0 1200 700" style="background:#0b0f14;border:1px solid #1f2630;border-radius:10px">
{''.join(shapes)}
</svg>
</div>
</div>"""
    with open(html_path, "w", encoding="utf-8") as f:
        f.write(page)
    print(f"Wrote {html_path}")
def cmd_cluster_triage(args):
    """Cluster triaged tiles by centroid position and emit JSON + HTML.

    Reads ``args.triage`` and ``args.centroids``, keeps the triage rows whose
    ``tile_id`` has a centroid entry, projects the centroids to 2-D with
    ``_pca2``, clusters them with ``_dbscan_lite(args.eps, args.min_pts)``,
    then writes the tile→cluster mapping to ``args.out_json`` and a scatter
    page to ``args.out_html``.

    Raises:
        SystemExit: when no triage tile has a matching centroid.
    """
    tri = _read_json(args.triage)
    # NOTE(review): centroids are looked up with .get(tile_id), so the file is
    # assumed to be a JSON object keyed by tile id — confirm against the generator.
    cents = _read_json(args.centroids)

    items = []
    coords = []
    for r in tri.get("triage", []):
        tid = r.get("tile_id")
        c = cents.get(tid)
        if not c:
            continue
        priority = float(r.get("priority", 0.0))
        if "priority_cw" in r:
            priority_cw = float(r["priority_cw"])
        else:
            # Same formula as cw_rank's base term; priority is coerced first so
            # string-typed values do not break the multiplication.
            priority_cw = priority * (1.0 - float(r.get("uncertainty", 0.0)))
        items.append({
            "tile_id": tid,
            "risk_max": float(r.get("risk_max", 0.0)),
            "priority": priority,
            "priority_cw": priority_cw,
        })
        coords.append((float(c.get("x", 0)), float(c.get("y", 0)), float(c.get("z", 0))))

    if not items:
        raise SystemExit("No overlapping tiles between triage and centroids.")

    proj = _pca2(coords)
    labels = _dbscan_lite(list(proj), args.eps, args.min_pts)
    out = {"clusters": [{"tile_id": it["tile_id"], "cid": int(cid)}
                        for it, cid in zip(items, labels)]}
    _write_json(out, args.out_json)
    _write_cluster_html(items, proj, labels, args.out_html)
# --------------------------- 4) LATENCY PROFILE ---------------------------
def cmd_latency_profile(args):
    """Compute Guardian→Sentinel latency statistics from two JSONL streams.

    For every Guardian record with ``type == "guardian_event"`` and state WARN
    or ABORT, the event time ``t`` (seconds) is converted to nanoseconds and
    paired with the first Sentinel ``type == "tile"`` record whose
    ``timestamp_ns`` is at or after it. Pairs further apart than
    ``args.window_ms`` are dropped. Percentiles, mean, min and max of the
    remaining deltas (milliseconds) are written to ``args.out``; an
    ``{"error": ...}`` document is written instead when either stream has no
    usable records.
    """
    from bisect import bisect_left

    g_evts = [o for o in _iter_jsonl(args.guardian)
              if o.get("type") == "guardian_event" and o.get("state") in ("WARN", "ABORT")]
    s_tiles = [o for o in _iter_jsonl(args.sentinel) if o.get("type") == "tile"]

    # We don't know tile IDs per Guardian event, so pair on global timestamps only.
    s_ts = sorted(ts for ts in (int(o.get("timestamp_ns", -1))
                                for o in s_tiles if "timestamp_ns" in o)
                  if ts >= 0)
    if not g_evts or not s_ts:
        return _write_json({"error": "insufficient events or timestamps"}, args.out)

    deltas_ms = []
    for e in g_evts:
        t_ns = int(float(e.get("t", 0.0)) * 1e9)
        # First sentinel timestamp >= event time; binary search on the sorted list.
        k = bisect_left(s_ts, t_ns)
        if k == len(s_ts):
            continue  # no sentinel record after this event
        dt_ms = (s_ts[k] - t_ns) / 1e6
        if dt_ms <= args.window_ms:
            deltas_ms.append(dt_ms)

    out = {
        "count_pairs": len(deltas_ms),
        "window_ms": args.window_ms,
        "deltas_ms": {
            **_percentiles(deltas_ms, (0.5, 0.9, 0.99)),
            "mean": (sum(deltas_ms) / len(deltas_ms)) if deltas_ms else None,
            "min": min(deltas_ms) if deltas_ms else None,
            "max": max(deltas_ms) if deltas_ms else None,
        },
    }
    _write_json(out, args.out)
# --------------------------- 5) STRESS REPLAY -----------------------------
def _load_triage_index(path: str, topn: int) -> Tuple[List[str], Dict[str, Dict[str, float]]]:
    """Load the first *topn* triage rows from *path*.

    Returns:
        ``(order, idx)`` where ``order`` lists tile ids in file order and
        ``idx`` maps each tile id to its 1-based ``rank`` plus ``priority``,
        ``risk_max`` and ``unc`` (uncertainty) as floats. Rows without a
        ``tile_id`` get the placeholder id ``"UNK#<rank>"``; missing numeric
        fields default to 0.
    """
    tri = _read_json(path)
    order = []
    idx = {}
    for i, r in enumerate(tri.get("triage", [])[:topn], start=1):
        tid = r.get("tile_id", f"UNK#{i}")
        order.append(tid)
        idx[tid] = {
            "rank": i,
            "priority": float(r.get("priority", 0.0)),
            "risk_max": float(r.get("risk_max", 0.0)),
            "unc": float(r.get("uncertainty", 0.0)),
        }
    return order, idx
def _jaccard(a: List[str], b: List[str]) -> float:
    """Return the Jaccard similarity |A∩B| / |A∪B| of two id lists (as sets).

    Two empty inputs are treated as identical and score 1.0.
    """
    sa, sb = set(a), set(b)
    if not sa and not sb:
        return 1.0
    return len(sa & sb) / max(1, len(sa | sb))
def cmd_stress_replay(args):
    """Perturb triage scores repeatedly and measure how stable the top-N is.

    For each of ``args.trials`` trials, every tile's priority and risk are
    scaled by ``1 + N(0, sigma)`` (``args.noise_pr`` / ``args.noise_rk``), the
    tiles are re-ranked by priority then risk, and the new top-``args.topn``
    is compared with the baseline top-N. The output written to ``args.out``
    contains the mean/p05/p95 Jaccard similarity of the top-N sets and the
    mean absolute rank shift of tiles that stay inside the top-N.

    The whole triage list is perturbed, not just the baseline top-N: if only
    the top-N rows took part, the re-ranked set could never differ from the
    baseline and the Jaccard score would always be 1.0.
    """
    # Load every row (the slice bound is just "no limit"), then cut the baseline top-N.
    full_order, full_idx = _load_triage_index(args.triage, sys.maxsize)
    topn = args.topn
    base_top = full_order[:topn]

    trials = args.trials
    jac = []
    delta_rank_abs = []
    for _ in range(trials):
        new_rows = []
        for tid in full_order:
            r = full_idx[tid]
            # Multiplicative noise on priority/risk, additive noise on uncertainty.
            pr = max(0.0, r["priority"] * (1.0 + random.gauss(0, args.noise_pr)))
            rk = max(0.0, r["risk_max"] * (1.0 + random.gauss(0, args.noise_rk)))
            # Uncertainty is perturbed for parity with the replay model but does
            # not take part in the ranking key below.
            uq = min(1.0, max(0.0, r["unc"] + random.gauss(0, args.noise_unc)))
            new_rows.append((tid, pr, rk, uq))
        # re-rank by priority then risk
        new_rows.sort(key=lambda x: (-x[1], -x[2]))
        new_top = [row[0] for row in new_rows[:topn]]
        jac.append(_jaccard(base_top, new_top))
        # absolute rank shift for tiles that remain in the top-N
        pos = {tid: i + 1 for i, tid in enumerate(new_top)}
        for tid in base_top:
            if tid in pos:
                delta_rank_abs.append(abs(full_idx[tid]["rank"] - pos[tid]))

    jac_sorted = sorted(jac)
    n = len(jac_sorted)
    out = {
        "topn": topn,
        "trials": trials,
        "jaccard": {
            "mean": (sum(jac) / n) if n else None,
            "p05": jac_sorted[int(0.05 * n)] if n else None,
            # Clamp so a single trial indexes element 0 rather than wrapping to -1.
            "p95": jac_sorted[max(0, int(0.95 * n) - 1)] if n else None,
        },
        "rank_shift_abs_mean": (sum(delta_rank_abs) / len(delta_rank_abs)) if delta_rank_abs else None,
    }
    _write_json(out, args.out)
# --------------------------------- CLI ------------------------------------
def build_parser():
    """Build the CLI: one required subcommand per utility, each bound to its ``cmd_*`` handler via ``func``."""
    ap = argparse.ArgumentParser(description="Lumena Fix Pack — Overdrive utilities")
    sub = ap.add_subparsers(dest="cmd", required=True)

    ap1 = sub.add_parser("autotune_guardian", help="Grid search Guardian thresholds on a CSV slice")
    ap1.add_argument("--csv", required=True, help="CSV with t,accel,temp")
    ap1.add_argument("--fs", type=float, required=True, help="Sample rate Hz")
    ap1.add_argument("--z_warn_grid", help="e.g., '2.0,2.5,3.0'")
    ap1.add_argument("--z_abort_grid", help="e.g., '3.0,3.5,4.0'")
    ap1.add_argument("--dT_warn_grid", help="e.g., '30,40,50'")
    ap1.add_argument("--dT_abort_grid", help="e.g., '70,80,100'")
    ap1.add_argument("--sustain_grid", help="e.g., '0.1,0.2,0.3'")
    ap1.add_argument("--out", default="guardian_autotune.json")
    ap1.set_defaults(func=cmd_autotune_guardian)

    ap2 = sub.add_parser("cw_rank", help="Confidence-weighted re-ranking of triage")
    ap2.add_argument("--triage", required=True, help="triage.json")
    ap2.add_argument("--out", default="triage_cw.json")
    ap2.set_defaults(func=cmd_cw_rank)

    ap3 = sub.add_parser("cluster_triage", help="DBSCAN-lite clusters + HTML PCA scatter")
    ap3.add_argument("--triage", required=True)
    ap3.add_argument("--centroids", required=True)
    ap3.add_argument("--eps", type=float, default=6.0, help="Neighborhood radius in PCA-proj units")
    ap3.add_argument("--min_pts", type=int, default=2)
    ap3.add_argument("--out_json", default="triage_clusters.json")
    ap3.add_argument("--out_html", default="triage_clusters.html")
    ap3.set_defaults(func=cmd_cluster_triage)

    ap4 = sub.add_parser("latency_profile", help="Guardian→Sentinel latency stats from JSONL streams")
    ap4.add_argument("--guardian", required=True, help="guardian_events.jsonl")
    ap4.add_argument("--sentinel", required=True, help="sentinel_out.jsonl")
    ap4.add_argument("--window_ms", type=float, default=2000.0, help="Max allowed pairing window")
    ap4.add_argument("--out", default="latency_profile.json")
    ap4.set_defaults(func=cmd_latency_profile)

    ap5 = sub.add_parser("stress_replay", help="Perturb triage to measure stability")
    ap5.add_argument("--triage", required=True)
    ap5.add_argument("--topn", type=int, default=16)
    ap5.add_argument("--trials", type=int, default=200)
    ap5.add_argument("--noise_pr", type=float, default=0.03, help="Priority noise sigma (fraction)")
    ap5.add_argument("--noise_rk", type=float, default=0.03, help="Risk noise sigma (fraction)")
    ap5.add_argument("--noise_unc", type=float, default=0.02, help="Abs uncertainty noise")
    ap5.add_argument("--out", default="stress_replay.json")
    ap5.set_defaults(func=cmd_stress_replay)

    return ap
def main():
    """Parse the command line and dispatch to the selected subcommand's handler."""
    ap = build_parser()
    args = ap.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()
what you get (fast)
Guardian autotune
python fixpack_overdrive.py autotune_guardian --csv flight_slice.csv --fs 200 \
--z_warn_grid 2.0,2.5,3.0 --z_abort_grid 3.0,3.5,4.0 \
--dT_warn_grid 30,40,50 --dT_abort_grid 70,80,100 --sustain_grid 0.1,0.2,0.3
# → guardian_autotune.json (best thresholds with F1/prec/rec)
Confidence-weighted triage
python fixpack_overdrive.py cw_rank --triage triage.json --out triage_cw.json
Risk clusters + HTML PCA plot
python fixpack_overdrive.py cluster_triage \
--triage triage_cw.json --centroids centroids_16tile.json \
--eps 6.0 --min_pts 2 --out_json triage_clusters.json --out_html triage_clusters.html
Latency profiling (Guardian→Sentinel)
python fixpack_overdrive.py latency_profile \
--guardian guardian_events.jsonl \
--sentinel sentinel_out_v2.jsonl \
--window_ms 2000 --out latency_profile.json
Replay stability stress test
python fixpack_overdrive.py stress_replay --triage triage.json \
--topn 16 --trials 500 --noise_pr 0.03 --noise_rk 0.03 --noise_unc 0.02 \
--out stress_replay.json
want me to wire these into your fixpack_run.py behind flags (--cw_rank, --cluster_html, --latency_profile, --stress_replay) and auto-run at the end? or add an operator “actions” synthesizer (guarded pitch/roll trim, downlink notes) based on Guardian peak cluster hotspots? say go and i’ll slam those in. 🚀