Skip to content

Elixir API Reference

GenServer implementing a single psychonaut agent, driven by a phase state machine (e.g. :idle -> :streaming).

# Start a psychonaut
{:ok, pid} = Agent.Server.start_link(%{
model: "claude-sonnet-4-20250514",
provider: Provider.Anthropic,
system_prompt: "You are...",
tools: [Tool.Shell, Tool.FileRead],
config: %{api_key: "sk-..."}
})
# Begin agent work (transitions :idle -> :streaming)
Agent.Server.run(pid)
# Get current state
state = Agent.Server.get_state(pid)
# Get conversation history
messages = Agent.Server.get_messages(pid)
# Stop the agent
Agent.Server.stop(pid)

Behaviour for LLM provider implementations.

@callback stream(messages, tools, config) :: {:ok, Enumerable.t()} | {:error, term()}
@callback format_messages(messages) :: [map()]
@callback format_tools(tools) :: [map()]

Anthropic Messages API provider via Mint HTTP/2 streaming.

Config keys: :api_key, :model (default "claude-sonnet-4-20250514"), :max_tokens (default 8192), :system, :temperature.

Behaviour for tool implementations.

@callback name() :: String.t()
@callback description() :: String.t()
@callback parameters_schema() :: map()
@callback execute(args :: map(), context :: map()) ::
{:ok, String.t() | map()} | {:error, String.t()}
# Convert module to definition map for provider API
Tool.to_definition(MyTool)
# -> %{name: "my_tool", description: "...", parameters: %{...}}

ETS-based tool name-to-module registry.

ToolRegistry.lookup("shell", agent_tools) # -> {:ok, Tool.Shell} | {:error, :not_found}
ToolRegistry.definitions(agent_tools) # -> [%{name:, description:, parameters:}]

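Named supervisor for agent lifecycle management. Note: `DynamicSupervisor` below refers to this project's own supervisor module (presumably aliased), not Elixir's built-in `DynamicSupervisor`, which provides `start_child/2` and `terminate_child/2` rather than the functions shown here.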

DynamicSupervisor.start_agent(config) # -> {:ok, pid, agent_id}
DynamicSupervisor.stop_agent(agent_id) # -> :ok
DynamicSupervisor.list_agents() # -> [{pid, agent_id}]
DynamicSupervisor.get_agent(agent_id) # -> {:ok, pid} | {:error, :not_found}

Recursive sub-call engine.

# Single synchronous sub-call
Recurse.call(parent_state, context, query, opts)
# -> {:ok, result_text} | {:error, reason}
# Concurrent fan-out with optional reduce
Recurse.fan_out(parent_state, tasks, opts)
# tasks = [%{task: "...", context: "..."}, ...]
# opts: :reduce_prompt, :max_concurrency (5), :timeout (300_000)
# -> {:ok, %{result: text, metadata: %{total_partitions:, succeeded:, failed:, reduce_used:}}}

Global counting semaphore for recursive children (default 16 slots).

RecursionSemaphore.acquire() # -> :ok | {:error, :semaphore_timeout}
RecursionSemaphore.release() # -> :ok
RecursionSemaphore.count() # -> non_neg_integer()

ETS-based inter-agent messaging.

Mailbox.send_message(from, to, subject, body, opts) # -> {:ok, %AgentMessage{}}
Mailbox.broadcast_message(from, subject, body, opts) # -> {:ok, %AgentMessage{}}
Mailbox.get_unread(agent_id) # -> [%AgentMessage{}]
Mailbox.get_all(agent_id) # -> [%AgentMessage{}]
Mailbox.mark_read(message_id) # -> :ok
Mailbox.mark_acknowledged(message_id) # -> :ok
Mailbox.clear_burst() # -> :ok

GenServer owning the SQLite database. Single writer, concurrent readers via WAL mode.

# Bead CRUD
Keeper.create(%{title: "...", type: :task, priority: 2}) # -> {:ok, %Bead{}}
Keeper.show(bead_id) # -> {:ok, %Bead{}}
Keeper.update(bead_id, %{status: :in_progress}) # -> :ok
Keeper.close_bead(bead_id, reason) # -> :ok
Keeper.ready() # -> {:ok, [%Bead{}]}
# Dependencies
Keeper.add_dep(bead_id, blocked_by_id) # -> :ok
Keeper.deps(bead_id) # -> {:ok, [dep_id]}
# Comments
Keeper.add_comment(bead_id, author, body) # -> :ok
Keeper.comments(bead_id) # -> {:ok, [%Comment{}]}
# Pipeline CRUD
Keeper.create_pipeline(attrs) # -> {:ok, pipeline_name}
Keeper.get_pipeline(name) # -> {:ok, %Pipeline{}} | {:error, :not_found}
Keeper.update_pipeline(name, changes) # -> :ok
Keeper.list_pipelines() # -> {:ok, [%Pipeline{}]}
Keeper.match_pipeline(bead_id) # -> {:ok, %Pipeline{}}
Keeper.set_bead_pipeline(bead_id, name) # -> :ok
# Search
Keeper.search("query") # -> {:ok, [%Bead{}]}

GenServer for append-only JSONL session management.

Store.open_session(session_id) # -> :ok
Store.close_session(session_id) # -> :ok
Store.append_event(session_id, event) # -> :ok
Writer.append(path, event) # Append JSON line to file
Reader.read_all(path) # -> {:ok, [event]}
Reader.stream(path) # -> Enumerable of events

FTS5 search index over session history.

{:ok, conn} = Index.ensure_index(project_root)
Index.insert(conn, %{burst_id: "...", content: "...", role: "assistant", ...})
{:ok, results} = Index.search(project_root, "SSE parser", limit: 10, agent: "coder")
Index.clear(conn)
Index.close(conn)

Priority-ordered question queue with timeout handling.

# Agent side (blocks until answered or timed out)
QuestionQueue.ask(%{agent_id: "...", text: "...", priority: :normal})
# -> {:ok, answer_text} | {:timed_out, %Question{}}
# Tether side
QuestionQueue.answer(question_id, answer_text)
# -> :ok | {:late, %Question{}} | {:error, :not_found}
QuestionQueue.list_pending() # -> [%Question{}]
QuestionQueue.get(question_id) # -> {:ok, %Question{}} | {:error, :not_found}
QuestionQueue.list_timed_out() # -> [%Question{}]

Drift lifecycle management with JSONL persistence.

AssumptionsLedger.create_drift(attrs) # -> {:ok, %AssumptionMade{}}
AssumptionsLedger.update_drift(id, update_fn) # -> {:ok, %AssumptionMade{}}
AssumptionsLedger.get_drift(id) # -> {:ok, %AssumptionMade{}} | {:error, :not_found}
AssumptionsLedger.find_by_question_id(qid) # -> {:ok, %AssumptionMade{}} | {:error, :not_found}
AssumptionsLedger.list_drifts(opts) # -> [%AssumptionMade{}]
AssumptionsLedger.list_active_drifts() # -> [%AssumptionMade{}]

Actions on drifts.

DriftReview.ground(drift_id, note: "...") # -> {:ok, %AssumptionMade{}}
DriftReview.reject(drift_id, "reason") # -> {:ok, %AssumptionMade{}} (creates correction bead)
DriftReview.note(drift_id, "note text") # -> {:ok, %AssumptionMade{}}

Burst/wave lifecycle state machine.

Manager.start_wave() # asynchronous cast; transitions :idle -> :collecting
Manager.current_phase() # -> :idle | :collecting | :running | :draining | :done
Manager.get_state() # -> %Manager{}

Load pipeline definitions from YAML config files.

SeedLoader.load() # -> :ok (loads global + project pipelines)
SeedLoader.parse_yaml(path) # -> %{name => config}

Pipeline mutation grounding with timeout/drift.

GroundingQueue.submit(attrs) # -> {:ok, entry_id}
GroundingQueue.respond(entry_id, response) # -> :ok | {:error, reason}
GroundingQueue.await(entry_id, timeout) # -> {:approved, value} | {:rejected, reason} | {:drifted, value}
GroundingQueue.list_pending(opts) # -> {:ok, [entry]}

YAML storage for playbook rules.

Playbook.load(project_root) # -> {:ok, [%PlaybookRule{}]}
Playbook.save(project_root, rules) # -> :ok
Playbook.add_rule(project_root, rule) # -> :ok
Playbook.update_rule(project_root, id, updates) # -> :ok | {:error, :not_found}
Playbook.remove_rule(project_root, id) # -> :ok
Playbook.search(project_root, "query", opts) # -> [%PlaybookRule{}]
Playbook.load_merged(project_root) # -> {:ok, [%PlaybookRule{}]}

Confidence scoring and maturity management.

Confidence.decay(rule, now) # -> %PlaybookRule{} (time-decayed)
Confidence.apply_success(rule) # -> %PlaybookRule{} (+0.05)
Confidence.apply_failure(rule) # -> %PlaybookRule{} (-0.20)
Confidence.compute_maturity(rule) # -> :nascent | :established | :proven
Confidence.promote_if_ready(rule) # -> %PlaybookRule{}
Confidence.demote_if_warranted(rule) # -> %PlaybookRule{}
Confidence.sweep(project_root) # -> {:ok, %{promoted:, demoted:, flagged:}}

Detection and inversion of harmful rules.

AntiPattern.detect?(rule) # -> boolean (criteria: failure_count >= 3; failures more than 2x successes)
AntiPattern.invert(rule) # -> {:ok, %PlaybookRule{text: "AVOID: ..."}}
AntiPattern.scan(project_root) # -> {:ok, [%{original:, proposed_inversion:}]}
AntiPattern.apply_inversions(root, proposals) # -> {:ok, %{deprecated:, inverted:}}

Memory context injection for agent prompts.

Context.select_rules(root, labels, type) # -> %{rules:, anti_patterns:, total_score:, token_count:}
Context.format_for_prompt(context) # -> String.t() (markdown formatted)
Context.track_injection(agent_id, rule_ids, bead_id) # -> :ok
Context.record_outcome(root, agent_id, bead_id, :success | :failure) # -> :ok

Thompson sampling ranking for the skill catalog.

Ranking.rank(skills) # -> [%Skill{}] (Thompson sampling, non-deterministic)
Ranking.rank(skills, limit) # -> top N skills
Ranking.rank_by_mean(skills) # -> [%Skill{}] (deterministic)
Ranking.rank_by_ucb(skills) # -> [%Skill{}] (Upper Confidence Bound)
Ranking.search_ranked(root, query, opts) # -> [%Skill{}]
Ranking.record_feedback(root, skill_id, :success | :failure) # -> :ok

Three-tier risk classification for shell commands.

CommandGuard.classify(command) # -> :safe | :caution | :danger
CommandGuard.danger?(command) # -> boolean
CommandGuard.caution?(command) # -> boolean

Experience-based safety escalation.

TraumaGuard.match(command, records) # -> {:match, record} | :no_match
TraumaGuard.match_all(command, records) # -> [%TraumaRecord{}]
TraumaGuard.classify_with_trauma(command, records) # -> {classification, context}

Cooperative file locking.

LeaseManager.acquire(path, agent_id, :exclusive) # -> :ok | {:conflict, info}
LeaseManager.release(path, agent_id) # -> :ok
LeaseManager.renew(path, agent_id) # -> :ok | {:error, :not_found}
LeaseManager.leases_for_agent(agent_id) # -> [path]
LeaseManager.lease_holder(path) # -> {:ok, lease} | :none
LeaseManager.all_leases() # -> [lease]
LeaseManager.clear_all() # -> :ok

Post-burst reflection pipeline orchestrator.

Reflection.Pipeline.run(project_root, burst_id, opts)
# -> {:ok, %{diary_entries:, deltas:, evaluated:, curation:, errors:}}
Reflection.Pipeline.run_manual(project_root, burst_id, opts)
# Always re-runs (force: true)
Reflection.Pipeline.format_result(result)
# -> String.t() (human-readable summary)

The pipeline runs 6 stages: load transcripts -> extract diary entries -> propose playbook deltas -> validate evidence -> curate accepted deltas -> save results. Each stage is wrapped in try/rescue, so a failing stage produces partial results rather than a crash. Results are cached per burst_id for idempotency; run_manual always re-runs (force: true).

Registry-based pub/sub.

Event.subscribe(topic) # -> {:ok, pid}
Event.unsubscribe(topic) # -> :ok
Event.broadcast(topic, event) # -> :ok
Event.subscribers(topic) # -> [pid]

Raw terminal mode and ANSI rendering primitives.

Terminal.enter_raw_mode() # -> :ok
Terminal.exit_raw_mode() # -> :ok
Terminal.with_raw_mode(fun) # Runs fun in raw mode with automatic cleanup
Terminal.dimensions() # -> {rows, cols}
Terminal.render(cells) # -> iodata
Terminal.write(iodata) # -> :ok

Mode switching and state management.

manager = ModeManager.new(term_size)
ModeManager.current_mode(manager) # -> current mode atom, e.g. :overview
ModeManager.switch_to(manager, :focus) # -> updated manager
ModeManager.handle_key(manager, event) # -> {:ok, manager} | :quit
ModeManager.handle_events(manager, events) # -> manager
ModeManager.render(manager) # -> [Terminal.cell()]