>
← 返回投肯智能知识库首页

多Agent协作实战

作者:重庆投肯小刚更新日期:2026年5月

目录

    概述

    单个 AI Agent 在处理简单任务时表现良好,但面对复杂业务场景时,往往需要多个专业化的 Agent 协同工作。多 Agent 协作(Multi-Agent Collaboration)是指将一个复杂任务分解为多个子任务,由不同的 Agent 各自负责擅长的领域,通过消息传递、任务委派和结果聚合来完成整体目标。

    本教程将以一个软件开发团队模拟为例,构建一个包含产品经理、架构师、开发者、测试工程师和 DevOps 工程师的多 Agent 协作系统。

    技术栈:Python 3.11+ / LangGraph / OpenAI API / Redis

    前置条件

    项目要求
    Python3.11 或更高版本
    OpenAI API有效的 API 密钥(GPT-4o 推荐)
    Redis用于 Agent 间消息传递(可选)
    基础知识Python 异步编程、LangChain 基础

    环境搭建

    bash
    mkdir -p ~/multi-agent && cd ~/multi-agent
    python3 -m venv venv && source venv/bin/activate
    pip install langchain langchain-openai langgraph \
        redis pydantic pyyaml rich typer

    多Agent架构设计

    2.1 协作模式

    模式描述适用场景
    顺序管道Agent 按固定顺序依次处理流水线式任务
    层级委派主 Agent 将任务分派给子 Agent任务分解
    环形讨论Agent 轮流发言,达成共识决策讨论
    动态路由根据任务类型动态选择 Agent智能调度

    本教程采用层级委派 + 动态路由的混合模式。

    2.2 系统架构

                        ┌──────────────┐
                        │   用户输入    │
                        └──────┬───────┘
                               │
                        ┌──────▼───────┐
                        │  Orchestrator │  (调度器/主Agent)
                        │  (任务分解)   │
                        └──────┬───────┘
                               │
              ┌────────────────┼────────────────┐
              │                │                │
       ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
       │  PM Agent   │ │Architect    │ │Dev Agent    │
       │ (产品经理)  │ │Agent(架构师)│ │(开发者)     │
       └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
              │                │                │
       ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
       │QA Agent     │ │DevOps Agent │ │Review Agent │
       │(测试工程师) │ │(运维工程师) │ │(代码审查)   │
       └─────────────┘ └─────────────┘ └─────────────┘
                               │
                        ┌──────▼───────┐
                        │   最终输出    │
                        └──────────────┘

    核心代码实现

    3.1 Agent 基类定义

    python
    # agents/base.py
    """Agent 基类定义"""
    
    from abc import ABC, abstractmethod
    from dataclasses import dataclass, field
    from datetime import datetime
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import SystemMessage, HumanMessage
    
    
    @dataclass
    class AgentMessage:
        sender: str
        receiver: str
        content: str
        timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
        metadata: dict = field(default_factory=dict)
    
    
    @dataclass
    class TaskResult:
        agent_name: str
        task: str
        result: str
        status: str = "success"
        artifacts: list = field(default_factory=list)
    
    
    class BaseAgent(ABC):
        def __init__(self, name: str, role: str, model: str = "gpt-4o"):
            self.name = name
            self.role = role
            self.llm = ChatOpenAI(model=model, temperature=0.7)
            self.memory: list[AgentMessage] = []
    
        @abstractmethod
        def get_system_prompt(self) -> str: pass
    
        @abstractmethod
        def process(self, task: str, context: dict = None) -> TaskResult: pass
    
        def think(self, user_message: str, context: str = "") -> str:
            messages = [SystemMessage(content=self.get_system_prompt())]
            if context:
                messages.append(SystemMessage(content=f"上下文信息:\n{context}"))
            messages.append(HumanMessage(content=user_message))
            response = self.llm.invoke(messages)
            return response.content
    
        def send_message(self, receiver: str, content: str) -> AgentMessage:
            msg = AgentMessage(sender=self.name, receiver=receiver, content=content)
            self.memory.append(msg)
            return msg

    3.2 具体角色 Agent 实现

    python
    # agents/roles.py
    """各角色 Agent 实现"""
    from agents.base import BaseAgent, TaskResult
    
    
    class ProductManagerAgent(BaseAgent):
        def __init__(self):
            super().__init__(name="PM", role="产品经理")
    
        def get_system_prompt(self) -> str:
            return """你是一位经验丰富的产品经理。
    职责:分析用户需求、分解技术任务、定义验收标准。
    输出格式:JSON 格式,每个任务包含 title, description, priority, estimated_hours。"""
    
        def process(self, task: str, context: dict = None) -> TaskResult:
            prompt = f"请分析以下产品需求,并分解为技术任务:\n需求描述:{task}\n\n请输出:需求分析摘要、功能点列表、技术任务分解、风险评估。"
            result = self.think(prompt)
            return TaskResult(agent_name=self.name, task=task, result=result,
                artifacts=["requirements.md", "task_breakdown.json"])
    
    
    class ArchitectAgent(BaseAgent):
        def __init__(self):
            super().__init__(name="Architect", role="架构师")
    
        def get_system_prompt(self) -> str:
            return """你是一位资深系统架构师。
    职责:设计系统架构、选择技术栈、设计数据库和API接口。
    原则:简单优先,避免过度设计。"""
    
        def process(self, task: str, context: dict = None) -> TaskResult:
            req = context.get("requirements", "") if context else ""
            prompt = f"请根据以下需求设计系统架构:\n需求:{task}\n需求分析:{req}\n\n请输出:架构概述、技术栈、数据库设计、API设计。"
            result = self.think(prompt, context=req)
            return TaskResult(agent_name=self.name, task=task, result=result,
                artifacts=["architecture.md", "api_design.md"])
    
    
    class DeveloperAgent(BaseAgent):
        def __init__(self):
            super().__init__(name="Developer", role="全栈开发者")
    
        def get_system_prompt(self) -> str:
            return "你是一位高级全栈开发者。规范:PEP 8, Black, 类型注解, docstring。"
    
        def process(self, task: str, context: dict = None) -> TaskResult:
            arch = context.get("architecture", "") if context else ""
            prompt = f"请根据以下架构设计编写代码:\n任务:{task}\n架构:{arch}"
            result = self.think(prompt, context=arch)
            return TaskResult(agent_name=self.name, task=task, result=result,
                artifacts=["source_code.py"])
    
    
    class QAEngineerAgent(BaseAgent):
        def __init__(self):
            super().__init__(name="QA", role="测试工程师")
    
        def get_system_prompt(self) -> str:
            return "你是一位专业的测试工程师。使用 pytest,覆盖率要求 80% 以上。"
    
        def process(self, task: str, context: dict = None) -> TaskResult:
            code = context.get("source_code", "") if context else ""
            prompt = f"请为以下代码编写测试:\n任务:{task}\n源代码:{code}"
            result = self.think(prompt, context=code)
            return TaskResult(agent_name=self.name, task=task, result=result,
                artifacts=["test_code.py"])
    
    
    class DevOpsAgent(BaseAgent):
        def __init__(self):
            super().__init__(name="DevOps", role="DevOps 工程师")
    
        def get_system_prompt(self) -> str:
            return "你是一位 DevOps 工程师。原则:基础设施即代码、CI/CD、容器化。"
    
        def process(self, task: str, context: dict = None) -> TaskResult:
            arch = context.get("architecture", "") if context else ""
            prompt = f"请编写 DevOps 配置:\n项目:{task}\n架构:{arch}\n\n请输出:Dockerfile、docker-compose.yml、CI/CD 配置。"
            result = self.think(prompt, context=arch)
            return TaskResult(agent_name=self.name, task=task, result=result,
                artifacts=["Dockerfile", "docker-compose.yml"])

    3.3 调度器(Orchestrator)

    python
    # orchestrator.py
    """多 Agent 调度器"""
    import json, logging
    from rich.console import Console
    from rich.panel import Panel
    from agents.roles import (
        ProductManagerAgent, ArchitectAgent, DeveloperAgent,
        QAEngineerAgent, DevOpsAgent,
    )
    
    console = Console()
    
    class Orchestrator:
        def __init__(self):
            self.agents = {
                "pm": ProductManagerAgent(),
                "architect": ArchitectAgent(),
                "developer": DeveloperAgent(),
                "qa": QAEngineerAgent(),
                "devops": DevOpsAgent(),
            }
            self.context = {}
            self.results = {}
    
        def run(self, requirement: str) -> dict:
            console.print(Panel(f"[bold cyan]需求:[/bold cyan] {requirement}",
                title="多 Agent 协作系统", border_style="cyan"))
    
            stages = [
                ("需求分析(产品经理)", "pm", "requirements"),
                ("架构设计(架构师)", "architect", "architecture"),
                ("代码实现(开发者)", "developer", "development"),
                ("测试编写(测试工程师)", "qa", "testing"),
                ("DevOps 配置(运维工程师)", "devops", "devops"),
            ]
    
            for i, (label, agent_key, result_key) in enumerate(stages, 1):
                console.print(f"\n[bold yellow]阶段 {i}/5: {label}[/bold yellow]")
                agent = self.agents[agent_key]
                result = agent.process(requirement, context=self.context)
                self.results[result_key] = result
                # Pass output as context to next agent
                self.context[result_key] = result.result
    
            console.print("\n" + "=" * 60)
            console.print("[bold cyan]协作完成!各阶段产出:[/bold cyan]")
            for stage, result in self.results.items():
                arts = ", ".join(result.artifacts)
                console.print(f"  {stage}: {result.status} | 产出: {arts}")
            console.print("\n[green]所有阶段已完成。[/green]")
            return self.results

    运行与测试

    python
    # run.py
    import os, json
    from orchestrator import Orchestrator
    
    os.environ["OPENAI_API_KEY"] = "sk-your-key-here"
    orchestrator = Orchestrator()
    
    requirement = """
    开发一个在线任务管理系统,功能需求:
    1. 用户注册、登录、JWT 认证
    2. 任务的增删改查(CRUD)
    3. 任务分配给团队成员
    4. 任务优先级和状态管理
    5. RESTful API 设计
    6. 使用 FastAPI + PostgreSQL + Redis
    """
    
    results = orchestrator.run(requirement)
    
    # 保存结果
    os.makedirs("output", exist_ok=True)
    save_data = {}
    for key, val in results.items():
        save_data[key] = {
            "agent": val.agent_name, "result": val.result,
            "status": val.status, "artifacts": val.artifacts,
        }
    with open("output/results.json", "w", encoding="utf-8") as f:
        json.dump(save_data, f, ensure_ascii=False, indent=2)
    print("结果已保存到 output/results.json")

    常见问题

    Q1: Agent 间上下文传递不完整

    确保每个 Agent 的输出格式规范,Orchestrator 在传递上下文时要进行格式化处理。建议使用结构化的 JSON 格式传递关键信息。

    Q2: 如何处理 Agent 间的依赖关系

    python
    # 使用 LangGraph 构建更灵活的工作流
    from langgraph.graph import StateGraph, END
    
    def build_workflow():
        workflow = StateGraph(AgentState)
        workflow.add_node("pm", pm_node)
        workflow.add_node("architect", architect_node)
        workflow.add_node("developer", developer_node)
        workflow.add_node("qa", qa_node)
        workflow.add_node("devops", devops_node)
    
        workflow.set_entry_point("pm")
        workflow.add_edge("pm", "architect")
        workflow.add_conditional_edges("architect", should_review, {
            "revise": "pm", "proceed": "developer",
        })
        workflow.add_edge("developer", "qa")
        workflow.add_conditional_edges("qa", tests_pass, {
            "fix": "developer", "deploy": "devops",
        })
        workflow.add_edge("devops", END)
        return workflow.compile()

    Q3: 如何降低 API 调用成本

    总结

    本教程介绍了多 Agent 协作系统的设计与实现,包括:

    1. 架构设计:层级委派 + 动态路由的混合模式
    2. Agent 基类:统一的接口和消息传递机制
    3. 角色实现:PM、架构师、开发者、测试、DevOps 五个角色
    4. 调度器:Orchestrator 编排整个协作流程
    5. 扩展方案:LangGraph 构建更灵活的工作流

    建议进一步学习:

    如有任何问题,欢迎通过微信 toukenai 联系我们。

    相关推荐

    Agent间通信协议设计

    # 定义Agent通信的标准化消息格式
    class AgentMessage:
        def __init__(self, sender, receiver, content, msg_type="request"):
            self.sender = sender        # 发送者Agent ID
            self.receiver = receiver    # 接收者Agent ID
            self.content = content      # 消息内容
            self.msg_type = msg_type   # request/response/notification
            self.timestamp = datetime.now()
            self.message_id = str(uuid.uuid4())
        
        def to_json(self):
            return json.dumps({
                "sender": self.sender,
                "receiver": self.receiver,
                "content": self.content,
                "msg_type": self.msg_type,
                "timestamp": self.timestamp.isoformat(),
                "message_id": self.message_id
            })
    
    # Agent通信中枢
    class MessageBroker:
        def __init__(self):
            self.agents = {}
            self.message_queue = Queue()
        
        def register(self, agent_id, agent_instance):
            self.agents[agent_id] = agent_instance
        
        def send(self, message: AgentMessage):
            self.message_queue.put(message)
        
        def dispatch(self):
            while not self.message_queue.empty():
                msg = self.message_queue.get()
                target_agent = self.agents.get(msg.receiver)
                if target_agent:
                    response = target_agent.receive(msg)
                    # 将响应发送回原发送者
                    reply = AgentMessage(
                        sender=msg.receiver,
                        receiver=msg.sender,
                        content=response,
                        msg_type="response"
                    )
                    self.message_queue.put(reply)
    
    broker = MessageBroker()
    broker.register("researcher", researcher_agent)
    broker.register("writer", writer_agent)
    
    # 研究员Agent发送消息给写手Agent
    msg = AgentMessage(
        sender="researcher",
        receiver="writer",
        content={"topic": "AI发展", "data": research_results}
    )
    broker.send(msg)
    broker.dispatch()

    多Agent系统的任务分解策略

    # 任务分解示例:将"写一篇AI行业报告"分解为多个子任务
    TASK_DECOMPOSITION_PROMPT = """
    请将以下复杂任务分解为多个可并行执行的子任务:
    
    任务:写一篇《2024年AI大模型发展趋势报告》,需要:
    1. 搜集最新的AI大模型技术进展
    2. 分析各大科技公司的AI布局
    3. 总结AI大模型的商业化现状
    4. 预测2024年发展趋势
    5. 生成一份完整的报告文档
    
    要求:
    - 每个子任务必须可独立执行
    - 子任务之间尽量减少依赖
    - 标注每个子任务的优先级和预期执行时间
    """
    
    # 使用LLM进行任务分解
    decomposition = llm.invoke(TASK_DECOMPOSITION_PROMPT)
    sub_tasks = parse_tasks(decomposition)
    
    print("分解后的子任务:")
    for i, task in enumerate(sub_tasks):
        print(f"{i+1}. {task['name']} (优先级:{task['priority']}, 预计:{task['duration']})")

    Agent协作的冲突处理机制

    # 冲突类型和处理策略
    class ConflictResolver:
        def __init__(self):
            self.strategies = {
                "resource_conflict": self.resolve_resource,
                "goal_conflict": self.resolve_goal,
                "information_conflict": self.resolve_information
            }
        
        def resolve_resource(self, agents, resource):
            # 资源冲突:按优先级分配
            sorted_agents = sorted(agents, key=lambda a: a.priority, reverse=True)
            return sorted_agents[0]
        
        def resolve_goal(self, goal1, goal2):
            # 目标冲突:合并或选择更重要的
            if goal1.category == goal2.category:
                return goal1 if goal1.weight > goal2.weight else goal2
            return goal1  # 默认选择第一个
        
        def resolve_information(self, info1, info2):
            # 信息冲突:信任度加权
            return info1 if info1.confidence > info2.confidence else info2
    
    resolver = ConflictResolver()
    
    # 检测并解决冲突
    for conflict in detected_conflicts:
        strategy = resolver.strategies.get(conflict.type)
        resolution = strategy(conflict.agents)
        print(f"冲突解决: {conflict.type} -> {resolution}")

    多Agent系统的性能优化

    多Agent系统的监控与调试

    # 完整的Agent执行追踪系统
    class AgentTracer:
        def __init__(self):
            self.traces = []
        
        def trace(self, agent_id, action, input_data, output_data, duration):
            self.traces.append({
                "timestamp": datetime.now().isoformat(),
                "agent_id": agent_id,
                "action": action,
                "input_size": len(str(input_data)),
                "output_size": len(str(output_data)),
                "duration_ms": duration,
                "status": "success" if output_data else "failed"
            })
        
        def get_trace_summary(self):
            total = len(self.traces)
            success = sum(1 for t in self.traces if t["status"] == "success")
            avg_duration = sum(t["duration_ms"] for t in self.traces) / max(total, 1)
            
            return {
                "total_actions": total,
                "success_rate": success / max(total, 1),
                "avg_duration_ms": avg_duration,
                "by_agent": self.groupby_agent()
            }
        
        def groupby_agent(self):
            by_agent = {}
            for trace in self.traces:
                aid = trace["agent_id"]
                if aid not in by_agent:
                    by_agent[aid] = []
                by_agent[aid].append(trace)
            return by_agent
    
    tracer = AgentTracer()
    summary = tracer.get_trace_summary()
    print(f"总执行次数: {summary['total_actions']}")
    print(f"成功率: {summary['success_rate']:.1%}")
    print(f"平均耗时: {summary['avg_duration_ms']:.0f}ms")

    实际多Agent协作案例:智能投研平台

    一个典型的多Agent应用场景是智能投研平台。这个系统需要同时处理信息搜索、数据分析、报告撰写等多个任务,单一Agent无法高效完成。

    系统架构

    # 智能投研平台Agent团队
    ├── 数据采集Agent(负责爬取新闻、公告、财务数据)
    ├── 数据分析Agent(负责数据清洗、统计、异常检测)
    ├── 行业研究Agent(负责行业趋势、竞争格局分析)
    ├── 报告撰写Agent(负责生成研究报告初稿)
    ├── 质量审核Agent(负责审核报告逻辑、数据准确性)
    └── 协调Agent(负责任务分配、进度跟踪、结果整合)
    
    # 协调Agent的核心逻辑
    class ResearchCoordinator:
        def __init__(self):
            self.team = {
                "collector": DataCollectorAgent(),
                "analyst": DataAnalystAgent(),
                "researcher": IndustryResearcherAgent(),
                "writer": ReportWriterAgent(),
                "reviewer": QualityReviewerAgent()
            }
        
        def run_research(self, topic: str) -> dict:
            # Step 1: 并行收集数据和行业信息
            collector_task = self.team["collector"].async_run(topic)
            researcher_task = self.team["researcher"].async_run(topic)
            
            collected_data = collector_task.get()
            industry_info = researcher_task.get()
            
            # Step 2: 分析数据
            analysis = self.team["analyst"].run(
                raw_data=collected_data,
                industry_context=industry_info
            )
            
            # Step 3: 撰写报告
            draft = self.team["writer"].run(
                analysis=analysis,
                topic=topic
            )
            
            # Step 4: 审核报告
            final_report = self.team["reviewer"].run(draft)
            
            return final_report
    
    coordinator = ResearchCoordinator()
    report = coordinator.run_research("新能源汽车行业2024年发展趋势")

    常见问题与解决方案

    问题:多个Agent之间通信延迟高
    解决方案:使用异步消息队列(Redis/ RabbitMQ)替代同步HTTP调用。将同步等待改为消息订阅模式,减少Agent间的阻塞时间。
    问题:某个Agent执行时间过长导致整体超时
    解决方案:每个Agent设置独立的超时时间(建议5-30秒),超时后自动降级或返回部分结果。整体流程设置熔断器,失败率达到阈值时快速失败。
    问题:多Agent结果不一致,难以整合
    解决方案:建立统一的结果Schema,所有Agent输出遵循同一格式。整合时使用LLM进行结果融合,或设置优先级规则自动选择。

    多Agent系统的评估指标

    指标定义优秀值
    任务完成率成功完成的任务数/总任务数>95%
    平均响应时间从任务提交到结果返回的总时间<5秒
    Agent利用率各Agent实际工作时间/总时间>60%
    冲突解决率成功解决的Agent间冲突数/总冲突数>90%
    系统可用性系统正常服务时间/总运行时间>99.5%