实战:基于 MCP 的完整工具集成
本节展示如何将自定义工具封装为 MCP 服务器,并在 LangChain Agent 中使用。
MCP 的核心价值在于标准化和解耦:工具的实现(MCP Server)和工具的使用(MCP Client / Agent)完全分离。这意味着你可以用任何语言编写工具服务器,任何支持 MCP 协议的 Agent 框架都能直接使用这些工具,不需要为每个框架单独适配。
下面的代码实现了三个关键组件:
- MCPTool 适配器:将 MCP 工具包装为 LangChain 的
BaseTool,这是一个"桥梁"模式——让 LangChain Agent 能够无缝使用 MCP 工具,就像使用原生工具一样 - 动态工具加载:从 MCP Server 自动发现并加载所有可用的工具,无需手动注册
- Agent 集成:将加载到的 MCP 工具注入到 LangChain Agent 中
关于 Session 生命周期
代码中有一个重要的设计考量:MCP 工具持有对 ClientSession 的引用,session 关闭后工具就无法使用了。在实际项目中,你需要确保 session 的生命周期覆盖工具的使用期——通常的做法是将 session 管理和 Agent 执行放在同一个 async with 上下文中。
# mcp_langchain_integration.py
"""
将 MCP 工具集成到 LangChain Agent 的完整示例
"""
import asyncio
import json
from typing import Any
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent # legacy,新项目推荐 LangGraph
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# ============================
# 1. MCP Tool 适配器
# ============================
class MCPTool(BaseTool):
"""将 MCP 工具包装为 LangChain Tool"""
name: str
description: str
session: Any # MCP ClientSession
def _run(self, **kwargs) -> str:
"""同步执行(通过 asyncio 桥接)"""
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(
self.session.call_tool(self.name, kwargs)
)
if result.content:
return result.content[0].text
return "工具无返回内容"
finally:
loop.close()
async def _arun(self, **kwargs) -> str:
"""异步执行"""
result = await self.session.call_tool(self.name, kwargs)
if result.content:
return result.content[0].text
return "工具无返回内容"
# ============================
# 2. 从 MCP Server 动态加载工具
# ============================
async def load_mcp_tools(server_command: str, server_args: list) -> list[MCPTool]:
"""从 MCP Server 加载所有工具
注意:此函数返回的 tools 需要在 MCP session 存活期间使用。
在实际应用中,应该保持 session 的生命周期覆盖 tools 的使用期。
下面的 build_mcp_agent() 函数展示了正确的使用方式。
"""
server_params = StdioServerParameters(
command=server_command,
args=server_args
)
tools = []
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools_result = await session.list_tools()
for mcp_tool in tools_result.tools:
lc_tool = MCPTool(
name=mcp_tool.name,
description=mcp_tool.description or f"使用 {mcp_tool.name} 工具",
session=session
)
tools.append(lc_tool)
# ⚠️ 重要:tools 中持有 session 引用,
# 离开此上下文后 session 会关闭,tools 将无法使用。
# 生产环境中应该将 session 管理和 agent 执行
# 放在同一个 async with 上下文中。
return tools
# ============================
# 3. 构建使用 MCP 工具的 Agent
# ============================
async def build_mcp_agent():
"""构建集成 MCP 工具的 Agent"""
# 加载 MCP 工具
tools = await load_mcp_tools(
server_command="python",
server_args=["production_mcp_server.py"]
)
print(f"已加载 {len(tools)} 个 MCP 工具:{[t.name for t in tools]}")
# 构建 Agent
llm = ChatOpenAI(model="gpt-4o", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个功能强大的 Agent,通过 MCP 协议使用标准化工具。
你有以下工具可用,请合理选择和使用:
- 文件读写操作
- 数据库查询
- HTTP 请求
遇到需要这些能力的任务时,主动使用相应工具。"""),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
agent = create_openai_tools_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
return executor
# ============================
# 4. 使用示例
# ============================
async def main():
agent = await build_mcp_agent()
# 测试文件操作
result = agent.invoke({
"input": "读取 README.md 文件,总结其主要内容",
"chat_history": []
})
print(result["output"])
# 测试复合操作
result = agent.invoke({
"input": "查询数据库 ./data/sales.db 中的最新10条销售记录",
"chat_history": []
})
print(result["output"])
if __name__ == "__main__":
asyncio.run(main())
MCP 最佳实践总结
在将 MCP 工具部署到生产环境之前,安全性、错误处理和性能优化是三个必须认真考虑的维度。以下是一些关键的最佳实践:
安全性是首要考虑的问题,因为 MCP 工具通常涉及文件系统、数据库等敏感操作,而这些操作的参数是由 LLM 生成的——LLM 可能被 Prompt 注入攻击利用。
错误处理要确保任何工具调用失败都不会导致整个 Agent 崩溃,而是返回清晰的错误信息,让 LLM 能够理解并做出应对。
性能优化方面,对于只读的数据库查询等操作,使用缓存可以显著减少重复调用的开销。
# 1. 工具安全性
security_checklist = [
"✅ 文件操作:限制在工作目录内,防止路径遍历",
"✅ 数据库:只允许 SELECT,不允许 DDL/DML",
"✅ HTTP:白名单域名,设置超时",
"✅ 代码执行:使用沙箱(Docker/subprocess)",
"✅ 敏感数据:不返回完整的 API Key 等信息",
]
# 2. 错误处理
def safe_tool_call(func):
"""工具调用安全装饰器"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except PermissionError as e:
return {"error": "权限拒绝", "message": str(e)}
except FileNotFoundError as e:
return {"error": "文件不存在", "message": str(e)}
except Exception as e:
return {"error": "工具执行失败", "message": str(e)[:200]}
return wrapper
# 3. 性能优化
# 使用 @lru_cache 缓存不变的查询结果
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_database_query(db_path: str, sql: str) -> str:
"""带缓存的数据库查询(仅缓存只读查询)"""
import sqlite3
# 安全检查
sql_upper = sql.strip().upper()
if not sql_upper.startswith("SELECT"):
raise PermissionError("只允许 SELECT 查询")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
conn.close()
return json.dumps(result)
本章小结
本章构建了一个完整的 MCP 工具集成体系:
- ✅ MCP Server:标准化的工具服务器
- ✅ MCP Client:连接和调用工具
- ✅ LangChain 集成:MCPTool 适配器
- ✅ 安全最佳实践:权限控制和错误处理