Skip to content

Event Topics

Annihilation uses a Registry-based PubSub system implemented in Annihilation.Event. The registry is started as the first child in the supervision tree under the name Annihilation.Event.Registry with :duplicate keys, allowing multiple processes to subscribe to the same topic.

Annihilation.Event.subscribe(topic) # Subscribe calling process
Annihilation.Event.unsubscribe(topic) # Unsubscribe calling process
Annihilation.Event.broadcast(topic, event) # Broadcast to all subscribers
Annihilation.Event.subscribers(topic) # List subscriber PIDs

Events arrive as standard Erlang messages:

{:event, topic, payload}
defmodule MySubscriber do
use GenServer
def init(state) do
Annihilation.Event.subscribe("agent:*")
Annihilation.Event.subscribe("burst:events")
{:ok, state}
end
def handle_info({:event, "burst:events", {:wave_complete, stats}}, state) do
Logger.info("Wave complete: #{inspect(stats)}")
{:noreply, state}
end
end
TopicPayloadPublisherDescription
"agent:#{id}"{:phase_change, old_phase, new_phase, state}Agent.ServerPhase machine transition
"agent:#{id}"{:delta, %Delta{}}Agent.ServerStreaming text/tool delta from LLM
"agent:#{id}"{:tool_results, [%ToolResult{}]}Agent.ServerBatch of tool execution results
"agent:#{id}"{:tool_update, call_id, partial}Agent.ServerStreaming tool output (progressive)
"agent:#{id}":doneAgent.ServerAgent completed its task
"agent:#{id}"{:error, reason}Agent.ServerAgent entered error state
TopicPayloadPublisherDescription
"agent:mail:#{id}"%{type: :new_message, from:, subject:, message_id:, priority:}Agent.MailboxNew direct message for agent
"agent:mail:broadcast"%{type: :broadcast_message, from:, subject:, message_id:}Agent.MailboxBroadcast message to all agents
TopicPayloadPublisherDescription
"tether:reaching"{:question_asked, %Question{}}QuestionQueuePsychonaut reaching for the tether
"tether:reaching"{:question_answered, %Question{}}QuestionQueueTether answered a question
"tether:reaching"{:question_timed_out, %Question{}}QuestionQueueQuestion timed out (2 min default)
"tether:reaching"{:late_answer, %Question{}, answer_text}QuestionQueueTether answered after timeout
TopicPayloadPublisherDescription
"tether:drifts"{:drift_created, %AssumptionMade{}}AssumptionsLedgerNew drift (assumption) recorded
"tether:drifts"{:drift_confirmed, %{drift_id:, agent_id:, text:, note:}}DriftReviewTether confirmed assumption was correct
"tether:drifts"{:drift_rejected, %{drift_id:, agent_id:, text:, reason:, correction_bead_id:}}DriftReviewTether rejected assumption, correction bead created
"tether:drifts"{:drift_noted, %{drift_id:, note:}}DriftReviewTether added note to drift
"tether:drifts"{:drift_superseded, %{drift_id:, question_id:, answer:, correction_bead_id:}}LateBeaconHandlerLate answer superseded the drift
"tether:drifts"{:pipeline_drift, %{entry:, assumption:, mutation_type:}}Pipeline.GroundingQueuePipeline mutation timed out and drifted
TopicPayloadPublisherDescription
"tether:questions"{:pipeline_mutation_submitted, entry}Pipeline.GroundingQueueNew pipeline mutation awaiting approval
"tether:questions"{:pipeline_mutation_responded, entry}Pipeline.GroundingQueueTether responded to pipeline mutation
TopicPayloadPublisherDescription
"burst:events"{:burst_start, %{burst_id:, wave_count:, started_at:}}Burst.ManagerNew burst began
"burst:events"{:burst_complete, %{burst_id:, succeeded:, failed:}}Burst.ManagerBurst finished (draining done)
"burst:events"{:wave_complete, %{wave_count:, burst_count:, total_beads:, succeeded:, failed:, duration_ms:}}Burst.ManagerWave finished (no more ready beads)
TopicPayloadPublisherDescription
"reflection:started"%{burst_id:}Reflection.PipelineReflection pipeline began
"reflection:extracting"%{burst_id:}Reflection.PipelineExtracting diary entries
"reflection:proposing"%{burst_id:}Reflection.PipelineProposing playbook deltas
"reflection:evaluating"%{burst_id:}Reflection.PipelineEvaluating deltas through evidence gate
"reflection:curating"%{burst_id:}Reflection.PipelineCurating accepted deltas
"reflection:completed"%{burst_id:, summary:}Reflection.PipelineReflection pipeline completed
"reflection:cached"%{burst_id:}Reflection.PipelineReturning cached reflection result
"reflection:error"%{stage:, error:}Reflection.PipelineError in a reflection stage

The event module is defined in lib/annihilation/event.ex:

defmodule Annihilation.Event do
@registry Annihilation.Event.Registry
def subscribe(topic) do
Registry.register(@registry, topic, [])
end
def unsubscribe(topic) do
Registry.unregister(@registry, topic)
end
def broadcast(topic, event) do
Registry.dispatch(@registry, topic, fn entries ->
for {pid, _} <- entries do
send(pid, {:event, topic, event})
end
end)
end
def subscribers(topic) do
Registry.lookup(@registry, topic)
|> Enum.map(fn {pid, _} -> pid end)
end
end

Key design decisions:

  • Uses OTP Registry with :duplicate keys (not :unique), allowing fan-out to multiple subscribers per topic.
  • No GenServer wrapper — the module is a stateless function interface to the Registry.
  • The registry is started directly in the supervision tree as {Registry, keys: :duplicate, name: Annihilation.Event.Registry}.
  • Events are fire-and-forget (send/2). There is no acknowledgment or back-pressure mechanism.