(9-2-01)智能编程助手(IDA Pro+VS Code+MCP):MCP服务器

9.3 MCP服务器
本项目的MCP服务器通过MCP协议实现IDA Pro功能的外部暴露,支持加载二进制文件、执行分析等核心操作,提供标准化接口供外部工具调用。其能动态解析插件代码生成工具函数,区分安全与不安全函数并通过命令行参数管控,同时支持多种传输协议和跨平台配置安装。作为交互桥梁,它可与MCP客户端协同,实现逆向工程自动化与大语言模型集成,还能生成配置文档辅助部署。
9.3.1 IDA Pro内部服务器
文件idalib_server.py是一个通过MCP (Model Context Protocol) 协议提供 IDA Pro功能的服务器程序。它使用idalib(IDA Pro的Python 接口库)实现对二进制文件的分析和操作功能,并通过HTTP服务将这些功能暴露给外部工具。文件idalib_server.py的功能有加载二进制文件、执行自动分析、初始化反编译器,并提供了一系列工具方法用于获取函数信息、交叉引用、设置注释等。并且可以通过命令行参数配置服务器的监听地址、端口,以及设置是否启用不安全功能。
logger = logging.getLogger(__name__)
mcp = FastMCP("github.com/mrexodia/ida-pro-mcp#idalib")
#FastMCP.list_tools()是异步的,所以我们违反最佳实践直接访问`._tool_manager`
# 而不是为了获取(非异步)工具列表而启动一个asyncio运行时。
for tool in mcp._tool_manager.list_tools():
sig = inspect.signature(tool.fn)
for name, parameter in sig.parameters.items():
# 这个实例是一个原始的`typing._AnnotatedAlias`,我们无法直接对其进行操作。
# 它的表现类似于:
#
# typing.Annotated[str, 'Name of the function to get']
if not parameter.annotation:
continue
# 这个实例看起来像这样:
#
# InspectedAnnotation(type=, qualifiers=set(), metadata=['Name of the function to get'])
#
annotation = intro.inspect_annotation(
parameter.annotation,
annotation_source=intro.AnnotationSource.ANY
)
# 对于我们的用例,我们附加一个字符串注释作为文档说明,
# 我们提取该字符串并将其分配给工具元数据中的"description"。
if annotation.type is not str:
continue
if len(annotation.metadata) != 1:
continue
description = annotation.metadata[0]
if not isinstance(description, str):
continue
logger.debug("添加参数文档 %s(%s='%s')", tool.name, name, description)
tool.parameters["properties"][name]["description"] = description
def main():
parser = argparse.ArgumentParser(description="通过idalib为IDA Pro提供的MCP服务器")
parser.add_argument("--verbose", "-v", action="store_true", help="显示调试信息")
parser.add_argument("--host", type=str, default="127.0.0.1", help="监听的主机地址,默认:127.0.0.1")
parser.add_argument("--port", type=int, default=8745, help="监听的端口,默认:8745")
parser.add_argument("--unsafe", action="store_true", help="启用不安全功能(危险)")
parser.add_argument("input_path", type=Path, help="要分析的输入文件路径。")
args = parser.parse_args()
if args.verbose:
log_level = logging.DEBUG
idapro.enable_console_messages(True)
else:
log_level = logging.INFO
idapro.enable_console_messages(False)
mcp.settings.log_level = logging.getLevelName(log_level)
mcp.settings.host = args.host
mcp.settings.port = args.port
logging.basicConfig(level=log_level)
# 重置可能在idapythonrc.py中初始化的日志级别
# 这在导入idalib期间会被执行。
logging.getLogger().setLevel(log_level)
if not args.input_path.exists():
raise FileNotFoundError(f"输入文件不存在: {args.input_path}")
# TODO: 添加一个用于指定idb/输入文件的工具(沙箱化)
logger.info("打开数据库: %s", args.input_path)
if idapro.open_database(str(args.input_path), run_auto_analysis=True):
raise RuntimeError("分析输入文件失败")
logger.debug("idalib: 等待分析完成...")
ida_auto.auto_wait()
if not ida_hexrays.init_hexrays_plugin():
raise RuntimeError("初始化Hex-Rays反编译器失败")
plugin = importlib.import_module("ida_pro_mcp.mcp-plugin")
logger.debug("添加工具...")
for name, callable in plugin.rpc_registry.methods.items():
if args.unsafe or name not in plugin.rpc_registry.unsafe:
logger.debug("添加工具: %s: %s", name, callable)
mcp.add_tool(callable, name)
# 注意: https://github.com/modelcontextprotocol/python-sdk/issues/466
fixup_tool_argument_descriptions(mcp)
# 注意: 使用npx @modelcontextprotocol/inspector进行调试
logger.info("MCP服务器可在以下地址访问: http://%s:%d/sse", mcp.settings.host, mcp.settings.port)
try:
mcp.run(transport="sse")
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()
9.3.2 主服务器
文件server.py是本项目的核心服务器程序,主要功能包括创建MCP服务器实例以支持与IDA Pro插件的交互;通过JSON-RPC协议向IDA插件发送请求,实现函数信息获取、反编译等逆向工程功能;动态解析 mcp-plugin.py 生成工具函数并注册到MCP服务器;提供命令行参数支持(如安装/卸载IDA插件和MCP客户端配置、生成文档、指定传输协议等);管理安全与不安全函数的启用,确保调试等敏感操作需显式授权。
(1)下面代码的功能是设置日志级别,创建一个名为"github.com/mrexodia/ida-pro-mcp"的FastMCP实例,并定义全局变量 jsonrpc_request_id、ida_host和ida_port用于后续的JSON-RPC请求。
# 日志级别对Cline的正常工作至关重要
mcp = FastMCP("github.com/mrexodia/ida-pro-mcp", log_level="ERROR")
jsonrpc_request_id = 1
ida_host = "127.0.0.1"
ida_port = 13337
(2)下面这段代码定义了函数make_jsonrpc_request,用于构建并发送一个JSON-RPC请求到IDA Pro插件。该函数接受一个方法名和可变数量的参数,构建请求体,并通过HTTP POST请求发送到IDA Pro插件。函数make_jsonrpc_request能够处理响应,如果响应中包含错误则抛出异常,如果结果为空则返回字符串“success”。
def make_jsonrpc_request(method: str, *params):
"""向IDA插件发送JSON-RPC请求"""
global jsonrpc_request_id, ida_host, ida_port
conn = http.client.HTTPConnection(ida_host, ida_port)
request = {
"jsonrpc": "2.0",
"method": method,
"params": list(params),
"id": jsonrpc_request_id,
}
jsonrpc_request_id += 1
try:
conn.request("POST", "/mcp", json.dumps(request), {
"Content-Type": "application/json"
})
response = conn.getresponse()
data = json.loads(response.read().decode())
if "error" in data:
error = data["error"]
code = error["code"]
message = error["message"]
pretty = f"JSON-RPC错误 {code}: {message}"
if "data" in error:
pretty += "
" + error["data"]
raise Exception(pretty)
result = data["result"]
# 注意:大语言模型对空响应处理不佳
if result is None:
result = "success"
return result
except Exception:
raise
finally:
conn.close()
(3)下面这段代码定义了函数check_connection,它是一个MCP工具,用于检查IDA Pro插件是否正在运行。函数check_connection调用make_jsonrpc_request函数发送get_metadata请求,如果成功则返回连接成功的信息,如果失败则根据操作系统提供启动IDA Pro插件的快捷键。
@mcp.tool()
def check_connection() -> str:
"""检查IDA插件是否正在运行"""
try:
metadata = make_jsonrpc_request("get_metadata")
return f"已成功连接到IDA Pro(打开的文件:{metadata['module']})"
except Exception as e:
if sys.platform == "darwin": # macOS系统
shortcut = "Ctrl+Option+M"
else:
shortcut = "Ctrl+Alt+M"
return f"无法连接到IDA Pro!请确认已通过“编辑 -> 插件 -> MCP”(快捷键:{shortcut})启动服务器"
(4)下面代码定义了类MCPVisitor,它继承自ast.NodeVisitor,用于遍历抽象语法树(AST)。类MCPVisitor用于解析IDA Pro插件的Python代码,提取函数定义、类型定义和函数描述,并标记不安全函数。
# 代码来自https://github.com/mrexodia/ida-pro-mcp(MIT许可证)
class MCPVisitor(ast.NodeVisitor):
def __init__(self):
self.types: dict[str, ast.ClassDef] = {} # 存储类型定义
self.functions: dict[str, ast.FunctionDef] = {} # 存储函数定义
self.descriptions: dict[str, str] = {} # 存储函数描述
self.unsafe: list[str] = [] # 存储不安全函数名
def visit_FunctionDef(self, node):
"""处理函数定义,解析注释并生成MCP工具函数"""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Name):
if decorator.id == "jsonrpc":
# 处理函数参数的类型注释(Annotated类型)
for i, arg in enumerate(node.args.args):
arg_name = arg.arg
arg_type = arg.annotation
if arg_type is None:
raise Exception(f"函数{node.name}的参数{arg_name}缺少类型注释")
if isinstance(arg_type, ast.Subscript):
# 解析Annotated类型(如Annotated[str, "描述"])
assert isinstance(arg_type.value, ast.Name)
assert arg_type.value.id == "Annotated"
assert isinstance(arg_type.slice, ast.Tuple)
assert len(arg_type.slice.elts) == 2
annot_type = arg_type.slice.elts[0]
annot_description = arg_type.slice.elts[1]
assert isinstance(annot_description, ast.Constant)
# 转换为带Field描述的Annotated类型(适配MCP)
node.args.args[i].annotation = ast.Subscript(
value=ast.Name(id="Annotated", ctx=ast.Load()),
slice=ast.Tuple(
elts=[
annot_type,
ast.Call(
func=ast.Name(id="Field", ctx=ast.Load()),
args=[],
keywords=[
ast.keyword(
arg="description",
value=annot_description)])],
ctx=ast.Load()),
ctx=ast.Load())
elif isinstance(arg_type, ast.Name):
pass # 普通类型注释,无需处理
else:
raise Exception(f"函数{node.name}的参数{arg_name}存在意外的类型注释:{type(arg_type)}")
# 提取函数文档注释
if node.body and isinstance(node.body[0], ast.Expr) and isinstance(node.body[0].value, ast.Constant):
new_body = [node.body[0]] # 保留原注释
self.descriptions[node.name] = node.body[0].value.value # 存储描述
else:
new_body = [] # 无注释则为空
# 生成调用make_jsonrpc_request的代码
call_args = [ast.Constant(value=node.name)] # 第一个参数是函数名
for arg in node.args.args:
call_args.append(ast.Name(id=arg.arg, ctx=ast.Load())) # 后续参数是函数参数
new_body.append(ast.Return(
value=ast.Call(
func=ast.Name(id="make_jsonrpc_request", ctx=ast.Load()),
args=call_args,
keywords=[])))
# 为函数添加@mcp.tool()装饰器
decorator_list = [
ast.Call(
func=ast.Attribute(
value=ast.Name(id="mcp", ctx=ast.Load()),
attr="tool",
ctx=ast.Load()),
args=[],
keywords=[]
)
]
# 创建新的函数节点(用于生成MCP工具)
node_nobody = ast.FunctionDef(node.name, node.args, new_body, decorator_list, node.returns, node.type_comment, lineno=node.lineno, col_offset=node.col_offset)
assert node.name not in self.functions, f"函数重复定义:{node.name}"
self.functions[node.name] = node_nobody
elif decorator.id == "unsafe":
# 标记不安全函数
self.unsafe.append(node.name)
def visit_ClassDef(self, node):
"""处理类定义(仅保留TypedDict类型)"""
for base in node.bases:
if isinstance(base, ast.Name) and base.id == "TypedDict":
self.types[node.name] = node # 存储TypedDict类型定义
(5)下面代码用于读取IDA Pro插件的Python代码文件,使用ast.parse解析为AST,并使用MCPVisitor类遍历AST,提取函数和类型定义。
# 动态生成MCP工具函数(从mcp-plugin.py解析)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) # 当前脚本目录
IDA_PLUGIN_PY = os.path.join(SCRIPT_DIR, "mcp-plugin.py") # IDA插件路径
GENERATED_PY = os.path.join(SCRIPT_DIR, "server_generated.py") # 生成的工具函数文件
# 检查插件文件是否存在
if not os.path.exists(IDA_PLUGIN_PY):
raise RuntimeError(f"未找到IDA插件:{IDA_PLUGIN_PY}(是否被移动?)")
with open(IDA_PLUGIN_PY, "r") as f:
code = f.read() # 读取插件代码
module = ast.parse(code, IDA_PLUGIN_PY) # 解析为AST
visitor = MCPVisitor()
visitor.visit(module) # 遍历AST提取函数和类型
(6)下面这段代码用于初始化一个字符串code,用于存储生成的Python代码。它包含了导入语句和类型变量定义,为后续添加类型定义和函数定义做准备。
# 生成工具函数代码
code = """# 注意:此文件为自动生成,请勿修改!
# 架构基于https://github.com/mrexodia/ida-pro-mcp(MIT许可证)
import sys
if sys.version_info >= (3, 12):
from typing import Annotated, Optional, TypedDict, Generic, TypeVar, NotRequired
else:
from typing_extensions import Annotated, Optional, TypedDict, Generic, TypeVar, NotRequired
from pydantic import Field
T = TypeVar("T")
"""
(7)下面代码的功能是遍历类MCPVisitor中提取的类型定义和函数定义,使用ast.unparse将它们转换为字符串,并添加到code字符串中。然后,将生成的代码写入server_generated.py文件,并执行该文件以动态生成MCP工具函数。
# 添加类型定义
for type in visitor.types.values():
code += ast.unparse(type)
code += "
"
# 添加函数定义
for function in visitor.functions.values():
code += ast.unparse(function)
code += "
"
# 写入生成的文件并执行
with open(GENERATED_PY, "w") as f:
f.write(code)
exec(compile(code, GENERATED_PY, "exec"))
(8)下面代码定义了两个列表MCP_FUNCTIONS和UNSAFE_FUNCTIONS,分别包含所有MCP函数和不安全函数的名称。SAFE_FUNCTIONS列表包含除了不安全函数之外的所有函数名称。
# 分类函数(安全/不安全)
MCP_FUNCTIONS = ["check_connection"] + list(visitor.functions.keys()) # 所有MCP函数
UNSAFE_FUNCTIONS = visitor.unsafe # 不安全函数
SAFE_FUNCTIONS = [f for f in visitor.functions.keys() if f not in UNSAFE_FUNCTIONS] # 安全函数








