Skip to content

Chapter 20: Capstone - Building an AI Data Processing Pipeline

Theoretical Foundations

The capstone project in this chapter simulates a real-world, high-throughput system where raw, unstructured data is transformed into structured, actionable intelligence. At its heart, this pipeline is an application of the Single Responsibility Principle (SRP), a concept introduced in Book 1, Chapter 3, which we now elevate from a code-level best practice to a system architecture philosophy. Instead of a monolithic backend that handles data fetching, processing, API routing, and response generation, we decompose the system into distinct, specialized services, each with a single, well-defined reason to change.

Imagine a high-end restaurant kitchen. A single chef attempting to manage ingredient sourcing, chopping, cooking, plating, and customer service would be a bottleneck and a point of failure. A professional kitchen, however, is organized into stations: the garde manger (cold station), the saucier (sauces), the pâtissier (desserts), and the expeditor (who orchestrates the flow). Our AI pipeline is architected in the same way. The Edge Function acts as the expeditor, the tRPC router is the maître d' managing the flow of orders to the kitchen, and the LLM is the specialized saucier, expertly transforming raw ingredients (data) into a refined final product (structured output).

This architectural separation is not merely for organization; it is critical for performance, scalability, and maintainability in a web environment. A traditional server-rendered page might block while waiting for a database query and a subsequent AI inference, leading to poor user experience. Our pipeline, by contrast, is designed for asynchronous, event-driven processing, where the state of the pipeline is managed separately from the data transformation itself, allowing for real-time feedback to the user.

The Orchestrator: Edge Functions as the Expeditor

In a traditional serverless architecture, a single function might handle the entire request lifecycle. This is akin to our single-chef kitchen. For our pipeline, we delegate the orchestration role to an Edge Function. The "why" here is twofold: latency and statelessness. By placing the orchestrator at the edge (geographically closer to the user), we minimize the initial network round-trip time. The Edge Function's sole responsibility is to act as a lightweight conductor. It receives a request, validates it, fetches the necessary raw data from a source (e.g., a database, a public API, or a file storage service), and then passes this raw data to the next station in the kitchen: the LLM.

This function is deliberately thin. It does not contain complex business logic or data transformation rules. Its "reason to change" is limited to the data source it connects to or the initial request format. If the raw data source's API changes, only this orchestrator needs modification, leaving the rest of the pipeline untouched. This is SRP in action at the infrastructure level.

The Intelligence Layer: LLMs as Specialized Data Transformers

The LLM is the star of our kitchen—the saucier. Its job is not to know how to fetch data or how to render a UI; its single responsibility is intelligent transformation. We feed it raw, often messy, data and a highly specific prompt. The prompt engineering is the recipe. A poor recipe yields a poor sauce, regardless of the quality of the ingredients.

For example, if we are processing a stream of financial news articles, the raw data might be a block of text. The LLM's task, guided by a meticulously crafted prompt, is to extract key entities (companies, people, dates), identify sentiment, and structure this information into a clean JSON object. This is where Chunking Strategy becomes crucial. A large document cannot be fed to an LLM in one go due to context window limitations. We must pre-process the data by breaking it down into semantically meaningful chunks. Think of this as a chef preparing mise en place—organizing all ingredients into manageable portions before cooking begins. A naive chunking strategy (e.g., splitting by a fixed number of characters) can sever a sentence or a logical thought, confusing the LLM. A semantic chunking strategy, which might use embeddings to group related text, ensures that each "chunk" fed to the LLM is a coherent unit of information, leading to more accurate and consistent transformations.

The State Manager: tRPC as the Maître D'

Once the LLM has transformed the data, how does this result get back to the client in a structured, type-safe manner? This is the role of the tRPC router. tRPC is our maître d', managing the state and flow of information between the client and the backend services. In this pipeline, tRPC doesn't perform the heavy lifting of data transformation. Instead, it provides a strongly-typed, real-time communication channel.

We will use tRPC's subscription capabilities to stream the LLM's output. The LLM, especially modern models, often generates responses token by token. A naive implementation would wait for the entire response to be generated before sending it back, creating a noticeable delay for the user. The tRPC router, however, can establish a persistent connection (e.g., via WebSockets) and stream these tokens as they are produced by the LLM. This is analogous to a live news feed updating with each new piece of information, rather than a static page that only loads once. The client receives a continuous flow of structured data, which can be used to update the UI in real-time, providing immediate feedback and a perception of speed.

System Architecture Visualization

The following diagram illustrates the flow of data and the separation of concerns within the pipeline.

Under the Hood: Asynchronous State and Latency Management

The true complexity and elegance of this system lie in managing asynchronous state. The client does not simply make a request and wait. It initiates a process. The tRPC router maintains the state of this process (e.g., "processing," "streaming," "complete"). The Edge Function is stateless and ephemeral; it completes its task and disappears. The LLM inference is a potentially long-running, compute-intensive task.

This is where the concept of Warm Start (Local AI) becomes relevant for performance optimization. In a scenario where we might run a local model for development or specialized tasks (e.g., using WebGPU or WASM in the browser), the first inference run incurs a significant latency penalty as the model weights are loaded into memory and the computational graph is prepared. Subsequent "warm" runs are dramatically faster. Our pipeline architecture, by decoupling the orchestration and state management, allows us to optimize each stage independently. We can cache model weights, pre-warm inference servers, and manage connection pools for databases, all without impacting the client's perceived latency. The tRPC stream ensures that even if the LLM inference takes 5 seconds, the user sees the first meaningful results within milliseconds of the request, creating a fluid, responsive experience that is impossible with a traditional request-response model. This detailed, multi-stage, SRP-driven architecture is what transforms a simple AI API call into a robust, production-grade data processing pipeline.

Basic Code Example

In a Next.js App Router context, Server Components (SCs) are the ideal place to initiate data fetching for an AI pipeline. By fetching data directly on the server, we eliminate client-side waterfalls and ensure that the Large Language Model (LLM) receives the necessary context (user data, conversation history, system prompts) immediately before generating a response. This is crucial for maintaining low latency in real-time data processing pipelines.

The following example demonstrates a "Hello World" level pipeline. It simulates fetching a raw data object (e.g., from a database) within a Server Component, passing it to a tRPC router that acts as the orchestrator. The router sends this data to an LLM with a specific prompt to transform the raw data into a structured JSON format. The result is streamed back to the client.

The router streams raw data to an LLM with a specific prompt, which transforms it into structured JSON and streams the result back to the client.
Hold "Ctrl" to enable pan & zoom

The router streams raw data to an LLM with a specific prompt, which transforms it into structured JSON and streams the result back to the client.

Basic Code Example: Raw Data to Structured JSON

This code sets up a minimal Next.js App Router page (app/page.tsx) and a tRPC router (app/api/trpc/[trpc]/route.ts). It uses the openai package for the LLM interaction.

// app/api/trpc/[trpc]/route.ts
import { initTRPC } from '@trpc/server';
import { fetchRequestHandler } from '@trpc/server/adapters/fetch';
import OpenAI from 'openai';
import { z } from 'zod';

// 1. Initialize tRPC
const t = initTRPC.create();

// 2. Define the Input Schema using Zod
// We expect a 'rawData' string containing unstructured text.
const inputSchema = z.object({
  rawData: z.string(),
});

// 3. Create the Router
const appRouter = t.router({
  // Define the 'transformData' procedure
  transformData: t.procedure
    .input(inputSchema)
    .query(async ({ input }) => {
      // Initialize OpenAI client (Ensure OPENAI_API_KEY is in env)
      const openai = new OpenAI();

      // 4. Construct the Prompt
      // We instruct the LLM to act as a data parser and output strict JSON.
      const prompt = `
        You are a precise data extraction engine.
        Transform the following raw data into a JSON object with keys: "summary", "sentiment", and "priority".

        Raw Data:
        "${input.rawData}"

        Output only valid JSON.
      `;

      // 5. Call the LLM with Streaming
      // We use streaming to simulate real-time processing latency handling.
      const stream = await openai.chat.completions.create({
        model: 'gpt-3.5-turbo', // or gpt-4
        messages: [{ role: 'user', content: prompt }],
        stream: true,
        response_format: { type: 'json_object' }, // Hint for structured output
      });

      // 6. Create a ReadableStream to pipe LLM tokens to the client
      // This bypasses tRPC's default JSON serialization for streaming.
      const encoder = new TextEncoder();
      const readableStream = new ReadableStream({
        async start(controller) {
          for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            controller.enqueue(encoder.encode(content));
          }
          controller.close();
        },
      });

      return readableStream;
    }),
});

// 7. Export the Handler for Next.js Route Handlers
export const handler = (req: Request) =>
  fetchRequestHandler({
    endpoint: '/api/trpc',
    req,
    router: appRouter,
    createContext: () => ({}),
  });

// 8. Explicitly export the POST method (Next.js Route Handler requirement)
export { handler as POST, handler as GET };
// app/page.tsx
import { useState } from 'react';

/**

 * Client Component to interact with the tRPC pipeline.
 * In a real app, this would be wrapped with a tRPC Provider.
 * For this "Hello World", we use a raw fetch call to demonstrate the raw stream.
 */
export default function DataPipelinePage() {
  const [result, setResult] = useState('');
  const [loading, setLoading] = useState(false);

  const handleProcess = async () => {
    setLoading(true);
    setResult('');

    // Prepare the input payload matching the Zod schema
    const payload = {
      rawData: "Customer feedback: 'The app crashes every time I click save. Very frustrating!'",
    };

    try {
      // 1. Call the tRPC endpoint
      // Note: tRPC uses POST requests for queries when sending inputs.
      const response = await fetch('/api/trpc/transformData', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          // tRPC v11+ format: { 0: { input: payload } }
          // Adjust based on your specific tRPC version configuration
          0: { input: payload }
        }),
      });

      if (!response.body) return;

      // 2. Process the Stream
      // We read the stream token-by-token (or chunk-by-chunk)
      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // 3. Decode and Append
        const chunk = decoder.decode(value, { stream: true });
        setResult((prev) => prev + chunk);
      }
    } catch (error) {
      console.error("Pipeline failed:", error);
    } finally {
      setLoading(false);
    }
  };

  return (
    <div style={{ padding: '20px', fontFamily: 'sans-serif' }}>
      <h1>AI Data Processing Pipeline</h1>
      <p>Input: "Customer feedback: 'The app crashes...'"</p>

      <button onClick={handleProcess} disabled={loading}>
        {loading ? 'Processing...' : 'Run Pipeline'}
      </button>

      <div style={{ marginTop: '20px', padding: '10px', border: '1px solid #ccc' }}>
        <strong>Structured Output:</strong>
        <pre>{result}</pre>
      </div>
    </div>
  );
}

Line-by-Line Explanation

1. tRPC Initialization and Schema Definition

const t = initTRPC.create();
const inputSchema = z.object({ rawData: z.string() });
  • Why: We use initTRPC to create the tRPC core instance. zod is used for runtime type safety. Even though TypeScript provides compile-time checking, Zod validates the data at runtime when the API receives the request.
  • Under the Hood: The inputSchema ensures that the rawData field is present and is a string. If the client sends malformed JSON, tRPC will reject the request before it reaches the LLM logic, saving API costs.

2. The tRPC Procedure (The Orchestrator)

.transformData: t.procedure
  .input(inputSchema)
  .query(async ({ input }) => { ... })
  • Why: We define a query procedure (though in real-time pipelines, mutations are often used for state changes, queries are fine for read-only transformations here). The input object is automatically typed as { rawData: string } by TypeScript.
  • Under the Hood: This function runs on the server (Edge Runtime or Node.js). It has access to environment variables (like OPENAI_API_KEY) securely.

3. Prompt Engineering for Structured Data

const prompt = `... Transform ... into JSON ... Output only valid JSON.`;
  • Why: LLMs are probabilistic. Without explicit instructions, they might return conversational text. By specifying the exact keys (summary, sentiment, priority) and demanding "valid JSON", we guide the model to produce machine-readable output.
  • Under the Hood: We are injecting the input.rawData into the prompt context. This is the "Data Transformation" step where the LLM acts as a parser.

4. Streaming the LLM Response

const stream = await openai.chat.completions.create({
  stream: true,
  response_format: { type: 'json_object' },
});
  • Why: In a real-world scenario, LLMs can take several seconds to generate a response. Streaming allows us to send tokens to the client as they are generated, providing a better user experience (perceived lower latency).
  • Under the Hood: The OpenAI SDK returns an AsyncIterator. Each chunk contains a small delta of text (often just a few tokens).

5. Converting Stream to HTTP Response

const readableStream = new ReadableStream({ ... });
return readableStream;
  • Why: tRPC typically expects a JSON object to be returned. However, a stream is not a JSON object. By returning a ReadableStream directly, we bypass tRPC's default serialization.
  • Under the Hood: We create a Web Standard API ReadableStream. We iterate over the OpenAI stream chunks, encode them to Uint8Array (bytes), and enqueue them into our custom stream. This stream is then piped directly to the HTTP response body.

6. Client-Side Stream Consumption

const reader = response.body.getReader();
const { done, value } = await reader.read();
  • Why: On the client, we cannot simply await the full response. We must consume the ReadableStream via its Reader interface.
  • Under the Hood: getReader() creates a lock on the stream. reader.read() returns a promise resolving to { done: boolean, value: Uint8Array }. We loop indefinitely until done is true. TextDecoder converts the binary chunks back into human-readable strings.

Common Pitfalls

  1. Vercel/AWS Lambda Timeouts (The 10s Wall):

    • Issue: Serverless functions (like Vercel Edge Functions) often have strict timeouts (e.g., 10 seconds for Hobby plans). If the LLM API is slow or the prompt is complex, the function might time out before the stream finishes.
    • Solution: Use Edge Config or a database to store the state, or ensure your LLM provider is fast enough. For heavy processing, consider using a background job queue (like Inngest or AWS SQS) rather than a direct synchronous API call.
  2. Async/Await Loops in Streams:

    • Issue: A common mistake is trying to await an operation inside a for await loop that blocks the event loop, or failing to handle stream backpressure.
    • Solution: In the ReadableStream constructor, ensure the start method is async but non-blocking. If processing heavy data, use transformStream to handle backpressure automatically.
  3. Hallucinated JSON:

    • Issue: Even with response_format: { type: 'json_object' }, LLMs might output a JSON string wrapped in markdown code blocks (e.g., json ...) or contain syntax errors (trailing commas).
    • Solution: Never trust the raw LLM output. Always validate the final concatenated string using Zod or a JSON parser on the server before sending it to the client, or parse it defensively on the client side.
  4. Token Counting and Context Windows:

    • Issue: Passing large rawData strings into the prompt can exceed the model's context window (e.g., 4096 tokens for older GPT models).
    • Solution: In a real pipeline, pre-process the data. Summarize or chunk the rawData before feeding it to the LLM. Use a tokenizer library (like gpt-tokenizer) to count tokens server-side before making the API call.
  5. tRPC Streaming Complexity:

    • Issue: Standard tRPC procedures return JSON. Returning a stream requires bypassing the standard tRPC serialization layer, as shown in the example.
    • Solution: If you need to stick strictly within tRPC's ecosystem for streaming, you would typically use superjson for serialization of complex objects, but for raw text streams, returning a standard Response object (as done above) is the most robust method in Next.js App Router.

The chapter continues with advanced code, exercises and solutions with analysis, you can find them on the ebook on Leanpub.com or Amazon


Loading knowledge check...



Code License: All code examples are released under the MIT License. Github repo.

Content Copyright: Copyright © 2026 Edgar Milvus | Privacy & Cookie Policy. All rights reserved.

All textual explanations, original diagrams, and illustrations are the intellectual property of the author. To support the maintenance of this site via AdSense, please read this content exclusively online. Copying, redistribution, or reproduction is strictly prohibited.