API Server
Kreuzberg provides two server modes for programmatic access: an HTTP REST API server for general integration and a Model Context Protocol (MCP) server for AI agent integration.
Server Types
HTTP REST API Server
A production-ready HTTP API server providing RESTful endpoints for document extraction, embedding generation, health checks, and cache management.
Best for:
- Web applications
- Microservices integration
- General HTTP clients
- Load-balanced deployments
MCP Server
A Model Context Protocol server that exposes Kreuzberg as tools for AI agents and assistants.
Best for:
- AI agent integration (Claude, GPT, etc.)
- Agentic workflows
- Tool use by language models
- Stdio-based communication
HTTP REST API
Starting the Server
using System;
using System.Diagnostics;
class ApiServer
{
static void Main()
{
var processInfo = new ProcessStartInfo
{
FileName = "kreuzberg",
Arguments = "serve -H 0.0.0.0 -p 8000",
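// Output is not redirected, so server logs appear in this console; redirecting
// stdout/stderr without reading them can fill the pipe buffers and stall the server
UseShellExecute = false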
};
using (var process = Process.Start(processInfo))
{
process?.WaitForExit();
}
}
}
# Run server on port 8000
docker run -d \
-p 8000:8000 \
goldziher/kreuzberg:latest \
serve -H 0.0.0.0 -p 8000
# With environment variables
docker run -d \
-e KREUZBERG_CORS_ORIGINS="https://myapp.com" \
-e KREUZBERG_MAX_UPLOAD_SIZE_MB=200 \
-p 8000:8000 \
goldziher/kreuzberg:latest \
serve -H 0.0.0.0 -p 8000
import java.io.IOException;
public class ApiServer {
public static void main(String[] args) {
try {
ProcessBuilder pb = new ProcessBuilder(
"kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000"
);
pb.inheritIO();
Process process = pb.start();
process.waitFor();
} catch (IOException | InterruptedException e) {
System.err.println("Failed to start server: " + e.getMessage());
}
}
}
API Endpoints
POST /extract
Extract text from uploaded files via multipart form data.
Request Format:
- Method: POST
- Content-Type: multipart/form-data
- Fields:
  - files (required, repeatable): Files to extract
  - config (optional): JSON configuration overrides
Response: JSON array of extraction results
Example:
# Extract a single file via HTTP POST
curl -F "files=@document.pdf" http://localhost:8000/extract
# Extract multiple files in a single request
curl -F "files=@doc1.pdf" -F "files=@doc2.docx" \
http://localhost:8000/extract
# Extract with custom OCR configuration override
curl -F "files=@scanned.pdf" \
-F 'config={"ocr":{"language":"eng"},"force_ocr":true}' \
http://localhost:8000/extract
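The same request can be assembled without curl. The sketch below uses only Python's standard library to build (not send) the multipart body, showing how each repeated files part and the optional JSON config part are laid out. The helper name build_extract_request is illustrative and not part of Kreuzberg.

```python
import json
import urllib.request
import uuid


def build_extract_request(url, files, config=None):
    """Build (but do not send) a multipart/form-data POST for /extract.

    files is a list of (filename, data_bytes, content_type) tuples; each one
    becomes a repeated "files" field. config, if given, is JSON-encoded into
    the "config" field.
    """
    boundary = uuid.uuid4().hex  # random, so a collision with file bytes is very unlikely
    chunks = []
    for filename, data, content_type in files:
        header = (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="files"; filename="{filename}"\r\n'
            f"Content-Type: {content_type}\r\n\r\n"
        )
        chunks.append(header.encode("utf-8") + data + b"\r\n")
    if config is not None:
        header = f'--{boundary}\r\nContent-Disposition: form-data; name="config"\r\n\r\n'
        chunks.append(header.encode("utf-8") + json.dumps(config).encode("utf-8") + b"\r\n")
    body = b"".join(chunks) + f"--{boundary}--\r\n".encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )


req = build_extract_request(
    "http://localhost:8000/extract",
    [("scanned.pdf", b"%PDF-1.7 placeholder bytes", "application/pdf")],
    config={"ocr": {"language": "eng"}, "force_ocr": True},
)
# With a server running, urllib.request.urlopen(req).read() returns the JSON array.
```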
Response Schema:
[
{
"content": "Extracted text content...",
"mime_type": "application/pdf",
"metadata": {
"page_count": 10,
"author": "John Doe"
},
"tables": [],
"detected_languages": ["eng"],
"chunks": null,
"images": null
}
]
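A minimal sketch of consuming that array in Python. It relies only on the fields shown in the schema above; treating page_count as optional (for formats without pages) and allowing metadata to be null are assumptions, and summarize_results is a hypothetical helper.

```python
import json


def summarize_results(body):
    """Return one (mime_type, page_count, content_length) tuple per result.

    page_count falls back to None when metadata is null or lacks the key.
    """
    results = json.loads(body)
    summary = []
    for result in results:
        metadata = result.get("metadata") or {}
        summary.append((result["mime_type"], metadata.get("page_count"), len(result["content"])))
    return summary


sample = """[{"content": "Extracted text content...", "mime_type": "application/pdf",
  "metadata": {"page_count": 10, "author": "John Doe"}, "tables": [],
  "detected_languages": ["eng"], "chunks": null, "images": null}]"""
print(summarize_results(sample))  # [('application/pdf', 10, 25)]
```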
POST /embed
Generate embeddings for text strings without document extraction.
Request Format:
- Method: POST
- Content-Type: application/json
- Body:
  - texts (required): Array of strings to generate embeddings for
  - config (optional): Embedding configuration overrides
Response: JSON object containing embeddings, model info, dimensions, and count
Example:
# Generate embeddings for two text strings
curl -X POST http://localhost:8000/embed \
-H "Content-Type: application/json" \
-d '{"texts":["Hello world","Second text"]}'
# Generate embeddings with custom model configuration
curl -X POST http://localhost:8000/embed \
-H "Content-Type: application/json" \
-d '{
"texts":["Test text"],
"config":{
"model":{"preset":{"name":"fast"}},
"batch_size":32
}
}'
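For clients written in Python, this standard-library sketch builds the same JSON body as the curl examples above, adding config only when a preset or batch size is supplied. build_embed_request is a hypothetical helper name.

```python
import json
import urllib.request


def build_embed_request(url, texts, preset=None, batch_size=None):
    """Build (but do not send) a POST /embed request."""
    payload = {"texts": list(texts)}
    config = {}
    if preset is not None:
        # Same nesting as the curl example: {"model": {"preset": {"name": ...}}}
        config["model"] = {"preset": {"name": preset}}
    if batch_size is not None:
        config["batch_size"] = batch_size
    if config:
        payload["config"] = config
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


req = build_embed_request("http://localhost:8000/embed", ["Test text"], preset="fast", batch_size=32)
```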
Response Schema:
{
"embeddings": [
[0.123, -0.456, 0.789, ...], // 384 or 768 or 1024 dimensions
[-0.234, 0.567, -0.891, ...]
],
"model": "balanced",
"dimensions": 768,
"count": 2
}
Available Embedding Presets:
| Preset | Model | Dimensions | Use Case |
|---|---|---|---|
| fast | AllMiniLML6V2Q | 384 | Quick prototyping, development |
| balanced | BGEBaseENV15 | 768 | General-purpose RAG, production (default) |
| quality | BGELargeENV15 | 1024 | Complex documents, maximum accuracy |
| multilingual | MultilingualE5Base | 768 | International documents, 100+ languages |
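The dimensions in this table allow a cheap consistency check on /embed responses. The sketch assumes the response's model field holds the preset name, as in the sample response ("model": "balanced"); check_embed_response is a hypothetical helper.

```python
# Dimensions per preset, copied from the table above.
PRESET_DIMENSIONS = {"fast": 384, "balanced": 768, "quality": 1024, "multilingual": 768}


def check_embed_response(resp):
    """Raise ValueError if an /embed response is internally inconsistent."""
    vectors = resp["embeddings"]
    if len(vectors) != resp["count"]:
        raise ValueError(f"count={resp['count']} but got {len(vectors)} vectors")
    for i, vec in enumerate(vectors):
        if len(vec) != resp["dimensions"]:
            raise ValueError(f"vector {i} has {len(vec)} dims, expected {resp['dimensions']}")
    # Assumption: "model" is a preset name; unknown names are not checked.
    expected = PRESET_DIMENSIONS.get(resp["model"])
    if expected is not None and expected != resp["dimensions"]:
        raise ValueError(f"preset {resp['model']} should produce {expected} dims")
```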
Use Cases:
- Generate embeddings for semantic search
- Create vector representations for RAG (Retrieval-Augmented Generation) pipelines
- Embed text chunks without extracting from documents
- Batch embed multiple texts efficiently
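For semantic search, a query embedding is commonly compared with document embeddings by cosine similarity. The plain-Python sketch below ranks vectors returned by /embed; it is a brute-force illustration, and a production RAG pipeline would typically store the vectors in a vector database instead.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def rank(query_vec, doc_vecs):
    """Return document indices ordered from most to least similar to the query."""
    scores = [cosine(query_vec, d) for d in doc_vecs]
    return sorted(range(len(doc_vecs)), key=lambda i: scores[i], reverse=True)
```

In practice, query_vec would come from one /embed call on the search query and doc_vecs from the embeddings of your document chunks, both produced with the same preset.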
Note: This endpoint requires the embeddings feature to be enabled (available in Docker images and most pre-built binaries). ONNX Runtime must be installed on the system.
GET /health
Health check endpoint for monitoring and load balancers.
Example:
curl http://localhost:8000/health
Response:
GET /info
Server information and capabilities.
Example:
curl http://localhost:8000/info
Response:
GET /cache/stats
Get cache statistics.
Example:
curl http://localhost:8000/cache/stats
Response:
{
"directory": ".kreuzberg",
"total_files": 42,
"total_size_mb": 156.8,
"available_space_mb": 45123.5,
"oldest_file_age_days": 7.2,
"newest_file_age_days": 0.1
}
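The stats response can drive automated housekeeping. This sketch mirrors the 1000 MB threshold used by the shell snippets in the Monitoring section and compares total_size_mb as a float, since the field is fractional; the function name and default threshold are illustrative.

```python
import json


def should_clear_cache(stats_body, max_size_mb=1000.0):
    """Return True when a /cache/stats response exceeds max_size_mb.

    A True result is the cue to call DELETE /cache/clear.
    """
    stats = json.loads(stats_body)
    return float(stats["total_size_mb"]) > max_size_mb
```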
DELETE /cache/clear
Clear all cached files.
Example:
curl -X DELETE http://localhost:8000/cache/clear
Response:
Configuration
Configuration File Discovery
The server automatically discovers configuration files in this order:
1. ./kreuzberg.toml (current directory)
2. ./kreuzberg.yaml
3. ./kreuzberg.json
4. Parent directories (recursive search)
5. Default configuration (if no file found)
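The lookup order can be illustrated with a small sketch. It assumes the same three file names are tried, in the same order, in each parent directory and that the search stops at the first match; it is not Kreuzberg's own implementation, and find_config is a hypothetical name.

```python
from pathlib import Path

CANDIDATES = ("kreuzberg.toml", "kreuzberg.yaml", "kreuzberg.json")


def find_config(start):
    """Return the first config file found walking up from start, or None."""
    directory = Path(start).resolve()
    for current in (directory, *directory.parents):
        # Within one directory, TOML wins over YAML, which wins over JSON.
        for name in CANDIDATES:
            candidate = current / name
            if candidate.is_file():
                return candidate
    return None  # no file anywhere: fall back to the default configuration
```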
Example kreuzberg.toml:
# Enable quality processing and caching (top-level keys must come before any [table] header)
enable_quality_processing = true
use_cache = true
[ocr]
backend = "tesseract"
language = "eng"
# Configure token reduction for LLM optimization
[token_reduction]
enabled = true
target_reduction = 0.3
See Configuration Guide for all options.
Environment Variables
Upload Limits:
# Set maximum file upload size in megabytes
KREUZBERG_MAX_UPLOAD_SIZE_MB=200 # Max upload size in MB (default: 100)
For detailed configuration options, memory considerations, and performance tuning for large files, see the File Size Limits Reference.
CORS Configuration:
# Configure allowed origins for cross-origin requests (production security)
KREUZBERG_CORS_ORIGINS="https://app.example.com,https://api.example.com"
Security Warning: For development convenience, the default CORS configuration allows all origins, which permits cross-site request forgery (CSRF) attacks. Always set KREUZBERG_CORS_ORIGINS in production.
Note: Server host and port are configured via CLI flags (-H / --host and -p / --port), not environment variables.
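Deployment scripts or clients sometimes need the same limits, for example to reject oversized files before uploading them. This sketch reads both variables as documented here (upload limit defaulting to 100 MB, comma-separated origins). Interpreting MB as 1024 * 1024 bytes is an assumption, so leave some headroom; the function names are illustrative.

```python
import os

DEFAULT_MAX_UPLOAD_MB = 100  # documented default


def upload_limit_bytes(env=None):
    """Upload limit in bytes; assumes 1 MB = 1024 * 1024 bytes."""
    env = os.environ if env is None else env
    return int(env.get("KREUZBERG_MAX_UPLOAD_SIZE_MB", DEFAULT_MAX_UPLOAD_MB)) * 1024 * 1024


def cors_origins(env=None):
    """Parse KREUZBERG_CORS_ORIGINS; an empty list means the permissive default."""
    env = os.environ if env is None else env
    raw = env.get("KREUZBERG_CORS_ORIGINS", "")
    return [origin.strip() for origin in raw.split(",") if origin.strip()]


def fits_upload_limit(path, env=None):
    """Client-side pre-check before POSTing a file to /extract."""
    return os.path.getsize(path) <= upload_limit_bytes(env)
```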
Client Examples
using System;
using System.IO;
using System.Net.Http;
var client = new HttpClient();
using (var fileStream = File.OpenRead("document.pdf"))
{
using (var content = new MultipartFormDataContent())
{
content.Add(new StreamContent(fileStream), "files", "document.pdf");
var response = await client.PostAsync("http://localhost:8000/extract", content);
var json = await response.Content.ReadAsStringAsync();
Console.WriteLine(json);
}
}
# Extract content from a single document
curl -F "files=@document.pdf" http://localhost:8000/extract | jq .
# Extract with OCR enabled for scanned documents
curl -F "files=@scanned.pdf" \
-F 'config={"ocr":{"language":"eng"}}' \
http://localhost:8000/extract | jq .
# Batch extract multiple files in parallel
curl -F "files=@doc1.pdf" \
-F "files=@doc2.docx" \
http://localhost:8000/extract | jq .
package main
import (
"bytes"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os"
)
func main() {
file, err := os.Open("document.pdf")
if err != nil {
log.Fatalf("open file: %v", err)
}
defer file.Close()
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile("files", "document.pdf")
if err != nil {
log.Fatalf("create form file: %v", err)
}
if _, err := io.Copy(part, file); err != nil {
log.Fatalf("copy file: %v", err)
}
writer.Close()
resp, err := http.Post("http://localhost:8000/extract",
writer.FormDataContentType(), body)
if err != nil {
log.Fatalf("http post: %v", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatalf("read response: %v", err)
}
fmt.Println(string(bodyBytes))
}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
public class ExtractClient {
public static void main(String[] args) throws Exception {
HttpClient client = HttpClient.newHttpClient();
String boundary = "----WebKitFormBoundary" + System.currentTimeMillis();
byte[] fileData = Files.readAllBytes(Path.of("document.pdf"));
String multipartBody = "--" + boundary + "\r\n"
+ "Content-Disposition: form-data; name=\"files\"; filename=\"document.pdf\"\r\n"
+ "Content-Type: application/pdf\r\n\r\n"
+ new String(fileData, java.nio.charset.StandardCharsets.ISO_8859_1) + "\r\n"
+ "--" + boundary + "--\r\n";
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/extract"))
.header("Content-Type", "multipart/form-data; boundary=" + boundary)
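// Encode as ISO-8859-1 so each char maps back to exactly one file byte (the default, UTF-8, corrupts binary data)
.POST(HttpRequest.BodyPublishers.ofString(multipartBody, java.nio.charset.StandardCharsets.ISO_8859_1))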
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());
}
}
require 'net/http'
require 'uri'
require 'json'
# Single file extraction
uri = URI('http://localhost:8000/extract')
request = Net::HTTP::Post.new(uri)
form_data = [['files', File.open('document.pdf')]]
request.set_form form_data, 'multipart/form-data'
response = Net::HTTP.start(uri.hostname, uri.port) do |http|
http.request(request)
end
results = JSON.parse(response.body)
puts results[0]['content']
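// Requires reqwest with the "multipart" and "json" features, plus tokio and serde_json
use reqwest::multipart::{Form, Part};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Read the file into memory and attach it as a "files" field
let bytes = std::fs::read("document.pdf")?;
let part = Part::bytes(bytes).file_name("document.pdf");
let form = Form::new().part("files", part);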
let client = reqwest::Client::new();
let response = client
.post("http://localhost:8000/extract")
.multipart(form)
.send()
.await?;
let results: serde_json::Value = response.json().await?;
println!("{:?}", results[0]["content"]);
Ok(())
}
Error Handling
Error Response Format:
{
"error_type": "ValidationError",
"message": "Invalid file format",
"traceback": "...",
"status_code": 400
}
HTTP Status Codes:
| Status Code | Error Type | Meaning |
|---|---|---|
| 400 | ValidationError | Invalid input parameters |
| 422 | ParsingError, OcrError | Document processing failed |
| 500 | Internal errors | Server errors |
Example:
using System;
using System.IO;
using System.Net.Http;
using System.Text.Json;
var client = new HttpClient();
try
{
using (var fileStream = File.OpenRead("document.pdf"))
{
using (var content = new MultipartFormDataContent())
{
content.Add(new StreamContent(fileStream), "files", "document.pdf");
var response = await client.PostAsync("http://localhost:8000/extract", content);
if (!response.IsSuccessStatusCode)
{
var errorJson = await response.Content.ReadAsStringAsync();
var errorDoc = JsonDocument.Parse(errorJson);
var errorType = errorDoc.RootElement.GetProperty("error_type").GetString();
var message = errorDoc.RootElement.GetProperty("message").GetString();
Console.WriteLine($"Error: {errorType}: {message}");
return;
}
var json = await response.Content.ReadAsStringAsync();
Console.WriteLine($"Success: {json}");
}
}
}
catch (HttpRequestException e)
{
Console.WriteLine($"Request failed: {e.Message}");
}
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
)
func main() {
// Request body omitted for brevity; build the multipart body as in the Go client example above
resp, err := http.Post("http://localhost:8000/extract", "application/json", nil)
if err != nil {
fmt.Println("Request failed:", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
// apiErr avoids shadowing Go's built-in error type
var apiErr map[string]interface{}
body, _ := io.ReadAll(resp.Body)
json.Unmarshal(body, &apiErr)
fmt.Printf("Error: %v: %v\n", apiErr["error_type"], apiErr["message"])
} else {
var results []map[string]interface{}
body, _ := io.ReadAll(resp.Body)
json.Unmarshal(body, &results)
// Process results
}
}
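import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
// multipartBody and boundary are built as in the Java client example above
try {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/extract"))
.header("Content-Type", "multipart/form-data; boundary=" + boundary)
.POST(HttpRequest.BodyPublishers.ofString(multipartBody, StandardCharsets.ISO_8859_1))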
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 400) {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> error = mapper.readValue(response.body(), Map.class);
System.err.println("Error: " + error.get("error_type") +
": " + error.get("message"));
} else {
ObjectMapper mapper = new ObjectMapper();
Map[] results = mapper.readValue(response.body(), Map[].class);
// Process results
}
} catch (IOException | InterruptedException e) {
System.err.println("Request failed: " + e.getMessage());
}
import httpx
try:
    with httpx.Client() as client:
        with open("document.pdf", "rb") as f:
            files: dict = {"files": f}
            response: httpx.Response = client.post(
                "http://localhost:8000/extract", files=files
            )
        response.raise_for_status()
        results: list = response.json()
        print(f"Extracted {len(results)} documents")
except httpx.HTTPStatusError as e:
    error: dict = e.response.json()
    error_type: str = error.get("error_type", "Unknown")
    message: str = error.get("message", "No message")
    print(f"Error: {error_type}: {message}")
require 'net/http'
require 'uri'
require 'json'
begin
uri = URI('http://localhost:8000/extract')
request = Net::HTTP::Post.new(uri)
response = Net::HTTP.start(uri.hostname, uri.port) do |http|
http.request(request)
end
if response.code.to_i >= 400
error = JSON.parse(response.body)
puts "Error: #{error['error_type']}: #{error['message']}"
else
results = JSON.parse(response.body)
# Process results
end
rescue => e
puts "Request failed: #{e.message}"
end
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client
.post("http://localhost:8000/extract")
.send()
.await?;
let status = response.status();
if status.is_client_error() || status.is_server_error() {
let error: serde_json::Value = response.json().await?;
eprintln!(
"Error: {}: {}",
error["error_type"], error["message"]
);
} else {
let results: serde_json::Value = response.json().await?;
println!("{:?}", results);
}
Ok(())
}
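// formData is a FormData instance with one or more "files" entries,
// e.g. formData.append("files", file)
try {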
const response = await fetch("http://localhost:8000/extract", {
method: "POST",
body: formData,
});
if (!response.ok) {
const error = await response.json();
console.error(`Error: ${error.error_type}: ${error.message}`);
} else {
const results = await response.json();
console.log(results);
}
} catch (e) {
console.error("Request failed:", e);
}
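Every client above makes the same decision on failure: read error_type and message, then decide whether retrying makes sense. The Python sketch below factors that out. Retrying only 5xx responses is an assumption based on the status table (400 and 422 indicate a problem with the input itself), and the fallback covers non-JSON bodies such as an error page returned by a reverse proxy. describe_error is a hypothetical helper.

```python
import json


def describe_error(status, body):
    """Return (error_type, message, retryable) for a failed /extract call."""
    try:
        payload = json.loads(body)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        payload = None
    if not isinstance(payload, dict):
        payload = {}
    error_type = payload.get("error_type", "Unknown")
    message = payload.get("message", body)  # fall back to the raw body text
    return error_type, message, status >= 500
```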
MCP Server
The Model Context Protocol (MCP) server exposes Kreuzberg as tools for AI agents and assistants.
Starting the MCP Server
using System;
using System.Diagnostics;
using System.Threading.Tasks;
var processInfo = new ProcessStartInfo
{
FileName = "kreuzberg",
Arguments = "mcp",
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true
};
var mcpProcess = Process.Start(processInfo);
Console.WriteLine($"MCP server started with PID: {mcpProcess?.Id}");
await Task.Delay(1000);
Console.WriteLine("Server is running; it exchanges MCP messages over stdin/stdout");
mcpProcess?.WaitForExit();
import java.io.IOException;
public class McpServer {
public static void main(String[] args) {
try {
// Start MCP server using CLI
ProcessBuilder pb = new ProcessBuilder("kreuzberg", "mcp");
pb.inheritIO();
Process process = pb.start();
process.waitFor();
} catch (IOException | InterruptedException e) {
System.err.println("Failed to start MCP server: " + e.getMessage());
}
}
}
import subprocess
import time
from typing import Optional
mcp_process: subprocess.Popen = subprocess.Popen(
["python", "-m", "kreuzberg", "mcp"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
pid: Optional[int] = mcp_process.pid
print(f"MCP server started with PID: {pid}")
time.sleep(1)
print("Server is running; it exchanges MCP messages over stdin/stdout")
import { spawn } from 'child_process';
const mcpProcess = spawn('kreuzberg', ['mcp']);
mcpProcess.stdout.on('data', (data) => {
console.log(`MCP Server: ${data}`);
});
mcpProcess.stderr.on('data', (data) => {
console.error(`MCP Error: ${data}`);
});
mcpProcess.on('error', (err) => {
console.error(`Failed to start MCP server: ${err.message}`);
});
MCP Tools
The MCP server exposes 6 tools for AI agents:
extract_file
Extract content from a file path.
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| path | string | Yes | File path to extract |
| mime_type | string | No | MIME type hint |
| enable_ocr | boolean | No | Enable OCR (default: false) |
| force_ocr | boolean | No | Force OCR even if text exists (default: false) |
| async | boolean | No | Use async extraction (default: true) |
Example MCP Request:
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "extract_file",
"arguments": {
"path": "/path/to/document.pdf",
"enable_ocr": true,
"async": true
}
}
}
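Over the stdio transport, MCP messages are JSON-RPC 2.0 objects written one per line, so each request carries jsonrpc and a unique id and must not contain embedded newlines. A minimal Python sketch of serializing tools/call requests; tools_call is a hypothetical helper, and a client still has to complete the initialize handshake before sending these.

```python
import itertools
import json

_request_ids = itertools.count(1)  # ids must be unique within a session


def tools_call(name, arguments):
    """Serialize one tools/call request as a single newline-terminated line."""
    message = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    # json.dumps without indent emits no raw newlines; "\n" inside strings is escaped.
    return json.dumps(message) + "\n"


line = tools_call("extract_file", {"path": "/path/to/document.pdf", "enable_ocr": True})
```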
extract_bytes
Extract content from base64-encoded file data.
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| data | string | Yes | Base64-encoded file content |
| mime_type | string | No | MIME type hint |
| enable_ocr | boolean | No | Enable OCR |
| force_ocr | boolean | No | Force OCR |
| async | boolean | No | Use async extraction |
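The data parameter carries the file's raw bytes as base64. A minimal sketch of building the arguments object, assuming standard (RFC 4648, padded) base64 rather than the URL-safe alphabet; extract_bytes_arguments is a hypothetical helper name.

```python
import base64


def extract_bytes_arguments(raw, mime_type=None, enable_ocr=False):
    """Build the arguments object for an extract_bytes tools/call."""
    arguments = {
        "data": base64.b64encode(raw).decode("ascii"),  # standard, padded alphabet
        "enable_ocr": enable_ocr,
    }
    if mime_type is not None:
        arguments["mime_type"] = mime_type  # optional hint, per the table above
    return arguments


# Usage, e.g. when the server process cannot read the client's filesystem:
# with open("document.pdf", "rb") as f:
#     arguments = extract_bytes_arguments(f.read(), "application/pdf")
```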
batch_extract_files
Extract multiple files in parallel.
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| paths | array[string] | Yes | File paths to extract |
| enable_ocr | boolean | No | Enable OCR |
| force_ocr | boolean | No | Force OCR |
| async | boolean | No | Use async extraction |
detect_mime_type
Detect file format and return MIME type.
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| path | string | Yes | File path |
| use_content | boolean | No | Content-based detection (default: true) |
cache_stats
Get cache statistics.
Parameters: None
Returns: Cache directory path, file count, size, available space, file ages
cache_clear
Clear all cached files.
Parameters: None
Returns: Number of files removed, space freed
MCP Server Information
Server Metadata:
- Name: kreuzberg-mcp
- Title: Kreuzberg Document Intelligence MCP Server
- Version: Current package version
- Website: https://goldziher.github.io/kreuzberg/
- Protocol: MCP (Model Context Protocol)
- Transport: stdio (stdin/stdout)
Capabilities:
- Tool calling (6 tools exposed)
- Async and sync extraction variants
- Base64-encoded file handling
- Batch processing
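Because the transport is stdio, the client launches the server process and must open the session with the MCP initialize handshake before calling any tool: an initialize request, then a notifications/initialized notification once the response has been read. Claude Desktop and the MCP SDKs do this automatically. The sketch below only builds those two lines; 2024-11-05 is one published MCP protocol revision (the server may answer with another version it supports), and the client name is a placeholder.

```python
import json


def handshake_messages(client_name="example-client", client_version="0.1.0"):
    """Return the two stdio lines that open an MCP session, in sending order."""
    initialize = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": client_name, "version": client_version},
        },
    }
    # Sent only after the initialize response has been read; notifications carry no id.
    initialized = {"jsonrpc": "2.0", "method": "notifications/initialized"}
    return [json.dumps(initialize) + "\n", json.dumps(initialized) + "\n"]
```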
AI Agent Integration
Add to Claude Desktop configuration (~/Library/Application Support/Claude/claude_desktop_config.json on macOS):
{
"mcpServers": {
"kreuzberg": {
"command": "kreuzberg",
"args": ["mcp"]
}
}
}
After adding the configuration, restart Claude Desktop to load the Kreuzberg MCP server.
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
var processInfo = new ProcessStartInfo
{
FileName = "kreuzberg",
Arguments = "mcp",
UseShellExecute = false,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true
};
var process = Process.Start(processInfo);
var clientInput = process.StandardInput;
var clientOutput = process.StandardOutput;
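// Initialize the session; "params" is a C# keyword, so the property is written @params
var initRequest = new
{
jsonrpc = "2.0",
id = 1,
method = "initialize",
@params = new
{
protocolVersion = "2024-11-05",
capabilities = new { },
clientInfo = new { name = "example-client", version = "1.0.0" }
}
};
await clientInput.WriteLineAsync(System.Text.Json.JsonSerializer.Serialize(initRequest));
await clientInput.FlushAsync();
var initResponse = await clientOutput.ReadLineAsync();
Console.WriteLine($"Init response: {initResponse}");
// Confirm initialization before other requests (notifications carry no id)
var initialized = new { jsonrpc = "2.0", method = "notifications/initialized" };
await clientInput.WriteLineAsync(System.Text.Json.JsonSerializer.Serialize(initialized));
await clientInput.FlushAsync();
// List available tools
var listRequest = new
{
jsonrpc = "2.0",
id = 2,
method = "tools/list"
};
await clientInput.WriteLineAsync(System.Text.Json.JsonSerializer.Serialize(listRequest));
await clientInput.FlushAsync();
var listResponse = await clientOutput.ReadLineAsync();
Console.WriteLine($"Available tools: {listResponse}");
// Closing stdin ends the session so the server can exit; otherwise WaitForExit blocks
clientInput.Close();
process?.WaitForExit();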
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os/exec"
)
// MCPRequest is a JSON-RPC 2.0 request. A real session must complete the MCP
// initialize handshake before tools/call; the official MCP SDKs do this for you.
type MCPRequest struct {
JSONRPC string `json:"jsonrpc"`
ID int `json:"id"`
Method string `json:"method"`
Params MCPParams `json:"params"`
}
type MCPParams struct {
Name string `json:"name"`
Arguments map[string]interface{} `json:"arguments"`
}
func main() {
cmd := exec.Command("kreuzberg", "mcp")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Fatalf("create stdin pipe: %v", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatalf("create stdout pipe: %v", err)
}
if err := cmd.Start(); err != nil {
log.Fatalf("start command: %v", err)
}
request := MCPRequest{
JSONRPC: "2.0",
ID: 1,
Method: "tools/call",
Params: MCPParams{
Name: "extract_file",
Arguments: map[string]interface{}{
"path": "document.pdf",
"async": true,
},
},
}
data, err := json.Marshal(request)
if err != nil {
log.Fatalf("marshal request: %v", err)
}
fmt.Fprintf(stdin, "%s\n", string(data))
scanner := bufio.NewScanner(stdout)
if scanner.Scan() {
fmt.Println(scanner.Text())
}
// Close stdin so the server sees EOF and exits; otherwise Wait blocks forever
stdin.Close()
if err := cmd.Wait(); err != nil {
log.Fatalf("wait for command: %v", err)
}
}
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Map;
public class McpClient implements AutoCloseable {
private final Process mcpProcess;
private final BufferedWriter stdin;
private final BufferedReader stdout;
private final ObjectMapper mapper = new ObjectMapper();
public McpClient() throws IOException {
ProcessBuilder pb = new ProcessBuilder("kreuzberg", "mcp");
mcpProcess = pb.start();
stdin = new BufferedWriter(new OutputStreamWriter(mcpProcess.getOutputStream()));
stdout = new BufferedReader(new InputStreamReader(mcpProcess.getInputStream()));
}
public String extractFile(String path) throws IOException {
// JSON-RPC 2.0 request; a real session must first complete the MCP initialize handshake
Map<String, Object> request = Map.of(
"jsonrpc", "2.0",
"id", 1,
"method", "tools/call",
"params", Map.of(
"name", "extract_file",
"arguments", Map.of("path", path, "async", true)
)
);
stdin.write(mapper.writeValueAsString(request));
stdin.newLine();
stdin.flush();
String response = stdout.readLine();
// Tool results carry a "content" array of {type, text} items; return the first text item
com.fasterxml.jackson.databind.JsonNode content =
mapper.readTree(response).path("result").path("content");
return content.path(0).path("text").asText();
}
public void close() throws IOException {
stdin.close();
stdout.close();
mcpProcess.destroy();
}
public static void main(String[] args) {
try (McpClient client = new McpClient()) {
String content = client.extractFile("contract.pdf");
System.out.println("Extracted content: " + content);
} catch (IOException e) {
System.err.println("Error: " + e.getMessage());
}
}
}
from langchain.agents import initialize_agent, AgentType
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
import itertools
import subprocess
import json
mcp_process = subprocess.Popen(
    ["kreuzberg", "mcp"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
request_ids = itertools.count(1)  # JSON-RPC ids must be unique per session
def extract_file(path: str) -> str:
    # A real session must first complete the MCP initialize handshake;
    # MCP SDK clients handle this automatically
    request: dict = {
        "jsonrpc": "2.0",
        "id": next(request_ids),
        "method": "tools/call",
        "params": {
            "name": "extract_file",
            "arguments": {"path": path, "async": True},
        },
    }
    mcp_process.stdin.write(json.dumps(request).encode() + b"\n")
    mcp_process.stdin.flush()
    response = mcp_process.stdout.readline()
    # Tool results carry a "content" list of {"type": ..., "text": ...} items
    return json.loads(response)["result"]["content"][0]["text"]
tools: list[Tool] = [
    Tool(
        name="extract_document",
        func=extract_file,
        description="Extract text from the document at the given file path",
    )
]
llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
    tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION
)
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main() -> None:
    server_params: StdioServerParameters = StdioServerParameters(
        command="kreuzberg", args=["mcp"]
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            tool_names: list[str] = [t.name for t in tools.tools]
            print(f"Available tools: {tool_names}")
            result = await session.call_tool(
                "extract_file", arguments={"path": "document.pdf", "async": True}
            )
            print(result)
asyncio.run(main())
require 'json'
require 'open3'
Open3.popen3('kreuzberg', 'mcp') do |stdin, stdout, stderr, wait_thr|
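# JSON-RPC 2.0 request; a real session must first complete the MCP initialize handshake
request = {
jsonrpc: '2.0',
id: 1,
method: 'tools/call',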
params: {
name: 'extract_file',
arguments: { path: 'document.pdf', async: true }
}
}
stdin.puts JSON.generate(request)
stdin.close_write
response = stdout.gets
result = JSON.parse(response)
puts JSON.pretty_generate(result)
end
use serde_json::json;
use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut child = Command::new("kreuzberg")
.arg("mcp")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
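// take() moves stdin out of the child so it can be closed explicitly later
let mut stdin = child.stdin.take().ok_or("Failed to open stdin")?;
// JSON-RPC 2.0 request; a real session must first complete the MCP initialize handshake
let request = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "extract_file",
"arguments": {
"path": "document.pdf",
"async": true
}
}
});
stdin.write_all(request.to_string().as_bytes())?;
stdin.write_all(b"\n")?;
stdin.flush()?;
let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
let reader = BufReader::new(stdout);
for line in reader.lines() {
if let Ok(line) = line {
println!("{}", line);
break;
}
}
// Closing stdin signals EOF so the server exits and wait() can return
drop(stdin);
child.wait()?;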
Ok(())
}
import { spawn } from 'child_process';
import * as readline from 'readline';
const mcpProcess = spawn('kreuzberg', ['mcp']);
const rl = readline.createInterface({
input: mcpProcess.stdout,
output: mcpProcess.stdin,
terminal: false,
});
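// JSON-RPC 2.0 request; a real session must first complete the MCP initialize handshake
const request = {
jsonrpc: '2.0',
id: 1,
method: 'tools/call',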
params: {
name: 'extract_file',
arguments: {
path: 'document.pdf',
async: true,
},
},
};
mcpProcess.stdin.write(JSON.stringify(request) + '\n');
rl.on('line', (line) => {
const response = JSON.parse(line);
console.log(response);
mcpProcess.kill();
});
mcpProcess.on('error', (err) => {
console.error('Failed to start MCP process:', err);
});
Production Deployment
Docker Deployment
Docker Compose Example:
version: '3.8'
services:
  kreuzberg-api:
    image: goldziher/kreuzberg:latest
    ports:
      - "8000:8000"
    environment:
      # Configure CORS for production security
      - KREUZBERG_CORS_ORIGINS=https://myapp.com,https://api.myapp.com
      # Set maximum upload size for large documents
      - KREUZBERG_MAX_UPLOAD_SIZE_MB=500
    volumes:
      # Mount configuration and cache directories
      - ./config:/config
      - ./cache:/app/.kreuzberg
    command: ["kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000", "--config", "/config/kreuzberg.toml"]
    restart: unless-stopped
    healthcheck:
      # Health check for container orchestration
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
Run:
docker compose up -d
Kubernetes Deployment
Deployment Manifest:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kreuzberg-api
spec:
  replicas: 3 # Deploy 3 replicas for high availability
  selector:
    matchLabels:
      app: kreuzberg-api
  template:
    metadata:
      labels:
        app: kreuzberg-api
    spec:
      containers:
        - name: kreuzberg
          image: goldziher/kreuzberg:latest
          ports:
            - containerPort: 8000
          env:
            # Production environment configuration
            - name: KREUZBERG_CORS_ORIGINS
              value: "https://myapp.com"
            - name: KREUZBERG_MAX_UPLOAD_SIZE_MB
              value: "500"
          command: ["kreuzberg", "serve", "-H", "0.0.0.0", "-p", "8000"]
          livenessProbe:
            # Check if container is alive and healthy
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            # Check if container is ready to accept traffic
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
          resources:
            # Resource limits for optimal performance
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
---
apiVersion: v1
kind: Service
metadata:
  name: kreuzberg-api
spec:
  selector:
    app: kreuzberg-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer # Expose service via load balancer
Reverse Proxy Configuration
Nginx:
# Load balance across multiple Kreuzberg instances
upstream kreuzberg {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8002;
}
server {
listen 443 ssl http2;
server_name api.example.com;
# SSL/TLS configuration
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
# Increase upload size limit for large documents
client_max_body_size 500M;
location / {
proxy_pass http://kreuzberg;
# Forward client headers
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Extended timeouts for large file processing
proxy_read_timeout 300s;
proxy_send_timeout 300s;
}
location /health {
proxy_pass http://kreuzberg;
access_log off; # Disable logging for health checks
}
}
Caddy:
api.example.com {
# Load balance with automatic health checks
reverse_proxy localhost:8000 localhost:8001 localhost:8002 {
lb_policy round_robin
health_uri /health
health_interval 10s
}
# Increase maximum upload size for large documents
request_body {
max_size 500MB
}
}
Production Checklist
- Set KREUZBERG_CORS_ORIGINS to explicit allowed origins
- Configure KREUZBERG_MAX_UPLOAD_SIZE_MB based on expected document sizes
- Use a reverse proxy (Nginx/Caddy) for SSL/TLS termination
- Enable logging via the RUST_LOG=info environment variable
- Set up health checks on the /health endpoint
- Monitor cache size and set up periodic clearing
- Use 0.0.0.0 binding for containerized deployments
- Configure resource limits (CPU, memory) in container orchestration
- Test with large files to validate upload limits and timeouts
- Implement rate limiting at reverse proxy level
- Set up monitoring (Prometheus metrics, logs aggregation)
- Plan for horizontal scaling with load balancing
Monitoring
Health Check Endpoint:
# Simple health check for manual verification
curl http://localhost:8000/health
#!/bin/bash
# Continuous monitoring script for production (the shebang must be the script's first line)
while true; do
if curl -f http://localhost:8000/health > /dev/null 2>&1; then
echo "$(date): Server healthy"
else
echo "$(date): Server unhealthy"
# Send alert to monitoring system
fi
sleep 30
done
Cache Monitoring:
# Retrieve cache statistics and usage metrics
curl http://localhost:8000/cache/stats | jq .
# Automatic cache clearing when size exceeds threshold
CACHE_SIZE=$(curl -s http://localhost:8000/cache/stats | jq .total_size_mb)
if (( $(echo "$CACHE_SIZE > 1000" | bc -l) )); then
curl -X DELETE http://localhost:8000/cache/clear
fi
Logging:
# Run with debug logging for development and troubleshooting
RUST_LOG=debug kreuzberg serve -H 0.0.0.0 -p 8000
# Production logging with info level (recommended)
RUST_LOG=info kreuzberg serve -H 0.0.0.0 -p 8000
# JSON structured logging for log aggregation systems
RUST_LOG=info RUST_LOG_FORMAT=json kreuzberg serve -H 0.0.0.0 -p 8000
Performance Tuning
Upload Size Limits
Configure based on expected document sizes:
# Configuration for small documents (PDFs, images under 10 MB)
export KREUZBERG_MAX_UPLOAD_SIZE_MB=50
# Configuration for typical business documents (under 50 MB)
export KREUZBERG_MAX_UPLOAD_SIZE_MB=200
# Configuration for large scans, archives, and high-resolution images
export KREUZBERG_MAX_UPLOAD_SIZE_MB=1000
See the File Size Limits Reference for comprehensive documentation, including:
- Memory impact calculations
- Reverse proxy configuration
- Error handling and troubleshooting
- Client-side validation examples
- Best practices for large file processing
Concurrent Requests
The server handles concurrent requests efficiently using Tokio's async runtime. For high-throughput scenarios:
- Run multiple instances behind a load balancer
- Configure reverse proxy connection pooling
- Monitor CPU and memory usage to determine optimal replica count
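On the client side, capping the number of in-flight requests keeps a batch job from flooding a single instance. A minimal sketch using Python's ThreadPoolExecutor: send is a caller-supplied (hypothetical) function that posts one file to /extract and returns the parsed result, and max_workers=4 is an arbitrary starting point to tune against the CPU and memory metrics mentioned above.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_many(paths, send, max_workers=4):
    """Call send(path) for every path with at most max_workers requests in flight.

    Results are returned in the same order as paths; an exception raised by
    send propagates when its result is collected.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(send, paths))
```

With several instances behind a load balancer, the same cap applies per client, so the total concurrency seen by the cluster is roughly the sum across clients.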
Cache Strategy
Configure cache behavior via kreuzberg.toml:
use_cache = true
cache_dir = "/var/cache/kreuzberg" # Custom cache location for production
Cache clearing strategies:
# Periodic cache clearing via cron job (daily at 2 AM)
0 2 * * * curl -X DELETE http://localhost:8000/cache/clear
# Size-based cache clearing when threshold is exceeded
CACHE_SIZE=$(curl -s http://localhost:8000/cache/stats | jq .total_size_mb)
# total_size_mb is a decimal (e.g. 156.8), which [ -gt ] cannot compare, so use bc
if (( $(echo "$CACHE_SIZE > 1000" | bc -l) )); then
curl -X DELETE http://localhost:8000/cache/clear
fi
Next Steps
- Configuration Guide - Detailed configuration options
- CLI Usage - Command-line interface
- Advanced Features - Chunking, language detection, token reduction
- Plugin Development - Extend Kreuzberg functionality