Zero Dropped Events: How We Built Resilient Retry Queues for a Large-Scale Agent OS
Here's a pattern that exists in almost every Python async codebase:
asyncio.create_task(analytics_client.post(url, json=outcome))
Fire and forget. If the analytics service is down, the task raises an exception, nobody catches it, and the outcome record vanishes. The user who just gave you a thumbs-down on a bad response? That feedback never reaches the training loop. The dark factory keeps producing the same bad outputs because it never learned from the correction.
We found 14 of these in AitherOS. Not in obscure background jobs — in the critical user-facing path. Registration, feedback, artifact delivery, conversation indexing, security badge events. Every one of them wrapped in except Exception: pass.
This is the story of how we audited, categorized, and fixed every one of them.
The Audit
We started with two questions: What happens to user-facing operations when a downstream service is unavailable? And what visibility do we have into dropped events?
The answer to the second question was: none. Our nervous system event bus is in-memory pub/sub. It has a 20-event ring buffer. No persistence. No retry. No dead letter. If a subscriber fails, the event is logged at DEBUG level and discarded.
We had three ad-hoc JSONL disk queues scattered around the codebase -- each hand-rolled with the same 2000-line cap, the same threading lock, the same rotation logic. Three copies of the same queue, none of them reusable, none of them observable.
The audit of user-facing operations was worse.
The 14 Vulnerability Points
Every vulnerability followed the same pattern: asyncio.create_task() with no tracking, wrapped in a broad exception handler that swallows the error.
Critical (Event Loss)
1. User Feedback -- When a user clicks thumbs-down on a chat response, the orchestrator POSTs the outcome record to the analytics service. If it returns a non-200, we log a warning. If it throws a connection error, we catch it and move on. The user's feedback is lost. The dark factory learning loop never gets the signal.
# Before
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.post(analytics_url, json=outcome)
if resp.status_code != 200:
logger.warning(f"Analytics ingestion failed: {resp.status_code}")
2. Learning Capture -- After every chat response, the chat system fires a learning record to the learning engine for training signal extraction. Fire-and-forget task. If it fails:
except Exception as err:
log.debug(f"Learning capture skipped (non-fatal): {err}")
DEBUG level. In production, you would never see this unless you explicitly set the log level. The dark factory has no idea it's missing training data.
3. Conversation Indexing -- Two fire-and-forget tasks for indexing and crystallizing the exchange. If the conversation store is slow or the database is locked, both tasks fail silently. The user's conversation isn't indexed. Long-context memory breaks.
4. Artifact Delivery — When an agent generates images or documents, they're delivered via a fire-and-forget task. If the delivery pipeline fails, the artifacts are gone. The user sees the response text but never gets the files.
5. Graph Write Buffer -- Knowledge graph writes from tool calls are flushed in a cleanup block. If the flush task fails, a broad exception handler swallows the error silently. The knowledge graph loses tool call results with no trace.
High (Data Integrity)
6. Billing Provisioning -- When a user registers, we POST to the billing service to provision their account and API key. If the billing service is down, the user gets the wrong permission tier. They're stuck on "starter" even if their plan entitles them to more.
7. Security Badge Events -- Security-critical badge events (anomaly tracking, revocation monitoring) are POSTed to the security service with a 3-second timeout. On failure: a debug-level log. Security events silently dropped.
8. Mesh Key Persistence -- When an admin generates a mesh enrollment key for distributed node trust, the key is stored in-memory and then persisted to the analytics service. If the analytics service is down, the key exists only in RAM. Process restart = key lost = inter-node communication breaks.
9-14. Knowledge ingestion on session end, session lifecycle promotion, graph flush on session end — all the same pattern. Fire, forget, and hope.
The Design Decision: One Queue to Replace Them All
We considered three approaches:
Option A: Fix each site individually. Add retry loops to each of the 14 call sites. This is what got us into trouble in the first place. The three existing JSONL queues were each a bespoke implementation of the same concept. Adding 14 more would make the codebase unmaintainable.
Option B: Make the event bus persistent. Tempting, but wrong. The event bus handles 150+ event types across the nervous system. Adding persistence to every event would create massive write amplification. Most events (GPU status, mood updates, daydream thoughts) don't need durability. The ones that do are a specific subset of user-facing operations.
Option C: Build a single durable queue with per-queue configuration. SQLite WAL for persistence (we already use it for conversations, social, access control, expeditions, and packages -- it's battle-tested in our stack). Background worker for retry. Dead letter for exhausted events. Event bus integration for real-time visibility. Pain signals for health monitoring.
We went with Option C.
The Implementation
The core is a singleton SQLite WAL-backed event queue, implemented as a single-file module.
Schema
CREATE TABLE queued_events (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL,
payload TEXT NOT NULL, -- JSON
status TEXT NOT NULL, -- pending | processing | dead_letter
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 5,
next_retry_at REAL NOT NULL, -- epoch seconds
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
idempotency_key TEXT -- UNIQUE index for dedup
);
CREATE TABLE dead_letter_events (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL,
payload TEXT NOT NULL,
attempts INTEGER NOT NULL,
created_at REAL NOT NULL,
dead_at REAL NOT NULL,
last_error TEXT
);
Two tables, indexed for the hot path (status = 'pending' AND next_retry_at <= now). Dead letter events are preserved indefinitely for inspection and replay.
Per-Queue Configuration
Not every operation needs the same retry behavior. A feedback rating can retry 5 times over a few minutes. A user registration billing provisioning needs to retry 10 times over 10+ minutes because the billing service might be restarting.
Each queue is configured with its own maximum attempts, base delay, and delay cap. For example:
- Analytics ingestion: 5 attempts, 2s base delay
- Learning capture: 3 attempts, 1s base delay
- Identity provisioning: 10 attempts, 5s base, 600s max (for long outages)
- Feedback: 5 attempts, 2s base delay
- Mesh keys: 10 attempts, 5s base, 600s max
- Security notifications: 5 attempts, 3s base delay
11 queues total, each tuned for its specific failure mode.
Retry delay is exponential: base * 2^attempt with 10% jitter, capped at the max delay. A feedback event retries at 2s, 4s, 8s, 16s, 32s. A billing provisioning retries at 5s, 10s, 20s, 40s, 80s, 160s, 300s, 300s, 300s, 300s -- almost 20 minutes of attempts before giving up.
The API
Replacing a fire-and-forget pattern is a one-line change:
# Before: fire-and-forget (event lost on failure)
asyncio.create_task(deliver_artifacts(turns=turns, session_id=sid))
# After: durable queue (survives service outages)
await queue.enqueue("artifact_delivery", {
"turns": turns,
"session_id": sid,
"prompt": request.message,
})
For HTTP calls (the majority of our integration points), there's a shortcut:
await queue.enqueue_http(
"feedback", "POST",
analytics_url,
body=outcome,
idempotency_key=f"feedback_{feedback_id}",
)
The idempotency key prevents duplicate processing when a retry succeeds but the response is lost. The billing service doesn't provision the same user twice. Analytics doesn't double-count the same feedback.
Background Worker
A background asyncio task polls every 5 seconds:
- Clean up stale
processingevents (stuck > 5 minutes — process crash recovery) - Fetch pending events where
next_retry_at <= now - Dispatch to registered handlers
- On success: delete from queue
- On failure: increment attempts, schedule retry with backoff
- On exhaustion: move to dead letter table, emit pain signal
The worker is started during system initialization and stopped cleanly during shutdown. Every queue has a registered handler -- HTTP-typed events use a built-in handler that POSTs to the specified URL, while internal operations (conversation indexing, graph writes, artifact delivery) have custom async handlers.
The Visibility Layer
A durable queue without observability is just a different place to lose events. We built three layers of visibility:
Event Bus Integration
Six new event types in the nervous system:
- Enqueued -- Event accepted into queue
- Retry -- Retry attempt N scheduled
- Completed -- Successfully processed
- Dead letter -- Exhausted retries
- Depth warning -- Queue backing up
- Replayed -- Dead letter replayed
Any service subscribed to the event bus sees queue operations in real-time. The monitoring dashboard picks them up for visualization. The health service sees them for alerting.
Pulse Pain Signals
When a queue crosses health thresholds, it emits reliability pain signals to the health service:
- Queue depth > 100: A downstream service is probably down. Severity scales with depth.
- Dead letter count > 10: Events are being permanently lost. Severity 0.7+.
These pain signals feed into the Dark Factory's self-healing loop. The awareness system sees the pain, dispatches an investigation agent, and if the service can be restarted, the queue drains naturally when it comes back.
REST Endpoints
Five REST endpoints for dashboards and operational tooling:
- Queue stats -- Comprehensive statistics (depths, dead letters, runtime metrics)
- Dead letter inspection -- View dead letter events, filterable by queue
- Dead letter replay -- Move a dead letter back to pending for reprocessing
- Dead letter purge -- Clear dead letter events
- Health check -- Quick status (healthy/degraded + specific issues)
The health endpoint returns structured issue descriptions:
{
"status": "degraded",
"total_pending": 47,
"total_dead_letter": 12,
"issues": [
"analytics_ingest: 12 dead letters",
"identity_provision: 35 pending (backed up)"
]
}
A web dashboard proxy forwards these to the frontend for visualization.
The Wiring: Every Fallback is Graceful
Every integration point follows the same pattern: try the queue first, fall back to the old fire-and-forget behavior if the queue is unavailable. This means the queue is additive -- if the queue system itself has a bug or fails to initialize, the system degrades to exactly the behavior it had before.
try:
queue = get_queue()
await queue.enqueue_http("feedback", "POST", url, body=outcome)
except Exception:
# Fallback: direct POST (old behavior)
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.post(url, json=outcome)
This is important for a system that's already in production. We didn't want to introduce a new single point of failure. The queue makes things better when it's running, and changes nothing when it's not.
What We Didn't Build
We didn't make the event bus persistent. The temptation was real. But the event bus processes hundreds of events per second (GPU status, affect changes, daydream thoughts, service heartbeats). Adding write-ahead logging to all of them would be a performance disaster for a system that's designed for real-time context injection, not durable messaging.
We didn't build a distributed queue. AitherOS runs on a single node (with mesh distribution as an overlay). Redis, Kafka, RabbitMQ -- all great for multi-node systems, but they add operational complexity we don't need. SQLite WAL gives us ACID durability, concurrent reads, and zero-config deployment. When we need distributed queuing, we'll use the existing cross-node communication layer.
We didn't replace the existing disk queues. A few services already had hand-rolled JSONL queues that work. They're not broken. Migrating them to the new durable queue is a future cleanup task, not a prerequisite for fixing the 14 user-facing vulnerabilities.
The Numbers
65 new tests covering: schema creation, WAL mode verification, enqueue/fetch operations, idempotency dedup, retry scheduling, dead letter management, handler dispatch (sync and async), the built-in HTTP handler, runtime metrics, health thresholds, event bus integration, worker lifecycle, singleton behavior, multi-queue independence, concurrent safety, and the API router.
693 regression tests across expedition manager (250), content production (93), agent forge (82), dark factory gaps (137), dark factory activation (67), dark factory final gaps (21), and six pillars (43) — all passing.
11 predefined queues. 8 files modified. 3 new files created. Zero new dependencies.
The Takeaway
The pattern of asyncio.create_task() plus except Exception: pass is seductive because it keeps the happy path fast and clean. But in a distributed system with dozens of services, "non-fatal" doesn't mean "unimportant." A dropped feedback rating means the dark factory can't learn. A dropped billing provision means a user is stuck on the wrong tier. A dropped security event means an anomaly goes undetected.
The fix isn't to make everything synchronous and block on every downstream call. The fix is to make the async path durable: accept the event, persist it, and process it when the downstream service is ready. If it's never ready, put it somewhere visible so a human (or another agent) can investigate.
Every event in AitherOS now has a path to completion or a path to visibility. There is no path to silence.