Chapter 12: Parallel.ForEachAsync - Batch Processing Embeddings efficiently
Theoretical Foundations
The fundamental challenge in building modern AI applications is bridging the gap between the vast, sequential nature of data storage and the immense, parallelizable power of modern hardware and cloud APIs. When processing a corpus of documents—whether it's a knowledge base for a RAG (Retrieval-Augmented Generation) system, a batch of user feedback, or a dataset for fine-tuning—the naive approach is to iterate through each document one by one, generate its embedding, and wait for the network request to complete before starting the next. This is akin to a single clerk meticulously processing one invoice at a time in a massive accounts payable department, waiting for each stamp to dry before starting the next. While simple and predictable, this approach leaves the vast majority of the CPU and network bandwidth idle, resulting in excruciatingly slow throughput that is unscalable for any serious application.
The solution lies in parallelism, specifically data parallelism, where we process multiple data chunks simultaneously. In the context of C#, the modern paradigm for achieving this without the complexities of raw thread management is Parallel.ForEachAsync. This language feature is the cornerstone of building high-throughput, asynchronous AI pipelines. It allows us to express the intent of "process all these items concurrently" while the runtime handles the intricate details of scheduling, throttling, and asynchronous execution. Understanding this feature is not merely an academic exercise; it is the difference between a proof-of-concept that takes days to index a small dataset and a production system that can process millions of documents in hours.
The Bottleneck: I/O-Bound Latency in AI Operations
To truly appreciate Parallel.ForEachAsync, we must first dissect the nature of the work we are doing. Generating an embedding for a piece of text is a classic I/O-bound operation. The process involves:
- Serializing the text into a request payload.
- Transmitting this payload over a network (e.g., via an HTTP POST request) to an API endpoint (like OpenAI's, Cohere's, or a self-hosted model server).
- The server performing the inference (a computationally intensive task, but from our client's perspective, it's a black box that just takes time).
- The server sending back a response payload containing the vector.
- Deserializing the response.
Steps 2, 3, and 4 are dominated by waiting. The local CPU spends very little time actively working during these phases. In a sequential loop, the CPU would be idle for, say, 500ms per document. If you have 10,000 documents, that's over 80 minutes of pure waiting time, where the processor is doing nothing useful. The goal of parallelism here is to use this waiting time productively. While one request is "on the wire" and waiting for a response, the CPU should be free to initiate another request.
This is where the concepts from Book 3, 'Chapter 11: Asynchronous State Machines and the Task Parallel Library', become critical. The async and await keywords are the syntactic sugar that allows us to write non-blocking code. When we await a network request, the current thread is released back to the thread pool, free to handle other work. Parallel.ForEachAsync is the high-level orchestrator that leverages this non-blocking behavior to manage a collection of tasks concurrently.
Analogy: The Post Office and the Fleet of Couriers
Imagine you are a manager at a central post office (your application) responsible for delivering 10,000 letters (text chunks) to a remote sorting facility (the embedding API) and collecting the sorted packages (the resulting vectors).
The Sequential Approach (The Single, Overworked Clerk): You hire one clerk. This clerk picks up the first letter, walks to the sorting machine, waits for it to be processed, walks back, records the result, and only then picks up the second letter. The vast majority of the clerk's time is spent walking and waiting, not processing. This is incredibly inefficient.
The Parallel.ForEachAsync Approach (The Fleet of Couriers):
Instead, you hire a fleet of couriers and a dispatcher. The dispatcher's job is to manage the couriers based on available resources.
- The Couriers (
Tasks): Each courier is an independent unit of work. They can pick up a letter, drive to the sorting facility, and while they are on the road (waiting for I/O), the dispatcher can send another courier out with the next letter. - The Dispatcher (
Parallel.ForEachAsync): The dispatcher doesn't just send all 10,000 couriers at once. That would create a traffic jam and overwhelm the sorting facility. Instead, the dispatcher has rules:- Degree of Parallelism (
MaxDegreeOfParallelism): The dispatcher decides only 10 couriers can be on the road at any given time. As soon as one courier returns with a result, the dispatcher immediately sends a new one out with the next available letter. This keeps the "pipeline" full without overwhelming the system. - CancellationToken: If the post office manager decides to close for the day, they can wave a red flag. The dispatcher sees this and tells all couriers currently on the road to return immediately and stops sending new ones out.
- Degree of Parallelism (
Parallel.ForEachAsync is this intelligent dispatcher, abstracting away the complexity of managing the couriers (tasks), the lane limits (parallelism), and the cancellation signals.
Core Mechanics of Parallel.ForEachAsync
At its heart, Parallel.ForEachAsync is a method that takes an IEnumerable<TSource> and an asynchronous async delegate function. It returns a Task that completes only when all items in the source collection have been processed.
The magic is in its internal implementation. It does not simply fire-and-forget a task for every single item. That would be Task.WhenAll, which can lead to uncontrolled resource consumption. Instead, it works like a sophisticated producer-consumer queue.
- Partitioning: The method internally iterates over the source collection to understand the work items. It doesn't need to pre-load everything into memory at once, especially if the source is an
IAsyncEnumerable. - Scheduling: It uses the .NET runtime's
TaskSchedulerandThreadPoolto manage the execution. When you specify aMaxDegreeOfParallelismof N, it ensures that at most N delegate invocations are running concurrently. - Asynchronous Execution: For each item, it invokes the provided
asynclambda. Crucially, when the code inside the lambda hits anawait(e.g.,await httpClient.PostAsync(...)), theTaskyields control. TheParallel.ForEachAsyncinfrastructure sees that this "slot" is now temporarily free and can immediately start processing another item if a slot is available, even though the originalTaskhasn't completed yet. - Completion: It internally tracks the state of all spawned tasks and only completes its own returned
Taskwhen the entire source collection has been exhausted and all resulting tasks have settled (either successfully or with an exception).
Managing Resource Contention: ParallelOptions
The "Dispatcher" needs rules. You cannot simply unleash a thousand concurrent requests against an API that has a rate limit of 600 requests per minute. This is where ParallelOptions becomes indispensable.
ParallelOptions is a configuration object you pass to Parallel.ForEachAsync. Its most important property for our use case is MaxDegreeOfParallelism.
-
Setting the Concurrency Level: This property directly controls the "fleet size." A low value (e.g., 2-5) is safe and respectful to the target API, minimizing the chance of being rate-limited. A high value (e.g., 20-50) can be used if you are hitting a local model server with high throughput or if you have confirmed with the API provider that you can handle a high concurrent load. The optimal value is a trade-off between speed and stability. It often requires empirical testing to find the sweet spot.
-
CancellationToken: The
CancellationTokenproperty allows you to plug into the application's broader cancellation mechanism. If the user cancels the operation or the application is shutting down, this token can be triggered.Parallel.ForEachAsyncwill respect it, stopping the dispatch of new work and allowing in-flight operations to complete gracefully (or cancel, if the token is passed down to the HTTP client).
Thread Safety and Data Accumulation
A critical aspect of any parallel operation is how results are collected. In a sequential foreach loop, you can safely add results to a simple List<T> because only one thread is ever modifying it at a time. In a parallel world, multiple threads are executing the async delegate simultaneously and will try to add their results to your collection at the same time.
A standard List<T> is not thread-safe. Concurrent additions will lead to race conditions, data corruption, and exceptions. The solution is to use thread-safe collections or to synchronize access.
-
The Wrong Way (Locking): You could use a
lockstatement around thelist.Add(result)call. This forces all threads to wait in line to add their result, creating a bottleneck and defeating some of the performance benefits of parallelism. -
The Right Way (Concurrent Collections): The
System.Collections.Concurrentnamespace provides collections designed for this exact scenario. The most common one for this pattern isConcurrentBag<T>. It's optimized for the scenario where threads are both adding and removing items, and order is not guaranteed. In an embedding pipeline, we just need to gather all the vectors; the order doesn't matter. -
The Modern Way (Local State):
Parallel.ForEachAsynchas an overload that supports a more advanced pattern for minimizing synchronization overhead. It allows you to initialize a "local state" for each thread/task. Each task accumulates its results into its own private list. When a task is finished with its assigned work items, it then merges its local list into the final, thread-safe collection once. This dramatically reduces the number of lock acquisitions.
Architectural Implications and Trade-offs
Adopting Parallel.ForEachAsync is not just a code change; it's an architectural commitment with significant implications.
-
Memory Footprint: While one
HttpClientrequest consumes minimal memory, having 50 requests in-flight simultaneously means the application is holding 50 payloads, 50 response buffers, and 50 sets of deserialized objects in memory at once. This is a significant increase over the sequential approach. If the text chunks are very large, this can lead toOutOfMemoryException. TheMaxDegreeOfParallelismsetting is therefore not just about API limits, but also about managing local memory pressure. -
Error Handling: In a sequential loop, if an exception occurs, the loop stops immediately. In a parallel loop, exceptions from different tasks are aggregated.
Parallel.ForEachAsyncwill not throw an exception on the first failure. It will attempt to process all items. Once all tasks have settled (either successfully or with an exception), it will throw anAggregateExceptioncontaining all the exceptions that occurred. This requires a different mindset for error handling. You must be prepared to inspect a collection of failures, decide which are transient (and might be retried) and which are permanent (e.g., malformed input), and handle them accordingly. -
Idempotency and Retries: Because of the potential for transient network errors, the async delegate you provide should ideally be idempotent. If a request fails halfway through, the
Parallel.ForEachAsyncwill not automatically retry. You would need to build retry logic (e.g., using a library like Polly) inside your async delegate. This means that a single "item" in the loop might represent several actual HTTP requests. This complexity must be factored into the design.
Visualizing the Data Flow
The flow of data through a Parallel.ForEachAsync pipeline can be visualized. The key is the throttling mechanism that maintains a constant level of concurrency, independent of the source collection's size.
This diagram illustrates the core concept: the Parallel.ForEachAsync dispatcher pulls from the source and enqueues work into an internal queue that respects the MaxDegreeOfParallelism. It only dispatches work to a worker slot when one is free. The workers, upon completion, add their results to the final, thread-safe collection and signal back that their slot is free for the next piece of work. This continuous cycle ensures maximum throughput without overwhelming system resources.
In conclusion, Parallel.ForEachAsync is the essential tool for transforming AI data processing pipelines from sequential, slow operations into high-performance, concurrent systems. It provides a safe, declarative, and highly efficient model for managing asynchronous work at scale, abstracting away the complexities of task scheduling and resource management while leaving the developer in full control of the critical parameters that govern throughput and stability.
Basic Code Example
Here is a self-contained, "Hello World" level example demonstrating Parallel.ForEachAsync for batch processing text embeddings.
Real-World Context
Imagine you are building a semantic search engine for a large collection of documents. To make the text searchable, you need to convert every document into a vector (an embedding) using an AI model. Processing these one by one is too slow. This example simulates converting a list of document chunks into embeddings concurrently, respecting a simulated API rate limit.
Code Example
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class EmbeddingBatchProcessor
{
// 1. Define a simple record to hold our data and results
public record TextChunk(string Id, string Content);
public static async Task Main(string[] args)
{
Console.WriteLine("Starting Batch Embedding Generation...");
// 2. Generate dummy data (simulating 100 document chunks)
var chunks = Enumerable.Range(1, 100)
.Select(i => new TextChunk($"ID-{i:D3}", $"Content for document {i}"))
.ToList();
// 3. Thread-safe collection to store results (ConcurrentBag is ideal for unordered inserts)
var embeddings = new ConcurrentBag<(string Id, float[] Vector)>();
// 4. Configure Parallel Options
// MaxDegreeOfParallelism limits concurrent tasks to avoid overwhelming the API or local resources.
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = 5
};
// 5. Define the cancellation token source
var cts = new CancellationTokenSource();
try
{
// 6. Execute the parallel loop
// 'async' keyword is required here to allow await inside the loop body
await Parallel.ForEachAsync(chunks, parallelOptions, async (chunk, cancellationToken) =>
{
// 7. Simulate generating an embedding (e.g., calling an external API)
// We pass the cancellation token to support cooperative cancellation.
var vector = await GenerateEmbeddingAsync(chunk, cancellationToken);
// 8. Add the result to the thread-safe collection
embeddings.Add((chunk.Id, vector));
// 9. Report progress (Thread-safe writing to Console)
Console.WriteLine($"Processed {chunk.Id} on Thread {Thread.CurrentThread.ManagedThreadId}");
});
}
catch (OperationCanceledException)
{
Console.WriteLine("Batch processing was cancelled.");
}
// 10. Output results
Console.WriteLine($"\nCompleted. Generated {embeddings.Count} embeddings.");
foreach (var emb in embeddings.Take(3)) // Show first 3
{
Console.WriteLine($"ID: {emb.Id}, Vector: [{string.Join(", ", emb.Vector.Take(3))}...]");
}
}
/// <summary>
/// Simulates an asynchronous call to an AI Embedding API.
/// </summary>
private static async Task<float[]> GenerateEmbeddingAsync(TextChunk chunk, CancellationToken ct)
{
// Simulate network latency (random between 100ms and 300ms)
var delay = Random.Shared.Next(100, 300);
// Simulate an API rate limit check or processing time
await Task.Delay(delay, ct);
// Simulate a random vector of size 384 (common for small embedding models)
var vector = new float[384];
Random.Shared.NextBytes(vector); // Filling with random bytes (simplified for demo)
// Convert bytes to floats in range [0, 1]
for (int i = 0; i < vector.Length; i++)
{
vector[i] = vector[i] / 255.0f;
}
return vector;
}
}
Explanation
- Data Preparation: We create a list of
TextChunkrecords. In a real scenario, this data would likely come from a database or file system. We generate 100 items to demonstrate the batch processing capability. - Thread-Safe Storage: We instantiate a
ConcurrentBag<T>. SinceParallel.ForEachAsyncexecutes on multiple threads simultaneously, standard collections likeList<T>are unsafe.ConcurrentBagallows multiple threads to add items without locking the entire collection, which is efficient for unordered additions. - ParallelOptions Configuration: We set
MaxDegreeOfParallelismto 5. This is crucial. Without it, the loop might attempt to process all 100 items simultaneously, which could crash the application (Out of Memory) or get blocked by external API rate limits (e.g., 429 Too Many Requests). We are explicitly limiting concurrency to 5 simultaneous tasks. - The
Parallel.ForEachAsyncLoop:- The first argument is the source data (
chunks). - The second argument is the options (
parallelOptions). - The third argument is the
asynclambda function. Theasynckeyword is mandatory here because we need toawaitthe embedding generation.
- The first argument is the source data (
- Cooperative Cancellation: The lambda accepts a
CancellationToken(the second parameter of the lambda). We pass this token toGenerateEmbeddingAsync(specifically toTask.Delay). If the operation is cancelled externally (viacts.Cancel()), the delay throws an exception immediately, stopping the task efficiently. - Simulating Work: The
GenerateEmbeddingAsyncmethod simulates a network call. It usesTask.Delayto mimic latency. In a real application, this is where you would callHttpClient.PostAsyncor a specific AI SDK method. - Result Collection: Once the await completes, we add the result to our
ConcurrentBag. This happens on whichever thread the scheduler assigned the task to. - Execution Flow: The
Mainmethod awaits the entire parallel loop. The loop does not complete until all source items have been processed (or an exception is thrown).
Common Pitfalls
- Forgetting the
asyncKeyword: If you omit theasynckeyword in the lambda(chunk, ct) => { ... }, you cannot useawait. You would be forced to use blocking calls like.Resultor.Wait(), which defeats the purpose of asynchronous execution and can cause deadlocks. - Ignoring the
CancellationToken: Failing to pass theCancellationTokento downstream asynchronous operations (likeTask.Delayor HTTP requests) prevents the loop from stopping quickly when cancellation is requested. The loop will wait for the current tasks to finish naturally. - Using Non-Thread-Safe Collections: Attempting to use
List<T>orDictionary<TKey, TValue>inside the loop body without explicit locking (e.g.,lock(obj)) will lead to race conditions and corrupted data. Always useConcurrentBag<T>,ConcurrentDictionary<T>, orImmutableList<T>. - Setting
MaxDegreeOfParallelismToo High: Setting this toEnvironment.ProcessorCount * 2is a common starting point for CPU-bound work. However, for I/O-bound work (like API calls), you might need a much higher number. Conversely, setting it too high (e.g., 1000) against a rate-limited API will result in throttling errors.
Visualizing the Pipeline
The following diagram illustrates how Parallel.ForEachAsync manages the flow of data and tasks.
The chapter continues with advanced code, exercises and solutions with analysis, you can find them on the ebook on Leanpub.com or Amazon
Loading knowledge check...
Code License: All code examples are released under the MIT License. Github repo.
Content Copyright: Copyright © 2026 Edgar Milvus | Privacy & Cookie Policy. All rights reserved.
All textual explanations, original diagrams, and illustrations are the intellectual property of the author. To support the maintenance of this site via AdSense, please read this content exclusively online. Copying, redistribution, or reproduction is strictly prohibited.