🚀 快速安装

复制以下命令并运行,立即安装此 Skill:

npx skills add https://skills.sh/aradotso/trending-skills/openai-symphony-autonomous-agents

💡 提示:需要 Node.js 和 NPM

OpenAI Symphony

技能来自 ara.so — 每日 2026 技能合集。

Symphony 将项目工作转化为隔离的、自主的实现运行,让团队能够管理工作而不是监督编码代理。你不再需要观看代理编写代码,而是定义任务(例如在 Linear 中),Symphony 会生成代理来完成这些任务,提供工作证明(CI 状态、PR 审查、演示视频),并自动提交 PR。


Symphony 的功能

  • 监控工作跟踪器(例如 Linear)中的任务
  • 为每个任务生成隔离的代理运行(使用 Codex 或类似工具)
  • 每个代理实现任务,打开 PR,并提供工作证明
  • 工程师审查结果,而不是代理会话
  • 在使用 harness engineering 的代码库中效果最佳

安装选项

选项 1:让代理来构建

将以下提示粘贴到 Claude Code、Cursor 或 Codex 中:

根据以下规范实现 Symphony:
https://github.com/openai/symphony/blob/main/SPEC.md

选项 2:使用 Elixir 参考实现

git clone https://github.com/openai/symphony.git
cd symphony/elixir

按照 elixir/README.md 操作,或者让代理来操作:

根据以下内容为我的仓库设置 Symphony:
https://github.com/openai/symphony/blob/main/elixir/README.md

Elixir 参考实现设置

环境要求

  • 已安装 Elixir + Mix
  • OpenAI API 密钥(用于 Codex 代理)
  • Linear API 密钥(如果使用 Linear 集成)
  • GitHub 令牌(用于 PR 操作)

环境变量

export OPENAI_API_KEY="sk-..."           # 用于 Codex 的 OpenAI API 密钥
export LINEAR_API_KEY="lin_api_..."      # Linear 集成
export GITHUB_TOKEN="ghp_..."           # GitHub PR 操作
export SYMPHONY_REPO_PATH="/path/to/repo"  # 目标仓库

安装依赖项

cd elixir
mix deps.get

配置 (elixir/config/config.exs)

import Config

config :symphony,
  openai_api_key: System.get_env("OPENAI_API_KEY"),
  linear_api_key: System.get_env("LINEAR_API_KEY"),
  github_token: System.get_env("GITHUB_TOKEN"),
  repo_path: System.get_env("SYMPHONY_REPO_PATH", "./"),
  poll_interval_ms: 30_000,
  max_concurrent_agents: 3

运行 Symphony

mix symphony.start
# 或在 IEx 中进行开发
iex -S mix

核心概念

隔离的实现运行

每个任务都有自己隔离的运行环境:

  • 每个任务都有一个全新的 git 分支
  • 代理仅在该分支内操作
  • 运行之间无共享状态
  • 在 PR 合并前收集工作证明

工作证明

在 PR 被接受之前,Symphony 会收集:

  • CI/CD 流水线状态
  • PR 审查反馈
  • 复杂度分析
  • (可选)演示视频

关键 Elixir 模块与模式

启动 Symphony 监控器

# 在你的 application.ex 或直接调用
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      Symphony.Supervisor
    ]
    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

定义任务(Symphony Task 结构体)

defmodule Symphony.Task do
  @type t :: %__MODULE__{
    id: String.t(),
    title: String.t(),
    description: String.t(),
    source: :linear | :manual,
    status: :pending | :running | :completed | :failed,
    branch: String.t() | nil,
    pr_url: String.t() | nil,
    proof_of_work: map() | nil
  }

  defstruct [:id, :title, :description, :source,
             status: :pending, branch: nil,
             pr_url: nil, proof_of_work: nil]
end

生成代理运行

defmodule Symphony.AgentRunner do
  @doc """
  为给定任务生成一个隔离的代理运行。
  每次运行都有自己独立的分支和 Codex 会话。
  """
  def run(task) do
    branch = "symphony/#{task.id}-#{slugify(task.title)}"

    with :ok <- Git.create_branch(branch),
         {:ok, result} <- Codex.implement(task, branch),
         {:ok, pr_url} <- GitHub.open_pr(branch, task),
         {:ok, proof} <- ProofOfWork.collect(pr_url) do
      {:ok, %{task | status: :completed, pr_url: pr_url, proof_of_work: proof}}
    else
      {:error, reason} -> {:error, reason}
    end
  end

  defp slugify(title) do
    title
    |> String.downcase()
    |> String.replace(~r/[^a-z0-9]+/, "-")
    |> String.trim("-")
  end
end

Linear 集成 —— 轮询任务

defmodule Symphony.Linear.Poller do
  use GenServer

  @poll_interval Application.compile_env(:symphony, :poll_interval_ms, 30_000)

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    schedule_poll()
    {:ok, %{processed_ids: MapSet.new()}}
  end

  def handle_info(:poll, state) do
    case Symphony.Linear.Client.fetch_todo_tasks() do
      {:ok, tasks} ->
        new_tasks = Enum.reject(tasks, &MapSet.member?(state.processed_ids, &1.id))
        Enum.each(new_tasks, &Symphony.AgentRunner.run/1)
        new_ids = Enum.reduce(new_tasks, state.processed_ids, &MapSet.put(&2, &1.id))
        schedule_poll()
        {:noreply, %{state | processed_ids: new_ids}}

      {:error, reason} ->
        Logger.error("Linear 轮询失败: #{inspect(reason)}")
        schedule_poll()
        {:noreply, state}
    end
  end

  defp schedule_poll do
    Process.send_after(self(), :poll, @poll_interval)
  end
end

Linear API 客户端

defmodule Symphony.Linear.Client do
  @linear_api "https://api.linear.app/graphql"

  def fetch_todo_tasks do
    query = """
    query {
      issues(filter: { state: { name: { eq: "Todo" } } }) {
        nodes {
          id
          title
          description
        }
      }
    }
    """

    case HTTPoison.post(@linear_api, Jason.encode!(%{query: query}), headers()) do
      {:ok, %{status_code: 200, body: body}} ->
        tasks =
          body
          |> Jason.decode!()
          |> get_in(["data", "issues", "nodes"])
          |> Enum.map(&to_task/1)
        {:ok, tasks}

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp headers do
    [
      {"Authorization", System.get_env("LINEAR_API_KEY")},
      {"Content-Type", "application/json"}
    ]
  end

  defp to_task(%{"id" => id, "title" => title, "description" => desc}) do
    %Symphony.Task{id: id, title: title, description: desc, source: :linear}
  end
end

工作证明收集

defmodule Symphony.ProofOfWork do
  @doc """
  在 PR 合并前收集其工作证明。
  返回一个包含 CI 状态、审查反馈和复杂度的映射。
  """
  def collect(pr_url) do
    with {:ok, ci_status} <- wait_for_ci(pr_url),
         {:ok, reviews} <- fetch_reviews(pr_url),
         {:ok, complexity} <- analyze_complexity(pr_url) do
      {:ok, %{
        ci_status: ci_status,
        reviews: reviews,
        complexity: complexity,
        collected_at: DateTime.utc_now()
      }}
    end
  end

  defp wait_for_ci(pr_url, retries \\ 30) do
    case GitHub.get_pr_ci_status(pr_url) do
      {:ok, :success} -> {:ok, :success}
      {:ok, :pending} when retries > 0 ->
        Process.sleep(60_000)
        wait_for_ci(pr_url, retries - 1)
      {:ok, status} -> {:ok, status}
      {:error, reason} -> {:error, reason}
    end
  end

  defp fetch_reviews(pr_url), do: GitHub.get_pr_reviews(pr_url)
  defp analyze_complexity(pr_url), do: GitHub.get_pr_diff_complexity(pr_url)
end

实现 SPEC.md(自定义实现)

在另一种语言中构建 Symphony 时,规范定义了:

  1. 任务源 —— 从 Linear/GitHub/Jira 轮询特定状态的任务
  2. 代理调用 —— 使用任务上下文调用 Codex(或其他代理)
  3. 隔离 —— 每次运行都在新分支上进行,如果可能则容器化
  4. 工作证明 —— 合并前进行 CI、审查和分析
  5. 落地 —— 自动合并或提交给工程师审批

伪代码的最小实现循环:

# 核心 symphony 循环
def symphony_loop(state) do
  tasks = fetch_new_tasks(state.source)

  tasks
  |> Enum.filter(&(&1.status == :todo))
  |> Enum.each(fn task ->
    Task.async(fn ->
      branch = create_isolated_branch(task)
      invoke_agent(task, branch)         # Codex / Claude / 等等
      proof = collect_proof_of_work(branch)
      present_for_review(task, proof)
    end)
  end)

  Process.sleep(state.poll_interval)
  symphony_loop(state)
end

常见模式

限制并发代理运行数

defmodule Symphony.AgentPool do
  use GenServer

  @max_concurrent 3

  def start_link(_), do: GenServer.start_link(__MODULE__, %{running: 0, queue: []}, name: __MODULE__)

  def submit(task) do
    GenServer.cast(__MODULE__, {:submit, task})
  end

  def handle_cast({:submit, task}, %{running: n} = state) when n < @max_concurrent do
    spawn_agent(task)
    {:noreply, %{state | running: n + 1}}
  end

  def handle_cast({:submit, task}, %{queue: q} = state) do
    {:noreply, %{state | queue: q ++ [task]}}
  end

  def handle_info({:agent_done, _result}, %{running: n, queue: [next | rest]} = state) do
    spawn_agent(next)
    {:noreply, %{state | running: n, queue: rest}}
  end

  def handle_info({:agent_done, _result}, %{running: n} = state) do
    {:noreply, %{state | running: n - 1}}
  end

  defp spawn_agent(task) do
    parent = self()
    spawn(fn ->
      result = Symphony.AgentRunner.run(task)
      send(parent, {:agent_done, result})
    end)
  end
end

手动任务注入(无需 Linear)

# 在 IEx 或 Mix 任务中
Symphony.AgentPool.submit(%Symphony.Task{
  id: "manual-001",
  title: "为 API 添加速率限制",
  description: "在 /api/v1 端点上实现令牌桶速率限制",
  source: :manual
})

故障排除

问题 可能原因 解决方案
代理未生成 缺少 OPENAI_API_KEY 检查环境变量是否已导出
未检测到 Linear 任务 Linear 状态过滤器错误 更新查询过滤器以匹配你看板的状态名称
PR 未打开 缺少 GITHUB_TOKEN 或仓库错误 验证令牌具有 repo 范围
CI 从未完成 超时时间太短 增加 wait_for_ci/2 中的 retries 次数
并发运行过多 默认池大小 在配置中设置 max_concurrent_agents
分支冲突 代理重复使用分支名称 确保每次运行的任务 ID 是唯一的

调试模式

# 在 config/dev.exs 中
config :symphony, log_level: :debug

# 或者在运行时
Logger.put_module_level(Symphony.AgentRunner, :debug)
Logger.put_module_level(Symphony.Linear.Poller, :debug)

资源

📄 原始文档

完整文档(英文):

https://skills.sh/aradotso/trending-skills/openai-symphony-autonomous-agents

💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。