Creating Plugins (v4.0.0)
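Extend Kreuzberg with custom document extractors, post-processors, OCR backends, and validators. Plugins can be written in Rust or through the language bindings shown below (Python, Go, Java, C#, Ruby, R). They are registered globally, so every subsequent extraction call can use them. The TypeScript binding can list and remove extractors but cannot define custom ones.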
WASM
Custom plugins are not supported in WASM environments. Use Python, Rust, or other native bindings.
Plugin Types
| Type | Purpose | Use case |
|---|---|---|
| DocumentExtractor | Extract content from file formats | New format support, override built-in extractors |
| PostProcessor | Transform extraction results | Metadata enrichment, content filtering, text normalization |
| OcrBackend | Perform OCR on images | Cloud OCR services, custom OCR engines |
| Validator | Validate extraction quality | Minimum content length, quality score thresholds |
All plugins must be thread-safe (`Send + Sync` in Rust; safe to call concurrently from multiple threads in the other bindings) and must implement the `initialize()` / `shutdown()` lifecycle methods.
Document Extractors
Add support for new file formats or override built-in extractors.
Implementation
use kreuzberg::plugins::{DocumentExtractor, Plugin};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;
/// Example extractor that turns a JSON document into plain text by emitting
/// every string value it contains, one per line.
struct CustomJsonExtractor;

impl Plugin for CustomJsonExtractor {
    fn name(&self) -> &str {
        "custom-json-extractor"
    }

    fn version(&self) -> String {
        "1.0.0".to_string()
    }

    // Stateless plugin: nothing to set up or tear down.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl DocumentExtractor for CustomJsonExtractor {
    /// Parses `content` as JSON and returns the concatenated string values.
    ///
    /// # Errors
    /// Returns an error if `content` is not valid JSON.
    async fn extract_bytes(
        &self,
        content: &[u8],
        _mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        // NOTE(review): `?` relies on the crate error type converting from
        // `serde_json::Error`; confirm that conversion exists.
        let json: serde_json::Value = serde_json::from_slice(content)?;
        let text = extract_text_from_json(&json);

        Ok(ExtractionResult {
            content: text,
            mime_type: "application/json".to_string(),
            metadata: Metadata::default(),
            // Every other field (tables, languages, chunks, images, warnings, ...)
            // keeps its default, so the example keeps compiling when fields are added.
            ..Default::default()
        })
    }

    fn supported_mime_types(&self) -> &[&str] {
        &["application/json", "text/json"]
    }

    // 50 is the same priority as the built-in extractors.
    fn priority(&self) -> i32 {
        50
    }
}
fn extract_text_from_json(value: &serde_json::Value) -> String {
match value {
serde_json::Value::String(s) => format!("{}\n", s),
serde_json::Value::Array(arr) => arr.iter().map(extract_text_from_json).collect(),
serde_json::Value::Object(obj) => obj.values().map(extract_text_from_json).collect(),
_ => String::new(),
}
}
from kreuzberg import register_document_extractor, ExtractionResult
import json
class CustomJsonExtractor:
    """Document extractor that flattens a JSON document into text.

    Every string value found anywhere in the document is emitted on its own
    line; numbers, booleans and ``null`` are skipped.
    """

    def name(self) -> str:
        return "custom-json-extractor"

    def version(self) -> str:
        return "1.0.0"

    def supported_mime_types(self) -> list[str]:
        # Same MIME types as the Rust version of this extractor.
        return ["application/json", "text/json"]

    def priority(self) -> int:
        # 50 is the same priority as the built-in extractors.
        return 50

    def extract_bytes(self, content: bytes, mime_type: str, config: dict) -> dict:
        """Parse ``content`` as JSON and return ``{"content", "mime_type"}``.

        Raises:
            ValueError: if ``content`` is not valid JSON (``json.JSONDecodeError``)
                or cannot be decoded as text.
        """
        # The top-level JSON value may be an object, an array or a scalar.
        data: object = json.loads(content)
        text: str = self._extract_text(data)
        return {"content": text, "mime_type": "application/json"}

    def _extract_text(self, obj: object) -> str:
        """Recursively join all string values in ``obj``, one per line."""
        if isinstance(obj, str):
            return f"{obj}\n"
        if isinstance(obj, list):
            return "".join(self._extract_text(item) for item in obj)
        if isinstance(obj, dict):
            return "".join(self._extract_text(v) for v in obj.values())
        return ""

    # Lifecycle hooks required of every plugin; this extractor is stateless.
    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
# Create a single instance and register it globally for all extraction calls.
extractor = CustomJsonExtractor()
register_document_extractor(extractor)
Registration
import {
listDocumentExtractors,
unregisterDocumentExtractor,
clearDocumentExtractors,
} from '@kreuzberg/node';
/*
 * Custom document extractors cannot be written in TypeScript in v4.0:
 * extraction logic lives in the Rust core. The Node binding can only
 * inspect and manage the extractors that are already registered.
 */

// Inspect the registry.
const extractors = listDocumentExtractors();
console.log("Available extractors:", extractors);

// Remove a single extractor by name (use with caution).
unregisterDocumentExtractor("SomeExtractor");

// Remove every extractor, including built-in ones (use with extreme caution):
// clearDocumentExtractors();
use kreuzberg::plugins::registry::get_document_extractor_registry;
use std::sync::Arc;
/// Registers [`CustomJsonExtractor`] in the global document-extractor registry.
///
/// # Errors
/// Returns a plugin error if the registry lock is poisoned, or whatever error
/// `register` reports (e.g. for an invalid plugin).
fn register_custom_extractor() -> kreuzberg::Result<()> {
    let extractor = Arc::new(CustomJsonExtractor);
    let registry = get_document_extractor_registry();
    // Surface a poisoned lock as an error instead of panicking inside a
    // function that already returns `Result`.
    registry
        .write()
        .map_err(|_| kreuzberg::KreuzbergError::plugin("Document extractor registry lock poisoned"))?
        .register(extractor)?;
    Ok(())
}
package main
import (
"log"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
func main() {
// Register custom extractor with priority 50
if err := kreuzberg.RegisterDocumentExtractor("custom-json-extractor", 50); err != nil {
log.Fatalf("register extractor failed: %v", err)
}
result, err := kreuzberg.ExtractFileSync("document.json", nil)
if err != nil {
log.Fatalf("extract failed: %v", err)
}
log.Printf("Extracted content length: %d", len(result.Content))
}
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
public class CustomExtractorExample {
    /** Extracts a JSON file and prints the length of the extracted text. */
    public static void main(String[] args) {
        final String path = "document.json";
        try {
            ExtractionResult extracted = Kreuzberg.extractFile(path);
            int length = extracted.getContent().length();
            System.out.println("Extracted content length: " + length);
        } catch (IOException | KreuzbergException ex) {
            ex.printStackTrace();
        }
    }
}
using Kreuzberg;

// Top-level statements must come before any type declarations in a C# file
// (otherwise the compiler reports CS8803).
var extractor = new CustomExtractor();
KreuzbergClient.RegisterDocumentExtractor(extractor);
Console.WriteLine("Extractor registered");

/// <summary>Minimal custom document extractor returning fixed content.</summary>
public class CustomExtractor : IDocumentExtractor
{
    public string Name() => "custom";
    public string Version() => "1.0.0";

    /// <summary>Returns placeholder content tagged with the incoming MIME type.</summary>
    public Dictionary<string, object> ExtractBytes(byte[] data, string mimeType, Dictionary<string, object> config)
    {
        return new Dictionary<string, object>
        {
            { "content", "Extracted content" },
            { "mime_type", mimeType }
        };
    }

    // Lifecycle hooks required of every plugin; nothing to set up here.
    public void Initialize() { }
    public void Shutdown() { }
}
require 'json'
require 'kreuzberg'

# Collects every string value in a parsed JSON document, one per line
# (the same flattening as the Rust and Python extractors in this guide).
collect_json_text = lambda do |node|
  case node
  when String then "#{node}\n"
  when Array then node.map { |item| collect_json_text.call(item) }.join
  when Hash then node.values.map { |value| collect_json_text.call(value) }.join
  else ''
  end
end

# Register the custom extractor with priority 50 (same as built-in extractors).
Kreuzberg.register_document_extractor(
  name: "custom-json-extractor",
  extractor: ->(content, mime_type, _config) {
    # Return extracted text, not the raw parsed JSON structure.
    { content: collect_json_text.call(JSON.parse(content.to_s)), mime_type: mime_type }
  },
  priority: 50
)

result = Kreuzberg.extract_file("document.json")
puts "Extracted content length: #{result.content.length}"
library(kreuzberg)

# Minimal custom extractor: receives the file path and MIME type and returns
# a list with the extracted content, the MIME type and a page count.
custom_extractor <- function(path, mime_type) {
  list(
    content = sprintf("Extracted from %s (%s)", path, mime_type),
    mime_type = mime_type,
    pages = 1L
  )
}

register_document_extractor("custom_format", custom_extractor)

result <- extract_file_sync("custom_document.xyz", "application/custom", NULL)
cat("Custom extractor result:\n")
cat(sprintf("Content: %s\n", result$content))
cat(sprintf("Mime type: %s\n", result$mime_type))
Priority System
When multiple extractors support the same MIME type, the one with the highest priority value is used:
| Range | Level |
|---|---|
| 0–25 | Fallback / low-quality |
| 26–49 | Alternative |
| 50 | Default (built-in) |
| 51–75 | Enhanced / premium |
| 76–100 | Specialized / high-priority |
Post-Processors
Transform and enrich results after extraction. Processors execute in three stages:
- Early — Foundational: language detection, quality scoring, text normalization
- Middle — Transformation: keyword extraction, token reduction, summarization
- Late — Final: custom metadata, analytics, output formatting
Implementation
use kreuzberg::plugins::{Plugin, PostProcessor, ProcessingStage};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig};
use async_trait::async_trait;
/// Early-stage post-processor that counts whitespace-separated words in the
/// extracted text and stores the count in the result metadata as `"word_count"`.
struct WordCountProcessor;

impl Plugin for WordCountProcessor {
    fn name(&self) -> &str {
        "word-count"
    }

    fn version(&self) -> String {
        "1.0.0".to_string()
    }

    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl PostProcessor for WordCountProcessor {
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let word_count = result.content.split_whitespace().count();
        // Record the count as metadata (as the Python version of this example
        // does) rather than as a processing warning: it is information, not a
        // problem with the extraction.
        // NOTE(review): assumes `metadata.additional` maps string keys to
        // `serde_json::Value`, as the quality-score validator example reads it.
        result
            .metadata
            .additional
            .insert("word_count".into(), word_count.into());
        Ok(())
    }

    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Early
    }

    // Skip results with no text at all.
    fn should_process(&self, result: &ExtractionResult, _config: &ExtractionConfig) -> bool {
        !result.content.is_empty()
    }
}
from kreuzberg import register_post_processor, ExtractionResult
class WordCountProcessor:
    """Early-stage post-processor that records the word count in ``result.metadata``."""

    def name(self) -> str:
        return "word_count"

    def version(self) -> str:
        return "1.0.0"

    def processing_stage(self) -> str:
        # Runs alongside the other foundational processors.
        return "early"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        # str.split() with no argument splits on any run of whitespace.
        result.metadata["word_count"] = len(result.content.split())
        return result

    def should_process(self, result: ExtractionResult) -> bool:
        # Only results that actually contain text are counted.
        return bool(result.content)

    def initialize(self) -> None:
        """No setup needed."""

    def shutdown(self) -> None:
        """No teardown needed."""
# Register one shared instance for every extraction call.
processor = WordCountProcessor()
register_post_processor(processor)
Conditional Processing
from kreuzberg import ExtractionResult, register_post_processor
class PdfOnlyProcessor:
    """Post-processor that only accepts PDF results."""

    def name(self) -> str:
        return "pdf-only-processor"

    def version(self) -> str:
        return "1.0.0"

    def processing_stage(self) -> str:
        # Same stage as the Rust version of this example.
        return "middle"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        # PDF-specific processing goes here; the example returns the result unchanged.
        return result

    def should_process(self, result: ExtractionResult) -> bool:
        # Gate on MIME type: only PDF results are accepted.
        return result.mime_type == "application/pdf"

    # Lifecycle hooks required of every plugin; nothing to set up here.
    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
# Register a single shared instance; it only acts on PDF results.
processor = PdfOnlyProcessor()
register_post_processor(processor)
impl PostProcessor for PdfOnlyProcessor {
async fn process(
&self,
result: &mut ExtractionResult,
_config: &ExtractionConfig
) -> Result<()> {
Ok(())
}
fn processing_stage(&self) -> ProcessingStage {
ProcessingStage::Middle
}
fn should_process(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig
) -> bool {
result.mime_type == "application/pdf"
}
}
package main
import (
"encoding/json"
"log"
"unsafe"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"
// pdfOnlyProcessor applies PDF-specific processing logic only to PDF documents
//export pdfOnlyProcessor
func pdfOnlyProcessor(resultJSON *C.char) *C.char {
jsonStr := C.GoString(resultJSON)
var result map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &result); err != nil {
return C.CString("{\"error\":\"Failed to parse result JSON\"}")
}
// Check MIME type - only process PDFs
mimeType, ok := result["mime_type"].(string)
if !ok || mimeType != "application/pdf" {
// Return unchanged for non-PDF documents
outputJSON, err := json.Marshal(result)
if err != nil {
return C.CString("{\"error\":\"Failed to serialize result\"}")
}
return C.CString(string(outputJSON))
}
// Perform PDF-specific processing
metadata, ok := result["metadata"].(map[string]interface{})
if !ok {
metadata = make(map[string]interface{})
}
// Example PDF-specific processing:
// - Extract tables as structured data
// - Handle PDF-specific formatting
// - Preserve document hierarchy
metadata["pdf_specific_processing"] = true
metadata["processor_type"] = "pdf_only"
// Check for tables in PDF
if tablesJSON, ok := result["tables_json"].(string); ok && tablesJSON != "" {
var tables []interface{}
if err := json.Unmarshal([]byte(tablesJSON), &tables); err == nil {
metadata["table_count"] = len(tables)
}
}
result["metadata"] = metadata
// Serialize back to JSON
outputJSON, err := json.Marshal(result)
if err != nil {
return C.CString("{\"error\":\"Failed to serialize result\"}")
}
return C.CString(string(outputJSON))
}
func main() {
// Register the post-processor with priority 70
if err := kreuzberg.RegisterPostProcessor("pdf_only_processor", 70,
(C.PostProcessorCallback)(C.pdfOnlyProcessor)); err != nil {
log.Fatalf("failed to register post-processor: %v", err)
}
defer func() {
if err := kreuzberg.UnregisterPostProcessor("pdf_only_processor"); err != nil {
log.Printf("warning: failed to unregister post-processor: %v", err)
}
}()
// Process multiple documents - processor will only affect PDFs
files := []string{
"document.pdf",
"image.jpg",
"spreadsheet.xlsx",
}
for _, file := range files {
result, err := kreuzberg.ExtractFileSync(file, nil)
if err != nil {
log.Printf("Warning: extraction failed for %s: %v", file, err)
continue
}
// Parse metadata to check if PDF processing occurred
var metadata map[string]interface{}
if metaJSON, ok := result.MetadataJSON.(string); ok {
if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
if pdfProcessing, ok := metadata["pdf_specific_processing"].(bool); ok && pdfProcessing {
log.Printf("PDF-specific processing applied to: %s", file)
if tableCount, ok := metadata["table_count"].(float64); ok {
log.Printf(" Tables found: %.0f", tableCount)
}
} else {
log.Printf("Skipped PDF processor for: %s (MIME: %s)", file, result.MimeType)
}
}
}
}
}
import dev.kreuzberg.PostProcessor;
import java.util.HashMap;
import java.util.Map;
// Post-processor that only touches PDF results.
PostProcessor pdfOnly = result -> {
    // Constant-first comparison avoids a NullPointerException when the MIME type is missing.
    if (!"application/pdf".equals(result.getMimeType())) {
        return result;
    }
    Map<String, Object> metadata = new HashMap<>(result.getMetadata());
    metadata.put("pdf_processed", true);
    // NOTE(review): `metadata` is a copy that is never attached to the returned
    // result, so the "pdf_processed" flag is currently lost. Return a result built
    // with the updated metadata (check the ExtractionResult API for how).
    return result;
};
using Kreuzberg;

// Top-level statements must come before any type declarations in a C# file
// (otherwise the compiler reports CS8803).
var processor = new PdfOnlyProcessor();
KreuzbergClient.RegisterPostProcessor(processor);

/// <summary>Post-processor that only accepts PDF results.</summary>
public class PdfOnlyProcessor : IPostProcessor
{
    public string Name() => "pdf-only-processor";
    public string Version() => "1.0.0";

    // PDF-specific processing goes here; the example returns the result unchanged.
    public ExtractionResult Process(ExtractionResult result) => result;

    public bool ShouldProcess(ExtractionResult result)
        => result.MimeType == "application/pdf";

    // Lifecycle hooks required of every plugin; nothing to set up here.
    public void Initialize() { }
    public void Shutdown() { }
}
OCR Backends
Integrate cloud OCR services or custom engines:
Implementation
use kreuzberg::plugins::{Plugin, OcrBackend, OcrBackendType};
use kreuzberg::{Result, ExtractionResult, OcrConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;
/// OCR backend that delegates text recognition to a cloud OCR service.
struct CloudOcrBackend {
    /// Credential for the cloud service (not yet used by the stubbed API call).
    api_key: String,
    /// Language codes this backend accepts, e.g. `"eng"`.
    supported_langs: Vec<String>,
}

impl Plugin for CloudOcrBackend {
    fn name(&self) -> &str {
        "cloud-ocr"
    }

    fn version(&self) -> String {
        "1.0.0".to_string()
    }

    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl OcrBackend for CloudOcrBackend {
    /// Sends the image to the cloud API and wraps the returned text in a result.
    async fn process_image(
        &self,
        image_bytes: &[u8],
        config: &OcrConfig,
    ) -> Result<ExtractionResult> {
        let text = self.call_cloud_api(image_bytes, &config.language).await?;

        Ok(ExtractionResult {
            content: text,
            mime_type: "text/plain".to_string(),
            metadata: Metadata::default(),
            // All remaining fields keep their defaults (empty / `None`).
            ..Default::default()
        })
    }

    fn supports_language(&self, lang: &str) -> bool {
        self.supported_langs.iter().any(|l| l == lang)
    }

    fn backend_type(&self) -> OcrBackendType {
        OcrBackendType::Custom
    }

    fn supported_languages(&self) -> Vec<String> {
        self.supported_langs.clone()
    }
}
impl CloudOcrBackend {
    /// Placeholder for the HTTP call to the cloud OCR service.
    ///
    /// Always returns a fixed string. Replace the body with a request that sends
    /// the image (authenticated with `self.api_key`) and returns the recognised text.
    async fn call_cloud_api(&self, _image: &[u8], _language: &str) -> Result<String> {
        Ok("Extracted text".to_string())
    }
}
from kreuzberg import register_ocr_backend
import httpx
class CloudOcrBackend:
    """OCR backend that delegates recognition to a cloud OCR HTTP API."""

    def __init__(self, api_key: str):
        self.api_key: str = api_key
        self.langs: list[str] = ["eng", "deu", "fra"]

    def name(self) -> str:
        return "cloud-ocr"

    def version(self) -> str:
        return "1.0.0"

    def supported_languages(self) -> list[str]:
        return self.langs

    def process_image(self, image_bytes: bytes, config: dict) -> dict:
        """Send one image to the OCR API and return its recognised text.

        Raises:
            httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
        """
        language: str = config.get("language", "eng")
        with httpx.Client() as client:
            response = client.post(
                "https://api.example.com/ocr",
                headers={"Authorization": f"Bearer {self.api_key}"},
                files={"image": image_bytes},
                # Extra multipart fields go in `data`: httpx ignores `json=` when
                # `files=` is given, so the language was never actually sent.
                data={"language": language},
            )
            # Fail loudly instead of parsing an error body as OCR output.
            response.raise_for_status()
        text: str = response.json()["text"]
        return {"content": text, "mime_type": "text/plain"}

    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
# Register the backend globally; replace the placeholder key with a real one.
backend = CloudOcrBackend(api_key="your-api-key")
register_ocr_backend(backend)
import dev.kreuzberg.*;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.net.http.*;
import java.net.URI;
public class CloudOcrExample {
    // An HttpClient is immutable once built and can send many requests, so one
    // instance is shared by every OCR call instead of creating one per image.
    private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();

    public static void main(String[] args) {
        // Arena that owns the C strings handed back to the native caller.
        Arena callbackArena = Arena.ofAuto();
        String apiKey = "your-api-key";

        OcrBackend cloudOcr = (imageBytes, imageLength, configJson) -> {
            try {
                // Copy the image bytes out of native memory.
                byte[] image = imageBytes.reinterpret(imageLength)
                        .toArray(ValueLayout.JAVA_BYTE);

                // Config arrives as a NUL-terminated JSON string; this simplified
                // example does not use it.
                String config = configJson.reinterpret(Long.MAX_VALUE)
                        .getString(0);

                // Call cloud OCR API
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create("https://api.example.com/ocr"))
                        .header("Authorization", "Bearer " + apiKey)
                        .POST(HttpRequest.BodyPublishers.ofByteArray(image))
                        .build();
                HttpResponse<String> response = HTTP_CLIENT.send(request,
                        HttpResponse.BodyHandlers.ofString());

                // Treat any non-2xx status as a failure instead of parsing an
                // error body as OCR text.
                if (response.statusCode() / 100 != 2) {
                    return MemorySegment.NULL;
                }

                String text = parseTextFromResponse(response.body());
                // Return result as C string
                return callbackArena.allocateFrom(text);
            } catch (Exception e) {
                // Preserve the interrupt status if the HTTP call was interrupted.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Any failure is reported to the native side as a NULL result.
                return MemorySegment.NULL;
            }
        };

        try (Arena arena = Arena.ofConfined()) {
            Kreuzberg.registerOcrBackend("cloud-ocr", cloudOcr, arena);

            // Use custom OCR backend in extraction
            // Note: Requires ExtractionConfig with OCR enabled
            ExtractionResult result = Kreuzberg.extractFileSync("scanned.pdf");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static String parseTextFromResponse(String json) {
        // Parse JSON response and extract text field
        return json; // Simplified
    }
}
using Kreuzberg;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text.Json;

// Top-level statements must come before any type declarations in a C# file
// (otherwise the compiler reports CS8803).
var backend = new CloudOcrBackend(apiKey: "your-api-key");
KreuzbergClient.RegisterOcrBackend(backend);

/// <summary>OCR backend that sends images to a cloud OCR HTTP API.</summary>
public class CloudOcrBackend : IOcrBackend
{
    // Shared instance: creating an HttpClient per request can exhaust sockets under load.
    private static readonly HttpClient Http = new();

    private readonly string _apiKey;
    private readonly List<string> _langs = new() { "eng", "deu", "fra" };

    public CloudOcrBackend(string apiKey)
    {
        _apiKey = apiKey;
    }

    public string Name() => "cloud-ocr";
    public string Version() => "1.0.0";
    public List<string> SupportedLanguages() => _langs;

    /// <summary>Posts one image to the OCR API and returns its recognised text.</summary>
    /// <exception cref="HttpRequestException">The API returned a non-success status code.</exception>
    public Dictionary<string, object> ProcessImage(byte[] imageBytes, Dictionary<string, object> config)
    {
        // Fall back to English when no language (or a null one) is configured.
        var lang = config.TryGetValue("language", out var requested)
            ? requested?.ToString() ?? "eng"
            : "eng";

        var form = new MultipartFormDataContent();
        form.Add(new ByteArrayContent(imageBytes), "image");
        form.Add(new StringContent(lang), "language");

        // Disposing the request also disposes its content (the form).
        using var request = new HttpRequestMessage(HttpMethod.Post, "https://api.example.com/ocr")
        {
            Content = form,
        };
        // Authenticate with the configured key (previously stored but never sent).
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _apiKey);

        // ProcessImage is synchronous, so block on the async calls;
        // GetAwaiter().GetResult() rethrows the original exception, not an AggregateException.
        using var response = Http.SendAsync(request).GetAwaiter().GetResult();
        response.EnsureSuccessStatusCode();
        var json = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();

        // JsonDocument rents pooled buffers and must be disposed.
        using var doc = JsonDocument.Parse(json);
        var text = doc.RootElement.GetProperty("text").GetString() ?? string.Empty;

        return new Dictionary<string, object>
        {
            { "content", text },
            { "mime_type", "text/plain" }
        };
    }

    public void Initialize() { }
    public void Shutdown() { }
}
require 'json'
require 'kreuzberg'
require 'net/http'

# OCR backend that posts each image to a cloud OCR HTTP API.
class CloudOcrBackend
  def name
    'cloud-ocr'
  end

  def supported_languages
    %w[eng fra deu]
  end

  # Sends +image_data+ to the OCR API for +language+ and returns { content: text }.
  # Every failure is raised as Kreuzberg::Errors::OCRError.
  def process_image(image_data, language)
    uri = URI('https://api.example.com/ocr')
    # Pass the requested language along; previously it was accepted but ignored.
    uri.query = URI.encode_www_form(language: language)
    req = Net::HTTP::Post.new(uri)
    req['Authorization'] = "Bearer #{ENV['OCR_API_KEY']}"
    req.body = image_data
    res = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) { |h| h.request(req) }
    raise Kreuzberg::Errors::OCRError, res.message unless res.is_a?(Net::HTTPSuccess)

    { content: JSON.parse(res.body)['text'] }
  rescue Kreuzberg::Errors::OCRError
    # Already the right error type; re-raise unchanged instead of wrapping it again.
    raise
  rescue StandardError => e
    raise Kreuzberg::Errors::OCRError, e.message
  end
end

Kreuzberg.register_ocr_backend(CloudOcrBackend.new)

config = Kreuzberg::Config::Extraction.new(
  ocr: Kreuzberg::Config::OCR.new(backend: 'cloud-ocr')
)
Kreuzberg.extract_file_sync('doc.pdf', config: config)
library(kreuzberg)

# Custom OCR callback: receives the image path and language and returns the text.
custom_ocr_backend <- function(image_path, language) {
  cat(sprintf("Processing image: %s\n", image_path))
  sprintf("Extracted text from %s", image_path)
}

register_ocr_backend("custom_cloud", custom_ocr_backend)

config <- extraction_config(
  force_ocr = TRUE,
  ocr = ocr_config(backend = "custom_cloud", language = "en")
)
result <- extract_file_sync("document.pdf", "application/pdf", config)
cat(sprintf("Custom backend result: %d chars\n", nchar(result$content)))
Registration
Once you've implemented a backend (see above), register it so the extraction pipeline can use it. Set the backend name in OcrConfig to route OCR through your custom engine:
from kreuzberg import (
    ExtractionConfig,
    OcrConfig,
    extract_file_sync,
    register_ocr_backend,
    unregister_ocr_backend,
)

backend = CloudOcrBackend(api_key="your-api-key")
register_ocr_backend(backend)
try:
    # Route OCR through the custom backend by naming it in OcrConfig.
    config = ExtractionConfig(ocr=OcrConfig(backend="cloud-ocr", language="eng"))
    result = extract_file_sync("scanned.pdf", config=config)
finally:
    # Unregister even if extraction raises, so the global registry is restored.
    unregister_ocr_backend("cloud-ocr")
Using EasyOCR (Built-in)
Kreuzberg ships with an EasyOCR backend that supports 80+ languages and optional GPU acceleration. You don't need to implement anything — just point OcrConfig at it:
from kreuzberg import ExtractionConfig, OcrConfig, extract_file_sync

config: ExtractionConfig = ExtractionConfig(ocr=OcrConfig(backend="easyocr", language="en"))

# EasyOCR-specific options (use_gpu, beam_width, etc.) are passed via easyocr_kwargs;
# OcrConfig itself only carries the backend, language and backend-specific configs.
result = extract_file_sync("scanned.pdf", config=config, easyocr_kwargs={"use_gpu": True})

content: str = result.content
print(f"Extracted content (preview): {content[:100]}")
print(f"Total characters: {len(content)}")
Validators
Enforce quality requirements on extraction results.
Warning
Validation errors cause extraction to fail. Use validators for critical quality checks only.
use kreuzberg::plugins::{Plugin, Validator};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, KreuzbergError};
use async_trait::async_trait;
/// Validator that rejects results whose text is shorter than `min_length` characters.
struct MinLengthValidator {
    /// Minimum number of characters (Unicode scalar values) required.
    min_length: usize,
}

impl Plugin for MinLengthValidator {
    fn name(&self) -> &str {
        "min-length-validator"
    }

    fn version(&self) -> String {
        "1.0.0".to_string()
    }

    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl Validator for MinLengthValidator {
    /// # Errors
    /// Returns a validation error when the content has fewer than `min_length` characters.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        // Count characters, not bytes: `str::len` is the UTF-8 byte length, which
        // overstates non-ASCII text and contradicts the "characters" in the message.
        let length = result.content.chars().count();
        if length < self.min_length {
            return Err(KreuzbergError::validation(format!(
                "Content too short: {} < {} characters",
                length, self.min_length
            )));
        }
        Ok(())
    }

    fn priority(&self) -> i32 {
        100
    }
}
from kreuzberg import register_validator, ExtractionResult, ValidationError
class MinLengthValidator:
    """Reject extraction results whose content is shorter than ``min_length`` characters."""

    def __init__(self, min_length: int = 100):
        self.min_length: int = min_length

    def name(self) -> str:
        return "min_length_validator"

    def version(self) -> str:
        return "1.0.0"

    def priority(self) -> int:
        return 100

    def validate(self, result: ExtractionResult) -> None:
        length = len(result.content)
        if length >= self.min_length:
            return
        raise ValidationError(f"Content too short: {length}")

    def should_validate(self, result: ExtractionResult) -> bool:
        # Every result is validated.
        return True

    def initialize(self) -> None:
        """No setup needed."""

    def shutdown(self) -> None:
        """No teardown needed."""
# Require at least 100 characters of content for every extraction.
validator = MinLengthValidator(min_length=100)
register_validator(validator)
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.Validator;
import dev.kreuzberg.ValidationException;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
public class MinLengthValidatorExample {
    /** Registers a minimum-length validator and runs one extraction through it. */
    public static void main(String[] args) {
        final int minLength = 100;

        // Reject any result whose content has fewer than minLength characters.
        Validator minLengthValidator = result -> {
            int actual = result.getContent().length();
            if (actual < minLength) {
                throw new ValidationException("Content too short: " + actual + " < " + minLength);
            }
        };

        try {
            Kreuzberg.registerValidator("min-length", minLengthValidator, 100);
            ExtractionResult result = Kreuzberg.extractFile("document.pdf");
            System.out.println("Validation passed!");
        } catch (ValidationException failure) {
            System.err.println("Validation failed: " + failure.getMessage());
        } catch (IOException | KreuzbergException failure) {
            failure.printStackTrace();
        }
    }
}
using Kreuzberg;

// Top-level statements must come before any type declarations in a C# file
// (otherwise the compiler reports CS8803).
var validator = new MinLengthValidator(minLength: 100);
KreuzbergClient.RegisterValidator(validator);

/// <summary>Rejects results whose content is shorter than a minimum length.</summary>
public class MinLengthValidator : IValidator
{
    private readonly int _minLength;

    public MinLengthValidator(int minLength = 100)
    {
        _minLength = minLength;
    }

    public string Name() => "min_length_validator";
    public string Version() => "1.0.0";
    public int Priority() => 100;

    /// <exception cref="ValidationError">Content is shorter than the minimum length.</exception>
    public void Validate(Dictionary<string, object> result)
    {
        // Treat a missing or null "content" entry as empty content instead of
        // throwing KeyNotFoundException / NullReferenceException.
        var contentLength = result.TryGetValue("content", out var content)
            ? content?.ToString()?.Length ?? 0
            : 0;
        if (contentLength < _minLength)
            throw new ValidationError($"Content too short: {contentLength}");
    }

    public bool ShouldValidate(Dictionary<string, object> result) => true;
    public void Initialize() { }
    public void Shutdown() { }
}
Quality Score Validator
#[async_trait]
impl Validator for QualityValidator {
async fn validate(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig,
) -> Result<()> {
let score = result.metadata
.additional
.get("quality_score")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
if score < 0.5 {
return Err(KreuzbergError::validation(format!(
"Quality score too low: {:.2} < 0.50",
score
)));
}
Ok(())
}
}
from kreuzberg import ExtractionResult, ValidationError, register_validator
class QualityValidator:
    """Reject results whose ``quality_score`` metadata is below 0.5.

    A missing or non-numeric score counts as 0.0 (and therefore fails),
    matching the Rust version of this validator.
    """

    def name(self) -> str:
        return "quality-validator"

    def version(self) -> str:
        return "1.0.0"

    def priority(self) -> int:
        return 100

    def validate(self, result: ExtractionResult) -> None:
        raw = result.metadata.get("quality_score")
        # Guard against None or non-numeric values, which would otherwise raise
        # TypeError in the comparison below instead of a ValidationError.
        score: float = float(raw) if isinstance(raw, (int, float)) else 0.0
        if score < 0.5:
            raise ValidationError(f"Quality score too low: {score:.2f}")

    def should_validate(self, result: ExtractionResult) -> bool:
        return True

    # Lifecycle hooks required of every plugin; nothing to set up here.
    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
# Register the quality gate globally.
validator = QualityValidator()
register_validator(validator)
using Kreuzberg;

// Top-level statements must come before any type declarations in a C# file
// (otherwise the compiler reports CS8803).
var validator = new QualityValidator();
KreuzbergClient.RegisterValidator(validator);

/// <summary>Rejects results whose quality score is below 0.5.</summary>
public class QualityValidator : IValidator
{
    private const double MinQualityScore = 0.5;

    public string Name() => "quality-validator";
    public string Version() => "1.0.0";

    /// <exception cref="ValidationError">The quality score is below 0.5.</exception>
    public void Validate(ExtractionResult result)
    {
        var score = result.QualityScore;
        if (score < MinQualityScore)
            throw new ValidationError($"Quality score too low: {score:F2}");
    }

    // NOTE(review): Validate takes an ExtractionResult but ShouldValidate takes a
    // Dictionary; the MinLengthValidator example uses Dictionary for both.
    // Confirm which signatures IValidator actually declares.
    public bool ShouldValidate(Dictionary<string, object> result) => true;
    public int Priority() => 100;
    public void Initialize() { }
    public void Shutdown() { }
}
Plugin Management
Listing
from kreuzberg import (
    list_document_extractors,
    list_ocr_backends,
    list_post_processors,
    list_validators,
)

# Query every registry first, then print the registered names per plugin type.
registered: list[tuple[str, list[str]]] = [
    ("Extractors", list_document_extractors()),
    ("Processors", list_post_processors()),
    ("OCR backends", list_ocr_backends()),
    ("Validators", list_validators()),
]
for label, names in registered:
    print(f"{label}: {names}")
use kreuzberg::plugins::registry::*;
let registry = get_document_extractor_registry();
let extractors = registry.list()?;
println!("Registered extractors: {:?}", extractors);
let registry = get_post_processor_registry();
let processors = registry.list()?;
println!("Registered processors: {:?}", processors);
let registry = get_ocr_backend_registry();
let backends = registry.list()?;
println!("Registered OCR backends: {:?}", backends);
let registry = get_validator_registry();
let validators = registry.list()?;
println!("Registered validators: {:?}", validators);
using Kreuzberg;

// Query every registry first, then print the names registered for each plugin type.
var extractors = KreuzbergClient.ListDocumentExtractors();
var processors = KreuzbergClient.ListPostProcessors();
var ocrBackends = KreuzbergClient.ListOcrBackends();
var validators = KreuzbergClient.ListValidators();

PrintNames("Extractors", extractors);
PrintNames("Processors", processors);
PrintNames("OCR backends", ocrBackends);
PrintNames("Validators", validators);

static void PrintNames(string label, IEnumerable<string> names) =>
    Console.WriteLine($"{label}: {string.Join(", ", names)}");
Unregistering
from kreuzberg import (
    unregister_document_extractor,
    unregister_ocr_backend,
    unregister_post_processor,
    unregister_validator,
)

# Remove each plugin from its own registry, using the name it was registered under.
unregister_document_extractor("custom-json-extractor")
unregister_post_processor("word_count")
unregister_ocr_backend("cloud-ocr")
unregister_validator("min_length_validator")
using Kreuzberg;

// Remove each plugin from its own registry, using the name it was registered under.
KreuzbergClient.UnregisterDocumentExtractor("custom-json-extractor");
KreuzbergClient.UnregisterPostProcessor("word_count");
KreuzbergClient.UnregisterOcrBackend("cloud-ocr");
KreuzbergClient.UnregisterValidator("min_length_validator");
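Clearing All
To remove every plugin of one type at once, use that registry's clear function, for example `clearDocumentExtractors()` in the Node binding shown under Registration above. Clearing also removes the built-in plugins, so use it with extreme caution.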
Thread Safety
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use kreuzberg::KreuzbergError;
struct StatefulPlugin {
call_count: AtomicUsize,
cache: Mutex<HashMap<String, String>>,
}
impl Plugin for StatefulPlugin {
fn name(&self) -> &str { "stateful-plugin" }
fn version(&self) -> String { "1.0.0".to_string() }
fn initialize(&self) -> Result<()> {
self.call_count.store(0, Ordering::Release);
Ok(())
}
fn shutdown(&self) -> Result<()> {
let count = self.call_count.load(Ordering::Acquire);
println!("Plugin called {} times", count);
Ok(())
}
}
#[async_trait]
impl PostProcessor for StatefulPlugin {
async fn process(
&self,
result: &mut ExtractionResult,
_config: &ExtractionConfig
) -> Result<()> {
self.call_count.fetch_add(1, Ordering::AcqRel);
let mut cache = self.cache.lock()
.map_err(|_| KreuzbergError::plugin("Cache lock poisoned"))?;
cache.insert("last_mime".to_string(), result.mime_type.clone());
Ok(())
}
fn processing_stage(&self) -> ProcessingStage {
ProcessingStage::Middle
}
}
import threading
from kreuzberg import ExtractionResult
class StatefulPlugin:
    """Post-processor with mutable state guarded by a lock.

    ``process`` may be called from several threads at once, so every update to
    ``call_count`` and ``cache`` happens while holding ``self.lock``.
    """

    def __init__(self):
        self.lock: threading.Lock = threading.Lock()
        self.call_count: int = 0
        self.cache: dict[str, str] = {}

    def name(self) -> str:
        return "stateful-plugin"

    def version(self) -> str:
        return "1.0.0"

    def processing_stage(self) -> str:
        # Same stage as the Rust version of this example.
        return "middle"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        with self.lock:
            self.call_count += 1
            self.cache["last_mime"] = result.mime_type
        return result

    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
class StatefulPlugin implements PostProcessor {
    /** Number of process() calls; atomic, so concurrent calls never lose an update. */
    private final AtomicInteger callCount = new AtomicInteger();

    /** Concurrent map, so process() can update it from many threads without locking. */
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    @Override
    public ExtractionResult process(ExtractionResult result) {
        callCount.getAndIncrement();
        cache.put("last_mime", result.mimeType());
        return result;
    }

    /** Returns how many times process() has been called. */
    public int getCallCount() {
        return callCount.intValue();
    }
}
using System.Collections.Generic;
using System.Threading;

/// <summary>Plugin whose mutable state is only touched under a single lock.</summary>
public class StatefulPlugin
{
    private readonly object _sync = new();
    private readonly Dictionary<string, object> _state = new();
    private int _calls;

    public string Name() => "stateful-plugin";
    public string Version() => "1.0.0";

    public Dictionary<string, object> Process(Dictionary<string, object> result)
    {
        // Both the counter and the dictionary are shared across threads.
        lock (_sync)
        {
            _calls += 1;
            _state["last_mime"] = result["mime_type"];
        }
        return result;
    }

    public void Initialize() { }
    public void Shutdown() { }
}
Best Practices
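Naming: Use kebab-case (`my-custom-plugin`): lowercase letters, digits, and hyphens only, with no spaces or other special characters. Some examples on this page use snake_case names such as `word_count`; prefer kebab-case for new plugins.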
Logging
import logging

logger = logging.getLogger(__name__)


class MyPlugin:
    """Skeleton extractor showing where to log in each plugin hook.

    Log calls pass their arguments separately (``%s`` placeholders) instead of
    pre-formatting with f-strings, so the message is only built when the record
    is actually emitted.
    """

    def name(self) -> str:
        return "my-plugin"

    def version(self) -> str:
        return "1.0.0"

    def initialize(self) -> None:
        logger.info("Initializing plugin: %s", self.name())

    def shutdown(self) -> None:
        logger.info("Shutting down plugin: %s", self.name())

    def extract_bytes(self, content: bytes, mime_type: str, config: dict) -> dict:
        logger.info("Extracting %s (%d bytes)", mime_type, len(content))
        # Placeholder result; a real extractor would fill "content" from `content`.
        result: dict = {"content": "", "mime_type": mime_type}
        if not result["content"]:
            logger.warning("Extraction resulted in empty content")
        return result
use log::{info, warn, error};

/// Skeleton extractor showing where to log in each plugin hook.
struct MyPlugin;

impl Plugin for MyPlugin {
    fn name(&self) -> &str {
        "my-plugin"
    }

    fn version(&self) -> String {
        "1.0.0".to_string()
    }

    fn initialize(&self) -> Result<()> {
        info!("Initializing plugin: {}", self.name());
        Ok(())
    }

    fn shutdown(&self) -> Result<()> {
        info!("Shutting down plugin: {}", self.name());
        Ok(())
    }
}

#[async_trait]
impl DocumentExtractor for MyPlugin {
    // Other DocumentExtractor methods (e.g. `supported_mime_types`) are omitted
    // for brevity.
    async fn extract_bytes(
        &self,
        content: &[u8],
        mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        info!("Extracting {} ({} bytes)", mime_type, content.len());

        // Placeholder: a real extractor would build the result from `content`.
        let result = ExtractionResult::default();
        if result.content.is_empty() {
            warn!("Extraction resulted in empty content");
        }
        Ok(result)
    }
}
import java.util.logging.Logger;
import java.util.logging.Level;
/**
 * Post-processor that logs each result it receives and warns when the
 * result's content is empty. The result is returned unchanged.
 */
class MyPlugin implements PostProcessor {
    private static final Logger logger = Logger.getLogger(MyPlugin.class.getName());
    @Override
    public ExtractionResult process(ExtractionResult result) {
        // String.length() counts UTF-16 chars, not bytes, so the message says "characters".
        // The Supplier overload defers building the message until INFO is enabled.
        logger.log(Level.INFO, () -> "Processing " + result.mimeType()
                + " (" + result.content().length() + " characters)");
        // Processing...
        if (result.content().isEmpty()) {
            logger.warning("Processing resulted in empty content");
        }
        return result;
    }
}
using System;
using System.Collections.Generic;
using Kreuzberg;
using Microsoft.Extensions.Logging;
/// <summary>
/// Minimal extractor plugin that logs its lifecycle and extraction calls through
/// an injected <see cref="ILogger"/>.
/// </summary>
/// <remarks>
/// Log calls use message templates with named placeholders instead of
/// interpolated strings (CA2254). Values are then captured as structured
/// properties, and formatting is skipped when the level is disabled.
/// </remarks>
public class MyPlugin
{
    private readonly ILogger _logger;
    /// <param name="logger">Logger used for all plugin messages; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public MyPlugin(ILogger logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }
    public string Name() => "my-plugin";
    public string Version() => "1.0.0";
    public void Initialize()
    {
        _logger.LogInformation("Initializing plugin: {PluginName}", Name());
    }
    public void Shutdown()
    {
        _logger.LogInformation("Shutting down plugin: {PluginName}", Name());
    }
    /// <summary>
    /// Returns a result dictionary with "content" and "mime_type" entries. This
    /// example always produces empty content and logs a warning about it.
    /// </summary>
    public Dictionary<string, object> ExtractBytes(
        byte[] content, string mimeType, Dictionary<string, object> config)
    {
        _logger.LogInformation("Extracting {MimeType} ({ByteCount} bytes)", mimeType, content.Length);
        var result = new Dictionary<string, object> { { "content", "" }, { "mime_type", mimeType } };
        if (string.IsNullOrEmpty((string?)result["content"]))
        {
            _logger.LogWarning("Extraction resulted in empty content");
        }
        return result;
    }
}
Testing¶
import pytest
from kreuzberg import ExtractionResult
def test_custom_extractor() -> None:
    """A small JSON payload should yield its string value and a JSON MIME type."""
    json_extractor = CustomJsonExtractor()
    payload: bytes = b'{"message": "Hello, world!"}'
    options: dict = {}
    extracted: ExtractionResult = json_extractor.extract_bytes(payload, "application/json", options)
    assert "Hello, world!" in extracted.content
    assert extracted.mime_type == "application/json"
#[cfg(test)]
mod tests {
    use super::*;

    /// Running a tiny JSON document through the extractor must surface its
    /// string value and report the JSON MIME type.
    #[tokio::test]
    async fn test_custom_extractor() {
        let json_extractor = CustomJsonExtractor;
        let payload: &[u8] = br#"{"message": "Hello, world!"}"#;
        let options = ExtractionConfig::default();
        let extraction = json_extractor
            .extract_bytes(payload, "application/json", &options)
            .await
            .expect("Extraction failed");
        assert_eq!(extraction.mime_type, "application/json");
        assert!(extraction.content.contains("Hello, world!"));
    }
}
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
class PostProcessorTest {
    /**
     * Verifies that a word-count post-processor attaches its count to the
     * metadata of the result it returns.
     */
    @Test
    void testWordCountProcessor() {
        PostProcessor processor = result -> {
            // split() returns an array, so the count is an int. It is stored as an
            // Integer, which compares equal to the expected literal 3 below.
            int count = result.getContent().split("\\s+").length;
            Map<String, Object> metadata = new HashMap<>(result.getMetadata());
            metadata.put("word_count", count);
            // The copied map must be attached to the returned result; returning
            // `result` unchanged would silently drop "word_count".
            // NOTE: the list arguments are rebuilt empty, matching this test's input;
            // a real processor should carry over the original result's lists.
            return new ExtractionResult(
                result.getContent(),
                result.getMimeType(),
                metadata,
                java.util.List.of(),
                java.util.List.of(),
                java.util.List.of(),
                java.util.List.of(),
                true
            );
        };
        ExtractionResult input = new ExtractionResult(
            "Hello world test",
            "text/plain",
            new HashMap<>(),
            java.util.List.of(),
            java.util.List.of(),
            java.util.List.of(),
            java.util.List.of(),
            true
        );
        ExtractionResult output = processor.process(input);
        assertEquals(3, output.getMetadata().get("word_count"));
    }
}
using Xunit;
/// <summary>Unit tests for the custom JSON extractor plugin.</summary>
public class CustomExtractorTests
{
    [Fact]
    public void TestCustomExtractor()
    {
        // Arrange
        var sut = new CustomJsonExtractor();
        var payload = System.Text.Encoding.UTF8.GetBytes("{\"message\": \"Hello, world!\"}");
        var options = new Dictionary<string, object>();

        // Act
        var output = sut.ExtractBytes(payload, "application/json", options);

        // Assert
        Assert.Contains("Hello, world!", (string)output["content"]);
        Assert.Equal("application/json", (string)output["mime_type"]);
    }
}
Complete Example: PDF Metadata Extractor¶
from kreuzberg import register_post_processor, ExtractionResult
import logging
import threading
logger = logging.getLogger(__name__)
class PdfMetadataExtractor:
    """Post-processor that tags PDF results and counts how many it has handled.

    Plugins must be thread-safe. ``self.processed_count += 1`` is a
    read-modify-write that can lose updates under concurrent calls, so the
    counter is only touched while holding ``_count_lock``.
    """
    def __init__(self):
        self.processed_count: int = 0
        self._count_lock = threading.Lock()
    def name(self) -> str:
        return "pdf_metadata_extractor"
    def version(self) -> str:
        return "1.0.0"
    def description(self) -> str:
        return "Extracts and enriches PDF metadata"
    def processing_stage(self) -> str:
        return "early"
    def should_process(self, result: ExtractionResult) -> bool:
        """Only handle results whose MIME type is ``application/pdf``."""
        return result.mime_type == "application/pdf"
    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Count the call and set ``metadata["pdf_processed"] = True`` on ``result``."""
        with self._count_lock:
            self.processed_count += 1
        result.metadata["pdf_processed"] = True
        return result
    def initialize(self) -> None:
        logger.info("PDF metadata extractor initialized")
    def shutdown(self) -> None:
        with self._count_lock:
            count = self.processed_count
        logger.info("Processed %d PDFs", count)
processor: PdfMetadataExtractor = PdfMetadataExtractor()
register_post_processor(processor)
package main
import (
"encoding/json"
"log"
"sync/atomic"
"unsafe"
"github.com/kreuzberg-dev/kreuzberg/packages/go/v4"
)
/*
#cgo CFLAGS: -I${SRCDIR}/../../../crates/kreuzberg-ffi
#cgo LDFLAGS: -L${SRCDIR}/../../../target/release -L${SRCDIR}/../../../target/debug -lkreuzberg_ffi
#include "../../../crates/kreuzberg-ffi/kreuzberg.h"
#include <stdlib.h>
*/
import "C"
// pdfMetadataState holds process-wide statistics for the PDF post-processor.
// processedCount starts at zero and is only read and written via sync/atomic.
var pdfMetadataState struct {
	processedCount int64
}
// pdfMetadataExtractor is the C-callable post-processor callback. It decodes the
// extraction result from JSON, adds pdf_processed, content_length (when content
// is a string) and pdf_processor_version to the metadata of application/pdf
// results, and re-encodes the result. Non-PDF results are re-encoded unchanged.
// Failures are reported as a JSON object with a single "error" field.
//
//export pdfMetadataExtractor
func pdfMetadataExtractor(resultJSON *C.char) *C.char {
	var result map[string]interface{}
	if err := json.Unmarshal([]byte(C.GoString(resultJSON)), &result); err != nil {
		return C.CString("{\"error\":\"Failed to parse result JSON\"}")
	}
	// Anything that is not a PDF passes through untouched.
	if mimeType, isString := result["mime_type"].(string); !isString || mimeType != "application/pdf" {
		return resultToCString(result)
	}
	metadata, isMap := result["metadata"].(map[string]interface{})
	if !isMap {
		metadata = map[string]interface{}{}
	}
	metadata["pdf_processed"] = true
	if content, isString := result["content"].(string); isString {
		// len on a Go string counts bytes, not runes.
		metadata["content_length"] = len(content)
	}
	atomic.AddInt64(&pdfMetadataState.processedCount, 1)
	metadata["pdf_processor_version"] = "1.0.0"
	result["metadata"] = metadata
	return resultToCString(result)
}

// resultToCString JSON-encodes result into a newly allocated C string (C.CString),
// or returns a JSON error object if encoding fails.
func resultToCString(result map[string]interface{}) *C.char {
	encoded, err := json.Marshal(result)
	if err != nil {
		return C.CString("{\"error\":\"Failed to serialize result\"}")
	}
	return C.CString(string(encoded))
}
// main registers the PDF post-processor, extracts document.pdf, reports the
// metadata the processor added, and always unregisters the processor afterwards.
func main() {
	// Register the post-processor with priority 80 (this call passes no processing stage).
	if err := kreuzberg.RegisterPostProcessor("pdf_metadata_extractor", 80,
		(C.PostProcessorCallback)(C.pdfMetadataExtractor)); err != nil {
		log.Fatalf("failed to register post-processor: %v", err)
	}
	// log.Fatalf exits via os.Exit, which skips deferred calls. The extraction
	// therefore runs in a helper whose deferred cleanup finishes before any
	// fatal exit.
	if err := extractAndReport(); err != nil {
		log.Fatalf("extraction failed: %v", err)
	}
}

// extractAndReport extracts document.pdf, logs the metadata added by the PDF
// post-processor, and unregisters the processor on return (success, error or panic).
func extractAndReport() error {
	defer func() {
		if err := kreuzberg.UnregisterPostProcessor("pdf_metadata_extractor"); err != nil {
			log.Printf("warning: failed to unregister post-processor: %v", err)
		}
		log.Printf("Total PDFs processed: %d", atomic.LoadInt64(&pdfMetadataState.processedCount))
	}()
	// Extract PDF document
	result, err := kreuzberg.ExtractFileSync("document.pdf", nil)
	if err != nil {
		return err
	}
	log.Printf("PDF MIME type: %s", result.MimeType)
	// Parse and display metadata
	var metadata map[string]interface{}
	if metaJSON, ok := result.MetadataJSON.(string); ok {
		if err := json.Unmarshal([]byte(metaJSON), &metadata); err == nil {
			if pdfProcessed, ok := metadata["pdf_processed"].(bool); ok && pdfProcessed {
				log.Printf("PDF metadata extracted successfully")
				if contentLen, ok := metadata["content_length"].(float64); ok {
					log.Printf("Content length: %.0f bytes", contentLen)
				}
			}
		}
	}
	return nil
}
import dev.kreuzberg.Kreuzberg;
import dev.kreuzberg.ExtractionResult;
import dev.kreuzberg.PostProcessor;
import dev.kreuzberg.KreuzbergException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
public class PdfMetadataExtractorExample {
    private static final Logger logger = Logger.getLogger(
        PdfMetadataExtractorExample.class.getName()
    );
    /**
     * Registers a PDF post-processor, extracts {@code document.pdf}, and prints
     * whether the processor ran. I/O and Kreuzberg failures are printed to stderr.
     */
    public static void main(String[] args) {
        AtomicInteger processedCount = new AtomicInteger(0);
        PostProcessor pdfMetadata = result -> {
            // Constant-first equals avoids a NullPointerException when the MIME type is missing.
            if (!"application/pdf".equals(result.getMimeType())) {
                return result;
            }
            // Use the value returned by incrementAndGet so concurrent calls each log
            // their own count instead of re-reading the shared counter later.
            int processed = processedCount.incrementAndGet();
            Map<String, Object> metadata = new HashMap<>(result.getMetadata());
            metadata.put("pdf_processed", true);
            metadata.put("processing_timestamp", System.currentTimeMillis());
            // NOTE(review): `metadata` is a copy that is never attached to the returned
            // result, so "pdf_processed" will not show up on the output below. Return a
            // new ExtractionResult carrying this map once the binding's API for that is confirmed.
            logger.info("Processed PDF: " + processed);
            return result;
        };
        try {
            Kreuzberg.registerPostProcessor("pdf-metadata-extractor", pdfMetadata, 50);
            logger.info("PDF metadata extractor initialized");
            ExtractionResult result = Kreuzberg.extractFile("document.pdf");
            System.out.println("PDF processed: " + result.getMetadata().get("pdf_processed"));
            logger.info("Processed " + processedCount.get() + " PDFs");
        } catch (IOException | KreuzbergException e) {
            e.printStackTrace();
        }
    }
}
using System;
using System.Threading;
using Kreuzberg;
// Top-level statements must precede type declarations in a C# file (CS8803).
var processor = new PdfMetadataExtractor();
KreuzbergClient.RegisterPostProcessor(processor);
/// <summary>
/// Post-processor that counts the PDF results it handles and reports the total
/// on shutdown. <see cref="ShouldProcess"/> restricts it to application/pdf.
/// </summary>
public class PdfMetadataExtractor : IPostProcessor
{
    // Only modified via Interlocked because Process may run on several threads at once.
    private int _processedCount = 0;
    public string Name() => "pdf_metadata_extractor";
    public string Version() => "1.0.0";
    public string Description() => "Extracts and enriches PDF metadata";
    public string ProcessingStage() => "early";
    public bool ShouldProcess(ExtractionResult result)
        => result.MimeType == "application/pdf";
    public ExtractionResult Process(ExtractionResult result)
    {
        Interlocked.Increment(ref _processedCount);
        return result;
    }
    public void Initialize()
    {
        Console.WriteLine("PDF metadata extractor initialized");
    }
    public void Shutdown()
    {
        Console.WriteLine($"Processed {Volatile.Read(ref _processedCount)} PDFs");
    }
}
require 'kreuzberg'
# Post-processor that numbers PDF results in the order they are processed,
# storing the sequence number under result['metadata']['pdf_order'].
# Non-PDF results are returned untouched.
class PdfMetadataExtractor
  def initialize
    @count = 0
    # Guards @count: `+=` is a read-modify-write, and plugins may be called
    # from several threads.
    @lock = Mutex.new
  end

  def call(result)
    return result unless result['mime_type'] == 'application/pdf'

    # Capture the incremented value so each call records its own number even if
    # another thread increments @count before the assignment below.
    order = @lock.synchronize { @count += 1 }
    result['metadata'] ||= {}
    result['metadata']['pdf_order'] = order
    result
  end
end
# Register the PDF post-processor, extract a report with post-processing
# enabled, and print the resulting metadata.
Kreuzberg.register_post_processor('pdf_metadata', PdfMetadataExtractor.new)
extraction_config = Kreuzberg::Config::Extraction.new(postprocessor: { enabled: true })
result = Kreuzberg.extract_file_sync('report.pdf', config: extraction_config)
puts "Metadata: #{result.metadata.inspect}"
library(kreuzberg)
# Print every metadata entry of an extraction result and return the result unchanged.
#
# Used as a post-processor callback: it only reads `result$metadata` and never
# modifies the result.
extract_pdf_metadata <- function(result) {
  metadata <- result$metadata
  if (!is.null(metadata)) {
    cat("PDF Metadata:\n")
    for (key in names(metadata)) {
      # Collapse each value to a single string. sprintf() would print nothing for
      # a NULL entry and one line per element for a vector or list value.
      value <- paste(as.character(metadata[[key]]), collapse = ", ")
      cat(sprintf(" %s: %s\n", key, value))
    }
  }
  result
}
# Register the metadata printer, extract a PDF with post-processing enabled,
# and report how many characters were extracted.
register_post_processor("pdf_metadata", extract_pdf_metadata)
result <- extract_file_sync(
  "document.pdf",
  "application/pdf",
  extraction_config(postprocessor = list(enabled = TRUE))
)
n_chars <- nchar(result$content)
cat(sprintf("Extraction complete: %d characters\n", n_chars))