Creating Plugins¶
Kreuzberg's plugin system allows you to extend functionality by creating custom extractors, post-processors, OCR backends, and validators. Plugins can be written in Rust or Python.
WASM Support
The WebAssembly bindings use a pre-compiled Rust core with tesseract-wasm for OCR, and custom plugins are not supported in WASM environments. To write custom plugins, use the Python, Rust, or other native language bindings.
Plugin Types¶
Kreuzberg supports four types of plugins:
| Plugin Type | Purpose | Use Cases |
|---|---|---|
| DocumentExtractor | Extract content from file formats | Add support for new formats, override built-in extractors |
| PostProcessor | Transform extraction results | Add metadata, enrich content, apply custom processing |
| OcrBackend | Perform OCR on images | Integrate cloud OCR services, custom OCR engines |
| Validator | Validate extraction quality | Enforce minimum quality, check completeness |
Plugin Architecture¶
All plugins must implement the base Plugin trait and a type-specific trait. Plugins are:
- Thread-safe: All plugins must be Send + Sync (Rust) or otherwise thread-safe (Python)
- Lifecycle-managed: Plugins have initialize() and shutdown() methods
- Registered globally: Use registry functions to register your plugins
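The sketch below shows the base methods shared by every plugin type, mirroring the Python examples later on this page (the class name is illustrative):
class MinimalPlugin:
    def name(self) -> str:
        return "minimal-plugin"  # unique, kebab-case name
    def version(self) -> str:
        return "1.0.0"
    def initialize(self) -> None:
        pass  # acquire resources (API clients, caches) here
    def shutdown(self) -> None:
        pass  # release resources here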
Document Extractors¶
Extract content from custom file formats or override built-in extractors.
Rust Implementation¶
use kreuzberg::plugins::{DocumentExtractor, Plugin};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;
struct CustomJsonExtractor;
impl Plugin for CustomJsonExtractor {
fn name(&self) -> &str { "custom-json-extractor" }
fn version(&self) -> String { "1.0.0".to_string() }
fn initialize(&self) -> Result<()> { Ok(()) }
fn shutdown(&self) -> Result<()> { Ok(()) }
}
#[async_trait]
impl DocumentExtractor for CustomJsonExtractor {
async fn extract_bytes(
&self,
content: &[u8],
_mime_type: &str,
_config: &ExtractionConfig,
) -> Result<ExtractionResult> {
let json: serde_json::Value = serde_json::from_slice(content)?;
let text = extract_text_from_json(&json);
Ok(ExtractionResult {
content: text,
mime_type: "application/json".to_string(),
metadata: Metadata::default(),
tables: vec![],
detected_languages: None,
chunks: None,
images: None,
})
}
fn supported_mime_types(&self) -> &[&str] {
&["application/json", "text/json"]
}
fn priority(&self) -> i32 { 50 }
}
fn extract_text_from_json(value: &serde_json::Value) -> String {
match value {
serde_json::Value::String(s) => format!("{}\n", s),
serde_json::Value::Array(arr) => arr.iter().map(extract_text_from_json).collect(),
serde_json::Value::Object(obj) => obj.values().map(extract_text_from_json).collect(),
_ => String::new(),
}
}
Python Implementation¶
from kreuzberg import register_document_extractor, ExtractionResult
import json
class CustomJsonExtractor:
def name(self) -> str:
return "custom-json-extractor"
def version(self) -> str:
return "1.0.0"
def supported_mime_types(self) -> list[str]:
return ["application/json"]
def priority(self) -> int:
return 50
def extract_bytes(
self, content: bytes, mime_type: str, config: dict
) -> ExtractionResult:
data: dict = json.loads(content)
text: str = self._extract_text(data)
return {"content": text, "mime_type": "application/json"}
def _extract_text(self, obj: object) -> str:
if isinstance(obj, str):
return f"{obj}\n"
if isinstance(obj, list):
return "".join(self._extract_text(item) for item in obj)
if isinstance(obj, dict):
return "".join(self._extract_text(v) for v in obj.values())
return ""
def initialize(self) -> None:
pass
def shutdown(self) -> None:
pass
extractor: CustomJsonExtractor = CustomJsonExtractor()
register_document_extractor(extractor)
Registration¶
using Kreuzberg;
public class CustomExtractor : IDocumentExtractor
{
public string Name() => "custom";
public string Version() => "1.0.0";
public Dictionary<string, object> ExtractBytes(byte[] data, string mimeType, Dictionary<string, object> config)
{
return new Dictionary<string, object>
{
{ "content", "Extracted content" },
{ "mime_type", mimeType }
};
}
}
var extractor = new CustomExtractor();
KreuzbergClient.RegisterDocumentExtractor(extractor);
Console.WriteLine("Extractor registered");
package main
import (
"log"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
func main() {
// Register custom extractor with priority 50
if err := kreuzberg.RegisterDocumentExtractor("custom-json-extractor", 50); err != nil {
log.Fatalf("register extractor failed: %v", err)
}
result, err := kreuzberg.ExtractFileSync("document.json", nil)
if err != nil {
log.Fatalf("extract failed: %v", err)
}
log.Printf("Extracted content length: %d", len(result.Content))
}
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
public class CustomExtractorExample {
public static void main(String[] args) {
try {
ExtractionResult result = Kreuzberg.extractFile("document.json");
System.out.println("Extracted content length: " + result.getContent().length());
} catch (IOException | KreuzbergException e) {
e.printStackTrace();
}
}
}
require 'kreuzberg'
require 'json'
# Register custom extractor with priority 50
Kreuzberg.register_document_extractor(
name: "custom-json-extractor",
extractor: ->(content, mime_type, config) {
JSON.parse(content.to_s)
},
priority: 50
)
result = Kreuzberg.extract_file("document.json")
puts "Extracted content length: #{result.content.length}"
import {
listDocumentExtractors,
unregisterDocumentExtractor,
clearDocumentExtractors,
} from '@kreuzberg/node';
/**
* Note: Custom document extractors are not supported in TypeScript v4.0.
* Document extraction logic lives in the Rust core.
*
* You can list, unregister, or clear built-in extractors.
*/
// List all registered document extractors
const extractors = listDocumentExtractors();
console.log("Available extractors:", extractors);
// Unregister a specific extractor (use with caution)
unregisterDocumentExtractor("SomeExtractor");
// Clear all extractors (use with extreme caution)
// clearDocumentExtractors();
Priority System¶
When multiple extractors support the same MIME type, the highest priority wins:
- 0-25: Fallback/low-quality extractors
- 26-49: Alternative implementations
- 50: Default (built-in extractors)
- 51-75: Enhanced/premium extractors
- 76-100: Specialized/high-priority extractors
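For example, registering an extractor with a priority above 50 makes it take precedence over the built-in extractor for the same MIME type. A minimal Python sketch, reusing the CustomJsonExtractor from the Python example above (the subclass name is illustrative):
from kreuzberg import register_document_extractor
class PremiumJsonExtractor(CustomJsonExtractor):
    def name(self) -> str:
        return "premium-json-extractor"
    def priority(self) -> int:
        return 75  # outranks the default priority of 50, so it handles JSON first
register_document_extractor(PremiumJsonExtractor())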
Post-Processors¶
Transform and enrich extraction results after initial extraction.
Processing Stages¶
Post-processors execute in three stages:
- Early: Run first, use for foundational operations like language detection, quality scoring, or text normalization that other processors may depend on
- Middle: Run second, use for content transformation like keyword extraction, token reduction, or summarization
- Late: Run last, use for final enrichment like custom metadata, analytics tracking, or output formatting
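As a sketch (the class name and metadata key are illustrative), a late-stage processor declares its stage via processing_stage(), matching the Python post-processor interface shown below:
from kreuzberg import register_post_processor
class AnalyticsTagger:
    def name(self) -> str:
        return "analytics-tagger"
    def version(self) -> str:
        return "1.0.0"
    def processing_stage(self) -> str:
        return "late"  # runs after "early" and "middle" processors
    def process(self, result):
        result["metadata"]["pipeline_tag"] = "analytics-v1"
        return result
    def should_process(self, result) -> bool:
        return bool(result["content"])
    def initialize(self) -> None:
        pass
    def shutdown(self) -> None:
        pass
register_post_processor(AnalyticsTagger())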
Rust Implementation¶
use kreuzberg::plugins::{Plugin, PostProcessor, ProcessingStage};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig};
use async_trait::async_trait;
struct WordCountProcessor;
impl Plugin for WordCountProcessor {
fn name(&self) -> &str { "word-count" }
fn version(&self) -> String { "1.0.0".to_string() }
fn initialize(&self) -> Result<()> { Ok(()) }
fn shutdown(&self) -> Result<()> { Ok(()) }
}
#[async_trait]
impl PostProcessor for WordCountProcessor {
async fn process(
&self,
result: &mut ExtractionResult,
_config: &ExtractionConfig
) -> Result<()> {
let word_count = result.content.split_whitespace().count();
result.metadata.additional.insert(
"word_count".to_string(),
serde_json::json!(word_count)
);
Ok(())
}
fn processing_stage(&self) -> ProcessingStage {
ProcessingStage::Early
}
fn should_process(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig
) -> bool {
!result.content.is_empty()
}
}
Python Implementation¶
from kreuzberg import register_post_processor, ExtractionResult
class WordCountProcessor:
def name(self) -> str:
return "word_count"
def version(self) -> str:
return "1.0.0"
def processing_stage(self) -> str:
return "early"
def process(self, result: ExtractionResult) -> ExtractionResult:
word_count: int = len(result["content"].split())
result["metadata"]["word_count"] = word_count
return result
def should_process(self, result: ExtractionResult) -> bool:
return bool(result["content"])
def initialize(self) -> None:
pass
def shutdown(self) -> None:
pass
processor: WordCountProcessor = WordCountProcessor()
register_post_processor(processor)
Conditional Processing¶
using Kreuzberg;
public class PdfOnlyProcessor : IPostProcessor
{
public string Name() => "pdf-only-processor";
public string Version() => "1.0.0";
public ExtractionResult Process(ExtractionResult result) => result;
public bool ShouldProcess(ExtractionResult result)
=> result.MimeType == "application/pdf";
}
var processor = new PdfOnlyProcessor();
KreuzbergClient.RegisterPostProcessor(processor);
package main
import (
"encoding/json"
"log"
"unsafe"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"
// pdfOnlyProcessor applies PDF-specific processing logic only to PDF documents
//export pdfOnlyProcessor
func pdfOnlyProcessor(resultJSON *C.char) *C.char {
jsonStr := C.GoString(resultJSON)
var result map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &result); err != nil {
return C.CString("{\"error\":\"Failed to parse result JSON\"}")
}
// Check MIME type - only process PDFs
mimeType, ok := result["mime_type"].(string)
if !ok || mimeType != "application/pdf" {
// Return unchanged for non-PDF documents
outputJSON, err := json.Marshal(result)
if err != nil {
return C.CString("{\"error\":\"Failed to serialize result\"}")
}
return C.CString(string(outputJSON))
}
// Perform PDF-specific processing
metadata, ok := result["metadata"].(map[string]interface{})
if !ok {
metadata = make(map[string]interface{})
}
// Example PDF-specific processing:
// - Extract tables as structured data
// - Handle PDF-specific formatting
// - Preserve document hierarchy
metadata["pdf_specific_processing"] = true
metadata["processor_type"] = "pdf_only"
// Check for tables in PDF
if tablesJSON, ok := result["tables_json"].(string); ok && tablesJSON != "" {
var tables []interface{}
if err := json.Unmarshal([]byte(tablesJSON), &tables); err == nil {
metadata["table_count"] = len(tables)
}
}
result["metadata"] = metadata
// Serialize back to JSON
outputJSON, err := json.Marshal(result)
if err != nil {
return C.CString("{\"error\":\"Failed to serialize result\"}")
}
return C.CString(string(outputJSON))
}
func main() {
// Register the post-processor with priority 70
if err := kreuzberg.RegisterPostProcessor("pdf_only_processor", 70,
(C.PostProcessorCallback)(C.pdfOnlyProcessor)); err != nil {
log.Fatalf("failed to register post-processor: %v", err)
}
defer func() {
if err := kreuzberg.UnregisterPostProcessor("pdf_only_processor"); err != nil {
log.Printf("warning: failed to unregister post-processor: %v", err)
}
}()
// Process multiple documents - processor will only affect PDFs
files := []string{
"document.pdf",
"image.jpg",
"spreadsheet.xlsx",
}
for _, file := range files {
result, err := kreuzberg.ExtractFileSync(file, nil)
if err != nil {
log.Printf("Warning: extraction failed for %s: %v", file, err)
continue
}
// Parse metadata to check if PDF processing occurred
var metadata map[string]interface{}
if metaJSON, ok := result.MetadataJSON.(string); ok {
if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
if pdfProcessing, ok := metadata["pdf_specific_processing"].(bool); ok && pdfProcessing {
log.Printf("PDF-specific processing applied to: %s", file)
if tableCount, ok := metadata["table_count"].(float64); ok {
log.Printf(" Tables found: %.0f", tableCount)
}
} else {
log.Printf("Skipped PDF processor for: %s (MIME: %s)", file, result.MimeType)
}
}
}
}
}
import dev.kreuzberg.PostProcessor;
import java.util.HashMap;
import java.util.Map;
PostProcessor pdfOnly = result -> {
if (!result.getMimeType().equals("application/pdf")) {
return result;
}
Map<String, Object> metadata = new HashMap<>(result.getMetadata());
metadata.put("pdf_processed", true);
return result;
};
from kreuzberg import ExtractionResult, register_post_processor
class PdfOnlyProcessor:
def name(self) -> str:
return "pdf-only-processor"
def version(self) -> str:
return "1.0.0"
def process(self, result: ExtractionResult) -> ExtractionResult:
return result
def should_process(self, result: ExtractionResult) -> bool:
return result["mime_type"] == "application/pdf"
processor: PdfOnlyProcessor = PdfOnlyProcessor()
register_post_processor(processor)
impl PostProcessor for PdfOnlyProcessor {
async fn process(
&self,
result: &mut ExtractionResult,
_config: &ExtractionConfig
) -> Result<()> {
Ok(())
}
fn processing_stage(&self) -> ProcessingStage {
ProcessingStage::Middle
}
fn should_process(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig
) -> bool {
result.mime_type == "application/pdf"
}
}
OCR Backends¶
Integrate custom OCR engines or cloud services.
Rust Implementation¶
use kreuzberg::plugins::{Plugin, OcrBackend, OcrBackendType};
use kreuzberg::{Result, ExtractionResult, OcrConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;
struct CloudOcrBackend {
api_key: String,
supported_langs: Vec<String>,
}
impl Plugin for CloudOcrBackend {
fn name(&self) -> &str { "cloud-ocr" }
fn version(&self) -> String { "1.0.0".to_string() }
fn initialize(&self) -> Result<()> { Ok(()) }
fn shutdown(&self) -> Result<()> { Ok(()) }
}
#[async_trait]
impl OcrBackend for CloudOcrBackend {
async fn process_image(
&self,
image_bytes: &[u8],
config: &OcrConfig,
) -> Result<ExtractionResult> {
let text = self.call_cloud_api(image_bytes, &config.language).await?;
Ok(ExtractionResult {
content: text,
mime_type: "text/plain".to_string(),
metadata: Metadata::default(),
tables: vec![],
detected_languages: None,
chunks: None,
images: None,
})
}
fn supports_language(&self, lang: &str) -> bool {
self.supported_langs.iter().any(|l| l == lang)
}
fn backend_type(&self) -> OcrBackendType {
OcrBackendType::Custom
}
fn supported_languages(&self) -> Vec<String> {
self.supported_langs.clone()
}
}
impl CloudOcrBackend {
async fn call_cloud_api(
&self,
image: &[u8],
language: &str
) -> Result<String> {
Ok("Extracted text".to_string())
}
}
Language Bindings¶
using Kreuzberg;
using System.Net.Http;
using System.Text.Json;
public class CloudOcrBackend : IOcrBackend
{
private readonly string _apiKey;
private readonly List<string> _langs = new() { "eng", "deu", "fra" };
public CloudOcrBackend(string apiKey)
{
_apiKey = apiKey;
}
public string Name() => "cloud-ocr";
public string Version() => "1.0.0";
public List<string> SupportedLanguages() => _langs;
public Dictionary<string, object> ProcessImage(byte[] imageBytes, Dictionary<string, object> config)
{
using (var client = new HttpClient())
{
using (var form = new MultipartFormDataContent())
{
form.Add(new ByteArrayContent(imageBytes), "image");
var lang = config.ContainsKey("language") ? config["language"].ToString() : "eng";
form.Add(new StringContent(lang), "language");
var response = client.PostAsync("https://api.example.com/ocr", form).Result;
var json = response.Content.ReadAsStringAsync().Result;
var doc = JsonDocument.Parse(json);
var text = doc.RootElement.GetProperty("text").GetString();
return new Dictionary<string, object>
{
{ "content", text },
{ "mime_type", "text/plain" }
};
}
}
}
public void Initialize() { }
public void Shutdown() { }
}
var backend = new CloudOcrBackend(apiKey: "your-api-key");
KreuzbergClient.RegisterOcrBackend(backend);
import dev.kreuzberg.*;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.net.http.*;
import java.net.URI;
public class CloudOcrExample {
public static void main(String[] args) {
Arena callbackArena = Arena.ofAuto();
String apiKey = "your-api-key";
OcrBackend cloudOcr = (imageBytes, imageLength, configJson) -> {
try {
// Read image bytes from native memory
byte[] image = imageBytes.reinterpret(imageLength)
.toArray(ValueLayout.JAVA_BYTE);
// Read config JSON
String config = configJson.reinterpret(Long.MAX_VALUE)
.getString(0);
// Call cloud OCR API
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://api.example.com/ocr"))
.header("Authorization", "Bearer " + apiKey)
.POST(HttpRequest.BodyPublishers.ofByteArray(image))
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
String text = parseTextFromResponse(response.body());
// Return result as C string
return callbackArena.allocateFrom(text);
} catch (Exception e) {
return MemorySegment.NULL;
}
};
try (Arena arena = Arena.ofConfined()) {
Kreuzberg.registerOcrBackend("cloud-ocr", cloudOcr, arena);
// Use custom OCR backend in extraction
// Note: Requires ExtractionConfig with OCR enabled
ExtractionResult result = Kreuzberg.extractFileSync("scanned.pdf");
} catch (Exception e) {
e.printStackTrace();
}
}
private static String parseTextFromResponse(String json) {
// Parse JSON response and extract text field
return json; // Simplified
}
}
from kreuzberg import register_ocr_backend
import httpx
class CloudOcrBackend:
def __init__(self, api_key: str):
self.api_key: str = api_key
self.langs: list[str] = ["eng", "deu", "fra"]
def name(self) -> str:
return "cloud-ocr"
def version(self) -> str:
return "1.0.0"
def supported_languages(self) -> list[str]:
return self.langs
def process_image(self, image_bytes: bytes, config: dict) -> dict:
with httpx.Client() as client:
response = client.post(
"https://api.example.com/ocr",
files={"image": image_bytes},
json={"language": config.get("language", "eng")},
)
text: str = response.json()["text"]
return {"content": text, "mime_type": "text/plain"}
def initialize(self) -> None:
pass
def shutdown(self) -> None:
pass
backend: CloudOcrBackend = CloudOcrBackend(api_key="your-api-key")
register_ocr_backend(backend)
require 'kreuzberg'
require 'net/http'
require 'json'
class CloudOcrBackend
def name
'cloud-ocr'
end
def supported_languages
%w[eng fra deu]
end
def process_image(image_data, language)
uri = URI('https://api.example.com/ocr')
req = Net::HTTP::Post.new(uri)
req['Authorization'] = "Bearer #{ENV['OCR_API_KEY']}"
req.body = image_data
res = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) { |h| h.request(req) }
raise Kreuzberg::Errors::OCRError, res.message unless res.is_a?(Net::HTTPSuccess)
{ content: JSON.parse(res.body)['text'] }
rescue StandardError => e
raise Kreuzberg::Errors::OCRError, e.message
end
end
Kreuzberg.register_ocr_backend(CloudOcrBackend.new)
config = Kreuzberg::Config::Extraction.new(
ocr: Kreuzberg::Config::OCR.new(backend: 'cloud-ocr')
)
Kreuzberg.extract_file_sync('doc.pdf', config: config)
Validators¶
Enforce quality requirements on extraction results.
Validators are Fatal
Validation errors cause extraction to fail. Use validators for critical quality checks only.
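Because validation failures are fatal, callers should be prepared to handle them. A minimal Python sketch, assuming the validator's ValidationError propagates from the extraction call (the extract_file_sync entry point is assumed from the other bindings' naming; use your binding's extraction function):
from kreuzberg import ValidationError, extract_file_sync
try:
    result = extract_file_sync("document.pdf")
except ValidationError as error:
    # A registered validator rejected the extraction result.
    print(f"Extraction rejected by a validator: {error}")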
Rust Implementation¶
use kreuzberg::plugins::{Plugin, Validator};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, KreuzbergError};
use async_trait::async_trait;
struct MinLengthValidator {
min_length: usize,
}
impl Plugin for MinLengthValidator {
fn name(&self) -> &str { "min-length-validator" }
fn version(&self) -> String { "1.0.0".to_string() }
fn initialize(&self) -> Result<()> { Ok(()) }
fn shutdown(&self) -> Result<()> { Ok(()) }
}
#[async_trait]
impl Validator for MinLengthValidator {
async fn validate(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig,
) -> Result<()> {
if result.content.len() < self.min_length {
return Err(KreuzbergError::validation(format!(
"Content too short: {} < {} characters",
result.content.len(),
self.min_length
)));
}
Ok(())
}
fn priority(&self) -> i32 {
100
}
}
Language Bindings¶
using Kreuzberg;
public class MinLengthValidator : IValidator
{
private readonly int _minLength;
public MinLengthValidator(int minLength = 100)
{
_minLength = minLength;
}
public string Name() => "min_length_validator";
public string Version() => "1.0.0";
public int Priority() => 100;
public void Validate(Dictionary<string, object> result)
{
var contentLength = result["content"].ToString()?.Length ?? 0;
if (contentLength < _minLength)
throw new ValidationError($"Content too short: {contentLength}");
}
public bool ShouldValidate(Dictionary<string, object> result) => true;
public void Initialize() { }
public void Shutdown() { }
}
var validator = new MinLengthValidator(minLength: 100);
KreuzbergClient.RegisterValidator(validator);
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.Validator;
import dev.kreuzberg.ValidationException;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
public class MinLengthValidatorExample {
public static void main(String[] args) {
int minLength = 100;
Validator minLengthValidator = result -> {
if (result.getContent().length() < minLength) {
throw new ValidationException(
"Content too short: " + result.getContent().length() +
" < " + minLength
);
}
};
try {
Kreuzberg.registerValidator("min-length", minLengthValidator, 100);
ExtractionResult result = Kreuzberg.extractFile("document.pdf");
System.out.println("Validation passed!");
} catch (ValidationException e) {
System.err.println("Validation failed: " + e.getMessage());
} catch (IOException | KreuzbergException e) {
e.printStackTrace();
}
}
}
from kreuzberg import register_validator, ValidationError
class MinLengthValidator:
def __init__(self, min_length: int = 100):
self.min_length: int = min_length
def name(self) -> str:
return "min_length_validator"
def version(self) -> str:
return "1.0.0"
def priority(self) -> int:
return 100
def validate(self, result: dict) -> None:
content_len: int = len(result["content"])
if content_len < self.min_length:
raise ValidationError(f"Content too short: {content_len}")
def should_validate(self, result: dict) -> bool:
return True
def initialize(self) -> None:
pass
def shutdown(self) -> None:
pass
validator: MinLengthValidator = MinLengthValidator(min_length=100)
register_validator(validator)
Quality Score Validator¶
using Kreuzberg;
public class QualityValidator : IValidator
{
public string Name() => "quality-validator";
public string Version() => "1.0.0";
public void Validate(Dictionary<string, object> result)
{
var metadata = (Dictionary<string, object>)result["metadata"];
var score = metadata.ContainsKey("quality_score")
? Convert.ToDouble(metadata["quality_score"])
: 0.0;
if (score < 0.5)
throw new ValidationError($"Quality score too low: {score:F2}");
}
public bool ShouldValidate(Dictionary<string, object> result) => true;
public int Priority() => 100;
public void Initialize() { }
public void Shutdown() { }
}
var validator = new QualityValidator();
KreuzbergClient.RegisterValidator(validator);
Validator qualityValidator = result -> {
double score = result.getMetadata().containsKey("quality_score")
? ((Number) result.getMetadata().get("quality_score")).doubleValue()
: 0.0;
if (score < 0.5) {
throw new ValidationException(
String.format("Quality score too low: %.2f < 0.50", score)
);
}
};
from kreuzberg import ValidationError, register_validator
class QualityValidator:
def name(self) -> str:
return "quality-validator"
def version(self) -> str:
return "1.0.0"
def validate(self, result: dict) -> None:
score: float = result["metadata"].get("quality_score", 0.0)
if score < 0.5:
raise ValidationError(
f"Quality score too low: {score:.2f}"
)
validator: QualityValidator = QualityValidator()
register_validator(validator)
#[async_trait]
impl Validator for QualityValidator {
async fn validate(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig,
) -> Result<()> {
let score = result.metadata
.additional
.get("quality_score")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
if score < 0.5 {
return Err(KreuzbergError::validation(format!(
"Quality score too low: {:.2} < 0.50",
score
)));
}
Ok(())
}
}
Plugin Management¶
Listing Plugins¶
using Kreuzberg;
var extractors = KreuzbergClient.ListDocumentExtractors();
var processors = KreuzbergClient.ListPostProcessors();
var ocrBackends = KreuzbergClient.ListOcrBackends();
var validators = KreuzbergClient.ListValidators();
Console.WriteLine($"Extractors: {string.Join(", ", extractors)}");
Console.WriteLine($"Processors: {string.Join(", ", processors)}");
Console.WriteLine($"OCR backends: {string.Join(", ", ocrBackends)}");
Console.WriteLine($"Validators: {string.Join(", ", validators)}");
from kreuzberg import (
list_document_extractors,
list_post_processors,
list_ocr_backends,
list_validators,
)
extractors: list[str] = list_document_extractors()
processors: list[str] = list_post_processors()
ocr_backends: list[str] = list_ocr_backends()
validators: list[str] = list_validators()
print(f"Extractors: {extractors}")
print(f"Processors: {processors}")
print(f"OCR backends: {ocr_backends}")
print(f"Validators: {validators}")
use kreuzberg::plugins::registry::*;
let registry = get_document_extractor_registry();
let extractors = registry.list()?;
println!("Registered extractors: {:?}", extractors);
let registry = get_post_processor_registry();
let processors = registry.list()?;
println!("Registered processors: {:?}", processors);
let registry = get_ocr_backend_registry();
let backends = registry.list()?;
println!("Registered OCR backends: {:?}", backends);
let registry = get_validator_registry();
let validators = registry.list()?;
println!("Registered validators: {:?}", validators);
Unregistering Plugins¶
using Kreuzberg;
var names = new List<string>
{
"custom-json-extractor",
"word_count",
"cloud-ocr",
"min_length_validator"
};
KreuzbergClient.UnregisterDocumentExtractor(names[0]);
KreuzbergClient.UnregisterPostProcessor(names[1]);
KreuzbergClient.UnregisterOcrBackend(names[2]);
KreuzbergClient.UnregisterValidator(names[3]);
from kreuzberg import (
unregister_document_extractor,
unregister_post_processor,
unregister_ocr_backend,
unregister_validator,
)
names: list[str] = [
"custom-json-extractor",
"word_count",
"cloud-ocr",
"min_length_validator",
]
unregister_document_extractor(names[0])
unregister_post_processor(names[1])
unregister_ocr_backend(names[2])
unregister_validator(names[3])
Clearing All Plugins¶
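Each plugin registry can also be cleared wholesale, removing every registered plugin of that type, including built-in ones (see the clearDocumentExtractors call in the TypeScript example above). Use clearing with extreme caution, typically only in tests.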
Thread Safety¶
All plugins must be thread-safe:
Rust Thread Safety¶
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use kreuzberg::KreuzbergError;
struct StatefulPlugin {
call_count: AtomicUsize,
cache: Mutex<HashMap<String, String>>,
}
impl Plugin for StatefulPlugin {
fn name(&self) -> &str { "stateful-plugin" }
fn version(&self) -> String { "1.0.0".to_string() }
fn initialize(&self) -> Result<()> {
self.call_count.store(0, Ordering::Release);
Ok(())
}
fn shutdown(&self) -> Result<()> {
let count = self.call_count.load(Ordering::Acquire);
println!("Plugin called {} times", count);
Ok(())
}
}
#[async_trait]
impl PostProcessor for StatefulPlugin {
async fn process(
&self,
result: &mut ExtractionResult,
_config: &ExtractionConfig
) -> Result<()> {
self.call_count.fetch_add(1, Ordering::AcqRel);
let mut cache = self.cache.lock()
.map_err(|_| KreuzbergError::plugin("Cache lock poisoned"))?;
cache.insert("last_mime".to_string(), result.mime_type.clone());
Ok(())
}
fn processing_stage(&self) -> ProcessingStage {
ProcessingStage::Middle
}
}
Python Thread Safety¶
using System.Collections.Generic;
using System.Threading;
public class StatefulPlugin
{
private readonly object _lock = new();
private int _callCount = 0;
private readonly Dictionary<string, object> _cache = new();
public string Name() => "stateful-plugin";
public string Version() => "1.0.0";
public Dictionary<string, object> Process(Dictionary<string, object> result)
{
lock (_lock)
{
_callCount++;
_cache["last_mime"] = result["mime_type"];
}
return result;
}
public void Initialize() { }
public void Shutdown() { }
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
class StatefulPlugin implements PostProcessor {
// Use atomic types for simple counters
private final AtomicInteger callCount = new AtomicInteger(0);
// Use concurrent collections for complex state
private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
@Override
public ExtractionResult process(ExtractionResult result) {
// Increment counter atomically
callCount.incrementAndGet();
// Update cache (thread-safe)
cache.put("last_mime", result.mimeType());
return result;
}
public int getCallCount() {
return callCount.get();
}
}
import threading
class StatefulPlugin:
def __init__(self):
self.lock: threading.Lock = threading.Lock()
self.call_count: int = 0
self.cache: dict = {}
def name(self) -> str:
return "stateful-plugin"
def version(self) -> str:
return "1.0.0"
def process(self, result: dict) -> dict:
with self.lock:
self.call_count += 1
self.cache["last_mime"] = result["mime_type"]
return result
def initialize(self) -> None:
pass
def shutdown(self) -> None:
pass
Best Practices¶
Naming¶
- Use kebab-case for plugin names: my-custom-plugin
- Use lowercase only, no spaces or special characters
- Be descriptive but concise
Error Handling¶
using Kreuzberg;
try
{
var result = KreuzbergClient.ExtractFileSync("missing.pdf");
Console.WriteLine(result.Content);
}
catch (KreuzbergValidationException ex)
{
Console.Error.WriteLine($"Validation error: {ex.Message}");
}
catch (KreuzbergIOException ex)
{
Console.Error.WriteLine($"IO error: {ex.Message}");
throw;
}
catch (KreuzbergException ex)
{
Console.Error.WriteLine($"Extraction failed: {ex.Message}");
throw;
}
package main
/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"
import (
"log"
"unsafe"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
//export customValidator
func customValidator(resultJSON *C.char) *C.char {
// Inspect resultJSON, return error message or NULL
return nil
}
func main() {
if err := kreuzberg.RegisterValidator("go-validator", 50, (C.ValidatorCallback)(C.customValidator)); err != nil {
log.Fatalf("register validator failed: %v", err)
}
result, err := kreuzberg.ExtractFileSync("document.pdf", nil)
if err != nil {
log.Fatalf("extract failed: %v", err)
}
log.Printf("Content length: %d", len(result.Content))
}
require "kreuzberg"
validator = lambda do |result|
raise Kreuzberg::ValidationError, "Content too short" if result.content.length < 50
end
Kreuzberg.register_validator("min_length", validator, priority: 10)
result = Kreuzberg.extract_file_sync("document.pdf")
puts "Validated content length: #{result.content.length}"
Kreuzberg.unregister_validator("min_length")
Logging¶
using Kreuzberg;
using Microsoft.Extensions.Logging;
public class MyPlugin
{
private readonly ILogger _logger;
public MyPlugin(ILogger logger)
{
_logger = logger;
}
public string Name() => "my-plugin";
public string Version() => "1.0.0";
public void Initialize()
{
_logger.LogInformation($"Initializing plugin: {Name()}");
}
public void Shutdown()
{
_logger.LogInformation($"Shutting down plugin: {Name()}");
}
public Dictionary<string, object> ExtractBytes(
byte[] content, string mimeType, Dictionary<string, object> config)
{
_logger.LogInformation($"Extracting {mimeType} ({content.Length} bytes)");
var result = new Dictionary<string, object> { { "content", "" }, { "mime_type", mimeType } };
if (string.IsNullOrEmpty((string?)result["content"]))
{
_logger.LogWarning("Extraction resulted in empty content");
}
return result;
}
}
import java.util.logging.Logger;
import java.util.logging.Level;
class MyPlugin implements PostProcessor {
private static final Logger logger = Logger.getLogger(MyPlugin.class.getName());
@Override
public ExtractionResult process(ExtractionResult result) {
logger.info("Processing " + result.mimeType() +
" (" + result.content().length() + " bytes)");
// Processing...
if (result.content().isEmpty()) {
logger.warning("Processing resulted in empty content");
}
return result;
}
}
import logging
logger = logging.getLogger(__name__)
class MyPlugin:
def name(self) -> str:
return "my-plugin"
def version(self) -> str:
return "1.0.0"
def initialize(self) -> None:
logger.info(f"Initializing plugin: {self.name()}")
def shutdown(self) -> None:
logger.info(f"Shutting down plugin: {self.name()}")
def extract_bytes(
self, content: bytes, mime_type: str, config: dict
) -> dict:
logger.info(f"Extracting {mime_type} ({len(content)} bytes)")
result: dict = {"content": "", "mime_type": mime_type}
if not result["content"]:
logger.warning("Extraction resulted in empty content")
return result
use log::{info, warn, error};
impl Plugin for MyPlugin {
fn initialize(&self) -> Result<()> {
info!("Initializing plugin: {}", self.name());
Ok(())
}
fn shutdown(&self) -> Result<()> {
info!("Shutting down plugin: {}", self.name());
Ok(())
}
}
#[async_trait]
impl DocumentExtractor for MyPlugin {
async fn extract_bytes(
&self,
content: &[u8],
mime_type: &str,
_config: &ExtractionConfig,
) -> Result<ExtractionResult> {
info!("Extracting {} ({} bytes)", mime_type, content.len());
let result = ExtractionResult::default();
if result.content.is_empty() {
warn!("Extraction resulted in empty content");
}
Ok(result)
}
}
Testing¶
using Xunit;
public class CustomExtractorTests
{
[Fact]
public void TestCustomExtractor()
{
var extractor = new CustomJsonExtractor();
var jsonData = System.Text.Encoding.UTF8.GetBytes(@"{""message"": ""Hello, world!""}");
var config = new Dictionary<string, object>();
var result = extractor.ExtractBytes(jsonData, "application/json", config);
Assert.Contains("Hello, world!", (string)result["content"]);
Assert.Equal("application/json", (string)result["mime_type"]);
}
}
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
class PostProcessorTest {
@Test
void testWordCountProcessor() {
PostProcessor processor = result -> {
    int count = result.getContent().split("\\s+").length;
    Map<String, Object> metadata = new HashMap<>(result.getMetadata());
    metadata.put("word_count", count);
    // Return a new result carrying the enriched metadata so the assertion below sees it.
    return new ExtractionResult(
        result.getContent(),
        result.getMimeType(),
        metadata,
        java.util.List.of(),
        java.util.List.of(),
        java.util.List.of(),
        java.util.List.of(),
        true
    );
};
ExtractionResult input = new ExtractionResult(
"Hello world test",
"text/plain",
new HashMap<>(),
java.util.List.of(),
java.util.List.of(),
java.util.List.of(),
java.util.List.of(),
true
);
ExtractionResult output = processor.process(input);
assertEquals(3, output.getMetadata().get("word_count"));
}
}
import pytest
from kreuzberg import ExtractionResult
def test_custom_extractor() -> None:
extractor = CustomJsonExtractor()
json_data: bytes = b'{"message": "Hello, world!"}'
config: dict = {}
result: ExtractionResult = extractor.extract_bytes(
json_data, "application/json", config
)
assert "Hello, world!" in result["content"]
assert result["mime_type"] == "application/json"
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_custom_extractor() {
let extractor = CustomJsonExtractor;
let json_data = br#"{"message": "Hello, world!"}"#;
let config = ExtractionConfig::default();
let result = extractor
.extract_bytes(json_data, "application/json", &config)
.await
.expect("Extraction failed");
assert!(result.content.contains("Hello, world!"));
assert_eq!(result.mime_type, "application/json");
}
}
Complete Example: PDF Metadata Extractor¶
using Kreuzberg;
public class PdfMetadataExtractor : IPostProcessor
{
private int _processedCount = 0;
public string Name() => "pdf_metadata_extractor";
public string Version() => "1.0.0";
public string Description() => "Extracts and enriches PDF metadata";
public string ProcessingStage() => "early";
public bool ShouldProcess(ExtractionResult result)
=> result.MimeType == "application/pdf";
public ExtractionResult Process(ExtractionResult result)
{
_processedCount++;
return result;
}
public void Initialize()
{
Console.WriteLine("PDF metadata extractor initialized");
}
public void Shutdown()
{
Console.WriteLine($"Processed {_processedCount} PDFs");
}
}
var processor = new PdfMetadataExtractor();
KreuzbergClient.RegisterPostProcessor(processor);
package main
import (
"encoding/json"
"log"
"sync/atomic"
"unsafe"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"
// pdfMetadataState tracks statistics about PDF processing
var pdfMetadataState = struct {
processedCount int64
}{
processedCount: 0,
}
// pdfMetadataExtractor enriches PDF extraction results with additional metadata
//export pdfMetadataExtractor
func pdfMetadataExtractor(resultJSON *C.char) *C.char {
jsonStr := C.GoString(resultJSON)
var result map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &result); err != nil {
return C.CString("{\"error\":\"Failed to parse result JSON\"}")
}
// Only process PDFs
mimeType, ok := result["mime_type"].(string)
if !ok || mimeType != "application/pdf" {
// Return unchanged for non-PDF documents
outputJSON, err := json.Marshal(result)
if err != nil {
return C.CString("{\"error\":\"Failed to serialize result\"}")
}
return C.CString(string(outputJSON))
}
// Process PDF-specific metadata
metadata, ok := result["metadata"].(map[string]interface{})
if !ok {
metadata = make(map[string]interface{})
}
// Mark as processed by this processor
metadata["pdf_processed"] = true
// Add content statistics
content, ok := result["content"].(string)
if ok {
metadata["content_length"] = len(content)
}
// Increment processed count atomically
atomic.AddInt64(&pdfMetadataState.processedCount, 1)
metadata["pdf_processor_version"] = "1.0.0"
result["metadata"] = metadata
// Serialize back to JSON
outputJSON, err := json.Marshal(result)
if err != nil {
return C.CString("{\"error\":\"Failed to serialize result\"}")
}
return C.CString(string(outputJSON))
}
func main() {
// Register the post-processor with priority 80, early stage
if err := kreuzberg.RegisterPostProcessor("pdf_metadata_extractor", 80,
(C.PostProcessorCallback)(C.pdfMetadataExtractor)); err != nil {
log.Fatalf("failed to register post-processor: %v", err)
}
defer func() {
if err := kreuzberg.UnregisterPostProcessor("pdf_metadata_extractor"); err != nil {
log.Printf("warning: failed to unregister post-processor: %v", err)
}
log.Printf("Total PDFs processed: %d", atomic.LoadInt64(&pdfMetadataState.processedCount))
}()
// Extract PDF document
result, err := kreuzberg.ExtractFileSync("document.pdf", nil)
if err != nil {
log.Fatalf("extraction failed: %v", err)
}
log.Printf("PDF MIME type: %s", result.MimeType)
// Parse and display metadata
var metadata map[string]interface{}
if metaJSON, ok := result.MetadataJSON.(string); ok {
if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
if pdfProcessed, ok := metadata["pdf_processed"].(bool); ok && pdfProcessed {
log.Printf("PDF metadata extracted successfully")
if contentLen, ok := metadata["content_length"].(float64); ok {
log.Printf("Content length: %.0f bytes", contentLen)
}
}
}
}
}
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
public class PdfMetadataExtractorExample {
private static final Logger logger = Logger.getLogger(
PdfMetadataExtractorExample.class.getName()
);
public static void main(String[] args) {
AtomicInteger processedCount = new AtomicInteger(0);
PostProcessor pdfMetadata = result -> {
if (!result.getMimeType().equals("application/pdf")) {
return result;
}
processedCount.incrementAndGet();
Map<String, Object> metadata = new HashMap<>(result.getMetadata());
metadata.put("pdf_processed", true);
metadata.put("processing_timestamp", System.currentTimeMillis());
logger.info("Processed PDF: " + processedCount.get());
return result;
};
try {
Kreuzberg.registerPostProcessor("pdf-metadata-extractor", pdfMetadata, 50);
logger.info("PDF metadata extractor initialized");
ExtractionResult result = Kreuzberg.extractFile("document.pdf");
System.out.println("PDF processed: " + result.getMetadata().get("pdf_processed"));
logger.info("Processed " + processedCount.get() + " PDFs");
} catch (IOException | KreuzbergException e) {
e.printStackTrace();
}
}
}
require 'kreuzberg'
class PdfMetadataExtractor
def initialize
@count = 0
end
def call(result)
return result unless result['mime_type'] == 'application/pdf'
@count += 1
result['metadata'] ||= {}
result['metadata']['pdf_order'] = @count
result
end
end
extractor = PdfMetadataExtractor.new
Kreuzberg.register_post_processor('pdf_metadata', extractor)
config = Kreuzberg::Config::Extraction.new(
postprocessor: { enabled: true }
)
result = Kreuzberg.extract_file_sync('report.pdf', config: config)
puts "Metadata: #{result.metadata.inspect}"