Streaming Security: Code vs Reality
Token-by-token stream interception, PII severing, SIEM export, and tenant isolation — grounded in source code.
The problem with "AI observability"
Most AI gateways scan the request before forwarding it. They log the response after it's delivered. The model's output — the part that actually reaches your users — travels through an unmonitored pipe.
BrainstormRouter intercepts the streaming response token-by-token. If the model outputs PII, violates a governance rule, or exceeds output limits, the stream is severed before the client sees the offending content.
This isn't a feature toggle. It's the architecture.
Code vs Reality: PII stream severing
What you send
A standard OpenAI SDK request through BrainstormRouter:
from openai import OpenAI
client = OpenAI(
    base_url="https://api.brainstormrouter.com/v1",
    api_key="br_live_...",
)
stream = client.chat.completions.create(
    model="anthropic/claude-sonnet-4",
    messages=[{"role": "user", "content": "Summarize the customer file for Acme Corp."}],
    stream=True,
)
for chunk in stream:
    # delta.content can be None on some chunks (for example the final one), so fall back to ""
    print(chunk.choices[0].delta.content or "", end="")
What happens inside
The StreamingGuardrailEvaluator (src/security/streaming-guardrails.ts) buffers tokens and evaluates them in sliding windows:
// From streaming-guardrails.ts:122-197 — processToken()
async processToken(token: string): Promise<string> {
  if (!this.config.enabled || this.truncated) {
    return this.truncated ? "" : token;
  }
  this.accumulated += token;
  this.buffer.push(token);
  // Only evaluate when buffer reaches window size or at sentence boundaries
  if (this.buffer.length < this.config.bufferWindowSize && !isSentenceBoundary(token)) {
    return ""; // Buffer more tokens before forwarding
  }
  // Run evaluations on the buffered window
  const result = await this.evaluateWindow();
  if (result.action === "truncate") {
    this.truncated = true;
    // All future tokens return ""; the stream is severed
    return "";
  }
  if (result.action === "redact") {
    const buffered = this.flushBuffer();
    return this.redactContent(buffered, result.reason ?? "");
  }
  return this.flushBuffer();
}
The evaluation chain runs in priority order (streaming-guardrails.ts:250-273):
- Governance rules: tenant-specific keyword enforcement (<1ms, deterministic)
- Blocked content patterns: regex matching on buffer + accumulated text
- PII scanning: email, phone, SSN, credit card, IP address detection
When PII is detected mid-stream, the evaluator records a verdict and severs the stream. The client receives a clean truncation — no partial PII, no error, just a stopped stream with a guardrail header explaining why.
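The buffer-then-evaluate flow above can be sketched in a few lines of Python. This is a minimal illustration, not the code in streaming-guardrails.ts: the class name `WindowedStreamGuard`, the two regex patterns, and the `finish()` method are all assumptions made for the example, and only the truncate action is modeled.

```python
import re

# Illustrative patterns only; the real scanner's rules are not shown in this document.
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


class WindowedStreamGuard:
    """Hypothetical Python analogue of a buffer-then-evaluate stream guard."""

    def __init__(self, window_size=4):
        self.window_size = window_size
        self.buffer = []
        self.truncated = False
        self.reasons = []

    def process_token(self, token):
        """Return the text that is safe to forward after seeing this token."""
        if self.truncated:
            return ""  # a severed stream stays severed
        self.buffer.append(token)
        at_boundary = token.rstrip().endswith((".", "!", "?"))
        if len(self.buffer) < self.window_size and not at_boundary:
            return ""  # keep buffering until the window fills or a sentence ends
        return self._evaluate()

    def finish(self):
        """Evaluate and release whatever is still buffered at end of stream."""
        if self.truncated or not self.buffer:
            return ""
        return self._evaluate()

    def _evaluate(self):
        window = "".join(self.buffer)
        self.buffer.clear()
        hits = [name for name, rx in PII_PATTERNS.items() if rx.search(window)]
        if hits:
            self.truncated = True
            self.reasons = hits
            return ""  # drop the whole offending window
        return window


guard = WindowedStreamGuard(window_size=4)
tokens = ["The", " customer", " contact", " is", " john.doe", "@acme.com", " and", " more"]
forwarded = "".join(guard.process_token(t) for t in tokens) + guard.finish()
print(repr(forwarded), guard.reasons)
```

The first four tokens fill a clean window and are forwarded together; the second window contains the email, so it is dropped whole and every later token returns an empty string. Note that this sketch only inspects the current window. A value split across two windows would need the check against accumulated text that the evaluation chain above describes.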
What the client receives
If the model starts outputting "The customer contact is john.doe@acme.com and their SSN is 123-45-6789":
Content-Type: text/event-stream
X-BR-Guardian-Status: on
X-BR-Guardrail-Action: truncate
X-BR-Guardrail-Reason: PII detected: email, ssn
data: {"choices":[{"delta":{"content":"The customer contact is"}}]}
data: {"choices":[{"delta":{"content":""}}]}
data: [DONE]
The stream stops. The email and SSN never reach the client. The verdict is recorded for audit.
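To see what that looks like from the client side, here is a small sketch that parses the event-stream body shown above. The helper name `collect_sse_text` is hypothetical, and the parser handles only the `data:` lines used in this example rather than the full SSE format.

```python
import json


def collect_sse_text(raw_body):
    """Join delta.content from OpenAI-style SSE lines; report whether [DONE] was seen."""
    parts = []
    saw_done = False
    for line in raw_body.splitlines():
        if not line.startswith("data:"):
            continue  # skip blank lines and non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            saw_done = True
            break
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                parts.append(content)
    return "".join(parts), saw_done


body = (
    'data: {"choices":[{"delta":{"content":"The customer contact is"}}]}\n'
    'data: {"choices":[{"delta":{"content":""}}]}\n'
    "data: [DONE]\n"
)
text, done = collect_sse_text(body)
print(repr(text), done)
```

At the body level, a severed stream ends with `[DONE]` just like a normal completion. A client that needs to tell the two apart has to read the `X-BR-Guardrail-Action` header shown in the example response.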
PII air gap: scrub → tokenize → rehydrate
For scenarios where you need to send data to external services without exposing PII, BrainstormRouter provides a reversible air gap (src/security/pii-airgap.ts):
// From pii-airgap.ts:35-63
const { scrubbed, tokens } = await scrubAndTokenize(
"Contact john@acme.com or call 555-0123",
scanner,
);
// scrubbed: "Contact [PII:email:a1b2c3d4] or call [PII:phone_us:e5f6g7h8]"
// tokens: [{token: "[PII:email:a1b2c3d4]", originalValue: "john@acme.com", ...}]
// After external processing:
const restored = rehydrate(externalResult, tokens);
// PII is restored from the in-memory token map — never sent externally
The token map stays in memory. The external service only sees [PII:email:a1b2c3d4] placeholders. On return, the original values are rehydrated from the map.
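The scrub, tokenize, and rehydrate round trip can be illustrated with a short Python sketch. It is an assumption-laden stand-in for pii-airgap.ts: it handles email only, uses a plain dict as the token map, and generates placeholder IDs with `secrets.token_hex`, none of which is confirmed by the source.

```python
import re
import secrets

# Illustrative email pattern; the real scanner covers more PII types.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def scrub_and_tokenize(text):
    """Replace each email with an opaque placeholder; return (scrubbed_text, token_map)."""
    token_map = {}

    def _swap(match):
        placeholder = "[PII:email:" + secrets.token_hex(4) + "]"
        token_map[placeholder] = match.group(0)
        return placeholder

    return EMAIL_RE.sub(_swap, text), token_map


def rehydrate(text, token_map):
    """Put the original values back wherever a placeholder survived processing."""
    for placeholder, original in token_map.items():
        text = text.replace(placeholder, original)
    return text


scrubbed, token_map = scrub_and_tokenize("Contact john@acme.com or call 555-0123")
# Stand-in for the external service: it only ever sees the scrubbed text.
external_result = "Summary: " + scrubbed
restored = rehydrate(external_result, token_map)
print(restored)
```

Rehydration depends on the external service returning the placeholders unchanged. If a placeholder is reworded or dropped along the way, that value simply stays absent from the restored text.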
SIEM export: CEF + ECS JSON
Every guardrail verdict generates a structured security event exportable to your SIEM (src/security/siem-export.ts):
CEF format (Splunk, ArcSight, QRadar)
CEF:0|BrainstormRouter|BrainstormRouter|1.0|guardrail.pii_detected|guardrail.pii_detected|8|rt=1709251200000 suser=br_live_abc123 suid=api_key dst=anthropic/claude-sonnet-4 act=truncate msg={"matchCount":2,"types":["email","ssn"]}
ECS JSON format (Elastic, Datadog, custom)
{
  "@timestamp": "2026-02-28T12:00:00.000Z",
  "event.kind": "event",
  "event.category": "process",
  "event.type": "guardrail.pii_detected",
  "event.action": "truncate",
  "event.severity": 8,
  "event.outcome": "success",
  "user.id": "br_live_abc123",
  "user.type": "api_key",
  "destination.address": "anthropic/claude-sonnet-4",
  "observer.vendor": "BrainstormRouter",
  "observer.product": "BrainstormRouter"
}
The toSiemJson() function follows Elastic Common Schema conventions. The toCef() function outputs ArcSight-compatible CEF strings. Both are available via batch export with severity filtering.
Governance enforcement on streaming chunks
Tenant governance rules are enforced deterministically on every streaming chunk (src/security/governance-validator.ts):
// From governance-validator.ts:186-251 — validateChunkGovernance()
// Rule: "Never mention competitors"
// If the model outputs "Our competitor Acme offers..."
// → Governance violation [s1]: output contains "competitor"
// → Stream replaced with: [REDACTED PER TENANT GOVERNANCE — rule s1]
Governance validation takes under 1 ms, makes no external calls, and uses deterministic keyword matching with synonym expansion. Rules are parsed from the tenant's memory context and enforced on every buffered window.
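A minimal sketch of keyword matching with synonym expansion, under stated assumptions: the rule shape (a rule ID mapped to a keyword list that already includes its synonyms) and the function name `validate_chunk` are invented for illustration, and the real rule parser in governance-validator.ts is not shown in this document.

```python
import re

# Hypothetical rule shape: rule id -> the keyword plus its expanded synonyms.
RULES = {"s1": ["competitor", "competitors", "rival"]}

REDACTION_MARKER = "[REDACTED PER TENANT GOVERNANCE]"


def validate_chunk(text, rules):
    """Return (text_to_forward, violated_rule_id); the rule id is None when clean."""
    lowered = text.lower()
    for rule_id, keywords in rules.items():
        for keyword in keywords:
            # Whole-word match so "rival" does not fire on "arrival".
            if re.search(r"\b" + re.escape(keyword) + r"\b", lowered):
                return REDACTION_MARKER, rule_id
    return text, None


print(validate_chunk("Our competitor Acme offers...", RULES))
print(validate_chunk("We offer transparent pricing.", RULES))
```

Because the check is plain string matching over a small keyword set, it needs no model call and gives the same answer for the same input every time, which is what makes it deterministic.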
Tenant isolation: don't trust our marketing
Every database query runs inside a tenant-scoped transaction (src/db/tenant-context.ts):
// From tenant-context.ts:27-37 — withTenant()
export async function withTenant<T>(
  db: NodePgDatabase,
  tenantId: string,
  callback: (tx: AnyTransaction) => Promise<T>,
): Promise<T> {
  return db.transaction(async (tx) => {
    // set_config(..., true) acts like SET LOCAL: transaction-scoped, auto-resets on COMMIT/ROLLBACK
    await tx.execute(sql`SELECT set_config('app.current_tenant', ${tenantId}, true)`);
    return callback(tx);
  });
}
The RLS policies are defined in the Drizzle schema, not just in documentation (src/db/schema/tenants.ts:100):
// From tenants.ts:100 — the actual RLS policy
const tenantRlsUsing = sql`tenant_id = current_setting('app.current_tenant')::uuid`;
// Applied to every tenant-scoped table:
pgPolicy("tpk_select", {
  for: "select",
  using: tenantRlsUsing,
}),
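Calling set_config(..., true) has the same effect as SET LOCAL: the value lives only until the transaction's COMMIT or ROLLBACK. That is what makes it safe under PgBouncer transaction-level pooling, where a server connection is handed to another client after each transaction. The tenant context is already gone by the time the connection is reused.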
Egress allowlist
Optional per-service domain restrictions (src/security/egress-allowlist.ts):
// From egress-allowlist.ts:19-54
checkEgressDomain("https://api.openai.com/v1/completions", [
  "api.openai.com",
  "api.anthropic.com",
  "*.googleapis.com",
]);
// → { allowed: true }
checkEgressDomain("https://evil.example.com/exfil", ["api.openai.com", "api.anthropic.com"]);
// → { allowed: false, reason: "Domain evil.example.com not in egress allowlist" }
Wildcard subdomain matching is supported (*.googleapis.com matches generativelanguage.googleapis.com). When no allowlist is set, all domains are permitted.
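The matching rules described above can be sketched as follows. This is a Python illustration, not the code in egress-allowlist.ts: the name `check_egress_domain` mirrors the original for readability, and the choice that `*.googleapis.com` does not match the bare `googleapis.com` is an assumption of this sketch.

```python
from urllib.parse import urlsplit


def check_egress_domain(url, allowlist=None):
    """Allow the URL if its hostname matches an exact or '*.'-wildcard allowlist entry."""
    host = (urlsplit(url).hostname or "").lower()
    if not allowlist:
        return {"allowed": True}  # no allowlist configured: everything is permitted
    for pattern in allowlist:
        pattern = pattern.lower()
        if pattern.startswith("*."):
            # "*.googleapis.com" -> require a ".googleapis.com" suffix on the host
            if host.endswith(pattern[1:]):
                return {"allowed": True}
        elif host == pattern:
            return {"allowed": True}
    return {"allowed": False, "reason": f"Domain {host} not in egress allowlist"}


allowlist = ["api.openai.com", "api.anthropic.com", "*.googleapis.com"]
print(check_egress_domain("https://api.openai.com/v1/completions", allowlist))
print(check_egress_domain("https://evil.example.com/exfil", allowlist))
print(check_egress_domain("https://generativelanguage.googleapis.com/v1", allowlist))
```

Comparing the parsed hostname, rather than searching the URL string for an allowed name, is the important design choice here. A URL such as `https://api.openai.com.evil.example/` contains an allowed name as a substring but resolves to a different host, and the hostname comparison rejects it.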
The security pipeline at a glance
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#d97706', 'lineColor': '#9494a8', 'primaryTextColor': '#e8e8ee'}}}%%
flowchart TB
Req([Client Request]) --> Auth["Auth + Tenant Context\nwithTenant() → SET LOCAL"]
Auth --> Egress{"Egress Allowlist\ncheckEgressDomain()"}
Egress -->|Blocked| Reject1[403 Egress Denied]
Egress -->|Allowed| Guardian["Guardian Pre-Request\nCost prediction + PII scan"]
Guardian --> Provider["Provider Call\nStreaming response"]
Provider --> SGE["StreamingGuardrailEvaluator\nToken-by-token buffer"]
SGE --> Gov{"Governance\nvalidateChunkGovernance()"}
Gov -->|Violation| Replace["[REDACTED PER TENANT GOVERNANCE]"]
Gov -->|Clean| PII{"PII Scanner\nBuiltinPiiScanner.scan()"}
PII -->|Detected| Action{"Action?"}
Action -->|truncate| Sever["Stream Severed\ntruncated = true"]
Action -->|redact| Redact["[REDACTED:email]\n[REDACTED:ssn]"]
PII -->|Clean| Forward["Forward to Client"]
Redact --> SIEM
Sever --> SIEM["SIEM Event\ntoCef() / toSiemJson()"]
Forward --> Client([Client Response])
Replace --> SIEM
style Reject1 fill:#7f1d1d
style Sever fill:#7f1d1d
style Redact fill:#4a3728
style Replace fill:#4a3728
What this means for your CISO
| Question | Answer |
|---|---|
| Can PII reach our users via model output? | Not the types the scanner detects (email, phone, SSN, credit card, IP address). Matches are severed or redacted in the streaming buffer before forwarding. |
| Where are the security events? | Structured CEF or ECS JSON, exportable to any SIEM. |
| Is tenant data isolated? | Yes. PostgreSQL RLS with SET LOCAL transaction scoping. |
| Can we restrict outbound domains? | Yes. Per-service egress allowlists with wildcard support. |
| What's the latency overhead? | Governance: <1ms. PII scanning: <5ms. Guardian: <5ms p95. |
| Can the model bypass the guardrails? | No. The gateway controls what gets forwarded downstream. The model generates; BrainstormRouter decides what the client sees. |