Build a Self-Healing Extraction Pipeline with Fallbacks
Production data extraction pipelines face a harsh reality: models sometimes produce invalid JSON, miss required fields, return hallucinated data, or fail silently. A pipeline that breaks on the first malformed response is useless at scale. LM-Kit.NET combines grammar-constrained decoding, JSON schema validation, FallbackAgentExecutor for automatic recovery, and PipelineOrchestrator for multi-stage processing to build extraction pipelines that detect failures and heal themselves. This guide builds a complete self-healing pipeline that extracts structured data from messy real-world text, validates every output, retries with corrective feedback on failure, and falls back to alternative strategies when the primary approach fails.
Why Self-Healing Pipelines Matter
Two production problems that self-healing pipelines solve:
- Silent data corruption in batch processing. When processing thousands of documents overnight, a single malformed extraction can propagate through your entire data warehouse. Self-healing pipelines validate every output against the expected schema and automatically retry failed extractions before any corrupt data reaches downstream systems.
- Resilience to model variability. The same model can produce different quality outputs depending on input length, language, domain complexity, and even token sampling randomness. A self-healing pipeline adapts to these variations by detecting quality drops in real time and switching strategies: retrying with corrective prompts, falling back to a different model, or escalating to a more capable extraction method.
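Stripped of any particular SDK, the healing loop that the rest of this guide implements looks like this: try a strategy, validate, retry with the validation errors as corrective feedback, then fall back to the next strategy. This is a conceptual sketch only; the delegate shapes are illustrative, not LM-Kit types:

```csharp
// Conceptual sketch of the self-healing control flow (not LM-Kit API).
static string ExtractWithHealing(
    IReadOnlyList<Func<string, string?, string>> strategies, // (input, feedback) -> json
    Func<string, List<string>> validate,                     // returns a list of errors
    string input,
    int maxAttemptsPerStrategy = 2)
{
    foreach (var strategy in strategies)
    {
        string? feedback = null;
        for (int attempt = 0; attempt < maxAttemptsPerStrategy; attempt++)
        {
            string json = strategy(input, feedback);
            var errors = validate(json);
            if (errors.Count == 0)
                return json;                      // healed: valid output
            feedback = string.Join("; ", errors); // corrective context for the retry
        }
        // strategy exhausted: fall through to the next (fallback) strategy
    }
    throw new InvalidOperationException("All extraction strategies failed.");
}
```

Steps 4 and 5 below fill in the two halves of this loop: the retry-with-feedback inner loop and the fallback chain across agents.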
Prerequisites
| Requirement | Minimum |
|---|---|
| .NET SDK | 8.0+ |
| VRAM | 4+ GB |
| Disk | ~3 GB free for model download |
Step 1: Create the Project
dotnet new console -n SelfHealingPipeline
cd SelfHealingPipeline
dotnet add package LM-Kit.NET
Step 2: Define the Extraction Schema and Validation
Start by defining the target data structure and a validation layer that catches the most common extraction failures: invalid JSON, missing required fields, and malformed values. Note that with C# top-level statements, type declarations such as ContactInfo and ExtractionValidator must appear after all executable statements (or live in separate files):
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using LMKit.Model;
using LMKit.TextGeneration;
using LMKit.TextGeneration.Sampling;
using LMKit.Agents;
using LMKit.Agents.Orchestration;
using LMKit.Agents.Resilience;
LMKit.Licensing.LicenseManager.SetLicenseKey("");
Console.InputEncoding = Encoding.UTF8;
Console.OutputEncoding = Encoding.UTF8;
// ──────────────────────────────────────
// 1. Define the extraction target schema
// ──────────────────────────────────────
public record ContactInfo(
[property: JsonPropertyName("full_name")] string FullName,
[property: JsonPropertyName("email")] string Email,
[property: JsonPropertyName("phone")] string Phone,
[property: JsonPropertyName("company")] string Company,
[property: JsonPropertyName("job_title")] string JobTitle,
[property: JsonPropertyName("department")] string Department
);
// ──────────────────────────────────────
// 2. Validation logic
// ──────────────────────────────────────
public sealed class ExtractionValidator
{
    public (bool IsValid, List<string> Errors) Validate(string json)
    {
        var errors = new List<string>();
        // Step 1: Is it valid JSON?
        JsonDocument doc;
        try
        {
            doc = JsonDocument.Parse(json);
        }
        catch (JsonException ex)
        {
            errors.Add($"Invalid JSON: {ex.Message}");
            return (false, errors);
        }
        using (doc) // JsonDocument is IDisposable; keep it alive while we read its elements
        {
            JsonElement root = doc.RootElement;
            // Step 2: Are required fields present, strings, and non-empty?
            string[] requiredFields = { "full_name", "email", "company" };
            foreach (string field in requiredFields)
            {
                if (!root.TryGetProperty(field, out JsonElement value) ||
                    value.ValueKind != JsonValueKind.String ||
                    string.IsNullOrWhiteSpace(value.GetString()))
                {
                    errors.Add($"Missing or empty required field: '{field}'");
                }
            }
            // Step 3: Format validation (check ValueKind first: GetString() throws on non-string values)
            if (root.TryGetProperty("email", out JsonElement emailEl) &&
                emailEl.ValueKind == JsonValueKind.String)
            {
                string? email = emailEl.GetString();
                if (!string.IsNullOrEmpty(email) && !email.Contains('@'))
                    errors.Add($"Invalid email format: '{email}'");
            }
            if (root.TryGetProperty("phone", out JsonElement phoneEl) &&
                phoneEl.ValueKind == JsonValueKind.String)
            {
                string? phone = phoneEl.GetString();
                if (!string.IsNullOrEmpty(phone) && phone.Length < 7)
                    errors.Add($"Phone number too short: '{phone}'");
            }
            return (errors.Count == 0, errors);
        }
    }
}
Step 3: Build the Grammar-Constrained Extractor
Use a JSON schema grammar so that every attempt produces structurally valid JSON; any remaining validation failures then come from content problems (missing or malformed values), not syntax:
// ──────────────────────────────────────
// 3. Load model
// ──────────────────────────────────────
Console.WriteLine("Loading model...");
using LM model = LM.LoadFromModelID("qwen3:4b",
loadingProgress: p => { Console.Write($"\rLoading: {p * 100:F0}% "); return true; });
Console.WriteLine("\n");
// ──────────────────────────────────────
// 4. Define the JSON schema grammar
// ──────────────────────────────────────
string contactSchema = """
{
"type": "object",
"properties": {
"full_name": { "type": "string" },
"email": { "type": "string" },
"phone": { "type": "string" },
"company": { "type": "string" },
"job_title": { "type": "string" },
"department": { "type": "string" }
},
"required": ["full_name", "email", "company", "job_title"]
}
""";
var schemaGrammar = Grammar.CreateJsonGrammarFromJsonSchema(contactSchema);
// ──────────────────────────────────────
// 5. First extraction attempt
// ──────────────────────────────────────
var extractor = new SingleTurnConversation(model)
{
MaximumCompletionTokens = 512,
Grammar = schemaGrammar,
SamplingMode = new RandomSampling { Temperature = 0.1f, Seed = 42 }
};
string messyInput = """
Hi, I'm reaching out regarding our upcoming partnership. My name is
Dr. Sarah Chen-Williams and I lead the AI Research division at NeuroTech
Solutions. You can reach me at s.chen-williams@neurotech-solutions.com
or call my direct line +1 (415) 555-0187. Looking forward to connecting!
""";
string firstAttempt = extractor.Submit(
$"Extract contact information from this text into the exact JSON schema. " +
$"Use empty string for missing fields.\n\nText: {messyInput}");
Console.WriteLine("── First Extraction Attempt ──");
Console.WriteLine(firstAttempt);
var validator = new ExtractionValidator();
var (isValid, errors) = validator.Validate(firstAttempt);
Console.WriteLine($"Valid: {isValid}");
if (!isValid)
foreach (string error in errors)
Console.WriteLine($" Error: {error}");
Step 4: Implement Retry with Corrective Feedback
When validation fails, feed the errors back to the model as corrective context. The model sees what went wrong and fixes it:
// ──────────────────────────────────────
// 6. Self-healing retry loop
// ──────────────────────────────────────
public sealed class SelfHealingExtractor
{
private readonly LM _model;
private readonly Grammar _grammar;
private readonly ExtractionValidator _validator;
private readonly int _maxRetries;
public SelfHealingExtractor(LM model, Grammar grammar, ExtractionValidator validator, int maxRetries = 3)
{
_model = model;
_grammar = grammar;
_validator = validator;
_maxRetries = maxRetries;
}
public (string Json, int Attempts, bool Success) Extract(string input)
{
string basePrompt = $"Extract contact information from this text into the exact JSON schema. " +
$"Use empty string for missing fields.\n\nText: {input}";
// First attempt
var conversation = new SingleTurnConversation(_model)
{
MaximumCompletionTokens = 512,
Grammar = _grammar,
SamplingMode = new RandomSampling { Temperature = 0.1f }
};
string result = conversation.Submit(basePrompt);
var (isValid, errors) = _validator.Validate(result);
if (isValid)
return (result, 1, true);
// Retry with corrective feedback
for (int attempt = 2; attempt <= _maxRetries; attempt++)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($" [Retry {attempt}/{_maxRetries}] Fixing: {string.Join("; ", errors)}");
Console.ResetColor();
string correctionPrompt =
$"{basePrompt}\n\n" +
$"IMPORTANT: Your previous extraction had these errors:\n" +
string.Join("\n", errors.Select(e => $"- {e}")) +
$"\n\nPrevious (incorrect) output:\n{result}\n\n" +
$"Fix these specific errors and extract again.";
// Increase temperature slightly on retries for diversity
var retryConversation = new SingleTurnConversation(_model)
{
MaximumCompletionTokens = 512,
Grammar = _grammar,
SamplingMode = new RandomSampling { Temperature = 0.1f + (attempt * 0.1f) }
};
result = retryConversation.Submit(correctionPrompt);
(isValid, errors) = _validator.Validate(result);
if (isValid)
return (result, attempt, true);
}
return (result, _maxRetries, false);
}
}
// Test the self-healing extractor
var healer = new SelfHealingExtractor(model, schemaGrammar, validator, maxRetries: 3);
string[] testInputs = {
"Contact: Jane Doe, jane@example.com, Acme Corp, Product Manager",
"Bob from sales called. His number is 555-1234. He works at GlobalTech as VP of Sales.",
"Meeting notes: discussed partnership with Maria Garcia (maria.g@innovate.io), CTO at Innovate Labs, AI department",
"ref #4421 - customer inquiry from unknown sender about pricing"
};
Console.WriteLine("\n── Self-Healing Batch Extraction ──\n");
foreach (string input in testInputs)
{
Console.WriteLine($"Input: {input[..Math.Min(70, input.Length)]}...");
var (json, attempts, success) = healer.Extract(input);
Console.ForegroundColor = success ? ConsoleColor.Green : ConsoleColor.Red;
Console.WriteLine($" Status: {(success ? "SUCCESS" : "FAILED")} ({attempts} attempt(s))");
Console.ResetColor();
Console.WriteLine($" Result: {json}\n");
}
Step 5: Add Agent-Level Fallback with FallbackAgentExecutor
For cases where the primary extraction model fails entirely, fall back to an alternative agent with a different strategy:
// ──────────────────────────────────────
// 7. Primary agent: structured extraction with grammar
// ──────────────────────────────────────
Agent primaryExtractor = Agent.CreateBuilder(model)
.WithInstruction(
"You are a precise data extraction agent. Extract contact information " +
"from the input text into the required JSON format. Use empty strings " +
"for missing fields. Never invent or hallucinate data.")
.WithGrammar(schemaGrammar)
.WithMaxIterations(1)
.Build();
// ──────────────────────────────────────
// 8. Fallback agent: more flexible extraction
// ──────────────────────────────────────
Agent fallbackExtractor = Agent.CreateBuilder(model)
.WithInstruction(
"You are a flexible data extraction agent. Extract whatever contact " +
"information you can find from the input text. Return a JSON object " +
"with fields: full_name, email, phone, company, job_title, department. " +
"Use empty strings for fields you cannot determine. Be thorough " +
"and check for implied or indirect mentions.")
.WithGrammar(schemaGrammar)
.WithMaxIterations(2)
.Build();
// ──────────────────────────────────────
// 9. Build fallback chain
// ──────────────────────────────────────
var fallbackPipeline = new FallbackAgentExecutor()
.AddAgent(primaryExtractor)
.AddAgent(fallbackExtractor)
.OnFallback((agent, ex, attempt) =>
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($" [FALLBACK] Agent failed on attempt {attempt}: {ex.Message}");
Console.WriteLine($" [FALLBACK] Switching to fallback agent...");
Console.ResetColor();
})
.HandleException(ex => ex is InvalidOperationException || ex is TimeoutException);
Console.WriteLine("── Fallback Agent Extraction ──\n");
var fallbackResult = await fallbackPipeline.ExecuteAsync(
"Extract contact info: Meeting with Dr. Alex Rivera, alex@biomedresearch.org, BioMed Research Institute");
Console.WriteLine($"Result: {fallbackResult.Content}");
Console.WriteLine($"Agent used: {fallbackResult.AgentName}");
Step 6: Build a Multi-Stage Extraction Pipeline
Chain multiple extraction and validation stages using PipelineOrchestrator. Each stage refines the previous stage's output:
// ──────────────────────────────────────
// 10. Stage 1: Raw extraction
// ──────────────────────────────────────
Agent rawExtractor = Agent.CreateBuilder(model)
.WithInstruction(
"Extract all contact-related information from the input text. " +
"Return a JSON object with: full_name, email, phone, company, job_title, department. " +
"Include partial information. Use empty strings for truly missing fields.")
.WithGrammar(schemaGrammar)
.WithMaxIterations(1)
.Build();
// ──────────────────────────────────────
// 11. Stage 2: Validation and enrichment
// ──────────────────────────────────────
Agent enrichmentAgent = Agent.CreateBuilder(model)
.WithInstruction(
"You receive a JSON object with extracted contact information. " +
"Review it for completeness and correctness. " +
"Fix obvious errors (e.g., missing @ in emails, incomplete phone numbers). " +
"If job_title is empty but context implies one, infer it. " +
"Return the corrected JSON object in the same format.")
.WithGrammar(schemaGrammar)
.WithMaxIterations(1)
.Build();
// ──────────────────────────────────────
// 12. Stage 3: Quality scoring
// ──────────────────────────────────────
string qualitySchema = """
{
"type": "object",
"properties": {
"data": {
"type": "object",
"properties": {
"full_name": { "type": "string" },
"email": { "type": "string" },
"phone": { "type": "string" },
"company": { "type": "string" },
"job_title": { "type": "string" },
"department": { "type": "string" }
}
},
"quality_score": { "type": "number" },
"completeness": { "type": "number" },
"fields_found": { "type": "integer" },
"confidence": { "type": "string" }
},
"required": ["data", "quality_score", "completeness", "fields_found", "confidence"]
}
""";
Agent qualityScorer = Agent.CreateBuilder(model)
.WithInstruction(
"You receive extracted contact data as JSON. Score it: " +
"quality_score (0.0 to 1.0): overall extraction quality. " +
"completeness (0.0 to 1.0): fraction of fields that are populated. " +
"fields_found: count of non-empty fields. " +
"confidence: 'high', 'medium', or 'low'. " +
"Wrap the original data in a 'data' field and add the scoring fields.")
.WithGrammar(Grammar.CreateJsonGrammarFromJsonSchema(qualitySchema))
.WithMaxIterations(1)
.Build();
// ──────────────────────────────────────
// 13. Assemble the pipeline
// ──────────────────────────────────────
var pipeline = new PipelineOrchestrator()
.AddStage("extract", rawExtractor)
.AddStage("enrich", enrichmentAgent)
.AddStage("score", qualityScorer);
Console.WriteLine("\n── Multi-Stage Extraction Pipeline ──\n");
string[] documents = {
"From: Dr. Elena Vasquez <elena.v@quantum-labs.io>\nSubject: Research Collaboration\n" +
"Hi, I'm the Director of Quantum Computing at Quantum Labs. Let's schedule a call this week. " +
"My direct line is +44 20 7946 0958.",
"Notes from the trade show: Spoke with someone from TechCorp about their new API. " +
"They mentioned they'd send details via email later.",
"LinkedIn message from James Park, Senior ML Engineer at DataFlow Systems. " +
"Profile shows he's in the Machine Learning department. Email: j.park@dataflow.io"
};
foreach (string doc in documents)
{
Console.WriteLine($"Input: {doc[..Math.Min(80, doc.Length)]}...\n");
var pipelineResult = await pipeline.ExecuteAsync(doc);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Pipeline output:");
Console.ResetColor();
// Pretty-print the JSON
try
{
var jsonElement = JsonDocument.Parse(pipelineResult.Content).RootElement;
string pretty = JsonSerializer.Serialize(jsonElement,
new JsonSerializerOptions { WriteIndented = true });
Console.WriteLine(pretty);
}
catch
{
Console.WriteLine(pipelineResult.Content);
}
Console.WriteLine(new string('─', 60) + "\n");
}
Step 7: Batch Processing with Error Isolation
Process large batches without letting a single failure break the entire run:
// ──────────────────────────────────────
// 14. Resilient batch processing
// ──────────────────────────────────────
public sealed class BatchExtractionResult
{
public string Input { get; init; } = "";
public string? Output { get; init; }
public bool Success { get; init; }
public int Attempts { get; init; }
public string? Error { get; init; }
public TimeSpan Duration { get; init; }
}
var batchResults = new List<BatchExtractionResult>();
var stopwatch = new System.Diagnostics.Stopwatch();
Console.WriteLine("── Resilient Batch Processing ──\n");
foreach (string doc in documents)
{
stopwatch.Restart();
try
{
var result = await fallbackPipeline.ExecuteAsync(doc);
var (isValid, validationErrors) = validator.Validate(result.Content);
batchResults.Add(new BatchExtractionResult
{
Input = doc[..Math.Min(50, doc.Length)],
Output = result.Content,
Success = isValid,
Attempts = 1,
Duration = stopwatch.Elapsed
});
}
catch (Exception ex)
{
batchResults.Add(new BatchExtractionResult
{
Input = doc[..Math.Min(50, doc.Length)],
Success = false,
Error = ex.Message,
Attempts = 1,
Duration = stopwatch.Elapsed
});
}
}
// Summary report
int successCount = batchResults.Count(r => r.Success);
int failCount = batchResults.Count(r => !r.Success);
double avgDuration = batchResults.Average(r => r.Duration.TotalMilliseconds);
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine("── Batch Processing Report ──");
Console.WriteLine($"Total:    {batchResults.Count}");
Console.WriteLine($"Success:  {successCount} ({(double)successCount / batchResults.Count * 100:F1}%)");
Console.WriteLine($"Failed:   {failCount} ({(double)failCount / batchResults.Count * 100:F1}%)");
Console.WriteLine($"Avg time: {avgDuration:F0} ms");
Console.ResetColor();
foreach (var result in batchResults)
{
Console.ForegroundColor = result.Success ? ConsoleColor.Green : ConsoleColor.Red;
Console.Write(result.Success ? " ✅ " : " ❌ ");
Console.ResetColor();
Console.WriteLine($"{result.Input}... [{result.Duration.TotalMilliseconds:F0}ms]");
}
Common Issues
| Problem | Cause | Fix |
|---|---|---|
| Retry loop always fails | Corrective prompt is too vague | Include the specific validation errors in the retry prompt so the model knows exactly what to fix |
| Pipeline stages produce incompatible output | Stage N's output format doesn't match Stage N+1's expected input | Use grammar constraints on each stage; add InputTransformer to reformat between stages |
| Fallback agent never triggers | Exception type not matched | Use HandleException to specify which exception types trigger fallback |
| Batch processing is too slow | Sequential processing of large batches | Process documents in parallel using Task.WhenAll with a semaphore for VRAM control |
| Quality scores are always 1.0 | Scoring agent is too lenient | Add explicit scoring criteria in the agent instruction with examples of low-quality extractions |
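The parallel-processing fix from the table can be sketched with Task.WhenAll and a SemaphoreSlim that caps how many documents hit the model at once. This is a sketch under assumptions: maxConcurrency and the extractAsync delegate are illustrative names (the delegate could wrap fallbackPipeline.ExecuteAsync), the cap should be sized to your VRAM, and a single local model instance may still serialize inference internally:

```csharp
// Hypothetical helper: run extractions concurrently, capped at maxConcurrency,
// with per-document error isolation so one failure never breaks the batch.
static async Task<List<(string Input, string? Output, string? Error)>> ExtractBatchAsync(
    IEnumerable<string> docs,
    Func<string, Task<string>> extractAsync, // illustrative: e.g. wraps fallbackPipeline.ExecuteAsync
    int maxConcurrency = 2)
{
    using var gate = new SemaphoreSlim(maxConcurrency);
    var tasks = docs.Select(async doc =>
    {
        await gate.WaitAsync(); // block until a slot is free
        try
        {
            return (doc, (string?)await extractAsync(doc), (string?)null);
        }
        catch (Exception ex)
        {
            return (doc, null, ex.Message); // failure stays isolated to this document
        }
        finally
        {
            gate.Release();
        }
    });
    return (await Task.WhenAll(tasks)).ToList();
}
```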
Next Steps
- Extract Structured Data from Unstructured Text: use the built-in extraction API for simpler use cases.
- Build a Classification and Extraction Pipeline: classify documents first, then route to type-specific extractors.
- Enforce Structured Output with Grammar-Constrained Decoding: deep dive into grammar constraints for bulletproof output.
- Build a Resilient Production Agent: add circuit breakers and health checks to your extraction pipeline.
- Route Prompts Across Models with RouterOrchestrator: route different document types to specialized extraction models.