API Server

Kreuzberg provides two server modes for programmatic access: an HTTP REST API server for general integration and a Model Context Protocol (MCP) server for AI agent integration.

Server Types

HTTP REST API Server

A production-ready HTTP API server providing RESTful endpoints for document extraction, health checks, and cache management.

Best for:

  • Web applications
  • Microservices integration
  • General HTTP clients
  • Load-balanced deployments

MCP Server

A Model Context Protocol server that exposes Kreuzberg as tools for AI agents and assistants.

Best for:

  • AI agent integration (Claude, GPT, etc.)
  • Agentic workflows
  • Tool use by language models
  • Stdio-based communication

HTTP REST API

Starting the Server

Bash
# Default: http://127.0.0.1:8000
kreuzberg serve

# Custom host and port
kreuzberg serve -H 0.0.0.0 -p 3000

# With configuration file
kreuzberg serve --config kreuzberg.toml
C#
using System;
using System.Diagnostics;

class ApiServer
{
    static void Main()
    {
        var processInfo = new ProcessStartInfo
        {
            FileName = "kreuzberg",
            Arguments = "serve -H 0.0.0.0 -p 8000",
            UseShellExecute = false,
            RedirectStandardOutput = true,
            RedirectStandardError = true
        };

        using (var process = Process.Start(processInfo))
        {
            process?.WaitForExit();
        }
    }
}
Bash
# Run server on port 8000
docker run -d \
  -p 8000:8000 \
  goldziher/kreuzberg:latest \
  serve -H 0.0.0.0 -p 8000

# With environment variables
docker run -d \
  -e KREUZBERG_CORS_ORIGINS="https://myapp.com" \
  -e KREUZBERG_MAX_UPLOAD_SIZE_MB=200 \
  -p 8000:8000 \
  goldziher/kreuzberg:latest \
  serve -H 0.0.0.0 -p 8000
Go
package main

import (
    "log"
    "os/exec"
)

func main() {
    cmd := exec.Command("kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000")
    cmd.Stdout = log.Writer()
    cmd.Stderr = log.Writer()
    if err := cmd.Run(); err != nil {
        log.Fatalf("failed to start server: %v", err)
    }
}
Java
import java.io.IOException;

public class ApiServer {
    public static void main(String[] args) {
        try {
            ProcessBuilder pb = new ProcessBuilder(
                "kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000"
            );
            pb.inheritIO();
            Process process = pb.start();
            process.waitFor();
        } catch (IOException | InterruptedException e) {
            System.err.println("Failed to start server: " + e.getMessage());
        }
    }
}
Python
# Start server
import subprocess
subprocess.Popen(["python", "-m", "kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000"])
Rust
use kreuzberg::{ExtractionConfig, api::serve_with_config};

#[tokio::main]
async fn main() -> kreuzberg::Result<()> {
    let config = ExtractionConfig::discover()?;
    serve_with_config("0.0.0.0", 8000, config).await?;
    Ok(())
}

API Endpoints

POST /extract

Extract text from uploaded files via multipart form data.

Request Format:

  • Method: POST
  • Content-Type: multipart/form-data
  • Fields:
    • files (required, repeatable): Files to extract
    • config (optional): JSON configuration overrides

Response: JSON array of extraction results

Example:

Terminal
# Extract a single file via HTTP POST
curl -F "files=@document.pdf" http://localhost:8000/extract

# Extract multiple files in a single request
curl -F "files=@doc1.pdf" -F "files=@doc2.docx" \
  http://localhost:8000/extract

# Extract with custom OCR configuration override
curl -F "files=@scanned.pdf" \
     -F 'config={"ocr":{"language":"eng"},"force_ocr":true}' \
  http://localhost:8000/extract

Response Schema:

Response
[
  {
    "content": "Extracted text content...",
    "mime_type": "application/pdf",
    "metadata": {
      "page_count": 10,
      "author": "John Doe"
    },
    "tables": [],
    "detected_languages": ["eng"],
    "chunks": null,
    "images": null
  }
]
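
On the client side, each element of the response array can be mapped onto a small typed structure. The following is a minimal Python sketch that assumes only the fields shown in the schema above. The `ExtractionResult` class and the `parse_results` helper are illustrative names, not part of Kreuzberg.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ExtractionResult:
    """Illustrative container mirroring the fields in the response schema."""

    content: str
    mime_type: str
    metadata: dict = field(default_factory=dict)
    tables: Optional[list] = None
    detected_languages: Optional[list] = None
    chunks: Optional[list] = None
    images: Optional[list] = None


def parse_results(body: str) -> list:
    """Parse the JSON array returned by POST /extract into ExtractionResult objects."""
    known = set(ExtractionResult.__dataclass_fields__)
    results = []
    for item in json.loads(body):
        # Drop keys this sketch does not model, so extra server fields do not break parsing
        fields: dict[str, Any] = {k: v for k, v in item.items() if k in known}
        results.append(ExtractionResult(**fields))
    return results
```

Calling `parse_results(response.text)` on a successful response would then give attribute access such as `results[0].content` instead of dictionary lookups.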

POST /embed

Generate embeddings for text strings without document extraction.

Request Format:

  • Method: POST
  • Content-Type: application/json
  • Body:
    • texts (required): Array of strings to generate embeddings for
    • config (optional): Embedding configuration overrides

Response: JSON object containing embeddings, model info, dimensions, and count

Example:

Terminal
# Generate embeddings for two text strings
curl -X POST http://localhost:8000/embed \
  -H "Content-Type: application/json" \
  -d '{"texts":["Hello world","Second text"]}'

# Generate embeddings with custom model configuration
curl -X POST http://localhost:8000/embed \
  -H "Content-Type: application/json" \
  -d '{
    "texts":["Test text"],
    "config":{
      "model":{"preset":{"name":"fast"}},
      "batch_size":32
    }
  }'

Response Schema:

Response
{
  "embeddings": [
    [0.123, -0.456, 0.789, ...],  // 384 or 768 or 1024 dimensions
    [-0.234, 0.567, -0.891, ...]
  ],
  "model": "balanced",
  "dimensions": 768,
  "count": 2
}

Available Embedding Presets:

| Preset | Model | Dimensions | Use Case |
|---|---|---|---|
| fast | AllMiniLML6V2Q | 384 | Quick prototyping, development |
| balanced | BGEBaseENV15 | 768 | General-purpose RAG, production (default) |
| quality | BGELargeENV15 | 1024 | Complex documents, maximum accuracy |
| multilingual | MultilingualE5Base | 768 | International documents, 100+ languages |

Use Cases:

  • Generate embeddings for semantic search
  • Create vector representations for RAG (Retrieval-Augmented Generation) pipelines
  • Embed text chunks without extracting from documents
  • Batch embed multiple texts efficiently

Note: This endpoint requires the embeddings feature to be enabled (available in Docker images and most pre-built binaries). ONNX Runtime must be installed on the system.
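
For semantic search, the vectors in `embeddings` are commonly compared with cosine similarity. The sketch below shows that comparison in pure Python, assuming a response shaped like the schema above. `cosine_similarity` and `rank_by_similarity` are hypothetical helper names, and nothing here is specific to Kreuzberg.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_by_similarity(query: list, embeddings: list) -> list:
    """Return indices of embeddings ordered from most to least similar to the query."""
    scores = [cosine_similarity(query, e) for e in embeddings]
    return sorted(range(len(embeddings)), key=lambda i: scores[i], reverse=True)
```

In practice the query vector would come from a separate `/embed` call using the same preset as the document vectors, since vectors from different models are not comparable.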

GET /health

Health check endpoint for monitoring and load balancers.

Example:

Terminal
# Check server health status
curl http://localhost:8000/health

Response:

Response
{
  "status": "healthy",
  "version": "4.0.0-rc.1"
}
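
A deployment script can poll this endpoint until the server reports `healthy` before sending traffic. The following is a minimal sketch using only the Python standard library. The URL, retry count, and the `wait_until_healthy` name are assumptions for illustration.

```python
import json
import time
import urllib.request
from typing import Callable


def fetch_health(url: str = "http://localhost:8000/health") -> dict:
    """GET the health endpoint and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read())


def wait_until_healthy(
    fetch: Callable[[], dict] = fetch_health,
    attempts: int = 10,
    delay_seconds: float = 1.0,
) -> bool:
    """Poll until the status field equals "healthy" or the attempts run out."""
    for attempt in range(attempts):
        try:
            if fetch().get("status") == "healthy":
                return True
        except (OSError, ValueError):
            # Connection refused, timeout, or a non-JSON body: treat as not ready yet
            pass
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False
```

Passing the fetch function as a parameter keeps the retry logic testable without a running server.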

GET /info

Server information and capabilities.

Example:

Terminal
# Get server version and capabilities
curl http://localhost:8000/info

Response:

Response
{
  "version": "4.0.0-rc.1",
  "rust_backend": true
}

GET /cache/stats

Get cache statistics.

Example:

Terminal
# Retrieve cache statistics and storage usage
curl http://localhost:8000/cache/stats

Response:

Response
{
  "directory": ".kreuzberg",
  "total_files": 42,
  "total_size_mb": 156.8,
  "available_space_mb": 45123.5,
  "oldest_file_age_days": 7.2,
  "newest_file_age_days": 0.1
}

DELETE /cache/clear

Clear all cached files.

Example:

Terminal
# Clear all cached extraction results
curl -X DELETE http://localhost:8000/cache/clear

Response:

Response
{
  "directory": ".kreuzberg",
  "removed_files": 42,
  "freed_mb": 156.8
}
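
The two cache endpoints can be combined into a simple maintenance task that clears the cache only when it grows past a threshold. This is a sketch under stated assumptions: the base URL, the threshold, and the `clear_cache_if_over` helper are illustrative, and the field names are taken from the responses shown above.

```python
import json
import urllib.request
from typing import Callable

BASE_URL = "http://localhost:8000"  # assumption: server running with default settings


def http_json(method: str, path: str) -> dict:
    """Send a request without a body and decode the JSON response."""
    request = urllib.request.Request(BASE_URL + path, method=method)
    with urllib.request.urlopen(request, timeout=30) as resp:
        return json.loads(resp.read())


def clear_cache_if_over(limit_mb: float, call: Callable[[str, str], dict] = http_json) -> float:
    """Clear the cache when total_size_mb exceeds limit_mb; return the megabytes freed."""
    stats = call("GET", "/cache/stats")
    if stats["total_size_mb"] <= limit_mb:
        return 0.0
    return float(call("DELETE", "/cache/clear")["freed_mb"])
```

A scheduled job could call `clear_cache_if_over(1024)` to keep the cache directory below roughly one gigabyte.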

Configuration

Configuration File Discovery

The server automatically discovers configuration files in this order:

  1. ./kreuzberg.toml (current directory)
  2. ./kreuzberg.yaml
  3. ./kreuzberg.json
  4. Parent directories (recursive search)
  5. Default configuration (if no file found)
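
The discovery order can be approximated as follows. This is a sketch of the behaviour described in the list, not Kreuzberg's actual implementation. It assumes each directory is checked for all three file names before the search moves to its parent, which the list does not spell out, and `discover_config` is a hypothetical name.

```python
from pathlib import Path
from typing import Optional

# File names in the priority order listed above
CONFIG_NAMES = ("kreuzberg.toml", "kreuzberg.yaml", "kreuzberg.json")


def discover_config(start: Path) -> Optional[Path]:
    """Return the first config file found in start or its ancestors, else None."""
    start = start.resolve()
    for directory in (start, *start.parents):
        for name in CONFIG_NAMES:
            candidate = directory / name
            if candidate.is_file():
                return candidate
    # Nothing found: the caller falls back to the default configuration
    return None
```

Running `discover_config(Path.cwd())` from a project subdirectory shows which file the described search order would select.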

Example kreuzberg.toml:

# Enable quality processing and caching
# (top-level keys must appear before the first table header)
enable_quality_processing = true
use_cache = true

# Configure OCR backend and language settings
[ocr]
backend = "tesseract"
language = "eng"

# Configure token reduction for LLM optimization
[token_reduction]
enabled = true
target_reduction = 0.3

See Configuration Guide for all options.

Environment Variables

Upload Limits:

Terminal
# Set maximum file upload size in megabytes
KREUZBERG_MAX_UPLOAD_SIZE_MB=200  # Max upload size in MB (default: 100)

For detailed configuration options, memory considerations, and performance tuning for large files, see the File Size Limits Reference.
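
A client can avoid a rejected request by checking file size before uploading. The sketch below assumes the client knows the value the server was started with and that one megabyte means 1024 × 1024 bytes. The source does not say whether the limit applies per file or per request, and `fits_upload_limit` is a hypothetical helper.

```python
DEFAULT_MAX_UPLOAD_MB = 100  # default stated above


def max_upload_bytes(env: dict) -> int:
    """Translate KREUZBERG_MAX_UPLOAD_SIZE_MB (or the default) into bytes."""
    limit_mb = int(env.get("KREUZBERG_MAX_UPLOAD_SIZE_MB", DEFAULT_MAX_UPLOAD_MB))
    return limit_mb * 1024 * 1024


def fits_upload_limit(size_bytes: int, env: dict) -> bool:
    """True when a payload of size_bytes is within the configured limit."""
    return size_bytes <= max_upload_bytes(env)
```

For example, `fits_upload_limit(os.path.getsize("document.pdf"), dict(os.environ))` performs the check against the current environment (with `import os`).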

CORS Configuration:

Terminal
# Configure allowed origins for cross-origin requests (production security)
KREUZBERG_CORS_ORIGINS="https://app.example.com,https://api.example.com"

Security Warning: The default CORS configuration allows all origins for development convenience. Any website can then issue cross-origin requests to the server from a visitor's browser, which permits CSRF attacks. Always set KREUZBERG_CORS_ORIGINS in production.

Note: Server host and port are configured via CLI flags (-H / --host and -p / --port), not environment variables.

Client Examples

C#
using System;
using System.IO;
using System.Net.Http;

var client = new HttpClient();

using (var fileStream = File.OpenRead("document.pdf"))
{
    using (var content = new MultipartFormDataContent())
    {
        content.Add(new StreamContent(fileStream), "files", "document.pdf");

        var response = await client.PostAsync("http://localhost:8000/extract", content);
        var json = await response.Content.ReadAsStringAsync();

        Console.WriteLine(json);
    }
}
Terminal
# Extract content from a single document
curl -F "files=@document.pdf" http://localhost:8000/extract | jq .

# Extract with OCR enabled for scanned documents
curl -F "files=@scanned.pdf" \
     -F 'config={"ocr":{"language":"eng"}}' \
     http://localhost:8000/extract | jq .

# Batch extract multiple files in parallel
curl -F "files=@doc1.pdf" \
     -F "files=@doc2.docx" \
     http://localhost:8000/extract | jq .
Go
package main

import (
    "bytes"
    "fmt"
    "io"
    "log"
    "mime/multipart"
    "net/http"
    "os"
)

func main() {
    file, err := os.Open("document.pdf")
    if err != nil {
        log.Fatalf("open file: %v", err)
    }
    defer file.Close()

    body := &bytes.Buffer{}
    writer := multipart.NewWriter(body)
    part, err := writer.CreateFormFile("files", "document.pdf")
    if err != nil {
        log.Fatalf("create form file: %v", err)
    }

    if _, err := io.Copy(part, file); err != nil {
        log.Fatalf("copy file: %v", err)
    }
    writer.Close()

    resp, err := http.Post("http://localhost:8000/extract",
        writer.FormDataContentType(), body)
    if err != nil {
        log.Fatalf("http post: %v", err)
    }
    defer resp.Body.Close()

    bodyBytes, err := io.ReadAll(resp.Body)
    if err != nil {
        log.Fatalf("read response: %v", err)
    }

    fmt.Println(string(bodyBytes))
}
Java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class ExtractClient {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String boundary = "----WebKitFormBoundary" + System.currentTimeMillis();

        byte[] fileData = Files.readAllBytes(Path.of("document.pdf"));
        String multipartBody = "--" + boundary + "\r\n"
            + "Content-Disposition: form-data; name=\"files\"; filename=\"document.pdf\"\r\n"
            + "Content-Type: application/pdf\r\n\r\n"
            + new String(fileData, java.nio.charset.StandardCharsets.ISO_8859_1) + "\r\n"
            + "--" + boundary + "--\r\n";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8000/extract"))
            .header("Content-Type", "multipart/form-data; boundary=" + boundary)
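            // Encode with ISO-8859-1 so the file bytes pass through unchanged (ofString defaults to UTF-8)
            .POST(HttpRequest.BodyPublishers.ofString(multipartBody, java.nio.charset.StandardCharsets.ISO_8859_1))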
            .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
Python
import httpx

with httpx.Client() as client:
    with open("document.pdf", "rb") as f:
        files: dict[str, object] = {"files": f}
        response: httpx.Response = client.post(
            "http://localhost:8000/extract", files=files
        )
        results: list[dict] = response.json()
        print(results[0]["content"])
Ruby
require 'net/http'
require 'uri'
require 'json'

# Single file extraction
uri = URI('http://localhost:8000/extract')
request = Net::HTTP::Post.new(uri)
form_data = [['files', File.open('document.pdf')]]
request.set_form form_data, 'multipart/form-data'

response = Net::HTTP.start(uri.hostname, uri.port) do |http|
  http.request(request)
end

results = JSON.parse(response.body)
puts results[0]['content']
Rust
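use reqwest::multipart;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read the file into memory and wrap it in a named multipart part
    // (requires reqwest's "multipart" feature)
    let bytes = std::fs::read("document.pdf")?;
    let part = multipart::Part::bytes(bytes).file_name("document.pdf");
    let form = multipart::Form::new().part("files", part);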

    let client = reqwest::Client::new();
    let response = client
        .post("http://localhost:8000/extract")
        .multipart(form)
        .send()
        .await?;

    let results: serde_json::Value = response.json().await?;
    println!("{:?}", results[0]["content"]);

    Ok(())
}
TypeScript
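// Using fetch API in the browser; assumes the page contains an <input type="file"> element
const fileInput = document.querySelector<HTMLInputElement>('input[type="file"]');
const file = fileInput?.files?.[0];
if (!file) throw new Error("No file selected");

const formData = new FormData();
formData.append("files", file);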

const response = await fetch("http://localhost:8000/extract", {
  method: "POST",
  body: formData,
});

const results = await response.json();
console.log(results[0].content);

Error Handling

Error Response Format:

Error Response
{
  "error_type": "ValidationError",
  "message": "Invalid file format",
  "traceback": "...",
  "status_code": 400
}

HTTP Status Codes:

| Status Code | Error Type | Meaning |
|---|---|---|
| 400 | ValidationError | Invalid input parameters |
| 422 | ParsingError, OcrError | Document processing failed |
| 500 | Internal errors | Server errors |
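
Client code often turns these status codes into typed exceptions so callers can tell bad input from processing failures. The following is a minimal sketch. The exception class names and the `error_from_response` helper are illustrative and are not part of any Kreuzberg client library.

```python
from typing import Any


class KreuzbergApiError(Exception):
    """Base class for errors reported by the HTTP API (illustrative name)."""

    def __init__(self, status_code: int, error_type: str, message: str) -> None:
        super().__init__(f"{error_type}: {message}")
        self.status_code = status_code
        self.error_type = error_type
        self.message = message


class InvalidInputError(KreuzbergApiError):
    """400: invalid input parameters."""


class ProcessingError(KreuzbergApiError):
    """422: document processing failed (ParsingError, OcrError)."""


class ServerError(KreuzbergApiError):
    """500, and any status this sketch does not recognise."""


_BY_STATUS = {400: InvalidInputError, 422: ProcessingError}


def error_from_response(status_code: int, payload: dict) -> KreuzbergApiError:
    """Build the exception matching an error response body."""
    cls = _BY_STATUS.get(status_code, ServerError)
    error_type: Any = payload.get("error_type", "Unknown")
    return cls(status_code, error_type, payload.get("message", "No message"))
```

A caller would typically write `raise error_from_response(response.status_code, response.json())` whenever the status code is 400 or above.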

Example:

C#
using System;
using System.IO;
using System.Net.Http;
using System.Text.Json;

var client = new HttpClient();

try
{
    using (var fileStream = File.OpenRead("document.pdf"))
    {
        using (var content = new MultipartFormDataContent())
        {
            content.Add(new StreamContent(fileStream), "files", "document.pdf");

            var response = await client.PostAsync("http://localhost:8000/extract", content);

            if (!response.IsSuccessStatusCode)
            {
                var errorJson = await response.Content.ReadAsStringAsync();
                var errorDoc = JsonDocument.Parse(errorJson);
                var errorType = errorDoc.RootElement.GetProperty("error_type").GetString();
                var message = errorDoc.RootElement.GetProperty("message").GetString();

                Console.WriteLine($"Error: {errorType}: {message}");
                return;
            }

            var json = await response.Content.ReadAsStringAsync();
            Console.WriteLine($"Success: {json}");
        }
    }
}
catch (HttpRequestException e)
{
    Console.WriteLine($"Request failed: {e.Message}");
}
Go
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
)

func main() {
    resp, err := http.Post("http://localhost:8000/extract", "application/json", nil)
    if err != nil {
        fmt.Println("Request failed:", err)
        return
    }
    defer resp.Body.Close()

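    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("Read failed:", err)
        return
    }

    if resp.StatusCode >= 400 {
        // Named apiErr to avoid shadowing the built-in error type
        var apiErr map[string]interface{}
        if err := json.Unmarshal(body, &apiErr); err != nil {
            fmt.Println("Decode failed:", err)
            return
        }
        fmt.Printf("Error: %v: %v\n", apiErr["error_type"], apiErr["message"])
    } else {
        var results []map[string]interface{}
        if err := json.Unmarshal(body, &results); err != nil {
            fmt.Println("Decode failed:", err)
            return
        }
        // Process results
        fmt.Printf("Extracted %d documents\n", len(results))
    }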
}
Java
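import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

// Fragment: place inside a method. multipartBody and boundary are built as in the Java client example above.
try {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8000/extract"))
        .header("Content-Type", "multipart/form-data; boundary=" + boundary)
        .POST(HttpRequest.BodyPublishers.ofString(multipartBody, java.nio.charset.StandardCharsets.ISO_8859_1))
        .build();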

    HttpResponse<String> response = client.send(request,
        HttpResponse.BodyHandlers.ofString());

    if (response.statusCode() >= 400) {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> error = mapper.readValue(response.body(), Map.class);
        System.err.println("Error: " + error.get("error_type") +
            ": " + error.get("message"));
    } else {
        ObjectMapper mapper = new ObjectMapper();
        Map[] results = mapper.readValue(response.body(), Map[].class);
        // Process results
    }
} catch (IOException | InterruptedException e) {
    System.err.println("Request failed: " + e.getMessage());
}
Python
import httpx

try:
    with httpx.Client() as client:
        with open("document.pdf", "rb") as f:
            files: dict = {"files": f}
            response: httpx.Response = client.post(
                "http://localhost:8000/extract", files=files
            )
            response.raise_for_status()
            results: list = response.json()
            print(f"Extracted {len(results)} documents")
except httpx.HTTPStatusError as e:
    error: dict = e.response.json()
    error_type: str = error.get("error_type", "Unknown")
    message: str = error.get("message", "No message")
    print(f"Error: {error_type}: {message}")
Ruby
require 'net/http'
require 'uri'
require 'json'

begin
  uri = URI('http://localhost:8000/extract')
  request = Net::HTTP::Post.new(uri)

  response = Net::HTTP.start(uri.hostname, uri.port) do |http|
    http.request(request)
  end

  if response.code.to_i >= 400
    error = JSON.parse(response.body)
    puts "Error: #{error['error_type']}: #{error['message']}"
  else
    results = JSON.parse(response.body)
    # Process results
  end
rescue => e
  puts "Request failed: #{e.message}"
end
Rust
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client
        .post("http://localhost:8000/extract")
        .send()
        .await?;

    let status = response.status();
    if status.is_client_error() || status.is_server_error() {
        let error: serde_json::Value = response.json().await?;
        eprintln!(
            "Error: {}: {}",
            error["error_type"], error["message"]
        );
    } else {
        let results: serde_json::Value = response.json().await?;
        println!("{:?}", results);
    }

    Ok(())
}
TypeScript
try {
    const response = await fetch("http://localhost:8000/extract", {
        method: "POST",
        body: formData,
    });

    if (!response.ok) {
        const error = await response.json();
        console.error(`Error: ${error.error_type}: ${error.message}`);
    } else {
        const results = await response.json();
        console.log(results);
    }
} catch (e) {
    console.error("Request failed:", e);
}

MCP Server

The Model Context Protocol (MCP) server exposes Kreuzberg as tools for AI agents and assistants.

Starting the MCP Server

Terminal
# Start MCP server using stdio transport for AI agents
kreuzberg mcp

# Start MCP server with custom configuration file
kreuzberg mcp --config kreuzberg.toml
C#
using System;
using System.Diagnostics;
using System.Threading.Tasks;

var processInfo = new ProcessStartInfo
{
    FileName = "kreuzberg",
    Arguments = "mcp",
    UseShellExecute = false,
    RedirectStandardOutput = true,
    RedirectStandardError = true
};

var mcpProcess = Process.Start(processInfo);

Console.WriteLine($"MCP server started with PID: {mcpProcess?.Id}");
await Task.Delay(1000);
Console.WriteLine("Server is running, listening for connections");

mcpProcess?.WaitForExit();
Go
package main

import (
    "fmt"
    "os"
    "os/exec"
)

func main() {
    cmd := exec.Command("kreuzberg", "mcp")
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr

    if err := cmd.Run(); err != nil {
        fmt.Fprintf(os.Stderr, "Failed to start MCP server: %v\n", err)
    }
}
Java
import java.io.IOException;

public class McpServer {
    public static void main(String[] args) {
        try {
            // Start MCP server using CLI
            ProcessBuilder pb = new ProcessBuilder("kreuzberg", "mcp");
            pb.inheritIO();
            Process process = pb.start();
            process.waitFor();
        } catch (IOException | InterruptedException e) {
            System.err.println("Failed to start MCP server: " + e.getMessage());
        }
    }
}
Python
import subprocess
import time
from typing import Optional

mcp_process: subprocess.Popen = subprocess.Popen(
    ["python", "-m", "kreuzberg", "mcp"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

pid: Optional[int] = mcp_process.pid
print(f"MCP server started with PID: {pid}")

time.sleep(1)
print("Server is running, listening for connections")
Ruby
require 'open3'

begin
  Open3.popen3('kreuzberg', 'mcp') do |stdin, stdout, stderr, wait_thr|
    puts stdout.read
    wait_thr.join
  end
rescue => e
  puts "Failed to start MCP server: #{e.message}"
end
Rust
use kreuzberg::{ExtractionConfig, mcp::start_mcp_server_with_config};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let config = ExtractionConfig::discover()?;
    start_mcp_server_with_config(config).await?;
    Ok(())
}
TypeScript
import { spawn } from 'child_process';

const mcpProcess = spawn('kreuzberg', ['mcp']);

mcpProcess.stdout.on('data', (data) => {
  console.log(`MCP Server: ${data}`);
});

mcpProcess.stderr.on('data', (data) => {
  console.error(`MCP Error: ${data}`);
});

mcpProcess.on('error', (err) => {
  console.error(`Failed to start MCP server: ${err.message}`);
});

MCP Tools

The MCP server exposes 6 tools for AI agents:

extract_file

Extract content from a file path.

Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| path | string | Yes | File path to extract |
| mime_type | string | No | MIME type hint |
| enable_ocr | boolean | No | Enable OCR (default: false) |
| force_ocr | boolean | No | Force OCR even if text exists (default: false) |
| async | boolean | No | Use async extraction (default: true) |

Example MCP Request:

MCP Request
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "extract_file",
    "arguments": {
      "path": "/path/to/document.pdf",
      "enable_ocr": true,
      "async": true
    }
  }
}
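
MCP messages are JSON-RPC 2.0 objects, and over the stdio transport each message is written as a single line of JSON. The sketch below builds such a line for any of the tools in this section. `tool_call_message` is a hypothetical helper, and a complete client would also perform the `initialize` handshake before calling tools.

```python
import itertools
import json
from typing import Any

# Each request needs a distinct id so responses can be matched to requests
_ids = itertools.count(1)


def tool_call_message(name: str, arguments: dict) -> str:
    """Serialize a tools/call request as one line of JSON (no embedded newlines)."""
    message: dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    return json.dumps(message)
```

Writing `tool_call_message("extract_file", {"path": "/path/to/document.pdf"}) + "\n"` to the server's stdin sends the request shown above.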

extract_bytes

Extract content from base64-encoded file data.

Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| data | string | Yes | Base64-encoded file content |
| mime_type | string | No | MIME type hint |
| enable_ocr | boolean | No | Enable OCR |
| force_ocr | boolean | No | Force OCR |
| async | boolean | No | Use async extraction |
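
Preparing the `data` argument means Base64-encoding the raw file bytes into an ASCII string. The sketch below assumes the standard Base64 alphabet with padding, since the parameter description does not name a variant. `extract_bytes_arguments` is a hypothetical helper.

```python
import base64
from typing import Optional


def extract_bytes_arguments(raw: bytes, mime_type: Optional[str] = None) -> dict:
    """Build the arguments object for the extract_bytes tool from raw file bytes."""
    arguments = {"data": base64.b64encode(raw).decode("ascii")}
    if mime_type is not None:
        # Optional hint; omitted entirely when the caller does not know the type
        arguments["mime_type"] = mime_type
    return arguments
```

For a file on disk, `extract_bytes_arguments(Path("document.pdf").read_bytes(), "application/pdf")` produces the arguments (with `from pathlib import Path`).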

batch_extract_files

Extract multiple files in parallel.

Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| paths | array[string] | Yes | File paths to extract |
| enable_ocr | boolean | No | Enable OCR |
| force_ocr | boolean | No | Force OCR |
| async | boolean | No | Use async extraction |

detect_mime_type

Detect file format and return MIME type.

Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| path | string | Yes | File path |
| use_content | boolean | No | Content-based detection (default: true) |

cache_stats

Get cache statistics.

Parameters: None

Returns: Cache directory path, file count, size, available space, file ages

cache_clear

Clear all cached files.

Parameters: None

Returns: Number of files removed, space freed

MCP Server Information

Server Metadata:

  • Name: kreuzberg-mcp
  • Title: Kreuzberg Document Intelligence MCP Server
  • Version: Current package version
  • Website: https://goldziher.github.io/kreuzberg/
  • Protocol: MCP (Model Context Protocol)
  • Transport: stdio (stdin/stdout)

Capabilities:

  • Tool calling (6 tools exposed)
  • Async and sync extraction variants
  • Base64-encoded file handling
  • Batch processing

AI Agent Integration

Add to Claude Desktop configuration (~/Library/Application Support/Claude/claude_desktop_config.json on macOS):

claude_desktop_config.json
{
  "mcpServers": {
    "kreuzberg": {
      "command": "kreuzberg",
      "args": ["mcp"]
    }
  }
}

After adding the configuration, restart Claude Desktop to load the Kreuzberg MCP server.
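
If the configuration file already lists other MCP servers, the Kreuzberg entry should be merged in rather than overwriting the file. The following is a minimal sketch of that merge. `add_kreuzberg_server` is a hypothetical helper, and it assumes an existing file contains valid JSON.

```python
import json
from pathlib import Path


def add_kreuzberg_server(config_path: Path) -> dict:
    """Add or update the kreuzberg entry under mcpServers, keeping other settings."""
    config = json.loads(config_path.read_text()) if config_path.exists() else {}
    servers = config.setdefault("mcpServers", {})
    servers["kreuzberg"] = {"command": "kreuzberg", "args": ["mcp"]}
    # Create the directory on first use, then write the merged configuration back
    config_path.parent.mkdir(parents=True, exist_ok=True)
    config_path.write_text(json.dumps(config, indent=2) + "\n")
    return config
```

On macOS the path argument would be `Path.home() / "Library/Application Support/Claude/claude_desktop_config.json"`.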

C#
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

var processInfo = new ProcessStartInfo
{
    FileName = "kreuzberg",
    Arguments = "mcp",
    UseShellExecute = false,
    RedirectStandardInput = true,
    RedirectStandardOutput = true,
    RedirectStandardError = true
};

var process = Process.Start(processInfo);

var clientInput = process.StandardInput;
var clientOutput = process.StandardOutput;

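// Initialize session by sending initialize request.
// "@params" serializes as "params"; the protocolVersion and clientInfo values are illustrative.
var initRequest = new
{
    jsonrpc = "2.0",
    id = 1,
    method = "initialize",
    @params = new
    {
        protocolVersion = "2024-11-05",
        capabilities = new { },
        clientInfo = new { name = "example-client", version = "1.0.0" }
    }
};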

await clientInput.WriteLineAsync(System.Text.Json.JsonSerializer.Serialize(initRequest));
await clientInput.FlushAsync();

var initResponse = await clientOutput.ReadLineAsync();
Console.WriteLine($"Init response: {initResponse}");

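// Confirm initialization before issuing further requests, as the MCP lifecycle requires
var initializedNotification = new { jsonrpc = "2.0", method = "notifications/initialized" };
await clientInput.WriteLineAsync(System.Text.Json.JsonSerializer.Serialize(initializedNotification));
await clientInput.FlushAsync();

// List available tools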
var listRequest = new
{
    jsonrpc = "2.0",
    id = 2,
    method = "tools/list"
};

await clientInput.WriteLineAsync(System.Text.Json.JsonSerializer.Serialize(listRequest));
await clientInput.FlushAsync();

var listResponse = await clientOutput.ReadLineAsync();
Console.WriteLine($"Available tools: {listResponse}");

// Close stdin to signal end of input so the stdio server can shut down
clientInput.Close();
process?.WaitForExit();
Go
package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "os/exec"
)

// MCPRequest is a JSON-RPC 2.0 request, the message format MCP uses.
type MCPRequest struct {
    JSONRPC string    `json:"jsonrpc"`
    ID      int       `json:"id"`
    Method  string    `json:"method"`
    Params  MCPParams `json:"params"`
}

type MCPParams struct {
    Name      string                 `json:"name"`
    Arguments map[string]interface{} `json:"arguments"`
}

func main() {
    cmd := exec.Command("kreuzberg", "mcp")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Fatalf("create stdin pipe: %v", err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Fatalf("create stdout pipe: %v", err)
    }

    if err := cmd.Start(); err != nil {
        log.Fatalf("start command: %v", err)
    }

    // A complete client sends an initialize request first; omitted here for brevity
    request := MCPRequest{
        JSONRPC: "2.0",
        ID:      1,
        Method:  "tools/call",
        Params: MCPParams{
            Name: "extract_file",
            Arguments: map[string]interface{}{
                "path":  "document.pdf",
                "async": true,
            },
        },
    }

    data, err := json.Marshal(request)
    if err != nil {
        log.Fatalf("marshal request: %v", err)
    }
    fmt.Fprintf(stdin, "%s\n", string(data))

    scanner := bufio.NewScanner(stdout)
    if scanner.Scan() {
        fmt.Println(scanner.Text())
    }

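    // Close stdin so the server sees end of input; otherwise Wait can block indefinitely
    stdin.Close()
    if err := cmd.Wait(); err != nil {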
        log.Fatalf("wait for command: %v", err)
    }
}
Java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Map;

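// Implements AutoCloseable so the try-with-resources block in main compiles
public class McpClient implements AutoCloseable {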
    private final Process mcpProcess;
    private final BufferedWriter stdin;
    private final BufferedReader stdout;
    private final ObjectMapper mapper = new ObjectMapper();

    public McpClient() throws IOException {
        ProcessBuilder pb = new ProcessBuilder("kreuzberg", "mcp");
        mcpProcess = pb.start();
        stdin = new BufferedWriter(new OutputStreamWriter(mcpProcess.getOutputStream()));
        stdout = new BufferedReader(new InputStreamReader(mcpProcess.getInputStream()));
    }

    public String extractFile(String path) throws IOException {
        Map<String, Object> request = Map.of(
            "jsonrpc", "2.0",
            "id", 1,
            "method", "tools/call",
            "params", Map.of(
                "name", "extract_file",
                "arguments", Map.of("path", path, "async", true)
            )
        );

        stdin.write(mapper.writeValueAsString(request));
        stdin.newLine();
        stdin.flush();

        String response = stdout.readLine();
        @SuppressWarnings("unchecked")
        Map<String, Object> result = mapper.readValue(response, Map.class);
        @SuppressWarnings("unchecked")
        Map<String, Object> resultData = (Map<String, Object>) result.get("result");
        return (String) resultData.get("content");
    }

    public void close() throws IOException {
        stdin.close();
        stdout.close();
        mcpProcess.destroy();
    }

    public static void main(String[] args) {
        try (McpClient client = new McpClient()) {
            String content = client.extractFile("contract.pdf");
            System.out.println("Extracted content: " + content);
        } catch (IOException e) {
            System.err.println("Error: " + e.getMessage());
        }
    }
}
Python
from langchain.agents import initialize_agent, AgentType
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
import subprocess
import json

mcp_process = subprocess.Popen(
    ["kreuzberg", "mcp"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

def extract_file(path: str) -> str:
    request: dict = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "extract_file",
            "arguments": {"path": path, "async": True},
        },
    }
    mcp_process.stdin.write(json.dumps(request).encode() + b"\n")
    mcp_process.stdin.flush()
    response = mcp_process.stdout.readline()
    return json.loads(response)["result"]["content"]

tools: list[Tool] = [
    Tool(name="extract_document", func=extract_file, description="Extract")
]

llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
    tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION
)
Python
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main() -> None:
    server_params: StdioServerParameters = StdioServerParameters(
        command="kreuzberg", args=["mcp"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            tool_names: list[str] = [t.name for t in tools.tools]
            print(f"Available tools: {tool_names}")
            result = await session.call_tool(
                "extract_file", arguments={"path": "document.pdf", "async": True}
            )
            print(result)

asyncio.run(main())
Ruby
require 'json'
require 'open3'

Open3.popen3('kreuzberg', 'mcp') do |stdin, stdout, stderr, wait_thr|
  request = {
    jsonrpc: '2.0',
    id: 1,
    method: 'tools/call',
    params: {
      name: 'extract_file',
      arguments: { path: 'document.pdf', async: true }
    }
  }

  stdin.puts JSON.generate(request)
  stdin.close_write

  response = stdout.gets
  result = JSON.parse(response)
  puts JSON.pretty_generate(result)
end
Rust
use serde_json::json;
use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut child = Command::new("kreuzberg")
        .arg("mcp")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    {
        let stdin = child.stdin.as_mut().ok_or("Failed to open stdin")?;
        let request = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "extract_file",
                "arguments": {
                    "path": "document.pdf",
                    "async": true
                }
            }
        });
        stdin.write_all(request.to_string().as_bytes())?;
        stdin.write_all(b"\n")?;
    }

    let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
    let reader = BufReader::new(stdout);
    for line in reader.lines() {
        if let Ok(line) = line {
            println!("{}", line);
            break;
        }
    }

    child.wait()?;
    Ok(())
}
TypeScript
import { spawn } from 'child_process';
import * as readline from 'readline';

const mcpProcess = spawn('kreuzberg', ['mcp']);

const rl = readline.createInterface({
  input: mcpProcess.stdout,
  output: mcpProcess.stdin,
  terminal: false,
});

const request = {
  jsonrpc: '2.0',
  id: 1,
  method: 'tools/call',
  params: {
    name: 'extract_file',
    arguments: {
      path: 'document.pdf',
      async: true,
    },
  },
};

mcpProcess.stdin.write(JSON.stringify(request) + '\n');

rl.on('line', (line) => {
  const response = JSON.parse(line);
  console.log(response);
  mcpProcess.kill();
});

mcpProcess.on('error', (err) => {
  console.error('Failed to start MCP process:', err);
});
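
MCP is built on JSON-RPC 2.0, and a full client session typically begins with an initialization handshake before the first `tools/call`. The following Python sketch builds the newline-delimited messages a client would write to the server's stdin. The client name, client version, and protocol version string are illustrative assumptions, not values taken from Kreuzberg, and `build_mcp_session` is a hypothetical helper.

```python
import json


def build_mcp_session(path: str) -> list[str]:
    """Return the newline-delimited JSON-RPC messages for one extract_file call."""
    messages = [
        # 1. Handshake request. The clientInfo values are placeholders.
        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                # Assumed version string; use the one your server supports.
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "example-client", "version": "0.1.0"},
            },
        },
        # 2. Notification (no id) confirming that the handshake is complete.
        {"jsonrpc": "2.0", "method": "notifications/initialized"},
        # 3. The tool call, with the same name and arguments as the examples above.
        {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/call",
            "params": {
                "name": "extract_file",
                "arguments": {"path": path, "async": True},
            },
        },
    ]
    # One JSON object per line, matching the stdio framing used above.
    return [json.dumps(message) + "\n" for message in messages]


if __name__ == "__main__":
    for line in build_mcp_session("document.pdf"):
        print(line, end="")
```

A real client would normally read the server's reply to `initialize` before sending the remaining two messages, and would match each response to its request by `id`.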

Production Deployment

Docker Deployment

Docker Compose Example:

docker-compose.yaml
version: '3.8'

services:
  kreuzberg-api:
    image: goldziher/kreuzberg:latest
    ports:
      - "8000:8000"
    environment:
      # Configure CORS for production security
      - KREUZBERG_CORS_ORIGINS=https://myapp.com,https://api.myapp.com
      # Set maximum upload size for large documents
      - KREUZBERG_MAX_UPLOAD_SIZE_MB=500
    volumes:
      # Mount configuration and cache directories
      - ./config:/config
      - ./cache:/app/.kreuzberg
    # Arguments are appended to the image entrypoint, as in the docker run examples
    command: ["serve", "-H", "0.0.0.0", "-p", "8000", "--config", "/config/kreuzberg.toml"]
    restart: unless-stopped
    healthcheck:
      # Health check for container orchestration
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

Run:

Terminal
# Start the Kreuzberg API server in detached mode
docker-compose up -d

Kubernetes Deployment

Deployment Manifest:

kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kreuzberg-api
spec:
  replicas: 3  # Deploy 3 replicas for high availability
  selector:
    matchLabels:
      app: kreuzberg-api
  template:
    metadata:
      labels:
        app: kreuzberg-api
    spec:
      containers:
      - name: kreuzberg
        image: goldziher/kreuzberg:latest
        ports:
        - containerPort: 8000
        env:
        # Production environment configuration
        - name: KREUZBERG_CORS_ORIGINS
          value: "https://myapp.com"
        - name: KREUZBERG_MAX_UPLOAD_SIZE_MB
          value: "500"
        command: ["kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000"]
        livenessProbe:
          # Check if container is alive and healthy
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          # Check if container is ready to accept traffic
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          # Resource limits for optimal performance
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
---
apiVersion: v1
kind: Service
metadata:
  name: kreuzberg-api
spec:
  selector:
    app: kreuzberg-api
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer  # Expose service via load balancer

Reverse Proxy Configuration

Nginx:

nginx.conf
# Load balance across multiple Kreuzberg instances
upstream kreuzberg {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    # SSL/TLS configuration
    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;

    # Increase upload size limit for large documents
    client_max_body_size 500M;

    location / {
        proxy_pass http://kreuzberg;
        # Forward client headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Extended timeouts for large file processing
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;
    }

    location /health {
        proxy_pass http://kreuzberg;
        access_log off;  # Disable logging for health checks
    }
}

Caddy:

Caddyfile
api.example.com {
    # Load balance with automatic health checks
    reverse_proxy localhost:8000 localhost:8001 localhost:8002 {
        lb_policy round_robin
        health_uri /health
        health_interval 10s
    }

    # Increase maximum upload size for large documents
    request_body {
        max_size 500MB
    }
}

Production Checklist

  1. Set KREUZBERG_CORS_ORIGINS to explicit allowed origins
  2. Configure KREUZBERG_MAX_UPLOAD_SIZE_MB based on expected document sizes
  3. Use a reverse proxy (Nginx/Caddy) for SSL/TLS termination
  4. Enable logging via the RUST_LOG=info environment variable
  5. Set up health checks on the /health endpoint
  6. Monitor cache size and set up periodic clearing
  7. Bind to 0.0.0.0 in containerized deployments
  8. Configure resource limits (CPU, memory) in container orchestration
  9. Test with large files to validate upload limits and timeouts
  10. Implement rate limiting at the reverse proxy level
  11. Set up monitoring (Prometheus metrics, log aggregation)
  12. Plan for horizontal scaling with load balancing
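
Some checklist items can be verified automatically before a rollout. The sketch below inspects the environment variables named in items 1, 2, and 4. The function name, the wildcard check, and the warning texts are illustrative assumptions and are not part of Kreuzberg.

```python
import os
from typing import Mapping


def check_production_env(env: Mapping[str, str]) -> list[str]:
    """Return human-readable warnings for the given environment mapping."""
    warnings = []

    # Item 1: origins should be explicit, not missing and not a wildcard.
    raw_origins = env.get("KREUZBERG_CORS_ORIGINS", "")
    origins = [o.strip() for o in raw_origins.split(",") if o.strip()]
    if not origins:
        warnings.append("KREUZBERG_CORS_ORIGINS is not set")
    elif "*" in origins:
        warnings.append("KREUZBERG_CORS_ORIGINS contains a wildcard")

    # Item 2: the upload limit should be a positive integer.
    raw_limit = env.get("KREUZBERG_MAX_UPLOAD_SIZE_MB")
    if raw_limit is None:
        warnings.append("KREUZBERG_MAX_UPLOAD_SIZE_MB is not set")
    elif not raw_limit.isdigit() or int(raw_limit) == 0:
        warnings.append("KREUZBERG_MAX_UPLOAD_SIZE_MB is not a positive integer")

    # Item 4: logging is controlled through RUST_LOG.
    if not env.get("RUST_LOG"):
        warnings.append("RUST_LOG is not set")

    return warnings


if __name__ == "__main__":
    for warning in check_production_env(os.environ):
        print("WARNING:", warning)
```

Running this in a CI step or container entrypoint wrapper gives early feedback, but it only checks that values are present and well formed, not that they suit your workload.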

Monitoring

Health Check Endpoint:

Terminal
# Simple health check for manual verification
curl http://localhost:8000/health

# Continuous monitoring loop for production (save as a bash script)
while true; do
  if curl -f http://localhost:8000/health > /dev/null 2>&1; then
    echo "$(date): Server healthy"
  else
    echo "$(date): Server unhealthy"
    # Send alert to monitoring system
  fi
  sleep 30
done
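
The same polling loop can be written in Python, which makes it easier to alert only after several consecutive failures (similar in spirit to `retries: 3` in the Compose health check). This is a minimal sketch: `is_healthy`, `should_alert`, and the threshold of three are assumptions made for illustration.

```python
import time
import urllib.request
from typing import Callable


def is_healthy(url: str, fetch: Callable[..., object] = urllib.request.urlopen,
               timeout: float = 5.0) -> bool:
    """Return True if a GET request to the health URL succeeds with a 2xx status."""
    try:
        with fetch(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # URLError, HTTPError, and socket timeouts are all OSError subclasses.
        return False


def should_alert(history: list[bool], threshold: int = 3) -> bool:
    """Alert only when the most recent `threshold` checks all failed."""
    recent = history[-threshold:]
    return len(recent) == threshold and not any(recent)


if __name__ == "__main__":
    history: list[bool] = []
    while True:
        history.append(is_healthy("http://localhost:8000/health"))
        history = history[-3:]  # keep only what should_alert needs
        if should_alert(history):
            print("ALERT: server unhealthy")  # replace with a real alerting hook
        time.sleep(30)
```

The `fetch` parameter exists so the check can be exercised in tests without a running server.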

Cache Monitoring:

Terminal
# Retrieve cache statistics and usage metrics
curl http://localhost:8000/cache/stats | jq .

# Automatic cache clearing when size exceeds threshold
CACHE_SIZE=$(curl -s http://localhost:8000/cache/stats | jq .total_size_mb)
if (( $(echo "$CACHE_SIZE > 1000" | bc -l) )); then
  curl -X DELETE http://localhost:8000/cache/clear
fi
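
For environments where `jq` and `bc` are not available, the size-based check can be sketched in Python using only the standard library. It relies on the `/cache/stats` and `/cache/clear` endpoints and the `total_size_mb` field shown above; the helper names and the default threshold are assumptions.

```python
import json
import urllib.request


def cache_exceeds(stats: dict, threshold_mb: float) -> bool:
    """Return True when the reported cache size is above the threshold."""
    # A missing field is treated as an empty cache (an assumption of this sketch).
    return float(stats.get("total_size_mb", 0)) > threshold_mb


def clear_cache_if_needed(base_url: str, threshold_mb: float = 1000.0) -> bool:
    """Fetch cache statistics and clear the cache if it is too large.

    Returns True if a clear request was sent.
    """
    with urllib.request.urlopen(f"{base_url}/cache/stats", timeout=10) as response:
        stats = json.load(response)
    if not cache_exceeds(stats, threshold_mb):
        return False
    request = urllib.request.Request(f"{base_url}/cache/clear", method="DELETE")
    with urllib.request.urlopen(request, timeout=30):
        pass  # a non-2xx status raises HTTPError before reaching this point
    return True


if __name__ == "__main__":
    if clear_cache_if_needed("http://localhost:8000"):
        print("Cache cleared")
```

Keeping the comparison in `cache_exceeds` separate from the HTTP calls means the threshold logic works for both integer and fractional sizes and can be tested in isolation.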

Logging:

Terminal
# Run with debug logging for development and troubleshooting
RUST_LOG=debug kreuzberg serve -H 0.0.0.0 -p 8000

# Production logging with info level (recommended)
RUST_LOG=info kreuzberg serve -H 0.0.0.0 -p 8000

# JSON structured logging for log aggregation systems
RUST_LOG=info RUST_LOG_FORMAT=json kreuzberg serve -H 0.0.0.0 -p 8000

Performance Tuning

Upload Size Limits

Set KREUZBERG_MAX_UPLOAD_SIZE_MB based on expected document sizes:

Terminal
# Configuration for small documents (PDFs, images under 10 MB)
export KREUZBERG_MAX_UPLOAD_SIZE_MB=50

# Configuration for typical business documents (under 50 MB)
export KREUZBERG_MAX_UPLOAD_SIZE_MB=200

# Configuration for large scans, archives, and high-resolution images
export KREUZBERG_MAX_UPLOAD_SIZE_MB=1000

See the File Size Limits Reference for comprehensive documentation, including:

  - Memory impact calculations
  - Reverse proxy configuration
  - Error handling and troubleshooting
  - Client-side validation examples
  - Best practices for large file processing
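
As a quick illustration of client-side validation, a client can compare a file's size against the configured limit before uploading. This sketch assumes the limit is counted in binary megabytes (1024 × 1024 bytes), which may not match how the server counts; `within_upload_limit` and `check_file` are hypothetical helpers.

```python
import os

# Assumption: one "MB" in KREUZBERG_MAX_UPLOAD_SIZE_MB is 1024 * 1024 bytes.
BYTES_PER_MB = 1024 * 1024


def within_upload_limit(size_bytes: int, limit_mb: int) -> bool:
    """Return True if a payload of size_bytes fits within limit_mb."""
    return size_bytes <= limit_mb * BYTES_PER_MB


def check_file(path: str, limit_mb: int) -> None:
    """Raise ValueError if the file at path is larger than the upload limit."""
    size = os.path.getsize(path)
    if not within_upload_limit(size, limit_mb):
        raise ValueError(
            f"{path} is {size / BYTES_PER_MB:.1f} MB, above the {limit_mb} MB limit"
        )


if __name__ == "__main__":
    limit = int(os.environ.get("KREUZBERG_MAX_UPLOAD_SIZE_MB", "50"))
    check_file("document.pdf", limit)
    print("document.pdf is within the upload limit")
```

Multipart encoding adds some overhead to the request body, so a file sitting exactly at the limit may still be rejected; leaving a margin is safer.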

Concurrent Requests

The server handles concurrent requests efficiently using Tokio's async runtime. For high-throughput scenarios:

  1. Run multiple instances behind a load balancer
  2. Configure reverse proxy connection pooling
  3. Monitor CPU and memory usage to determine optimal replica count
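
For step 3, one common starting point is the proportional rule used by the Kubernetes Horizontal Pod Autoscaler: scale the current replica count by the ratio of observed to target utilization and round up. The sketch below applies that rule; the function name and the 70% default target are chosen here for illustration and are not Kreuzberg recommendations.

```python
import math


def desired_replicas(current_replicas: int, current_percent: float,
                     target_percent: float = 70.0) -> int:
    """Estimate a replica count from average CPU (or memory) utilization.

    current_percent is the observed average utilization across replicas,
    target_percent is the utilization you would like each replica to run at.
    """
    if current_replicas < 1:
        raise ValueError("current_replicas must be at least 1")
    if target_percent <= 0:
        raise ValueError("target_percent must be positive")
    estimate = math.ceil(current_replicas * current_percent / target_percent)
    return max(1, estimate)  # never scale below a single replica


if __name__ == "__main__":
    # Three replicas averaging 90% CPU with a 60% target.
    print(desired_replicas(3, 90, 60))
```

Treat the result as a first estimate: extraction workloads can be bursty and memory-bound, so validate any replica count against real traffic.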

Cache Strategy

Configure cache behavior via kreuzberg.toml:

# Enable caching for faster repeated extractions
use_cache = true
cache_dir = "/var/cache/kreuzberg"  # Custom cache location for production

Cache clearing strategies:

Terminal
# Periodic cache clearing via cron (crontab entry, runs daily at 2 AM)
0 2 * * * curl -X DELETE http://localhost:8000/cache/clear

# Size-based cache clearing when threshold is exceeded
# (bc -l compares correctly even when the reported size is fractional)
CACHE_SIZE=$(curl -s http://localhost:8000/cache/stats | jq .total_size_mb)
if (( $(echo "$CACHE_SIZE > 1000" | bc -l) )); then
  curl -X DELETE http://localhost:8000/cache/clear
fi

Next Steps