# In plain text
# This tweet is the code itself
# It has no dependencies or API calls
# It only talks to your node
# Choose your run length (dates, blocks)
# Copy paste this to your AI
# Or save it as "taproot_opif.py" and run it
# Your terminal: python3 taproot_opif.py
"""
taproot_opif.py — Analyze OP_IF usage in taproot script path spends via RPC.
Requires a running Bitcoin Core node with server=1.
No external dependencies — stdlib only.
Usage:
python3 taproot_opif.py --start-block 823786 --end-block 951964
python3 taproot_opif.py --start-day 2024-01-01 --end-day 2024-12-31
python3 taproot_opif.py --start-day 2024-01-01
"""
import argparse
import base64
import http.client
import io
import json
import os
import platform
import struct
import sys
import time
from time import time as now
from collections import defaultdict
from datetime import datetime, timezone, date, timedelta
from binascii import unhexlify
from io import BytesIO
from json import loads as json_loads, dumps as json_dumps
from struct import unpack, unpack_from
# ===========================================================================
# RPC connection (copied from open_settlement_price_v10_rc4.py)
# ===========================================================================
# Directories probed first (in this order) for a `.cookie` or `bitcoin.conf`
# before falling back to a recursive search of _SEARCH_ROOTS.
_KNOWN_COOKIE_DIRS = [
    # paths under /embassy-data
    '/embassy-data/package-data/volumes/bitcoin/data/.bitcoin',
    '/embassy-data/package-data/volumes/bitcoind/data/.bitcoin',
    # paths containing an "umbrel" component
    '/home/umbrel/umbrel/app-data/bitcoin/data/.bitcoin',
    '/home/umbrel/umbrel/data/app-data/bitcoin/data/.bitcoin',
    '/opt/umbrel-os/app-data/bitcoin/data/.bitcoin',
    # external-drive style mount points
    '/mnt/hdd/bitcoin',
    '/mnt/hdd/.bitcoin',
    '/mnt/storage/bitcoin',
    # /opt/mynode
    '/opt/mynode/bitcoin',
]

# Top-level directories walked recursively when none of the known
# directories above contains a cookie or config file.
_SEARCH_ROOTS = [
    '/embassy-data',
    '/mnt',
    '/opt',
    '/home',
    '/srv',
    '/var/lib',
]
def _find_data_dir():
    """Return the per-platform default Bitcoin data directory path.

    The path is only computed, never checked for existence:
      * macOS   -> ~/Library/Application Support/Bitcoin
      * Windows -> %APPDATA%/Bitcoin (relative 'Bitcoin' if APPDATA is unset)
      * other   -> ~/.bitcoin
    """
    system = platform.system()
    if system == 'Darwin':
        return os.path.expanduser('~/Library/Application Support/Bitcoin')
    if system == 'Windows':
        return os.path.join(os.environ.get('APPDATA', ''), 'Bitcoin')
    return os.path.expanduser('~/.bitcoin')
def _find_bitcoin_dir():
    """Locate a directory that holds a `.cookie` or `bitcoin.conf` file.

    The fixed candidates in ``_KNOWN_COOKIE_DIRS`` are checked first; if none
    matches, every existing root in ``_SEARCH_ROOTS`` is walked recursively
    and the first directory containing either file is returned.

    Returns:
        The matching directory path, or None when nothing was found.
    """
    markers = ('.cookie', 'bitcoin.conf')

    for candidate in _KNOWN_COOKIE_DIRS:
        if any(os.path.exists(os.path.join(candidate, m)) for m in markers):
            return candidate

    for root in _SEARCH_ROOTS:
        if not os.path.isdir(root):
            continue
        try:
            for dirpath, _dirnames, filenames in os.walk(root):
                if any(m in filenames for m in markers):
                    return dirpath
        except PermissionError:
            # Best-effort search: skip a root we are not allowed to read.
            continue
    return None
def _find_rpc_host_from_ss(port):
    """Guess a non-loopback address that is listening on *port*.

    Runs ``ss -tlnp`` and then ``netstat -tlnp`` and scans their output for a
    whitespace-separated field ending in ``:<port>``. The part before the
    last colon (with any surrounding ``[]`` removed) is returned unless it is
    a wildcard or loopback address.

    Args:
        port: TCP port number to look for.

    Returns:
        The host string, or None if neither tool ran or nothing matched.
    """
    import subprocess

    suffix = ':{}'.format(port)
    # Wildcard / loopback addresses give no hint about a container IP.
    uninformative = ('0.0.0.0', '*', '127.0.0.1', '::1', '::')

    for cmd in (['ss', '-tlnp'], ['netstat', '-tlnp']):
        try:
            out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode()
        except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
            # Tool missing, failed, or produced undecodable output: try the next one.
            continue
        for line in out.splitlines():
            if suffix not in line:
                continue
            for field in line.split():
                if not field.endswith(suffix):
                    continue
                host = field.rsplit(':', 1)[0].strip('[]')
                if host not in uninformative:
                    return host
    return None
def setup_rpc(data_dir=None, rpchost=None, rpcport=None, rpcuser=None, rpcpassword=None):
    """Build and verify a JSON-RPC caller for a local Bitcoin Core node.

    Settings are resolved in this order: explicit arguments, then values read
    from ``bitcoin.conf`` / ``bitcoin_rw.conf`` in the data directory, then
    defaults (host 127.0.0.1, port 8332, cookie authentication).

    Args:
        data_dir: Bitcoin data directory. When None, the platform default is
            used; if that holds neither a cookie nor a config file, the
            filesystem is searched via ``_find_bitcoin_dir``.
        rpchost: RPC host override.
        rpcport: RPC port override.
        rpcuser: RPC user override.
        rpcpassword: RPC password override.

    Returns:
        A function ``rpc(method, *params)`` that returns the ``result`` field
        of the JSON-RPC response. It raises RuntimeError when the node
        reports an error, and re-raises the last transport/decoding error
        after three failed attempts.

    Exits the process with status 1 (after printing a message) when no
    configuration or credentials can be found, when the cookie file cannot be
    read, or when the node cannot be reached.
    """
    conf_names = ('bitcoin.conf', 'bitcoin_rw.conf')

    def _has_node_files(directory):
        candidates = ('.cookie',) + conf_names
        return any(os.path.exists(os.path.join(directory, n)) for n in candidates)

    if data_dir is None:
        data_dir = _find_data_dir()
    if not _has_node_files(data_dir):
        found = _find_bitcoin_dir()
        if found:
            data_dir = found

    conf_path = None
    for name in conf_names:
        candidate = os.path.join(data_dir, name)
        if os.path.exists(candidate):
            conf_path = candidate
            break

    # A config file is optional: a cookie file or explicit credentials are
    # enough to authenticate, so only give up when none of them is available.
    have_cookie = os.path.exists(os.path.join(data_dir, '.cookie'))
    have_credentials = bool(rpcuser and rpcpassword)
    if not conf_path and not have_cookie and not have_credentials:
        print(f"ERROR: Cannot find bitcoin.conf in {data_dir}")
        sys.exit(1)

    # Minimal key=value parser; comment lines and lines without '=' (which
    # includes section headers such as "[main]") are skipped.
    conf = {}
    if conf_path:
        with open(conf_path) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                key, value = line.split('=', 1)
                conf[key.strip()] = value.strip().strip('"')

    rpc_user = rpcuser if rpcuser else conf.get('rpcuser')
    rpc_pass = rpcpassword if rpcpassword else conf.get('rpcpassword')
    rpc_host = rpchost if rpchost else conf.get('rpcconnect', '127.0.0.1')
    rpc_port = rpcport if rpcport else int(conf.get('rpcport', '8332'))
    # os.path.join leaves an absolute rpccookiefile untouched and anchors a
    # relative one at the data directory.
    cookie_path = os.path.join(data_dir, conf.get('rpccookiefile', '.cookie'))

    # Mutable state shared with the closure below.
    state = {'host': rpc_host, 'conn': None}

    def _drop_connection():
        conn, state['conn'] = state['conn'], None
        if conn is not None:
            conn.close()

    def rpc(method, *params):
        user, password = rpc_user, rpc_pass
        if not user or not password:
            # Cookie auth: re-read on every call so a rotated cookie is picked up.
            try:
                with open(cookie_path) as f:
                    user, password = f.read().strip().split(':', 1)
            except Exception as e:
                print(f"ERROR: cannot read RPC cookie at {cookie_path}: {e}")
                print("Make sure your Bitcoin node is running.")
                sys.exit(1)
        auth = base64.b64encode(f'{user}:{password}'.encode()).decode()
        headers = {'Content-Type': 'application/json',
                   'Authorization': f'Basic {auth}'}
        payload = json_dumps({'jsonrpc': '1.0', 'id': 'taproot_opif',
                              'method': method, 'params': list(params)})
        for attempt in range(3):
            try:
                if state['conn'] is None:
                    state['conn'] = http.client.HTTPConnection(
                        state['host'], rpc_port, timeout=60)
                state['conn'].request('POST', '/', payload, headers)
                raw = state['conn'].getresponse().read()
                data = json_loads(raw)
            except Exception:
                # Transport or decoding failure: reconnect and retry.
                _drop_connection()
                if attempt == 2:
                    raise
                continue
            # An error reported by the node is not transient; do not retry it.
            if data.get('error'):
                raise RuntimeError(data['error'])
            return data['result']

    try:
        rpc('getblockcount')
        return rpc
    except Exception:
        pass

    # Configured host unreachable: unless the caller pinned a host, look for
    # another address listening on the RPC port and try that once.
    if not rpchost:
        detected = _find_rpc_host_from_ss(rpc_port)
        if detected:
            state['host'] = detected
            _drop_connection()
            try:
                rpc('getblockcount')
                return rpc
            except Exception:
                pass

    print("ERROR: cannot connect to Bitcoin node via RPC.")
    print("Make sure bitcoind or bitcoin-qt is running with server=1.")
    print("If your node runs in a container, try: --rpchost <container-ip>")
    sys.exit(1)
# ===========================================================================
# Block / date helpers (copied from open_settlement_price_v10_rc4.py)
# ===========================================================================
# Memoisation tables keyed by block height; filled lazily by
# _get_block_hash and _get_block_timestamp respectively.
_hash_cache = dict()
_ts_cache = dict()
def _get_block_hash(height, rpc):
    """Return the block hash at *height*, memoised in ``_hash_cache``.

    Args:
        height: Block height.
        rpc: RPC caller; invoked as ``rpc('getblockhash', height)`` on a miss.
    """
    try:
        return _hash_cache[height]
    except KeyError:
        block_hash = rpc('getblockhash', height)
        _hash_cache[height] = block_hash
        return block_hash
def _get_block_timestamp(height, rpc):
    """Return the header timestamp of the block at *height*.

    The serialized header is fetched with ``getblockheader <hash> false`` and
    the little-endian uint32 at byte offset 68 is decoded. Results are
    memoised in ``_ts_cache``.
    """
    if height not in _ts_cache:
        block_hash = _get_block_hash(height, rpc)
        header = unhexlify(rpc('getblockheader', block_hash, False))
        # Offset 68 = 4 (version) + 32 (previous hash) + 32 (merkle root).
        _ts_cache[height] = unpack('<I', header[68:72])[0]
    return _ts_cache[height]
def _get_raw_block(height, rpc):
    """Return the block at *height* as raw bytes.

    Uses ``getblock <hash> 0`` and hex-decodes the returned string.
    """
    block_hash = _get_block_hash(height, rpc)
    return unhexlify(rpc('getblock', block_hash, 0))
def _find_first_block_on_or_after(day_start_ts, chain_height, rpc):
    """Binary-search for the lowest height whose timestamp is >= *day_start_ts*.

    Args:
        day_start_ts: Unix timestamp (seconds) of the lower bound.
        chain_height: Height of the current chain tip.
        rpc: RPC caller used to fetch block headers.

    Returns:
        The height found, or ``chain_height + 1`` when every block up to the
        tip is older than *day_start_ts* (callers treat a value above the tip
        as "no such block").

    NOTE(review): the search assumes timestamps are non-decreasing with
    height; block timestamps are only loosely ordered, so the result near a
    day boundary may be off by a few blocks -- confirm this is acceptable.
    """
    # hi is an exclusive sentinel, so mid never exceeds chain_height.
    lo, hi = 0, chain_height + 1
    while lo < hi:
        mid = (lo + hi) // 2
        if _get_block_timestamp(mid, rpc) < day_start_ts:
            lo = mid + 1
        else:
            hi = mid
    return lo
def _find_last_block_before(day_end_ts, chain_height, rpc):
    """Binary-search for the highest height whose timestamp is < *day_end_ts*.

    Args:
        day_end_ts: Unix timestamp (seconds) of the exclusive upper bound.
        chain_height: Height of the current chain tip.
        rpc: RPC caller used to fetch block headers.

    Returns:
        The height found, or ``-1`` when even block 0 is not older than
        *day_end_ts* (callers treat a value below the start block as
        "no such block").

    NOTE(review): like the companion search, this assumes timestamps are
    non-decreasing with height.
    """
    # lo starts at the "none found" sentinel; mid is always >= 0.
    lo, hi = -1, chain_height
    while lo < hi:
        mid = (lo + hi + 1) // 2
        if _get_block_timestamp(mid, rpc) < day_end_ts:
            lo = mid
        else:
            hi = mid - 1
    return lo
def date_range_to_blocks(start_date, end_date, chain_height, rpc):
    """Return (start_block, end_block) for the given inclusive date range.

    Both dates are interpreted as UTC calendar days: the range starts at
    00:00:00 UTC on *start_date* and ends just before 00:00:00 UTC on the day
    after *end_date*.

    Args:
        start_date: First day (``datetime.date``).
        end_date: Last day, inclusive (``datetime.date``).
        chain_height: Height of the current chain tip.
        rpc: RPC caller used to fetch block headers.

    Prints progress to stdout and exits with status 1 when no block falls
    inside the range.
    """
    def _utc_midnight(day):
        return int(datetime(day.year, day.month, day.day,
                            tzinfo=timezone.utc).timestamp())

    start_ts = _utc_midnight(start_date)
    end_ts = _utc_midnight(end_date) + 86_400  # exclusive end: next midnight

    print(f"Searching for blocks between {start_date} and {end_date} ...", flush=True)
    start_block = _find_first_block_on_or_after(start_ts, chain_height, rpc)
    end_block = _find_last_block_before(end_ts, chain_height, rpc)

    # The searches may hand back a boundary height that was never actually
    # compared, so confirm the timestamps really satisfy the range.
    if start_block > chain_height or _get_block_timestamp(start_block, rpc) < start_ts:
        print(f"ERROR: No blocks found on or after {start_date}")
        sys.exit(1)
    if end_block < start_block or _get_block_timestamp(end_block, rpc) >= end_ts:
        print(f"ERROR: No blocks found on or before {end_date}")
        sys.exit(1)

    start_actual = datetime.fromtimestamp(
        _get_block_timestamp(start_block, rpc), tz=timezone.utc).date()
    end_actual = datetime.fromtimestamp(
        _get_block_timestamp(end_block, rpc), tz=timezone.utc).date()
    print(f" Start: block {start_block:,} ({start_actual})")
    print(f" End: block {end_block:,} ({end_actual})")
    return start_block, end_block
# ===========================================================================
# Script parsing (shared logic with opif_analysis.py)
# ===========================================================================
# Opcode byte -> mnemonic, used to label the token that precedes OP_IF.
# Values follow the opcode enumeration in Bitcoin Core's script/script.h.
# Bytes not listed here are rendered by the tokenizer as OP_<HEX>.
OPCODE_NAMES = {
    # constants
    0x00: 'OP_0',
    0x4f: 'OP_1NEGATE',
    0x51: 'OP_1', 0x52: 'OP_2', 0x53: 'OP_3', 0x54: 'OP_4',
    0x55: 'OP_5', 0x56: 'OP_6', 0x57: 'OP_7', 0x58: 'OP_8',
    0x59: 'OP_9', 0x5a: 'OP_10', 0x5b: 'OP_11', 0x5c: 'OP_12',
    0x5d: 'OP_13', 0x5e: 'OP_14', 0x5f: 'OP_15', 0x60: 'OP_16',
    # flow control
    0x61: 'OP_NOP',
    0x63: 'OP_IF',
    0x64: 'OP_NOTIF',  # kept for tokenizer completeness, not searched
    0x67: 'OP_ELSE',
    0x68: 'OP_ENDIF',
    0x69: 'OP_VERIFY',
    0x6a: 'OP_RETURN',
    # stack
    0x6b: 'OP_TOALTSTACK',
    0x6c: 'OP_FROMALTSTACK',
    0x6d: 'OP_2DROP',
    0x6e: 'OP_2DUP',
    0x6f: 'OP_3DUP',
    0x73: 'OP_IFDUP',
    0x74: 'OP_DEPTH',
    0x75: 'OP_DROP',
    0x76: 'OP_DUP',
    0x77: 'OP_NIP',
    0x78: 'OP_OVER',
    0x79: 'OP_PICK',
    0x7a: 'OP_ROLL',
    0x7b: 'OP_ROT',
    0x7c: 'OP_SWAP',
    0x7d: 'OP_TUCK',
    # splice
    0x7e: 'OP_CAT',
    0x82: 'OP_SIZE',
    # bitwise logic
    0x85: 'OP_OR',
    0x86: 'OP_XOR',
    0x87: 'OP_EQUAL',
    0x88: 'OP_EQUALVERIFY',
    # arithmetic
    0x8b: 'OP_1ADD',
    0x8c: 'OP_1SUB',
    0x8f: 'OP_NEGATE',
    0x91: 'OP_NOT',
    0x92: 'OP_0NOTEQUAL',
    0x93: 'OP_ADD',
    0x94: 'OP_SUB',
    0x97: 'OP_MOD',
    0x9a: 'OP_BOOLAND',
    0x9b: 'OP_BOOLOR',
    0x9c: 'OP_NUMEQUAL',
    0x9d: 'OP_NUMEQUALVERIFY',
    0x9e: 'OP_NUMNOTEQUAL',
    0x9f: 'OP_LESSTHAN',
    0xa0: 'OP_GREATERTHAN',
    0xa1: 'OP_LESSTHANOREQUAL',
    0xa2: 'OP_GREATERTHANOREQUAL',
    0xa3: 'OP_MIN',
    0xa4: 'OP_MAX',
    0xa5: 'OP_WITHIN',
    # crypto
    0xa8: 'OP_SHA256',
    0xa9: 'OP_HASH160',
    0xaa: 'OP_HASH256',
    0xac: 'OP_CHECKSIG',
    0xad: 'OP_CHECKSIGVERIFY',
    0xae: 'OP_CHECKMULTISIG',
    # locktime
    0xb1: 'OP_CHECKLOCKTIMEVERIFY',
    0xb2: 'OP_CHECKSEQUENCEVERIFY',
    # tapscript
    0xba: 'OP_CHECKSIGADD',
}
def read_varint(buf):
    """Read one variable-length integer from *buf* and return it.

    The first byte selects the width: values below 0xfd are the integer
    itself; 0xfd, 0xfe and 0xff are followed by a little-endian 2-, 4- or
    8-byte unsigned integer respectively.

    Args:
        buf: Binary stream with a ``read(n)`` method.

    Raises:
        IndexError: the stream is already exhausted.
        struct.error: the stream ends inside a multi-byte value.
    """
    read = buf.read
    prefix = read(1)[0]
    if prefix < 0xfd:
        return prefix
    if prefix == 0xfd:
        return unpack('<H', read(2))[0]
    if prefix == 0xfe:
        return unpack('<I', read(4))[0]
    return unpack('<Q', read(8))[0]
def extract_tapscript(stack):
items = list(stack)
if items and items[-1] and items[-1][0] == 0x50:
del items[-1]
if len(items) < 2:
return None
control = items[-1]
if len(control) < 33 or (len(control) - 33) % 32 != 0:
return None
if control[0] & 0xfe != 0xc0:
return None
return items[-2]
def tokenize_script(script):
    """Split a raw script into a list of token names.

    Data pushes are reported as ``PUSH_<n>`` (n = number of data bytes) and
    their payload is skipped, so payload bytes are never mistaken for
    opcodes. Byte 0x00 becomes ``OP_0``. Every other byte is looked up in
    ``OPCODE_NAMES`` and falls back to ``OP_<HEX>``.

    Args:
        script: Script bytes.

    Returns:
        List of token strings. Parsing stops quietly at the first push whose
        length prefix is truncated; tokens collected so far are returned.
    """
    tokens = []
    i = 0
    size = len(script)
    try:
        while i < size:
            op = script[i]
            i += 1
            if op == 0x00:
                tokens.append('OP_0')
            elif op <= 0x4b:
                # Direct push: the opcode itself is the payload length.
                tokens.append(f'PUSH_{op}')
                i += op
            elif op == 0x4c:
                # 1-byte length prefix
                n = script[i]
                i += 1
                tokens.append(f'PUSH_{n}')
                i += n
            elif op == 0x4d:
                # 2-byte little-endian length prefix
                n = unpack_from('<H', script, i)[0]
                i += 2
                tokens.append(f'PUSH_{n}')
                i += n
            elif op == 0x4e:
                # 4-byte little-endian length prefix
                n = unpack_from('<I', script, i)[0]
                i += 4
                tokens.append(f'PUSH_{n}')
                i += n
            else:
                tokens.append(OPCODE_NAMES.get(op, f'OP_{op:02X}'))
    except (IndexError, struct.error):
        # Truncated length prefix: keep what was parsed.
        pass
    return tokens
def scan_block(block_data, tally):
    """Count what precedes each OP_IF in the block's taproot script spends.

    Walks a serialized block, and for every input witness that
    ``extract_tapscript`` accepts, tokenizes the script and increments
    ``tally[(previous_token, 'OP_IF')]`` for each OP_IF that is not the first
    token of its script.

    Args:
        block_data: Raw serialized block bytes.
        tally: Mapping updated in place; must supply 0 for missing keys
            (e.g. ``collections.defaultdict(int)``).
    """
    buf = BytesIO(block_data)
    rd = buf.read

    rd(80)  # block header: 4 + 32 + 32 + 4 + 4 + 4 bytes
    tx_count = read_varint(buf)
    for _ in range(tx_count):
        rd(4)  # version
        # A zero byte here is the segwit marker (followed by a flag byte);
        # otherwise it was the first byte of the input count, so step back.
        marker = rd(1)[0]
        if marker == 0x00:
            rd(1)
            segwit = True
        else:
            buf.seek(-1, 1)
            segwit = False

        in_count = read_varint(buf)
        for _ in range(in_count):
            rd(32 + 4)            # previous txid + output index
            rd(read_varint(buf))  # scriptSig
            rd(4)                 # sequence

        out_count = read_varint(buf)
        for _ in range(out_count):
            rd(8)                 # value
            rd(read_varint(buf))  # scriptPubKey

        if segwit:
            # One witness stack per input, in input order.
            for _ in range(in_count):
                item_count = read_varint(buf)
                stack = [rd(read_varint(buf)) for _ in range(item_count)]
                script = extract_tapscript(stack)
                if script is None:
                    continue
                tokens = tokenize_script(script)
                for pos, tok in enumerate(tokens):
                    if tok != 'OP_IF' or pos == 0:
                        continue
                    tally[(tokens[pos - 1], tok)] += 1

        rd(4)  # locktime
# ===========================================================================
# Main
# ===========================================================================
def _parse_date(s):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.date``.

    The string is split on '-' and the integer parts are passed to ``date``.
    On any malformed or out-of-range input an error is printed and the
    process exits with status 1.
    """
    try:
        return date(*map(int, s.split('-')))
    except (ValueError, TypeError, AttributeError):
        # ValueError: non-numeric or out-of-range part; TypeError: wrong
        # number of parts; AttributeError: s is not a string.
        print(f"ERROR: invalid date '{s}', expected YYYY-MM-DD")
        sys.exit(1)
def main():
    """Command-line entry point.

    Parses the arguments, resolves the block range (``--all``, an explicit
    height range, a date range, or interactively prompted dates), connects to
    the node, scans every block in the range, and prints a running progress
    table followed by a summary of which token precedes each OP_IF.
    """
    parser = argparse.ArgumentParser(
        description='Analyze OP_IF usage in taproot script path spends.')
    add = parser.add_argument
    add('--all', action='store_true',
        help='Analyze all taproot script path spends from block 767420 to chain tip')
    add('--start-block', type=int, metavar='HEIGHT')
    add('--end-block', type=int, metavar='HEIGHT')
    add('--start-day', type=_parse_date, metavar='YYYY-MM-DD')
    add('--end-day', type=_parse_date, metavar='YYYY-MM-DD')
    add('--rpchost', type=str)
    add('--rpcport', type=int)
    add('--rpcuser', type=str)
    add('--rpcpassword', type=str)
    add('--datadir', type=str)
    args = parser.parse_args()

    # ── Range selection ───────────────────────────────────────────────────────
    using_all = args.all
    using_blocks = args.start_block is not None or args.end_block is not None
    using_dates = args.start_day is not None or args.end_day is not None
    if sum([using_all, using_blocks, using_dates]) > 1:
        print("ERROR: specify only one of --all, --start-block/--end-block, or --start-day/--end-day")
        sys.exit(1)
    if not (using_all or using_blocks or using_dates):
        # No range on the command line: ask for one.
        try:
            start_str = input("Start date (YYYY-MM-DD): ").strip()
            end_str = input("End date (YYYY-MM-DD): ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        args.start_day = _parse_date(start_str)
        args.end_day = _parse_date(end_str)
        using_dates = True
    if using_blocks and (args.start_block is None or args.end_block is None):
        print("ERROR: --start-block and --end-block must both be provided")
        sys.exit(1)
    if using_dates and args.start_day is None:
        # --end-day alone would otherwise fail later comparing None to a date.
        print("ERROR: --start-day is required when --end-day is given")
        sys.exit(1)

    # ── Connect ───────────────────────────────────────────────────────────────
    print("Connecting to Bitcoin node ...", flush=True)
    rpc = setup_rpc(
        data_dir=args.datadir,
        rpchost=args.rpchost,
        rpcport=args.rpcport,
        rpcuser=args.rpcuser,
        rpcpassword=args.rpcpassword,
    )
    chain_height = rpc('getblockcount')
    print(f" Connected. Chain height: {chain_height:,}\n")

    if using_all:
        start_block = 767420
        end_block = chain_height
    elif using_blocks:
        start_block = args.start_block
        end_block = args.end_block
        if start_block < 0 or end_block < 0:
            print("ERROR: block heights must be non-negative")
            sys.exit(1)
        if start_block > end_block:
            print(f"ERROR: --start-block ({start_block:,}) must be <= --end-block ({end_block:,})")
            sys.exit(1)
        if end_block > chain_height:
            print(f"ERROR: --end-block ({end_block:,}) exceeds chain height ({chain_height:,})")
            sys.exit(1)
    else:
        start_date = args.start_day
        # Default end: yesterday in UTC, matching how the date range is
        # converted to timestamps.
        end_date = args.end_day or (datetime.now(timezone.utc).date() - timedelta(days=1))
        if start_date > end_date:
            print(f"ERROR: --start-day ({start_date}) must be <= --end-day ({end_date})")
            sys.exit(1)
        start_block, end_block = date_range_to_blocks(start_date, end_date, chain_height, rpc)

    # ── Scan ──────────────────────────────────────────────────────────────────
    total_blocks = end_block - start_block + 1
    print(f"\nAnalyzing {total_blocks:,} blocks ({start_block:,} – {end_block:,}) ...\n")
    print(f"{'Block':>9} {'Progress':>8} {'OP_IF total':>12} {'OP_0 %':>8}")
    print("-" * 50)

    tally = defaultdict(int)

    def _op0_stats():
        """Return (total OP_IF count, count preceded by OP_0, that share in %)."""
        total = sum(tally.values())
        op0 = sum(v for (before, _), v in tally.items() if before == 'OP_0')
        return total, op0, (op0 / total * 100 if total else 0.0)

    t0 = now()
    for height in range(start_block, end_block + 1):
        scan_block(_get_raw_block(height, rpc), tally)
        if height % 10 == 0 or height == end_block:
            done = height - start_block + 1
            pct_done = done / total_blocks * 100
            total_if, _, op0_pct = _op0_stats()
            print(f"{height:>9,} {pct_done:>7.1f}% {total_if:>12,} {op0_pct:>7.1f}%",
                  flush=True)
    elapsed = now() - t0

    # ── Final report ──────────────────────────────────────────────────────────
    total_if, op0_count, op0_pct = _op0_stats()
    # With no OP_IF at all, neither share is meaningful; report 0% for both.
    other_pct = 100 - op0_pct if total_if else 0.0
    print()
    print("=" * 60)
    print(" OP_IF USAGE IN TAPROOT SCRIPT PATH SPENDS")
    print(f" Blocks {start_block:,} – {end_block:,} ({total_blocks:,} blocks)")
    print("=" * 60)
    rows = sorted(tally.items(), key=lambda x: -x[1])
    col_w = 24
    print(f"\n {'Preceding opcode':<{col_w}} {'Count':>12} {'Share':>7}")
    print(f" {'-'*col_w} {'-'*12} {'-'*7}")
    for (before, _), count in rows:
        share = count / total_if * 100 if total_if else 0.0
        print(f" {before:<{col_w}} {count:>12,} {share:>6.2f}%")
    print()
    print("-" * 60)
    if using_dates:
        print(f" Period : {start_date} – {end_date} (blocks {start_block:,} – {end_block:,})")
    else:
        print(f" Period : blocks {start_block:,} – {end_block:,}")
    print(f" Total OP_IF uses : {total_if:,}")
    print(f" Preceded by OP_0 (dead code): {op0_count:,} ({op0_pct:.1f}%)")
    print(f" Functional uses : {total_if - op0_count:,} ({other_pct:.1f}%)")
    print("-" * 60)
    print()
    print(f" Of the {total_if:,} uses of OP_IF in taproot script path spends during this")
    print(f" period, {op0_pct:.1f}% were preceded by OP_0 (OP_FALSE), meaning the branch")
    print(f" body is permanently unreachable — the code can never execute.")
    print(f" The remaining {other_pct:.1f}% of OP_IF uses represent genuine conditional logic.")
    print()
    print(f" Completed in {elapsed:.1f}s")
# Run the analysis only when executed as a script, not when imported.
if __name__ == '__main__':
    main()