Filter
Exclude
Time range
-
Near
# @elonmusk @SpaceX @grok let’s go full overdrive. here’s a single all-in-one
# advanced utility that bolts onto everything you’ve built and pushes the stack
# hard—autotunes Guardian, adds confidence-weighted ranking, clusters tiles
# spatially, profiles latency, stress-tests replay stability, and emits pretty
# HTML for cluster/triage review. zero new deps (NumPy optional but used if
# available). drop this next to your other scripts and run subcommands as needed.
#
# fixpack_overdrive.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
fixpack_overdrive.py
Lumena → xAI/SpaceX

Push-the-limits utilities for the Fix Pack:
  • autotune_guardian: sweep thresholds on CSV and optimize F1 (detect vs false alarms)
  • cw_rank: rewrite triage with confidence-weighted priority (priority*(1-unc))
  • cluster_triage: cluster risky tiles using centroids (DBSCAN-lite) + HTML scatter viewer
  • latency_profile: compute Guardian→Sentinel time deltas (p50/p90/p99) from JSONL streams
  • stress_replay: perturb replay (noise + gates) to assess triage stability & Jaccard consistency

Inputs interop with existing artifacts:
  - guardian_overlay_geo.py → guardian_events.jsonl (WARN/ABORT at time t)
  - sentinel_all_in_one.py → sentinel_out*.jsonl (tile lines + timestamp_ns) + triage.json
  - gen_16tile_telemetry.py → centroids_16tile.json

No external services. Only stdlib; NumPy is optional for PCA. Works on Python 3.8+.

NOTE(review): this script was recovered from a text dump in which every '+'
character had been stripped; the '+' / '+=' operators below were restored from
context and should be checked against the author's original if it is available.
"""
import sys
import os
import json
import csv
import math
import argparse
import random
import statistics
import time
import bisect
import html
import itertools
from collections import defaultdict, deque
from typing import List, Dict, Any, Tuple, Optional

try:
    import numpy as np  # optional; used for PCA
except ImportError:
    np = None


# ------------------------------- IO helpers -------------------------------

def _read_json(path: str) -> Any:
    """Load and return the JSON document stored at *path* (UTF-8)."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def _write_json(obj: Any, path: str) -> None:
    """Serialize *obj* to *path* as indented JSON and print a confirmation line."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)
    print(f"Wrote {path}")


def _iter_jsonl(path: str):
    """Yield one decoded object per non-blank line of a JSONL file.

    Lines that are not valid JSON are skipped on purpose (best-effort read of
    possibly truncated streams).
    """
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                continue


def _read_csv_rows(path: str) -> List[Dict[str, str]]:
    """Return every row of the CSV at *path* as a dict keyed by header name."""
    with open(path, "r", newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


def _percentiles(xs, qs=(0.5, 0.9, 0.99)):
    """Return nearest-rank percentiles of *xs* as ``{"p50": ..., "p90": ...}``.

    Each quantile q in *qs* maps to the element at index ``round(q*(n-1))`` of
    the sorted data. An empty *xs* yields ``None`` for every key.
    """
    # round() rather than int(): q*100 can land just below an integer in floats.
    keys = [f"p{int(round(q * 100))}" for q in qs]
    if not xs:
        return {k: None for k in keys}
    xs = sorted(xs)
    last = len(xs) - 1
    out = {}
    for key, q in zip(keys, qs):
        k = max(0, min(last, int(round(q * last))))
        out[key] = xs[k]
    return out


# ------------------------- 1) AUTOTUNE GUARDIAN ---------------------------

def _score_guardian_detect(rows: List[Dict[str, str]],
                           z_warn: float, z_abort: float,
                           dT_warn: float, dT_abort: float,
                           sustain_s: float, fs: float) -> Dict[str, Any]:
    """Score one threshold set with a lightweight proxy (no full Guardian DSP).

    Uses the temperature ramp (least-squares slope over a 1 s window) and a
    rolling standard deviation of the ``accel`` column as a band-energy proxy.
    Heuristic labels: a hop counts as an 'event' when dT/dt >= dT_warn.

    Args:
        rows: CSV rows with columns ``t``, ``accel``, ``temp``.
        z_warn, z_abort: z-score thresholds on the band-energy proxy.
        dT_warn, dT_abort: thresholds on the temperature slope.
        sustain_s: seconds a condition must accumulate before the state latches.
        fs: sample rate in Hz; converts seconds to sample counts.

    Returns:
        Dict with ``prec``, ``rec``, ``f1`` and the raw ``tp``/``fp``/``fn`` counts.
    """
    t = [float(r["t"]) for r in rows]
    temp = [float(r["temp"]) for r in rows]
    accel = [float(r["accel"]) for r in rows]

    win = max(8, int(round(1.0 * fs)))    # 1 s analysis window, in samples
    hop = max(1, int(round(0.1 * fs)))    # evaluate every 0.1 s
    sustain_n = max(1, int(round(sustain_s * fs)))

    def slope(arr, ti, wi):
        # Least-squares slope of arr vs. t over the window [ti-wi+1 .. ti].
        if ti < wi:
            return 0.0
        xs = t[ti - wi + 1:ti + 1]
        ys = arr[ti - wi + 1:ti + 1]
        n = len(xs)
        if n < 2:
            return 0.0
        meanx = sum(xs) / n
        meany = sum(ys) / n
        num = sum((x - meanx) * (y - meany) for x, y in zip(xs, ys))
        den = sum((x - meanx) ** 2 for x in xs) or 1e-9  # guard constant t
        return num / den

    def rstd(arr, ti, wi):
        # Proxy for accel band "energy": population std over the same window.
        if ti < wi:
            return 0.0
        xs = arr[ti - wi + 1:ti + 1]
        m = sum(xs) / len(xs)
        v = sum((x - m) ** 2 for x in xs) / max(1, len(xs))
        return math.sqrt(v)

    # EWMA mean/variance used to z-score the band-energy proxy.
    m_e, v_e = None, None
    alpha = 0.03

    labels = []   # 1=event, 0=ok
    preds = []    # 1=WARN/ABORT latched, else 0
    cnt_warn = 0
    cnt_abort = 0

    for i in range(0, len(t), hop):
        sl = slope(temp, i, win)      # dT/dt
        x = rstd(accel, i, win)       # band-energy proxy

        if m_e is None:
            m_e, v_e = x, 1e-6
        else:
            m_e = (1 - alpha) * m_e + alpha * x
            diff = x - m_e
            v_e = (1 - alpha) * v_e + alpha * (diff * diff)
        # z is taken against the already-updated mean, as in the original.
        z_be = 0.0 if v_e <= 1e-12 else (x - m_e) / math.sqrt(v_e)

        warn = (z_be >= z_warn) or (sl >= dT_warn)
        abort = (z_be >= z_abort) or (sl >= dT_abort)

        if abort:
            cnt_abort += hop
            cnt_warn = max(cnt_warn, sustain_n)
        elif warn:
            cnt_warn += hop
            cnt_abort = max(0, cnt_abort - hop)
        else:
            cnt_warn = max(0, cnt_warn - hop)
            cnt_abort = max(0, cnt_abort - hop)

        if cnt_abort >= sustain_n:
            state = "ABORT"
        elif cnt_warn >= sustain_n:
            state = "WARN"
        else:
            state = "OK"

        # Heuristic label: event if the true dT/dt exceeds dT_warn (oracle).
        labels.append(1 if sl >= dT_warn else 0)
        preds.append(1 if state in ("WARN", "ABORT") else 0)

    tp = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 0)
    prec = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0.0
    return {"prec": prec, "rec": rec, "f1": f1, "tp": tp, "fp": fp, "fn": fn}


def _parse_grid(text: Optional[str], default: List[float]) -> List[float]:
    """Parse a comma-separated list of floats, or return *default* if *text* is empty."""
    if not text:
        return default
    return [float(x) for x in text.split(",")]


def cmd_autotune_guardian(args):
    """Grid-search Guardian thresholds on a CSV slice and write the best-F1 set.

    Reads ``args.csv``, sweeps the Cartesian product of the five grids and
    writes ``{"best": {...}}`` to ``args.out``. Ties keep the first candidate.
    """
    rows = _read_csv_rows(args.csv)
    fs = args.fs
    best = None
    random.seed(7)

    z_warn_space = _parse_grid(args.z_warn_grid, [2.0, 2.5, 3.0])
    z_abort_space = _parse_grid(args.z_abort_grid, [3.0, 3.5, 4.0])
    dT_warn_space = _parse_grid(args.dT_warn_grid, [30, 40, 50])
    dT_abort_space = _parse_grid(args.dT_abort_grid, [70, 80, 100])
    sustain_space = _parse_grid(args.sustain_grid, [0.1, 0.2, 0.3])

    grid = itertools.product(z_warn_space, z_abort_space,
                             dT_warn_space, dT_abort_space, sustain_space)
    for zw, za, tw, ta, su in grid:
        score = _score_guardian_detect(rows, zw, za, tw, ta, su, fs)
        cand = {"z_warn": zw, "z_abort": za, "dT_warn": tw,
                "dT_abort": ta, "sustain": su, **score}
        if best is None or cand["f1"] > best["f1"]:
            best = cand

    _write_json({"best": best}, args.out)


# ---------------------- 2) CONFIDENCE-WEIGHTED RANK -----------------------

def cmd_cw_rank(args):
    """Re-rank triage rows by ``priority * (1 - uncertainty)``.

    Uncertainty is clamped to [0, 1]. Each output row gains ``priority_cw``;
    rows are sorted by it (descending), ties broken by ``risk_max``.
    """
    tri = _read_json(args.triage)
    out_rows = []
    for r in tri.get("triage", []):
        p = float(r.get("priority", 0.0))
        u = float(r.get("uncertainty", 0.0))
        cw = p * (1.0 - max(0.0, min(1.0, u)))
        out_rows.append({**r, "priority_cw": cw})

    out_rows.sort(key=lambda x: (-x["priority_cw"], -x.get("risk_max", 0.0)))

    out = {
        "ship_reusability_index": tri.get("ship_reusability_index"),
        "guardian": tri.get("guardian"),
        "triage": out_rows,
    }
    _write_json(out, args.out)


# --------------------------- 3) CLUSTER TRIAGE ----------------------------

def _pca2(X: List[Tuple[float, float, float]]) -> List[Tuple[float, float]]:
    """Project 3-D points to 2-D along their two largest principal axes.

    Without NumPy, or with fewer than two points, falls back to the plain
    (x, z) projection.
    """
    if np is None or len(X) < 2:
        return [(x, z) for (x, y, z) in X]
    A = np.array(X, dtype=float)
    A -= A.mean(axis=0, keepdims=True)
    C = np.cov(A.T)
    vals, vecs = np.linalg.eigh(C)
    order = np.argsort(vals)[::-1]
    v1, v2 = vecs[:, order[0]], vecs[:, order[1]]
    Y = A @ np.stack([v1, v2], axis=1)
    return [(float(y[0]), float(y[1])) for y in Y]


def _dbscan_lite(points: List[Tuple[float, float]], eps: float, min_pts: int) -> List[int]:
    """Label 2-D points with DBSCAN-style cluster ids (brute-force neighbours).

    A point is a core point when at least *min_pts* points (itself included)
    lie within distance *eps*. Returns one label per point; ``-1`` is noise.
    """
    n = len(points)
    labels = [-1] * n   # -1 = noise / not yet assigned
    visited = [False] * n
    eps2 = eps * eps
    cid = 0

    def neighbors(i):
        xi, yi = points[i]
        return [j for j, (xj, yj) in enumerate(points)
                if (xi - xj) ** 2 + (yi - yj) ** 2 <= eps2]

    for i in range(n):
        if visited[i]:
            continue
        visited[i] = True
        N = neighbors(i)
        if len(N) < min_pts:
            continue  # stays noise unless a later cluster claims it as border

        labels[i] = cid
        seeds = [j for j in N if j != i]
        while seeds:
            j = seeds.pop()
            if labels[j] == -1:
                labels[j] = cid
            if visited[j]:
                continue
            visited[j] = True
            Nj = neighbors(j)
            if len(Nj) >= min_pts:
                # Only unassigned points can still change; labelled ones are done.
                seeds.extend(q for q in Nj if labels[q] == -1)
        cid += 1
    return labels


def _write_cluster_html(items, proj, labels, html_path):
    """Write a standalone SVG scatter page of clustered tiles.

    Args:
        items: dicts with ``tile_id``, ``risk_max``, ``priority`` and optionally
            ``priority_cw``.
        proj: 2-D coordinates ``(u, v)``, one per item.
        labels: cluster id per item (``-1`` = noise, drawn grey).
        html_path: output file path.
    """
    # Normalize projected coordinates into the 1200x700 viewport.
    xs = [u for (u, v) in proj]
    ys = [v for (u, v) in proj]
    if not xs:
        xs = [0]
        ys = [0]
    minx, maxx = min(xs), max(xs)
    miny, maxy = min(ys), max(ys)

    def norm(u, a, b):
        if a == b:
            return 0.5  # degenerate axis: centre everything
        return (u - a) / (b - a)

    points = []
    for it, (u, v), c in zip(items, proj, labels):
        px = 60 + 1080 * norm(u, minx, maxx)
        py = 60 + 620 * (1.0 - norm(v, miny, maxy))  # SVG y grows downward
        points.append({"x": px, "y": py, "cid": c, **it})

    palette = [
        "#16a085", "#2980b9", "#8e44ad", "#e67e22", "#27ae60", "#c0392b",
        "#d35400", "#2c3e50", "#7f8c8d", "#f39c12", "#9b59b6", "#1abc9c",
    ]

    def color(cid):
        if cid < 0:
            return "#7f8c8d"
        return palette[cid % len(palette)]

    circles = []
    for p in points:
        r = max(4, min(18, 8 + 40 * float(p.get("risk_max", 0.0))))
        fill = color(p["cid"])
        # Values come from external JSON: escape before embedding in markup.
        tile = html.escape(str(p["tile_id"]))
        title = html.escape("\n".join([
            str(p["tile_id"]),
            f'cluster={p["cid"]}',
            f'risk={p.get("risk_max")}',
            f'prio={p.get("priority")}',
            f'prio_cw={p.get("priority_cw", "-")}',
        ]))
        circles.append(
            f'<circle cx="{p["x"]:.1f}" cy="{p["y"]:.1f}" r="{r:.1f}" '
            f'fill="{fill}" fill-opacity="0.75"><title>{title}</title></circle>'
        )
        circles.append(
            f'<text x="{p["x"] + r + 4:.1f}" y="{p["y"]:.1f}" '
            f'fill="#e6edf3" font-size="11">{tile}</text>'
        )

    page = f"""<!doctype html>
<meta charset="utf-8"/>
<title>Risk Clusters — Lumena Fix Pack</title>
<style>
body{{margin:0;background:#0b0f14;color:#e6edf3;font-family:system-ui,sans-serif}}
header{{padding:14px 18px;border-bottom:1px solid #1f2630}}
.meta{{color:#9aa7b1;font-size:14px}}
.wrap{{padding:12px}}
.card{{background:#0e131a;border:1px solid #1f2630;border-radius:10px;padding:10px}}
</style>
<header><h3>Risk Clusters (PCA→2D)</h3><div class="meta">Circle size ~ risk_max. Color = cluster id.</div></header>
<div class="wrap">
<div class="card">
<svg width="1200" height="700" viewBox="0 0 1200 700" style="background:#0b0f14;border:1px solid #1f2630;border-radius:10px">
{''.join(circles)}
</svg>
</div>
</div>"""
    with open(html_path, "w", encoding="utf-8") as f:
        f.write(page)
    print(f"Wrote {html_path}")


def cmd_cluster_triage(args):
    """Cluster triage tiles by centroid position; write JSON labels and an HTML plot.

    Tiles missing from the centroid file are skipped.

    Raises:
        SystemExit: when no triage tile has a centroid.
    """
    tri = _read_json(args.triage)
    cents = _read_json(args.centroids)

    items = []
    coords = []
    for r in tri.get("triage", []):
        tid = r.get("tile_id")
        c = cents.get(tid)
        if not c:
            continue
        priority = float(r.get("priority", 0.0))
        # Fall back to computing the confidence-weighted priority on the fly.
        default_cw = priority * (1.0 - float(r.get("uncertainty", 0.0)))
        items.append({
            "tile_id": tid,
            "risk_max": float(r.get("risk_max", 0.0)),
            "priority": priority,
            "priority_cw": float(r.get("priority_cw", default_cw)),
        })
        coords.append((float(c.get("x", 0)), float(c.get("y", 0)), float(c.get("z", 0))))

    if not items:
        raise SystemExit("No overlapping tiles between triage and centroids.")

    proj = _pca2(coords)
    labels = _dbscan_lite(list(proj), args.eps, args.min_pts)

    out = {
        "clusters": [{"tile_id": it["tile_id"], "cid": int(cid)}
                     for it, cid in zip(items, labels)]
    }
    _write_json(out, args.out_json)
    _write_cluster_html(items, proj, labels, args.out_html)


# --------------------------- 4) LATENCY PROFILE ---------------------------

def cmd_latency_profile(args):
    """Compute Guardian→Sentinel latency statistics from two JSONL streams.

    Each WARN/ABORT guardian event (``t`` in seconds) is paired with the first
    sentinel tile timestamp (``timestamp_ns``) at or after it; pairs further
    apart than ``args.window_ms`` are dropped.
    """
    g_evts = [o for o in _iter_jsonl(args.guardian)
              if o.get("type") == "guardian_event" and o.get("state") in ("WARN", "ABORT")]
    s_tiles = [o for o in _iter_jsonl(args.sentinel) if o.get("type") == "tile"]

    # Tile IDs per guardian event are unknown, so use global nearest timestamps.
    s_ts = sorted(
        ts for ts in (int(o["timestamp_ns"]) for o in s_tiles if "timestamp_ns" in o)
        if ts >= 0
    )
    if not g_evts or not s_ts:
        return _write_json({"error": "insufficient events or timestamps"}, args.out)

    deltas_ms = []
    for e in g_evts:
        t_ns = int(float(e.get("t", 0.0)) * 1e9)
        k = bisect.bisect_left(s_ts, t_ns)   # first sentinel ts >= event time
        if k == len(s_ts):
            continue
        dt_ms = (s_ts[k] - t_ns) / 1e6
        if dt_ms <= args.window_ms:
            deltas_ms.append(dt_ms)

    out = {
        "count_pairs": len(deltas_ms),
        "window_ms": args.window_ms,
        "deltas_ms": {
            **_percentiles(deltas_ms, (0.5, 0.9, 0.99)),
            "mean": (sum(deltas_ms) / len(deltas_ms)) if deltas_ms else None,
            "min": min(deltas_ms) if deltas_ms else None,
            "max": max(deltas_ms) if deltas_ms else None,
        },
    }
    _write_json(out, args.out)


# --------------------------- 5) STRESS REPLAY -----------------------------

def _load_triage_index(path: str, topn: Optional[int]) -> Tuple[List[str], Dict[str, Dict[str, float]]]:
    """Load the first *topn* triage rows (all rows when *topn* is ``None``).

    Returns:
        ``(order, idx)``: tile ids in file order, and a per-tile dict holding
        ``rank`` (1-based), ``priority``, ``risk_max`` and ``unc``.
    """
    tri = _read_json(path)
    order = []
    idx = {}
    for i, r in enumerate(tri.get("triage", [])[:topn], start=1):
        tid = r.get("tile_id", f"UNK#{i}")
        order.append(tid)
        idx[tid] = {
            "rank": i,
            "priority": float(r.get("priority", 0.0)),
            "risk_max": float(r.get("risk_max", 0.0)),
            "unc": float(r.get("uncertainty", 0.0)),
        }
    return order, idx


def _jaccard(a: List[str], b: List[str]) -> float:
    """Return the Jaccard similarity of two id lists (1.0 when both are empty)."""
    sa, sb = set(a), set(b)
    if not sa and not sb:
        return 1.0
    return len(sa & sb) / max(1, len(sa | sb))


def cmd_stress_replay(args):
    """Perturb triage scores and report how stable the top-N set and ranks are.

    Every trial multiplies each tile's priority and risk by ``1 + N(0, sigma)``,
    re-ranks ALL tiles, then compares the new top-N to the baseline top-N.
    Perturbing only the baseline top-N (as the earlier version did) can never
    change set membership, so its Jaccard score was always 1.0.
    """
    full_order, full_idx = _load_triage_index(args.triage, None)
    topn = max(0, args.topn)
    base_top = full_order[:topn]
    base_rank = {tid: i for i, tid in enumerate(base_top, start=1)}

    jac = []
    delta_rank_abs = []
    for _ in range(args.trials):
        perturbed = []
        for tid in full_order:
            r = full_idx[tid]
            pr = max(0.0, r["priority"] * (1.0 + random.gauss(0, args.noise_pr)))
            rk = max(0.0, r["risk_max"] * (1.0 + random.gauss(0, args.noise_rk)))
            # NOTE(review): ranking is by priority then risk, so uncertainty
            # (and args.noise_unc) has no effect on the result.
            perturbed.append((tid, pr, rk))

        perturbed.sort(key=lambda x: (-x[1], -x[2]))
        new_top = [tid for (tid, _, _) in perturbed[:topn]]
        jac.append(_jaccard(base_top, new_top))

        # Absolute rank shift for tiles that remain in the top-N.
        for pos, tid in enumerate(new_top, start=1):
            if tid in base_rank:
                delta_rank_abs.append(abs(base_rank[tid] - pos))

    jac_sorted = sorted(jac)
    out = {
        "topn": args.topn,
        "trials": args.trials,
        "jaccard": {
            "mean": (sum(jac) / len(jac)) if jac else None,
            "p05": jac_sorted[int(0.05 * len(jac))] if jac else None,
            "p95": jac_sorted[int(0.95 * len(jac)) - 1] if jac else None,
        },
        "rank_shift_abs_mean": (sum(delta_rank_abs) / len(delta_rank_abs)) if delta_rank_abs else None,
    }
    _write_json(out, args.out)


# --------------------------------- CLI ------------------------------------

def build_parser():
    """Build the argparse parser with one sub-command per utility."""
    ap = argparse.ArgumentParser(description="Lumena Fix Pack — Overdrive utilities")
    sub = ap.add_subparsers(dest="cmd", required=True)

    ap1 = sub.add_parser("autotune_guardian", help="Grid search Guardian thresholds on a CSV slice")
    ap1.add_argument("--csv", required=True, help="CSV with t,accel,temp")
    ap1.add_argument("--fs", type=float, required=True, help="Sample rate Hz")
    ap1.add_argument("--z_warn_grid", help="e.g., '2.0,2.5,3.0'")
    ap1.add_argument("--z_abort_grid", help="e.g., '3.0,3.5,4.0'")
    ap1.add_argument("--dT_warn_grid", help="e.g., '30,40,50'")
    ap1.add_argument("--dT_abort_grid", help="e.g., '70,80,100'")
    ap1.add_argument("--sustain_grid", help="e.g., '0.1,0.2,0.3'")
    ap1.add_argument("--out", default="guardian_autotune.json")
    ap1.set_defaults(func=cmd_autotune_guardian)

    ap2 = sub.add_parser("cw_rank", help="Confidence-weighted re-ranking of triage")
    ap2.add_argument("--triage", required=True, help="triage.json")
    ap2.add_argument("--out", default="triage_cw.json")
    ap2.set_defaults(func=cmd_cw_rank)

    ap3 = sub.add_parser("cluster_triage", help="DBSCAN-lite clusters + HTML PCA scatter")
    ap3.add_argument("--triage", required=True)
    ap3.add_argument("--centroids", required=True)
    ap3.add_argument("--eps", type=float, default=6.0, help="Neighborhood radius in PCA-proj units")
    ap3.add_argument("--min_pts", type=int, default=2)
    ap3.add_argument("--out_json", default="triage_clusters.json")
    ap3.add_argument("--out_html", default="triage_clusters.html")
    ap3.set_defaults(func=cmd_cluster_triage)

    ap4 = sub.add_parser("latency_profile", help="Guardian→Sentinel latency stats from JSONL streams")
    ap4.add_argument("--guardian", required=True, help="guardian_events.jsonl")
    ap4.add_argument("--sentinel", required=True, help="sentinel_out.jsonl")
    ap4.add_argument("--window_ms", type=float, default=2000.0, help="Max allowed pairing window")
    ap4.add_argument("--out", default="latency_profile.json")
    ap4.set_defaults(func=cmd_latency_profile)

    ap5 = sub.add_parser("stress_replay", help="Perturb triage to measure stability")
    ap5.add_argument("--triage", required=True)
    ap5.add_argument("--topn", type=int, default=16)
    ap5.add_argument("--trials", type=int, default=200)
    ap5.add_argument("--noise_pr", type=float, default=0.03, help="Priority noise sigma (fraction)")
    ap5.add_argument("--noise_rk", type=float, default=0.03, help="Risk noise sigma (fraction)")
    ap5.add_argument("--noise_unc", type=float, default=0.02, help="Abs uncertainty noise")
    ap5.add_argument("--out", default="stress_replay.json")
    ap5.set_defaults(func=cmd_stress_replay)

    return ap


def main():
    """Parse the command line and dispatch to the selected sub-command."""
    ap = build_parser()
    args = ap.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()

# what you get (fast)
#
# Guardian autotune
#   python fixpack_overdrive.py autotune_guardian --csv flight_slice.csv --fs 200 \
#     --z_warn_grid 2.0,2.5,3.0 --z_abort_grid 3.0,3.5,4.0 \
#     --dT_warn_grid 30,40,50 --dT_abort_grid 70,80,100 --sustain_grid 0.1,0.2,0.3
#   # → guardian_autotune.json (best thresholds with F1/prec/rec)
#
# Confidence-weighted triage
#   python fixpack_overdrive.py cw_rank --triage triage.json --out triage_cw.json
#
# Risk clusters + HTML PCA plot
#   python fixpack_overdrive.py cluster_triage \
#     --triage triage_cw.json --centroids centroids_16tile.json \
#     --eps 6.0 --min_pts 2 --out_json triage_clusters.json --out_html triage_clusters.html
#
# Latency profiling (Guardian→Sentinel)
#   python fixpack_overdrive.py latency_profile \
#     --guardian guardian_events.jsonl \
#     --sentinel sentinel_out_v2.jsonl \
#     --window_ms 2000 --out latency_profile.json
#
# Replay stability stress test
#   python fixpack_overdrive.py stress_replay --triage triage.json \
#     --topn 16 --trials 500 --noise_pr 0.03 --noise_rk 0.03 --noise_unc 0.02 \
#     --out stress_replay.json
#
# want me to wire these into your fixpack_run.py behind flags (--cw_rank,
# --cluster_html, --latency_profile, --stress_replay) and auto-run at the end?
# or add an operator “actions” synthesizer (guarded pitch/roll trim, downlink
# notes) based on Guardian peak cluster hotspots? say go and i’ll slam those in. 🚀

1
2
3
99
Unlocking the Power of Scientific Programming: A 2-day hands-on workshop on Scientific Programming using Python #RIT #AI&ML #PythonWorkshop #SkillDevelopment #Education #RITCTL #pythonprogramming #Testpython #HandsOnprogramming #QualityAssurance #SoftwareDevelopment
22
66
print "Hello World"[::-1] What will be the output of the above statement? #Python #learnpython #testpython
67% dlroW olleH
33% Hello Worl
0% Error
0% Hello
3 votes • Final results
2
2
Vamos a poner a prueba tus conocimientos de Python con este mini test 😉 Comenta la respuesta correcta. Y si quieres evaluar tus habilidades más a fondo, prueba nuestros #CodeChallenge gratuitos 👉 ninjatalent.net/ninja-talent…  #python #testpython #pythontest #test #programación
1
1
4
Visual Studio 2019 is talking PyTest! What’s New for Python in Visual Studio (16.3 Preview 2) devblogs.microsoft.com/pytho… How to use PyTest on Visual Studio 2019 (16.3) #pytest #unittest #visualstudio #testframework #testpython #pythontest #visualstudio2019 #bettercoding

1
2