Pipelines
Pipeline Struct
Section titled “Pipeline Struct”%Annihilation.Pipeline{ id: 1, name: "default", match_labels: [], # Bead labels to match match_types: [], # Bead types to match priority: 100, # Lower = checked first source: :config, # :config | :tether | :psychonaut created_by: "seed_loader", created_at: ~U[...], active: true, stages: [ %Pipeline.Stage{agents: [%{name: "orchestrator"}], fan_out: false}, %Pipeline.Stage{agents: [%{name: "coder"}], fan_out: false}, %Pipeline.Stage{agents: [%{name: "security"}, %{name: "tester"}], fan_out: true} ]}Pipeline Recipes (YAML)
Section titled “Pipeline Recipes (YAML)”Pipelines are seeded from YAML files via Pipeline.SeedLoader:
# .annihilation/pipelines.yaml (project-level)# ~/.annihilation/pipelines.yaml (global, lower priority)
default: stages: - agents: ["orchestrator"] fan_out: false - agents: ["coder"] fan_out: false - agents: ["security", "tester"] fan_out: true
frontend: match_labels: ["ui", "frontend", "css", "react"] priority: 50 stages: - agents: ["orchestrator"] fan_out: false - agents: ["coder"] fan_out: false - agents: ["a11y", "tester"] fan_out: true
bugfix: match_labels: ["bug", "hotfix"] match_types: ["bug"] priority: 50 stages: - agents: ["coder"] fan_out: false - agents: ["tester"] fan_out: falseLoading Order
Section titled “Loading Order”- Global (
~/.annihilation/pipelines.yaml) - Project (
.annihilation/pipelines.yaml) — overrides global with same name - If a pipeline was created by
:tetheror:psychonautsource, YAML seed is skipped for that name defaultpipeline is always ensured to exist
Stage Semantics
Section titled “Stage Semantics”fan_out: false— agents run sequentially; each agent’s output feeds the nextfan_out: true— agents run concurrently; all must complete for the stage to pass
A stage is complete when all agents reach a terminal status (:done or :error). If any agent fails:
- Fan-out stage: stage fails after all agents finish
- Sequential stage: chain is broken immediately (
:sequential_blocked)
Stage Config
Section titled “Stage Config”%Annihilation.Pipeline.Stage{ agents: [%{name: "coder", system_prompt: "..."}], fan_out: false, condition: nil, # Reserved for conditional stage execution retries: 0 # Reserved for retry logic}Matching Rules
Section titled “Matching Rules”When a bead enters a burst, Keeper.match_pipeline/2:
- Reads bead labels and type
- Queries pipelines ordered by priority (ascending)
- Checks each pipeline’s
match_labelsandmatch_types - First match wins
- Falls back to
defaultpipeline if no match
Pipeline CRUD
Section titled “Pipeline CRUD”Pipelines are stored in the SQLite database via Beads.Keeper:
# Create a pipelineKeeper.create_pipeline(%{name: "my_pipeline", stages: [...], match_labels: ["api"]})
# Read a pipelineKeeper.get_pipeline("my_pipeline")
# Update a pipelineKeeper.update_pipeline("my_pipeline", %{stages: [...]})
# List all pipelinesKeeper.list_pipelines()
# Match a pipeline for a beadKeeper.match_pipeline(bead_id)Per-Bead Overrides
Section titled “Per-Bead Overrides”Individual beads can have pipeline overrides via bead_pipelines table:
Keeper.set_bead_pipeline(bead_id, pipeline_name)This takes absolute precedence over label/type matching.
Pipeline Mutations (Runtime)
Section titled “Pipeline Mutations (Runtime)”Psychonauts can modify pipelines at runtime using tools:
set_pipeline Tool
Section titled “set_pipeline Tool”Override the pipeline for the current bead:
# Psychonaut invokes set_pipeline tool with:%{"pipeline_name" => "bugfix", "reason" => "This is a bug, not a feature"}create_pipeline Tool
Section titled “create_pipeline Tool”Create a new pipeline definition:
# Psychonaut invokes create_pipeline tool with:%{ "name" => "data_migration", "match_labels" => ["migration", "data"], "stages" => [ %{"agents" => ["coder"], "fan_out" => false}, %{"agents" => ["tester", "reviewer"], "fan_out" => true} ], "reason" => "Need a specialized pipeline for data migrations"}Pipeline Grounding Queue
Section titled “Pipeline Grounding Queue”Pipeline mutations go through Pipeline.GroundingQueue before being applied:
- Mutation submitted -> entry created with
:pendingstatus - Tether has a timeout window (default 2 minutes) to respond
- Three response types:
{:approve, _}— apply as-is{:reject, reason}— do not apply{:modify, modified_value}— apply with Tether’s changes
- Timeout -> mutation drifts (applied as assumption, recorded in AssumptionsLedger)
# Submit a mutation{:ok, entry_id} = GroundingQueue.submit(%{ mutation_type: :set_pipeline, proposed_value: %{stages: [...]}, agent_id: "agent_123", reason: "Better fit for this task"})
# Wait for responsecase GroundingQueue.await(entry_id) do {:approved, value} -> apply_pipeline(value) {:rejected, reason} -> log_rejection(reason) {:drifted, proposed} -> apply_as_assumption(proposed)endPipeline Drift Review
Section titled “Pipeline Drift Review”Pipeline drifts can be reviewed after the fact via PipelineDriftReview:
PipelineDriftReview.ground(drift_id) # Confirm the pipeline change was finePipelineDriftReview.reject(drift_id, "Should have used the default pipeline")Data Flow Between Stages
Section titled “Data Flow Between Stages”Each agent receives:
- The original bead description (from
Keeper.show/2) - Accumulated output from all previous stages (formatted as Markdown)
- Access to the project filesystem via tools
The orchestrator (first stage) typically writes a plan. Subsequent stages read it and build on it.