Memory Extraction Queue

Postgres-backed durable job queue with FOR UPDATE SKIP LOCKED for concurrent memory extraction.

Why a durable queue

When a completion response contains useful context (facts, preferences, instructions), BrainstormRouter extracts it into persistent memory — but not synchronously. Extraction involves an LLM call (the Pi Runner), which is too slow for the hot path.

Instead, the gateway enqueues an extraction job and returns the response immediately. A background worker claims and processes jobs asynchronously.

This is the same pattern used by Stripe for webhook delivery, GitHub for CI triggers, and most payment processors. The critical difference: we use Postgres itself as the queue (no Redis, no RabbitMQ, no SQS), because the data is already in Postgres and we need transactional guarantees with the memory tables.

Architecture

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#d97706', 'lineColor': '#9494a8', 'primaryTextColor': '#e8e8ee'}}}%%
sequenceDiagram
    participant API as Completions Route
    participant Q as Extraction Queue<br/>(Postgres)
    participant W as Worker
    participant Pi as Pi Runner
    participant RMM as Memory Store<br/>(pgvector)

    API->>Q: enqueue(tenantId, payload)
    Note over Q: INSERT ... ON CONFLICT DO NOTHING<br/>(idempotency key = requestId)

    loop Every 5 seconds
        W->>Q: claimBatch(limit=10)
        Note over Q: SELECT ... FOR UPDATE SKIP LOCKED
        Q-->>W: [{id, tenantId, payload}, ...]

        loop Each job
            W->>Pi: Extract memories from payload
            Pi-->>W: [memory_1, memory_2, ...]
            W->>RMM: Store memories (pgvector)
            W->>Q: markComplete(id)
        end
    end

The schema

From src/db/schema/memory-extraction.ts:

export const memoryExtractionQueue = pgTable("memory_extraction_queue", {
  id: uuid("id").primaryKey().defaultRandom(),
  tenantId: uuid("tenant_id")
    .notNull()
    .references(() => tenants.id),
  userKey: text("user_key").notNull(),
  transactionPayload: jsonb("transaction_payload").notNull(),
  status: text("status").notNull().default("pending"), // pending | processing | completed | failed
  error: text("error"),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  lockedAt: timestamp("locked_at", { withTimezone: true }),
  attemptCount: integer("attempt_count").notNull().default(0),
  nextRunAt: timestamp("next_run_at", { withTimezone: true }).notNull().defaultNow(),
  idempotencyKey: text("idempotency_key").notNull(),
});

Three purpose-built indexes make this performant:

IndexFilterPurpose
idx_meq_pendingWHERE status = 'pending' on next_run_atFast pending job lookup
idx_meq_stale_locksWHERE status = 'processing' on locked_atStale job recovery
uq_meq_tenant_idempotencyUNIQUE on (tenant_id, idempotency_key)Deduplication

No RLS — by design

This table intentionally has no Row-Level Security. The worker must claim jobs across all tenants. RLS would block the withSystemContext pattern (which clears app.current_tenant to '').

Tenant isolation is enforced at the application layer: enqueue is scoped by the authenticated tenantId, and the worker processes each job in its own tenant context for downstream memory writes.

The critical query

From src/db/stores/memory-extraction-store.ts:69-84:

UPDATE memory_extraction_queue SET
  status = 'processing',
  locked_at = NOW(),
  attempt_count = attempt_count + 1,
  updated_at = NOW()
WHERE id IN (
  SELECT id FROM memory_extraction_queue
  WHERE status = 'pending'
    AND next_run_at <= $1
  ORDER BY next_run_at ASC, created_at ASC
  LIMIT $2
  FOR UPDATE SKIP LOCKED
)
RETURNING *

FOR UPDATE SKIP LOCKED is the key. It means:

  1. FOR UPDATE — lock the selected rows so no other worker can claim them
  2. SKIP LOCKED — if another worker already locked a row, skip it instead of waiting

This gives us safe concurrent processing without worker coordination. Two workers running simultaneously will never claim the same job. No distributed locks needed.

Failure handling

The store provides four lifecycle operations after claim:

OperationMethodEffect
SuccessmarkComplete(id)Sets status = 'completed', clears lock
Permanent failuremarkFailed(id, error)Sets status = 'failed', stores error message
Transient failurerequeue(id, nextRunAt)Resets to pending with backoff delay
Stale recoveryrecoverStaleJobs(staleBefore)Resets processing jobs locked too long

Stale job recovery catches worker crashes — if a job has been processing for longer than the stale threshold, it's automatically requeued.

Idempotency

Every job is keyed by (tenant_id, idempotency_key) with a unique index. The idempotency key is the requestId from the completion. If the same completion is processed twice (retry, duplicate webhook, etc.), the second enqueue is a no-op:

.onConflictDoNothing({
  target: [memoryExtractionQueue.tenantId, memoryExtractionQueue.idempotencyKey],
});