Skip to content

Creating Plugins v4.0.0

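Extend Kreuzberg with custom extractors, post-processors, OCR backends, and validators. Plugins can be written in Rust or Python, and several other native bindings (see the Go, Java, C#, Ruby, and R examples below) expose plugin registration as well. Registered plugins are global and apply to all extraction calls.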

WASM

Custom plugins are not supported in WASM environments. Use Python, Rust, or other native bindings.

Plugin Types

Type Purpose Use case
DocumentExtractor Extract content from file formats New format support, override built-in extractors
PostProcessor Transform extraction results Metadata enrichment, content filtering, text normalization
OcrBackend Perform OCR on images Cloud OCR services, custom OCR engines
Validator Validate extraction quality Minimum content length, quality score thresholds

All plugins must be thread-safe (Send + Sync in Rust, thread-safe in Python) and implement initialize() / shutdown() lifecycle methods.

Document Extractors

Add support for new file formats or override built-in extractors.

Implementation

Rust
use kreuzberg::plugins::{DocumentExtractor, Plugin};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;

/// Stateless extractor plugin for JSON documents.
struct CustomJsonExtractor;

/// Plugin identity and lifecycle hooks; both hooks simply report success.
impl Plugin for CustomJsonExtractor {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "custom-json-extractor"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl DocumentExtractor for CustomJsonExtractor {
    /// Parses `content` as JSON and returns its string values as the result text.
    ///
    /// # Errors
    /// Propagates the parse error when `content` is not valid JSON.
    async fn extract_bytes(
        &self,
        content: &[u8],
        _mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        let parsed: serde_json::Value = serde_json::from_slice(content)?;

        // NOTE(review): the post-processor example on this page pushes to a
        // `processing_warnings` field of ExtractionResult — confirm this literal
        // names every required field.
        let extracted = ExtractionResult {
            content: extract_text_from_json(&parsed),
            mime_type: String::from("application/json"),
            metadata: Metadata::default(),
            tables: Vec::new(),
            detected_languages: None,
            chunks: None,
            images: None,
        };

        Ok(extracted)
    }

    /// MIME types this extractor declares support for.
    fn supported_mime_types(&self) -> &[&str] {
        &["application/json", "text/json"]
    }

    /// Priority reported for this extractor.
    fn priority(&self) -> i32 {
        50
    }
}

fn extract_text_from_json(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(s) => format!("{}\n", s),
        serde_json::Value::Array(arr) => arr.iter().map(extract_text_from_json).collect(),
        serde_json::Value::Object(obj) => obj.values().map(extract_text_from_json).collect(),
        _ => String::new(),
    }
}
Python
from kreuzberg import register_document_extractor, ExtractionResult
import json

class CustomJsonExtractor:
    """Document extractor that flattens the string values of a JSON payload into text."""

    def name(self) -> str:
        """Return the plugin name."""
        return "custom-json-extractor"

    def version(self) -> str:
        """Return the plugin version."""
        return "1.0.0"

    def initialize(self) -> None:
        """No setup is required."""

    def shutdown(self) -> None:
        """No teardown is required."""

    def supported_mime_types(self) -> list[str]:
        """Return the MIME types this extractor declares support for."""
        return ["application/json"]

    def priority(self) -> int:
        """Return the priority reported for this extractor."""
        return 50

    def extract_bytes(
        self, content: bytes, mime_type: str, config: dict
    ) -> ExtractionResult:
        """Parse ``content`` as JSON and return its string values as the result text."""
        parsed = json.loads(content)
        return {
            "content": self._extract_text(parsed),
            "mime_type": "application/json",
        }

    def _extract_text(self, obj: object) -> str:
        """Concatenate every string nested in ``obj``, each followed by a newline."""
        if isinstance(obj, str):
            return f"{obj}\n"
        if isinstance(obj, list):
            children = obj
        elif isinstance(obj, dict):
            children = obj.values()
        else:
            return ""
        return "".join(self._extract_text(child) for child in children)

extractor: CustomJsonExtractor = CustomJsonExtractor()
register_document_extractor(extractor)

Registration

Python
from kreuzberg import register_document_extractor

class CustomExtractor:
    """Minimal extractor skeleton that only reports a name and a version."""

    _NAME = "custom"
    _VERSION = "1.0.0"

    def name(self) -> str:
        """Return the extractor name."""
        return self._NAME

    def version(self) -> str:
        """Return the extractor version."""
        return self._VERSION

# Instantiate the extractor and hand it to Kreuzberg's registration function.
extractor = CustomExtractor()
register_document_extractor(extractor)
print("Extractor registered")
TypeScript
import {
    listDocumentExtractors,
    unregisterDocumentExtractor,
    clearDocumentExtractors,
} from '@kreuzberg/node';

/**
 * Note: custom document extractors cannot be written in TypeScript v4.0;
 * document extraction logic lives in the Rust core.
 *
 * The registered extractors can still be listed, unregistered, or cleared.
 */

// List all registered document extractors
const extractors = listDocumentExtractors();
console.log("Available extractors:", extractors);

// Unregister a specific extractor by name (use with caution)
unregisterDocumentExtractor("SomeExtractor");

// Clear all extractors (use with extreme caution) — left commented out on purpose
// clearDocumentExtractors();
Rust
use kreuzberg::plugins::registry::get_document_extractor_registry;
use std::sync::Arc;

/// Registers a `CustomJsonExtractor` with the document-extractor registry.
///
/// # Errors
/// Returns the error produced by the registry's `register` call.
///
/// # Panics
/// Panics if taking the registry's write guard fails (the `unwrap`).
fn register_custom_extractor() -> kreuzberg::Result<()> {
    let registry = get_document_extractor_registry();
    let extractor = Arc::new(CustomJsonExtractor);

    // The write guard is a temporary and is released at the end of this statement.
    registry.write().unwrap().register(extractor)?;

    Ok(())
}
Go
package main

import (
    "log"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

// main registers the custom extractor and then extracts document.json,
// exiting on the first failure.
func main() {
    const (
        extractorName     = "custom-json-extractor"
        extractorPriority = 50
    )

    // Register custom extractor with priority 50
    err := kreuzberg.RegisterDocumentExtractor(extractorName, extractorPriority)
    if err != nil {
        log.Fatalf("register extractor failed: %v", err)
    }

    result, err := kreuzberg.ExtractFileSync("document.json", nil)
    if err != nil {
        log.Fatalf("extract failed: %v", err)
    }

    contentLength := len(result.Content)
    log.Printf("Extracted content length: %d", contentLength)
}
Java
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;

/** Extracts {@code document.json} and prints the length of the extracted content. */
public class CustomExtractorExample {
    public static void main(String[] args) {
        try {
            final ExtractionResult result = Kreuzberg.extractFile("document.json");
            final int contentLength = result.getContent().length();
            System.out.println("Extracted content length: " + contentLength);
        } catch (IOException | KreuzbergException e) {
            // Both failure types are reported the same way: dump the stack trace.
            e.printStackTrace();
        }
    }
}
C#
using Kreuzberg;

/// <summary>Minimal document extractor that returns fixed content.</summary>
public class CustomExtractor : IDocumentExtractor
{
    public string Name()
    {
        return "custom";
    }

    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>Returns a fixed content string together with the incoming MIME type.</summary>
    public Dictionary<string, object> ExtractBytes(byte[] data, string mimeType, Dictionary<string, object> config)
    {
        var extraction = new Dictionary<string, object>();
        extraction.Add("content", "Extracted content");
        extraction.Add("mime_type", mimeType);
        return extraction;
    }
}

var extractor = new CustomExtractor();
KreuzbergClient.RegisterDocumentExtractor(extractor);
Console.WriteLine("Extractor registered");
Ruby
require 'kreuzberg'

# Register custom extractor with priority 50
# Callable registered as the extractor: parses the content as JSON.
json_extractor = lambda do |content, mime_type, config|
  JSON.parse(content.to_s)
end

# Register custom extractor with priority 50
Kreuzberg.register_document_extractor(
  name: "custom-json-extractor",
  extractor: json_extractor,
  priority: 50
)

result = Kreuzberg.extract_file("document.json")
puts "Extracted content length: #{result.content.length}"
R
library(kreuzberg)

# Builds a fixed single-page result that names the requested path and MIME type.
custom_extractor <- function(path, mime_type) {
  list(
    content = sprintf("Extracted from %s (%s)", path, mime_type),
    mime_type = mime_type,
    pages = 1L
  )
}

# Register the extractor function under the name "custom_format".
register_document_extractor("custom_format", custom_extractor)

# Extract a file, passing its MIME type explicitly and no config.
result <- extract_file_sync("custom_document.xyz", "application/custom", NULL)

# Print the fields of the returned result.
cat(sprintf("Custom extractor result:\n"))
cat(sprintf("Content: %s\n", result$content))
cat(sprintf("Mime type: %s\n", result$mime_type))

Priority System

When multiple extractors support the same MIME type, the extractor with the highest priority value is used:

Range Level
0–25 Fallback / low-quality
26–49 Alternative
50 Default (built-in)
51–75 Enhanced / premium
76–100 Specialized / high-priority

Post-Processors

Transform and enrich results after extraction. Processors execute in three stages:

  • Early — Foundational: language detection, quality scoring, text normalization
  • Middle — Transformation: keyword extraction, token reduction, summarization
  • Late — Final: custom metadata, analytics, output formatting

Implementation

Rust
use kreuzberg::plugins::{Plugin, PostProcessor, ProcessingStage};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig};
use async_trait::async_trait;

/// Stateless post-processor plugin that reports a word count.
struct WordCountProcessor;

/// Plugin identity and lifecycle hooks; both hooks simply report success.
impl Plugin for WordCountProcessor {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "word-count"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl PostProcessor for WordCountProcessor {
    /// Counts the whitespace-separated words in the content and records the
    /// count as a processing warning on the result.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let words = result.content.split_whitespace().count();

        // NOTE(review): `ProcessingWarning` is not among this snippet's `use`
        // lines — confirm its import path.
        let warning = ProcessingWarning {
            source: String::from("word-count"),
            message: format!("Processed with word count: {}", words),
        };
        result.processing_warnings.push(warning);

        Ok(())
    }

    /// This processor declares itself for the early stage.
    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Early
    }

    /// Only results with non-empty content are processed.
    fn should_process(&self, result: &ExtractionResult, _config: &ExtractionConfig) -> bool {
        let has_content = !result.content.is_empty();
        has_content
    }
}
Python
from kreuzberg import register_post_processor, ExtractionResult

class WordCountProcessor:
    """Post-processor that stores the content's word count in the result metadata."""

    def name(self) -> str:
        """Return the plugin name."""
        return "word_count"

    def version(self) -> str:
        """Return the plugin version."""
        return "1.0.0"

    def initialize(self) -> None:
        """No setup is required."""

    def shutdown(self) -> None:
        """No teardown is required."""

    def processing_stage(self) -> str:
        """Return the stage this processor declares itself for."""
        return "early"

    def should_process(self, result: ExtractionResult) -> bool:
        """Process only results whose content is truthy."""
        return bool(result.content)

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Write ``word_count`` into ``result.metadata`` and return the same result."""
        words = result.content.split()
        result.metadata["word_count"] = len(words)
        return result

processor: WordCountProcessor = WordCountProcessor()
register_post_processor(processor)

Conditional Processing

Python
from kreuzberg import ExtractionResult, register_post_processor

class PdfOnlyProcessor:
    """Post-processor that opts in only for PDF results; its processing is a pass-through."""

    _PDF_MIME_TYPE = "application/pdf"

    def name(self) -> str:
        """Return the plugin name."""
        return "pdf-only-processor"

    def version(self) -> str:
        """Return the plugin version."""
        return "1.0.0"

    def should_process(self, result: ExtractionResult) -> bool:
        """Return True only when the result's MIME type is PDF."""
        return result.mime_type == self._PDF_MIME_TYPE

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Return the result unchanged."""
        return result

processor: PdfOnlyProcessor = PdfOnlyProcessor()
register_post_processor(processor)
Rust
impl PostProcessor for PdfOnlyProcessor {
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig
    ) -> Result<()> {
        Ok(())
    }

    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Middle
    }

    fn should_process(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig
    ) -> bool {
        result.mime_type == "application/pdf"
    }
}
Go
package main

import (
    "encoding/json"
    "log"
    "unsafe"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"

// pdfOnlyProcessor is the exported post-processor callback. It decodes the
// result JSON and, for results whose mime_type is "application/pdf", tags the
// metadata (adding a table count when tables_json is present) before
// re-encoding. All other results are re-encoded without modification.
//
//export pdfOnlyProcessor
func pdfOnlyProcessor(resultJSON *C.char) *C.char {
    // encode serializes the map, substituting a fixed error payload on failure.
    encode := func(payload map[string]interface{}) *C.char {
        encoded, err := json.Marshal(payload)
        if err != nil {
            return C.CString("{\"error\":\"Failed to serialize result\"}")
        }
        return C.CString(string(encoded))
    }

    var result map[string]interface{}
    if err := json.Unmarshal([]byte(C.GoString(resultJSON)), &result); err != nil {
        return C.CString("{\"error\":\"Failed to parse result JSON\"}")
    }

    // Only PDFs are processed; anything else goes straight back out.
    if mimeType, isString := result["mime_type"].(string); !isString || mimeType != "application/pdf" {
        return encode(result)
    }

    // Reuse the existing metadata object when there is one.
    metadata, hasMetadata := result["metadata"].(map[string]interface{})
    if !hasMetadata {
        metadata = make(map[string]interface{})
    }

    // Example PDF-specific processing:
    // - Extract tables as structured data
    // - Handle PDF-specific formatting
    // - Preserve document hierarchy
    metadata["pdf_specific_processing"] = true
    metadata["processor_type"] = "pdf_only"

    // Record how many tables were found, if the tables payload parses.
    if rawTables, isString := result["tables_json"].(string); isString && rawTables != "" {
        var tables []interface{}
        if json.Unmarshal([]byte(rawTables), &tables) == nil {
            metadata["table_count"] = len(tables)
        }
    }

    result["metadata"] = metadata
    return encode(result)
}

// main registers the PDF-only post-processor, extracts a few files of
// different types, and logs for each one whether the processor was applied.
func main() {
    const processorName = "pdf_only_processor"

    // Register the post-processor with priority 70
    err := kreuzberg.RegisterPostProcessor(processorName, 70,
        (C.PostProcessorCallback)(C.pdfOnlyProcessor))
    if err != nil {
        log.Fatalf("failed to register post-processor: %v", err)
    }
    defer func() {
        if err := kreuzberg.UnregisterPostProcessor(processorName); err != nil {
            log.Printf("warning: failed to unregister post-processor: %v", err)
        }
    }()

    // Process multiple documents - processor will only affect PDFs
    files := []string{"document.pdf", "image.jpg", "spreadsheet.xlsx"}

    for _, file := range files {
        result, err := kreuzberg.ExtractFileSync(file, nil)
        if err != nil {
            log.Printf("Warning: extraction failed for %s: %v", file, err)
            continue
        }

        // Metadata that is not a string, or does not parse, is skipped silently.
        metaJSON, isString := result.MetadataJSON.(string)
        if !isString {
            continue
        }
        var metadata map[string]interface{}
        if err := json.Unmarshal([]byte(metaJSON), &metadata); err != nil {
            continue
        }

        applied, isBool := metadata["pdf_specific_processing"].(bool)
        if !isBool || !applied {
            log.Printf("Skipped PDF processor for: %s (MIME: %s)", file, result.MimeType)
            continue
        }

        log.Printf("PDF-specific processing applied to: %s", file)
        if tableCount, isNumber := metadata["table_count"].(float64); isNumber {
            log.Printf("  Tables found: %.0f", tableCount)
        }
    }
}
Java
import dev.kreuzberg.PostProcessor;
import java.util.HashMap;
import java.util.Map;

// Post-processor that hands non-PDF results back untouched.
PostProcessor pdfOnly = result -> {
    if (!result.getMimeType().equals("application/pdf")) {
        return result;
    }

    // NOTE(review): this map is a local copy that is never attached to the
    // returned result, so the "pdf_processed" flag appears to be dropped —
    // confirm how metadata changes are meant to be passed back.
    Map<String, Object> metadata = new HashMap<>(result.getMetadata());
    metadata.put("pdf_processed", true);

    return result;
};
C#
using Kreuzberg;

/// <summary>Post-processor that opts in only for PDF results; processing is a pass-through.</summary>
public class PdfOnlyProcessor : IPostProcessor
{
    public string Name()
    {
        return "pdf-only-processor";
    }

    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>Returns the result unchanged.</summary>
    public ExtractionResult Process(ExtractionResult result)
    {
        return result;
    }

    /// <summary>True only when the result's MIME type is PDF.</summary>
    public bool ShouldProcess(ExtractionResult result)
    {
        return result.MimeType == "application/pdf";
    }
}

var processor = new PdfOnlyProcessor();
KreuzbergClient.RegisterPostProcessor(processor);

OCR Backends

Integrate cloud OCR services or custom engines:

Implementation

Rust
use kreuzberg::plugins::{Plugin, OcrBackend, OcrBackendType};
use kreuzberg::{Result, ExtractionResult, OcrConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;

/// OCR backend plugin intended to delegate recognition to a cloud service.
struct CloudOcrBackend {
    /// Credential for the cloud service (not yet used by the stubbed API call).
    api_key: String,
    /// Language codes this backend reports as supported.
    supported_langs: Vec<String>,
}

/// Plugin identity and lifecycle hooks; both hooks simply report success.
impl Plugin for CloudOcrBackend {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "cloud-ocr"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl OcrBackend for CloudOcrBackend {
    /// Sends the image to the cloud API and wraps the returned text in a
    /// plain-text extraction result.
    ///
    /// # Errors
    /// Propagates any error returned by `call_cloud_api`.
    async fn process_image(
        &self,
        image_bytes: &[u8],
        config: &OcrConfig,
    ) -> Result<ExtractionResult> {
        let recognized = self.call_cloud_api(image_bytes, &config.language).await?;

        let outcome = ExtractionResult {
            content: recognized,
            mime_type: String::from("text/plain"),
            metadata: Metadata::default(),
            tables: Vec::new(),
            detected_languages: None,
            chunks: None,
            images: None,
        };
        Ok(outcome)
    }

    /// True when `lang` exactly matches one of the configured language codes.
    fn supports_language(&self, lang: &str) -> bool {
        self.supported_langs
            .iter()
            .any(|known| known.as_str() == lang)
    }

    /// This backend reports itself as a custom backend.
    fn backend_type(&self) -> OcrBackendType {
        OcrBackendType::Custom
    }

    /// Returns a copy of the configured language codes.
    fn supported_languages(&self) -> Vec<String> {
        self.supported_langs.to_vec()
    }
}

impl CloudOcrBackend {
    /// Stub for the remote OCR call: ignores its inputs and returns fixed text.
    async fn call_cloud_api(&self, image: &[u8], language: &str) -> Result<String> {
        let placeholder = String::from("Extracted text");
        Ok(placeholder)
    }
}
Python
from kreuzberg import register_ocr_backend
import httpx

class CloudOcrBackend:
    """OCR backend that delegates recognition to a remote HTTP service.

    Args:
        api_key: Credential sent as a bearer token with every OCR request.
    """

    def __init__(self, api_key: str):
        self.api_key: str = api_key
        self.langs: list[str] = ["eng", "deu", "fra"]

    def name(self) -> str:
        """Return the backend name used to select it in the OCR config."""
        return "cloud-ocr"

    def version(self) -> str:
        """Return the backend version."""
        return "1.0.0"

    def supported_languages(self) -> list[str]:
        """Return the language codes this backend reports as supported."""
        return self.langs

    def _request_kwargs(self, image_bytes: bytes, config: dict) -> dict:
        """Build the keyword arguments for the OCR upload request.

        The language is passed through ``data`` so it travels as a form field
        next to the uploaded file: httpx ignores a ``json`` body whenever
        ``files`` is given, which would silently drop the language.
        """
        return {
            "headers": {"Authorization": f"Bearer {self.api_key}"},
            "files": {"image": image_bytes},
            "data": {"language": config.get("language", "eng")},
        }

    def process_image(self, image_bytes: bytes, config: dict) -> dict:
        """Upload the image and return the recognized text.

        Args:
            image_bytes: Raw image data to upload.
            config: OCR options; ``language`` defaults to ``"eng"``.

        Returns:
            A dict with ``content`` (the recognized text) and ``mime_type``.

        Raises:
            httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        """
        with httpx.Client() as client:
            response = client.post(
                "https://api.example.com/ocr",
                **self._request_kwargs(image_bytes, config),
            )
            # Fail with a clear HTTP error instead of a confusing JSON/KeyError.
            response.raise_for_status()
            text: str = response.json()["text"]
            return {"content": text, "mime_type": "text/plain"}

    def initialize(self) -> None:
        """No setup is required."""

    def shutdown(self) -> None:
        """No teardown is required."""

# Create the backend with its API key and register it with Kreuzberg.
backend: CloudOcrBackend = CloudOcrBackend(api_key="your-api-key")
register_ocr_backend(backend)
Java
import dev.kreuzberg.*;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.net.http.*;
import java.net.URI;

/**
 * Example: registers an OCR backend whose callback forwards the image bytes to
 * a cloud OCR HTTP endpoint, then runs an extraction.
 */
public class CloudOcrExample {
    public static void main(String[] args) {
        // Arena used to allocate the strings the callback hands back (see allocateFrom below).
        Arena callbackArena = Arena.ofAuto();
        String apiKey = "your-api-key";

        // Callback receiving the image memory, its length, and a config JSON string.
        OcrBackend cloudOcr = (imageBytes, imageLength, configJson) -> {
            try {
                // Read image bytes from native memory
                byte[] image = imageBytes.reinterpret(imageLength)
                    .toArray(ValueLayout.JAVA_BYTE);

                // Read config JSON
                // NOTE(review): `config` is read but not used further in this callback.
                String config = configJson.reinterpret(Long.MAX_VALUE)
                    .getString(0);

                // Call cloud OCR API
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://api.example.com/ocr"))
                    .header("Authorization", "Bearer " + apiKey)
                    .POST(HttpRequest.BodyPublishers.ofByteArray(image))
                    .build();

                HttpResponse<String> response = client.send(request,
                    HttpResponse.BodyHandlers.ofString());

                String text = parseTextFromResponse(response.body());

                // Return result as C string
                return callbackArena.allocateFrom(text);
            } catch (Exception e) {
                // Any failure is reported to the caller as a NULL segment.
                return MemorySegment.NULL;
            }
        };

        // The registration arena is confined to this try block and closed when it exits.
        try (Arena arena = Arena.ofConfined()) {
            Kreuzberg.registerOcrBackend("cloud-ocr", cloudOcr, arena);

            // Use custom OCR backend in extraction
            // Note: Requires ExtractionConfig with OCR enabled
            ExtractionResult result = Kreuzberg.extractFileSync("scanned.pdf");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Placeholder parser: currently returns the response body unchanged. */
    private static String parseTextFromResponse(String json) {
        // Parse JSON response and extract text field
        return json; // Simplified
    }
}
C#
using Kreuzberg;
using System.Net.Http;
using System.Text.Json;

/// <summary>OCR backend that delegates recognition to a remote HTTP service.</summary>
public class CloudOcrBackend : IOcrBackend
{
    // One client shared by all requests: creating an HttpClient per call can
    // exhaust sockets under load.
    private static readonly HttpClient Http = new HttpClient();

    private readonly string _apiKey;
    private readonly List<string> _langs = new() { "eng", "deu", "fra" };

    /// <param name="apiKey">Credential sent as a bearer token with every OCR request.</param>
    public CloudOcrBackend(string apiKey)
    {
        _apiKey = apiKey;
    }

    public string Name() => "cloud-ocr";
    public string Version() => "1.0.0";
    public List<string> SupportedLanguages() => _langs;

    /// <summary>Uploads the image and returns the recognized text.</summary>
    /// <param name="imageBytes">Raw image data to upload.</param>
    /// <param name="config">OCR options; <c>language</c> defaults to <c>eng</c>.</param>
    /// <returns>A dictionary with <c>content</c> and <c>mime_type</c> entries.</returns>
    /// <exception cref="HttpRequestException">The service answered with a non-success status.</exception>
    public Dictionary<string, object> ProcessImage(byte[] imageBytes, Dictionary<string, object> config)
    {
        // A missing or null language entry falls back to "eng".
        var lang = config.TryGetValue("language", out var configured) && configured != null
            ? configured.ToString()
            : "eng";

        var form = new MultipartFormDataContent();
        form.Add(new ByteArrayContent(imageBytes), "image");
        form.Add(new StringContent(lang), "language");

        // Disposing the request also disposes its content (the form).
        using var request = new HttpRequestMessage(HttpMethod.Post, "https://api.example.com/ocr")
        {
            Content = form
        };
        request.Headers.Add("Authorization", $"Bearer {_apiKey}");

        // GetAwaiter().GetResult() surfaces the original exception instead of
        // the AggregateException that .Result would throw.
        using var response = Http.SendAsync(request).GetAwaiter().GetResult();
        response.EnsureSuccessStatusCode();

        var json = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
        using var doc = JsonDocument.Parse(json);
        var text = doc.RootElement.GetProperty("text").GetString() ?? string.Empty;

        return new Dictionary<string, object>
        {
            { "content", text },
            { "mime_type", "text/plain" }
        };
    }

    public void Initialize() { }
    public void Shutdown() { }
}

var backend = new CloudOcrBackend(apiKey: "your-api-key");
KreuzbergClient.RegisterOcrBackend(backend);
Ruby
require 'kreuzberg'
require 'net/http'

# JSON.parse is used below; require it explicitly rather than relying on
# another library having loaded it.
require 'json'

# OCR backend that posts the raw image bytes to a remote HTTP endpoint.
class CloudOcrBackend
  # Name used to select this backend in the OCR config.
  def name
    'cloud-ocr'
  end

  # Language codes this backend reports as supported.
  def supported_languages
    %w[eng fra deu]
  end

  # Uploads the image and returns a hash with the recognized text under :content.
  #
  # The API key is read from the OCR_API_KEY environment variable. Every failure
  # (missing key, network error, non-success status, bad JSON) is raised as
  # Kreuzberg::Errors::OCRError.
  #
  # NOTE(review): `language` is accepted but not sent to the service.
  def process_image(image_data, language)
    uri = URI('https://api.example.com/ocr')
    req = Net::HTTP::Post.new(uri)
    # fetch fails fast on a missing key instead of sending an empty bearer token.
    req['Authorization'] = "Bearer #{ENV.fetch('OCR_API_KEY')}"
    req.body = image_data
    res = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) { |h| h.request(req) }
    raise Kreuzberg::Errors::OCRError, res.message unless res.is_a?(Net::HTTPSuccess)
    { content: JSON.parse(res.body)['text'] }
  rescue Kreuzberg::Errors::OCRError
    # Already the right error type: re-raise untouched instead of re-wrapping.
    raise
  rescue StandardError => e
    raise Kreuzberg::Errors::OCRError, e.message
  end
end

Kreuzberg.register_ocr_backend(CloudOcrBackend.new)
config = Kreuzberg::Config::Extraction.new(
  ocr: Kreuzberg::Config::OCR.new(backend: 'cloud-ocr')
)
Kreuzberg.extract_file_sync('doc.pdf', config: config)
R
library(kreuzberg)

# Placeholder OCR backend: logs the image path and returns a fixed text for it.
custom_ocr_backend <- function(image_path, language) {
  progress_line <- sprintf("Processing image: %s\n", image_path)
  cat(progress_line)
  sprintf("Extracted text from %s", image_path)
}

# Register the backend function under the name "custom_cloud".
register_ocr_backend("custom_cloud", custom_ocr_backend)

# Build a config that forces OCR and selects the backend by its registered name.
ocr_cfg <- ocr_config(backend = "custom_cloud", language = "en")
config <- extraction_config(force_ocr = TRUE, ocr = ocr_cfg)

# Run the extraction and report how many characters came back.
result <- extract_file_sync("document.pdf", "application/pdf", config)
cat(sprintf("Custom backend result: %d chars\n", nchar(result$content)))

Registration

Once you've implemented a backend (see above), register it so the extraction pipeline can use it. Set the backend name in OcrConfig to route OCR through your custom engine:

Python
from kreuzberg import register_ocr_backend, unregister_ocr_backend

# Register the backend instance with Kreuzberg.
backend = CloudOcrBackend(api_key="your-api-key")
register_ocr_backend(backend)

from kreuzberg import extract_file_sync, ExtractionConfig, OcrConfig

# Select the backend by the name it reports ("cloud-ocr").
config = ExtractionConfig(ocr=OcrConfig(backend="cloud-ocr", language="eng"))
result = extract_file_sync("scanned.pdf", config=config)

# Remove the backend again once it is no longer needed.
unregister_ocr_backend("cloud-ocr")

Using EasyOCR (Built-in)

Kreuzberg ships with an EasyOCR backend that supports 80+ languages and optional GPU acceleration. You don't need to implement anything — just point OcrConfig at it:

Python
from kreuzberg import extract_file_sync, ExtractionConfig, OcrConfig

# Select the EasyOCR backend by name.
config: ExtractionConfig = ExtractionConfig(
    ocr=OcrConfig(backend="easyocr", language="en")
)

# EasyOCR-specific options (use_gpu, beam_width, etc.) go in easyocr_kwargs,
# not in OcrConfig — OcrConfig only accepts backend, language, and backend-specific configs.
result = extract_file_sync("scanned.pdf", config=config, easyocr_kwargs={"use_gpu": True})

# Show the first 100 characters and the total length of the extracted text.
content: str = result.content
preview: str = content[:100]
total_length: int = len(content)

print(f"Extracted content (preview): {preview}")
print(f"Total characters: {total_length}")

Validators

Enforce quality requirements on extraction results.

Warning

Validation errors cause extraction to fail. Use validators for critical quality checks only.

Rust
use kreuzberg::plugins::{Plugin, Validator};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, KreuzbergError};
use async_trait::async_trait;

/// Validator plugin that enforces a minimum content length.
struct MinLengthValidator {
    /// Smallest content length that passes validation.
    min_length: usize,
}

/// Plugin identity and lifecycle hooks; both hooks simply report success.
impl Plugin for MinLengthValidator {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "min-length-validator"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Nothing to set up.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// Nothing to tear down.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl Validator for MinLengthValidator {
    /// Rejects results whose content has fewer than `min_length` characters.
    ///
    /// # Errors
    /// Returns a validation error naming the actual and required character counts.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        // Count characters, not bytes: `str::len` is a byte length, so every
        // non-ASCII character would count two or more times and short
        // multi-byte text could slip past the minimum.
        let char_count = result.content.chars().count();

        if char_count < self.min_length {
            return Err(KreuzbergError::validation(format!(
                "Content too short: {} < {} characters",
                char_count, self.min_length
            )));
        }
        Ok(())
    }

    /// Priority reported for this validator.
    fn priority(&self) -> i32 {
        100
    }
}
Python
from kreuzberg import register_validator, ExtractionResult, ValidationError

class MinLengthValidator:
    """Validator that rejects results whose content is shorter than ``min_length``."""

    def __init__(self, min_length: int = 100):
        self.min_length: int = min_length

    def name(self) -> str:
        """Return the plugin name."""
        return "min_length_validator"

    def version(self) -> str:
        """Return the plugin version."""
        return "1.0.0"

    def priority(self) -> int:
        """Return the priority reported for this validator."""
        return 100

    def initialize(self) -> None:
        """No setup is required."""

    def shutdown(self) -> None:
        """No teardown is required."""

    def should_validate(self, result: ExtractionResult) -> bool:
        """Validate every result."""
        return True

    def validate(self, result: ExtractionResult) -> None:
        """Raise ``ValidationError`` when the content is shorter than the minimum."""
        content_len = len(result.content)
        if content_len >= self.min_length:
            return
        raise ValidationError(f"Content too short: {content_len}")

validator: MinLengthValidator = MinLengthValidator(min_length=100)
register_validator(validator)
Java
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.Validator;
import dev.kreuzberg.ValidationException;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;

/** Registers a minimum-length validator and runs an extraction against it. */
public class MinLengthValidatorExample {
    public static void main(String[] args) {
        final int minLength = 100;

        // Throws when the extracted content is shorter than minLength.
        Validator minLengthValidator = result -> {
            final int actualLength = result.getContent().length();
            if (actualLength >= minLength) {
                return;
            }
            throw new ValidationException(
                "Content too short: " + actualLength + " < " + minLength
            );
        };

        try {
            Kreuzberg.registerValidator("min-length", minLengthValidator, 100);

            Kreuzberg.extractFile("document.pdf");
            System.out.println("Validation passed!");
        } catch (ValidationException e) {
            System.err.println("Validation failed: " + e.getMessage());
        } catch (IOException | KreuzbergException e) {
            e.printStackTrace();
        }
    }
}
C#
using Kreuzberg;

/// <summary>Validator that rejects results whose content is shorter than a minimum length.</summary>
public class MinLengthValidator : IValidator
{
    private readonly int _minLength;

    public MinLengthValidator(int minLength = 100) => _minLength = minLength;

    public string Name()
    {
        return "min_length_validator";
    }

    public string Version()
    {
        return "1.0.0";
    }

    public int Priority()
    {
        return 100;
    }

    /// <summary>Throws <c>ValidationError</c> when the content is shorter than the minimum.</summary>
    public void Validate(Dictionary<string, object> result)
    {
        var contentLength = result["content"].ToString()?.Length ?? 0;
        if (contentLength >= _minLength)
        {
            return;
        }

        throw new ValidationError($"Content too short: {contentLength}");
    }

    /// <summary>Every result is validated.</summary>
    public bool ShouldValidate(Dictionary<string, object> result)
    {
        return true;
    }

    public void Initialize() { }
    public void Shutdown() { }
}

var validator = new MinLengthValidator(minLength: 100);
KreuzbergClient.RegisterValidator(validator);

Quality Score Validator

Rust
#[async_trait]
impl Validator for QualityValidator {
    /// Rejects results whose `quality_score` metadata entry is below 0.5.
    ///
    /// A missing or non-numeric score is treated as 0.0.
    ///
    /// # Errors
    /// Returns a validation error reporting the score when it is too low.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let reported = result
            .metadata
            .additional
            .get("quality_score")
            .and_then(|value| value.as_f64());
        let score = match reported {
            Some(value) => value,
            None => 0.0,
        };

        if score < 0.5 {
            let message = format!("Quality score too low: {:.2} < 0.50", score);
            return Err(KreuzbergError::validation(message));
        }

        Ok(())
    }
}
Python
from kreuzberg import ExtractionResult, ValidationError, register_validator

class QualityValidator:
    """Validator that rejects results whose ``quality_score`` metadata is below 0.5."""

    def name(self) -> str:
        """Return the plugin name."""
        return "quality-validator"

    def version(self) -> str:
        """Return the plugin version."""
        return "1.0.0"

    def validate(self, result: ExtractionResult) -> None:
        """Raise ``ValidationError`` for a low score; a missing score counts as 0.0."""
        metadata = result.metadata
        score: float = metadata.get("quality_score", 0.0)
        if score < 0.5:
            message = f"Quality score too low: {score:.2f}"
            raise ValidationError(message)

validator: QualityValidator = QualityValidator()
register_validator(validator)
Java
// Rejects results whose quality score is below 0.5; a null score counts as 0.0.
Validator qualityValidator = result -> {
    var reported = result.getQualityScore();
    double score = (reported != null) ? reported : 0.0;

    if (score < 0.5) {
        String message = String.format("Quality score too low: %.2f < 0.50", score);
        throw new ValidationException(message);
    }
};
C#
using Kreuzberg;

/// <summary>Validator that rejects results whose quality score is below 0.5.</summary>
public class QualityValidator : IValidator
{
    public string Name()
    {
        return "quality-validator";
    }

    public string Version()
    {
        return "1.0.0";
    }

    public int Priority()
    {
        return 100;
    }

    // NOTE(review): Validate takes an ExtractionResult while ShouldValidate takes
    // a dictionary — confirm which parameter type IValidator actually declares.
    public void Validate(ExtractionResult result)
    {
        var score = result.QualityScore;

        if (score < 0.5)
        {
            throw new ValidationError($"Quality score too low: {score:F2}");
        }
    }

    /// <summary>Every result is validated.</summary>
    public bool ShouldValidate(Dictionary<string, object> result)
    {
        return true;
    }

    public void Initialize() { }
    public void Shutdown() { }
}

var validator = new QualityValidator();
KreuzbergClient.RegisterValidator(validator);

Plugin Management

Listing

Python
from kreuzberg import (
    list_document_extractors,
    list_post_processors,
    list_ocr_backends,
    list_validators,
)

# Gather the registered plugin names for every plugin type, then print them.
registered: dict[str, list[str]] = {
    "Extractors": list_document_extractors(),
    "Processors": list_post_processors(),
    "OCR backends": list_ocr_backends(),
    "Validators": list_validators(),
}

for label, plugin_names in registered.items():
    print(f"{label}: {plugin_names}")
Rust
use kreuzberg::plugins::registry::*;

// Each registry is reached through a lock (the registration example takes
// `.write()` on it), so a read guard is needed before calling `list`.
// NOTE(review): confirm that `list()` is fallible; drop the `?` if it
// returns the names directly.
let registry = get_document_extractor_registry();
let extractors = registry.read().unwrap().list()?;
println!("Registered extractors: {:?}", extractors);

let registry = get_post_processor_registry();
let processors = registry.read().unwrap().list()?;
println!("Registered processors: {:?}", processors);

let registry = get_ocr_backend_registry();
let backends = registry.read().unwrap().list()?;
println!("Registered OCR backends: {:?}", backends);

let registry = get_validator_registry();
let validators = registry.read().unwrap().list()?;
println!("Registered validators: {:?}", validators);
Java
// Java does not provide plugin listing functionality in v4.0.0
// Plugins are registered and managed through the FFI layer
C#
using Kreuzberg;

// Fetch the registered plugin names for every plugin type.
var extractors = KreuzbergClient.ListDocumentExtractors();
var processors = KreuzbergClient.ListPostProcessors();
var ocrBackends = KreuzbergClient.ListOcrBackends();
var validators = KreuzbergClient.ListValidators();

// Print each list as a comma-separated line.
Console.WriteLine($"Extractors: {string.Join(", ", extractors)}");
Console.WriteLine($"Processors: {string.Join(", ", processors)}");
Console.WriteLine($"OCR backends: {string.Join(", ", ocrBackends)}");
Console.WriteLine($"Validators: {string.Join(", ", validators)}");

Unregistering

Python
from kreuzberg import (
    unregister_document_extractor,
    unregister_post_processor,
    unregister_ocr_backend,
    unregister_validator,
)

# Names of the plugins to remove, one per plugin type.
names: list[str] = [
    "custom-json-extractor",
    "word_count",
    "cloud-ocr",
    "min_length_validator",
]

extractor_name, processor_name, ocr_backend_name, validator_name = names

# Remove each plugin from its own registry.
unregister_document_extractor(extractor_name)
unregister_post_processor(processor_name)
unregister_ocr_backend(ocr_backend_name)
unregister_validator(validator_name)
Rust
use kreuzberg::plugins::registry::get_document_extractor_registry;

// Fetch the document-extractor registry and remove the extractor by its name.
let registry = get_document_extractor_registry();
// `?` propagates the error if removal fails.
registry.remove("custom-json-extractor")?;
Java
import dev.kreuzberg.Kreuzberg;

// Remove one post-processor and one validator by name; a failure is reported on stderr.
try {
    // Unregister specific plugins
    Kreuzberg.unregisterPostProcessor("word-count");
    Kreuzberg.unregisterValidator("min-length");
} catch (KreuzbergException e) {
    // Report the failure without aborting the program.
    System.err.println("Failed to unregister: " + e.getMessage());
}
C#
using Kreuzberg;

// Names of the plugins to remove, one per plugin type.
var names = new List<string>
{
    "custom-json-extractor",
    "word_count",
    "cloud-ocr",
    "min_length_validator"
};

var extractorName = names[0];
var processorName = names[1];
var ocrBackendName = names[2];
var validatorName = names[3];

// Remove each plugin from its own registry.
KreuzbergClient.UnregisterDocumentExtractor(extractorName);
KreuzbergClient.UnregisterPostProcessor(processorName);
KreuzbergClient.UnregisterOcrBackend(ocrBackendName);
KreuzbergClient.UnregisterValidator(validatorName);

Clearing All

Python
from kreuzberg import (
    clear_document_extractors,
    clear_post_processors,
    clear_ocr_backends,
    clear_validators,
)

# Clear every plugin registry in turn.
clear_post_processors()
clear_validators()
clear_ocr_backends()
clear_document_extractors()

print("All plugins cleared")
Rust
use kreuzberg::{clear_document_extractors, clear_post_processors, clear_ocr_backends, clear_validators};

/// Clears all four plugin registries, then prints a confirmation line.
fn main() {
    clear_document_extractors();
    clear_post_processors();
    clear_ocr_backends();
    clear_validators();

    println!("All plugins cleared");
}
Java
// Java does not provide bulk clearing functionality in v4.0.0
// Unregister plugins individually using unregisterPostProcessor() and unregisterValidator()
C#
using Kreuzberg;

// Clear every plugin registry in turn.
KreuzbergClient.ClearPostProcessors();
KreuzbergClient.ClearValidators();
KreuzbergClient.ClearOcrBackends();
KreuzbergClient.ClearDocumentExtractors();

Console.WriteLine("All plugins cleared");

Thread Safety

Rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use kreuzberg::KreuzbergError;

/// Plugin holding mutable state behind thread-safe primitives.
struct StatefulPlugin {
    /// Number of `process` calls; updated with atomic operations.
    call_count: AtomicUsize,
    /// String-to-string cache guarded by a mutex.
    cache: Mutex<HashMap<String, String>>,
}

impl Plugin for StatefulPlugin {
    fn name(&self) -> &str {
        "stateful-plugin"
    }

    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Resets the call counter to zero.
    fn initialize(&self) -> Result<()> {
        self.call_count.store(0, Ordering::Release);
        Ok(())
    }

    /// Prints how many times the plugin was called.
    fn shutdown(&self) -> Result<()> {
        let total_calls = self.call_count.load(Ordering::Acquire);
        println!("Plugin called {} times", total_calls);
        Ok(())
    }
}

#[async_trait]
impl PostProcessor for StatefulPlugin {
    /// Counts the call and records the result's MIME type in the cache under `last_mime`.
    ///
    /// # Errors
    /// Returns a plugin error when the cache mutex is poisoned.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        self.call_count.fetch_add(1, Ordering::AcqRel);

        let last_mime = result.mime_type.clone();
        // The guard is a temporary, so the lock is released at the end of this statement.
        self.cache
            .lock()
            .map_err(|_| KreuzbergError::plugin("Cache lock poisoned"))?
            .insert(String::from("last_mime"), last_mime);

        Ok(())
    }

    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Middle
    }
}
Python
import threading
from kreuzberg import ExtractionResult

class StatefulPlugin:
    """Post-processor whose counter and cache are only mutated while holding a lock."""

    def __init__(self):
        self.lock: threading.Lock = threading.Lock()
        self.call_count: int = 0
        self.cache: dict = {}

    def name(self) -> str:
        return "stateful-plugin"

    def version(self) -> str:
        return "1.0.0"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Count the call, remember the result's MIME type, and return the result as-is."""
        self.lock.acquire()
        try:
            self.call_count = self.call_count + 1
            self.cache.update(last_mime=result.mime_type)
        finally:
            self.lock.release()
        return result

    def initialize(self) -> None:
        # Nothing to set up.
        return None

    def shutdown(self) -> None:
        # Nothing to tear down.
        return None
Java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Post-processor that keeps its mutable state in thread-safe containers.
 */
class StatefulPlugin implements PostProcessor {
    // Use atomic types for simple counters
    private final AtomicInteger callCount = new AtomicInteger(0);

    // Use concurrent collections for complex state
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    /**
     * Counts the call and remembers the MIME type of the most recent result.
     *
     * @param result the extraction result being processed
     * @return the same result, unmodified
     */
    @Override
    public ExtractionResult process(ExtractionResult result) {
        // Increment counter atomically
        callCount.incrementAndGet();

        // Update cache (thread-safe). Uses the getter-style accessor, matching the
        // getMimeType()/getContent()/getMetadata() calls in the other Java examples.
        cache.put("last_mime", result.getMimeType());

        return result;
    }

    /** @return the number of {@code process} calls so far */
    public int getCallCount() {
        return callCount.get();
    }
}
C#
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Plugin whose counter and cache are only mutated while holding a private lock.
/// </summary>
public class StatefulPlugin
{
    private readonly object _lock = new();
    private int _callCount = 0;
    private readonly Dictionary<string, object> _cache = new();

    public string Name()
    {
        return "stateful-plugin";
    }

    public string Version()
    {
        return "1.0.0";
    }

    /// <summary>Counts the call, remembers the result's "mime_type" entry, and returns the result as-is.</summary>
    public Dictionary<string, object> Process(Dictionary<string, object> result)
    {
        lock (_lock)
        {
            _callCount += 1;
            _cache["last_mime"] = result["mime_type"];
        }

        return result;
    }

    // No resources to acquire or release.
    public void Initialize()
    {
    }

    public void Shutdown()
    {
    }
}

Best Practices

Naming: Use kebab-case (my-custom-plugin), lowercase only, no spaces or special characters.

Logging

Python
import logging

logger = logging.getLogger(__name__)

class MyPlugin:
    """Extractor plugin that logs its lifecycle events and each extraction call."""

    def name(self) -> str:
        return "my-plugin"

    def version(self) -> str:
        return "1.0.0"

    def initialize(self) -> None:
        logger.info(f"Initializing plugin: {self.name()}")

    def shutdown(self) -> None:
        logger.info(f"Shutting down plugin: {self.name()}")

    def extract_bytes(
        self, content: bytes, mime_type: str, config: dict
    ) -> dict:
        """Log the request, build a placeholder result, and warn when its content is empty."""
        logger.info(f"Extracting {mime_type} ({len(content)} bytes)")

        extracted: dict = dict(content="", mime_type=mime_type)
        if not extracted["content"]:
            logger.warning("Extraction resulted in empty content")

        return extracted
Rust
use log::{info, warn, error};

impl Plugin for MyPlugin {
    /// Logs that the plugin is starting up.
    fn initialize(&self) -> Result<()> {
        let plugin_name = self.name();
        info!("Initializing plugin: {}", plugin_name);
        Ok(())
    }

    /// Logs that the plugin is shutting down.
    fn shutdown(&self) -> Result<()> {
        let plugin_name = self.name();
        info!("Shutting down plugin: {}", plugin_name);
        Ok(())
    }
}

#[async_trait]
impl DocumentExtractor for MyPlugin {
    /// Logs the request, builds a default result, and warns when its content is empty.
    async fn extract_bytes(
        &self,
        content: &[u8],
        mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        let byte_count = content.len();
        info!("Extracting {} ({} bytes)", mime_type, byte_count);

        let extracted = ExtractionResult::default();
        let produced_nothing = extracted.content.is_empty();
        if produced_nothing {
            warn!("Extraction resulted in empty content");
        }

        Ok(extracted)
    }
}
Java
import java.util.logging.Logger;
import java.util.logging.Level;

/**
 * Post-processor that logs what it is processing and warns on empty content.
 */
class MyPlugin implements PostProcessor {
    private static final Logger logger = Logger.getLogger(MyPlugin.class.getName());

    /**
     * Logs the MIME type and content length of the result, then returns it unchanged.
     *
     * @param result the extraction result being processed
     * @return the same result
     */
    @Override
    public ExtractionResult process(ExtractionResult result) {
        // Getter-style accessors, matching the other Java examples in this guide.
        // String.length() counts UTF-16 code units, so the unit is reported as chars.
        logger.info("Processing " + result.getMimeType() +
            " (" + result.getContent().length() + " chars)");

        // Processing...

        if (result.getContent().isEmpty()) {
            logger.warning("Processing resulted in empty content");
        }

        return result;
    }
}
C#
using Kreuzberg;
using Microsoft.Extensions.Logging;

/// <summary>
/// Extractor plugin that logs its lifecycle events and each extraction call.
/// </summary>
public class MyPlugin
{
    private readonly ILogger _logger;

    public MyPlugin(ILogger logger) => _logger = logger;

    public string Name()
    {
        return "my-plugin";
    }

    public string Version()
    {
        return "1.0.0";
    }

    public void Initialize() =>
        _logger.LogInformation($"Initializing plugin: {Name()}");

    public void Shutdown() =>
        _logger.LogInformation($"Shutting down plugin: {Name()}");

    /// <summary>Logs the request, builds a placeholder result, and warns when its content is empty.</summary>
    public Dictionary<string, object> ExtractBytes(
        byte[] content, string mimeType, Dictionary<string, object> config)
    {
        _logger.LogInformation($"Extracting {mimeType} ({content.Length} bytes)");

        var extracted = new Dictionary<string, object>
        {
            ["content"] = "",
            ["mime_type"] = mimeType
        };

        var text = (string?)extracted["content"];
        if (string.IsNullOrEmpty(text))
        {
            _logger.LogWarning("Extraction resulted in empty content");
        }

        return extracted;
    }
}

Testing

Python
import pytest
from kreuzberg import ExtractionResult

def test_custom_extractor() -> None:
    """Extract a small JSON payload and check the returned text and MIME type."""
    payload: bytes = b'{"message": "Hello, world!"}'
    extracted: ExtractionResult = CustomJsonExtractor().extract_bytes(
        payload, "application/json", {}
    )
    assert "Hello, world!" in extracted.content
    assert extracted.mime_type == "application/json"
Rust
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracts a small JSON payload and checks the returned text and MIME type.
    #[tokio::test]
    async fn test_custom_extractor() {
        let payload = br#"{"message": "Hello, world!"}"#;
        let extraction_config = ExtractionConfig::default();
        let plugin = CustomJsonExtractor;

        let outcome = plugin
            .extract_bytes(payload, "application/json", &extraction_config)
            .await;
        let extracted = outcome.expect("Extraction failed");

        assert!(extracted.content.contains("Hello, world!"));
        assert_eq!(extracted.mime_type, "application/json");
    }
}
Java
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;

/**
 * Unit test for a word-counting post-processor.
 */
class PostProcessorTest {
    /**
     * Runs the processor on a three-word document and checks that the returned
     * result carries {@code word_count = 3} in its metadata.
     */
    @Test
    void testWordCountProcessor() {
        PostProcessor processor = result -> {
            long count = result.getContent().split("\\s+").length;

            Map<String, Object> metadata = new HashMap<>(result.getMetadata());
            metadata.put("word_count", count);

            // Return a NEW result that carries the enriched metadata; returning
            // `result` itself would silently drop the word count.
            // NOTE(review): the list/flag accessors below are assumed to mirror the
            // constructor argument order — confirm against ExtractionResult.
            return new ExtractionResult(
                result.getContent(),
                result.getMimeType(),
                metadata,
                result.getTables(),
                result.getDetectedLanguages(),
                result.getChunks(),
                result.getImages(),
                result.isSuccess()
            );
        };

        ExtractionResult input = new ExtractionResult(
            "Hello world test",
            "text/plain",
            new HashMap<>(),
            java.util.List.of(),
            java.util.List.of(),
            java.util.List.of(),
            java.util.List.of(),
            true
        );

        ExtractionResult output = processor.process(input);

        // The count is stored as a Long, so compare against a long literal:
        // assertEquals(3, <Long 3>) compares Integer with Long and fails.
        assertEquals(3L, output.getMetadata().get("word_count"));
    }
}
C#
using Xunit;

/// <summary>Unit tests for the custom JSON extractor.</summary>
public class CustomExtractorTests
{
    /// <summary>Extracts a small JSON payload and checks the returned text and MIME type.</summary>
    [Fact]
    public void TestCustomExtractor()
    {
        const string json = "{\"message\": \"Hello, world!\"}";
        var payload = System.Text.Encoding.UTF8.GetBytes(json);
        var emptyConfig = new Dictionary<string, object>();
        var plugin = new CustomJsonExtractor();

        var extracted = plugin.ExtractBytes(payload, "application/json", emptyConfig);

        Assert.Contains("Hello, world!", (string)extracted["content"]);
        Assert.Equal("application/json", (string)extracted["mime_type"]);
    }
}

Complete Example: PDF Metadata Extractor

Python
from kreuzberg import register_post_processor, ExtractionResult
import logging

logger = logging.getLogger(__name__)

class PdfMetadataExtractor:
    """Post-processor that tags PDF results and counts how many it has handled.

    The counter is guarded by a lock because plugins may be invoked from
    several threads and ``+=`` on an attribute is not atomic.
    """

    def __init__(self):
        # Local import keeps this example self-contained.
        import threading

        self._count_lock = threading.Lock()
        self.processed_count: int = 0

    def name(self) -> str:
        return "pdf_metadata_extractor"

    def version(self) -> str:
        return "1.0.0"

    def description(self) -> str:
        return "Extracts and enriches PDF metadata"

    def processing_stage(self) -> str:
        return "early"

    def should_process(self, result: ExtractionResult) -> bool:
        """Return True only for results whose MIME type is ``application/pdf``."""
        return result.mime_type == "application/pdf"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Count the result and mark it with ``pdf_processed = True`` in its metadata."""
        with self._count_lock:
            self.processed_count += 1
        result.metadata["pdf_processed"] = True
        return result

    def initialize(self) -> None:
        logger.info("PDF metadata extractor initialized")

    def shutdown(self) -> None:
        logger.info(f"Processed {self.processed_count} PDFs")

# Create the post-processor and register it.
processor: PdfMetadataExtractor = PdfMetadataExtractor()
register_post_processor(processor)
Go
package main

import (
    "encoding/json"
    "log"
    "sync/atomic"
    "unsafe"

    "github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)

/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"

// pdfMetadataState tracks statistics about PDF processing.
// processedCount is only read and written through sync/atomic in this example.
var pdfMetadataState = struct {
    processedCount int64
}{
    processedCount: 0,
}

// pdfMetadataExtractor enriches PDF extraction results with additional metadata.
// It receives the result as a JSON string and returns a newly allocated JSON
// string: the (possibly enriched) result, or an {"error": ...} object when the
// input cannot be parsed or the output cannot be serialized.
//export pdfMetadataExtractor
func pdfMetadataExtractor(resultJSON *C.char) *C.char {
    var result map[string]interface{}
    if err := json.Unmarshal([]byte(C.GoString(resultJSON)), &result); err != nil {
        return C.CString("{\"error\":\"Failed to parse result JSON\"}")
    }

    // encode serializes the current state of result back into a C string.
    encode := func() *C.char {
        outputJSON, err := json.Marshal(result)
        if err != nil {
            return C.CString("{\"error\":\"Failed to serialize result\"}")
        }
        return C.CString(string(outputJSON))
    }

    // Non-PDF documents are passed through without modification.
    if mimeType, ok := result["mime_type"].(string); !ok || mimeType != "application/pdf" {
        return encode()
    }

    // Reuse the existing metadata object, or start a fresh one.
    metadata, ok := result["metadata"].(map[string]interface{})
    if !ok {
        metadata = make(map[string]interface{})
    }

    metadata["pdf_processed"] = true

    // Record the content length when the content is a string.
    if content, ok := result["content"].(string); ok {
        metadata["content_length"] = len(content)
    }

    // Count this PDF atomically.
    atomic.AddInt64(&pdfMetadataState.processedCount, 1)
    metadata["pdf_processor_version"] = "1.0.0"

    result["metadata"] = metadata

    return encode()
}

// main registers the PDF post-processor, extracts document.pdf, and logs the
// metadata that the processor added. The processor is unregistered on exit.
func main() {
    // Register the post-processor with priority 80, early stage
    if err := kreuzberg.RegisterPostProcessor("pdf_metadata_extractor", 80,
        (C.PostProcessorCallback)(C.pdfMetadataExtractor)); err != nil {
        log.Fatalf("failed to register post-processor: %v", err)
    }
    // On normal return: unregister the processor and log the running total.
    defer func() {
        if err := kreuzberg.UnregisterPostProcessor("pdf_metadata_extractor"); err != nil {
            log.Printf("warning: failed to unregister post-processor: %v", err)
        }

        log.Printf("Total PDFs processed: %d", atomic.LoadInt64(&pdfMetadataState.processedCount))
    }()

    // Extract PDF document
    result, err := kreuzberg.ExtractFileSync("document.pdf", nil)
    if err != nil {
        log.Fatalf("extraction failed: %v", err)
    }

    log.Printf("PDF MIME type: %s", result.MimeType)

    // Parse and display metadata
    var metadata map[string]interface{}
    if metaJSON, ok := result.MetadataJSON.(string); ok {
        if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
            if pdfProcessed, ok := metadata["pdf_processed"].(bool); ok && pdfProcessed {
                log.Printf("PDF metadata extracted successfully")
                // encoding/json decodes JSON numbers into float64 for interface{} targets.
                if contentLen, ok := metadata["content_length"].(float64); ok {
                    log.Printf("Content length: %.0f bytes", contentLen)
                }
            }
        }
    }
}
Java
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Complete example: a post-processor that tags PDF results with extra metadata.
 */
public class PdfMetadataExtractorExample {
    private static final Logger logger = Logger.getLogger(
        PdfMetadataExtractorExample.class.getName()
    );

    /**
     * Registers the post-processor, extracts {@code document.pdf}, and prints
     * the {@code pdf_processed} metadata entry.
     */
    public static void main(String[] args) {
        AtomicInteger processedCount = new AtomicInteger(0);

        PostProcessor pdfMetadata = result -> {
            // Non-PDF results pass through untouched.
            if (!result.getMimeType().equals("application/pdf")) {
                return result;
            }

            processedCount.incrementAndGet();

            Map<String, Object> metadata = new HashMap<>(result.getMetadata());
            metadata.put("pdf_processed", true);
            metadata.put("processing_timestamp", System.currentTimeMillis());

            logger.info("Processed PDF: " + processedCount.get());

            // Return a NEW result that carries the enriched metadata; returning
            // `result` itself would discard the entries added above.
            // NOTE(review): the list/flag accessors below are assumed to mirror the
            // ExtractionResult constructor argument order — confirm against the class.
            return new ExtractionResult(
                result.getContent(),
                result.getMimeType(),
                metadata,
                result.getTables(),
                result.getDetectedLanguages(),
                result.getChunks(),
                result.getImages(),
                result.isSuccess()
            );
        };

        try {
            Kreuzberg.registerPostProcessor("pdf-metadata-extractor", pdfMetadata, 50);

            logger.info("PDF metadata extractor initialized");

            ExtractionResult result = Kreuzberg.extractFile("document.pdf");
            System.out.println("PDF processed: " + result.getMetadata().get("pdf_processed"));

            logger.info("Processed " + processedCount.get() + " PDFs");
        } catch (IOException | KreuzbergException e) {
            e.printStackTrace();
        }
    }
}
C#
using Kreuzberg;

/// <summary>
/// Post-processor that counts the PDF results it handles.
/// The counter is updated atomically because plugins must be thread-safe.
/// </summary>
public class PdfMetadataExtractor : IPostProcessor
{
    private int _processedCount = 0;

    public string Name() => "pdf_metadata_extractor";
    public string Version() => "1.0.0";
    public string Description() => "Extracts and enriches PDF metadata";
    public string ProcessingStage() => "early";

    /// <summary>Only results whose MIME type is application/pdf are processed.</summary>
    public bool ShouldProcess(ExtractionResult result)
        => result.MimeType == "application/pdf";

    /// <summary>Counts the result and returns it unchanged.</summary>
    public ExtractionResult Process(ExtractionResult result)
    {
        // `_processedCount++` is a read-modify-write and can lose updates under
        // concurrent calls; Interlocked makes the increment atomic.
        System.Threading.Interlocked.Increment(ref _processedCount);
        return result;
    }

    public void Initialize()
    {
        Console.WriteLine("PDF metadata extractor initialized");
    }

    public void Shutdown()
    {
        // Read the counter with the same atomic discipline used to update it.
        var total = System.Threading.Volatile.Read(ref _processedCount);
        Console.WriteLine($"Processed {total} PDFs");
    }
}

// Create the post-processor and register it with the client.
var processor = new PdfMetadataExtractor();
KreuzbergClient.RegisterPostProcessor(processor);
Ruby
require 'kreuzberg'

# Callable post-processor that stamps each PDF result with its running order.
class PdfMetadataExtractor
  def initialize
    @count = 0
  end

  # Non-PDF results are returned untouched; PDF results get
  # metadata['pdf_order'] set to the number of PDFs seen so far.
  def call(result)
    if result['mime_type'] == 'application/pdf'
      @count += 1
      metadata = (result['metadata'] ||= {})
      metadata['pdf_order'] = @count
    end

    result
  end
end

# Register the callable under the name 'pdf_metadata'.
extractor = PdfMetadataExtractor.new
Kreuzberg.register_post_processor('pdf_metadata', extractor)

# Build an extraction config with post-processing enabled.
config = Kreuzberg::Config::Extraction.new(
  postprocessor: { enabled: true }
)

# Extract the file and print the resulting metadata.
result = Kreuzberg.extract_file_sync('report.pdf', config: config)
puts "Metadata: #{result.metadata.inspect}"
R
library(kreuzberg)

# Post-processor that prints every metadata entry of a result and returns the
# result unchanged.
#
# result: a list-like extraction result; its `metadata` element, when present,
#         is printed one "key: value" line per entry.
# Returns `result` as received.
extract_pdf_metadata <- function(result) {
  metadata <- result$metadata
  if (!is.null(metadata)) {
    cat("PDF Metadata:\n")
    for (key in names(metadata)) {
      # Flatten vectors and nested lists into one display string: sprintf("%s")
      # errors on list values and would emit several lines for vectors.
      shown <- paste(as.character(unlist(metadata[[key]])), collapse = ", ")
      cat(sprintf("  %s: %s\n", key, shown))
    }
  }
  result
}

# Register the function as a post-processor named "pdf_metadata".
register_post_processor("pdf_metadata", extract_pdf_metadata)

# Enable post-processing and extract the document.
config <- extraction_config(postprocessor = list(enabled = TRUE))
result <- extract_file_sync("document.pdf", "application/pdf", config)

# Report how many characters of content were extracted.
cat(sprintf("Extraction complete: %d characters\n", nchar(result$content)))