Skip to content

Pipelines

%Annihilation.Pipeline{
id: 1,
name: "default",
match_labels: [], # Bead labels to match
match_types: [], # Bead types to match
priority: 100, # Lower = checked first
source: :config, # :config | :tether | :psychonaut
created_by: "seed_loader",
created_at: ~U[...],
active: true,
stages: [
%Pipeline.Stage{agents: [%{name: "orchestrator"}], fan_out: false},
%Pipeline.Stage{agents: [%{name: "coder"}], fan_out: false},
%Pipeline.Stage{agents: [%{name: "security"}, %{name: "tester"}], fan_out: true}
]
}

Pipelines are seeded from YAML files via Pipeline.SeedLoader:

# .annihilation/pipelines.yaml (project-level)
# ~/.annihilation/pipelines.yaml (global, lower priority)
default:
stages:
- agents: ["orchestrator"]
fan_out: false
- agents: ["coder"]
fan_out: false
- agents: ["security", "tester"]
fan_out: true
frontend:
match_labels: ["ui", "frontend", "css", "react"]
priority: 50
stages:
- agents: ["orchestrator"]
fan_out: false
- agents: ["coder"]
fan_out: false
- agents: ["a11y", "tester"]
fan_out: true
bugfix:
match_labels: ["bug", "hotfix"]
match_types: ["bug"]
priority: 50
stages:
- agents: ["coder"]
fan_out: false
- agents: ["tester"]
fan_out: false
  1. Global (~/.annihilation/pipelines.yaml)
  2. Project (.annihilation/pipelines.yaml) — overrides global with same name
  3. If a pipeline was created by :tether or :psychonaut source, YAML seed is skipped for that name
  4. default pipeline is always ensured to exist
  • fan_out: false — agents run sequentially; each agent’s output feeds the next
  • fan_out: true — agents run concurrently; all must complete for the stage to pass

A stage is complete when all agents reach a terminal status (:done or :error). If any agent fails:

  • Fan-out stage: stage fails after all agents finish
  • Sequential stage: chain is broken immediately (:sequential_blocked)
%Annihilation.Pipeline.Stage{
agents: [%{name: "coder", system_prompt: "..."}],
fan_out: false,
condition: nil, # Reserved for conditional stage execution
retries: 0 # Reserved for retry logic
}

When a bead enters a burst, Keeper.match_pipeline/2:

  1. Reads bead labels and type
  2. Queries pipelines ordered by priority (ascending)
  3. Checks each pipeline’s match_labels and match_types
  4. First match wins
  5. Falls back to default pipeline if no match

Pipelines are stored in the SQLite database via Beads.Keeper:

# Create a pipeline
Keeper.create_pipeline(%{name: "my_pipeline", stages: [...], match_labels: ["api"]})
# Read a pipeline
Keeper.get_pipeline("my_pipeline")
# Update a pipeline
Keeper.update_pipeline("my_pipeline", %{stages: [...]})
# List all pipelines
Keeper.list_pipelines()
# Match a pipeline for a bead
Keeper.match_pipeline(bead_id)

Individual beads can have pipeline overrides via bead_pipelines table:

Keeper.set_bead_pipeline(bead_id, pipeline_name)

This takes absolute precedence over label/type matching.

Psychonauts can modify pipelines at runtime using tools:

Override the pipeline for the current bead:

# Psychonaut invokes set_pipeline tool with:
%{"pipeline_name" => "bugfix", "reason" => "This is a bug, not a feature"}

Create a new pipeline definition:

# Psychonaut invokes create_pipeline tool with:
%{
"name" => "data_migration",
"match_labels" => ["migration", "data"],
"stages" => [
%{"agents" => ["coder"], "fan_out" => false},
%{"agents" => ["tester", "reviewer"], "fan_out" => true}
],
"reason" => "Need a specialized pipeline for data migrations"
}

Pipeline mutations go through Pipeline.GroundingQueue before being applied:

  1. Mutation submitted -> entry created with :pending status
  2. Tether has a timeout window (default 2 minutes) to respond
  3. Three response types:
    • {:approve, _} — apply as-is
    • {:reject, reason} — do not apply
    • {:modify, modified_value} — apply with Tether’s changes
  4. Timeout -> mutation drifts (applied as assumption, recorded in AssumptionsLedger)
# Submit a mutation
{:ok, entry_id} = GroundingQueue.submit(%{
mutation_type: :set_pipeline,
proposed_value: %{stages: [...]},
agent_id: "agent_123",
reason: "Better fit for this task"
})
# Wait for response
case GroundingQueue.await(entry_id) do
{:approved, value} -> apply_pipeline(value)
{:rejected, reason} -> log_rejection(reason)
{:drifted, proposed} -> apply_as_assumption(proposed)
end

Pipeline drifts can be reviewed after the fact via PipelineDriftReview:

PipelineDriftReview.ground(drift_id) # Confirm the pipeline change was fine
PipelineDriftReview.reject(drift_id, "Should have used the default pipeline")

Each agent receives:

  • The original bead description (from Keeper.show/2)
  • Accumulated output from all previous stages (formatted as Markdown)
  • Access to the project filesystem via tools

The orchestrator (first stage) typically writes a plan. Subsequent stages read it and build on it.