高级技巧:利用Lua编写安全场景的测试数据生成工具

渗透技巧 2个月前 admin
198 0 0

高级技巧:利用Lua编写安全场景的测试数据生成工具

高级技巧:利用Lua编写安全场景的测试数据生成工具
高级技巧:利用Lua编写安全场景的测试数据生成工具
Tim
@PortalLab实验室

1. 背景

在流量采集和分析的场景中,一种常见架构如下所示:

高级技巧:利用Lua编写安全场景的测试数据生成工具

在上述架构中,交换机通过流量镜像的方式,将用户与应用服务器之间的流量“复制”给流量采集/分析服务器。流量服务器上部署的采集探针负责协议数据包的重组,以及一部分流量分析工作,比如判断数据包是否触发某些规则。此时,需要对流量采集探针进行两方面的测试工作:

  • 性能测试:如果采集探针重组和分析数据包的性能不够高,那么将导致丢包,进而影响后续的进一步分析

  • 功能测试:从大量的流量中,准确地识别出风险事件、敏感数据等是流量分析的基础工作,如果无法做好这些工作,那么流量采集和分析将失去其意义

为进行性能测试,需要在模拟的用户和应用服务器之间,发送大量请求。为进行功能测试,需要在模拟的用户和应用服务器之间发送多种具有特定特征的流量。当前最主流的应用层协议非 HTTP 莫属。接下来将讲述如何使用 Lua 语言扩展 Nginx 和 Wrk,实现针对 HTTP 协议的性能测试和功能测试。

高级技巧:利用Lua编写安全场景的测试数据生成工具

2. 测试环境

操作系统:CentOS 7.9

3. 安装 Openresty

Openresty 是完全成熟的 Web 应用服务器,它捆绑了标准的 Nginx 核心,大量的第三方模块,以及它们的大部分外部依赖。


3.1 安装依赖包

sudo yum install -y pcre pcre-devel openssl openssl-devel perl make gcc curl zlib zlib-devel

3.2 下载源码包

去官网的 Download 页面,下载 Openrestry 源码包。本文使用的是 openresty-1.19.9.1.tar.gz。

wget https://openresty.org/download/openresty-1.19.9.1.tar.gz

3.3 安装

tar zxf openresty-1.19.9.1.tar.gzcd openresty-1.19.9.1/./configure --with-luajit --with-http_iconv_modulemake -j8 && sudo make install

Openresty 默认被安装到 /usr/local/openresty/。


3.4 验证

/usr/local/openresty/bin/openresty -V

4. 安装 Wrk

wrk 是现代的 HTTP 基准测试工具,当在单个多核 CPU 上运行时,能够产生显著的负载。它结合多线程设计和可扩展的事件通知系统,比如 epoll 和 kqueue。

可选的 LuaJIT 脚本可以执行 HTTP 请求生成、响应处理和自定义报告。


4.1 安装依赖包

sudo yum install -y gcc openssl openssl-devel git curl

4.2 克隆源码

git clone https://github.com/wg/wrk.git wrk

4.3 编译

cd wrk/make

编译完成后,生成的二进制可执行文件 wrk 被保存当前目录中。可以将其移动到 PATH 中的某个目录下。


4.4 验证

./wrk -v

5. Wrk 脚本简介


5.1 概览

Wrk 支持在三个不同阶段期间执行 LuaJIT 脚本:Setup、Running和Done。每个Wrk 线程拥有独立的脚本环境,Setup和Done阶段在单独的环境中执行,该环境不参与 Running 阶段。
公有 Lua API 包含全局表和多个全局函数:
  wrk = {    scheme  = "http",    host    = "localhost",    port    = nil,    method  = "GET",    path    = "/",    headers = {},    body    = nil,    thread  = <userdata>,  }
function wrk.format(method, path, headers, body)

wrk.format 返回由传入参数与 wrk 表中的值合并得到的 HTTP 请求字符串。

function wrk.lookup(host, service)

wrk.lookup 返回包含 host 和 service 对的所有已知地址的表。与 POSIX getaddrinfo() 函数对应。

function wrk.connect(addr)

如果能够连接到 addr,wrk.connect 返回 true,否则返回 false。addr 必须是从 wrk.lookup 返回的地址。

如下全局变量是可选的,如果定义,那么必须是函数:

  • global setup -- 在线程 Setup 期间调用

  • global init -- 在线程启动时调用

  • global delay -- 用于获取请求延迟

  • global request -- 用于生成 HTTP 请求

  • global response -- 使用 HTTP 响应数据调用

  • global done -- 使用运行结果调用

5.2 Setup

function setup(thread)

在已解析目标 IP 地址,并且所有线程已初始化,但尚未启动之后,Setup 阶段开始。

为每个线程,调用一次 setup(),该函数接收代表线程的 userdata 对象。

  • thread.addr - 获取或设置线程的服务端地址

  • thread:get(name) - 获取线程环境中的全局变量的值

  • thread:set(name, value) - 设置线程环境中的全局变量的值

  • thread:stop() - 停止线程

只有布尔值、nil、number 和字符串值或相同的表可以通过 get()/set() 传递,thread:stop() 只能在线程运行时调用。


5.3 Running

function init(args)function delay()function request()function response(status, headers, body)

Running 阶段从对 init() 的单次调用开始,接下来为每个请求周期调用 request() 和 response()。

init() 函数为脚本接受额外的命令行参数,必须用 “--” 将其与 wrk 参数隔开。

delay() 返回延迟发送下个请求的毫秒数。

request() 返回包含 HTTP 请求的字符串。在测试高性能服务器时,每次都构建新请求代价很大。一个方案是在 init() 中预生成所有请求,然后在 request() 中进行快速查询。

使用 HTTP 响应状态码、头和体调用 response()。解析头和体代价很大,因此如果在调用 init() 后,response 全局变量是 nil,wrk 将忽略头和体。


5.4 Done

function done(summary, latency, requests)

done() 函数接收包含结果数据,以及代表每个请求延迟和每个线程请求速率的两个统计对象的表。持续时间和延迟都是微秒值,而速率以每秒的请求数来衡量。

  • latency.min -- 所见的最小值

  • latency.max -- 所见的最大值

  • latency.mean -- 所见的平均值

  • latency.stdev -- 标准偏差

  • latency:percentile(99.0) -- 百分之 99 的值

  • latency(i) -- 原始值和计数

summary = {    duration = N,  -- 运行持续时间,单位为微秒    requests = N,  -- 已完成的请求总数    bytes    = N,  -- 接收的总字节数    errors   = {      connect = N, -- Socket 连接错误总数      read    = N, -- Socket 读取错误总数      write   = N, -- Socket 写错误总数      status  = N, -- 大于 399 的 HTTP 状态码总数      timeout = N  -- 请求超时总数    }  }

6. 使用 Python 生成随机图片

图片是非常常见的资源类型,常见图片格式包括 JPG、PNG、GIF 等。测试过程中,可能希望模拟的服务端返回具有指定宽度和高度的图片。Pillow 是 Python 中强大的图片处理库,接下来使用 Pillow 生成随机的 JPG、PNG、GIF 图片。

首先,需要安装 Pillow:

pip install pillow

下面是实现代码:

import stringimport typingfrom optparse import OptionParserimport randomimport os
from PIL import Image, ImageDraw

def generate_jpg(width: int, height: int, output: str) -> None: """ 生成一张随机的 JPG 图片 :param width: 生成的图片的宽度 :param height: 生成的图片的高度 :param output: 输出文件名称 """ img: Image = Image.new("RGB", (width, height)) pixels = img.load() for x in range(width): for y in range(height): r = random.randint(0, 255) g = random.randint(0, 255) b = random.randint(0, 255) pixels[x, y] = (r, g, b) img.save(output, format="JPEG") print(f"the generated JPEG image is stored in {output}, file size is {os.stat(output).st_size / 1024} KB")

def generate_png(width: int, height: int, output: str) -> None: """ 生成一张随机的 PNG 图片 :param width: 生成的图片的宽度 :param height: 生成的图片的高度 :param output: 输出文件名称 """ img: Image = Image.new("RGBA", (width, height)) draw: ImageDraw = ImageDraw.Draw(img) for x in range(width): for y in range(height): alpha = random.randint(0, 255) r = random.randint(0, 255) g = random.randint(0, 255) b = random.randint(0, 255) draw.point((x, y), fill=(r, g, b, alpha)) img.save(output, format="PNG") print(f"the generated PNG image is stored in {output}, file size is {os.stat(output).st_size / 1024} KB")

def generate_gif(width: int, height: int, num_frames: int, output: str) -> None: """ 生成一张随机的 GIF 图片 :param width: 生成的图片的宽度 :param height: 生成的图片的高度 :param num_frames: 生成的图片的桢数 :param output: 输出文件名称 """ frames: typing.List[Image] = [] for _ in range(num_frames): # 生成每一帧的随机图像 image = Image.new("RGB", (width, height)) for x in range(width): for y in range(height): r = random.randint(0, 255) g = random.randint(0, 255) b = random.randint(0, 255) image.putpixel((x, y), (r, g, b)) # 将当前帧添加到帧列表中 frames.append(image) # 保存图像 frames[0].save(output, format="GIF", append_images=frames[1:], save_all=True, duration=200, loop=1) print(f"the generated GIF image is stored in {output}, file size is {os.stat(output).st_size / 1024} KB")

def generate_text(size: int, output: str) -> None: """ 生成特定长度的随机文本 :param size: 生成的随机文本的长度 :param output: 输出文件名称 """ with open(output, "wb") as fd: current_size: int = size while current_size > 0: # 每次生成 4K batch: int = min(4096, current_size) fd.write("".join([random.choice(string.printable) for _ in range(batch)]).encode()) current_size -= batch print(f"the generated text is store in {output}, file size is {os.stat(output).st_size / 1024} KB")

def main() -> None: parser: OptionParser = OptionParser(usage="python %prog options...") parser.add_option("-t", "--type", dest="type", default="txt", type=str, help="the type of generated file, including jpg, png, gif, txt") parser.add_option("-w", "--width", dest="width", default=200, type=int, help="the width of image, if type is image") parser.add_option("-H", "--height", dest="height", default=200, type=int, help="the height of image, if type is image") parser.add_option("-s", "--size", dest="size", default=1024, type=int, help="the size of generated file, in bytes") parser.add_option("-o", "--output", dest="output", default="a", type=str, help="output file name") parser.add_option("-n", "--num-frames", dest="num_frames", default=10, type=int, help="the frame number of generated GIF image") options, _ = parser.parse_args()
_, ext = os.path.splitext(options.output) if options.type.lower() == "jpg": if ext not in [".jpg", "jpeg", ".jfif"]: options.output += ".jpg" generate_jpg(options.width, options.height, options.output) return if options.type.lower() == "png": if ext not in [".png"]: options.output += ".png" generate_png(options.width, options.height, options.output) return if options.type.lower() == "gif": if ext not in [".gif"]: options.output += ".gif" generate_gif(options.width, options.height, options.num_frames, options.output) return if options.type.lower() == "txt": if ext not in [".txt"]: options.output += ".txt" generate_text(options.size, options.output)

if __name__ == "__main__": main()

比如,执行如下命令将生成 100x100 的 GIF 图片:

python3 generate_image.py -t gif -o 100x100.gif --width 100 --height 100 --num-frames 20

7. 测试项目

下文假定以 root 用户进行操作,工作目录是 /root/demo/。


7.1 创建测试项目

.├── generate_image.py├── nginx.conf└── wrk.lua

创建 static/ 目录,用于存储图片、文本等静态文件:

mkdir -p static/

创建 logs/ 目录,用于保存 Nginx 的日志文件等:

mkdir -p logs/

将 Nginx conf/ 目录下的 mime.types 文件拷贝到当前目录:

cp /usr/local/openresty/nginx/conf/mime.types .

使用如下命令生成 wrk.lua 中使用的图片和文本文件:

python3 generate_image.py -t gif -w 100 -H 100 -o static/100x100.gifpython3 generate_image.py -t png -w 100 -H 100 -o static/100x100.pngpython3 generate_image.py -t txt -s 131072 -o static/128k.txt

wrk.lua 的内容如下:

local counter = 1local threads = {}
function setup(thread) thread:set("id", counter) table.insert(threads, thread) counter = counter + 1end
-- 在 init 中预生成所有请求,在 request 中顺序选择function init(args) current_index = 0
-- 保存所有预生成请求的表 pregenerated_requests = {} table.insert( pregenerated_requests, wrk.format( "GET", "/path/1", { ["X-Predefined-Strategy"] = "png,100x100.png" } ) ) table.insert( pregenerated_requests, wrk.format( "POST", "/path/2", { ["X-Predefined-Strategy"] = "gif,100x100.gif", ["Content-Type"] = "application/x-www-form-urlencoded" }, "foo=bar&baz=quux" ) ) table.insert( pregenerated_requests, wrk.format( "PUT", "/path/3/arbitrary/here", { ["X-Predefined-Strategy"] = "text,128k.txt", ["Content-Type"] = "application/x-www-form-urlencoded" }, "foo=bar&baz=quux" ) ) table.insert( pregenerated_requests, wrk.format( "GET", "/path/4", { ["Content-Type"] = "application/json;charset=utf8" }, [[ { "headers": {"x-header-a": "a", "content-type": "text/plain"}, "status_code": 200, "body": "this is a very very simple text body, but it maybe meet some rules." }]] ) )end
function request() current_index = current_index + 1 return pregenerated_requests[current_index%#pregenerated_requests+1]end

nginx.conf 的内容如下:

worker_processes  auto;
error_log logs/error.log;error_log logs/error.log notice;error_log logs/error.log info;
pid        logs/nginx.pid;
events { worker_connections 4096;}
http { include mime.types; default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log logs/access.log main;
sendfile on; tcp_nodelay on; keepalive_timeout 15;
open_file_cache max=2048 inactive=30s; open_file_cache_valid 10s; open_file_cache_min_uses 1; open_file_cache_errors on;
# 请求体不能超过该设置。 # 如果客户端需要传递更大的请求体,那么调整该设置 client_max_body_size 10m;
# 预置策略。 # 预置策略通过名称进行唯一标识。 # 预置策略封装响应码、响应头和响应体(可选),可避免客户端重复传输这些内容。 # 客户端通过请求头指定使用的策略。 # 如此一来,客户端可以同时自由地定制请求和响应,只多出用于指定所用策略的请求头。 # 服务端使用紧跟在策略名称后面的磁盘文件名称,获取响应体。 # 因此,需要提前将生成的响应体存储到磁盘文件。 # 如果未提供文件名称,并且策略中存在响应体,那么使用策略中的响应体。 # 如果都未提供,那么返回空响应体。 # 请求头类似 X-Predefined-Strategy: png,100x100.png 或 X-Predefined-Strategy: forbidden。 # 注意: # 1. 预置的响应体不能太大,否则将占用太多的共享内存 # 2. 预置策略名称不能包含 "," # 3. 预置策略中指定的响应头名称不会被规范化,因此设置时,需要注意 lua_shared_dict predefined_stategies 128m;
init_by_lua_block { -- 保存策略 local function save_strategy(name, status_code, headers, body) if name == nil then return ngx.log(ngx.ERR, "no strategy name provided") end local t = { status_code = status_code or ngx.HTTP_OK, headers = {}, body = body or "" } for name, value in pairs(headers or {}) do t.headers[name] = value end
local cjson = require("cjson.safe") local j = cjson.encode(t) if j == nil then return end local s = ngx.shared.predefined_stategies local suc, err = s:set(name, j) if suc then ngx.log(ngx.INFO, "setting strategy " .. name .. " succeeded") else ngx.log(ngx.ERR, "setting strategy " .. name .. " failed with " .. err) end end
-- 按需添加策略 save_strategy("png", ngx.HTTP_OK, {["Content-Type"]="image/png"}) save_strategy("jpeg", ngx.HTTP_OK, {["Content-Type"]="image/jpeg"}) save_strategy("gif", ngx.HTTP_OK, {["Content-Type"]="image/gif"}) save_strategy("ico", ngx.HTTP_OK, {["Content-Type"]="image/x-icon"}) save_strategy("text", ngx.HTTP_OK, {["Content-Type"]="text/plain"}) save_strategy("json", ngx.HTTP_OK, {["Content-Type"]="application/json;charset=utf8"}) save_strategy( "forbidden_default", ngx.HTTP_FORBIDDEN, {["Content-Type"]="text/plain"}, "forbidden" ) save_strategy( "notfound_default", ngx.HTTP_NOT_FOUND, {["Content-Type"]="text/plain"}, "not found" ) }
server { listen 80; server_name localhost;
location /static { alias static/; }
location / { content_by_lua_block { local cjson = require "cjson.safe"
-- 规范化响应名称 local normalize_header = function(name) -- 1. _ 替换成 - name = string.gsub(name, "_", "-") -- 2. 第一个字符大写 name = string.gsub(name, "%w", function(m) return string.upper(m) end, 1) -- 3. - 后面的字符大写 return string.gsub(name, "-%w", function(m) return string.upper(m) end) end
-- 生成指定长度的随机字符串 local function generate_random_string(length) local chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" -- 使用当前时间作为随机种子 math.randomseed(os.time()) local r = "" for _ = 1, length do local idx = math.random(1, #chars) r = r .. string.sub(chars, idx, idx) end return r end
-- 当指定的策略或路径不存在时,返回该响应码 local strategy_not_found_status = ngx.HTTP_NOT_FOUND
local strategy local path local predefined_strategy_hdr = ngx.var.http_x_predefined_strategy -- 如果客户端指定策略 if predefined_strategy_hdr ~= nil then local start_pos, end_pos = string.find(predefined_strategy_hdr, ",", 1, true) if start_pos == nil then strategy = predefined_strategy_hdr else strategy = string.sub(predefined_strategy_hdr, 1, start_pos-1) path = string.sub(predefined_strategy_hdr, end_pos+1) end -- 获取策略 local s = ngx.shared.predefined_stategies strategy = s:get(strategy) -- 如果策略不存在,那么返回错误 if strategy == nil then ngx.status = strategy_not_found_status ngx.header.content_type = "text/plain" ngx.say("the provided stategy not found") return ngx.exit(ngx.HTTP_OK) end -- 根据策略设置响应 strategy = cjson.decode(strategy) ngx.status = strategy.status_code for name, value in pairs(strategy.headers or {}) do ngx.header[name] = value end -- 设置响应体 if path == nil then ngx.print(strategy.body) return ngx.exit(ngx.HTTP_OK) end -- 发起子请求,从磁盘获取响应体 local res = ngx.location.capture("/static".."/"..path) if res.status == ngx.HTTP_OK then ngx.print(res.body) else ngx.status = strategy_not_found_status ngx.header.content_type = "text/plain" ngx.say("the provided path not found") end return ngx.exit(ngx.HTTP_OK) end
-- 非策略模式用于构建期望的响应
-- 当请求体不合法时,返回该响应码 local bad_request_status_code = ngx.HTTP_BAD_REQUEST
-- 读取请求体 ngx.req.read_body() local body_raw = ngx.req.get_body_data() -- 解析请求体 local body, err = cjson.decode(body_raw) -- 解析请求体失败 if err ~= nil then ngx.status = bad_request_status_code ngx.say(err) return ngx.exit(ngx.HTTP_OK) end -- 设置响应码 local status_code = body["status_code"] if status_code == nil then status_code = ngx.HTTP_OK end ngx.status = status_code -- 设置响应头 for name, value in pairs(body["headers"] or {}) do ngx.header[normalize_header(name)] = value end -- 设置响应体 if body["body"] then -- 使用客户端指定的响应体 ngx.print(body["body"]) else -- 使用客户端指定的随机响应体长度生成响应体。 -- 指定的长度越长,生成随机字符串的效率越低,请设置合理的长度 if body["random_body_length"] then ngx.print(generate_random_string(tonumber(body["random_body_length"]))) else -- 返回空响应体 ngx.print("") end end return ngx.exit(ngx.HTTP_OK) } } }}

关于 generate_image.py 的内容,请参阅前一章节。


7.2 启动 Openresty

/usr/local/openresty/bin/openresty -p . -c nginx.conf

注意参数 -p .,其中 . 代表工作目录 /root/demo/。

使用类似下面的命令测试 Openresty 是否启动成功:

curl     -w 'url effective: %{url_effective}nhttp code:%{http_code}ntime total: %{time_total}ntime namelookup: %{time_namelookup}ntime connect: %{time_connect}ntime appconnect: %{time_appconnect}ntime pretransfer: %{time_pretransfer}ntime redirect: %{time_redirect}ntime starttransfer: %{time_starttransfer}nsize download: %{size_download}nsize request: %{size_request}n'     -X POST     -H "content-type: application/json"     -d '{"headers": {"x-header-a": "a", "x-header_b": "b"}, "status_code": 200, "random_body_length": "65536"}'     -v -o /dev/null     http://127.0.0.1/path

7.3 使用 Wrk 进行压测

Wrk 的命令行选项如下:

  • -c/--connections:保持打开的 HTTP 连接总数,每个线程处理 N = connections/threads
  • -d/--duration:测试的持续时间,比如 2s、2m、2h
  • -t/--threads:使用的线程总数
  • -s/--script:LuaJIT 脚本
  • -H/--header:添加到请求的 HTTP 头,比如 "User-Agent: wrk"
  • --latency:打印详细的延迟统计
  • --timeout:如果在此时间内未收到响应,则记录超时时间

使用如下命令进行测试:

wrk -s wrk.lua --latency -t 16 -c 1600 -d 300s http://127.0.0.1

请按照机器配置适当调整线程数和连接数。这里使用的测试服务器的配置是 8C/16G。

在响应大小(单位是字节)如下的情况下:

"GET /path/1 HTTP/1.1" 200 40181"POST /path/2 HTTP/1.1" 200 141424"PUT /path/3/arbitrary/here HTTP/1.1" 200 131086"GET /path/4 HTTP/1.1" 200 78

测试结果下:

Running 5m test @ http://127.0.0.1  16 threads and 1600 connections  Thread Stats   Avg      Stdev     Max   +/- Stdev    Latency    18.22ms   27.01ms   1.84s    94.58%    Req/Sec     6.00k     0.96k   16.17k    72.14%  Latency Distribution     50%   11.36ms     75%   19.14ms     90%   30.21ms     99%  133.45ms  28642285 requests in 5.00m, 2.04TB readRequests/sec:  95443.39Transfer/sec:      6.97GB


关于Portal Lab

星阑科技 Portal Lab 致力于前沿安全技术研究及能力工具化。主要研究方向为数据流动安全、API 安全、应用安全、攻防对抗等领域。实验室成员研究成果曾发表于BlackHat、HITB、BlueHat、KCon、XCon等国内外知名安全会议,并多次发布开源安全工具。未来,Portal Lab将继续以开放创新的态度积极投入各类安全技术研究,持续为安全社区及企业级客户提供高质量技术输出。

高级技巧:利用Lua编写安全场景的测试数据生成工具


原文始发于微信公众号(星阑实验室):高级技巧:利用Lua编写安全场景的测试数据生成工具

版权声明:admin 发表于 2023年8月10日 上午9:01。
转载请注明:高级技巧:利用Lua编写安全场景的测试数据生成工具 | CTF导航

相关文章

暂无评论

暂无评论...