Most RAG tutorials show you how to split a PDF and push vectors into a database. That is the easy part. The original RAG paper (Lewis et al., 2020) proposed a clean retrieve-then-generate loop; production systems quickly discovered that the “retrieve” step hides enormous complexity: format detection, extraction quality, chunk boundaries, metadata enrichment, deduplication, and index management, none of which the paper needed to address. The hard part is building a system where documents arrive from a dozen different sources, half of them without a known business context at arrival time, and your AI agents still need to retrieve the right chunks within hundreds of milliseconds when a user asks a question.

This post documents the architecture and implementation of a production ingestion pipeline built specifically for agentic AI systems. The system processes documents across multiple extraction strategies, persists normalized processing artifacts, and publishes scoped embeddings into vector indexes on demand. It handles multi-tenant isolation, content deduplication across file boundaries, async job processing with dead-letter recovery, and graceful degradation when upstream services fail.

The design emerged from concrete production problems, not from a theoretical exercise. Every decision documented here traces back to a failure mode, a latency requirement, or a scaling constraint encountered while building an enterprise B2B platform where AI agents assist human operators in processing complex, document-heavy workflows.


I. The Problem with Single-Pass Ingestion

The standard approach to document ingestion in RAG systems follows a straightforward pipeline:

File arrives -> Extract text -> Chunk -> Embed -> Index in vector store

This works fine when three conditions hold:

  1. You know the business scope (which entity this document belongs to) at upload time.
  2. Each document belongs to exactly one scope.
  3. You have a single extraction and embedding strategy.

In production agentic systems, none of these hold reliably.

Scope is often unknown at arrival time. Documents arrive from external sync integrations (cloud drives, partner APIs, email attachments) and land in the system as raw files without any association to a business entity. This is sometimes called the “dark data” problem in enterprise AI: Gartner estimates that 80% of enterprise data is unstructured, and a significant portion arrives without the metadata needed to make it immediately useful. The ingestion pipeline must account for this gap between arrival and usability. A human or automation step assigns these files to a business entity later, sometimes hours or days after arrival. If your pipeline requires scope to run, the documents sit unprocessed until assignment. When a user finally assigns a document and expects it to be searchable, they face a processing delay covering the full extraction-to-indexing pipeline: anywhere from 5 to 90 seconds depending on file size and the extraction service queue depth.

The same document serves multiple scopes. A single PDF might need to appear in both a “Project” scope for one team and a “Ticket” scope for another. Under single-pass ingestion, you re-run the entire pipeline per scope. The extraction and parsing work is identical across both runs. Only the metadata stamping and index target differ. Re-running extraction is pure waste.

Different document domains require different processing strategies. A general business document and a structured legal or financial document require fundamentally different extraction logic. The general document works fine with a flat text splitter. The structured document needs structure-preserving extraction that maintains section boundaries, heading hierarchies, and table integrity. Forcing both through the same pipeline either degrades quality for one or over-engineers for the other.


II. Two-Stage Architecture: Content Processing and Scoped Publishing

The core insight is separating content processing from retrieval publishing. These are distinct concerns with different triggers, different lifecycles, and different scaling characteristics. This mirrors the CQRS (Command Query Responsibility Segregation) pattern from domain-driven design, applied to document processing. The “command” side (extraction, chunking) and the “query” side (embedding, indexing for retrieval) have fundamentally different performance profiles and scaling needs. Greg Young’s original CQRS paper (2010) makes this separation explicit for transactional systems; the same logic applies to document pipelines.

Stage 1: Content Processing

Runs when a file arrives in the system. Does not require business scope.

| Step | Operation | Purpose |
|------|-----------|---------|
| 1 | Compute content hash of file binary | Content identity for deduplication |
| 2 | Acquire distributed lock | Prevent concurrent processing of same file |
| 3 | Check for existing processed artifact | Skip if identical content already processed |
| 4 | Extract text from source format | PDF, DOCX, plain text to structured output |
| 5 | Parse into structured sections | Paragraphs, tables, headings, page boundaries |
| 6 | Chunk using strategy-appropriate splitter | Structure-preserving or recursive character |
| 7 | Generate document summary | Optional; used for pre-summarized retrieval layers |
| 8 | Persist processed artifact to object storage | Normalized JSON, deterministic input to Stage 2 |
| 9 | Update file entity metadata | Artifact pointer, processing timestamp, schema version |
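
Step 1's content hash is a digest of the raw file binary. The post does not name the algorithm, so this sketch assumes SHA-256 via Node's built-in crypto module; the hex digest is what flows into the artifact key, the dedup checks, and chunk metadata:

```typescript
import { createHash } from 'node:crypto';

// Content identity: a digest of the file binary, independent of filename,
// upload path, or file entity ID. Identical bytes -> identical hash.
function computeContentHash(binary: Uint8Array): string {
  return createHash('sha256').update(binary).digest('hex');
}
```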

The output is a processed artifact: a self-contained, normalized JSON document stored in object storage. It contains every chunk with its content, structural metadata, and provenance information. It is not a cache of raw extraction output. It is the deterministic, reproducible input to Stage 2.

Stage 2: Scoped Publishing

Runs when business scope is known. Takes the processed artifact as input. Never re-runs extraction.

| Step | Operation | Purpose |
|------|-----------|---------|
| 1 | Load processed artifact from object storage | Via file entity pointer, not raw hash |
| 2 | Resolve publishing policy from registry | Embedding model, target index, metadata schema |
| 3 | Build scoped metadata per chunk | Entity type, entity ID, context layer, content hash |
| 4 | Delete existing chunks for this file in target index | Idempotent replacement |
| 5 | Embed chunks using domain-appropriate model | Multilingual embeddings, dimensionality per policy |
| 6 | Index in vector store | With full scoped metadata |
| 7 | Emit completion event | Real-time notification to consumers |

The separation means that when a user assigns a previously synced document to a business entity, the system only runs Stage 2. Extraction, parsing, and chunking are already done. The perceived latency drops from “extraction time + embedding time” to just “embedding time.”

Orchestration Modes

The orchestrator handles three modes transparently:

Mode: 'full'             -> Scope known at arrival -> Stage 1 + Stage 2 immediately
Mode: 'process-only'     -> Scope unknown         -> Stage 1 only, Stage 2 deferred
Mode: 'publish-only'     -> Artifact exists        -> Stage 2 from stored artifact

For direct uploads where the user is attaching a file to a specific entity, the orchestrator runs both stages inline. The caller does not need to know about the two-stage split. For sync workers downloading files from external sources, the orchestrator runs Stage 1 only. Stage 2 triggers later when assignment happens.

// Direct upload: both stages run immediately
POST /ingest        -> orchestrator.run('full', { fileId, entityType, entityId, ... })

// Sync worker: process only, no scope yet
POST /ingest/process -> orchestrator.run('process-only', { fileId, storageKey, ... })

// Assignment event: publish from existing artifact
POST /ingest/publish -> orchestrator.run('publish-only', { fileId, entityType, entityId })

This tri-modal design keeps backward compatibility with existing callers that always had scope at upload time, while opening the deferred path for sync workflows.
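
The mode dispatch can be sketched as below. The function names and injected stage runners are illustrative, not the production API; the point is that callers pass a mode and never sequence stages themselves:

```typescript
type IngestionMode = 'full' | 'process-only' | 'publish-only';

// Which stages each mode triggers, mirroring the mode table above.
function stagesFor(mode: IngestionMode): Array<'stage1' | 'stage2'> {
  switch (mode) {
    case 'full':         return ['stage1', 'stage2'];
    case 'process-only': return ['stage1'];
    case 'publish-only': return ['stage2'];
  }
}

interface StageRunners {
  stage1: () => Promise<void>;  // content processing
  stage2: () => Promise<void>;  // scoped publishing
}

// Runs the stages a mode requires, in order, awaiting each.
async function runIngestion(mode: IngestionMode, run: StageRunners): Promise<void> {
  for (const stage of stagesFor(mode)) {
    await run[stage]();
  }
}
```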


III. The Processed Artifact Format

The processed artifact is the contract between Stage 1 and Stage 2. Its design determines how cleanly these stages decouple.

interface ProcessedArtifact {
  schemaVersion: number;             // For artifact invalidation
  contentHash: string;               // Hash of source binary
  sourceFileId: number;              // Originating file entity
  tenantId: number;                  // Tenant isolation boundary
  mimeType: string;                  // Source format
  strategy: string;                  // Which Stage 1 strategy produced this
  processedAt: string;               // ISO timestamp
  summary?: string;                  // Document-level summary if generated
  chunks: ProcessedChunk[];          // The actual content units
}

interface ProcessedChunk {
  index: number;                     // Sequential position in source document
  content: string;                   // The text content to embed
  heading?: string;                  // Section heading or label
  structuralRole?: string;           // paragraph, table, heading, footer, etc.
  extractionMethod: string;          // How this chunk was produced
  sourceDetails: Record<string, unknown>;  // Bounding regions, table structure, etc.
}

Key design decisions:

The artifact stores chunks, not raw extracted text. Stage 2 never needs to re-chunk. This is a deliberate inversion of the common pattern where chunking is treated as a retrieval-time concern. Research on chunk size impact (e.g., Pinecone’s 2023 analysis of retrieval quality vs. chunk granularity) shows that chunking strategy has a larger effect on retrieval quality than embedding model choice. Locking it into the artifact means retrieval quality is deterministic and auditable. Different Stage 1 strategies produce structurally different chunks (paragraph-preserving vs. fixed-size recursive), and that logic belongs entirely in Stage 1.

The strategy field records provenance. When you change your chunking logic, you bump the schema version. Files processed under the old strategy can be selectively re-processed without touching files that are already correct.
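
Selective re-processing after a schema bump can then be a simple filter on the stored version. The FileRecord shape here is an assumption based on the fields described in Section IX:

```typescript
interface FileRecord {
  id: number;
  artifactSchemaVersion: number | null;  // null = never processed
}

// Files whose artifact predates the current schema (or was never built)
// are candidates for re-processing; up-to-date files are left untouched.
function selectForReprocessing(files: FileRecord[], currentVersion: number): number[] {
  return files
    .filter(f => f.artifactSchemaVersion === null || f.artifactSchemaVersion < currentVersion)
    .map(f => f.id);
}
```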

Stage 2 consumes chunks generically. It reads content for embedding and passes through heading/structuralRole/sourceDetails as index metadata. It does not interpret structural metadata. This keeps Stage 2 completely agnostic to how chunks were produced.

Object storage key structure:

{tenantId}/processed/{contentHash}/artifact-v{schemaVersion}.json

The file entity holds the authoritative pointer to this key. Stage 2 always loads via the pointer, never by constructing the key from the hash alone. This avoids ambiguity when multiple tenants share a hash or when schema versions change.
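
The key construction itself is trivial; the discipline is that only Stage 1 (the writer) builds keys, while Stage 2 always reads the stored pointer. A minimal sketch of the builder:

```typescript
// Used only when Stage 1 persists a new artifact. The resulting key is
// written to the file entity, and Stage 2 loads via that pointer.
function buildArtifactKey(
  tenantId: number,
  contentHash: string,
  schemaVersion: number
): string {
  return `${tenantId}/processed/${contentHash}/artifact-v${schemaVersion}.json`;
}
```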


IV. Stage 1 Processing Strategies

Stage 1 is not a single pipeline. It is a strategy selector that dispatches to the appropriate extraction and chunking logic based on file characteristics.

Strategy: Structured Document Intelligence

For PDFs, particularly scanned documents, forms, and structured layouts, the pipeline calls a cloud document intelligence API (Azure AI Document Intelligence, Google Document AI, AWS Textract, or similar):

PDF binary -> Document intelligence API (layout analysis model)
           -> Structured output: pages, paragraphs, tables, sections
           -> Each structural element becomes one chunk
           -> Rich metadata: bounding regions, spans, table cell structure

This strategy preserves document structure. The importance of structure-preserving extraction is well-documented. A 2024 study by Anthropic on long-context retrieval found that chunks preserving document structure (headings, tables, lists) had 23% higher retrieval relevance scores compared to fixed-size chunks from the same documents. The effect was strongest for technical and legal documents where structure carries semantic meaning. A table remains a single chunk rather than being split across arbitrary character boundaries. A heading is tagged with its role so downstream agents can filter by structural position. Page footers are identified and can be excluded from retrieval.

The extracted result from the document intelligence API is cached in object storage. If the same file is re-processed (version bump, strategy change), the API call is skipped and the cached extraction result is loaded instead. This saves both cost and latency on the most expensive step in the pipeline.

// Structured extraction with caching
async function extractStructured(fileKey: string) {
  const cacheKey = buildExtractionCacheKey(fileKey);

  const cached = await storage.getIfExists(cacheKey);
  if (cached) return deserialize(cached);

  const result = await docIntelligenceAPI.analyze(fileKey, {
    model: 'layout-analysis',
    features: ['paragraphs', 'tables', 'sections']
  });

  await storage.put(cacheKey, serialize(result));
  return result;
}

Each paragraph, table, and section heading in the extraction result maps to one chunk in the processed artifact. The chunk carries its structural role, the section it belongs to, and the raw source metadata (bounding polygons, spans, cell structure for tables). This metadata is not interpreted by Stage 2 but is preserved for downstream consumers that might need positional information.
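
The element-to-chunk mapping can be sketched as follows. The ExtractedElement shape is an assumption; real document intelligence APIs differ in field names, but the one-element-one-chunk principle is the same:

```typescript
interface ExtractedElement {
  kind: 'paragraph' | 'table' | 'heading' | 'footer';
  text: string;
  section?: string;
  boundingRegions?: unknown;  // positional metadata from the API
}

interface ProcessedChunk {
  index: number;
  content: string;
  heading?: string;
  structuralRole?: string;
  extractionMethod: string;
  sourceDetails: Record<string, unknown>;
}

// One structural element becomes one chunk; structure is never split
// across arbitrary character boundaries.
function toChunks(elements: ExtractedElement[]): ProcessedChunk[] {
  return elements.map((el, index) => ({
    index,
    content: el.text,
    heading: el.section,
    structuralRole: el.kind,
    extractionMethod: 'document-intelligence',
    sourceDetails: { boundingRegions: el.boundingRegions ?? null }
  }));
}
```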

Strategy: Markdown-Based Extraction

For text-heavy documents, non-PDF formats, and cases where structural fidelity is less critical:

File binary -> Text extraction service (LlamaParse, Unstructured, Tika, etc.)
            -> Flat markdown or plain text
            -> Recursive character text splitter (configurable chunk size and overlap)
            -> Sequential chunks with index metadata only

This produces simpler chunks without structural metadata. A configurable cap on chunks per file prevents runaway processing on extremely large documents.
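
A simplified sketch of the recursive splitter and the chunk cap, ignoring overlap for brevity; production code would use a library splitter rather than this hand-rolled version:

```typescript
// Splits on progressively finer separators, hard-splitting at the
// character boundary only when no separator produces small enough parts.
function splitRecursive(
  text: string,
  maxChunkChars: number,
  separators: string[] = ['\n\n', '\n', ' ']
): string[] {
  if (text.length <= maxChunkChars) {
    return text.trim() ? [text.trim()] : [];
  }
  const [sep, ...rest] = separators;
  if (sep === undefined) {
    // No separators left: hard-split at the character boundary.
    const pieces: string[] = [];
    for (let i = 0; i < text.length; i += maxChunkChars) {
      pieces.push(text.slice(i, i + maxChunkChars));
    }
    return pieces;
  }
  // Split on the coarsest separator and recurse into oversized parts.
  return text.split(sep).flatMap(part => splitRecursive(part, maxChunkChars, rest));
}

// The configurable cap mentioned above: bounds processing cost per file.
function capChunks(chunks: string[], maxChunks: number): string[] {
  return chunks.slice(0, maxChunks);
}
```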

Strategy Selection

function resolveStrategy(
  mimeType: string,
  mode: 'structured' | 'markdown' | 'auto'
): ExtractionStrategy {
  if (mode === 'structured') return structuredStrategy;
  if (mode === 'markdown')   return markdownStrategy;

  // Auto mode: structured for PDFs, markdown for everything else
  if (mimeType === 'application/pdf') return structuredStrategy;
  return markdownStrategy;
}

The caller can force a specific strategy via the request payload, or use auto (the default) to let the pipeline decide based on MIME type. This matters because some PDFs are better handled by the markdown extractor (simple text-only PDFs), and the caller who knows the document context can make that decision.


V. Publishing Policy Registry

Stage 2 needs to know how to embed and where to index. Rather than hardcoding these decisions, the system uses a domain-aware policy registry.

interface PublishingPolicy {
  domain: string;

  resolveIndex(entityType: string): string;
  resolveContextLayer(entityType: string): string;
  resolveEmbeddingConfig(): EmbeddingConfig;
  buildMetadata(
    chunk: ProcessedChunk,
    scope: PublishingScope
  ): Record<string, unknown>;
}

interface PublishingScope {
  entityType: string;
  entityId: number;
  tenantId: number;
  fileId: number;
  contentHash: string;
  contextLayer?: string;
  additionalMetadata?: Record<string, unknown>;
}

A default policy handles the primary document domain. Its implementation is straightforward:

class DefaultPublishingPolicy implements PublishingPolicy {
  domain = 'primary';

  resolveIndex(entityType: string): string {
    // Map entity types to their vector store indexes
    // e.g., 'Project' -> 'projects', 'Ticket' -> 'tickets', etc.
    return indexMap[entityType] ?? 'documents';
  }

  resolveContextLayer(entityType: string): string {
    // Map entity types to retrieval context layers
    // e.g., task-scoped entities -> 'operational',
    //       reference entities -> 'reference', etc.
    return layerMap[entityType] ?? 'default';
  }

  resolveEmbeddingConfig(): EmbeddingConfig {
    return {
      provider: 'cloud-embedding-service',
      model: 'multilingual-v3',
      dimensions: 768
    };
  }

  buildMetadata(
    chunk: ProcessedChunk,
    scope: PublishingScope
  ): Record<string, unknown> {
    // Merge chunk-level structure with scope-level identity
    return {
      entity_type: scope.entityType,
      entity_id: scope.entityId,
      tenant_id: scope.tenantId,
      file_id: scope.fileId,
      content_hash: scope.contentHash,
      chunk_index: chunk.index,
      heading: chunk.heading ?? null,
      context_layer: scope.contextLayer ?? this.resolveContextLayer(scope.entityType),
      ...scope.additionalMetadata
    };
  }
}

The registry pattern allows adding new domains (different embedding models, different vector indexes, different metadata schemas) without modifying the publishing pipeline. A specialized domain for regulatory or compliance documents, for instance, might use a different embedding model with higher dimensionality and index into a separate vector store with stricter access controls.

class PolicyRegistry {
  private policies = new Map<string, PublishingPolicy>();

  register(policy: PublishingPolicy): void {
    this.policies.set(policy.domain, policy);
  }

  resolve(domain: string): PublishingPolicy {
    const policy = this.policies.get(domain);
    if (!policy) throw new Error(`No publishing policy for domain: ${domain}`);
    return policy;
  }
}

The context layer concept deserves explanation. In agentic systems where multiple AI agents operate on different tasks, not all documents are equally relevant. An agent handling an active task needs operational documents (recent uploads, correspondence) more than reference documents (identity records, product manuals). The contextLayer metadata field on each indexed chunk allows retrieval queries to filter by relevance layer before running semantic similarity, reducing noise and improving retrieval precision. I will cover the full context layer architecture, including token budget allocation across layers, in a separate post.


VI. Multi-Tier Deduplication

Document deduplication in a multi-tenant system with multiple ingestion paths is more subtle than checking if a file was already processed. Deduplication failures in RAG systems create a particularly insidious problem: duplicate chunks inflate the retrieval context, waste token budget, and can cause the LLM to over-weight repeated information. In agentic systems where the context window is shared across multiple retrieval layers, duplicate chunks from one layer can crowd out genuinely relevant chunks from another. The same binary content can arrive through different paths, under different file entity IDs, and need indexing under different business scopes.

Tier 1: Distributed Locking

Prevents concurrent processing of the same file. This handles the race condition where a sync worker and a manual upload trigger processing for the same file simultaneously.

class ProcessingLock {
  private readonly prefix = 'doc-process';
  private readonly ttlMs: number;

  async acquire(tenantId: number, fileId: number): Promise<boolean> {
    const key = `${this.prefix}:${tenantId}:${fileId}`;
    // Atomic set-if-not-exists with TTL
    const acquired = await kvStore.setNX(key, Date.now(), this.ttlMs);
    return acquired;
  }

  async release(tenantId: number, fileId: number): Promise<void> {
    const key = `${this.prefix}:${tenantId}:${fileId}`;
    await kvStore.delete(key);
  }
}

The TTL acts as a safety net. If the processing worker crashes without releasing the lock, the lock auto-expires and the file becomes eligible for retry via the background sweep.
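
A typical calling pattern releases the lock in a `finally` block, so a thrown error inside processing does not hold the lock for its full TTL. The Lock interface below matches the ProcessingLock sketched above; the work callback stands in for the Stage 1 body:

```typescript
interface Lock {
  acquire(tenantId: number, fileId: number): Promise<boolean>;
  release(tenantId: number, fileId: number): Promise<void>;
}

// Returns false if another worker already holds this file. The lock is
// always released on exit, even if the work function throws.
async function withProcessingLock(
  lock: Lock,
  tenantId: number,
  fileId: number,
  work: () => Promise<void>
): Promise<boolean> {
  if (!(await lock.acquire(tenantId, fileId))) return false;
  try {
    await work();
    return true;
  } finally {
    await lock.release(tenantId, fileId);
  }
}
```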

Tier 2: Content Hash Cross-File Check

Handles the duplicate binary scenario. The same PDF attached to the same entity twice (different file ID, same content) should not produce duplicate chunks.

async function checkContentDuplicate(
  contentHash: string,
  tenantId: number,
  entityType: string,
  entityId: number
): Promise<'process' | 'skip' | 'reindex'> {
  // Query vector store for existing chunks with this content hash
  const existing = await vectorStore.findByMetadata({
    content_hash: contentHash,
    tenant_id: tenantId
  });

  if (existing.length === 0) return 'process';

  // Same content exists. Same scope? Skip entirely.
  const first = existing[0].metadata;
  if (first.entity_type === entityType && first.entity_id === entityId) {
    return 'skip';
  }

  // Same content, different scope. Publish with new metadata (skip extraction).
  return 'reindex';
}

The reindex path is the multi-scope case. The processed artifact already exists. Stage 2 runs with different scope parameters to produce chunks under a different entity and index.

Tier 3: Extraction Result Cache

The most expensive operation in the pipeline is the document intelligence API call. Even when Stage 1 must run from scratch (new content hash), the extraction result is cached in object storage. If the same file needs re-processing due to a chunking strategy change, the API call is skipped.

Tier 1 (Distributed lock) -> Prevents concurrent processing -> O(1) check
Tier 2 (Content hash)     -> Prevents duplicate extraction  -> O(1) vector store query
Tier 3 (API result cache) -> Prevents duplicate API calls   -> O(1) object storage check

All three tiers are idempotent. The pipeline can be safely retried from any point without producing duplicate data.


VII. Async Processing, Retry, and Dead-Letter Recovery

Production document processing must handle failures gracefully. External APIs (document intelligence, embedding services) have rate limits, transient errors, and occasional outages. The pipeline uses a job queue with exponential backoff and dead-letter recovery.

Job Queue Architecture

                    +----------------------+
                    |   API Endpoints      |
                    |   /ingest            |
                    |   /ingest/process    |
                    |   /ingest/publish    |
                    +----------+-----------+
                               |
                    +----------v-----------+
                    |   Job Queue          |
                    |   (configurable      |
                    |    concurrency)      |
                    +----------+-----------+
                               |
               +---------------+---------------+
               |               |               |
        +------v-----+  +------v-----+  +------v-----+
        |  Worker 1  |  |  Worker 2  |  |  Worker N  |
        +------+-----+  +------+-----+  +------+-----+
               |               |               |
               +-------+-------+-------+-------+
                       |
             (on failure after max attempts)
                       |
               +-------v--------+
               | Dead-Letter Q  |
               +----------------+

Retry Strategy

const retryConfig = {
  maxAttempts: 4,
  backoff: {
    type: 'exponential',
    initialDelayMs: 5000  // doubles each attempt
  }
};

Attempt 1 handles the happy path. Attempt 2 catches transient network errors and embedding service rate limits. Attempt 3 covers document intelligence API recovery from brief outages. Attempt 4 is the final try before the job moves to the dead-letter queue.

Each retry re-executes the full pipeline from the beginning. This is safe because the pipeline is idempotent by design: the distributed lock prevents concurrent runs, delete-before-insert in the vector store prevents duplicate chunks, and the extraction cache prevents redundant API calls. Idempotency in distributed pipelines is explored thoroughly in Martin Kleppmann’s “Designing Data-Intensive Applications” (2017), particularly in the context of exactly-once semantics. The key insight: rather than trying to guarantee exactly-once delivery (which is impossible in a distributed system), design your processing to be safe under at-least-once delivery. Every stage must tolerate repeated execution.
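
Under the config above, the delay before each retry works out to 5s, 10s, 20s:

```typescript
// Delay before retry n (n = 1 for the first retry), doubling each time,
// matching initialDelayMs: 5000 with exponential backoff.
function backoffDelayMs(retry: number, initialDelayMs = 5000): number {
  return initialDelayMs * 2 ** (retry - 1);
}
```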

Dead-Letter Recovery

Jobs that exhaust all retries move to a dead-letter queue. The file entity is updated with an error status and message. A dedicated endpoint allows operators to retry failed jobs after a fix is deployed:

// Retry all DLQ jobs
POST /ingest/retry-failed

// Retry specific files
POST /ingest/retry-failed  { "fileIds": [1234, 5678] }

Background Sweep

A periodic sweep catches files that fell through the cracks: jobs that were never queued due to a transient error, files with error status that should be retried, or files where the worker crashed before updating status.

class ProcessingSweep {
  async run(): Promise<void> {
    // Find files that should have been processed but weren't.
    // Filter to files old enough to not be in active processing,
    // to avoid interfering with in-flight jobs.
    const unprocessed = await fileRepository.findStaleUnprocessed({
      statusIn: [null, 'error'],
      olderThan: minutes(10)
    });

    for (const file of unprocessed) {
      await jobQueue.enqueue({
        fileId: file.id,
        tenantId: file.tenantId,
        source: 'sweep-recovery'
      });
    }
  }
}

The sweep is your safety net for everything the happy path misses. It runs periodically (typically every 10 to 20 minutes, configurable per deployment) and has repeatedly eliminated the need for manual intervention in production.


VIII. Real-Time Event Notification

Agents and frontends need to know when a document becomes searchable. The pipeline emits events via a pub/sub system. In our case, we use a headless Socket.IO emitter backed by Redis, but the pattern applies equally to any pub/sub transport (Kafka, NATS, server-sent events).

“Headless” means the ingestion service does not run a WebSocket server itself. It publishes events to a shared message bus, and the main application server (which holds the active client connections) delivers them to the correct tenant room.

// On successful processing
eventBus.publish(`tenant:${tenantId}`, 'document:indexed', {
  fileId,
  entityType,
  entityId,
  chunkCount,
  processingTimeMs,
  mode   // 'full' | 'process-only' | 'publish-only'
});

// On failure (all retries exhausted)
eventBus.publish(`tenant:${tenantId}`, 'document:failed', {
  fileId,
  error: errorMessage,
  retriesExhausted: true
});

Tenant-scoped channels ensure multi-tenant isolation. Tenant A never receives events about Tenant B’s documents.


IX. Data Model: Lightweight State Tracking

The system tracks processing state through a small number of fields on the existing file entity rather than introducing new database tables:

| Field | Type | Purpose |
|-------|------|---------|
| processed_artifact_key | varchar, nullable | Object storage key to the processed artifact |
| processed_at | timestamp, nullable | When Stage 1 completed |
| artifact_schema_version | integer, nullable | Artifact schema version for invalidation |

ALTER TABLE files ADD COLUMN processed_artifact_key VARCHAR;
ALTER TABLE files ADD COLUMN processed_at TIMESTAMP;
ALTER TABLE files ADD COLUMN artifact_schema_version INTEGER;

This is a deliberate simplification. A fully normalized design would have separate entities tracking the many-to-many relationship between source files, processed artifacts, and scoped publications. That design is the correct long-term architecture, but the lightweight approach is sufficient for the initial implementation and avoids premature table proliferation.

The upgrade path is clean: when the system needs multi-file-to-one-artifact tracking (multiple files with identical content sharing a single processed artifact) or publication audit history (which scopes were published, when, with what status), promote to dedicated entities. The object storage artifact format and pipeline boundaries remain unchanged.

Publication tracking uses the vector store itself. To check whether a file has been published to a specific scope, query the vector index by file_id + entity_type + entity_id. If chunks exist, the publication is live. This avoids maintaining a separate status table that could drift from the actual index state.
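
A sketch of that check; `findByMetadata` stands in for whatever metadata-filter query your vector store client exposes, and is not a specific vendor SDK call:

```typescript
interface VectorStoreClient {
  findByMetadata(filter: Record<string, unknown>): Promise<unknown[]>;
}

// Publication state is derived from the index itself: chunks present
// under this scope means the publication is live. No status table to drift.
async function isPublished(
  store: VectorStoreClient,
  fileId: number,
  entityType: string,
  entityId: number
): Promise<boolean> {
  const chunks = await store.findByMetadata({
    file_id: fileId,
    entity_type: entityType,
    entity_id: entityId
  });
  return chunks.length > 0;
}
```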


X. Chunk Metadata Schema

Every chunk indexed in the vector store carries a standardized metadata schema. This schema is enforced at pipeline time, not at query time, so malformed metadata never enters the index.

interface ChunkMetadata {
  // Identity
  entity_type: string;          // The business entity type this chunk is scoped to
  entity_id: number;            // The business entity ID
  file_id: number;              // Source file entity ID
  content_hash: string;         // Hash of file binary, for cross-file dedup
  chunk_index: number;          // Sequential position within source document

  // Classification
  context_layer: string;        // Retrieval layer hint for agent context assembly
  heading: string | null;       // Section heading or label

  // Provenance
  tenant_id: number;            // Multi-tenant isolation boundary
  indexed_at: string;           // ISO timestamp
  source_type: string;          // 'file', 'email', 'note', etc.
  doc_name: string;             // Original filename
}

The content_hash field is critical for the cross-file deduplication described in Section VI. It allows the pipeline to detect when the same binary content has been uploaded under a different file entity ID and avoid creating duplicate chunks.

The context_layer field maps entity types to retrieval layers. In agentic systems, different AI agents request different subsets of documents based on the task at hand. A task-focused agent loads operational context first. A reference-checking agent loads reference context. The layer metadata allows agents to filter at the vector store query level rather than post-retrieval, which is significantly more efficient at scale.
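
A sketch of such a query-level filter: constrain by tenant, scope, and layer before similarity ranking. The query shape here is an assumption; most vector stores accept an equivalent metadata filter alongside the query vector:

```typescript
interface ScopedQuery {
  vector: number[];
  topK: number;
  filter: Record<string, unknown>;
}

// Builds the metadata pre-filter applied before semantic similarity.
// Omitting contextLayer searches all layers within the scope.
function buildScopedQuery(
  vector: number[],
  tenantId: number,
  entityType: string,
  entityId: number,
  contextLayer?: string
): ScopedQuery {
  const filter: Record<string, unknown> = {
    tenant_id: tenantId,
    entity_type: entityType,
    entity_id: entityId
  };
  if (contextLayer) filter.context_layer = contextLayer;
  return { vector, topK: 10, filter };
}
```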


XI. Observability

Every pipeline stage emits a distributed trace span. The resulting trace provides end-to-end visibility into what happened during a specific file’s processing:

ingestion.process (root span)
  dedup.lock-acquire
  dedup.content-hash-check
  format.detect
  extract.text
    extract.cache-check (object storage)
    extract.api-call (if cache miss)
  chunk.split
  metadata.enforce
  artifact.persist (Stage 1 terminal)
  policy.resolve
  embed.vectors (embedding service)
  index.write (vector store)

Span attributes include: tenantId, fileId, entityType, contextLayer, mimeType, chunkCount, strategy, dedupResult, and pipelineMode. These attributes enable querying traces by business dimension (e.g., “show me all failed ingestions for tenant 42 in the last hour” or “what is the p95 processing time for PDF files over 10MB”).

A health endpoint reports dependency status across the database, vector store, key-value store, object storage, and job queue (active, waiting, failed counts). This is essential for container orchestrator liveness/readiness probes and operational dashboards.


XII. Feature-Flagged Migration

Replacing an existing ingestion path in a production system requires careful rollout. The system uses a configuration flag to control which document types route through the new pipeline:

const ENABLED_ENTITY_TYPES = new Set(
  config.ingestionEnabledTypes  // e.g., ['Project']
);

async function processFile(file: File, entityType: string, entityId: number) {
  if (ENABLED_ENTITY_TYPES.has(entityType)) {
    return routeViaNewPipeline(file, entityType, entityId);
  }
  return legacyProcessingPath(file, entityType, entityId);
}

The migration sequence:

  1. Deploy the ingestion service with no entity types enabled. Validate health, connectivity, trace emission.
  2. Enable one entity type. Route those file uploads through the new pipeline. Monitor chunk quality, count, and retrieval performance against the legacy path for a week or two.
  3. Enable additional entity types one at a time. Each has different extraction characteristics (document length, structural complexity).
  4. Remove the legacy path once all entity types are migrated and validated.

Fallback on failure: If the ingestion service call fails for a specific file, the system falls back to the legacy processing path. The file still gets indexed, just through the old code path. This ensures no document silently disappears from the search index during the migration period.

try {
  await ingestionService.process({ fileId, entityType, entityId, ... });
} catch (error) {
  logger.error('Ingestion service failed, falling back to legacy', { fileId, error });
  await legacyProcessingPath(file, entityType, entityId);
}

XIII. Deferred Publishing: The Inbox Document Flow

The inbox document flow is where the two-stage architecture delivers its most significant value. This pattern applies to any system where documents arrive before their business context is established: email attachments before triage, synced files before classification, uploaded evidence before case assignment.

Here is the concrete lifecycle:

1. External sync worker downloads file from cloud storage
     -> File entity created (status = 'pending', no business scope)
     -> POST /ingest/process triggers Stage 1
     -> Processed artifact persisted to object storage
     -> File.processed_artifact_key set
     -> No vector indexing yet

2. Time passes. The file sits in the inbox awaiting assignment.

3. User assigns the file to a business entity (e.g., a project, ticket, or case)
     -> Assignment handler resolves scope: entityType + entityId
     -> POST /ingest/publish triggers Stage 2
     -> Processed artifact loaded from object storage
     -> Chunks embedded and indexed under the target entity
     -> User sees the document as searchable within seconds

Without the two-stage split, step 3 would trigger the entire pipeline: download from object storage, extract text via document intelligence API (5-30 seconds for a PDF), chunk, embed, index. The user would wait half a minute or more. With the split, step 3 only runs embedding and indexing (typically 2-5 seconds for a normal-sized document).

The scope resolution logic at assignment time varies by application. In our case, a document can be assigned to one of several entity types with a priority ordering (if multiple are set, the most specific wins). The key architectural point is that scope resolution is a business lifecycle event, not an extraction concern. Decoupling them is what makes the two-stage split possible.
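The priority ordering described above can be expressed as a simple cascade. This is a hedged sketch: the entity types (`Case`, `Ticket`, `Project`) and their relative specificity are illustrative assumptions, not the source system's actual taxonomy:

```typescript
// Hypothetical candidate assignments a file may carry at assignment time.
interface CandidateScopes {
  caseId?: number;     // most specific
  ticketId?: number;
  projectId?: number;  // least specific
}

interface ResolvedScope {
  entityType: 'Case' | 'Ticket' | 'Project';
  entityId: number;
}

// When multiple scopes are set, the most specific one wins.
function resolveScope(c: CandidateScopes): ResolvedScope | null {
  if (c.caseId !== undefined) return { entityType: 'Case', entityId: c.caseId };
  if (c.ticketId !== undefined) return { entityType: 'Ticket', entityId: c.ticketId };
  if (c.projectId !== undefined) return { entityType: 'Project', entityId: c.projectId };
  return null; // still unassigned: the file stays in the inbox, Stage 2 never fires
}
```

The `null` branch is what keeps unassigned inbox documents out of the vector index entirely.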

The publishing trigger is fire-and-forget from the assignment handler’s perspective. The API call enqueues a job and returns immediately. Assignment does not block on publishing completion. If publishing fails, the retry and DLQ mechanisms handle recovery independently of the assignment lifecycle.


XIV. Testing Strategy

A pipeline with this many integration points requires thorough testing at multiple levels:

Unit tests cover the pure logic: lock acquisition and conflict detection, entity-to-layer mapping, content hash computation, artifact serialization/deserialization, policy resolution, metadata schema enforcement, and dedup decision logic (skip vs. reindex).

Integration tests run against real database, key-value store, and vector store instances via test containers. These validate the full pipeline end-to-end: file creation, content processing, artifact persistence, scoped publishing, and vector store query verification.

Worker tests cover job type dispatch, backoff configuration, DLQ routing after exhausted retries, and graceful shutdown behavior (drain period before forced termination).

The deduplication tests are the most important. They validate: same file ID processed twice (lock blocks second run), same binary under different file IDs in same scope (skip), same binary under different file IDs in different scope (reindex with new metadata), and force-reprocess flag bypassing all dedup checks.
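The decision table those tests exercise is small enough to sketch directly. The shapes below (`scopeKey`, the in-memory `DedupIndex`) are simplifications for illustration; the real system checks hashes against persistent storage:

```typescript
type DedupDecision = 'process' | 'skip' | 'reindex';

interface DedupInput {
  contentHash: string;
  scopeKey: string;          // e.g. `${entityType}:${entityId}`
  forceReprocess: boolean;
}

// Prior state: for each known content hash, the scopes it is already indexed under.
type DedupIndex = Map<string, Set<string>>;

function decideDedup(input: DedupInput, seen: DedupIndex): DedupDecision {
  if (input.forceReprocess) return 'process';     // flag bypasses all dedup checks
  const scopes = seen.get(input.contentHash);
  if (!scopes) return 'process';                  // never seen this binary before
  if (scopes.has(input.scopeKey)) return 'skip';  // same content, same scope
  return 'reindex';                               // same content, new scope: new metadata only
}
```

Each branch maps one-to-one onto a test case, which is what makes this logic cheap to cover exhaustively.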


XV. Deployment and Scaling

The ingestion service runs as an independent process, separately scalable from the main application. This is a deliberate architecture choice: document processing is CPU and I/O intensive (downloading files, calling external APIs, computing embeddings) while the main application is request-response latency sensitive. Mixing them on the same process creates resource contention where a burst of file uploads degrades API response times.

The job queue workers run inside the same process as the HTTP server for simplicity. For higher throughput, scale horizontally by running multiple container instances. The queue’s distributed backend (Redis, in our case) ensures jobs are distributed across workers without duplication. Worker concurrency (parallel jobs per instance) is configurable via environment variable.

Graceful shutdown handles the container orchestrator lifecycle:

async function shutdown(signal: string) {
  logger.info(`Received ${signal}, starting graceful shutdown`);

  // Stop accepting new jobs
  await worker.close();

  // Wait for in-flight jobs to complete (configurable timeout)
  await drainInFlightJobs(config.shutdownDrainMs);

  // Close connections
  await kvStore.disconnect();
  await database.destroy();

  process.exit(0);
}

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));

XVI. What This Architecture Enables

The two-stage pattern with a processed artifact layer is not just an optimization. It changes what is possible:

Document re-publishing without re-processing. When the embedding model is upgraded (from v3 to v4, or from 768-dim to 1536-dim), only Stage 2 needs to re-run. Every processed artifact in object storage is still valid. The re-indexing job loads each artifact, embeds with the new model, and re-indexes. No file downloads, no extraction API calls, no re-parsing.
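The re-indexing job reduces to a loop over stored artifacts. The sketch below assumes injected functions for listing, loading, embedding, and indexing (all names are illustrative); the point is that nothing in the loop touches the original files or the extraction API:

```typescript
// Minimal artifact shape for the sketch; the real artifact carries more metadata.
interface Chunk { text: string; metadata: Record<string, unknown>; }
interface Artifact { fileId: string; chunks: Chunk[]; }

// Stage-2-only re-index: load artifact, embed with the new model, re-index.
async function reindexAll(
  listArtifactKeys: () => Promise<string[]>,
  loadArtifact: (key: string) => Promise<Artifact>,
  embed: (texts: string[]) => Promise<number[][]>,  // the new model plugs in here
  indexVectors: (fileId: string, vectors: number[][], chunks: Chunk[]) => Promise<void>,
): Promise<number> {
  let reindexed = 0;
  for (const key of await listArtifactKeys()) {
    const artifact = await loadArtifact(key);
    const vectors = await embed(artifact.chunks.map((c) => c.text));
    await indexVectors(artifact.fileId, vectors, artifact.chunks); // delete-before-insert lives here
    reindexed += 1;
  }
  return reindexed;
}
```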

Multi-domain indexing from shared content. The same document content indexed into different vector stores with different embedding models and different metadata schemas, all without re-running extraction. The publishing policy registry handles the per-domain differences.

Audit trail of what was processed and when. The processed_at timestamp and artifact_schema_version on the file entity provide a complete record of processing state. Combined with the artifact’s strategy field, you can answer: “Was this file processed with the current extraction strategy, or does it need re-processing?”
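Answering that question is a pure function of the persisted record. A minimal sketch, with illustrative constants (`CURRENT_SCHEMA_VERSION`, `CURRENT_STRATEGY` are assumed values, not from the source):

```typescript
// Hypothetical shape of the processing record persisted on the file entity.
interface ProcessingRecord {
  processedAt: Date | null;
  artifactSchemaVersion: number;
  strategy: string; // extraction strategy recorded in the artifact
}

const CURRENT_SCHEMA_VERSION = 2;
const CURRENT_STRATEGY = 'document-intelligence-v2';

function needsReprocessing(r: ProcessingRecord): boolean {
  if (r.processedAt === null) return true;                        // never processed
  if (r.artifactSchemaVersion < CURRENT_SCHEMA_VERSION) return true; // stale artifact format
  return r.strategy !== CURRENT_STRATEGY;                         // stale extraction strategy
}
```

A background sweep can run this check across all files to build a re-processing worklist.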

Domain isolation for specialized use cases. A regulatory document domain can use a different embedding model with higher dimensionality, a different vector index with different similarity metrics, and a different metadata schema, all plugged into the same pipeline via the publishing policy registry.
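A publishing policy registry of this kind can be as simple as a keyed lookup with a default. The policy fields, model names, and the `regulatory` domain below are illustrative assumptions:

```typescript
// Hypothetical per-domain publishing policy.
interface PublishingPolicy {
  embeddingModel: string;
  dimensions: number;
  indexName: string;
}

const policyRegistry = new Map<string, PublishingPolicy>([
  ['default',    { embeddingModel: 'embed-v3', dimensions: 768,  indexName: 'docs' }],
  ['regulatory', { embeddingModel: 'embed-v4', dimensions: 1536, indexName: 'regulatory-docs' }],
]);

// Unknown domains fall back to the default policy rather than failing.
function resolvePolicy(domain: string): PublishingPolicy {
  return policyRegistry.get(domain) ?? policyRegistry.get('default')!;
}
```

Stage 2 resolves the policy once per publish job and never hard-codes model or index names, which is what keeps new domains a registry entry rather than a code change.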


XVII. Lessons from Production

Start with the artifact format, not the pipeline. The processed artifact format took more design iterations than any pipeline stage.8 This echoes Fred Brooks’ observation in “The Mythical Man-Month”: “Show me your flowcharts and conceal your tables, and I shall continue to be mystified. Show me your tables, and I won’t usually need your flowcharts; they’ll be obvious.” In a pipeline system, the data contract between stages is the architecture. The pipeline code is just the plumbing. Getting the chunk interface right (what metadata to carry, what to leave in sourceDetails, how to version) determined how cleanly the two stages could decouple. Early prototypes that stored raw text instead of pre-chunked content created implicit coupling between Stage 1 and Stage 2 chunking logic.

Idempotency is not optional; it is the first requirement. In a distributed system with retries, sweeps, and multiple ingestion paths, every operation will execute more than once.9 Pat Helland’s classic paper “Idempotence Is Not a Medical Condition” (2012) makes the case that idempotency should be the default assumption for any service operation, not an optimization. In document processing specifically, the cost of a missed idempotency check (duplicate embeddings, wasted API calls, corrupted indexes) is high and silent. You only discover it when retrieval quality degrades. The pipeline was designed for repeated execution from day one. Delete-before-insert in the vector store, distributed locks with TTL, and content hash checks at every tier make this safe.

Feature flags at the caller, not the pipeline. The pipeline itself does not know about migration phases. It processes whatever it receives. The feature flag lives at the routing layer in the calling service, where it decides whether to send a file to the new pipeline or the legacy path. This keeps the pipeline code clean and the migration logic contained.

The fallback path saves you during rollout. When the ingestion service experienced a transient connectivity issue during the first production deployment, the fallback to the legacy processing path meant zero documents were lost from the search index. The fallback was exercised for a few minutes, then the ingestion service recovered and picked up the backlog from the job queue.

Background sweep is your safety net. The sweep catches everything that falls through the cracks: jobs that were never queued due to a timeout, files where the worker crashed mid-processing, or files that were created during a deployment window. It has prevented manual intervention on multiple occasions.


Further Reading

  • Lewis et al. (2020). Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks - The foundational RAG paper. Established the retrieve-then-generate paradigm that every production system now extends far beyond the original formulation.
  • Vaswani et al. (2017). Attention Is All You Need - The transformer architecture underlying every modern embedding model. Understanding attention mechanisms helps you reason about why chunk boundaries matter for retrieval quality.
  • Gao et al. (2024). Retrieval-Augmented Generation for Large Language Models: A Survey - Comprehensive survey of RAG architectures and their evolution. Good for understanding where the field is heading.
  • Kleppmann, Martin (2017). Designing Data-Intensive Applications - The definitive reference on distributed system patterns. Chapters on stream processing and exactly-once semantics are directly applicable to pipeline design.
  • Helland, Pat (2012). Idempotence Is Not a Medical Condition - Short, sharp argument for idempotency as a default design constraint, not an optimization.
  • Brooks, Frederick (1975). The Mythical Man-Month - The “show me your tables” insight about data contracts being more important than code flow applies directly to artifact format design.

Changelog

  • 2026-03-26: Initial publication. Covers the two-stage architecture, processed artifact format, extraction strategies, publishing policy registry, multi-tier deduplication, async processing, and production deployment lessons.