Skip to content

Creating Plugins

Kreuzberg's plugin system allows you to extend functionality by creating custom extractors, post-processors, OCR backends, and validators. Plugins can be written in Rust or Python; the examples below also show the C#, Go, Java, and Ruby bindings where they expose plugin APIs.

WASM Support

The WebAssembly bindings use pre-compiled Rust core with tesseract-wasm for OCR. Custom plugins are not supported in WASM environments. For custom plugins, use Python, Rust, or other native language bindings.

Plugin Types

Kreuzberg supports four types of plugins:

Plugin Type Purpose Use Cases
DocumentExtractor Extract content from file formats Add support for new formats, override built-in extractors
PostProcessor Transform extraction results Add metadata, enrich content, apply custom processing
OcrBackend Perform OCR on images Integrate cloud OCR services, custom OCR engines
Validator Validate extraction quality Enforce minimum quality, check completeness

Plugin Architecture

All plugins must implement the base Plugin trait and a type-specific trait. Plugins are:

  • Thread-safe: All plugins must be Send + Sync (Rust) or thread-safe (Python)
  • Lifecycle-managed: Plugins have initialize() and shutdown() methods
  • Registered globally: Use registry functions to register your plugins

Document Extractors

Extract content from custom file formats or override built-in extractors.

Rust Implementation

Rust
use kreuzberg::plugins::{DocumentExtractor, Plugin};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;

struct CustomJsonExtractor;

// Plugin identity and lifecycle hooks for the custom JSON extractor.
impl Plugin for CustomJsonExtractor {
    /// Identifier this plugin reports.
    fn name(&self) -> &str {
        "custom-json-extractor"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up; always succeeds.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down; always succeeds.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl DocumentExtractor for CustomJsonExtractor {
    /// Parses `content` as JSON and returns the text collected from its string values.
    ///
    /// The MIME type and config arguments are ignored; the result is always
    /// tagged `application/json` with default metadata and no tables.
    ///
    /// # Errors
    /// Propagates the `serde_json` parse failure (through `?`) when `content`
    /// is not valid JSON.
    async fn extract_bytes(
        &self,
        content: &[u8],
        _mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        let parsed: serde_json::Value = serde_json::from_slice(content)?;

        Ok(ExtractionResult {
            content: extract_text_from_json(&parsed),
            mime_type: String::from("application/json"),
            metadata: Metadata::default(),
            tables: Vec::new(),
            detected_languages: None,
            chunks: None,
            images: None,
        })
    }

    /// MIME types this extractor declares support for.
    fn supported_mime_types(&self) -> &[&str] {
        &["application/json", "text/json"]
    }

    /// Priority value reported by this extractor.
    fn priority(&self) -> i32 {
        50
    }
}

fn extract_text_from_json(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(s) => format!("{}\n", s),
        serde_json::Value::Array(arr) => arr.iter().map(extract_text_from_json).collect(),
        serde_json::Value::Object(obj) => obj.values().map(extract_text_from_json).collect(),
        _ => String::new(),
    }
}

Python Implementation

Python
from kreuzberg import register_document_extractor, ExtractionResult
import json

class CustomJsonExtractor:
    """Document extractor that flattens the string values of a JSON document into text."""

    def name(self) -> str:
        """Return the identifier this extractor reports."""
        return "custom-json-extractor"

    def version(self) -> str:
        """Return the version string this extractor reports."""
        return "1.0.0"

    def supported_mime_types(self) -> list[str]:
        """Return the MIME types this extractor declares support for."""
        return ["application/json"]

    def priority(self) -> int:
        """Return the priority value reported by this extractor."""
        return 50

    def extract_bytes(
        self, content: bytes, mime_type: str, config: dict
    ) -> ExtractionResult:
        """Parse ``content`` as JSON and return its collected string values as the content.

        ``mime_type`` and ``config`` are not consulted.
        """
        parsed = json.loads(content)
        return {
            "content": self._extract_text(parsed),
            "mime_type": "application/json",
        }

    def _extract_text(self, obj: object) -> str:
        """Recursively join every string found in ``obj``, one per line."""
        if isinstance(obj, str):
            return obj + "\n"
        if isinstance(obj, list):
            children = obj
        elif isinstance(obj, dict):
            children = obj.values()
        else:
            # Numbers, booleans and None carry no text.
            return ""
        return "".join(map(self._extract_text, children))

    def initialize(self) -> None:
        """No setup is performed."""
        return None

    def shutdown(self) -> None:
        """No teardown is performed."""
        return None

extractor: CustomJsonExtractor = CustomJsonExtractor()
register_document_extractor(extractor)

Registration

C#
using Kreuzberg;

/// <summary>Minimal document extractor that returns a fixed placeholder as content.</summary>
public class CustomExtractor : IDocumentExtractor
{
    /// <summary>Identifier reported by this extractor.</summary>
    public string Name()
    {
        return "custom";
    }

    /// <summary>Version string reported by this extractor.</summary>
    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>
    /// Returns a result whose content is a placeholder string and whose MIME type
    /// echoes <paramref name="mimeType"/>; the input bytes and config are not read.
    /// </summary>
    public Dictionary<string, object> ExtractBytes(byte[] data, string mimeType, Dictionary<string, object> config)
    {
        var extracted = new Dictionary<string, object>();
        extracted.Add("content", "Extracted content");
        extracted.Add("mime_type", mimeType);
        return extracted;
    }
}

var extractor = new CustomExtractor();
KreuzbergClient.RegisterDocumentExtractor(extractor);
Console.WriteLine("Extractor registered");
Go
package main

import (
    "log"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

// main registers an extractor named "custom-json-extractor", then extracts
// document.json and logs the length of the extracted content. Any failure
// terminates the program via log.Fatalf.
func main() {
    // Register custom extractor with priority 50
    if err := kreuzberg.RegisterDocumentExtractor("custom-json-extractor", 50); err != nil {
        log.Fatalf("register extractor failed: %v", err)
    }

    // A nil config is passed here — presumably this selects default extraction
    // settings; confirm against the Go binding.
    result, err := kreuzberg.ExtractFileSync("document.json", nil)
    if err != nil {
        log.Fatalf("extract failed: %v", err)
    }
    log.Printf("Extracted content length: %d", len(result.Content))
}
Java
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;

/**
 * Extracts {@code document.json} and prints the length of the extracted content.
 *
 * NOTE(review): this example only runs an extraction; it does not itself
 * register a custom extractor.
 */
public class CustomExtractorExample {
    public static void main(String[] args) {
        try {
            ExtractionResult result = Kreuzberg.extractFile("document.json");
            System.out.println("Extracted content length: " + result.getContent().length());
        } catch (IOException | KreuzbergException e) {
            // Both I/O and extraction failures are only reported, not rethrown.
            e.printStackTrace();
        }
    }
}
Python
from kreuzberg import register_document_extractor

class CustomExtractor:
    def name(self) -> str:
        return "custom"

    def version(self) -> str:
        return "1.0.0"

extractor = CustomExtractor()
register_document_extractor(extractor)
print("Extractor registered")
Ruby
require 'kreuzberg'

# Register custom extractor with priority 50
Kreuzberg.register_document_extractor(
  name: "custom-json-extractor",
  extractor: ->(content, mime_type, config) {
    JSON.parse(content.to_s)
  },
  priority: 50
)

result = Kreuzberg.extract_file("document.json")
puts "Extracted content length: #{result.content.length}"
Rust
use kreuzberg::plugins::registry::get_document_extractor_registry;
use std::sync::Arc;

/// Registers a `CustomJsonExtractor` with the document-extractor registry.
///
/// The second argument to `register` is `50` — presumably the extractor's
/// priority; confirm against the registry API.
///
/// # Errors
/// Propagates whatever error `register` returns.
fn register_custom_extractor() -> Result<()> {
    let extractor = Arc::new(CustomJsonExtractor);
    get_document_extractor_registry().register(extractor, 50)?;
    Ok(())
}
TypeScript
import {
    listDocumentExtractors,
    unregisterDocumentExtractor,
    clearDocumentExtractors,
} from '@kreuzberg/node';

/**
 * Note: Custom document extractors are not supported in TypeScript v4.0.
 * Document extraction logic lives in the Rust core.
 *
 * You can list, unregister, or clear built-in extractors.
 */

// List all registered document extractors
const extractors = listDocumentExtractors();
console.log("Available extractors:", extractors);

// Unregister a specific extractor (use with caution)
unregisterDocumentExtractor("SomeExtractor");

// Clear all extractors (use with extreme caution)
// clearDocumentExtractors();

Priority System

When multiple extractors support the same MIME type, the highest priority wins:

  • 0-25: Fallback/low-quality extractors
  • 26-49: Alternative implementations
  • 50: Default (built-in extractors)
  • 51-75: Enhanced/premium extractors
  • 76-100: Specialized/high-priority extractors

Post-Processors

Transform and enrich extraction results after initial extraction.

Processing Stages

Post-processors execute in three stages:

  • Early: Run first, use for foundational operations like language detection, quality scoring, or text normalization that other processors may depend on
  • Middle: Run second, use for content transformation like keyword extraction, token reduction, or summarization
  • Late: Run last, use for final enrichment like custom metadata, analytics tracking, or output formatting

Rust Implementation

Rust
use kreuzberg::plugins::{Plugin, PostProcessor, ProcessingStage};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig};
use async_trait::async_trait;

struct WordCountProcessor;

// Plugin identity and lifecycle hooks for the word-count post-processor.
impl Plugin for WordCountProcessor {
    /// Identifier this plugin reports.
    fn name(&self) -> &str {
        "word-count"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up; always succeeds.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down; always succeeds.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl PostProcessor for WordCountProcessor {
    /// Counts the whitespace-separated words in the content and stores the
    /// total under the `"word_count"` key of the result's additional metadata.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig
    ) -> Result<()> {
        let total_words = result.content.split_whitespace().count();
        let metadata_key = String::from("word_count");

        result
            .metadata
            .additional
            .insert(metadata_key, serde_json::json!(total_words));

        Ok(())
    }

    /// This processor declares itself as part of the early stage.
    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Early
    }

    /// Returns `true` only when the result has non-empty content.
    fn should_process(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig
    ) -> bool {
        let content = &result.content;
        !content.is_empty()
    }
}

Python Implementation

Python
from kreuzberg import register_post_processor, ExtractionResult

class WordCountProcessor:
    """Post-processor that records the number of words in the extracted content."""

    def name(self) -> str:
        """Return the identifier this processor reports."""
        return "word_count"

    def version(self) -> str:
        """Return the version string this processor reports."""
        return "1.0.0"

    def processing_stage(self) -> str:
        """Return the stage name this processor declares."""
        return "early"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Store the whitespace-separated word count under ``metadata["word_count"]``."""
        words = result["content"].split()
        result["metadata"]["word_count"] = len(words)
        return result

    def should_process(self, result: ExtractionResult) -> bool:
        """Return True only when the result has non-empty content."""
        return True if result["content"] else False

    def initialize(self) -> None:
        """No setup is performed."""
        return None

    def shutdown(self) -> None:
        """No teardown is performed."""
        return None

processor: WordCountProcessor = WordCountProcessor()
register_post_processor(processor)

Conditional Processing

C#
using Kreuzberg;

/// <summary>Post-processor that opts in only for results whose MIME type is application/pdf.</summary>
public class PdfOnlyProcessor : IPostProcessor
{
    /// <summary>Identifier reported by this processor.</summary>
    public string Name()
    {
        return "pdf-only-processor";
    }

    /// <summary>Version string reported by this processor.</summary>
    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>Returns the result unchanged.</summary>
    public ExtractionResult Process(ExtractionResult result)
    {
        return result;
    }

    /// <summary>True only when the result's MIME type is exactly "application/pdf".</summary>
    public bool ShouldProcess(ExtractionResult result)
    {
        var isPdf = result.MimeType == "application/pdf";
        return isPdf;
    }
}

var processor = new PdfOnlyProcessor();
KreuzbergClient.RegisterPostProcessor(processor);
Go
package main

import (
    "encoding/json"
    "log"
    "unsafe"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"

// pdfOnlyProcessor receives an extraction result as a JSON C string and
// returns a JSON C string. Non-PDF results are re-serialized unchanged; PDF
// results get extra metadata entries. On parse or serialization failure a
// JSON object with an "error" field is returned instead.
//export pdfOnlyProcessor
func pdfOnlyProcessor(resultJSON *C.char) *C.char {
    var doc map[string]interface{}
    if err := json.Unmarshal([]byte(C.GoString(resultJSON)), &doc); err != nil {
        return C.CString("{\"error\":\"Failed to parse result JSON\"}")
    }

    // Only PDFs are processed; anything else (or a missing/non-string
    // mime_type) is passed straight back.
    if mime, isString := doc["mime_type"].(string); !isString || mime != "application/pdf" {
        return encodeResult(doc)
    }

    // Reuse the existing metadata object when present, otherwise start fresh.
    meta, isMap := doc["metadata"].(map[string]interface{})
    if !isMap {
        meta = make(map[string]interface{})
    }

    // Example PDF-specific processing hooks would go here:
    // - Extract tables as structured data
    // - Handle PDF-specific formatting
    // - Preserve document hierarchy
    meta["pdf_specific_processing"] = true
    meta["processor_type"] = "pdf_only"

    // tables_json, when present and non-empty, is itself a JSON-encoded array;
    // record how many tables it holds. A malformed value is silently skipped.
    if raw, isString := doc["tables_json"].(string); isString && raw != "" {
        var tables []interface{}
        if json.Unmarshal([]byte(raw), &tables) == nil {
            meta["table_count"] = len(tables)
        }
    }

    doc["metadata"] = meta
    return encodeResult(doc)
}

// encodeResult serializes doc to JSON and returns it as a newly allocated C
// string, or a JSON error object when serialization fails.
func encodeResult(doc map[string]interface{}) *C.char {
    encoded, err := json.Marshal(doc)
    if err != nil {
        return C.CString("{\"error\":\"Failed to serialize result\"}")
    }
    return C.CString(string(encoded))
}

// main registers pdfOnlyProcessor as a post-processor, extracts a fixed list
// of files, and logs for each one whether the PDF-specific processing was
// applied. The processor is unregistered again when main returns.
func main() {
    // Register the post-processor with priority 70
    if err := kreuzberg.RegisterPostProcessor("pdf_only_processor", 70,
        (C.PostProcessorCallback)(C.pdfOnlyProcessor)); err != nil {
        log.Fatalf("failed to register post-processor: %v", err)
    }
    // Best-effort cleanup: a failed unregister is only logged.
    defer func() {
        if err := kreuzberg.UnregisterPostProcessor("pdf_only_processor"); err != nil {
            log.Printf("warning: failed to unregister post-processor: %v", err)
        }
    }()

    // Process multiple documents - processor will only affect PDFs
    files := []string{
        "document.pdf",
        "image.jpg",
        "spreadsheet.xlsx",
    }

    for _, file := range files {
        result, err := kreuzberg.ExtractFileSync(file, nil)
        if err != nil {
            // A failed extraction does not abort the loop; move on to the next file.
            log.Printf("Warning: extraction failed for %s: %v", file, err)
            continue
        }

        // Parse metadata to check if PDF processing occurred
        var metadata map[string]interface{}
        // NOTE(review): assumes result.MetadataJSON is an interface value holding
        // a JSON string — confirm against the Go binding's result type.
        if metaJSON, ok := result.MetadataJSON.(string); ok {
            if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
                if pdfProcessing, ok := metadata["pdf_specific_processing"].(bool); ok && pdfProcessing {
                    log.Printf("PDF-specific processing applied to: %s", file)
                    // encoding/json decodes JSON numbers into float64 when the
                    // target is interface{}, hence the float64 assertion.
                    if tableCount, ok := metadata["table_count"].(float64); ok {
                        log.Printf("  Tables found: %.0f", tableCount)
                    }
                } else {
                    log.Printf("Skipped PDF processor for: %s (MIME: %s)", file, result.MimeType)
                }
            }
        }
    }
}
Java
import dev.kreuzberg.PostProcessor;
import java.util.HashMap;
import java.util.Map;

// Post-processor intended to tag PDF results; non-PDF results are returned untouched.
PostProcessor pdfOnly = result -> {
    if (!result.getMimeType().equals("application/pdf")) {
        return result;
    }

    // Copy the result's metadata and flag the copy as PDF-processed.
    Map<String, Object> metadata = new HashMap<>(result.getMetadata());
    metadata.put("pdf_processed", true);

    // NOTE(review): `metadata` is a local copy that is never attached to `result`,
    // so the returned result does not carry the "pdf_processed" flag — confirm how
    // updated metadata should be written back with this binding.
    return result;
};
Python
from kreuzberg import ExtractionResult, register_post_processor

class PdfOnlyProcessor:
    """Post-processor that opts in only for results whose MIME type is application/pdf."""

    def name(self) -> str:
        """Return the identifier this processor reports."""
        return "pdf-only-processor"

    def version(self) -> str:
        """Return the version string this processor reports."""
        return "1.0.0"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Return the result unchanged."""
        return result

    def should_process(self, result: ExtractionResult) -> bool:
        """Return True only when ``result["mime_type"]`` equals ``"application/pdf"``."""
        mime_type = result["mime_type"]
        return mime_type == "application/pdf"

processor: PdfOnlyProcessor = PdfOnlyProcessor()
register_post_processor(processor)
Rust
impl PostProcessor for PdfOnlyProcessor {
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig
    ) -> Result<()> {
        Ok(())
    }

    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Middle
    }

    fn should_process(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig
    ) -> bool {
        result.mime_type == "application/pdf"
    }
}

OCR Backends

Integrate custom OCR engines or cloud services.

Rust Implementation

Rust
use kreuzberg::plugins::{Plugin, OcrBackend, OcrBackendType};
use kreuzberg::{Result, ExtractionResult, OcrConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;

struct CloudOcrBackend {
    api_key: String,
    supported_langs: Vec<String>,
}

// Plugin identity and lifecycle hooks for the cloud OCR backend.
impl Plugin for CloudOcrBackend {
    /// Identifier this plugin reports.
    fn name(&self) -> &str {
        "cloud-ocr"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up; always succeeds.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down; always succeeds.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl OcrBackend for CloudOcrBackend {
    /// Sends the image to the cloud API using the language from `config` and
    /// wraps the returned text in a plain-text `ExtractionResult`.
    ///
    /// # Errors
    /// Propagates any error returned by `call_cloud_api`.
    async fn process_image(
        &self,
        image_bytes: &[u8],
        config: &OcrConfig,
    ) -> Result<ExtractionResult> {
        let recognized = self.call_cloud_api(image_bytes, &config.language).await?;

        let extraction = ExtractionResult {
            content: recognized,
            mime_type: String::from("text/plain"),
            metadata: Metadata::default(),
            tables: Vec::new(),
            detected_languages: None,
            chunks: None,
            images: None,
        };
        Ok(extraction)
    }

    /// Returns `true` when `lang` exactly matches one of the configured languages.
    fn supports_language(&self, lang: &str) -> bool {
        self.supported_langs
            .iter()
            .map(String::as_str)
            .any(|known| known == lang)
    }

    /// This backend reports itself as a custom backend.
    fn backend_type(&self) -> OcrBackendType {
        OcrBackendType::Custom
    }

    /// Returns a copy of the configured language list.
    fn supported_languages(&self) -> Vec<String> {
        self.supported_langs.to_vec()
    }
}

impl CloudOcrBackend {
    /// Placeholder for the real cloud request: ignores both arguments and
    /// always returns the fixed string `"Extracted text"`.
    async fn call_cloud_api(
        &self,
        _image: &[u8],
        _language: &str
    ) -> Result<String> {
        let placeholder = String::from("Extracted text");
        Ok(placeholder)
    }
}

Other Language Implementations

C#
using Kreuzberg;
using System.Net.Http;
using System.Text.Json;

/// <summary>OCR backend that forwards images to a cloud OCR HTTP endpoint.</summary>
public class CloudOcrBackend : IOcrBackend
{
    // One shared client for the whole process: creating and disposing an
    // HttpClient per request can exhaust sockets under load.
    private static readonly HttpClient Client = new HttpClient();

    private readonly string _apiKey;
    private readonly List<string> _langs = new() { "eng", "deu", "fra" };

    /// <param name="apiKey">Key sent as a Bearer token with every OCR request.</param>
    public CloudOcrBackend(string apiKey)
    {
        _apiKey = apiKey;
    }

    public string Name() => "cloud-ocr";
    public string Version() => "1.0.0";
    public List<string> SupportedLanguages() => _langs;

    /// <summary>
    /// Posts the image and the requested language as a multipart form and returns
    /// the recognized text as a plain-text result dictionary.
    /// </summary>
    /// <param name="imageBytes">Raw image data to recognize.</param>
    /// <param name="config">OCR options; the "language" entry is used, defaulting to "eng".</param>
    /// <exception cref="HttpRequestException">The request failed or returned a non-success status.</exception>
    public Dictionary<string, object> ProcessImage(byte[] imageBytes, Dictionary<string, object> config)
    {
        // Single lookup; a missing or null entry falls back to English.
        var lang = config.TryGetValue("language", out var configured) && configured != null
            ? configured.ToString() ?? "eng"
            : "eng";

        var form = new MultipartFormDataContent();
        form.Add(new ByteArrayContent(imageBytes), "image");
        form.Add(new StringContent(lang), "language");

        // Disposing the request also disposes its content (the form).
        using var request = new HttpRequestMessage(HttpMethod.Post, "https://api.example.com/ocr")
        {
            Content = form
        };
        request.Headers.Authorization =
            new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", _apiKey);

        // The plugin interface is synchronous, so block here. GetAwaiter().GetResult()
        // surfaces the original exception instead of the AggregateException that
        // .Result would wrap it in.
        using var response = Client.SendAsync(request).GetAwaiter().GetResult();
        response.EnsureSuccessStatusCode();

        var json = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
        using var doc = JsonDocument.Parse(json);
        // A JSON null "text" becomes an empty string rather than a null dictionary value.
        var text = doc.RootElement.GetProperty("text").GetString() ?? string.Empty;

        return new Dictionary<string, object>
        {
            { "content", text },
            { "mime_type", "text/plain" }
        };
    }

    public void Initialize() { }
    public void Shutdown() { }
}

var backend = new CloudOcrBackend(apiKey: "your-api-key");
KreuzbergClient.RegisterOcrBackend(backend);
Java
import dev.kreuzberg.*;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.net.http.*;
import java.net.URI;

/**
 * Registers a cloud-backed OCR callback under the name "cloud-ocr" and then
 * runs an extraction on {@code scanned.pdf}.
 */
public class CloudOcrExample {
    public static void main(String[] args) {
        // Arena used to allocate the C strings returned from the callback.
        Arena callbackArena = Arena.ofAuto();
        String apiKey = "your-api-key";

        // Callback receiving the image as native memory plus its length and a config JSON string.
        OcrBackend cloudOcr = (imageBytes, imageLength, configJson) -> {
            try {
                // Read image bytes from native memory
                byte[] image = imageBytes.reinterpret(imageLength)
                    .toArray(ValueLayout.JAVA_BYTE);

                // Read config JSON
                // NOTE(review): `config` is read but not used in the request below.
                String config = configJson.reinterpret(Long.MAX_VALUE)
                    .getString(0);

                // Call cloud OCR API
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://api.example.com/ocr"))
                    .header("Authorization", "Bearer " + apiKey)
                    .POST(HttpRequest.BodyPublishers.ofByteArray(image))
                    .build();

                HttpResponse<String> response = client.send(request,
                    HttpResponse.BodyHandlers.ofString());

                String text = parseTextFromResponse(response.body());

                // Return result as C string
                return callbackArena.allocateFrom(text);
            } catch (Exception e) {
                // Any failure is reported to the caller as a NULL pointer; the
                // exception itself is dropped.
                return MemorySegment.NULL;
            }
        };

        // The confined arena passed to registerOcrBackend is closed when this
        // try block exits.
        try (Arena arena = Arena.ofConfined()) {
            Kreuzberg.registerOcrBackend("cloud-ocr", cloudOcr, arena);

            // Use custom OCR backend in extraction
            // Note: Requires ExtractionConfig with OCR enabled
            ExtractionResult result = Kreuzberg.extractFileSync("scanned.pdf");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Placeholder parser: returns the response body unchanged. */
    private static String parseTextFromResponse(String json) {
        // Parse JSON response and extract text field
        return json; // Simplified
    }
}
Python
from kreuzberg import register_ocr_backend
import httpx

class CloudOcrBackend:
    """OCR backend that forwards images to a cloud OCR HTTP endpoint."""

    def __init__(self, api_key: str):
        """Store the API key (sent as a Bearer token) and the supported language codes."""
        self.api_key: str = api_key
        self.langs: list[str] = ["eng", "deu", "fra"]

    def name(self) -> str:
        """Return the identifier this backend reports."""
        return "cloud-ocr"

    def version(self) -> str:
        """Return the version string this backend reports."""
        return "1.0.0"

    def supported_languages(self) -> list[str]:
        """Return the language codes this backend declares support for."""
        return self.langs

    def process_image(self, image_bytes: bytes, config: dict) -> dict:
        """Upload the image and return the recognized text as a plain-text result.

        Args:
            image_bytes: Raw image data to recognize.
            config: OCR options; ``config["language"]`` is used, defaulting to ``"eng"``.

        Returns:
            A dict with ``content`` (the recognized text) and ``mime_type``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        with httpx.Client() as client:
            response = client.post(
                "https://api.example.com/ocr",
                headers={"Authorization": f"Bearer {self.api_key}"},
                files={"image": image_bytes},
                # Send the language as a multipart form field: httpx ignores
                # `json=` when `files=` is supplied, so it would never be sent.
                data={"language": config.get("language", "eng")},
            )
            # Fail loudly on 4xx/5xx instead of trying to read "text" from an error body.
            response.raise_for_status()
            text: str = response.json()["text"]
            return {"content": text, "mime_type": "text/plain"}

    def initialize(self) -> None:
        """No setup is performed."""
        pass

    def shutdown(self) -> None:
        """No teardown is performed."""
        pass

backend: CloudOcrBackend = CloudOcrBackend(api_key="your-api-key")
register_ocr_backend(backend)
Ruby
require 'kreuzberg'
require 'net/http'

# OCR backend that posts the raw image to a cloud OCR HTTP endpoint.
class CloudOcrBackend
  # Identifier this backend reports.
  def name
    'cloud-ocr'
  end

  # Language codes this backend declares support for.
  def supported_languages
    %w[eng fra deu]
  end

  # Posts +image_data+ as the request body, authenticated with the
  # OCR_API_KEY environment variable, and returns a hash whose :content is
  # the "text" field of the JSON response.
  #
  # Raises Kreuzberg::Errors::OCRError on a non-success HTTP status and also
  # re-raises any other StandardError as an OCRError with the same message.
  #
  # NOTE(review): +language+ is accepted but not sent to the service.
  # NOTE(review): JSON.parse is used but only 'kreuzberg' and 'net/http' are
  # required by this snippet — confirm 'json' is loaded.
  def process_image(image_data, language)
    uri = URI('https://api.example.com/ocr')
    req = Net::HTTP::Post.new(uri)
    req['Authorization'] = "Bearer #{ENV['OCR_API_KEY']}"
    req.body = image_data
    res = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) { |h| h.request(req) }
    raise Kreuzberg::Errors::OCRError, res.message unless res.is_a?(Net::HTTPSuccess)
    { content: JSON.parse(res.body)['text'] }
  rescue StandardError => e
    raise Kreuzberg::Errors::OCRError, e.message
  end
end

Kreuzberg.register_ocr_backend(CloudOcrBackend.new)
config = Kreuzberg::Config::Extraction.new(
  ocr: Kreuzberg::Config::OCR.new(backend: 'cloud-ocr')
)
Kreuzberg.extract_file_sync('doc.pdf', config: config)

Validators

Enforce quality requirements on extraction results.

Validators are Fatal

Validation errors cause extraction to fail. Use validators for critical quality checks only.

Rust Implementation

Rust
use kreuzberg::plugins::{Plugin, Validator};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, KreuzbergError};
use async_trait::async_trait;

struct MinLengthValidator {
    min_length: usize,
}

// Plugin identity and lifecycle hooks for the minimum-length validator.
impl Plugin for MinLengthValidator {
    /// Identifier this plugin reports.
    fn name(&self) -> &str {
        "min-length-validator"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up; always succeeds.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down; always succeeds.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl Validator for MinLengthValidator {
    /// Fails validation when the content has fewer than `min_length` characters.
    ///
    /// Length is measured in Unicode scalar values (`chars().count()`), not in
    /// bytes: `String::len` would over-count any non-ASCII text and contradict
    /// the "characters" wording of the error message.
    ///
    /// # Errors
    /// Returns a validation error naming the actual and required lengths when
    /// the content is too short.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let char_count = result.content.chars().count();
        if char_count < self.min_length {
            return Err(KreuzbergError::validation(format!(
                "Content too short: {} < {} characters",
                char_count,
                self.min_length
            )));
        }
        Ok(())
    }

    /// Priority value reported by this validator.
    fn priority(&self) -> i32 {
        100
    }
}

Other Language Implementations

C#
using Kreuzberg;

/// <summary>Validator that rejects results whose content is shorter than a minimum length.</summary>
public class MinLengthValidator : IValidator
{
    private readonly int _minLength;

    /// <param name="minLength">Minimum accepted content length; defaults to 100.</param>
    public MinLengthValidator(int minLength = 100)
    {
        _minLength = minLength;
    }

    /// <summary>Identifier reported by this validator.</summary>
    public string Name()
    {
        return "min_length_validator";
    }

    /// <summary>Version string reported by this validator.</summary>
    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>Priority value reported by this validator.</summary>
    public int Priority()
    {
        return 100;
    }

    /// <summary>Throws <c>ValidationError</c> when the content is shorter than the minimum.</summary>
    public void Validate(Dictionary<string, object> result)
    {
        var content = result["content"].ToString();
        // A null string representation counts as length zero.
        int contentLength = content == null ? 0 : content.Length;
        if (contentLength >= _minLength)
        {
            return;
        }

        throw new ValidationError($"Content too short: {contentLength}");
    }

    /// <summary>This validator applies to every result.</summary>
    public bool ShouldValidate(Dictionary<string, object> result)
    {
        return true;
    }

    /// <summary>No setup is performed.</summary>
    public void Initialize()
    {
    }

    /// <summary>No teardown is performed.</summary>
    public void Shutdown()
    {
    }
}

var validator = new MinLengthValidator(minLength: 100);
KreuzbergClient.RegisterValidator(validator);
Java
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.Validator;
import dev.kreuzberg.ValidationException;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;

/**
 * Registers a validator named "min-length" that rejects content shorter than
 * 100 characters, then extracts {@code document.pdf} and reports the outcome.
 */
public class MinLengthValidatorExample {
    public static void main(String[] args) {
        int minLength = 100;

        // Validator callback: throws when the extracted content is too short.
        Validator minLengthValidator = result -> {
            if (result.getContent().length() < minLength) {
                throw new ValidationException(
                    "Content too short: " + result.getContent().length() +
                    " < " + minLength
                );
            }
        };

        try {
            // Third argument is 100 — presumably the validator priority; confirm
            // against the Java binding.
            Kreuzberg.registerValidator("min-length", minLengthValidator, 100);

            ExtractionResult result = Kreuzberg.extractFile("document.pdf");
            System.out.println("Validation passed!");
        } catch (ValidationException e) {
            // A validation failure is reported separately from other errors.
            System.err.println("Validation failed: " + e.getMessage());
        } catch (IOException | KreuzbergException e) {
            e.printStackTrace();
        }
    }
}
Python
from kreuzberg import register_validator, ValidationError

class MinLengthValidator:
    """Validator that rejects results whose content is shorter than a minimum length."""

    def __init__(self, min_length: int = 100):
        """Remember the minimum accepted content length (default 100)."""
        self.min_length: int = min_length

    def name(self) -> str:
        """Return the identifier this validator reports."""
        return "min_length_validator"

    def version(self) -> str:
        """Return the version string this validator reports."""
        return "1.0.0"

    def priority(self) -> int:
        """Return the priority value reported by this validator."""
        return 100

    def validate(self, result: dict) -> None:
        """Raise ``ValidationError`` when ``result["content"]`` is shorter than the minimum."""
        observed = len(result["content"])
        if observed >= self.min_length:
            return
        raise ValidationError(f"Content too short: {observed}")

    def should_validate(self, result: dict) -> bool:
        """This validator applies to every result."""
        return True

    def initialize(self) -> None:
        """No setup is performed."""
        return None

    def shutdown(self) -> None:
        """No teardown is performed."""
        return None

validator: MinLengthValidator = MinLengthValidator(min_length=100)
register_validator(validator)

Quality Score Validator

C#
using Kreuzberg;

/// <summary>Validator that rejects results whose metadata quality score is below 0.5.</summary>
public class QualityValidator : IValidator
{
    /// <summary>Identifier reported by this validator.</summary>
    public string Name()
    {
        return "quality-validator";
    }

    /// <summary>Version string reported by this validator.</summary>
    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>
    /// Reads "quality_score" from the result's metadata (treated as 0.0 when absent)
    /// and throws <c>ValidationError</c> when it is below 0.5.
    /// </summary>
    public void Validate(Dictionary<string, object> result)
    {
        var metadata = (Dictionary<string, object>)result["metadata"];

        double score = 0.0;
        if (metadata.ContainsKey("quality_score"))
        {
            score = Convert.ToDouble(metadata["quality_score"]);
        }

        if (score < 0.5)
        {
            throw new ValidationError($"Quality score too low: {score:F2}");
        }
    }

    /// <summary>This validator applies to every result.</summary>
    public bool ShouldValidate(Dictionary<string, object> result)
    {
        return true;
    }

    /// <summary>Priority value reported by this validator.</summary>
    public int Priority()
    {
        return 100;
    }

    /// <summary>No setup is performed.</summary>
    public void Initialize()
    {
    }

    /// <summary>No teardown is performed.</summary>
    public void Shutdown()
    {
    }
}

var validator = new QualityValidator();
KreuzbergClient.RegisterValidator(validator);
Java
// Validator that rejects results whose "quality_score" metadata entry is below 0.5;
// a missing entry is treated as a score of 0.0.
Validator qualityValidator = result -> {
    double score = 0.0;
    if (result.getMetadata().containsKey("quality_score")) {
        Number stored = (Number) result.getMetadata().get("quality_score");
        score = stored.doubleValue();
    }

    if (score < 0.5) {
        String message = String.format("Quality score too low: %.2f < 0.50", score);
        throw new ValidationException(message);
    }
};
Python
from kreuzberg import ValidationError, register_validator

class QualityValidator:
    """Validator that rejects results whose metadata quality score is below 0.5."""

    def name(self) -> str:
        """Return the identifier this validator reports."""
        return "quality-validator"

    def version(self) -> str:
        """Return the version string this validator reports."""
        return "1.0.0"

    def validate(self, result: dict) -> None:
        """Raise ``ValidationError`` when ``metadata["quality_score"]`` is below 0.5.

        A missing score is treated as 0.0.
        """
        metadata = result["metadata"]
        score: float = metadata.get("quality_score", 0.0)
        if not (score < 0.5):
            return
        message = f"Quality score too low: {score:.2f}"
        raise ValidationError(message)

validator: QualityValidator = QualityValidator()
register_validator(validator)
Rust
#[async_trait]
impl Validator for QualityValidator {
    /// Fails validation when the `quality_score` entry of the result's
    /// additional metadata is below 0.5.
    ///
    /// A missing entry, or one that is not a number, is treated as 0.0.
    ///
    /// # Errors
    /// Returns a validation error that includes the score when it is too low.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let quality = match result.metadata.additional.get("quality_score") {
            Some(raw) => raw.as_f64().unwrap_or(0.0),
            None => 0.0,
        };

        if quality < 0.5 {
            Err(KreuzbergError::validation(format!(
                "Quality score too low: {:.2} < 0.50",
                quality
            )))
        } else {
            Ok(())
        }
    }
}

Plugin Management

Listing Plugins

C#
using Kreuzberg;

var extractors = KreuzbergClient.ListDocumentExtractors();
var processors = KreuzbergClient.ListPostProcessors();
var ocrBackends = KreuzbergClient.ListOcrBackends();
var validators = KreuzbergClient.ListValidators();

Console.WriteLine($"Extractors: {string.Join(", ", extractors)}");
Console.WriteLine($"Processors: {string.Join(", ", processors)}");
Console.WriteLine($"OCR backends: {string.Join(", ", ocrBackends)}");
Console.WriteLine($"Validators: {string.Join(", ", validators)}");
Java
// Java does not provide plugin listing functionality in v4.0.0
// Plugins are registered and managed through the FFI layer
Python
from kreuzberg import (
    list_document_extractors,
    list_post_processors,
    list_ocr_backends,
    list_validators,
)

extractors: list[str] = list_document_extractors()
processors: list[str] = list_post_processors()
ocr_backends: list[str] = list_ocr_backends()
validators: list[str] = list_validators()

print(f"Extractors: {extractors}")
print(f"Processors: {processors}")
print(f"OCR backends: {ocr_backends}")
print(f"Validators: {validators}")
Rust
use kreuzberg::plugins::registry::*;

let registry = get_document_extractor_registry();
let extractors = registry.list()?;
println!("Registered extractors: {:?}", extractors);

let registry = get_post_processor_registry();
let processors = registry.list()?;
println!("Registered processors: {:?}", processors);

let registry = get_ocr_backend_registry();
let backends = registry.list()?;
println!("Registered OCR backends: {:?}", backends);

let registry = get_validator_registry();
let validators = registry.list()?;
println!("Registered validators: {:?}", validators);

Unregistering Plugins

C#
using Kreuzberg;

var names = new List<string>
{
    "custom-json-extractor",
    "word_count",
    "cloud-ocr",
    "min_length_validator"
};

KreuzbergClient.UnregisterDocumentExtractor(names[0]);
KreuzbergClient.UnregisterPostProcessor(names[1]);
KreuzbergClient.UnregisterOcrBackend(names[2]);
KreuzbergClient.UnregisterValidator(names[3]);
Java
import dev.kreuzberg.Kreuzberg;

try {
    // Unregister specific plugins
    Kreuzberg.unregisterPostProcessor("word-count");
    Kreuzberg.unregisterValidator("min-length");
} catch (KreuzbergException e) {
    System.err.println("Failed to unregister: " + e.getMessage());
}
Python
from kreuzberg import (
    unregister_document_extractor,
    unregister_post_processor,
    unregister_ocr_backend,
    unregister_validator,
)

names: list[str] = [
    "custom-json-extractor",
    "word_count",
    "cloud-ocr",
    "min_length_validator",
]

unregister_document_extractor(names[0])
unregister_post_processor(names[1])
unregister_ocr_backend(names[2])
unregister_validator(names[3])
Rust
use kreuzberg::plugins::registry::get_document_extractor_registry;

// Fetch the document-extractor registry and remove one extractor by name.
// `?` passes a failed removal to the enclosing function, which therefore has
// to return a compatible `Result`.
let registry = get_document_extractor_registry();
registry.remove("custom-json-extractor")?;

Clearing All Plugins

C#
using Kreuzberg;

// Call the Clear* method for every plugin type:
// post-processors, validators, OCR backends, then document extractors.
KreuzbergClient.ClearPostProcessors();
KreuzbergClient.ClearValidators();
KreuzbergClient.ClearOcrBackends();
KreuzbergClient.ClearDocumentExtractors();

// Confirmation message printed after all four calls have returned.
Console.WriteLine("All plugins cleared");
Java
// Java does not provide bulk clearing functionality in v4.0.0
// Unregister plugins individually using unregisterPostProcessor() and unregisterValidator()
Python
from kreuzberg import (
    clear_document_extractors,
    clear_post_processors,
    clear_ocr_backends,
    clear_validators,
)

# Call the clear_* helper for every plugin type:
# post-processors, validators, OCR backends, then document extractors.
clear_post_processors()
clear_validators()
clear_ocr_backends()
clear_document_extractors()

# Confirmation message printed after all four calls have returned.
print("All plugins cleared")
Rust
use kreuzberg::{clear_document_extractors, clear_post_processors, clear_ocr_backends, clear_validators};

/// Calls each of the four `clear_*` functions once, then prints a confirmation line.
fn main() {
    // NOTE(review): whatever these calls return is discarded here; confirm
    // they cannot report a failure that should be handled.
    clear_document_extractors();
    clear_post_processors();
    clear_ocr_backends();
    clear_validators();

    println!("All plugins cleared");
}

Thread Safety

All plugins must be thread-safe:

Rust Thread Safety

Rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use kreuzberg::KreuzbergError;

/// Example plugin that keeps mutable state behind `&self`.
///
/// Every trait method takes `&self`, so the state uses interior mutability:
/// an atomic for the counter and a `Mutex` around the map.
struct StatefulPlugin {
    /// Number of `process` calls since the last `initialize`.
    call_count: AtomicUsize,
    /// String-keyed scratch storage; `process` writes the key `"last_mime"`.
    cache: Mutex<HashMap<String, String>>,
}

impl Plugin for StatefulPlugin {
    fn name(&self) -> &str { "stateful-plugin" }
    fn version(&self) -> String { "1.0.0".to_string() }

    /// Resets the call counter to zero.
    fn initialize(&self) -> Result<()> {
        self.call_count.store(0, Ordering::Release);
        Ok(())
    }

    /// Prints how many times `process` has run.
    fn shutdown(&self) -> Result<()> {
        let count = self.call_count.load(Ordering::Acquire);
        println!("Plugin called {} times", count);
        Ok(())
    }
}

#[async_trait]
impl PostProcessor for StatefulPlugin {
    /// Counts the call and records the result's MIME type in the cache.
    ///
    /// # Errors
    ///
    /// Returns a plugin error when the cache mutex is poisoned.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig
    ) -> Result<()> {
        self.call_count.fetch_add(1, Ordering::AcqRel);

        // The guard is dropped at the end of this function; nothing is awaited
        // while it is held.
        let mut cache = self.cache.lock()
            .map_err(|_| KreuzbergError::plugin("Cache lock poisoned"))?;
        cache.insert("last_mime".to_string(), result.mime_type.clone());

        Ok(())
    }

    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Middle
    }
}

Thread Safety in Other Languages

C#
using System.Collections.Generic;
using System.Threading;

// Example plugin whose mutable state is only touched while holding a lock.
public class StatefulPlugin
{
    // Guards both _callCount and _cache.
    private readonly object _lock = new();
    private int _callCount = 0;
    private readonly Dictionary<string, object> _cache = new();

    public string Name() => "stateful-plugin";
    public string Version() => "1.0.0";

    // Counts the call, remembers result["mime_type"] under the "last_mime" key,
    // and returns the same dictionary it was given.
    public Dictionary<string, object> Process(Dictionary<string, object> result)
    {
        // Counter and cache are updated together under one lock.
        lock (_lock)
        {
            _callCount++;
            _cache["last_mime"] = result["mime_type"];
        }
        return result;
    }

    // Lifecycle hooks; this example has nothing to set up or tear down.
    public void Initialize() { }
    public void Shutdown() { }
}
Java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example post-processor that keeps its state in java.util.concurrent types
 * rather than behind an explicit lock.
 */
class StatefulPlugin implements PostProcessor {
    // Use atomic types for simple counters
    private final AtomicInteger callCount = new AtomicInteger(0);

    // Use concurrent collections for complex state
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    /**
     * Counts the call, stores the result's MIME type under "last_mime",
     * and returns the result it was given.
     */
    @Override
    public ExtractionResult process(ExtractionResult result) {
        // Increment counter atomically
        callCount.incrementAndGet();

        // Update cache (thread-safe)
        // NOTE(review): this uses result.mimeType(), while other Java examples on
        // this page call result.getMimeType(); confirm which accessor exists.
        cache.put("last_mime", result.mimeType());

        return result;
    }

    /** Returns the number of process() calls made so far. */
    public int getCallCount() {
        return callCount.get();
    }
}
Python
import threading


class StatefulPlugin:
    """Example plugin that guards its mutable state with a single lock."""

    def __init__(self) -> None:
        # `lock` protects both `call_count` and `cache`.
        self.lock: threading.Lock = threading.Lock()
        self.call_count: int = 0
        self.cache: dict = {}

    # --- identity -----------------------------------------------------

    def name(self) -> str:
        """Return the plugin name."""
        return "stateful-plugin"

    def version(self) -> str:
        """Return the plugin version string."""
        return "1.0.0"

    # --- lifecycle ----------------------------------------------------

    def initialize(self) -> None:
        """Lifecycle hook; nothing to set up in this example."""
        return None

    def shutdown(self) -> None:
        """Lifecycle hook; nothing to tear down in this example."""
        return None

    # --- work ---------------------------------------------------------

    def process(self, result: dict) -> dict:
        """Count the call, remember the result's MIME type, and return ``result``.

        The counter is bumped before the MIME type is read, and both updates
        happen while the lock is held.
        """
        with self.lock:
            self.call_count += 1
            latest_mime = result["mime_type"]
            self.cache["last_mime"] = latest_mime
        return result

Best Practices

Naming

  • Use kebab-case for plugin names: my-custom-plugin
  • Use lowercase only, no spaces or special characters
  • Be descriptive but concise

Error Handling

C#
using Kreuzberg;

// Extract a file and handle three exception types separately.
try
{
    var result = KreuzbergClient.ExtractFileSync("missing.pdf");
    Console.WriteLine(result.Content);
}
catch (KreuzbergValidationException ex)
{
    // Validation failures are reported and then swallowed (no rethrow).
    Console.Error.WriteLine($"Validation error: {ex.Message}");
}
catch (KreuzbergIOException ex)
{
    // IO failures are reported and rethrown to the caller.
    Console.Error.WriteLine($"IO error: {ex.Message}");
    throw;
}
catch (KreuzbergException ex)
{
    // Last handler: presumably the common base type of the two above (confirm);
    // reported and rethrown.
    Console.Error.WriteLine($"Extraction failed: {ex.Message}");
    throw;
}
Go
package main

/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"
import (
    "log"
    "unsafe"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

//export customValidator
func customValidator(resultJSON *C.char) *C.char {
    // Inspect resultJSON, return error message or NULL
    return nil
}

func main() {
    if err := kreuzberg.RegisterValidator("go-validator", 50, (C.ValidatorCallback)(C.customValidator)); err != nil {
        log.Fatalf("register validator failed: %v", err)
    }

    result, err := kreuzberg.ExtractFileSync("document.pdf", nil)
    if err != nil {
        log.Fatalf("extract failed: %v", err)
    }
    log.Printf("Content length: %d", len(result.Content))
}
Ruby
require "kreuzberg"

# Validator callable: raises when the extracted content is shorter than 50.
min_length_check = ->(result) {
  too_short = result.content.length < 50
  raise Kreuzberg::ValidationError, "Content too short" if too_short
}

# Register under the name "min_length" with priority 10.
Kreuzberg.register_validator("min_length", min_length_check, priority: 10)

extraction = Kreuzberg.extract_file_sync("document.pdf")
puts "Validated content length: #{extraction.content.length}"

# Remove the validator again once the extraction is done.
Kreuzberg.unregister_validator("min_length")

Logging

C#
using System.Collections.Generic;
using Kreuzberg;
using Microsoft.Extensions.Logging;

// Example plugin that reports its lifecycle and extraction activity through ILogger.
// Log calls use message templates with named placeholders instead of interpolated
// strings, so the template stays constant; the rendered text is the same.
public class MyPlugin
{
    private readonly ILogger _logger;

    // The logger is supplied by the caller.
    public MyPlugin(ILogger logger)
    {
        _logger = logger;
    }

    public string Name() => "my-plugin";
    public string Version() => "1.0.0";

    // Logs that the plugin is starting up.
    public void Initialize()
    {
        _logger.LogInformation("Initializing plugin: {PluginName}", Name());
    }

    // Logs that the plugin is shutting down.
    public void Shutdown()
    {
        _logger.LogInformation("Shutting down plugin: {PluginName}", Name());
    }

    // Logs the MIME type and input size, builds a result dictionary with an empty
    // "content" entry, warns about the empty content, and returns the dictionary.
    // The config argument is not read by this example.
    public Dictionary<string, object> ExtractBytes(
        byte[] content, string mimeType, Dictionary<string, object> config)
    {
        _logger.LogInformation("Extracting {MimeType} ({ByteCount} bytes)", mimeType, content.Length);
        var result = new Dictionary<string, object> { { "content", "" }, { "mime_type", mimeType } };
        if (string.IsNullOrEmpty((string?)result["content"]))
        {
            _logger.LogWarning("Extraction resulted in empty content");
        }
        return result;
    }
}
Java
import java.util.logging.Logger;
import java.util.logging.Level;

/** Example post-processor that reports its activity through java.util.logging. */
class MyPlugin implements PostProcessor {
    // One logger for the class, named after the class.
    private static final Logger logger = Logger.getLogger(MyPlugin.class.getName());

    /**
     * Logs the result's MIME type and content length, warns when the content is
     * empty, and returns the result it was given.
     */
    @Override
    public ExtractionResult process(ExtractionResult result) {
        // NOTE(review): the message says "bytes" but the value is
        // content().length(); if content() is a String this is a character
        // count. Confirm the intended unit.
        logger.info("Processing " + result.mimeType() +
            " (" + result.content().length() + " bytes)");

        // Processing...

        if (result.content().isEmpty()) {
            logger.warning("Processing resulted in empty content");
        }

        return result;
    }
}
Python
import logging

logger = logging.getLogger(__name__)


class MyPlugin:
    """Example plugin that reports lifecycle and extraction activity via ``logging``.

    Log calls pass their values as arguments instead of pre-formatting with
    f-strings, so the message is only built when the record is actually emitted.
    The emitted text is unchanged.
    """

    def name(self) -> str:
        """Return the plugin name."""
        return "my-plugin"

    def version(self) -> str:
        """Return the plugin version string."""
        return "1.0.0"

    def initialize(self) -> None:
        """Log that the plugin is starting up."""
        logger.info("Initializing plugin: %s", self.name())

    def shutdown(self) -> None:
        """Log that the plugin is shutting down."""
        logger.info("Shutting down plugin: %s", self.name())

    def extract_bytes(
        self, content: bytes, mime_type: str, config: dict
    ) -> dict:
        """Log the input size and return a result dict with empty content.

        Args:
            content: Raw input bytes; only the length is used here.
            mime_type: MIME type, copied into the result.
            config: Not read by this example.

        Returns:
            ``{"content": "", "mime_type": mime_type}``.
        """
        logger.info("Extracting %s (%d bytes)", mime_type, len(content))
        result: dict = {"content": "", "mime_type": mime_type}
        if not result["content"]:
            logger.warning("Extraction resulted in empty content")
        return result
Rust
use log::{info, warn, error};

/// Minimal plugin used to demonstrate logging from plugin code.
struct MyPlugin;

impl Plugin for MyPlugin {
    fn name(&self) -> &str { "my-plugin" }
    fn version(&self) -> String { "1.0.0".to_string() }

    /// Logs that the plugin is starting up.
    fn initialize(&self) -> Result<()> {
        info!("Initializing plugin: {}", self.name());
        Ok(())
    }

    /// Logs that the plugin is shutting down.
    fn shutdown(&self) -> Result<()> {
        info!("Shutting down plugin: {}", self.name());
        Ok(())
    }
}

#[async_trait]
impl DocumentExtractor for MyPlugin {
    /// Logs the MIME type and input size, builds a default `ExtractionResult`,
    /// warns when its content is empty, and returns it.
    async fn extract_bytes(
        &self,
        content: &[u8],
        mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        info!("Extracting {} ({} bytes)", mime_type, content.len());

        // Placeholder result: a real extractor would fill this from `content`.
        let result = ExtractionResult::default();

        if result.content.is_empty() {
            warn!("Extraction resulted in empty content");
        }

        Ok(result)
    }
}

Testing

C#
using Xunit;

// xUnit test for the CustomJsonExtractor example.
public class CustomExtractorTests
{
    // Feeds a small JSON document to ExtractBytes and checks that the message
    // text shows up in "content" and that "mime_type" is the one passed in.
    [Fact]
    public void TestCustomExtractor()
    {
        var extractor = new CustomJsonExtractor();
        var jsonData = System.Text.Encoding.UTF8.GetBytes(@"{""message"": ""Hello, world!""}");
        var config = new Dictionary<string, object>();

        var result = extractor.ExtractBytes(jsonData, "application/json", config);

        Assert.Contains("Hello, world!", (string)result["content"]);
        Assert.Equal("application/json", (string)result["mime_type"]);
    }
}
Java
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;

/** JUnit test for a word-counting post-processor. */
class PostProcessorTest {
    /**
     * Runs a word-count processor over a three-word input and checks that the
     * returned result carries {@code word_count = 3} in its metadata.
     */
    @Test
    void testWordCountProcessor() {
        PostProcessor processor = result -> {
            long count = result.getContent().split("\\s+").length;

            // Work on a copy so the incoming metadata map is left untouched.
            Map<String, Object> metadata = new HashMap<>(result.getMetadata());
            metadata.put("word_count", count);

            // Return a result that actually carries the enriched metadata;
            // returning `result` itself would drop the copy made above.
            // The remaining arguments mirror the constructor call used for
            // `input` below, where every list is empty.
            // NOTE(review): a production processor should carry over the
            // original collections instead of empty lists - confirm the
            // accessor names on ExtractionResult.
            return new ExtractionResult(
                result.getContent(),
                result.getMimeType(),
                metadata,
                java.util.List.of(),
                java.util.List.of(),
                java.util.List.of(),
                java.util.List.of(),
                true
            );
        };

        ExtractionResult input = new ExtractionResult(
            "Hello world test",
            "text/plain",
            new HashMap<>(),
            java.util.List.of(),
            java.util.List.of(),
            java.util.List.of(),
            java.util.List.of(),
            true
        );

        ExtractionResult output = processor.process(input);

        // The processor stores a long, which is boxed as Long; compare against
        // 3L so the boxed types match.
        assertEquals(3L, output.getMetadata().get("word_count"));
    }
}
Python
import pytest
from kreuzberg import ExtractionResult

# CustomJsonExtractor is not imported in this snippet; it must already be in scope.
def test_custom_extractor() -> None:
    """Check that the extractor surfaces the JSON message text and keeps the MIME type."""
    extractor = CustomJsonExtractor()
    json_data: bytes = b'{"message": "Hello, world!"}'
    config: dict = {}
    result: ExtractionResult = extractor.extract_bytes(
        json_data, "application/json", config
    )
    # The result is read by key, like a mapping.
    assert "Hello, world!" in result["content"]
    assert result["mime_type"] == "application/json"
Rust
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds a small JSON document to `extract_bytes` and checks that the
    /// message text appears in the content and the MIME type is preserved.
    #[tokio::test]
    async fn test_custom_extractor() {
        let extractor = CustomJsonExtractor;

        // Raw byte-string literal, so the inner quotes need no escaping.
        let json_data = br#"{"message": "Hello, world!"}"#;
        let config = ExtractionConfig::default();

        let result = extractor
            .extract_bytes(json_data, "application/json", &config)
            .await
            .expect("Extraction failed");

        assert!(result.content.contains("Hello, world!"));
        assert_eq!(result.mime_type, "application/json");
    }
}

Complete Example: PDF Metadata Extractor

C#
using System.Threading;
using Kreuzberg;

// Post-processor example that only handles PDF results and counts how many it
// has seen. Plugins must be thread-safe, so the counter is updated and read
// through Interlocked/Volatile rather than with a plain ++.
public class PdfMetadataExtractor : IPostProcessor
{
    // Number of results passed to Process so far.
    private int _processedCount;

    public string Name() => "pdf_metadata_extractor";
    public string Version() => "1.0.0";
    public string Description() => "Extracts and enriches PDF metadata";
    public string ProcessingStage() => "early";

    // True only for results whose MIME type is application/pdf.
    public bool ShouldProcess(ExtractionResult result)
        => result.MimeType == "application/pdf";

    // Counts the call and returns the result it was given.
    public ExtractionResult Process(ExtractionResult result)
    {
        Interlocked.Increment(ref _processedCount);
        return result;
    }

    public void Initialize()
    {
        Console.WriteLine("PDF metadata extractor initialized");
    }

    // Prints the total number of processed results.
    public void Shutdown()
    {
        Console.WriteLine($"Processed {Volatile.Read(ref _processedCount)} PDFs");
    }
}

var processor = new PdfMetadataExtractor();
KreuzbergClient.RegisterPostProcessor(processor);
Go
package main

import (
    "encoding/json"
    "log"
    "sync/atomic"
    "unsafe"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"

// pdfMetadataState tracks statistics about PDF processing
var pdfMetadataState = struct {
    processedCount int64
}{
    processedCount: 0,
}

// pdfMetadataExtractor enriches PDF extraction results with additional metadata.
//
// The incoming JSON is decoded into a generic map. Results whose "mime_type" is
// "application/pdf" get extra metadata entries and bump the processed counter;
// every other result is re-serialized as decoded. On a decode or encode failure
// a small JSON error object is returned instead.
//
//export pdfMetadataExtractor
func pdfMetadataExtractor(resultJSON *C.char) *C.char {
    var payload map[string]interface{}
    if err := json.Unmarshal([]byte(C.GoString(resultJSON)), &payload); err != nil {
        return C.CString("{\"error\":\"Failed to parse result JSON\"}")
    }

    // Only PDFs are enriched; anything else falls straight through to encoding.
    if mime, isString := payload["mime_type"].(string); isString && mime == "application/pdf" {
        // Reuse the existing metadata object when there is one.
        meta, hasMeta := payload["metadata"].(map[string]interface{})
        if !hasMeta {
            meta = make(map[string]interface{})
        }

        // Mark as processed by this processor.
        meta["pdf_processed"] = true

        // Content statistics, only when content is a string.
        if body, isText := payload["content"].(string); isText {
            meta["content_length"] = len(body)
        }

        // Count this PDF atomically.
        atomic.AddInt64(&pdfMetadataState.processedCount, 1)
        meta["pdf_processor_version"] = "1.0.0"

        payload["metadata"] = meta
    }

    // Single exit path: serialize whatever is in the map now.
    encoded, err := json.Marshal(payload)
    if err != nil {
        return C.CString("{\"error\":\"Failed to serialize result\"}")
    }
    return C.CString(string(encoded))
}

// main registers the PDF post-processor, extracts one document, and logs what
// the post-processor added to the metadata.
func main() {
    // Register the post-processor with priority 80, early stage
    if err := kreuzberg.RegisterPostProcessor("pdf_metadata_extractor", 80,
        (C.PostProcessorCallback)(C.pdfMetadataExtractor)); err != nil {
        log.Fatalf("failed to register post-processor: %v", err)
    }
    // Unregister and report the running total when main returns normally.
    // Note: log.Fatalf exits the process, so this deferred function does not
    // run on the extraction-failure path below.
    defer func() {
        if err := kreuzberg.UnregisterPostProcessor("pdf_metadata_extractor"); err != nil {
            log.Printf("warning: failed to unregister post-processor: %v", err)
        }

        log.Printf("Total PDFs processed: %d", atomic.LoadInt64(&pdfMetadataState.processedCount))
    }()

    // Extract PDF document
    result, err := kreuzberg.ExtractFileSync("document.pdf", nil)
    if err != nil {
        log.Fatalf("extraction failed: %v", err)
    }

    log.Printf("PDF MIME type: %s", result.MimeType)

    // Parse and display metadata
    var metadata map[string]interface{}
    // Metadata is only inspected when MetadataJSON holds a string.
    if metaJSON, ok := result.MetadataJSON.(string); ok {
        if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
            if pdfProcessed, ok := metadata["pdf_processed"].(bool); ok && pdfProcessed {
                log.Printf("PDF metadata extracted successfully")
                // encoding/json decodes JSON numbers into float64 when the
                // target is interface{}, hence the float64 assertion.
                if contentLen, ok := metadata["content_length"].(float64); ok {
                    log.Printf("Content length: %.0f bytes", contentLen)
                }
            }
        }
    }
}
Java
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Complete example: registers a post-processor that handles only PDF results,
 * counts them, and then extracts one document.
 */
public class PdfMetadataExtractorExample {
    private static final Logger logger = Logger.getLogger(
        PdfMetadataExtractorExample.class.getName()
    );

    public static void main(String[] args) {
        // Atomic so the lambda below can update it safely from any thread.
        AtomicInteger processedCount = new AtomicInteger(0);

        PostProcessor pdfMetadata = result -> {
            // Non-PDF results pass through untouched.
            if (!result.getMimeType().equals("application/pdf")) {
                return result;
            }

            processedCount.incrementAndGet();

            // NOTE(review): the entries below are added to a copy of the metadata
            // map, and that copy is never attached to the returned result. As
            // written, "pdf_processed" and "processing_timestamp" do not reach the
            // caller, so the println further down likely prints null. Confirm how
            // a result with updated metadata should be built.
            Map<String, Object> metadata = new HashMap<>(result.getMetadata());
            metadata.put("pdf_processed", true);
            metadata.put("processing_timestamp", System.currentTimeMillis());

            logger.info("Processed PDF: " + processedCount.get());

            return result;
        };

        try {
            // Register under a name, with the processor and the value 50
            // (presumably a priority - confirm against the API).
            Kreuzberg.registerPostProcessor("pdf-metadata-extractor", pdfMetadata, 50);

            logger.info("PDF metadata extractor initialized");

            ExtractionResult result = Kreuzberg.extractFile("document.pdf");
            System.out.println("PDF processed: " + result.getMetadata().get("pdf_processed"));

            logger.info("Processed " + processedCount.get() + " PDFs");
        } catch (IOException | KreuzbergException e) {
            // Both failure types are handled the same way: print the stack trace.
            e.printStackTrace();
        }
    }
}
Ruby
require 'kreuzberg'

# Post-processor example: numbers each PDF result in the order it is seen.
class PdfMetadataExtractor
  def initialize
    # Count of PDF results handled so far.
    @count = 0
  end

  # Non-PDF results are returned untouched. PDF results get a
  # 'pdf_order' metadata entry holding the running count.
  #
  # NOTE(review): @count is updated without synchronization; confirm whether
  # post-processors can be invoked from several threads at once.
  def call(result)
    return result unless result['mime_type'] == 'application/pdf'
    @count += 1
    # Create the metadata hash if the result does not have one yet.
    result['metadata'] ||= {}
    result['metadata']['pdf_order'] = @count
    result
  end
end

# Register one instance under the name 'pdf_metadata'.
extractor = PdfMetadataExtractor.new
Kreuzberg.register_post_processor('pdf_metadata', extractor)

# Extraction config with post-processing enabled.
config = Kreuzberg::Config::Extraction.new(
  postprocessor: { enabled: true }
)

result = Kreuzberg.extract_file_sync('report.pdf', config: config)
puts "Metadata: #{result.metadata.inspect}"