如何通过 MCP(Model Context Protocol) 创建一个结合网络搜索、AI 代理和图像生成的现代研究助手

🌟 引言:AI 驱动的研究未来已来
想象一下,您拥有一个个人研究助手,可以即时搜索网络、分析信息、生成全面的总结,甚至创建相关图像——所有这些都通过一个美观的 Web 界面完成。如果这个助手还能由尖端的 AI 代理驱动,与外部工具和 API 无缝协作,会怎样?
欢迎体验 MCP-Powered Study Assistant —— 一款结合现代 AI 技术的革命性应用:
• 🤖 CrewAI 代理,用于智能研究和写作
• 📡 Model Context Protocol (MCP),实现无缝工具集成
• 🌐 Streamlit,提供直观的 Web 界面
• 🔍 通过 Brave Search API 实现实时网络搜索
• 🎨 通过 Segmind API 进行 AI 图像生成
这不仅仅是另一个 ChatGPT 包装器——它是一个完整的研究生态系统,展示了 AI 驱动应用的未来。在本综合指南中,我们将详细介绍每个实现细节,从 MCP 服务器创建到美观的 UI 设计。

🛠️ 技术栈:创新的构建模块
核心框架
• 🤖 CrewAI:多代理 AI 框架,用于协调智能工作流程
• 📡 MCP (Model Context Protocol):AI 与工具集成的标准化协议
• 🌐 Streamlit:用于 AI 应用的现代 Web 框架
• 🐍 Python 3.8+:主要编程语言
外部 API 与服务
• 🦁 Brave Search API:实时网络搜索功能
• 🎭 Segmind API:最先进的 AI 图像生成
• 🧠 Groq -llama-3.3–70b-versatile:用于智能处理的大型语言模型
开发工具
• 📦 JSON:数据交换格式
• 🔧 Subprocess:模块化架构的进程管理
• 📁 File System:结果存储与检索
• 🎨 CSS:增强用户体验的自定义样式
当前系统状态:

🏗️ 架构概述:整体连接方式
我们的 Study Assistant 遵循复杂的多层架构:
🌐 Streamlit UI → 🔄 API Layer → 🤖 CrewAI Agents → 📡 MCP Protocol → 🛠️ MCP Servers → 🌍 External APIs
流程分解:
1.用户输入:通过 Streamlit 界面输入主题2.流程编排:API 层管理研究工作流程3.AI 代理激活:CrewAI 代理开始协作研究4.工具集成:MCP 协议将代理与专用服务器连接5.数据收集:搜索和图像服务器收集相关内容6.结果处理:生成并解析文件以供显示7.用户体验:美观的选项卡界面呈现所有结果
🔧 实现深入解析:构建每个组件
1. 🤖 设置 CrewAI 代理
系统的核心在于两个专用 AI 代理:
🔍 Research Agent:网络搜索专家
researcher =Agent(
role='Research Specialist',
goal='Conduct comprehensive research on {topic}',
backstory='Expert at finding and analyzing information',
tools=[search_tool],
verbose=True
)
✍️ Writer Agent:内容综合专家
writer =Agent(
role='Content Writer',
goal='Create comprehensive study materials',
backstory='Skilled at organizing complex information',
tools=[image_tool],
verbose=True
)
2. 📡 构建 MCP 服务器
Search Server (servers/search_server.py)
async def search_web(arguments: dict)-> list[TextContent]:
"""Brave Search API integration"""
query = arguments.get("query","")
headers ={"X-Subscription-Token": BRAVE_API_KEY}
params={"q": query,"count":10}
response = requests.get(BRAVE_SEARCH_URL, headers=headers,params=params)
results = response.json()
return[TextContent(type="text", text=json.dumps(results))]
Image Server (servers/image_server.py)
async def generate_image(arguments: dict)-> list[TextContent]:
"""Segmind API image generation"""
prompt = arguments.get("prompt","")
data ={
"prompt": prompt,
"style":"photographic",
"samples":1
}
response = requests.post(SEGMIND_URL, json=data, headers=headers)
# Save and return image path
3. 🌐 创建 Streamlit 界面
美观的 UI 与自定义样式
def apply_custom_css():
st.markdown("""
<style>
.main-header {
background: linear-gradient(90deg,#667eea 0%, #764ba2 100%);
padding:2rem;
border-radius:10px;
color: white;
text-align: center;
margin-bottom:2rem;
}
.result-card {
background: white;
padding:1.5rem;
border-radius:10px;
box-shadow:02px4px rgba(0,0,0,0.1);
margin:1rem0;
}
</style>
""", unsafe_allow_html=True)
多选项卡结果显示
def display_results():
tab1, tab2, tab3 = st.tabs([
"🔍 Search Results",
"📄 Summary",
"🎨 Generated Images"
])
with tab1:
display_search_results()
with tab2:
display_summary_with_download()
with tab3:
display_image_gallery()
4. 🔄 API 层实现
main_api.py 作为关键桥梁:
def run_research(topic: str)->Dict:
"""Execute research workflow"""
try:
# Run main.py as subprocess
result = subprocess.run(
[sys.executable,"main.py", topic],
capture_output=True,
text=True,
timeout=300# 5-minute timeout
)
return{
"search_results": extract_search_results(),
"summary": extract_summary_from_output(result.stdout),
"images": get_generated_images(),
"success":True
}
except subprocess.TimeoutExpired:
return{"success":False,"error":"Research timeout"}
🎯 核心功能:独特之处
•🔍 智能网络搜索•实时 Brave Search API 集成•结构化结果解析和过滤•基于相关性的内容排序•🤖 多代理协作•专用研究和写作代理•自动化工作流程编排•上下文感知的信息综合•🎨 AI 图像生成•与主题相关的视觉内容创建•多种图像风格选项•自动提示优化•🌐 美观的 Web 界面•响应式设计与自定义 CSS•选项卡式结果组织•所有内容可下载•📊 系统监控•实时 MCP 服务器状态•错误处理与恢复•性能指标跟踪
💡 高级实现技巧
1.错误处理策略
def robust_mcp_call(server_path: str, max_retries:int=3):
for attempt in range(max_retries):
try:
# MCP server communication
return call_mcp_server(server_path)
exceptExceptionas e:
if attempt == max_retries -1:
st.error(f"🚨 Server Caiunavailable: {e}")
time.sleep(2** attempt)# Exponential backoff
1.结果提取模式
def extract_summary_from_output(output: str)-> str:
patterns =[
r"FINAL RESULT:\s*(.+?)(?=\n\n|\Z)",
r"## Final Answer:\s*(.+?)(?=\n\n|\Z)",
r"Summary:\s*(.+?)(?=\n\n|\Z)"
]
for pattern in patterns:
match = re.search(pattern, output, re.DOTALL | re.IGNORECASE)
if match:
return clean_summary_text(match.group(1))
return"Summary extraction failed"
1.性能优化
•异步 MCP 服务器调用•并行文件处理•智能缓存策略•资源清理自动化
🚀 部署指南:让您的助手上线
1.环境设置
pip install -r requirements.txt
# 配置 API 密钥
export BRAVE_API_KEY="your-brave-key"
export SEGMIND_API_KEY="your-segmind-key"
export OPENAI_API_KEY="your-openai-key"
1.启动序列
# 启动 MCP 服务器
python servers/search_server.py &
python servers/image_server.py &
# 启动 Streamlit 应用
streamlit run streamlit_app.py
1.生产环境考虑
•使用 Docker 进行容器部署•高流量负载均衡•结果持久化的数据库集成•API 速率限制与监控
1.完整代码实现
📂 crewai_mcp/
├── 📄 main.py# 🤖 核心 CrewAI 应用与代理
├── 📄requirements.txt# 📦 Python 依赖
├── 📄debug_summary.py# 🔧 摘要提取调试工具
├── 📄app.py# 🌐 美观的 Web 界面
├── 📄setup_nodejs.py# ⚙️ Node.js 设置工具
├── 📄test_python_version.py# 🧪 Python 版本兼容性测试
├── 📄segmin.py# 🎨 Segmind API 工具
│ ├── 📂servers/# 📡 MCP 服务器实现
│ ├── 📄search_server.py# 🔍 Brave Search MCP 服务器 (Python)
│ ├── 📄image_server.py# 🎨 Segmind 图像 MCP 服务器 (Python)
│ │
│ ├── 📂search_results/# 📊 生成的搜索数据
│ │ └── … (研究主题)
│ │
│ └── 📂images/# 🖼️ 生成的 AI 图像
│ └── … (主题图像)
│ ├── 📂pycache/# 🐍 Python 缓存文件
└── 📂.venv/ # 🔒 虚拟环境
🏗️ 架构分解
•📱 前端层•streamlit_app.py — 具有美观 UI 的主要 Web 界面•streamlit_app_backup.py — 安全备份版本•🔄 API 与集成层•main_api.py — Streamlit 与 CrewAI 之间的桥梁•app.py — 替代界面实现•🤖 AI 核心层•main.py — CrewAI 代理 (Research + Writer)•debug_summary.py — 摘要提取工具•📡 MCP 服务器层•servers/search_server.py — 通过 Brave API 进行网络搜索•servers/image_server.py — 通过 Segmind API 进行图像生成•📊 数据存储层•servers/search_results/ — 包含搜索数据的 JSON 文件(40+ 主题)•servers/images/ — 生成的 AI 图像(30+ 视觉效果)•⚙️ 配置与工具•requirements.txt — 依赖管理•setup_nodejs.py — 环境设置•test_python_version.py — 兼容性测试
MCP 服务器 – Image_Server
from typing importAny
import httpx
from mcp.server.fastmcp importFastMCP
import os
import requests
import base64
import logging
from pathlib importPath
from dotenv import load_dotenv
load_dotenv()
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("image_server")
# 初始化 FastMCP 服务器
mcp =FastMCP("image_server")
# 获取当前目录
current_dir =Path(__file__).parent
output_dir = current_dir /"images"
os.makedirs(output_dir, exist_ok=True)
# 验证 API 密钥
api_key = os.getenv("SEGMIND_API_KEY")
ifnot api_key:
logger.error("SEGMIND_API_KEY environment variable is not set!")
raiseRuntimeError("Missing Segmind API key")
url ="https://api.segmind.com/v1/imagen-4"
def image_creation_openai(query: str Latinos, image_name: str)-> str:
try:
logger.info(f"Creating image for query: {query}")
# 请求负载
data ={
"prompt": f"Generate an image: {query}",
"negative_prompt":"blurry, pixelated",
"aspect_ratio":"4:3"
}
headers ={'x-api-key': os.getenv("SEGMIND_API_KEY")}
# 添加超时和错误处理
try:
response = requests.post(url, json=data, headers=headers, timeout=30)
response.raise_for_status()
except requests.exceptions.RequestExceptionas e:
logger.error(f"API request failed: {e}")
return{"success":False,"error": f"API request failed: {str(e)}"}
# 保存图像
image_path = output_dir / f"{image_name}.jpeg"
with open(image_path,"wb")as f:
f.write(response.content)
logger.info(f"Image saved to {image_path}")
return{"success":True,"image_path": str(image_path)}
exceptExceptionas e:
logger.exception("Image creation failed")
return{"success":False,"error": str(e)}
if __name__ =="__main__":
logger.info("Starting Image Creation MCP Server")
try:
mcp.run(transport="stdio")
exceptExceptionas e:
logger.exception("Server crashed")
# 在 Windows 中添加暂停以查看错误
input("Press Enter to exit...")
raise
MCP 服务器 – Search_Server
from typing importAny,Dict,List
import requests
from mcp.server.fastmcp importFastMCP
import os
import logging
import json
from pathlib importPath
from dotenv import load_dotenv
load_dotenv()
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("search_server")
# 初始化 FastMCP 服务器
mcp =FastMCP("search_server")
# 获取当前目录
current_dir =Path(__file__).parent
results_dir = current_dir /"search_results"
os.makedirs(results_dir, exist_ok=True)
# 验证 API 密钥
api_key = os.getenv("BRAVE_API_KEY")
ifnot api_key:
logger.warning("BRAVE_API_KEY environment variable is not set!")
logger.warning("Search functionality will be limited or unavailable")
# Brave Search API 端点
BRAVE_SEARCH_URL ="https://api.search.brave.com/res/v1/web/search"
def brave_search(query: str, count:int=10)->Dict[str,Any]:
"""
使用BraveSearch API 搜索网络
参数:
query:搜索查询字符串
count:返回的结果数量(最大20)
返回:
包含搜索结果的字典
"""
try:
logger.info(f"Searching for: {query}")
ifnot api_key:
return{
"success":False,
"error":"BRAVE_API_KEY not configured",
"results":[]
}
# 将计数限制在合理范围内
count = max(1, min(count,20))
# 请求头
headers ={
"Accept":"application/json",
"Accept-Encoding":"gzip",
"X-Subscription-Token": api_key
}
# 请求参数
params={
"q": query,
"count": count,
"search_lang":"en",
"country":"US",
"safesearch":"moderate",
"freshness":"pw",# 过去一周以获取更新的结果
"text_decorations":False,
"spellcheck":True
}
# 发起 API 请求
try:
response = requests.get(
BRAVE_SEARCH_URL,
headers=headers,
params=params,
timeout=30
)
response.raise_for_status()
except requests.exceptions.RequestExceptionas e:
logger.error(f"Search API request failed: {e}")
return{
"success":False,
"error": f"Search API request failed: {str(e)}",
"results":[]
}
# 解析响应
try:
data = response.json()
except json.JSONDecodeErroras e:
logger.error(f"Failed to parse search response: {e}")
return{
"success":False,
"error":"Failed to parse search response",
"results":[]
}
# 提取并格式化结果
search_results =[]
web_results = data.get("web",{}).get("results",[])
for result in web_results:
search_result ={
"title": result.get("title",""),
"url": result.get("url",""),
"description": result.get("description",""),
"published": result.get("published",""),
"thumbnail": result.get("thumbnail",{}).get("src","")if result.get("thumbnail")else""
}
search_results.append(search_result)
# 将结果保存到文件以供参考
try:
results_file = results_dir / f"search_{query.replace(' ', '_')[:50]}.json"
with open(results_file,'w', encoding='utf-8')as f:
json.dump({
"query": query,
"timestamp": data.get("query",{}).get("posted_at",""),
"results": search_results
}, f, indent=2, ensure_ascii=False)
logger.info(f"Search results saved to {results_file}")
exceptExceptionas e:
logger.warning(f"Failed to save search results: {e}")
logger.info(f"Found {len(search_results)} search results")
return{
"success":True,
"query": query,
"total_results": len(search_results),
"results": search_results
}
exceptExceptionas e:
logger.exception("Search operation failed")
return{
"success":False,
"error": str(e),
"results":[]
}
def search_news(query: str, count:int=5)->Dict[str,Any]:
"""
使用BraveSearch API 搜索新闻
参数:
query:搜索查询字符串
count:返回的新闻结果数量(最大20)
返回:
包含新闻搜索结果的字典
"""
try:
logger.info(f"Searching news for: {query}")
ifnot api_key:
return{
"success":False,
"error":"BRAVE_API_KEY not configured",
"results":[]
}
# 将计数限制在合理范围内
count = max(1, min(count,20))
# 请求头
headers ={
"Accept":"application/json",
"Accept-Encoding":"gzip",
"X-Subscription-Token": api_key
}
# 新闻搜索的请求参数
params={
"q": query,
"count": count,
"search_lang":"en",
"country":"US",
"safesearch":"moderate",
"freshness":"pd",# 过去一天以获取最新新闻
"text_decorations":False,
"result_filter":"news"# 专注于新闻结果
}
# 发起 API 请求
try:
response = requests.get(
BRAVE_SEARCH_URL,
headers=headers,
params=params,
timeout=30
)
response.raise_for_status()
except requests.exceptions.RequestExceptionas e:
logger.error(f"News search API request failed: {e}")
return{
"success":False,
"error": f"News search API request failed: {str(e)}",
"results":[]
}
# 解析响应
try:
data = response.json()
except json.JSONDecodeErroras e:
logger.error(f"Failed to parse news search response: {e}")
return{
"success":False,
"error":"Failed to parse news search response",
"results":[]
}
# 提取新闻结果
news_results =[]
# 检查响应中的新闻部分
news_data = data.get("news",{}).get("results",[])
ifnot news_data:
# 如果没有专用新闻部分,则回退到网页结果
news_data = data.get("web",{}).get("results",[])
for result in news_data:
news_result ={
"title": result.get("title",""),
"url": result.get("url",""),
"description": result.get("description",""),
"published": result.get("age", result.get("published","")),
"source": result.get("profile",{}).get("name","")if result.get("profile")else"",
"thumbnail": result.get("thumbnail",{}).get("src","")if result.get("thumbnail")else""
}
news_results.append(news_result)
logger.info(f"Found {len(news_results)} news results")
return{
"success":True,
"query": query,
"total_results": len(news_results),
"results": news_results
}
exceptExceptionas e:
logger.exception("News search operation failed")
return{
"success":False,
"error": str(e),
"results":[]
}
if __name__ =="__main__":
logger.info("Starting Brave Search MCP Server")
try:
mcp.run(transport="stdio")
exceptExceptionas e:
logger.exception("Search server crashed")
# 在 Windows 中添加暂停以查看错误
input("Press Enter to exit...")
raise
代理 – main.py
from crewai importAgent,Task,Crew, LLM
from crewai_tools importMCPServerAdapter
from mcp importStdioServerParameters
import sys
import platform
from pathlib importPath
import os
import warnings
from pydantic importPydanticDeprecatedSince20
from dotenv import load_dotenv
import traceback
import subprocess
from pydantic importBaseModel,Field
classSummary(BaseModel):
summary: str =Field(description="研究成果的详细摘要")
image_path: str =Field(description="代理创建的图像文件路径")
# 加载环境变量
load_dotenv()
def get_available_llm():
"""从环境变量中获取第一个可用的 LLM"""
llm_configs =[
{
"name":"Groq Llama",
"model":"groq/llama-3.3-70b-versatile",
"api_key_env":"GROQ_API_KEY",
"temperature":0.7
},
{
"name":"OpenAI GPT-4",
"model":"gpt-4o-mini",
"api_key_env":"OPENAI_API_KEY",
"temperature":0.7
},
{
"name":"Anthropic Claude",
"model":"claude-3-haiku-20240307",
"api_key_env":"ANTHROPIC_API_KEY",
"temperature":0.7
},
{
"name":"Ollama Local",
"model":"ollama/llama3.2",
"api_key_env":None,# 本地模型无需 API 密钥
"temperature":0.7
}
]
print("🔍 检查可用的 LLM 提供者...")
for config in llm_configs:
try:
if config["api_key_env"]isNone:
# 对于本地模型如 Ollama,尝试无需 API 密钥
print(f"⚡ 尝试 {config['name']} (本地)...")
llm = LLM(
model=config["model"],
temperature=config["temperature"],
max_tokens=1000,
)
print(f"✅ 使用 {config['name']}: {config['model']}")
return llm
else:
api_key = os.getenv(config["api_key_env"])
if api_key:
print(f"⚡ 尝试 {config['name']}...")
llm = LLM(
model=config["model"],
temperature=config["temperature"],
api_key=api_key
)
print(f"✅ 使用 {config['name']}: {config['model']}")
return llm
else:
print(f"⚠️ {config['name']} API 密钥未在环境中找到")
exceptExceptionas e:
print(f"❌ {config['name']} 失败: {str(e)[:100]}...")
continue
# 如果全部失败,则回退到基本配置
print("⚠️ 使用回退 LLM 配置...")
return LLM(
model="groq/llama-3.3-70b-versatile",
temperature=0.7,
api_key=os.getenv("GROQ_API_KEY","")
)
# 配置具有回退选项的 LLM
llm = get_available_llm()
# 抑制警告
warnings.filterWarnings("ignore", category=PydanticDeprecatedSince20)
# 获取当前目录
base_dir =Path(__file__).parent.resolve()
print(f"Python 可执行文件: {sys.executable}")
print(f"当前目录: {os.getcwd()}")
print(f"基础目录: {base_dir}")
# 确定适用于 Windows 的正确 npx 命令
npx_cmd ="npx.cmd"if platform.system()=="Windows"else"npx"
def check_npx_availability():
"""检查 npx 是否可用且正常工作"""
try:
result = subprocess.run([npx_cmd,"--version"],
capture_output=True, text=True, timeout=10)
if result.returncode ==0:
print(f"✓ NPX 可用: {result.stdout.strip()}")
returnTrue
else:
print(f"✗ NPX 检查失败: {result.stderr}")
returnFalse
exceptExceptionas e:
print(f"✗ NPX 不可用: {e}")
returnFalse
def check_python_server():
"""检查 Python 图像服务器是否存在"""
server_path = base_dir /"servers"/"image_server.py"
if server_path.exists():
print(f"✓ 找到 Python 图像服务器: {server_path}")
returnTrue
else:
print(f"✗ 未找到 Python 图像服务器: {server_path}")
returnFalse
def check_search_server():
"""检查 Python 搜索服务器是否存在"""
server_path = base_dir /"servers"/"search_server.py"
if server_path.exists():
print(f"✓ 找到 Python 搜索服务器: {server_path}")
returnTrue
else:
print(f"✗ 未找到 Python 搜索服务器: {server_path}")
returnFalse
def get_working_servers():
"""获取工作服务器配置列表"""
working_servers =[]
print("\n"+"="*50)
print("诊断 MCP 服务器")
print("="*50)
# 首先检查 Python 图像服务器(最有可能工作)
python_server_available = check_python_server()
if python_server_available:
image_server_params =StdioServerParameters(
command="python",
args=[
str(base_dir /"servers"/"image_server.py"),
],
env={"UV_PYTHON":"3.12",**os.environ},
)
working_servers.append(("Image Server", image_server_params))
print("✓ 图像服务器已配置")
else:
print("✗ 跳过图像服务器(未找到服务器文件)")
# 检查 Python 搜索服务器
search_server_available = check_search_server()
if search_server_available:
search_server_params =StdioServerParameters(
command="python",
args=[
str(base_dir /"servers"/"search_server.py"),
],
env={"UV_PYTHON":"3.12",**os.environ},
)
working_servers.append(("Python Search Server", search_server_params))
print("✓ Python 搜索服务器已配置")
else:
print("✗ 跳过 Python 搜索服务器(未找到服务器文件)")
# 仅为文件系统服务器检查 NPX 可用性
npx_available = check_npx_availability()
# 仅在 Node.js 版本足够新时添加 NPX 服务器
if npx_available:
node_version_check = check_node_version()
if node_version_check:
# 文件系统服务器配置
filesystem_server_params =StdioServerParameters(
command=npx_cmd,
args=[
"-y",
"@modelcontextprotocol/server-filesystem",
os.path.join(os.path.expanduser("~"),"Downloads")
],
)
working_servers.append(("Filesystem Server", filesystem_server_params))
print("✓ 文件系统服务器已配置")
else:
print("⚠️ 由于 Node.js 版本兼容性问题,跳过 NPX 文件系统服务器")
print("💡 要启用文件系统服务器,请将 Node.js 更新到 18+ 或 20+ 版本")
print(" 访问: https://nodejs.org/en/download/")
else:
print("✗ 跳过 NPX 文件系统服务器(NPX 不可用)")
print(f"\n找到 {len(working_servers)} 个服务器配置")
return working_servers
def check_node_version():
"""检查 Node.js 版本是否兼容"""
try:
result = subprocess.run(["node","--version"],
capture_output=True, text=True, timeout=10)
if result.returncode ==0:
version = result.stdout.strip()
print(f"Node.js 版本: {version}")
# 提取主版本号
major_version =int(version.lstrip('v').split('.')[0])
if major_version >=18:
print("✓ Node.js 版本兼容")
returnTrue
else:
print(f"⚠️ Node.js 版本 {version} 可能过旧(推荐 v18+)")
returnFalse
returnFalse
exceptExceptionas e:
print(f"✗ 无法检查 Node.js 版本: {e}")
returnFalse
classCustomMCPServerAdapter(MCPServerAdapter):
"""具有增加超时的自定义 MCP 服务器适配器"""
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.timeout =90# 将超时增加到 90 秒
def test_servers_individually(server_configs):
"""单独测试每个服务器以识别问题服务器"""
working_servers =[]
print("\n"+"="*50)
print("单独测试服务器")
print("="*50)
for name, server_params in server_configs:
print(f"\n测试 {name}...")
try:
withCustomMCPServerAdapter([server_params])as tools:
print(f"✓ {name} 连接成功!")
print(f" 可用工具: {[tool.name for tool in tools]}")
working_servers.append(server_params)
exceptExceptionas e:
print(f"✗ {name} 失败: {str(e)[:100]}...")
continue
return working_servers
def create_agent_and_tasks(tools=None):
"""创建代理和任务(带或不带工具)"""
tools_list = tools or[]
# 根据可用工具调整角色和任务
if tools_list:
tool_names =[getattr(tool,'name','unknown')for tool in tools_list]
print(f"代理将有权访问: {tool_names}")
role ="AI Research Creator with Tools"
goal ="使用可用的 MCP 工具深入研究主题,创建全面的图表,并保存摘要"
backstory ="擅长使用 MCP 工具收集信息、创建视觉表示并保存研究成果的 AI 研究者和创作者。"
else:
role ="AI Research Creator"
goal ="使用内置知识深入研究和分析主题"
backstory ="擅长分析主题并使用可用知识提供详细见解的 AI 研究者。"
agent =Agent(
role=role,
goal=goal,
backstory=backstory,
tools=tools_list,
llm=llm,
verbose=True,
)
if tools_list:
research_task =Task(
description="使用可用的 MCP 工具深入研究主题 '{topic}'。如果有图像创建工具,创建一个深入的图表,展示主题的工作原理,包括关键组件、流程和关系。",
expected_output="全面的研究摘要,如果可能,包括成功创建的图表/图像,说明主题。",
agent=agent,
)
summary_task =Task(
description="创建研究成果的详细摘要。如果有文件系统工具,将其保存为 Downloads 文件夹中的文本文件。包括关键见解、重要细节和对创建的任何图表的引用。",
expected_output="研究成果的详细摘要,如果有文件系统访问权限,优选保存为文本文件。最终响应应采用 pydantic 模型 Summary 的格式",
agent=agent,
output_pydantic=Summary
)
else:
research_task =Task(
description="使用您的知识深入研究和分析主题 '{topic}'。提供有关其工作原理的详细见解,包括关键组件、流程和关系。",
expected_output="对主题的全面分析和解释,包含详细见解。",
agent=agent,
)
summary_task =Task(
description="创建分析的详细摘要,突出主题的最重要方面、关键见解和实际意义。",
expected_output="结构良好的摘要,包含主题的关键发现和见解。最终响应应采用 pydantic 模型 Summary 的格式",
agent=agent,
output_pydantic=Summary,
markdown=True,# 启用最终输出的 markdown 格式
output_file="report.md"
)
return agent,[research_task, summary_task]
def main():
"""运行 CrewAI 应用的主函数"""
# 获取可用服务器配置
server_configs = get_working_servers()
ifnot server_configs:
print("\n⚠️ 无可用 MCP 服务器。仅以回退模式运行。")
run_fallback_mode()
return
# 单独测试服务器以找到工作中的服务器
working_server_params = test_servers_individually(server_configs)
ifnot working_server_params:
print("\n⚠️ 无 MCP 服务器工作。以回退模式运行。")
run_fallback_mode()
return
try:
print(f"\n✓ 使用 {len(working_server_params)} 个工作中的 MCP 服务器")
print("初始化 MCP 服务器适配器...")
withCustomMCPServerAdapter(working_server_params)as tools:
print(f"成功连接到 MCP 服务器!")
print(f"可用工具: {[tool.name for tool in tools]}")
# 使用 MCP 工具创建代理和任务
agent, tasks = create_agent_and_tasks(tools)
# 创建具有错误处理的 crew
crew =Crew(
agents=[agent],
tasks=tasks,
verbose=True,
reasoning=True,
)
# 获取用户输入
topic = input("\n请输入要研究的主题: ").strip()
ifnot topic:
topic ="artificial intelligence"
print(f"未提供主题,使用默认值: {topic}")
# 使用重试机制执行 crew
max_retries =2
for attempt in range(max_retries +1):
try:
print(f"\n开始研究: {topic} (尝试 {attempt + 1})")
result = crew.kickoff(inputs={"topic": topic})
# print("\n" + "="*50)
# print("来自代理的最终结果")
# print("="*50)
response = result["summary"]
print(response)
print(f"摘要任务输出: {tasks[1].output}")
return response
exceptExceptionas e:
if attempt < max_retries:
print(f"⚠️ 尝试 {attempt + 1} 失败: {str(e)[:100]}...")
print(f"🔄 重试... ({attempt + 2}/{max_retries + 1})")
continue
else:
print(f"❌ 所有尝试均失败。错误: {e}")
raise e
exceptExceptionas e:
print(f"使用 MCP 工具运行时出错: {e}")
traceback.print_exc()
print("\n回退到无 MCP 工具的基本代理...")
run_fallback_mode()
def run_fallback_mode():
"""在无 MCP 工具的情况下运行应用"""
print("\n"+"="*50)
print("以回退模式运行")
print("="*50)
# 创建不带 MCP 工具但带 LLM 的回退代理
agent, tasks = create_agent_and_tasks()
crew =Crew(
agents=[agent],
tasks=tasks,
verbose=True,
reasoning=True,
)
# 获取回退模式的输入
topic = input("请输入要研究的主题(回退模式): ").strip()
ifnot topic:
topic ="artificial intelligence"
print(f"未提供主题,使用默认值: {topic}")
print(f"\n开始研究: {topic}(无 MCP 工具)")
result = crew.kickoff(inputs={"topic": topic})
print("\n"+"="*50)
print("最终结果(回退模式):")
print("="*50)
print(result["summary"])
return result["summary"]
if __name__ =="__main__":
print("🚀 启动 CrewAI MCP 演示")
print("\n📋 设置说明:")
print(" 要使用更多 MCP 服务器,请将 Node.js 更新到 v18+: https://nodejs.org")
print(" 在 .env 文件中添加 API 密钥以支持更多 LLM 提供者")
print(" 支持: GROQ_API_KEY, OPENAI_API_KEY, ANTHROPIC_API_KEY, BRAVE_API_KEY")
result = main()
#print(result)
使用 Streamlit 的用户界面 – app.py
import streamlit as st
import subprocess
import sys
import os
from pathlib importPath
import glob
from PIL importImage
import re
def find_venv_python():
"""从虚拟环境中找到正确的 Python 可执行文件"""
current_dir =Path(__file__).parent
possible_venv_paths =[
os.path.join(current_dir,".venv","Scripts","python.exe"),
os.path.join(current_dir,"venv","Scripts","python.exe"),
os.path.join(current_dir,".venv","bin","python"),
os.path.join(current_dir,"venv","bin","python"),
]
for path in possible_venv_paths:
if os.path.exists(path):
return path
return sys.executable
def run_research(topic):
"""使用给定主题运行 main.py 并返回结果"""
current_dir =Path(__file__).parent
python_executable = find_venv_python()
# 准备带 UTF-8 编码的环境
env = os.environ.copy()
env['PYTHONIOENCODING']='utf-8'
env['PYTHONLEGACYWINDOWSSTDIO']='1'
try:
# 作为子进程运行 main.py
process = subprocess.Popen(
[python_executable,"main.py"],
cwd=current_dir,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8',
errors='replace',
env=env
)
# 将主题作为输入发送
stdout, stderr = process.communicate(input=topic +"\n", timeout=300)
if process.returncode ==0:
# 从 stdout 中提取最终结果
return extract_final_result(stdout),None
else:
returnNone, f"错误(返回代码 {process.returncode}):\n{stderr}"
except subprocess.TimeoutExpired:
process.kill()
returnNone,"研究在 5 分钟后超时"
exceptExceptionas e:
returnNone, f"意外错误: {str(e)}"
def extract_final_result(output):
"""从 main.py CrewAI 输出中提取最终结果"""
lines = output.split('\n')
# 首先,尝试找到最终结果部分
final_result_start =-1
for i, line in enumerate(lines):
if"FINAL RESULT:"in line or"==================================================\nFINAL RESULT:"in output:
final_result_start = i
break
if final_result_start !=-1:
# 提取 “FINAL RESULT:” 之后直到结束的内容
result_lines =[]
for line in lines[final_result_start:]:
# 跳过 “FINAL RESULT:” 行本身
if"FINAL RESULT:"in line:
# 如果同一行存在标记后的内容,则获取
content_after = line.split("FINAL RESULT:",1)
if len(content_after)>1and content_after[1].strip():
result_lines.append(content_after[1].strip())
continue
# 跳过 CrewAI 格式化和空行
cleaned_line = re.sub(r'[╭│╰═─└├┤┬┴┼╔╗╚╝║╠╣╦╩╬▓▒░]','', line)
cleaned_line = cleaned_line.strip()
if cleaned_line:
result_lines.append(cleaned_line)
if result_lines:
return'\n'.join(result_lines).strip()
# 第二次尝试:寻找 ## Final Answer 模式
final_answer_lines =[]
capturing =False
for line in lines:
if"## Final Answer"in line or"Final Answer:"in line:
capturing =True
# 如果标记后有内容,则包含
if"Final Answer:"in line:
content = line.split("Final Answer:",1)
if len(content)>1and content[1].strip():
final_answer_lines.append(content[1].strip())
continue
if capturing:
# 跳过 CrewAI 框图字符和进度指示器
cleaned = re.sub(r'[╭│╰═─└├┤┬┴┼╔╗╚╝║╠╣╦╩╬▓▒░🚀📋🔧✅]','', line)
cleaned = cleaned.strip()
# 在某些模式下停止,表示答案结束
if any(pattern in line.lower()for pattern in[
'crew execution completed','task completion','crew completion',
'└──','assigned to:','status:','used'
]):
break
# 仅包含实质性内容
if cleaned and len(cleaned)>10:
final_answer_lines.append(cleaned)
if final_answer_lines:
return'\n'.join(final_answer_lines).strip()
# 第三次尝试:在 crew 完成消息前获取最后一段实质性内容
substantial_blocks =[]
current_block =[]
for line in lines:
# 跳过明显的 CrewAI UI 元素
if any(skip in line for skip in['╭','│','╰','🚀','📋','└──','Assigned to:','Status:']):
if current_block:
substantial_blocks.append('\n'.join(current_block))
current_block =[]
continue
cleaned = line.strip()
if cleaned and len(cleaned)>30:# 仅实质性行
current_block.append(cleaned)
elif current_block:# 空行结束一个块
substantial_blocks.append('\n'.join(current_block))
current_block =[]
# 添加最后一个块
if current_block:
substantial_blocks.append('\n'.join(current_block))
# 返回最后一个实质性块(很可能是最终答案)
if substantial_blocks:
return substantial_blocks[-1].strip()
return"研究成功完成。请检查控制台输出以获取详细结果。"
def get_latest_images():
"""从 images 文件夹获取最新图像"""
images_dir =Path("servers/images")
ifnot images_dir.exists():
return[]
# 获取所有图像文件
image_extensions =['*.jpg','*.jpeg','*.png','*.gif','*.bmp']
image_files =[]
for ext in image_extensions:
image_files.extend(glob.glob(str(images_dir / ext)))
ifnot image_files:
return[]
# 按修改时间排序(最新优先)
image_files.sort(key=os.path.getmtime, reverse=True)
# 返回前 5 个最新图像
return image_files[:1]
def main():
st.set_page_config(
page_title="CrewAI-MCP 研究助手",
page_icon="🔬",
layout="wide"
)
st.title("🔬 CrewAI-MCP 学习助手")
st.markdown("输入一个主题以进行研究并生成带视觉图表的全面见解。")
# 主题输入
topic = st.text_input(
"研究主题:",
placeholder="例如,解释光合作用过程、机器学习算法等。",
help="输入您想要详细研究的任何主题"
)
# 研究按钮
if st.button("🚀 开始研究", type="primary", disabled=not topic.strip()):
if topic.strip():
with st.spinner(f"🔍 正在研究 '{topic}'... 这可能需要几分钟。"):
result, error = run_research(topic.strip())
print(f"来自 CREWAI 的结果: {result}")
if result:
st.success("✅ 研究成功完成!")
print(f"来自 CREWAI 的结果: {result}")
# 将结果存储在会话状态中
st.session_state['research_result']= result
st.session_state['research_topic']= topic.strip()
st.session_state['latest_images']= get_latest_images()
else:
st.error(f"❌ 研究失败: {error}")
# 并排显示结果和图像
if'research_result'in st.session_state:
# 创建分隔线
st.divider()
st.subheader(f"研究结果: {st.session_state.get('research_topic', '未知主题')}")
# 创建两列以并排显示
col1, col2 = st.columns([2,1])# 结果占 2/3 宽度,图像占 1/3 宽度
# 左侧列 - 研究结果
with col1:
st.markdown("### 📋 摘要结果")
# 以 markdown 格式显示结果
result_text = st.session_state['research_result']
pattern = re.compile(r'\x1b\[[\d;]*m')
result_text = pattern.sub('', result_text)
# 为长内容创建可滚动容器
with st.container():
st.markdown(result_text)
# 为结果添加下载按钮
st.download_button(
label="📥 下载结果为文本",
data=result_text,
file_name=f"research_{st.session_state.get('research_topic', 'topic').replace(' ', '_')}.txt",
mime="text/plain"
)
# 右侧列 - 生成的图像
with col2:
st.markdown("### 🎨 生成的图像")
images = st.session_state.get('latest_images',[])
if images:
st.success(f"找到 {len(images)} 张图像")
# 垂直堆叠显示图像
for idx, image_path in enumerate(images):
try:
# 打开并显示图像
img =Image.open(image_path)
st.image(
img,
caption=f"生成: {Path(image_path).name}",
use_container_width=True
)
# 为每张图像添加下载按钮
with open(image_path,"rb")as file:
st.download_button(
label=f"⬇️ 下载",
data=file.read(),
file_name=Path(image_path).name,
mime="image/jpeg",
key=f"download_img_{idx}"
)
# 如果有多张图像,添加图像间距
if idx < len(images)-1:
st.markdown("---")
exceptExceptionas e:
st.error(f"加载图像出错: {str(e)}")
else:
st.info("🖼️ 研究完成后,图像将显示在此处。")
with st.expander("ℹ️ 关于图像"):
st.markdown("""
**工作原理:**
-研究期间自动生成图像
-保存到`servers/images/`文件夹
-按创建时间排序显示在此处
-每张图像都有下载按钮
""")
if __name__ =="__main__":
main()
🎯 结果与性能
您将获得:
• ⚡ 快速研究:30–60 秒完成全面研究
• 🎨 视觉增强:为每个主题生成 AI 图像
• 📊 结构化输出:组织良好、可下载的结果
• 🔍 深入见解:多源信息综合
• 🌐 用户友好:直观的 Web 界面
性能指标:
• 搜索速度:网络结果约 5–10 秒
• 图像生成:每张图像约 15–30 秒
• 摘要创建:全面分析约 20–40 秒
• 整体工作流程:总计约 60–120 秒
🔮 未来展望:下一阶段的演进
🚀 即时增强
• 📚 PDF 分析:上传和分析文档
• 🎥 视频内容:YouTube 视频摘要
• 🗣️ 语音界面:语音转文本研究查询
• 📱 移动应用:原生 iOS/Android 应用
🌟 高级功能
• 🧠 知识图谱:视觉关系映射
• 📊 数据可视化:交互式图表和图形
• 🔗 引用管理:学术参考生成
• 👥 协作研究:多用户工作空间
🏗️ 技术改进
• ⚡ RAG 集成:用于更好上下文的向量数据库
• 🔄 实时更新:实时研究监控
• 🎯 个性化:用户特定偏好
• 🔐 企业安全:高级认证
🌍 生态系统扩展
• 🔌 插件架构:第三方集成
• 📈 分析仪表板:使用洞察和指标
• 🎓 教育工具:闪卡、测验、思维导图
• 🏢 企业版本:团队协作功能
📚 参考与资源
🔗 核心文档
• CrewAI Framework: https://docs.crewai.com
• Model Context Protocol: https://modelcontextprotocol.io
• Streamlit Documentation: https://docs.streamlit.io
• Brave Search API: https://api.search.brave.com/app/documentation
🛠️ 技术资源
• MCP Python SDK: https://github.com/modelcontextprotocol/python-sdk
• Segmind API Docs: https://docs.segmind.com
• Groq Models Reference: https://console.groq.com/docs/models
• CrewAI MCP details: https://docs.crewai.com/mcp/overview
🎉 结论:您的 AI 研究革命现在开始
我们刚刚构建了一个非凡的成果——一个完整的 AI 驱动研究生态系统,展示了智能应用的未来。这不仅仅是代码,而是关于我们如何与信息交互的转型。
🏆 我们取得的成就:
• ✅ 无缝集成:MCP 协议将 AI 代理与现实世界工具连接
• ✅ 美观界面:现代、响应式的 Web 应用
• ✅ 实际价值:具有可下载结果的真实研究能力
• ✅ 可扩展架构:企业级应用的基础 • ✅ 面向未来:采用尖端技术构建
🚀 更广阔的图景:
该项目展示了多种 AI 技术的融合: • 多代理系统和谐工作 • 协议驱动的工具集成 • 以用户为中心的设计 • 现实世界的 API 利用
💡 您的下一步:
1.🛠️ 构建它:遵循我们的实现指南2.🎨 定制它:添加您自己的功能和样式3.📈 扩展它:为您的团队或组织部署4.🤝 分享它:为开源社区贡献
🌟 最终思考:
Model Context Protocol 代表了 AI 应用开发的范式转变。通过标准化 AI 代理与外部工具的交互方式,MCP 打开了无限可能的大门。
您的 Study Assistant 只是开始。有了这个基础,您可以构建:
• 🏢 企业研究平台
• 🎓 教育 AI 导师
• 📊 商业智能仪表板
• 🔬 科学研究工具
AI 的未来是协作的、上下文相关的、以用户为中心的——您现在已经具备构建它的能力。
准备好革命化研究了吗?立即开始构建您的 MCP 驱动的 Study Assistant,加入 AI 创新的下一波!🚀
快乐构建! 🎯✨
(文:PyTorch研习社)