Streaming Security: Code vs Reality

Token-by-token stream interception, PII severing, SIEM export, and tenant isolation — grounded in source code.

The problem with "AI observability"

Most AI gateways scan the request before forwarding it. They log the response after it's delivered. The model's output — the part that actually reaches your users — travels through an unmonitored pipe.

BrainstormRouter intercepts the streaming response token-by-token. If the model outputs PII, violates a governance rule, or exceeds output limits, the stream is severed before the client sees the offending content.

This isn't a feature toggle. It's the architecture.

Code vs Reality: PII stream severing

What you send

A standard OpenAI SDK request through BrainstormRouter:

from openai import OpenAI

client = OpenAI(
    base_url="https://api.brainstormrouter.com/v1",
    api_key="br_live_...",
)

stream = client.chat.completions.create(
    model="anthropic/claude-sonnet-4",
    messages=[{"role": "user", "content": "Summarize the customer file for Acme Corp."}],
    stream=True,
)

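for chunk in stream:
    # Some chunks carry no text (role-only or final chunks), so skip empty deltas
    # instead of printing "None".
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")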

What happens inside

The StreamingGuardrailEvaluator (src/security/streaming-guardrails.ts) buffers tokens and evaluates them in sliding windows:

// From streaming-guardrails.ts:122-197 — processToken()
async processToken(token: string): Promise<string> {
  if (!this.config.enabled || this.truncated) {
    return this.truncated ? "" : token;
  }

  this.accumulated += token;
  this.buffer.push(token);

  // Only evaluate when buffer reaches window size or at sentence boundaries
  if (this.buffer.length < this.config.bufferWindowSize && !isSentenceBoundary(token)) {
    return ""; // Buffer more tokens before forwarding
  }

  // Run evaluations on the buffered window
  const result = await this.evaluateWindow();

  if (result.action === "truncate") {
    this.truncated = true;
    // All future tokens return "" — stream is severed
    return "";
  }

  if (result.action === "redact") {
    const buffered = this.flushBuffer();
    return this.redactContent(buffered, result.reason ?? "");
  }

  return this.flushBuffer();
}

The evaluation chain runs in priority order (streaming-guardrails.ts:250-273):

  1. Governance rules — tenant-specific keyword enforcement (<1ms, deterministic)
  2. Blocked content patterns — regex matching on buffer + accumulated text
  3. PII scanning — email, phone, SSN, credit card, IP address detection

When PII is detected mid-stream, the evaluator records a verdict and severs the stream. The client receives a clean truncation — no partial PII, no error, just a stopped stream with a guardrail header explaining why.
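The priority order above can be sketched as a short-circuiting chain. This is a minimal illustration, not the code in streaming-guardrails.ts: the Verdict shape, the "pass" action, the evaluateWindow signature shown here, the keyword list, and the deliberately loose regexes are all assumptions made for the example.

```typescript
// Hypothetical verdict shape; the real evaluator's types may differ.
type Verdict = { action: "pass" | "truncate" | "redact"; reason?: string };
type Check = (text: string) => Verdict | null;

// 1. Governance: plain keyword matching (the keyword list is illustrative).
const governance: Check = (text) => {
  const banned = ["competitor"];
  const hit = banned.find((word) => text.toLowerCase().includes(word));
  return hit ? { action: "redact", reason: `governance: ${hit}` } : null;
};

// 2. Blocked content patterns: regex matching (the pattern is illustrative).
const blockedPatterns: Check = (text) =>
  /BEGIN (RSA )?PRIVATE KEY/.test(text)
    ? { action: "truncate", reason: "blocked pattern" }
    : null;

// 3. PII: simplified email and SSN detectors, far looser than a production scanner.
const pii: Check = (text) => {
  const types: string[] = [];
  if (/[\w.+-]+@[\w-]+\.[\w.-]+/.test(text)) types.push("email");
  if (/\b\d{3}-\d{2}-\d{4}\b/.test(text)) types.push("ssn");
  return types.length > 0
    ? { action: "truncate", reason: `PII detected: ${types.join(", ")}` }
    : null;
};

// Run the checks in priority order; the first non-null verdict wins.
function evaluateWindow(text: string): Verdict {
  for (const check of [governance, blockedPatterns, pii]) {
    const verdict = check(text);
    if (verdict) return verdict;
  }
  return { action: "pass" };
}
```

Because the chain stops at the first verdict, a window that trips a governance rule never reaches the PII check in this sketch.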

What the client receives

If the model starts outputting "The customer contact is john.doe@acme.com and their SSN is 123-45-6789":

Content-Type: text/event-stream
X-BR-Guardian-Status: on
X-BR-Guardrail-Action: truncate
X-BR-Guardrail-Reason: PII detected: email, ssn

data: {"choices":[{"delta":{"content":"The customer contact is"}}]}

data: {"choices":[{"delta":{"content":""}}]}

data: [DONE]

The stream stops. The email and SSN never reach the client. The verdict is recorded for audit.
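From the client's side, the body of a truncated stream looks like any other completed stream. A rough sketch of reading the body shown above follows; collectSseContent is a hypothetical helper, and it assumes each event is a single data: line carrying one JSON payload, as in the example.

```typescript
// Hypothetical helper: concatenate delta content from a raw SSE body.
// Assumes one JSON payload per `data:` line, as in the example above.
type ChunkPayload = { choices?: { delta?: { content?: string | null } }[] };

function collectSseContent(body: string): string {
  let text = "";
  for (const line of body.split(/\r?\n/)) {
    if (!line.startsWith("data:")) continue; // skip blank separator lines
    const payload = line.slice("data:".length).trim();
    if (payload === "[DONE]") break; // end-of-stream sentinel
    const chunk = JSON.parse(payload) as ChunkPayload;
    text += chunk.choices?.[0]?.delta?.content ?? "";
  }
  return text;
}

const body = [
  'data: {"choices":[{"delta":{"content":"The customer contact is"}}]}',
  "",
  'data: {"choices":[{"delta":{"content":""}}]}',
  "",
  "data: [DONE]",
].join("\n");

const received = collectSseContent(body);
```

Since the text alone gives no hint that anything was cut, a client that wants to react to the guardrail would check the X-BR-Guardrail-Action header alongside it.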

PII air gap: scrub → tokenize → rehydrate

For scenarios where you need to send data to external services without exposing PII, BrainstormRouter provides a reversible air gap (src/security/pii-airgap.ts):

// From pii-airgap.ts:35-63
const { scrubbed, tokens } = await scrubAndTokenize(
  "Contact john@acme.com or call 555-0123",
  scanner,
);
// scrubbed: "Contact [PII:email:a1b2c3d4] or call [PII:phone_us:e5f6g7h8]"
// tokens: [{token: "[PII:email:a1b2c3d4]", originalValue: "john@acme.com", ...}]

// After external processing:
const restored = rehydrate(externalResult, tokens);
// PII is restored from the in-memory token map — never sent externally

The token map stays in memory. The external service only sees [PII:email:a1b2c3d4] placeholders. On return, the original values are rehydrated from the map.
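The round trip can be illustrated with a small in-memory sketch. It is not the implementation in pii-airgap.ts: scrubEmails and rehydrateTokens are hypothetical names, only emails are handled, the regex is intentionally simple, and a counter stands in for whatever identifier scheme the real tokens use.

```typescript
// Hypothetical token record, modeled on the fields shown in the example above.
type PiiToken = { token: string; type: string; originalValue: string };

// Deliberately simple email pattern; a production scanner covers more types.
const EMAIL = /[\w.+-]+@[\w-]+\.[\w.-]+/g;

function scrubEmails(text: string): { scrubbed: string; tokens: PiiToken[] } {
  const tokens: PiiToken[] = [];
  const scrubbed = text.replace(EMAIL, (match) => {
    // A counter stands in for the real identifier scheme (an assumption).
    const token = `[PII:email:${tokens.length + 1}]`;
    tokens.push({ token, type: "email", originalValue: match });
    return token;
  });
  return { scrubbed, tokens };
}

function rehydrateTokens(text: string, tokens: PiiToken[]): string {
  // split/join replaces every occurrence without regex-escaping the brackets.
  return tokens.reduce(
    (out, t) => out.split(t.token).join(t.originalValue),
    text,
  );
}

const { scrubbed, tokens } = scrubEmails("Contact john@acme.com for details");
// Pretend an external service returned text that still contains the placeholder.
const restored = rehydrateTokens(`Summary: ${scrubbed}`, tokens);
```

The external call only ever sees the scrubbed string; the tokens array never leaves the process in this sketch.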

SIEM export: CEF + ECS JSON

Every guardrail verdict generates a structured security event exportable to your SIEM (src/security/siem-export.ts):

CEF format (Splunk, ArcSight, QRadar)

CEF:0|BrainstormRouter|BrainstormRouter|1.0|guardrail.pii_detected|guardrail.pii_detected|8|rt=1709251200000 suser=br_live_abc123 suid=api_key dst=anthropic/claude-sonnet-4 act=truncate msg={"matchCount":2,"types":["email","ssn"]}

ECS JSON format (Elastic, Datadog, custom)

{
  "@timestamp": "2026-02-28T12:00:00.000Z",
  "event.kind": "event",
  "event.category": "process",
  "event.type": "guardrail.pii_detected",
  "event.action": "truncate",
  "event.severity": 8,
  "event.outcome": "success",
  "user.id": "br_live_abc123",
  "user.type": "api_key",
  "destination.address": "anthropic/claude-sonnet-4",
  "observer.vendor": "BrainstormRouter",
  "observer.product": "BrainstormRouter"
}

The toSiemJson() function follows Elastic Common Schema conventions. The toCef() function outputs ArcSight-compatible CEF strings. Both are available via batch export with severity filtering.
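For readers unfamiliar with CEF, the line format is a pipe-delimited header followed by space-separated key=value extensions. The sketch below shows that layout and the standard CEF escaping rules; formatCef and the SecurityEvent shape are hypothetical and are not the toCef() signature from siem-export.ts.

```typescript
// Hypothetical event shape; field names are assumptions, not the real types.
type SecurityEvent = {
  type: string;
  severity: number; // CEF severity, 0 to 10
  timestampMs: number;
  action: string;
  details: Record<string, unknown>;
};

// CEF header fields must escape backslash and pipe.
const escapeHeader = (value: string): string =>
  value.replace(/\\/g, "\\\\").replace(/\|/g, "\\|");

// CEF extension values must escape backslash and equals, and encode newlines.
const escapeExtension = (value: string): string =>
  value.replace(/\\/g, "\\\\").replace(/=/g, "\\=").replace(/\r?\n/g, "\\n");

function formatCef(event: SecurityEvent): string {
  const header = [
    "CEF:0",
    "BrainstormRouter", // device vendor
    "BrainstormRouter", // device product
    "1.0", // device version
    escapeHeader(event.type), // signature ID
    escapeHeader(event.type), // name
    String(event.severity),
  ].join("|");
  const extension = [
    `rt=${event.timestampMs}`,
    `act=${escapeExtension(event.action)}`,
    `msg=${escapeExtension(JSON.stringify(event.details))}`,
  ].join(" ");
  return `${header}|${extension}`;
}

const line = formatCef({
  type: "guardrail.pii_detected",
  severity: 8,
  timestampMs: 1709251200000,
  action: "truncate",
  details: { matchCount: 2, types: ["email", "ssn"] },
});
```

Escaping matters mostly for the msg field: any equals sign or backslash inside the serialized details would otherwise be read as the start of a new extension key.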

Governance enforcement on streaming chunks

Tenant governance rules are enforced deterministically on every streaming chunk (src/security/governance-validator.ts):

// From governance-validator.ts:186-251 — validateChunkGovernance()
// Rule: "Never mention competitors"
// If the model outputs "Our competitor Acme offers..."
// → Governance violation [s1]: output contains "competitor"
// → Stream replaced with: [REDACTED PER TENANT GOVERNANCE — rule s1]

Governance validation takes under 1 ms and makes no external calls: it is deterministic keyword matching with synonym expansion. Rules are parsed from the tenant's memory context and enforced on every buffered window.
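Keyword matching with synonym expansion might look roughly like the following. Everything here is a sketch under stated assumptions: the GovernanceRule shape, the SYNONYMS table, and checkChunk are invented for illustration and do not reflect how validateChunkGovernance() parses or stores rules.

```typescript
// Hypothetical rule and result shapes; the real parser is not shown in this article.
type GovernanceRule = { id: string; keywords: string[] };
type Violation = { ruleId: string; matched: string };

// Illustrative synonym table used to expand each rule's keywords.
const SYNONYMS: Record<string, string[]> = {
  competitor: ["competitors", "rival", "rivals"],
};

function expandKeywords(rule: GovernanceRule): string[] {
  const expanded: string[] = [];
  for (const keyword of rule.keywords) {
    expanded.push(keyword, ...(SYNONYMS[keyword] ?? []));
  }
  return expanded;
}

// Whole-word, case-insensitive match; returns the first violation or null.
function checkChunk(text: string, rules: GovernanceRule[]): Violation | null {
  const words = new Set<string>(text.toLowerCase().match(/[a-z0-9']+/g) ?? []);
  for (const rule of rules) {
    const matched = expandKeywords(rule).find((k) => words.has(k));
    if (matched) return { ruleId: rule.id, matched };
  }
  return null;
}

const rules: GovernanceRule[] = [{ id: "s1", keywords: ["competitor"] }];
const violation = checkChunk("Our competitor Acme offers a cheaper plan.", rules);
const clean = checkChunk("Our plan includes priority support.", rules);
```

Set lookups keep the check deterministic and cheap, which is the property the latency claim depends on; matching whole words rather than substrings is a design choice of this sketch, not a documented behavior.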

Tenant isolation: don't trust our marketing

Every database query runs inside a tenant-scoped transaction (src/db/tenant-context.ts):

// From tenant-context.ts:27-37 — withTenant()
export async function withTenant<T>(
  db: NodePgDatabase,
  tenantId: string,
  callback: (tx: AnyTransaction) => Promise<T>,
): Promise<T> {
  return db.transaction(async (tx) => {
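    // set_config(name, value, true) is the function form of SET LOCAL (which cannot take bind parameters):
    // the value is scoped to the current transaction and resets on COMMIT/ROLLBACK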
    await tx.execute(sql`SELECT set_config('app.current_tenant', ${tenantId}, true)`);
    return callback(tx);
  });
}

The RLS policies are defined in the Drizzle schema, not just in documentation (src/db/schema/tenants.ts:100):

// From tenants.ts:100 — the actual RLS policy
const tenantRlsUsing = sql`tenant_id = current_setting('app.current_tenant')::uuid`;

// Applied to every tenant-scoped table:
pgPolicy("tpk_select", {
  for: "select",
  using: tenantRlsUsing,
}),

SET LOCAL is transaction-scoped: the value resets on COMMIT or ROLLBACK. Under PgBouncer transaction-level pooling, a server connection is handed to another client only between transactions, so the tenant setting has already been cleared by the time the connection is reused.
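To see why transaction scoping matters under connection pooling, here is a toy in-memory model rather than real PostgreSQL. FakeConnection, withTenantOn, and their methods are invented for illustration; they only mimic the documented behavior that a value applied with set_config(name, value, true) disappears when the transaction ends.

```typescript
// Toy model of one pooled server connection. Not a database client.
class FakeConnection {
  private sessionSettings = new Map<string, string>();
  private localSettings: Map<string, string> | null = null;

  begin(): void {
    this.localSettings = new Map();
  }

  // Mirrors set_config(name, value, is_local).
  setConfig(name: string, value: string, isLocal: boolean): void {
    if (!isLocal) {
      this.sessionSettings.set(name, value);
    } else if (this.localSettings) {
      this.localSettings.set(name, value);
    }
    // A local setting outside a transaction ends with the statement, so nothing persists.
  }

  currentSetting(name: string): string | undefined {
    return this.localSettings?.get(name) ?? this.sessionSettings.get(name);
  }

  // COMMIT and ROLLBACK both discard transaction-local settings.
  end(): void {
    this.localSettings = null;
  }
}

function withTenantOn<T>(conn: FakeConnection, tenantId: string, work: () => T): T {
  conn.begin();
  try {
    conn.setConfig("app.current_tenant", tenantId, true);
    return work();
  } finally {
    conn.end(); // runs on success and on error, like COMMIT/ROLLBACK
  }
}

const shared = new FakeConnection(); // one connection reused by two tenants
const seenByA = withTenantOn(shared, "tenant-a", () => shared.currentSetting("app.current_tenant"));
const betweenTransactions = shared.currentSetting("app.current_tenant");
const seenByB = withTenantOn(shared, "tenant-b", () => shared.currentSetting("app.current_tenant"));
```

Had the model used a session-level setting (isLocal = false), the value from tenant-a would still be visible between transactions, which is the leak that transaction scoping avoids.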

Egress allowlist

Optional per-service domain restrictions (src/security/egress-allowlist.ts):

// From egress-allowlist.ts:19-54
checkEgressDomain("https://api.openai.com/v1/completions", [
  "api.openai.com",
  "api.anthropic.com",
  "*.googleapis.com",
]);
// → { allowed: true }

checkEgressDomain("https://evil.example.com/exfil", ["api.openai.com", "api.anthropic.com"]);
// → { allowed: false, reason: "Domain evil.example.com not in egress allowlist" }

Wildcard subdomain matching is supported (*.googleapis.com matches generativelanguage.googleapis.com). When no allowlist is set, all domains are permitted.
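The matching rule can be sketched in a few lines. This is an illustration, not the body of checkEgressDomain(): checkEgress and matchesEntry are hypothetical names, and the sketch assumes that a wildcard entry matches subdomains only, not the bare apex domain, which the text above does not specify.

```typescript
type EgressResult = { allowed: true } | { allowed: false; reason: string };

// Assumption: "*.example.com" matches subdomains only, not "example.com" itself.
function matchesEntry(hostname: string, entry: string): boolean {
  if (entry.startsWith("*.")) {
    // slice(1) keeps the leading dot, so "evilgoogleapis.com" cannot match.
    return hostname.endsWith(entry.slice(1));
  }
  return hostname === entry;
}

function checkEgress(url: string, allowlist?: string[]): EgressResult {
  if (!allowlist) return { allowed: true }; // no allowlist configured
  const hostname = new URL(url).hostname.toLowerCase();
  const allowed = allowlist.some((entry) =>
    matchesEntry(hostname, entry.toLowerCase()),
  );
  return allowed
    ? { allowed: true }
    : { allowed: false, reason: `Domain ${hostname} not in egress allowlist` };
}

const viaWildcard = checkEgress(
  "https://generativelanguage.googleapis.com/v1beta/models",
  ["api.openai.com", "*.googleapis.com"],
);
const denied = checkEgress("https://evil.example.com/exfil", ["api.openai.com"]);
```

Comparing against the suffix with its leading dot is what prevents a lookalike host such as evilgoogleapis.com from slipping through a *.googleapis.com entry.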

The security pipeline at a glance

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#d97706', 'lineColor': '#9494a8', 'primaryTextColor': '#e8e8ee'}}}%%
flowchart TB
    Req([Client Request]) --> Auth["Auth + Tenant Context\nwithTenant() → SET LOCAL"]
    Auth --> Egress{"Egress Allowlist\ncheckEgressDomain()"}
    Egress -->|Blocked| Reject1[403 Egress Denied]
    Egress -->|Allowed| Guardian["Guardian Pre-Request\nCost prediction + PII scan"]
    Guardian --> Provider["Provider Call\nStreaming response"]
    Provider --> SGE["StreamingGuardrailEvaluator\nToken-by-token buffer"]

    SGE --> Gov{"Governance\nvalidateChunkGovernance()"}
    Gov -->|Violation| Replace["[REDACTED PER TENANT GOVERNANCE]"]
    Gov -->|Clean| PII{"PII Scanner\nBuiltinPiiScanner.scan()"}
    PII -->|Detected| Action{"Action?"}
    Action -->|truncate| Sever["Stream Severed\ntruncated = true"]
    Action -->|redact| Redact["[REDACTED:email]\n[REDACTED:ssn]"]
    PII -->|Clean| Forward["Forward to Client"]
    Redact --> SIEM
    Sever --> SIEM["SIEM Event\ntoCef() / toSiemJson()"]
    Forward --> Client([Client Response])
    Replace --> SIEM

    style Reject1 fill:#7f1d1d
    style Sever fill:#7f1d1d
    style Redact fill:#4a3728
    style Replace fill:#4a3728

What this means for your CISO

| Question | Answer |
| --- | --- |
| Can PII reach our users via model output? | No. PII is detected and severed/redacted in the streaming buffer before forwarding. |
| Where are the security events? | Structured CEF or ECS JSON, exportable to any SIEM. |
| Is tenant data isolated? | Yes. PostgreSQL RLS with SET LOCAL transaction scoping. |
| Can we restrict outbound domains? | Yes. Per-service egress allowlists with wildcard support. |
| What's the latency overhead? | Governance: <1ms. PII scanning: <5ms. Guardian: <5ms p95. |
| Can the model bypass the guardrails? | No. The gateway controls what gets forwarded downstream. The model generates; BrainstormRouter decides what the client sees. |