Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

11.9 Agentic 数据飞轮:让 Agent 自我进化

🔄 "最好的训练数据不是人工标注的,而是你的 Agent 自己产生的。"

第 11.8 节解决了"如何构建第一批 Agent 训练数据"。但这只是起点。

真正强大的 Agent 系统遵循一个循环:Agent 运行 → 产生轨迹 → 过滤高质量轨迹 → 训练更强模型 → 更强模型产生更好轨迹 → ...

这个闭环就是 Agentic 数据飞轮(Agentic Data Flywheel)——也是 DeepSeek、OpenAI、Anthropic 等顶尖团队能持续迭代的核心秘密之一。


飞轮的整体架构

Agentic 数据飞轮架构图

每一圈飞轮转动,模型能力提升一个台阶;台阶越高,产生的轨迹质量越好,下一圈的原材料更优质。


第一层:轨迹收集

飞轮的原材料是 Agent 运行时产生的完整交互记录(Trajectory)。

什么值得收集?

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class AgentTrajectory:
    """一条完整的 Agent 运行轨迹"""
    
    # 基本元数据
    trajectory_id: str
    timestamp: datetime
    session_id: str
    
    # 完整交互记录
    system_prompt: str
    messages: list[dict]         # 完整的 system/user/assistant/tool 序列
    tool_definitions: list[dict] # 本次会话的工具定义
    
    # 结果信息
    final_response: str
    task_completed: bool
    
    # 成本信息(用于后续筛选)
    total_tokens: int
    total_tool_calls: int
    tool_call_failures: int
    latency_ms: float
    
    # 用户反馈(如有)
    user_rating: float | None = None
    user_followup: bool = False   # 用户是否继续追问(隐式满意信号)
    
    # 自动标注(异步计算)
    auto_quality_score: float = 0.0
    is_in_training_set: bool = False


class TrajectoryCollector:
    """线上轨迹收集系统"""
    
    def __init__(self, storage_backend, sampling_rate: float = 1.0):
        self.storage = storage_backend
        self.sampling_rate = sampling_rate  # 采样率(高流量时可<1.0)
    
    async def record(self, trajectory: AgentTrajectory):
        """记录一条轨迹(异步,不阻塞主流程)"""
        import random
        
        # 采样:不是每条都存
        if random.random() > self.sampling_rate:
            return
        
        # 脱敏处理
        cleaned = await self._anonymize(trajectory)
        
        # 异步存储(不影响线上延迟)
        await self.storage.save(cleaned)
        
        # 触发异步质量评分
        asyncio.create_task(self._score_trajectory(cleaned))
    
    async def _anonymize(self, traj: AgentTrajectory) -> AgentTrajectory:
        """脱敏:删除 PII"""
        import re
        for msg in traj.messages:
            content = str(msg.get("content", ""))
            content = re.sub(r'\b1[3-9]\d{9}\b', '[PHONE]', content)
            content = re.sub(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\b', '[EMAIL]', content)
            msg["content"] = content
        return traj

第二层:质量过滤与标注

原始轨迹质量参差不齐,必须经过严格筛选才能进入训练集。

双维度过滤:结果 × 过程

class TrajectoryFilter:
    """
    双维度过滤:
    - 结果维度:任务完成了吗?用户满意吗?
    - 过程维度:推理合理吗?工具使用规范吗?
    """
    
    def __init__(self, reward_model=None):
        self.reward_model = reward_model
    
    def compute_quality_score(self, traj: AgentTrajectory) -> float:
        """综合质量分(0-1)"""
        scores = {}
        
        # ── 结果维度(占 50%)──────────────────────────────
        # 1a. 显式用户满意度
        if traj.user_rating is not None:
            scores["explicit_satisfaction"] = traj.user_rating / 5.0
        else:
            scores["explicit_satisfaction"] = 0.5  # 中性默认值
        
        # 1b. 隐式满意度(用户继续对话 = 满意)
        scores["implicit_satisfaction"] = 0.8 if traj.user_followup else 0.4
        
        # 1c. 任务是否有实质性完成
        scores["task_completion"] = 1.0 if traj.task_completed else 0.2
        
        # ── 过程维度(占 50%)──────────────────────────────
        # 2a. 工具调用成功率
        if traj.total_tool_calls > 0:
            success_rate = 1 - traj.tool_call_failures / traj.total_tool_calls
            scores["tool_success_rate"] = success_rate
        else:
            scores["tool_success_rate"] = 1.0  # 没调用工具也算正常
        
        # 2b. 效率(轨迹长度惩罚,避免空转)
        optimal_calls = max(1, traj.total_tool_calls)
        efficiency = min(1.0, 5 / max(traj.total_tool_calls, 1))
        scores["efficiency"] = efficiency if traj.total_tool_calls <= 5 else 0.5
        
        # 2c. 格式合规性(工具调用都是合法 JSON)
        scores["format_compliance"] = self._check_format_compliance(traj.messages)
        
        # 2d. 奖励模型打分(如果有)
        if self.reward_model:
            rm_score = self.reward_model.score(traj)
            scores["reward_model"] = rm_score
        
        # 加权平均
        weights = {
            "explicit_satisfaction": 0.2,
            "implicit_satisfaction": 0.1,
            "task_completion": 0.2,
            "tool_success_rate": 0.2,
            "efficiency": 0.1,
            "format_compliance": 0.1,
            "reward_model": 0.1 if self.reward_model else 0,
        }
        
        total_weight = sum(weights[k] for k in scores)
        final_score = sum(scores[k] * weights[k] for k in scores) / total_weight
        return final_score
    
    def _check_format_compliance(self, messages: list[dict]) -> float:
        """检查工具调用格式合规率"""
        tool_call_messages = [m for m in messages 
                              if m.get("role") == "assistant" and "tool_calls" in m]
        if not tool_call_messages:
            return 1.0
        
        valid = 0
        for msg in tool_call_messages:
            try:
                for call in msg["tool_calls"]:
                    assert "name" in call and "arguments" in call
                    assert isinstance(call["arguments"], dict)
                valid += 1
            except (AssertionError, KeyError, TypeError):
                pass
        
        return valid / len(tool_call_messages)
    
    def filter_batch(self, 
                     trajectories: list[AgentTrajectory],
                     min_score: float = 0.7) -> list[AgentTrajectory]:
        """批量过滤,返回高质量样本"""
        filtered = []
        score_distribution = []
        
        for traj in trajectories:
            score = self.compute_quality_score(traj)
            traj.auto_quality_score = score
            score_distribution.append(score)
            
            if score >= min_score:
                filtered.append(traj)
        
        # 打印统计信息
        import statistics
        print(f"过滤结果:{len(filtered)}/{len(trajectories)} 通过")
        print(f"质量分布:mean={statistics.mean(score_distribution):.2f}, "
              f"median={statistics.median(score_distribution):.2f}")
        return filtered

负样本的价值:失败的轨迹也有用

class NegativeSampleExtractor:
    """从失败轨迹中提取对比学习样本"""
    
    def extract_contrastive_pairs(
        self, 
        good_traj: AgentTrajectory,
        bad_traj: AgentTrajectory
    ) -> dict:
        """
        构建偏好对(用于 DPO 训练)
        相同任务,一个成功轨迹(chosen)+ 一个失败轨迹(rejected)
        """
        return {
            "prompt": good_traj.messages[1]["content"],  # 用户问题
            "chosen": self._trajectory_to_text(good_traj),
            "rejected": self._trajectory_to_text(bad_traj),
            "chosen_score": good_traj.auto_quality_score,
            "rejected_score": bad_traj.auto_quality_score,
        }
    
    def extract_error_cases(self, 
                            bad_trajectories: list[AgentTrajectory]) -> list[dict]:
        """
        从失败案例中提取训练样本:
        错误发生点 → 模型应该怎么做(人工修正或强模型修正)
        """
        error_samples = []
        
        for traj in bad_trajectories:
            # 找到第一个出错的位置
            error_point = self._find_first_error(traj.messages)
            if error_point is None:
                continue
            
            # 截取到出错点之前的上下文
            context = traj.messages[:error_point]
            
            # 用强模型生成正确的续写
            correct_continuation = self._get_correct_continuation(context, traj.tool_definitions)
            
            if correct_continuation:
                error_samples.append({
                    "context": context,
                    "wrong_response": traj.messages[error_point],
                    "correct_response": correct_continuation,
                    "error_type": self._classify_error(traj.messages[error_point]),
                })
        
        return error_samples
    
    def _classify_error(self, message: dict) -> str:
        """对错误类型分类(用于分析)"""
        content = str(message.get("content", ""))
        tool_calls = message.get("tool_calls", [])
        
        if tool_calls:
            call_name = tool_calls[0].get("name", "")
            if "not_exist" in call_name.lower() or call_name not in self.available_tools:
                return "hallucinated_tool"
            if not isinstance(tool_calls[0].get("arguments"), dict):
                return "invalid_format"
        
        if "error" in content.lower() and len(content) < 30:
            return "bare_error_propagation"  # 直接把错误消息返回给用户
        
        return "reasoning_error"

第三层:自动标注与奖励模型

对于没有用户反馈的轨迹,需要自动判断质量。

基于可验证任务的奖励(RLVR 思路)

class VerifiableRewardCalculator:
    """
    对于有可验证答案的任务,直接验证结果。
    这是最精准的奖励信号,无需奖励模型。
    
    类似于 DeepSeek-R1 的做法:只训练可验证任务,
    奖励来自于答案本身的正确性。
    """
    
    def __init__(self):
        self.verifiers = {
            "math": self._verify_math,
            "code": self._verify_code_execution,
            "sql": self._verify_sql_result,
            "factual": self._verify_factual_claim,
        }
    
    def compute_reward(self, 
                       traj: AgentTrajectory, 
                       task_type: str,
                       ground_truth=None) -> float:
        """计算奖励分数"""
        if task_type not in self.verifiers:
            return None  # 无法自动验证
        
        final_response = traj.final_response
        return self.verifiers[task_type](final_response, ground_truth)
    
    def _verify_math(self, response: str, ground_truth: float) -> float:
        """验证数学答案"""
        import re
        # 提取回复中的数字
        numbers = re.findall(r'-?\d+\.?\d*', response)
        if not numbers:
            return 0.0
        
        predicted = float(numbers[-1])  # 通常最后一个数字是答案
        if abs(predicted - ground_truth) < 1e-6:
            return 1.0
        elif abs(predicted - ground_truth) / (abs(ground_truth) + 1e-9) < 0.05:
            return 0.5  # 近似正确
        return 0.0
    
    def _verify_code_execution(self, response: str, expected_output: str) -> float:
        """提取并执行代码,验证输出"""
        import subprocess, re, tempfile
        
        # 提取代码块
        code_match = re.search(r'```python\n(.*?)\n```', response, re.DOTALL)
        if not code_match:
            return 0.0
        
        code = code_match.group(1)
        
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
                f.write(code)
                fname = f.name
            
            result = subprocess.run(
                ['python', fname], 
                capture_output=True, text=True, timeout=5
            )
            
            actual_output = result.stdout.strip()
            return 1.0 if actual_output == expected_output.strip() else 0.0
            
        except subprocess.TimeoutExpired:
            return 0.0
        except Exception:
            return 0.0


class LLMRewardModel:
    """
    当任务不可自动验证时,用强 LLM 作为奖励模型。
    
    成本权衡:每条轨迹调用一次 GPT-4.1 评估。
    建议:只对边界分数(0.5-0.7)的样本调用奖励模型。
    """
    
    EVALUATION_PROMPT = """你是一个 Agent 轨迹质量评估专家。

任务描述:{task}

Agent 的完整轨迹:
{trajectory}

请从以下维度评估(1-5分):
1. **推理质量**:思维过程是否清晰、合理?
2. **工具使用**:是否恰当选择并正确调用了工具?
3. **错误处理**:遇到错误时是否优雅处理?
4. **回答质量**:最终回答是否准确、完整?
5. **效率**:是否避免了不必要的步骤?

输出 JSON 格式:
{{"scores": [score1, score2, score3, score4, score5], "overall": <0-1的浮点数>, "reason": "<简短说明>"}}"""
    
    async def score(self, traj: AgentTrajectory) -> float:
        """用 GPT-4.1 对轨迹打分"""
        from openai import AsyncOpenAI
        client = AsyncOpenAI()
        
        trajectory_text = self._format_trajectory(traj)
        
        response = await client.chat.completions.create(
            model="gpt-4.1-mini",  # 用便宜的模型降低成本
            messages=[{
                "role": "user",
                "content": self.EVALUATION_PROMPT.format(
                    task=traj.messages[1]["content"],
                    trajectory=trajectory_text
                )
            }],
            response_format={"type": "json_object"},
            temperature=0,
        )
        
        result = json.loads(response.choices[0].message.content)
        return result.get("overall", 0.5)

第四层:训练与迭代

飞轮的迭代节奏

典型迭代节奏(生产团队)

混合训练:新数据 + 旧数据防止灾难性遗忘

class FlyWheelTrainer:
    """管理飞轮的训练循环"""
    
    def __init__(self, base_model, memory_ratio: float = 0.3):
        self.base_model = base_model
        self.memory_ratio = memory_ratio  # 旧数据占比(防止遗忘)
        self.all_historical_data = []     # 所有历史高质量数据
    
    def prepare_training_data(self, 
                               new_high_quality: list[AgentTrajectory],
                               max_samples: int = 20000) -> list[dict]:
        """
        混合新旧数据:
        - 70% 新数据(当前迭代收集的)
        - 30% 旧数据(防止灾难性遗忘)
        """
        self.all_historical_data.extend(new_high_quality)
        
        # 新数据
        new_samples = [self._to_training_format(t) for t in new_high_quality]
        
        # 历史数据(按质量分数加权采样)
        n_historical = int(max_samples * self.memory_ratio)
        historical_sample = self._weighted_sample(
            self.all_historical_data, 
            n=min(n_historical, len(self.all_historical_data)),
            weight_key="auto_quality_score"
        )
        historical_formatted = [self._to_training_format(t) for t in historical_sample]
        
        # 合并 + 打散
        import random
        combined = new_samples + historical_formatted
        random.shuffle(combined)
        
        print(f"训练数据:{len(new_samples)} 新 + {len(historical_formatted)} 旧 = {len(combined)} 总")
        return combined[:max_samples]
    
    def _weighted_sample(self, data, n, weight_key):
        """按质量分数加权采样(质量高的被选中概率更大)"""
        import random
        weights = [getattr(d, weight_key, 0.5) for d in data]
        return random.choices(data, weights=weights, k=min(n, len(data)))
    
    def run_iteration(self, 
                       new_trajectories: list[AgentTrajectory],
                       iteration: int) -> str:
        """运行一次飞轮迭代"""
        print(f"\n{'='*50}")
        print(f"飞轮第 {iteration} 轮迭代")
        print(f"{'='*50}")
        
        # 1. 过滤
        filter = TrajectoryFilter()
        high_quality = filter.filter_batch(new_trajectories, min_score=0.72)
        print(f"✅ 高质量样本:{len(high_quality)} 条")
        
        # 2. 提取失败案例(用于 DPO)
        low_quality = [t for t in new_trajectories 
                       if t.auto_quality_score < 0.4]
        print(f"📊 低质量样本(DPO 负样本):{len(low_quality)} 条")
        
        # 3. 准备训练数据
        training_data = self.prepare_training_data(high_quality)
        
        # 4. SFT 微调
        output_path = f"./agent-v{iteration}/sft"
        self._run_sft(training_data, output_path)
        
        # 5. DPO(如果有足够的偏好对)
        if len(low_quality) >= 100:
            self._run_dpo(high_quality, low_quality, f"./agent-v{iteration}/dpo")
        
        # 6. 评估新版本
        eval_results = self._evaluate_on_benchmark()
        print(f"📈 评估结果(迭代 {iteration}):{eval_results}")
        
        return f"./agent-v{iteration}"
    
    def _run_sft(self, data: list[dict], output_path: str):
        """执行 SFT 训练"""
        print(f"开始 SFT 训练({len(data)} 条样本)...")
        # ... 实际调用 Trainer
    
    def _run_dpo(self, chosen_trajs, rejected_trajs, output_path: str):
        """执行 DPO 训练"""
        print("开始 DPO 对比学习...")
        # ... 实际调用 DPOTrainer
    
    def _evaluate_on_benchmark(self) -> dict:
        """在标准基准上评估"""
        return {
            "tool_accuracy": 0.0,  # 占位,实际运行评估
            "task_completion": 0.0,
            "format_validity": 0.0,
        }

飞轮的三个关键加速因素

1. 任务难度课程(Curriculum)

class CurriculumManager:
    """
    从简单任务开始,逐步增加难度。
    简单任务 → 基础工具使用 → 多步推理 → 错误恢复 → 复杂长链
    """
    
    DIFFICULTY_LEVELS = {
        1: {"max_tool_calls": 1, "max_steps": 2},   # 单工具调用
        2: {"max_tool_calls": 3, "max_steps": 5},   # 顺序多工具
        3: {"max_tool_calls": 5, "max_steps": 8},   # 条件分支
        4: {"max_tool_calls": 8, "max_steps": 12},  # 复杂多步推理
        5: {"max_tool_calls": None, "max_steps": None},  # 无限制
    }
    
    def get_difficulty_for_iteration(self, iteration: int) -> int:
        """随迭代逐步提升难度"""
        return min(5, 1 + iteration // 3)  # 每3轮提升一级
    
    def filter_by_difficulty(self, 
                              trajectories: list[AgentTrajectory],
                              level: int) -> list[AgentTrajectory]:
        """只保留目标难度级别的轨迹"""
        config = self.DIFFICULTY_LEVELS[level]
        result = []
        for traj in trajectories:
            if (config["max_tool_calls"] is None or 
                    traj.total_tool_calls <= config["max_tool_calls"]):
                result.append(traj)
        return result

2. 探索性采样(增加多样性)

class ExploratoryAgentRunner:
    """
    在线上 Agent 中引入探索:
    用更高的 temperature 运行一部分请求,
    发现"不常走的路径",丰富训练数据多样性。
    """
    
    def __init__(self, base_agent, exploration_rate: float = 0.1):
        self.base_agent = base_agent
        self.exploration_rate = exploration_rate
    
    async def run(self, user_request: str) -> tuple[str, AgentTrajectory]:
        """运行 Agent,部分请求使用探索性配置"""
        import random
        
        is_exploratory = random.random() < self.exploration_rate
        
        if is_exploratory:
            # 高 temperature 激励探索
            config = {"temperature": 0.8, "tag": "exploratory"}
        else:
            # 正常生产配置
            config = {"temperature": 0.2, "tag": "production"}
        
        response, trajectory = await self.base_agent.run(
            user_request, **config
        )
        trajectory.metadata = config
        
        return response, trajectory

3. 合成数据扩充(覆盖盲区)

class BlindSpotFixer:
    """
    分析现有训练数据的盲区,有针对性地合成补充。
    
    例如:发现模型在"工具调用失败后的处理"上准确率只有 40%,
    就专门合成 500 条这类场景的轨迹。
    """
    
    def identify_weaknesses(self, 
                             eval_results: dict,
                             threshold: float = 0.6) -> list[str]:
        """识别表现低于阈值的技能维度"""
        weak_skills = []
        for skill, score in eval_results.items():
            if score < threshold:
                weak_skills.append(skill)
        return weak_skills
    
    async def synthesize_for_weakness(self, 
                                       skill: str,
                                       n: int = 500) -> list[dict]:
        """针对薄弱技能合成专项训练数据"""
        
        skill_to_scenario = {
            "error_recovery": "工具调用返回错误,Agent 需要正确处理",
            "tool_selection": "多个工具可选,需要正确判断使用哪个",
            "format_validity": "需要输出严格的 JSON 格式工具调用",
            "multi_step_planning": "需要多步工具调用才能完成的复杂任务",
        }
        
        scenario = skill_to_scenario.get(skill, skill)
        
        # 批量合成
        # ... 调用合成管道
        print(f"合成 {n} 条 '{skill}' 场景训练数据...")
        return []  # 实际实现

飞轮的效果:现实案例参考

团队方法迭代次数效果
DeepSeekGRPO + 自生成数学轨迹~10 轮数学推理从 GPT-4 级别追上 o1
Reflection-70BSelf-reflection 自我批评~5 轮Llama 70B 超越 GPT-4 (有争议)
STaR / V-STaR用正确推理链自举5 轮数学准确率 +40%
AgentTuning多任务 Agent 轨迹微调1 轮通用 Agent 能力 +30%

📌 核心规律:数据飞轮前 3 轮改善最快(模型从"不会用工具"到"会用工具"的跨越),后续每轮收益递减,需要更精细的数据工程。


实战检查清单

开始构建你的 Agentic 数据飞轮之前,确认以下条件:

基础条件:
□ Agent 线上系统已稳定运行(每天 > 100 次调用)
□ 轨迹记录系统已部署(收集 system/user/tool/assistant 完整上下文)
□ 用户反馈渠道已接入(点赞/踩、满意度评分)

数据管道:
□ 脱敏流水线已就绪(GDPR/PIPL 合规)
□ 质量评分函数已实现并在 100 条样本上验证
□ 存储系统能支撑每天增量(建议用对象存储 + Parquet 格式)

训练条件:
□ 有 GPU 资源(至少 A100/H100 × 1,建议 × 4)
□ 训练代码已验证(本地小规模跑通)
□ 评估基准已定义(工具准确率、任务完成率等)

节奏规划:
□ 明确迭代周期(建议 2-4 周一次)
□ 有版本对比的 A/B 测试方案
□ 明确何时"停止迭代"的条件(边际收益低于阈值)

本节小结

Agentic 数据飞轮的本质是:用 Agent 自身的运行数据来训练更强的 Agent,形成自我强化循环

📋 每一环的关键

  • 收集:记录完整轨迹,不只是输入输出,还要有工具调用细节
  • 过滤:双维度(结果 × 过程),质量分 > 0.7 才入训练集
  • 标注:可验证任务用规则奖励;不可验证用 LLM-as-Judge
  • 训练:新数据 70% + 历史数据 30%(防止灾难性遗忘)
  • 部署:保留 10% 探索流量(发现新场景)

飞轮启动需要初始模型和初始数据,但一旦转起来,数据质量和模型能力会互相拉动提升。这也是为什么"先发优势"在 Agent 领域如此重要——早一步启动飞轮,就早一步积累别人赶不上的数据优势。

🔗 与前序章节的关系

  • 第 3.8 节:训练数据的质量工程(工程基础)
  • 第 11.2 节:SFT + LoRA 训练方法(训练技术)
  • 第 11.3-11.5 节:PPO/DPO/GRPO 算法(奖励信号)
  • 第 11.8 节:Agent 专项数据构建(数据来源)
  • 本节(11.9):将以上所有内容串成一个持续迭代的系统

参考文献

  1. Zelikman et al. "STaR: Bootstrapping Reasoning With Reasoning." NeurIPS 2022.
  2. Singh et al. "Beyond Human Data: Scaling Self-Training for Problem-Solving with Language Models (V-STaR)." 2024.
  3. Zeng et al. "AgentTuning: Enabling Generalized Agent Abilities for LLMs." 2023.
  4. Chen et al. "Self-play Fine-tuning Converts Weak Language Models to Strong Language Models (SPIN)." ICML 2024.
  5. Guo et al. "DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning." DeepSeek 2025.
  6. Mitra et al. "AgentInstruct: Toward Generative Teaching with Agentic Flows." Microsoft Research 2024.