Build a Self-Healing Extraction Pipeline with Fallbacks

Production data extraction pipelines face a harsh reality: models sometimes produce invalid JSON, miss required fields, return hallucinated data, or fail silently. A pipeline that breaks on the first malformed response is useless at scale. LM-Kit.NET combines grammar-constrained decoding, JSON schema validation, FallbackAgentExecutor for automatic recovery, and PipelineOrchestrator for multi-stage processing to build extraction pipelines that detect failures and heal themselves. This guide builds a complete self-healing pipeline that extracts structured data from messy real-world text, validates every output, retries with corrective feedback on failure, and falls back to alternative strategies when the primary approach fails.


Why Self-Healing Pipelines Matter

Two production problems that self-healing pipelines solve:

  1. Silent data corruption in batch processing. When processing thousands of documents overnight, a single malformed extraction can propagate through your entire data warehouse. Self-healing pipelines validate every output against the expected schema and automatically retry failed extractions before any corrupt data reaches downstream systems.
  2. Resilience to model variability. The same model can produce different quality outputs depending on input length, language, domain complexity, and even token sampling randomness. A self-healing pipeline adapts to these variations by detecting quality drops in real time and switching strategies: retrying with corrective prompts, falling back to a different model, or escalating to a more capable extraction method.

Prerequisites

Requirement | Minimum
.NET SDK | 8.0+
VRAM | 4+ GB
Disk | ~3 GB free for model download

Step 1: Create the Project

dotnet new console -n SelfHealingPipeline
cd SelfHealingPipeline
dotnet add package LM-Kit.NET

Step 2: Define the Extraction Schema and Validation

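Start by defining the target data structure and a validation layer that catches the most common extraction failures: malformed JSON, missing or empty required fields, and badly formatted values. The snippets in this guide interleave type declarations with top-level statements for readability. In a single Program.cs, C# requires all top-level statements to come before any type declarations, so move the records and classes to the end of the file or into separate files: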

using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using LMKit.Model;
using LMKit.TextGeneration;
using LMKit.TextGeneration.Sampling;
using LMKit.Agents;
using LMKit.Agents.Orchestration;
using LMKit.Agents.Resilience;

LMKit.Licensing.LicenseManager.SetLicenseKey("");

Console.InputEncoding = Encoding.UTF8;
Console.OutputEncoding = Encoding.UTF8;

// ──────────────────────────────────────
// 1. Define the extraction target schema
// ──────────────────────────────────────
public record ContactInfo(
    [property: JsonPropertyName("full_name")] string FullName,
    [property: JsonPropertyName("email")] string Email,
    [property: JsonPropertyName("phone")] string Phone,
    [property: JsonPropertyName("company")] string Company,
    [property: JsonPropertyName("job_title")] string JobTitle,
    [property: JsonPropertyName("department")] string Department
);

// ──────────────────────────────────────
// 2. Validation logic
// ──────────────────────────────────────
public sealed class ExtractionValidator
{
    public (bool IsValid, List<string> Errors) Validate(string json)
    {
        var errors = new List<string>();

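        // Step 1: Is it valid JSON?
        JsonElement root;
        try
        {
            // Clone the root so it stays usable after the document is disposed
            using JsonDocument document = JsonDocument.Parse(json);
            root = document.RootElement.Clone();
        }
        catch (JsonException ex)
        {
            errors.Add($"Invalid JSON: {ex.Message}");
            return (false, errors);
        }

        // TryGetProperty throws on non-object values, so reject them up front
        if (root.ValueKind != JsonValueKind.Object)
        {
            errors.Add($"Expected a JSON object but got {root.ValueKind}");
            return (false, errors);
        }

        // Step 2: Are required fields present as non-empty strings?
        string[] requiredFields = { "full_name", "email", "company" };
        foreach (string field in requiredFields)
        {
            // GetString throws on numbers, arrays, and objects, so check the kind first
            if (!root.TryGetProperty(field, out JsonElement value) ||
                value.ValueKind != JsonValueKind.String ||
                string.IsNullOrWhiteSpace(value.GetString()))
            {
                errors.Add($"Missing or empty required field: '{field}'");
            }
        }

        // Step 3: Format validation (string values only; empty values are reported in Step 2)
        if (root.TryGetProperty("email", out JsonElement emailEl) &&
            emailEl.ValueKind == JsonValueKind.String)
        {
            string email = emailEl.GetString()!;
            if (email.Length > 0 && !email.Contains('@'))
                errors.Add($"Invalid email format: '{email}'");
        }

        if (root.TryGetProperty("phone", out JsonElement phoneEl) &&
            phoneEl.ValueKind == JsonValueKind.String)
        {
            string phone = phoneEl.GetString()!;
            if (phone.Length > 0 && phone.Length < 7)
                errors.Add($"Phone number too short: '{phone}'");
        }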

        return (errors.Count == 0, errors);
    }
}
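
The `Contains('@')` check in the validator is deliberately loose: it accepts strings such as `@` or `a@`. As a minimal sketch of a stricter check, assuming the standard `System.Net.Mail.MailAddress.TryCreate` API (.NET 5 and later) suits your data, a hypothetical `LooksLikeEmail` helper could look like the following. The helper name is an illustration, it is not part of LM-Kit.NET, and it does not attempt full RFC validation.

```csharp
using System;
using System.Net.Mail;

// Hypothetical helper: stricter than Contains('@'), but not a full RFC 5322 validator.
static bool LooksLikeEmail(string? candidate)
{
    if (string.IsNullOrWhiteSpace(candidate))
        return false;

    // TryCreate also accepts display-name forms such as "Jane <jane@example.com>",
    // so require the parsed address to match the whole input.
    return MailAddress.TryCreate(candidate, out MailAddress? parsed)
        && string.Equals(parsed.Address, candidate, StringComparison.OrdinalIgnoreCase);
}

Console.WriteLine(LooksLikeEmail("jane@example.com"));
Console.WriteLine(LooksLikeEmail("jane.example.com"));
Console.WriteLine(LooksLikeEmail("Jane <jane@example.com>"));
```

If you adopt something like this, call it from the format-validation step in place of the `Contains('@')` test so the error messages fed back to the model stay the same.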

Step 3: Build the Grammar-Constrained Extractor

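Use a JSON schema grammar to constrain decoding so that each attempt produces JSON matching the schema's structure. The grammar does not check content, so the validator from Step 2 still runs on every output: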

// ──────────────────────────────────────
// 3. Load model
// ──────────────────────────────────────
Console.WriteLine("Loading model...");
using LM model = LM.LoadFromModelID("qwen3:4b",
    loadingProgress: p => { Console.Write($"\rLoading: {p * 100:F0}%   "); return true; });
Console.WriteLine("\n");

// ──────────────────────────────────────
// 4. Define the JSON schema grammar
// ──────────────────────────────────────
string contactSchema = """
{
    "type": "object",
    "properties": {
        "full_name": { "type": "string" },
        "email": { "type": "string" },
        "phone": { "type": "string" },
        "company": { "type": "string" },
        "job_title": { "type": "string" },
        "department": { "type": "string" }
    },
    "required": ["full_name", "email", "company", "job_title"]
}
""";

var schemaGrammar = Grammar.CreateJsonGrammarFromJsonSchema(contactSchema);

// ──────────────────────────────────────
// 5. First extraction attempt
// ──────────────────────────────────────
var extractor = new SingleTurnConversation(model)
{
    MaximumCompletionTokens = 512,
    Grammar = schemaGrammar,
    SamplingMode = new RandomSampling { Temperature = 0.1f, Seed = 42 }
};

string messyInput = """
    Hi, I'm reaching out regarding our upcoming partnership. My name is
    Dr. Sarah Chen-Williams and I lead the AI Research division at NeuroTech
    Solutions. You can reach me at s.chen-williams@neurotech-solutions.com
    or call my direct line +1 (415) 555-0187. Looking forward to connecting!
    """;

string firstAttempt = extractor.Submit(
    $"Extract contact information from this text into the exact JSON schema. " +
    $"Use empty string for missing fields.\n\nText: {messyInput}");

Console.WriteLine("── First Extraction Attempt ──");
Console.WriteLine(firstAttempt);

var validator = new ExtractionValidator();
var (isValid, errors) = validator.Validate(firstAttempt);
Console.WriteLine($"Valid: {isValid}");
if (!isValid)
    foreach (string error in errors)
        Console.WriteLine($"  Error: {error}");
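
Once an output passes validation, it can be turned into the `ContactInfo` record defined in Step 2. The sketch below assumes a validated payload (the JSON here is illustrative sample data, not real model output) and uses only `System.Text.Json`. The record is repeated so the example is self-contained.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Illustrative payload standing in for a validated model response.
string validatedJson = """
{
    "full_name": "Sarah Chen-Williams",
    "email": "s.chen-williams@neurotech-solutions.com",
    "phone": "+1 (415) 555-0187",
    "company": "NeuroTech Solutions",
    "job_title": "",
    "department": "AI Research"
}
""";

// Constructor parameters are matched to properties by CLR name;
// the JSON names come from the [JsonPropertyName] attributes.
ContactInfo contact = JsonSerializer.Deserialize<ContactInfo>(validatedJson)
    ?? throw new InvalidOperationException("Payload deserialized to null.");

// The prompt asks for empty strings on missing fields, so treat "" as "not found".
string title = string.IsNullOrWhiteSpace(contact.JobTitle) ? "(unknown title)" : contact.JobTitle;
Console.WriteLine($"{contact.FullName} <{contact.Email}>, {title} at {contact.Company}");

// Same shape as the record from Step 2.
public record ContactInfo(
    [property: JsonPropertyName("full_name")] string FullName,
    [property: JsonPropertyName("email")] string Email,
    [property: JsonPropertyName("phone")] string Phone,
    [property: JsonPropertyName("company")] string Company,
    [property: JsonPropertyName("job_title")] string JobTitle,
    [property: JsonPropertyName("department")] string Department
);
```

Deserializing only after validation keeps malformed payloads out of typed code paths.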

Step 4: Implement Retry with Corrective Feedback

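When validation fails, feed the errors back to the model as corrective context. Given the specific errors and its previous output, the model can often correct the extraction on the next attempt. Inputs that simply lack the required information will still fail after all retries: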

// ──────────────────────────────────────
// 6. Self-healing retry loop
// ──────────────────────────────────────
public sealed class SelfHealingExtractor
{
    private readonly LM _model;
    private readonly Grammar _grammar;
    private readonly ExtractionValidator _validator;
    private readonly int _maxRetries;

    public SelfHealingExtractor(LM model, Grammar grammar, ExtractionValidator validator, int maxRetries = 3)
    {
        _model = model;
        _grammar = grammar;
        _validator = validator;
        _maxRetries = maxRetries;
    }

    public (string Json, int Attempts, bool Success) Extract(string input)
    {
        string basePrompt = $"Extract contact information from this text into the exact JSON schema. " +
                           $"Use empty string for missing fields.\n\nText: {input}";

        // First attempt
        var conversation = new SingleTurnConversation(_model)
        {
            MaximumCompletionTokens = 512,
            Grammar = _grammar,
            SamplingMode = new RandomSampling { Temperature = 0.1f }
        };

        string result = conversation.Submit(basePrompt);
        var (isValid, errors) = _validator.Validate(result);

        if (isValid)
            return (result, 1, true);

        // Retry with corrective feedback
        for (int attempt = 2; attempt <= _maxRetries; attempt++)
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine($"  [Retry {attempt}/{_maxRetries}] Fixing: {string.Join("; ", errors)}");
            Console.ResetColor();

            string correctionPrompt =
                $"{basePrompt}\n\n" +
                $"IMPORTANT: Your previous extraction had these errors:\n" +
                string.Join("\n", errors.Select(e => $"- {e}")) +
                $"\n\nPrevious (incorrect) output:\n{result}\n\n" +
                $"Fix these specific errors and extract again.";

            // Increase temperature slightly on retries for diversity
            var retryConversation = new SingleTurnConversation(_model)
            {
                MaximumCompletionTokens = 512,
                Grammar = _grammar,
                SamplingMode = new RandomSampling { Temperature = 0.1f + (attempt * 0.1f) }
            };

            result = retryConversation.Submit(correctionPrompt);
            (isValid, errors) = _validator.Validate(result);

            if (isValid)
                return (result, attempt, true);
        }

        return (result, _maxRetries, false);
    }
}

// Test the self-healing extractor
var healer = new SelfHealingExtractor(model, schemaGrammar, validator, maxRetries: 3);

string[] testInputs = {
    "Contact: Jane Doe, jane@example.com, Acme Corp, Product Manager",
    "Bob from sales called. His number is 555-1234. He works at GlobalTech as VP of Sales.",
    "Meeting notes: discussed partnership with Maria Garcia (maria.g@innovate.io), CTO at Innovate Labs, AI department",
    "ref #4421 - customer inquiry from unknown sender about pricing"
};

Console.WriteLine("\n── Self-Healing Batch Extraction ──\n");
foreach (string input in testInputs)
{
    Console.WriteLine($"Input: {input[..Math.Min(70, input.Length)]}...");
    var (json, attempts, success) = healer.Extract(input);

    Console.ForegroundColor = success ? ConsoleColor.Green : ConsoleColor.Red;
    Console.WriteLine($"  Status: {(success ? "SUCCESS" : "FAILED")} ({attempts} attempt(s))");
    Console.ResetColor();
    Console.WriteLine($"  Result: {json}\n");
}
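
The retry logic does not depend on the model API, which makes it easy to test in isolation. The following is a minimal sketch of the same generate, validate, and re-prompt loop with the model replaced by a stub delegate. `RetryWithFeedback`, `stubGenerate`, and `stubValidate` are hypothetical names introduced for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Generic loop: generate, validate, and feed the errors into the next prompt.
static (string Output, int Attempts, bool Success) RetryWithFeedback(
    string basePrompt,
    Func<string, string> generate,
    Func<string, List<string>> validate,
    int maxAttempts)
{
    string prompt = basePrompt;
    string output = "";

    for (int attempt = 1; attempt <= maxAttempts; attempt++)
    {
        output = generate(prompt);
        List<string> errors = validate(output);
        if (errors.Count == 0)
            return (output, attempt, true);

        // Rebuild the prompt from the base prompt so feedback does not pile up.
        prompt = basePrompt +
                 "\n\nPrevious (incorrect) output:\n" + output +
                 "\n\nFix these errors:\n" +
                 string.Join("\n", errors.Select(e => "- " + e));
    }

    return (output, maxAttempts, false);
}

// Stub "model": returns a bad email first, a good one once it sees feedback.
Func<string, string> stubGenerate = prompt =>
    prompt.Contains("Fix these errors")
        ? "{\"email\":\"jane@example.com\"}"
        : "{\"email\":\"jane\"}";

// Stub validator: an empty list means the output is valid.
Func<string, List<string>> stubValidate = output =>
    output.Contains('@') ? new List<string>() : new List<string> { "Invalid email format" };

var outcome = RetryWithFeedback("Extract the email.", stubGenerate, stubValidate, maxAttempts: 3);
Console.WriteLine($"Success: {outcome.Success}, attempts: {outcome.Attempts}, output: {outcome.Output}");
```

Swapping the stubs for a real model call and the `ExtractionValidator` gives the same behavior as `SelfHealingExtractor`, while the loop itself stays unit-testable without loading a model.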

Step 5: Add Agent-Level Fallback with FallbackAgentExecutor

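For cases where the primary extraction agent fails entirely, fall back to an alternative agent with a different strategy. In this example the fallback is triggered by exceptions that match the HandleException filter, and both agents share the same model but use different instructions and iteration limits: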

// ──────────────────────────────────────
// 7. Primary agent: structured extraction with grammar
// ──────────────────────────────────────
Agent primaryExtractor = Agent.CreateBuilder(model)
    .WithInstruction(
        "You are a precise data extraction agent. Extract contact information " +
        "from the input text into the required JSON format. Use empty strings " +
        "for missing fields. Never invent or hallucinate data.")
    .WithGrammar(schemaGrammar)
    .WithMaxIterations(1)
    .Build();

// ──────────────────────────────────────
// 8. Fallback agent: more flexible extraction
// ──────────────────────────────────────
Agent fallbackExtractor = Agent.CreateBuilder(model)
    .WithInstruction(
        "You are a flexible data extraction agent. Extract whatever contact " +
        "information you can find from the input text. Return a JSON object " +
        "with fields: full_name, email, phone, company, job_title, department. " +
        "Use empty strings for fields you cannot determine. Be thorough " +
        "and check for implied or indirect mentions.")
    .WithGrammar(schemaGrammar)
    .WithMaxIterations(2)
    .Build();

// ──────────────────────────────────────
// 9. Build fallback chain
// ──────────────────────────────────────
var fallbackPipeline = new FallbackAgentExecutor()
    .AddAgent(primaryExtractor)
    .AddAgent(fallbackExtractor)
    .OnFallback((agent, ex, attempt) =>
    {
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine($"  [FALLBACK] Agent failed on attempt {attempt}: {ex.Message}");
        Console.WriteLine($"  [FALLBACK] Switching to fallback agent...");
        Console.ResetColor();
    })
    .HandleException(ex => ex is InvalidOperationException || ex is TimeoutException);

Console.WriteLine("── Fallback Agent Extraction ──\n");
var fallbackResult = await fallbackPipeline.ExecuteAsync(
    "Extract contact info: Meeting with Dr. Alex Rivera, alex@biomedresearch.org, BioMed Research Institute");

Console.WriteLine($"Result: {fallbackResult.Content}");
Console.WriteLine($"Agent used: {fallbackResult.AgentName}");
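
An exception-based fallback does not fire when an agent returns well-formed but unusable output. As a plain C# illustration of the pattern (not LM-Kit.NET's implementation), the sketch below tries strategies in order and moves on either when a matching exception is thrown or when a validation predicate rejects the output. `RunWithFallbackAsync` and the stub strategies are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Try each strategy in order; advance on a handled exception or a rejected output.
static async Task<(string? Output, string? Strategy)> RunWithFallbackAsync(
    string input,
    IReadOnlyList<(string Name, Func<string, Task<string>> Run)> strategies,
    Func<string, bool> isAcceptable)
{
    foreach (var (name, run) in strategies)
    {
        try
        {
            string output = await run(input);
            if (isAcceptable(output))
                return (output, name);

            Console.WriteLine($"[{name}] output rejected by validation, trying next strategy");
        }
        catch (Exception ex) when (ex is InvalidOperationException or TimeoutException)
        {
            Console.WriteLine($"[{name}] failed: {ex.Message}, trying next strategy");
        }
    }

    return (null, null); // every strategy failed
}

// Stub strategies standing in for real agents.
var strategies = new List<(string Name, Func<string, Task<string>> Run)>
{
    ("primary", _ => throw new TimeoutException("simulated timeout")),
    ("fallback", _ => Task.FromResult("{\"full_name\":\"Alex Rivera\"}"))
};

var (output, strategy) = await RunWithFallbackAsync(
    "Meeting with Dr. Alex Rivera",
    strategies,
    candidate => candidate.Contains("full_name"));

Console.WriteLine($"Strategy used: {strategy}, output: {output}");
```

In a real pipeline the predicate would wrap `ExtractionValidator.Validate`, so that a structurally valid but empty extraction also triggers the next strategy.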

Step 6: Build a Multi-Stage Extraction Pipeline

Chain multiple extraction and validation stages using PipelineOrchestrator. Each stage refines the previous stage's output:

// ──────────────────────────────────────
// 10. Stage 1: Raw extraction
// ──────────────────────────────────────
Agent rawExtractor = Agent.CreateBuilder(model)
    .WithInstruction(
        "Extract all contact-related information from the input text. " +
        "Return a JSON object with: full_name, email, phone, company, job_title, department. " +
        "Include partial information. Use empty strings for truly missing fields.")
    .WithGrammar(schemaGrammar)
    .WithMaxIterations(1)
    .Build();

// ──────────────────────────────────────
// 11. Stage 2: Validation and enrichment
// ──────────────────────────────────────
Agent enrichmentAgent = Agent.CreateBuilder(model)
    .WithInstruction(
        "You receive a JSON object with extracted contact information. " +
        "Review it for completeness and correctness. " +
        "Fix obvious errors (e.g., missing @ in emails, incomplete phone numbers). " +
        "If job_title is empty but context implies one, infer it. " +
        "Return the corrected JSON object in the same format.")
    .WithGrammar(schemaGrammar)
    .WithMaxIterations(1)
    .Build();

// ──────────────────────────────────────
// 12. Stage 3: Quality scoring
// ──────────────────────────────────────
string qualitySchema = """
{
    "type": "object",
    "properties": {
        "data": {
            "type": "object",
            "properties": {
                "full_name": { "type": "string" },
                "email": { "type": "string" },
                "phone": { "type": "string" },
                "company": { "type": "string" },
                "job_title": { "type": "string" },
                "department": { "type": "string" }
            }
        },
        "quality_score": { "type": "number" },
        "completeness": { "type": "number" },
        "fields_found": { "type": "integer" },
        "confidence": { "type": "string" }
    },
    "required": ["data", "quality_score", "completeness", "fields_found", "confidence"]
}
""";

Agent qualityScorer = Agent.CreateBuilder(model)
    .WithInstruction(
        "You receive extracted contact data as JSON. Score it: " +
        "quality_score (0.0 to 1.0): overall extraction quality. " +
        "completeness (0.0 to 1.0): fraction of fields that are populated. " +
        "fields_found: count of non-empty fields. " +
        "confidence: 'high', 'medium', or 'low'. " +
        "Wrap the original data in a 'data' field and add the scoring fields.")
    .WithGrammar(Grammar.CreateJsonGrammarFromJsonSchema(qualitySchema))
    .WithMaxIterations(1)
    .Build();

// ──────────────────────────────────────
// 13. Assemble the pipeline
// ──────────────────────────────────────
var pipeline = new PipelineOrchestrator()
    .AddStage("extract", rawExtractor)
    .AddStage("enrich", enrichmentAgent)
    .AddStage("score", qualityScorer);

Console.WriteLine("\n── Multi-Stage Extraction Pipeline ──\n");

string[] documents = {
    "From: Dr. Elena Vasquez <elena.v@quantum-labs.io>\nSubject: Research Collaboration\n" +
    "Hi, I'm the Director of Quantum Computing at Quantum Labs. Let's schedule a call this week. " +
    "My direct line is +44 20 7946 0958.",

    "Notes from the trade show: Spoke with someone from TechCorp about their new API. " +
    "They mentioned they'd send details via email later.",

    "LinkedIn message from James Park, Senior ML Engineer at DataFlow Systems. " +
    "Profile shows he's in the Machine Learning department. Email: j.park@dataflow.io"
};

foreach (string doc in documents)
{
    Console.WriteLine($"Input: {doc[..Math.Min(80, doc.Length)]}...\n");

    var pipelineResult = await pipeline.ExecuteAsync(doc);

    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine("Pipeline output:");
    Console.ResetColor();

    // Pretty-print the JSON; fall back to the raw text if it cannot be parsed
    try
    {
        using JsonDocument parsed = JsonDocument.Parse(pipelineResult.Content);
        string pretty = JsonSerializer.Serialize(parsed.RootElement,
            new JsonSerializerOptions { WriteIndented = true });
        Console.WriteLine(pretty);
    }
    catch (Exception ex) when (ex is JsonException or ArgumentNullException)
    {
        Console.WriteLine(pipelineResult.Content);
    }

    Console.WriteLine(new string('─', 60) + "\n");
}
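
The scoring stage only helps if something acts on the score. A minimal sketch of gating on `quality_score`, using only `System.Text.Json`, is shown below. The 0.7 threshold and the `NeedsManualReview` helper are assumptions for illustration, and the sample payloads are made up to match the quality schema above.

```csharp
using System;
using System.Text.Json;

const double MinQualityScore = 0.7; // assumed threshold; tune for your data

// Returns true when the scored output should not be trusted automatically.
static bool NeedsManualReview(string scoredJson, double minScore)
{
    try
    {
        using JsonDocument doc = JsonDocument.Parse(scoredJson);
        JsonElement root = doc.RootElement;

        if (root.ValueKind != JsonValueKind.Object ||
            !root.TryGetProperty("quality_score", out JsonElement scoreEl) ||
            scoreEl.ValueKind != JsonValueKind.Number)
        {
            return true; // missing or non-numeric score
        }

        double score = scoreEl.GetDouble();

        // Scores outside 0.0 to 1.0 violate the scoring instruction, so distrust them too.
        return score < 0.0 || score > 1.0 || score < minScore;
    }
    catch (JsonException)
    {
        return true; // unparseable output always goes to review
    }
}

string lowScore = "{\"data\":{\"full_name\":\"\"},\"quality_score\":0.4,\"completeness\":0.2,\"fields_found\":1,\"confidence\":\"low\"}";
string highScore = "{\"data\":{\"full_name\":\"James Park\"},\"quality_score\":0.9,\"completeness\":0.8,\"fields_found\":5,\"confidence\":\"high\"}";

Console.WriteLine($"Low score needs review: {NeedsManualReview(lowScore, MinQualityScore)}");
Console.WriteLine($"High score needs review: {NeedsManualReview(highScore, MinQualityScore)}");
```

Because the score is itself model output, treat it as a routing hint rather than a guarantee, and keep the deterministic validator as the final check.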

Step 7: Batch Processing with Error Isolation

Process large batches without letting a single failure break the entire run:

// ──────────────────────────────────────
// 14. Resilient batch processing
// ──────────────────────────────────────
public sealed class BatchExtractionResult
{
    public string Input { get; init; } = "";
    public string? Output { get; init; }
    public bool Success { get; init; }
    public int Attempts { get; init; }
    public string? Error { get; init; }
    public TimeSpan Duration { get; init; }
}

var batchResults = new List<BatchExtractionResult>();
var stopwatch = new System.Diagnostics.Stopwatch();

Console.WriteLine("── Resilient Batch Processing ──\n");

foreach (string doc in documents)
{
    stopwatch.Restart();

    try
    {
        var result = await fallbackPipeline.ExecuteAsync(doc);
        // Named batchValid because isValid is already declared at the top level (CS0136)
        var (batchValid, validationErrors) = validator.Validate(result.Content);

        batchResults.Add(new BatchExtractionResult
        {
            Input = doc[..Math.Min(50, doc.Length)],
            Output = result.Content,
            Success = batchValid,
            Attempts = 1,
            Duration = stopwatch.Elapsed
        });
    }
    catch (Exception ex)
    {
        batchResults.Add(new BatchExtractionResult
        {
            Input = doc[..Math.Min(50, doc.Length)],
            Success = false,
            Error = ex.Message,
            Attempts = 1,
            Duration = stopwatch.Elapsed
        });
    }
}

// Summary report
int successCount = batchResults.Count(r => r.Success);
int failCount = batchResults.Count(r => !r.Success);
double avgDuration = batchResults.Average(r => r.Duration.TotalMilliseconds);

Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine("╔══════════════════════════════════════╗");
Console.WriteLine($"║  Batch Processing Report             ║");
Console.WriteLine($"║  Total: {batchResults.Count,5}                        ║");
Console.WriteLine($"║  Success: {successCount,5}  ({(double)successCount / batchResults.Count * 100:F1}%)            ║");
Console.WriteLine($"║  Failed: {failCount,5}  ({(double)failCount / batchResults.Count * 100:F1}%)             ║");
Console.WriteLine($"║  Avg time: {avgDuration:F0}ms                    ║");
Console.WriteLine($"╚══════════════════════════════════════╝");
Console.ResetColor();

foreach (var result in batchResults)
{
    Console.ForegroundColor = result.Success ? ConsoleColor.Green : ConsoleColor.Red;
    Console.Write(result.Success ? "  ✅ " : "  ❌ ");
    Console.ResetColor();
    Console.WriteLine($"{result.Input}... [{result.Duration.TotalMilliseconds:F0}ms]");
}

Common Issues

Problem | Cause | Fix
Retry loop always fails | Corrective prompt is too vague | Include the specific validation errors in the retry prompt so the model knows exactly what to fix
Pipeline stages produce incompatible output | Stage N's output format doesn't match Stage N+1's expected input | Use grammar constraints on each stage; add InputTransformer to reformat between stages
Fallback agent never triggers | Exception type not matched | Use HandleException to specify which exception types trigger fallback
Batch processing is too slow | Sequential processing of large batches | Process documents in parallel using Task.WhenAll with a semaphore for VRAM control
Quality scores are always 1.0 | Scoring agent is too lenient | Add explicit scoring criteria in the agent instruction with examples of low-quality extractions
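
The parallel-processing fix from the table above can be sketched with standard .NET primitives. The example below bounds concurrency with `SemaphoreSlim` and collects results with `Task.WhenAll`. `ProcessAllAsync` and the stub extractor are hypothetical, and whether a single loaded model can serve concurrent requests is an assumption to verify for your setup before raising `maxConcurrency` above 1.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Run extractAsync over all inputs with at most maxConcurrency in flight.
static async Task<string[]> ProcessAllAsync(
    string[] inputs,
    Func<string, Task<string>> extractAsync,
    int maxConcurrency)
{
    using var gate = new SemaphoreSlim(maxConcurrency);

    var tasks = inputs.Select(async input =>
    {
        await gate.WaitAsync();
        try
        {
            return await extractAsync(input);
        }
        catch (Exception ex)
        {
            // Isolate failures so one bad document does not fault the whole batch.
            return $"ERROR: {ex.Message}";
        }
        finally
        {
            gate.Release();
        }
    });

    // Task.WhenAll returns results in the same order as the inputs.
    return await Task.WhenAll(tasks);
}

// Stub extractor standing in for a real model call.
Func<string, Task<string>> stubExtract = async text =>
{
    await Task.Delay(50);
    if (text.Length == 0)
        throw new InvalidOperationException("empty document");
    return text.ToUpperInvariant();
};

string[] results = await ProcessAllAsync(new[] { "doc a", "", "doc c" }, stubExtract, maxConcurrency: 2);
foreach (string result in results)
    Console.WriteLine(result);
```

The semaphore limit is the knob for VRAM pressure: start at 1 or 2 and increase only while memory use and throughput both remain acceptable.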

Next Steps