# Memory Extraction Queue

Postgres-backed durable job queue with `FOR UPDATE SKIP LOCKED` for concurrent memory extraction.

## Why a durable queue
When a completion response contains useful context (facts, preferences, instructions), BrainstormRouter extracts it into persistent memory — but not synchronously. Extraction involves an LLM call (the Pi Runner), which is too slow for the hot path.
Instead, the gateway enqueues an extraction job and returns the response immediately. A background worker claims and processes jobs asynchronously.
This is the same pattern used by Stripe for webhook delivery, GitHub for CI triggers, and most payment processors. The critical difference: we use Postgres itself as the queue (no Redis, no RabbitMQ, no SQS), because the data is already in Postgres and we need transactional guarantees with the memory tables.
## Architecture

```mermaid
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#d97706', 'lineColor': '#9494a8', 'primaryTextColor': '#e8e8ee'}}}%%
sequenceDiagram
    participant API as Completions Route
    participant Q as Extraction Queue<br/>(Postgres)
    participant W as Worker
    participant Pi as Pi Runner
    participant RMM as Memory Store<br/>(pgvector)

    API->>Q: enqueue(tenantId, payload)
    Note over Q: INSERT ... ON CONFLICT DO NOTHING<br/>(idempotency key = requestId)

    loop Every 5 seconds
        W->>Q: claimBatch(limit=10)
        Note over Q: SELECT ... FOR UPDATE SKIP LOCKED
        Q-->>W: [{id, tenantId, payload}, ...]
        loop Each job
            W->>Pi: Extract memories from payload
            Pi-->>W: [memory_1, memory_2, ...]
            W->>RMM: Store memories (pgvector)
            W->>Q: markComplete(id)
        end
    end
```
## The schema

From `src/db/schema/memory-extraction.ts`:
```ts
export const memoryExtractionQueue = pgTable("memory_extraction_queue", {
  id: uuid("id").primaryKey().defaultRandom(),
  tenantId: uuid("tenant_id")
    .notNull()
    .references(() => tenants.id),
  userKey: text("user_key").notNull(),
  transactionPayload: jsonb("transaction_payload").notNull(),
  status: text("status").notNull().default("pending"), // pending | processing | completed | failed
  error: text("error"),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  lockedAt: timestamp("locked_at", { withTimezone: true }),
  attemptCount: integer("attempt_count").notNull().default(0),
  nextRunAt: timestamp("next_run_at", { withTimezone: true }).notNull().defaultNow(),
  idempotencyKey: text("idempotency_key").notNull(),
});
```
Three purpose-built indexes make this performant:
| Index | Filter | Purpose |
|---|---|---|
| `idx_meq_pending` | `WHERE status = 'pending'` on `next_run_at` | Fast pending job lookup |
| `idx_meq_stale_locks` | `WHERE status = 'processing'` on `locked_at` | Stale job recovery |
| `uq_meq_tenant_idempotency` | `UNIQUE` on `(tenant_id, idempotency_key)` | Deduplication |
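In Postgres these would be created roughly as follows. The index names and filters come from the table above; the exact key column lists are an assumption based on the query patterns described in this document:

```sql
-- Fast pending job lookup: a partial index over only pending rows,
-- ordered by next_run_at to match the claim query's ORDER BY.
CREATE INDEX idx_meq_pending
  ON memory_extraction_queue (next_run_at)
  WHERE status = 'pending';

-- Stale job recovery: partial index over processing rows, scanned by locked_at.
CREATE INDEX idx_meq_stale_locks
  ON memory_extraction_queue (locked_at)
  WHERE status = 'processing';

-- Deduplication: at most one job per (tenant, idempotency key).
CREATE UNIQUE INDEX uq_meq_tenant_idempotency
  ON memory_extraction_queue (tenant_id, idempotency_key);
```

Partial indexes keep these small: completed and failed rows accumulate over time, but neither hot-path index has to carry them.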
## No RLS — by design

This table intentionally has no Row-Level Security. The worker must claim jobs across all tenants. RLS would block the `withSystemContext` pattern (which clears `app.current_tenant` to `''`).
Tenant isolation is enforced at the application layer: enqueue is scoped by the authenticated tenantId, and the worker processes each job in its own tenant context for downstream memory writes.
## The critical query

From `src/db/stores/memory-extraction-store.ts:69-84`:
```sql
UPDATE memory_extraction_queue SET
  status = 'processing',
  locked_at = NOW(),
  attempt_count = attempt_count + 1,
  updated_at = NOW()
WHERE id IN (
  SELECT id FROM memory_extraction_queue
  WHERE status = 'pending'
    AND next_run_at <= $1
  ORDER BY next_run_at ASC, created_at ASC
  LIMIT $2
  FOR UPDATE SKIP LOCKED
)
RETURNING *
```
`FOR UPDATE SKIP LOCKED` is the key. It means:

- `FOR UPDATE` — lock the selected rows so no other worker can claim them
- `SKIP LOCKED` — if another worker already locked a row, skip it instead of waiting
This gives us safe concurrent processing without worker coordination. Two workers running simultaneously will never claim the same job. No distributed locks needed.
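The claim semantics can be sketched with a hypothetical in-memory queue (a plain array standing in for the table; this is an illustration of the skip-locked behavior, not the real store):

```typescript
type Job = { id: number; status: "pending" | "processing" | "completed" };

// Approximates FOR UPDATE SKIP LOCKED: a row another claimant has already
// marked as processing is simply skipped, never waited on.
function claimBatch(jobs: Job[], limit: number): Job[] {
  const claimed: Job[] = [];
  for (const job of jobs) {
    if (claimed.length >= limit) break;
    if (job.status !== "pending") continue; // SKIP LOCKED: move on, don't block
    job.status = "processing";              // FOR UPDATE: invisible to other claimants
    claimed.push(job);
  }
  return claimed;
}

const queue: Job[] = [1, 2, 3, 4].map((id) => ({ id, status: "pending" }));
const workerA = claimBatch(queue, 2); // claims jobs 1 and 2
const workerB = claimBatch(queue, 2); // skips 1 and 2, claims 3 and 4
```

In real Postgres the skip happens at the row-lock level inside a transaction, so the same non-overlap property holds even when both workers run the query at the same instant.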
## Failure handling

The store provides four lifecycle operations after claim:
| Operation | Method | Effect |
|---|---|---|
| Success | `markComplete(id)` | Sets `status = 'completed'`, clears lock |
| Permanent failure | `markFailed(id, error)` | Sets `status = 'failed'`, stores error message |
| Transient failure | `requeue(id, nextRunAt)` | Resets to `pending` with backoff delay |
| Stale recovery | `recoverStaleJobs(staleBefore)` | Resets `processing` jobs locked too long |
Stale job recovery catches worker crashes — if a job has been processing for longer than the stale threshold, it's automatically requeued.
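A caller of `requeue` typically derives `nextRunAt` from the job's `attempt_count`. A sketch with exponential backoff and a cap — the base delay and ceiling here are illustrative assumptions, not values from the codebase:

```typescript
// Hypothetical backoff policy: 30s base, doubling per attempt, capped at 15 min.
const BASE_DELAY_MS = 30_000;
const MAX_DELAY_MS = 15 * 60_000;

// attemptCount is >= 1 here, since the claim query increments it before the
// job runs, so the first retry waits the base delay.
function computeNextRunAt(attemptCount: number, now: Date = new Date()): Date {
  const delay = Math.min(BASE_DELAY_MS * 2 ** (attemptCount - 1), MAX_DELAY_MS);
  return new Date(now.getTime() + delay);
}
```

Because the claim query filters on `next_run_at <= NOW()`, a requeued job is simply invisible to workers until its backoff window has elapsed — no timers or scheduler needed.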
## Idempotency

Every job is keyed by `(tenant_id, idempotency_key)` with a unique index. The idempotency key is the `requestId` from the completion. If the same completion is processed twice (retry, duplicate webhook, etc.), the second enqueue is a no-op:
```ts
.onConflictDoNothing({
  target: [memoryExtractionQueue.tenantId, memoryExtractionQueue.idempotencyKey],
});
```
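The conflict target's effect can be simulated with a set keyed on the same pair (a hypothetical sketch, not the Drizzle store; `enqueue` here just models the insert outcome):

```typescript
// Simulates INSERT ... ON CONFLICT DO NOTHING on (tenant_id, idempotency_key).
const seen = new Set<string>();

function enqueue(tenantId: string, idempotencyKey: string): boolean {
  const key = `${tenantId}:${idempotencyKey}`;
  if (seen.has(key)) return false; // conflict: the duplicate enqueue is a no-op
  seen.add(key);
  return true;
}

const first = enqueue("tenant-a", "req-123");  // inserted
const second = enqueue("tenant-a", "req-123"); // same tenant + key: dropped
const other = enqueue("tenant-b", "req-123");  // different tenant: inserted
```

Scoping the uniqueness to the tenant matters: two tenants can legitimately reuse the same request ID without colliding, while a retried webhook within one tenant is deduplicated.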