概述
单个 AI Agent 在处理简单任务时表现良好,但面对复杂业务场景时,往往需要多个专业化的 Agent 协同工作。多 Agent 协作(Multi-Agent Collaboration)是指将一个复杂任务分解为多个子任务,由不同的 Agent 各自负责擅长的领域,通过消息传递、任务委派和结果聚合来完成整体目标。
本教程将以一个软件开发团队模拟为例,构建一个包含产品经理、架构师、开发者、测试工程师和 DevOps 工程师的多 Agent 协作系统。
技术栈:Python 3.11+ / LangGraph / OpenAI API / Redis
前置条件
| 项目 | 要求 |
|---|---|
| Python | 3.11 或更高版本 |
| OpenAI API | 有效的 API 密钥(GPT-4o 推荐) |
| Redis | 用于 Agent 间消息传递(可选) |
| 基础知识 | Python 异步编程、LangChain 基础 |
环境搭建
bash
mkdir -p ~/multi-agent && cd ~/multi-agent
python3 -m venv venv && source venv/bin/activate
pip install langchain langchain-openai langgraph \
redis pydantic pyyaml rich typer多Agent架构设计
2.1 协作模式
| 模式 | 描述 | 适用场景 |
|---|---|---|
| 顺序管道 | Agent 按固定顺序依次处理 | 流水线式任务 |
| 层级委派 | 主 Agent 将任务分派给子 Agent | 任务分解 |
| 环形讨论 | Agent 轮流发言,达成共识 | 决策讨论 |
| 动态路由 | 根据任务类型动态选择 Agent | 智能调度 |
本教程采用层级委派 + 动态路由的混合模式。
2.2 系统架构
┌──────────────┐
│ 用户输入 │
└──────┬───────┘
│
┌──────▼───────┐
│ Orchestrator │ (调度器/主Agent)
│ (任务分解) │
└──────┬───────┘
│
┌────────────────┼────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ PM Agent │ │Architect │ │Dev Agent │
│ (产品经理) │ │Agent(架构师)│ │(开发者) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│QA Agent │ │DevOps Agent │ │Review Agent │
│(测试工程师) │ │(运维工程师) │ │(代码审查) │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌──────▼───────┐
│ 最终输出 │
└──────────────┘
核心代码实现
3.1 Agent 基类定义
python
# agents/base.py
"""Agent 基类定义"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
@dataclass
class AgentMessage:
sender: str
receiver: str
content: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
metadata: dict = field(default_factory=dict)
@dataclass
class TaskResult:
agent_name: str
task: str
result: str
status: str = "success"
artifacts: list = field(default_factory=list)
class BaseAgent(ABC):
def __init__(self, name: str, role: str, model: str = "gpt-4o"):
self.name = name
self.role = role
self.llm = ChatOpenAI(model=model, temperature=0.7)
self.memory: list[AgentMessage] = []
@abstractmethod
def get_system_prompt(self) -> str: pass
@abstractmethod
def process(self, task: str, context: dict = None) -> TaskResult: pass
def think(self, user_message: str, context: str = "") -> str:
messages = [SystemMessage(content=self.get_system_prompt())]
if context:
messages.append(SystemMessage(content=f"上下文信息:\n{context}"))
messages.append(HumanMessage(content=user_message))
response = self.llm.invoke(messages)
return response.content
def send_message(self, receiver: str, content: str) -> AgentMessage:
msg = AgentMessage(sender=self.name, receiver=receiver, content=content)
self.memory.append(msg)
return msg3.2 具体角色 Agent 实现
python
# agents/roles.py
"""各角色 Agent 实现"""
from agents.base import BaseAgent, TaskResult
class ProductManagerAgent(BaseAgent):
def __init__(self):
super().__init__(name="PM", role="产品经理")
def get_system_prompt(self) -> str:
return """你是一位经验丰富的产品经理。
职责:分析用户需求、分解技术任务、定义验收标准。
输出格式:JSON 格式,每个任务包含 title, description, priority, estimated_hours。"""
def process(self, task: str, context: dict = None) -> TaskResult:
prompt = f"请分析以下产品需求,并分解为技术任务:\n需求描述:{task}\n\n请输出:需求分析摘要、功能点列表、技术任务分解、风险评估。"
result = self.think(prompt)
return TaskResult(agent_name=self.name, task=task, result=result,
artifacts=["requirements.md", "task_breakdown.json"])
class ArchitectAgent(BaseAgent):
def __init__(self):
super().__init__(name="Architect", role="架构师")
def get_system_prompt(self) -> str:
return """你是一位资深系统架构师。
职责:设计系统架构、选择技术栈、设计数据库和API接口。
原则:简单优先,避免过度设计。"""
def process(self, task: str, context: dict = None) -> TaskResult:
req = context.get("requirements", "") if context else ""
prompt = f"请根据以下需求设计系统架构:\n需求:{task}\n需求分析:{req}\n\n请输出:架构概述、技术栈、数据库设计、API设计。"
result = self.think(prompt, context=req)
return TaskResult(agent_name=self.name, task=task, result=result,
artifacts=["architecture.md", "api_design.md"])
class DeveloperAgent(BaseAgent):
def __init__(self):
super().__init__(name="Developer", role="全栈开发者")
def get_system_prompt(self) -> str:
return "你是一位高级全栈开发者。规范:PEP 8, Black, 类型注解, docstring。"
def process(self, task: str, context: dict = None) -> TaskResult:
arch = context.get("architecture", "") if context else ""
prompt = f"请根据以下架构设计编写代码:\n任务:{task}\n架构:{arch}"
result = self.think(prompt, context=arch)
return TaskResult(agent_name=self.name, task=task, result=result,
artifacts=["source_code.py"])
class QAEngineerAgent(BaseAgent):
def __init__(self):
super().__init__(name="QA", role="测试工程师")
def get_system_prompt(self) -> str:
return "你是一位专业的测试工程师。使用 pytest,覆盖率要求 80% 以上。"
def process(self, task: str, context: dict = None) -> TaskResult:
code = context.get("source_code", "") if context else ""
prompt = f"请为以下代码编写测试:\n任务:{task}\n源代码:{code}"
result = self.think(prompt, context=code)
return TaskResult(agent_name=self.name, task=task, result=result,
artifacts=["test_code.py"])
class DevOpsAgent(BaseAgent):
def __init__(self):
super().__init__(name="DevOps", role="DevOps 工程师")
def get_system_prompt(self) -> str:
return "你是一位 DevOps 工程师。原则:基础设施即代码、CI/CD、容器化。"
def process(self, task: str, context: dict = None) -> TaskResult:
arch = context.get("architecture", "") if context else ""
prompt = f"请编写 DevOps 配置:\n项目:{task}\n架构:{arch}\n\n请输出:Dockerfile、docker-compose.yml、CI/CD 配置。"
result = self.think(prompt, context=arch)
return TaskResult(agent_name=self.name, task=task, result=result,
artifacts=["Dockerfile", "docker-compose.yml"])3.3 调度器(Orchestrator)
python
# orchestrator.py
"""多 Agent 调度器"""
import json, logging
from rich.console import Console
from rich.panel import Panel
from agents.roles import (
ProductManagerAgent, ArchitectAgent, DeveloperAgent,
QAEngineerAgent, DevOpsAgent,
)
console = Console()
class Orchestrator:
def __init__(self):
self.agents = {
"pm": ProductManagerAgent(),
"architect": ArchitectAgent(),
"developer": DeveloperAgent(),
"qa": QAEngineerAgent(),
"devops": DevOpsAgent(),
}
self.context = {}
self.results = {}
def run(self, requirement: str) -> dict:
console.print(Panel(f"[bold cyan]需求:[/bold cyan] {requirement}",
title="多 Agent 协作系统", border_style="cyan"))
stages = [
("需求分析(产品经理)", "pm", "requirements"),
("架构设计(架构师)", "architect", "architecture"),
("代码实现(开发者)", "developer", "development"),
("测试编写(测试工程师)", "qa", "testing"),
("DevOps 配置(运维工程师)", "devops", "devops"),
]
for i, (label, agent_key, result_key) in enumerate(stages, 1):
console.print(f"\n[bold yellow]阶段 {i}/5: {label}[/bold yellow]")
agent = self.agents[agent_key]
result = agent.process(requirement, context=self.context)
self.results[result_key] = result
# Pass output as context to next agent
self.context[result_key] = result.result
console.print("\n" + "=" * 60)
console.print("[bold cyan]协作完成!各阶段产出:[/bold cyan]")
for stage, result in self.results.items():
arts = ", ".join(result.artifacts)
console.print(f" {stage}: {result.status} | 产出: {arts}")
console.print("\n[green]所有阶段已完成。[/green]")
return self.results运行与测试
python
# run.py
import os, json
from orchestrator import Orchestrator
os.environ["OPENAI_API_KEY"] = "sk-your-key-here"
orchestrator = Orchestrator()
requirement = """
开发一个在线任务管理系统,功能需求:
1. 用户注册、登录、JWT 认证
2. 任务的增删改查(CRUD)
3. 任务分配给团队成员
4. 任务优先级和状态管理
5. RESTful API 设计
6. 使用 FastAPI + PostgreSQL + Redis
"""
results = orchestrator.run(requirement)
# 保存结果
os.makedirs("output", exist_ok=True)
save_data = {}
for key, val in results.items():
save_data[key] = {
"agent": val.agent_name, "result": val.result,
"status": val.status, "artifacts": val.artifacts,
}
with open("output/results.json", "w", encoding="utf-8") as f:
json.dump(save_data, f, ensure_ascii=False, indent=2)
print("结果已保存到 output/results.json")常见问题
Q1: Agent 间上下文传递不完整
确保每个 Agent 的输出格式规范,Orchestrator 在传递上下文时要进行格式化处理。建议使用结构化的 JSON 格式传递关键信息。
Q2: 如何处理 Agent 间的依赖关系
python
# 使用 LangGraph 构建更灵活的工作流
from langgraph.graph import StateGraph, END
def build_workflow():
workflow = StateGraph(AgentState)
workflow.add_node("pm", pm_node)
workflow.add_node("architect", architect_node)
workflow.add_node("developer", developer_node)
workflow.add_node("qa", qa_node)
workflow.add_node("devops", devops_node)
workflow.set_entry_point("pm")
workflow.add_edge("pm", "architect")
workflow.add_conditional_edges("architect", should_review, {
"revise": "pm", "proceed": "developer",
})
workflow.add_edge("developer", "qa")
workflow.add_conditional_edges("qa", tests_pass, {
"fix": "developer", "deploy": "devops",
})
workflow.add_edge("devops", END)
return workflow.compile()Q3: 如何降低 API 调用成本
- 使用 GPT-3.5-turbo 处理简单子任务,GPT-4o 处理复杂决策
- 缓存常见任务的输出结果
- 限制每个 Agent 的 max_tokens
- 使用 Prompt 模板减少重复描述
总结
本教程介绍了多 Agent 协作系统的设计与实现,包括:
- 架构设计:层级委派 + 动态路由的混合模式
- Agent 基类:统一的接口和消息传递机制
- 角色实现:PM、架构师、开发者、测试、DevOps 五个角色
- 调度器:Orchestrator 编排整个协作流程
- 扩展方案:LangGraph 构建更灵活的工作流
建议进一步学习:
如有任何问题,欢迎通过微信 toukenai 联系我们。