image-20260225092216111

简单介绍

Mitmproxy,一个结合python的CLI抓包工具,基于MITM形式,类似于BurpSuite。但是,与BurpSuite、Yakit、Charless等工具不同,mitmproxy更偏向于结合python,原生开发一些抓包过程中的插件脚本(addon),自由度更加高,适合用来做一些定制化、批量化的测试,这也是mitmproxy最核心、最强大的地方(python语法简单,容易开发;和burpsuite不同,mitmproxy无需关心额外的API,使得插件更加容易开发)。

  • 拦截HTTP和HTTPS请求和响应并即时修改它们
  • 保存完整的HTTP对话以供以之后重发和分析
  • 重发HTTP对话的客户端
  • 重发先前记录的服务的HTTP响应
  • 反向代理模式将流量转发到指定的服务器
  • 在macOS和Linux上实现透明代理模式
  • 使用Python对HTTP流量进行脚本化修改
  • 实时生成用于拦截的SSL / TLS证书
  • And much, much more…

更加详细的介绍可以参考官方文档:Mitmproxy Introduction

快速开始

下载安装

  1. 官网地址:mitmproxy单独的二进制程序,可以略过

  2. 使用pip安装mitmproxy库以及二进制文件

    1
    pip install mitmproxy
  3. 安装完成之后会得到三个可执行的二进制文件:mitmproxymitmwebmitmdump

    • mitmproxy:最常用的一个工具,在终端命令行中以UI的形式进行展示数据包
    • mitmweb:启动一个web服务,以web界面的形式展示数据包,类似于burpsuite、yakit等工具
    • mitmdump:类似于tcpdump,用来保存数据包到文件中
  4. 验证是否成功

    1
    2
    pip list|grep mitm
    mitmproxy --version

    image-20260225101430985

mitmproxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
[10:13:40 shaoxinSEC@WIN11 (master)]# mitmproxy --help
用法: python.exe C:\Users\shaoxinSEC\xxxxx\scripts\.venv\Scripts\mitmproxy [options]

选项:
-h, --help 显示此帮助信息并退出
--version 显示版本号并退出
--options 显示所有选项及其默认值
--commands 显示所有命令及其签名
--set option[=value] 设置一个选项。省略 value 时,布尔值会设为 true,
字符串和整数会设为 None(若允许),序列会被清空。
布尔值可为 true、false 或 toggle。序列通过对同一
选项多次调用 set 来设置。
-q, --quiet 静默模式。
-v, --verbose 提高日志详细程度。
--mode, -m MODE 要启动的代理服务器类型(可多次传入)。Mitmproxy
支持 "regular" (HTTP)、"local"、"transparent"、"socks5"、
"reverse:SPEC"、"upstream:SPEC" 和 "wireguard[:PATH]"。
对于 reverse 和 upstream 模式,SPEC 为如下形式的主机说明:
"http[s]://host[:port]"。对于 WireGuard 模式,PATH 可指向
包含密钥材料的文件;若文件不存在,启动时会自动创建。
你可以附加 `@listen_port` 或 `@listen_host:listen_port`
来覆盖某个特定代理模式的 `listen_host` 或 `listen_port`。
客户端回放等功能会使用第一个 mode 来决定上游服务器。
可多次传入。
--no-anticache
--anticache 移除可能导致服务器返回 304-not-modified 的请求头。
--no-showhost
--showhost 使用 Host 头来构造显示用 URL。该选项默认关闭,
因为恶意应用可能发送误导性的 host 头以规避分析。
若这不是你的顾虑,可启用此选项以获得更好的流量显示。
--no-show-ignored-hosts
--show-ignored-hosts 即使不执行 TLS 拦截,也在 UI 中记录被忽略的流量。
该选项会将被忽略流量内容保留在内存中,可能显著增加
内存使用。未来版本将修复此问题、默认记录被忽略流量,
并移除此选项。
--rfile, -r PATH 从文件读取流量。
--scripts, -s SCRIPT 执行脚本。可多次传入。
--stickycookie FILTER 设置粘性 Cookie 过滤器。匹配请求。
--stickyauth FILTER 设置粘性认证过滤器。匹配请求。
--save-stream-file, -w PATH
在流量到达时流式写入文件。路径前缀加 + 表示追加。
完整路径可使用 python strftime() 格式化,缺失目录会按需创建。
每当格式化后的字符串发生变化,都会打开一个新文件。
--no-anticomp
--anticomp 尝试让服务器发送未压缩数据。
--console-layout {horizontal,single,vertical}
控制台布局。
--no-console-layout-headers
--console-layout-headers
显示布局组件标题

代理选项:
--listen-host HOST 代理服务器绑定地址(可被单独模式覆盖,见 `mode`)。
--listen-port, -p PORT
代理服务器绑定端口(可被单独模式覆盖,见 `mode`)。
默认端口由模式决定。regular HTTP 代理默认监听 8080。
--no-server, -n
--server 启动代理服务器。默认启用。
--ignore-hosts HOST 忽略该主机并直接转发所有流量而不处理。在 transparent
模式下,建议使用 IP 地址(范围)而非主机名。在 regular
模式下,仅忽略 SSL 流量,且应使用主机名。给定值按正则表达式
解释,并在 IP 或主机名上匹配。可多次传入。
--allow-hosts HOST 与 --ignore-hosts 相反。可多次传入。
--tcp-hosts HOST 对所有匹配模式的主机启用通用 TCP SSL 代理模式。类似
--ignore-hosts,但会拦截 SSL 连接。通信内容会在 verbose
模式下打印到日志。可多次传入。
--upstream-auth USER:PASS
为上游代理和反向代理请求添加 HTTP Basic 认证。
格式:username:password。
--proxyauth SPEC 要求代理认证。格式:"username:pass","any" 表示接受任意
用户名/密码组合,"@path" 表示使用 Apache htpasswd 文件,
或 "ldap[s]:url_server_ldap[:port]:dn_auth:password:dn_subtree
[?search_filter_key=...]" 表示 LDAP 认证。
--no-store-streamed-bodies
--store-streamed-bodies
在流式传输时存储 HTTP 请求与响应体(见
`stream_large_bodies`)。这会增加内存消耗,但可以检查
流式响应体内容。
--no-rawtcp 禁用原始 TCP 连接。
--rawtcp 启用原始 TCP 连接。默认启用 TCP 连接。
--no-http2 禁用 HTTP/2 支持。
--http2 启用HTTP/2 支持。默认启用 HTTP/2。

SSL:
--certs SPEC SSL 证书,格式为 "[domain=]path"。domain 可包含通配符,
若未指定则等于 "*"。path 指向 PEM 格式证书。若 PEM 中
包含私钥则使用该私钥,否则使用配置目录中的默认私钥。
PEM 文件应包含完整证书链,且叶子证书为第一项。
可多次传入。
--cert-passphrase PASS
用于解密 --cert 选项提供私钥的口令。注意:在命令行传递
cert_passphrase 会让口令暴露在系统进程列表中。
建议在 config.yaml 中指定以避免此问题。
--no-ssl-insecure
--ssl-insecure, -k 不校验上游服务器 SSL/TLS 证书。启用后会跳过证书校验,
mitmproxy 自身也将容易受到 TLS 中间人拦截。

客户端回放:
--client-replay, -C PATH
从已保存文件回放客户端请求。可多次传入。

服务端回放:
--server-replay, -S PATH
从已保存文件回放服务端响应。可多次传入。
--no-server-replay-kill-extra
--server-replay-kill-extra
回放期间终止额外请求(即未找到可回放响应的请求)。
[已弃用,建议使用 server_replay_extra='kill']
--server-replay-extra {forward,kill,204,400,404,500}
回放期间对额外请求(未找到可回放响应)采取的行为。
设置为数字字符串会返回对应状态码的空 HTTP 响应。
--no-server-replay-reuse
--server-replay-reuse
使用后不从服务端回放状态中移除流量,从而可多次回放
同一响应。
--no-server-replay-refresh
--server-replay-refresh
通过调整 date、expires、last-modified 头,以及 Cookie
过期时间,来刷新服务端回放响应。

远程映射:
--map-remote, -M PATTERN
使用如下模式将远程资源映射到另一个远程 URL:
"[/flow-filter]/url-regex/replacement",
其中分隔符可为任意字符。可多次传入。

本地映射:
--map-local PATTERN 使用如下模式将远程资源映射到本地文件:
"[/flow-filter]/url-regex/file-or-directory-path",
其中分隔符可为任意字符。可多次传入。

修改 Body:
--modify-body, -B PATTERN
替换模式格式:
"[/flow-filter]/regex/[@]replacement",
其中分隔符可为任意字符。@ 表示可提供文件路径,
从该文件读取替换字符串。可多次传入。

修改 Headers:
--modify-headers, -H PATTERN
Header 修改模式格式:
"[/flow-filter]/header-name/[@]header-value",
其中分隔符可为任意字符。@ 表示可提供文件路径,
从该文件读取 header 值字符串。若 header-value 为空,
则移除已有的 header-name。可多次传入。

过滤器:
过滤表达式语法请在 mitmproxy 帮助中查看。

--intercept FILTER 拦截过滤表达式。
--view-filter FILTER 仅显示匹配的流量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
[10:20:02 shaoxinSEC@WIN11 (master)]# mitmproxy --options

# 如果HTTP/2连接空闲超过指定秒数,则发送PING帧以防止远程站点关闭连接。设置为0以禁用此功能。类型为int。
http2_ping_keepalive: 58

# 启用/禁用对QUIC和HTTP/3的支持。默认启用。类型为bool。
http3: true

# 在CONNECT请求中包含Host头。默认启用。类型为bool。
http_connect_send_host_header: true

# 忽略指定主机并转发其所有流量而不进行处理。在透明模式下,建议使用IP地址(范围),而非主机名。在常规模式下,仅忽略SSL流量,应使用主机名。提供的值将被解释为正则表达式,并与IP或主机名进行匹配。类型为sequence of str。
ignore_hosts: []

# 拦截过滤器表达式。类型为optional str。
intercept:

# 拦截开关。类型为bool。
intercept_active: false

# 反向代理:原样保留Alt-Svc头,即使它们不指向mitmproxy。启用此选项可能导致客户端绕过代理。类型为bool。
keep_alt_svc_header: false

# 反向代理:保留原始的Host头,而不将其重写为反向代理目标。类型为bool。
keep_host_header: false

# 证书和CA的TLS密钥大小。类型为int。
key_size: 2048

# 绑定代理服务器(们)的地址(可能被各个模式覆盖,见`mode`)。类型为str。
listen_host: ''

# 绑定代理服务器(们)的端口(可能被各个模式覆盖,见`mode`)。默认情况下,端口是模式特定的。默认的常规HTTP代理在端口8080上启动。类型为optional int。
listen_port:

# 使用形式为"[/flow-filter]/url-regex/file-or-directory-path"的模式将远程资源映射到本地文件,其中分隔符可以是任何字符。类型为sequence of str。
map_local: []

# 使用形式为"[/flow-filter]/url-regex/replacement"的模式将远程资源映射到另一个远程URL,其中分隔符可以是任何字符。类型为sequence of str。
map_remote: []

# 要启动的代理服务器类型。可以传递多次。Mitmproxy支持"regular" (HTTP), "local", "transparent", "socks5", "reverse:SPEC", "upstream:SPEC", 和 "wireguard[:PATH]"代理服务器。对于反向和上游代理模式,SPEC是形式为"http[s]://host[:port]"的主机规范。对于WireGuard模式,PATH可以指向包含密钥材料的文件。如果不存在此类文件,将在启动时创建。您可以附加`@listen_port`或`@listen_host:listen_port`来为特定代理模式覆盖`listen_host`或`listen_port`。诸如客户端回放等功能将使用第一个模式来确定使用哪个上游服务器。类型为sequence of str。
mode:
- regular

# 形式为"[/flow-filter]/regex/[@]replacement"的替换模式,其中分隔符可以是任何字符。@允许提供文件路径,该文件路径用于读取替换字符串。类型为sequence of str。
modify_body: []

# 形式为"[/flow-filter]/header-name/[@]header-value"的头部修改模式,其中分隔符可以是任何字符。@允许提供文件路径,该文件路径用于读取头部值字符串。空的header-value会删除现有的header-name头部。类型为sequence of str。
modify_headers: []

# 规范化出站HTTP/2头部名称,但在这样做时发出警告。HTTP/2不允许大写头部名称。此选项确保在发送之前在自定义脚本中设置的HTTP/2头部被转换为小写。类型为bool。
normalize_outbound_headers: true

# 切换mitmproxy入门应用程序。类型为bool。
onboarding: true

# 入门应用程序域名。对于透明模式,如果应用程序域名的DNS条目不存在,请使用IP地址。类型为str。
onboarding_host: mitm.it

# 用于在美化打印时解析Protobuf字段名的.proto文件路径。类型为optional str。
protobuf_definitions:

# 在代理核心中启用调试日志。类型为bool。
proxy_debug: false

# 要求代理身份验证。格式:"username:pass", "any"以接受任何用户/密码组合, "@path"以使用Apache htpasswd文件,或 "ldap[s]:url_server_ldap[:port]:dn_auth:password:dn_subtree[?search_filter_key=...]"用于LDAP身份验证。类型为optional str。
proxyauth:

# 启用/禁用原始TCP连接。默认启用TCP连接。类型为bool。
rawtcp: true

# 仅读取匹配的流量。类型为optional str。
readfile_filter:

# 请求客户端证书(TLS消息'CertificateRequest')以在客户端和mitmproxy之间建立双向TLS连接(与用于mitmproxy和上游的'client_certs'选项结合使用)。类型为bool。
request_client_cert: false

# 从文件读取流量。类型为optional str。
rfile:

# 在流量到达时流式传输到文件。路径前缀+表示追加。完整路径可以使用python strftime()格式化,缺失的目录将根据需要创建。每次格式化字符串更改时都会打开一个新文件。类型为optional str。
save_stream_file:

# 过滤哪些流量写入文件。类型为optional str。
save_stream_filter:

# 执行脚本。类型为sequence of str。
scripts: []

# 启动代理服务器。默认启用。类型为bool。
server: true

# 从保存的文件重放服务器响应。类型为sequence of str。
server_replay: []

# 在重放期间,对于找不到可重放响应的额外请求的行为。设置数字字符串值将返回具有相应状态码的空HTTP响应。有效值为'forward', 'kill', '204', '400', '404', '500'
server_replay_extra: forward

# 在搜索要重放的已保存流量时忽略请求内容。类型为bool。
server_replay_ignore_content: false

# 在搜索要重放的已保存流量时忽略请求目标主机。类型为bool。
server_replay_ignore_host: false

# 在搜索要重放的已保存流量时忽略的请求参数。类型为sequence of str。
server_replay_ignore_params: []

# 在搜索要重放的已保存流量时忽略的请求负载参数(application/x-www-form-urlencoded 或 multipart/form-data)。类型为sequence of str。
server_replay_ignore_payload_params: []

# 在搜索要重放的已保存流量时忽略请求目标端口。类型为bool。
server_replay_ignore_port: false

# 在重放期间杀死额外请求(对于找不到可重放响应的请求)。[已弃用,建议使用 server_replay_extra='kill'] 类型为bool。
server_replay_kill_extra: false

# `server_replay_reuse`的已弃用别名。类型为bool。
server_replay_nopop: false

# 通过调整日期、expires和last-modified头部以及调整cookie过期时间来刷新服务器重放响应。类型为bool。
server_replay_refresh: true

# 使用后不从服务器重放状态中移除流量。这使得可以多次重放同一响应。类型为bool。
server_replay_reuse: false

# 在搜索要重放的已保存流量时需要匹配的请求头部。类型为sequence of str。
server_replay_use_headers: []

# 即使我们不执行TLS拦截,也在UI中记录被忽略的流量。此选项将把被忽略流量的内容保留在内存中,这可能会大大增加内存使用量。未来的版本将修复此问题,默认记录被忽略的流量,并移除此选项。类型为bool。
show_ignored_hosts: false

# 使用Host头部构建用于显示的URL。默认禁用此选项,因为恶意应用程序可能发送误导性的host头部以逃避您的分析。如果这不是问题,请启用此选项以获得更好的流量显示。类型为bool。
showhost: false

# 不验证上游服务器SSL/TLS证书。如果启用此选项,将跳过证书验证,mitmproxy本身将容易受到TLS拦截攻击。类型为bool。
ssl_insecure: false

# 受信任CA证书的PEM格式文件路径。类型为optional str。
ssl_verify_upstream_trusted_ca:

# 用于上游服务器验证的受信任CA证书目录路径,使用c_rehash工具准备。类型为optional str。
ssl_verify_upstream_trusted_confdir:

# 设置粘性身份验证过滤器。与请求匹配。类型为optional str。
stickyauth:

# 设置粘性cookie过滤器。与请求匹配。类型为optional str。
stickycookie:

# 在流式传输时存储HTTP请求和响应体(见`stream_large_bodies`)。这会增加内存消耗,但使得可以检查流式传输的体。类型为bool。
store_streamed_bodies: false

# 如果请求或响应体超过给定阈值,则向客户端流式传输数据。如果流式传输,体将不会以任何方式存储,并且此类响应无法被修改。理解k/m/g后缀,例如3m表示3兆字节。要存储流式传输的体,请见`store_streamed_bodies`。类型为optional str。
stream_large_bodies:

# 从DNS HTTPS记录中剥离加密的ClientHello (ECH) 数据,以便mitmproxy可以生成匹配的证书。类型为bool。
strip_ech: true

# 用于匹配模式的所有主机的通用TCP SSL代理模式。类似于--ignore-hosts,但SSL连接会被拦截。通信内容在详细模式下打印到日志。类型为sequence of str。
tcp_hosts: []

# 非活动TCP连接的超时(秒)。连接在此时间段不活动后将关闭。类型为int。
tcp_timeout: 600

# 在客户端连接上使用特定椭圆曲线进行ECDHE密钥交换。OpenSSL语法,例如"prime256v1"(见`openssl ecparam -list_curves`)。类型为optional str。
tls_ecdh_curve_client:

# 在服务器连接上使用特定椭圆曲线进行ECDHE密钥交换。OpenSSL语法,例如"prime256v1"(见`openssl ecparam -list_curves`)。类型为optional str。
tls_ecdh_curve_server:

# 设置客户端连接的最大TLS版本。有效值为'UNBOUNDED', 'SSL3', 'TLS1', 'TLS1_1', 'TLS1_2', 'TLS1_3'
tls_version_client_max: UNBOUNDED

# 设置客户端连接的最小TLS版本。UNBOUNDED, SSL3, TLS1 和 TLS1_1 是不安全的。有效值为'UNBOUNDED', 'SSL3', 'TLS1', 'TLS1_1', 'TLS1_2', 'TLS1_3'
tls_version_client_min: TLS1_2

# 设置服务器连接的最大TLS版本。有效值为'UNBOUNDED', 'SSL3', 'TLS1', 'TLS1_1', 'TLS1_2', 'TLS1_3'
tls_version_server_max: UNBOUNDED

# 设置服务器连接的最小TLS版本。UNBOUNDED, SSL3, TLS1 和 TLS1_1 是不安全的。有效值为'UNBOUNDED', 'SSL3', 'TLS1', 'TLS1_1', 'TLS1_2', 'TLS1_3'
tls_version_server_min: TLS1_2

# 用于匹配模式的所有主机的通用UDP SSL代理模式。类似于--ignore-hosts,但SSL连接会被拦截。通信内容在详细模式下打印到日志。类型为sequence of str。
udp_hosts: []

# 向上游代理和反向代理请求添加HTTP基本身份验证。格式:username:password。类型为optional str。
upstream_auth:

# 连接到上游服务器以查找证书详细信息。类型为bool。
upstream_cert: true

# 确保传入的HTTP请求格式正确。禁用此选项会使mitmproxy容易受到HTTP走私攻击。类型为bool。
validate_inbound_headers: true

# 将视图限制为匹配的流量。类型为optional str。
view_filter:

# 流量排序顺序。有效值为'time', 'method', 'url', 'size'
view_order: time

# 反转排序顺序。类型为bool。
view_order_reversed: false

# 启用/禁用WebSocket支持。默认启用WebSocket支持。类型为bool。
websocket: true
1
2
3
4
5
# 监听所有IP的8088端口,加上-s参数可指定addon脚本(如果不指定,mitmproxy默认监听http://localhost:8080)
mitmproxy --listen-host 0.0.0.0 -p 8088

# 浏览器代理到mitmproxy,然后访问下面地址,下载mitmproxy的证书
http://mitm.it

插件

插件(addon)是mitmproxy最强大、最核心的部分,编写回调函数,用来处理不同类型的hook事件(用得较多的一般是http类型中的request事件和response事件)。官方推荐按照下面的方式开发插件,即一个插件就是一个类,在类里面编写回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
"""
Basic skeleton of a mitmproxy addon.

Run as follows: mitmproxy -s anatomy.py
"""

import logging


class Counter:
def __init__(self):
self.num = 0

def request(self, flow):
self.num = self.num + 1
logging.info("We've seen %d flows" % self.num)


addons = [Counter()]

hook事件

官方解释

hook事件 回调函数
Lifecycle Events 生命周期事件(mitmproxy本身的运行相关) def load(loader: mitmproxy.addonmanager.Loader):
def running():
def configure(updated: set[str]):
def done():
Connection Events 网络连接相关事件 def client_connected(client: mitmproxy.connection.Client):
def client_disconnected(client: mitmproxy.connection.Client):
def server_connect(data: mitmproxy.proxy.server_hooks.ServerConnectionHookData):
def server_connected(data: mitmproxy.proxy.server_hooks.ServerConnectionHookData):
def server_disconnected(data: mitmproxy.proxy.server_hooks.ServerConnectionHookData):
def server_connect_error(data: mitmproxy.proxy.server_hooks.ServerConnectionHookData):
HTTP Events HTTP相关事件 def requestheaders(flow: mitmproxy.http.HTTPFlow):
def request(flow: mitmproxy.http.HTTPFlow):
def responseheaders(flow: mitmproxy.http.HTTPFlow):
def response(flow: mitmproxy.http.HTTPFlow):
def error(flow: mitmproxy.http.HTTPFlow):
def http_connect(flow: mitmproxy.http.HTTPFlow):
def http_connect_upstream(flow: mitmproxy.http.HTTPFlow):
def http_connected(flow: mitmproxy.http.HTTPFlow):
def http_connect_error(flow: mitmproxy.http.HTTPFlow):
DNS Events DNS相关事件 def dns_request(flow: mitmproxy.dns.DNSFlow):
def dns_response(flow: mitmproxy.dns.DNSFlow):
def dns_error(flow: mitmproxy.dns.DNSFlow):
TCP Events TCP相关事件 def tcp_start(flow: mitmproxy.tcp.TCPFlow):
def tcp_message(flow: mitmproxy.tcp.TCPFlow):
def tcp_end(flow: mitmproxy.tcp.TCPFlow):
def tcp_error(flow: mitmproxy.tcp.TCPFlow):
UDP Events UDP相关事件 def udp_start(flow: mitmproxy.udp.UDPFlow):
def udp_message(flow: mitmproxy.udp.UDPFlow):
def udp_end(flow: mitmproxy.udp.UDPFlow):
def udp_error(flow: mitmproxy.udp.UDPFlow):
QUIC Events QUIC相关事件 def quic_start_client(data: mitmproxy.proxy.layers.quic._hooks.QuicTlsData):
def quic_start_server(data: mitmproxy.proxy.layers.quic._hooks.QuicTlsData):
TLS Events TLS相关事件 def tls_clienthello(data: mitmproxy.tls.ClientHelloData):
def tls_start_client(data: mitmproxy.tls.TlsData):
def tls_start_server(data: mitmproxy.tls.TlsData):
def tls_established_client(data: mitmproxy.tls.TlsData):
def tls_established_server(data: mitmproxy.tls.TlsData):
def tls_failed_client(data: mitmproxy.tls.TlsData):
def tls_failed_server(data: mitmproxy.tls.TlsData):
WebSocket Events WebSocket事件 def websocket_start(flow: mitmproxy.http.HTTPFlow):
def websocket_message(flow: mitmproxy.http.HTTPFlow):
def websocket_end(flow: mitmproxy.http.HTTPFlow):
SOCKS5 Events SOCKS5事件 def socks5_auth(data: mitmproxy.proxy.layers.modes.Socks5AuthData):
Advanced Lifecycle Events 高级生命周期事件 def next_layer(data: mitmproxy.proxy.layer.NextLayer):
def update(flows: Sequence[mitmproxy.flow.Flow]):
def add_log(entry: mitmproxy.log.LogEntry):

API

  1. mitmproxy.addonmanager
  2. mitmproxy.certs
  3. mitmproxy.connection
  4. mitmproxy.contentviews
  5. mitmproxy.coretypes.multidict
  6. mitmproxy.dns
  7. mitmproxy.flow
  8. mitmproxy.http
  9. mitmproxy.net.server_spec
  10. mitmproxy.proxy.context
  11. mitmproxy.proxy.mode_specs
  12. mitmproxy.proxy.server_hooks
  13. mitmproxy.tcp
  14. mitmproxy.tls
  15. mitmproxy.udp
  16. mitmproxy.websocket

常用插件

官方社区中的一些插件

url_extractor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable

from mitmproxy import http


# URL 匹配表达式集中定义,便于后续按协议范围扩展。
URL_REGEX = r"(https?|ftp|file|ssh|mysql|ldap)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"


class UrlFinder:
def __init__(self, pattern: str) -> None:
self._compiled_pattern = re.compile(pattern, re.IGNORECASE)

def find_urls(self, content: bytes | None) -> set[str]:
if not content:
return set()
# 只做宽松解码,避免因编码异常中断抓包流程。
text = content.decode("utf-8", errors="ignore")
return {match.group(0) for match in self._compiled_pattern.finditer(text)}


@dataclass(frozen=True)
class OutputConfig:
directory: Path
extension: str = ".txt"


class TimestampFileWriter:
def __init__(self, config: OutputConfig) -> None:
self._config = config
self._file_path = self._build_file_path()
self._prepare_directory()

@property
def file_path(self) -> Path:
return self._file_path

def _build_file_path(self) -> Path:
# 以实例创建时间作为结果文件名,满足单次运行隔离。
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return self._config.directory / f"{timestamp}{self._config.extension}"

def _prepare_directory(self) -> None:
self._file_path.parent.mkdir(parents=True, exist_ok=True)

def append_many(self, urls: Iterable[str]) -> None:
sorted_urls = sorted(set(urls))
if not sorted_urls:
return
with self._file_path.open("a", encoding="utf-8") as output_file:
for url in sorted_urls:
output_file.write(url + "\n")


class UrlHyperlinkExtractorAddon:
def __init__(self) -> None:
self._finder = UrlFinder(URL_REGEX)
output_directory = Path(__file__).resolve().parent / "output"
self._writer = TimestampFileWriter(OutputConfig(directory=output_directory))
self._seen_urls: set[str] = set()

def _record_urls(self, payload: bytes | None) -> None:
try:
discovered_urls = self._finder.find_urls(payload)
# 仅记录首次出现的 URL,减少重复输出和文件噪音。
fresh_urls = discovered_urls - self._seen_urls
if not fresh_urls:
return
self._writer.append_many(fresh_urls)
self._seen_urls.update(fresh_urls)
for url in sorted(fresh_urls):
print(url, flush=True)
except Exception as error: # pragma: no cover
print(f"ERROR: {error}", file=sys.stderr, flush=True)

def request(self, flow: http.HTTPFlow) -> None:
self._record_urls(flow.request.raw_content)

def response(self, flow: http.HTTPFlow) -> None:
response = flow.response
self._record_urls(response.raw_content if response else None)


addons = [UrlHyperlinkExtractorAddon()]

request_replayer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import hashlib
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict
from uuid import uuid4

from mitmproxy import ctx, http

REPLAY_MARK_HEADER = "X-Replay-Check"
REPLAY_ID_HEADER = "X-Replay-Check-Id"
# 优先覆盖有状态写操作场景,减少无意义重放。
CANDIDATE_METHODS = {"POST", "PUT", "PATCH", "DELETE"}


@dataclass(frozen=True)
class ResponseSnapshot:
status_code: int
body_sha256: str
body_size: int

@classmethod
def from_response(cls, response: http.Response) -> "ResponseSnapshot":
# 用哈希与长度构建轻量指纹,便于快速比较响应是否等价。
body = response.raw_content or b""
body_digest = hashlib.sha256(body).hexdigest()
return cls(
status_code=response.status_code,
body_sha256=body_digest,
body_size=len(body),
)


@dataclass(frozen=True)
class ProbeBaseline:
method: str
url: str
response: ResponseSnapshot


class ProbeStore:
def __init__(self) -> None:
self._baseline_by_probe_id: Dict[str, ProbeBaseline] = {}

def set(self, probe_id: str, baseline: ProbeBaseline) -> None:
self._baseline_by_probe_id[probe_id] = baseline

def pop(self, probe_id: str) -> ProbeBaseline | None:
return self._baseline_by_probe_id.pop(probe_id, None)


class ReplayRiskAnalyzer:
@staticmethod
def is_replay_risky(baseline: ResponseSnapshot, replayed: ResponseSnapshot) -> bool:
# 仅在两次都成功时判定,降低失败路径带来的误报。
if not ReplayRiskAnalyzer._is_success(baseline.status_code):
return False
if not ReplayRiskAnalyzer._is_success(replayed.status_code):
return False
# 绝大多数接口若无防重放,重复请求会返回高度一致的结果。
if baseline.body_sha256 == replayed.body_sha256:
return True
return (
baseline.status_code == replayed.status_code
and baseline.body_size == replayed.body_size
)

@staticmethod
def _is_success(status_code: int) -> bool:
return 200 <= status_code < 300


class TimestampResultWriter:
def __init__(self, output_directory: Path) -> None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self._output_path = output_directory / f"{timestamp}.txt"
self._output_path.parent.mkdir(parents=True, exist_ok=True)

def write_line(self, line: str) -> None:
with self._output_path.open("a", encoding="utf-8") as file:
file.write(line + "\n")


class ReplayRiskDetectorAddon:
def __init__(self) -> None:
output_dir = Path(__file__).resolve().parent / "output" / "replay_risk"
self._store = ProbeStore()
self._analyzer = ReplayRiskAnalyzer()
self._writer = TimestampResultWriter(output_dir)

def response(self, flow: http.HTTPFlow) -> None:
try:
response = flow.response
if response is None:
return

# 带标记的请求视为回放响应,走回放判定分支。
if flow.request.headers.get(REPLAY_MARK_HEADER) == "1":
self._handle_replayed_response(flow)
return

if not self._is_probe_candidate(flow):
return

# 保存基线后立即派发一次客户端重放,使用 probe_id 关联两次响应。
probe_id = str(uuid4())
baseline = ProbeBaseline(
method=flow.request.method.upper(),
url=flow.request.pretty_url,
response=ResponseSnapshot.from_response(response),
)
self._store.set(probe_id, baseline)
self._dispatch_replay(flow, probe_id)
except Exception as error: # pragma: no cover
print(f"ERROR: {error}", file=sys.stderr, flush=True)

def _is_probe_candidate(self, flow: http.HTTPFlow) -> bool:
method = flow.request.method.upper()
if method not in CANDIDATE_METHODS:
return False
if flow.request.headers.get(REPLAY_MARK_HEADER) == "1":
return False
return True

def _dispatch_replay(self, flow: http.HTTPFlow, probe_id: str) -> None:
replay_flow = flow.copy()
replay_flow.request.headers[REPLAY_MARK_HEADER] = "1"
replay_flow.request.headers[REPLAY_ID_HEADER] = probe_id
ctx.master.commands.call("replay.client", [replay_flow])

def _handle_replayed_response(self, flow: http.HTTPFlow) -> None:
probe_id = flow.request.headers.get(REPLAY_ID_HEADER, "")
if not probe_id:
return

# 取出后即移除,避免同一探针被重复消费。
baseline = self._store.pop(probe_id)
if baseline is None or flow.response is None:
return

replay_snapshot = ResponseSnapshot.from_response(flow.response)
if not self._analyzer.is_replay_risky(baseline.response, replay_snapshot):
return

result = (
f"[REPLAY_RISK] {baseline.method} {baseline.url} "
f"original={baseline.response.status_code} "
f"replayed={replay_snapshot.status_code}"
)
print(result, flush=True)
self._writer.write_line(result)


addons = [ReplayRiskDetectorAddon()]

参考文章

  1. Mitmproxy教程 - zha0gongz1 - 博客园
  2. mitmproxy官方文献