The Kafka connector is an Enterprise feature. It requires a Bifrost Enterprise license.

Overview

The Kafka connector publishes completed Bifrost request traces as JSON messages to a configured Kafka topic. Each message is keyed by the trace ID, so messages that share a trace ID land on the same partition and keep their order.
Use the Kafka connector when you want to:
  • Stream traces into your own data platform (ClickHouse, BigQuery, Spark, etc.)
  • Archive LLM request logs to cold storage via Kafka consumers
  • Build custom dashboards on top of raw trace data without the built-in log store
  • Fan out to multiple downstream systems through Kafka consumer groups

How it works

After each request completes, the connector serializes the full trace (all spans, attributes, and optionally request headers) to JSON and writes it to Kafka as a single message. The message key is the trace ID. Writes are asynchronous, so they do not block the request path. Messages are batched internally and flushed when a batch reaches batch_size messages or when flush_interval_ms elapses, whichever comes first.
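The size-or-time flush rule can be pictured with a small sketch. This is not the connector's implementation: BatchBuffer, its sink callback, and the injectable clock are hypothetical names, and measuring the interval from the first buffered message is an assumption. Only batch_size and flush_interval_ms (with their documented defaults) come from the configuration reference.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Illustrative size-or-time batcher (hypothetical, not Bifrost code)."""

    def __init__(self, sink: Callable[[List[bytes]], None],
                 batch_size: int = 100, flush_interval_ms: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.sink = sink                      # receives each flushed batch
        self.batch_size = batch_size
        self.flush_interval = flush_interval_ms / 1000.0
        self.clock = clock
        self.pending: List[bytes] = []
        self.first_at: Optional[float] = None  # arrival time of oldest message

    def add(self, message: bytes) -> None:
        if not self.pending:
            self.first_at = self.clock()
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            self.flush()                      # size limit reached

    def tick(self) -> None:
        """Call periodically; flushes if the oldest message waited too long."""
        if self.pending and self.clock() - self.first_at >= self.flush_interval:
            self.flush()                      # time limit reached

    def flush(self) -> None:
        if self.pending:
            batch, self.pending = self.pending, []
            self.first_at = None
            self.sink(batch)
```

In this model a lightly loaded gateway waits up to flush_interval_ms before a trace is written, while a busy one flushes as soon as batch_size traces accumulate.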

Setup

  1. Navigate to Observability in the sidebar.
  2. Select Kafka from the connector list.
  3. Add at least one broker address (e.g. localhost:9092) and enter a Topic name.
  4. Configure optional settings: compression, TLS, SASL, and batch tuning.
  5. Toggle Enabled on, then click Save Kafka Configuration.
Use the Configure Plugin Tracing button in the top-right to control which plugin spans are included in published trace payloads. See Filtering plugin spans.

Configuration reference

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| brokers | string[] | | | Kafka broker addresses (e.g. ["localhost:9092"]). At least one required. |
| topic | string | | | Kafka topic to publish traces to. |
| tls_enabled | boolean | | false | Enable TLS for broker connections. |
| ca_cert | string \| SecretVar | | | PEM-encoded CA certificate to verify the broker’s TLS cert. Omit to use the system CA pool. Supports env.VAR_NAME. |
| sasl_enabled | boolean | | false | Enable SASL authentication. Requires sasl to also be configured. |
| sasl.mechanism | string | ✅ (if SASL) | | Authentication mechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. No default; must be set explicitly. |
| sasl.username | string \| SecretVar | ✅ (if SASL) | | SASL username. Supports env.VAR_NAME. |
| sasl.password | string \| SecretVar | ✅ (if SASL) | | SASL password. Supports env.VAR_NAME. |
| compression | string | | none | Compression codec: none, gzip, snappy, lz4, or zstd. |
| batch_size | integer | | 100 | Maximum number of messages batched per write. |
| flush_interval_ms | integer | | 1000 | Maximum milliseconds to wait before flushing a batch. |
| auto_create_topic | boolean | | false | Create the topic at startup if it does not exist. Requires broker admin permissions. |
| disable_content_logging | boolean | | false | Strip input/output message content from traces before publishing. |
| request_headers | string[] | | | Request-header patterns to capture and embed in traces. Supports wildcards (e.g. x-custom-*). |
| plugin_span_filter | object | | | Controls which plugin spans are included in published payloads. See Filtering plugin spans. |
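A configuration can be checked against the rules in this reference before it is saved. The sketch below is a hypothetical pre-flight helper, not part of Bifrost: validate_kafka_config is an invented name, and treating topic as mandatory is an assumption drawn from the setup steps rather than from the table. The allowed mechanisms and codecs are the ones listed above.

```python
from typing import Any, Dict, List

SASL_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}
COMPRESSION_CODECS = {"none", "gzip", "snappy", "lz4", "zstd"}


def validate_kafka_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the checks passed."""
    problems: List[str] = []
    if not cfg.get("brokers"):
        problems.append("brokers: at least one address is required")
    if not cfg.get("topic"):  # assumption: topic is mandatory
        problems.append("topic: must be set")
    codec = cfg.get("compression", "none")
    if codec not in COMPRESSION_CODECS:
        problems.append(f"compression: unsupported codec {codec!r}")
    if cfg.get("sasl_enabled", False):
        sasl = cfg.get("sasl") or {}
        if sasl.get("mechanism") not in SASL_MECHANISMS:
            problems.append("sasl.mechanism: must be PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512")
        for field in ("username", "password"):
            if not sasl.get(field):
                problems.append(f"sasl.{field}: required when SASL is enabled")
        # PLAIN without TLS sends credentials in cleartext.
        if sasl.get("mechanism") == "PLAIN" and not cfg.get("tls_enabled", False):
            problems.append("sasl.mechanism: PLAIN without tls_enabled exposes credentials")
    return problems
```

For example, validate_kafka_config({"brokers": ["localhost:9092"], "topic": "traces"}) returns an empty list, because every other field falls back to its documented default.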

Security

TLS

Enable tls_enabled to encrypt the connection to your brokers. If your broker uses a certificate signed by a private CA, supply the PEM-encoded CA certificate via ca_cert. Omit ca_cert to fall back to the system CA pool, which is appropriate for brokers with publicly signed certificates.
{
  "tls_enabled": true,
  "ca_cert": "env.KAFKA_CA_CERT"
}
Always reference ca_cert via an environment variable (env.KAFKA_CA_CERT) rather than embedding PEM text directly in config.json or the database.

SASL authentication

The connector supports three SASL mechanisms:
| Mechanism | Description |
|---|---|
| PLAIN | Username/password in plaintext at the protocol layer |
| SCRAM-SHA-256 | SCRAM challenge-response with SHA-256 digest |
| SCRAM-SHA-512 | SCRAM challenge-response with SHA-512 digest |
{
  "sasl_enabled": true,
  "sasl": {
    "mechanism": "PLAIN",
    "username": "env.KAFKA_USERNAME",
    "password": "env.KAFKA_PASSWORD"
  }
}
PLAIN sends credentials in cleartext at the protocol level. Always enable tls_enabled: true alongside PLAIN in production to prevent credential exposure.
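The env.VAR_NAME references used for ca_cert, sasl.username, and sasl.password can be checked locally before deployment. The following is a minimal sketch of how such a reference might be resolved. It is not Bifrost's resolver: resolve_secret and its environ parameter are hypothetical, and rejecting empty values is an assumption that mirrors the troubleshooting advice to verify variables are non-empty.

```python
import os
from typing import Mapping, Optional

ENV_PREFIX = "env."


def resolve_secret(value: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Resolve an `env.VAR_NAME` reference; return literal values unchanged."""
    environ = os.environ if environ is None else environ
    if not value.startswith(ENV_PREFIX):
        return value
    name = value[len(ENV_PREFIX):]
    resolved = environ.get(name, "")
    if not resolved:
        # Unset and empty variables are treated the same way here.
        raise ValueError(f"environment variable {name} is unset or empty")
    return resolved
```

Calling resolve_secret("env.KAFKA_PASSWORD") in the same environment that launches Bifrost surfaces a missing variable early, before it shows up as an authentication failure.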

Filtering traces

Stripping message content

When disable_content_logging is true, the connector removes all input and output message content from spans before serializing to JSON. Span metadata — timing, token counts, model, provider, cost, and status — is preserved. This is useful when downstream consumers should not have access to the actual prompt and completion text for compliance or access-control reasons.
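As an illustration of the effect, the sketch below removes content attributes from a single span dictionary shaped like the ones in the Trace payload format section. It is a hypothetical model rather than the connector's code: strip_content is an invented name, and the two prefixes are the gen_ai.input.* and gen_ai.output.* attribute families that the payload section says are stripped.

```python
from typing import Any, Dict

# Attribute families treated as message content.
CONTENT_PREFIXES = ("gen_ai.input.", "gen_ai.output.")


def strip_content(span: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the span without message-content attributes."""
    attrs = span.get("Attributes") or {}
    kept = {k: v for k, v in attrs.items() if not k.startswith(CONTENT_PREFIXES)}
    return {**span, "Attributes": kept}
```

Attributes such as gen_ai.usage.total_tokens or gen_ai.request.model pass through untouched, so cost and usage reporting keeps working downstream.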

Capturing request headers

By default, no request headers are embedded in the trace payload. Set request_headers to a list of header name patterns to include:
{
  "request_headers": ["x-tenant-id", "x-request-source", "x-custom-*"]
}
Patterns support exact names and wildcards: x-custom-* captures all headers with that prefix. * captures every header including Authorization — use with caution. Captured headers appear under RequestHeaders in the published JSON.
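To preview which headers a pattern list would capture, the matching can be approximated with Python's fnmatch module. This is a sketch under assumptions, not the connector's matcher: capture_headers is a hypothetical helper, names are compared case-insensitively on the assumption that the connector follows HTTP's case-insensitive header names, and fnmatch also interprets ? and [...] which the connector may not.

```python
from fnmatch import fnmatchcase
from typing import Dict, Iterable


def capture_headers(headers: Dict[str, str], patterns: Iterable[str]) -> Dict[str, str]:
    """Keep only headers whose lower-cased name matches one of the patterns."""
    lowered = [p.lower() for p in patterns]
    return {
        name: value
        for name, value in headers.items()
        if any(fnmatchcase(name.lower(), p) for p in lowered)
    }
```

Running it with ["*"] against a sample request shows that Authorization is included, so a bare wildcard would publish credentials to every consumer of the topic.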

Filtering plugin spans

By default every plugin hook generates a span in the trace, which can add significant noise (e.g. 8 built-in plugins × 2 hooks = 16 spans per request). Use plugin_span_filter to control which plugin spans are published:
{
  "plugin_span_filter": {
    "mode": "exclude",
    "plugins": ["logging", "telemetry", "compat", "kafka"]
  }
}
| Mode | Behaviour |
|---|---|
| include | Publish spans only for the listed plugins |
| exclude | Publish spans for all plugins except those listed |
Plugin names match the <name> segment in span names like plugin.<name>.prerequesthook, plugin.<name>.prehook, and plugin.<name>.posthook. Use the Configure Plugin Tracing button on the Kafka connector page in the UI to toggle individual plugins instead of editing config directly. When a plugin span is filtered out, its children are automatically re-parented to the nearest surviving ancestor so the span tree stays connected.
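The include/exclude rule and the re-parenting step can be modelled in a few lines. The sketch below is illustrative only and is not the connector's code: plugin_name and filter_plugin_spans are hypothetical helpers that work on span dictionaries with the SpanID, ParentID, and Name fields shown in the trace payload. It assumes non-plugin spans are never filtered.

```python
from typing import Any, Dict, Iterable, List, Optional


def plugin_name(span_name: str) -> Optional[str]:
    """Extract <name> from plugin.<name>.<hook>; None for non-plugin spans."""
    parts = span_name.split(".")
    if len(parts) == 3 and parts[0] == "plugin":
        return parts[1]
    return None


def filter_plugin_spans(spans: List[Dict[str, Any]], mode: str,
                        plugins: Iterable[str]) -> List[Dict[str, Any]]:
    listed = set(plugins)

    def keep(span: Dict[str, Any]) -> bool:
        name = plugin_name(span["Name"])
        if name is None:
            return True  # assumption: non-plugin spans always survive
        return (name in listed) if mode == "include" else (name not in listed)

    by_id = {s["SpanID"]: s for s in spans}
    kept_ids = {s["SpanID"] for s in spans if keep(s)}

    def surviving_ancestor(parent_id: str) -> str:
        # Walk up the tree until a kept span (or the root) is reached.
        while parent_id and parent_id not in kept_ids:
            parent = by_id.get(parent_id)
            if parent is None:
                break  # unknown parent: leave the reference untouched
            parent_id = parent.get("ParentID", "")
        return parent_id

    return [
        {**s, "ParentID": surviving_ancestor(s.get("ParentID", ""))}
        for s in spans if s["SpanID"] in kept_ids
    ]
```

With mode "exclude" and plugins ["telemetry"], a span whose parent was plugin.telemetry.prehook ends up attached to that span's own parent, which keeps the tree connected.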

Trace payload format

Each Kafka message value is a JSON-serialized trace. The message key is the TraceID. The RootSpan is the inbound HTTP request span. The Spans array contains every span in the trace: the root span comes first, followed by the plugin hook spans and the llm.call span. RequestHeaders is null when no headers are captured. PluginLogs is empty when no plugin logs were emitted (an empty array in the example below).
{
  "RequestID": "d2791ef1-3386-4ec8-9861-87bdaaac72a8",
  "TraceID": "0e8b9293a69d4652804d2ab61121c1f2",
  "ParentID": "",
  "StartTime": "2026-06-29T17:34:38.435383+05:30",
  "EndTime": "2026-06-29T17:34:39.555003+05:30",
  "Attributes": {},
  "RequestHeaders": null,
  "PluginLogs": [],
  "RootSpan": {
    "SpanID": "40cd8047c2cb44c6",
    "ParentID": "",
    "TraceID": "0e8b9293a69d4652804d2ab61121c1f2",
    "Name": "/v1/chat/completions",
    "Kind": "http.request",
    "StartTime": "2026-06-29T17:34:38.436463+05:30",
    "EndTime": "2026-06-29T17:34:39.554577+05:30",
    "Status": "ok",
    "StatusMsg": "",
    "Attributes": {
      "http.method": "POST",
      "http.url": "/v1/chat/completions",
      "http.status_code": 200,
      "http.user_agent": "bruno-runtime/3.5.0",
      "gen_ai.provider.name": "openai",
      "gen_ai.request.model": "gpt-4o-mini",
      "gen_ai.response.model": "gpt-4o-mini-2024-07-18",
      "gen_ai.input.messages": "hello",
      "gen_ai.output.messages": "Hello! How can I assist you today?",
      "gen_ai.response.finish_reasons": ["stop"]
    },
    "Events": []
  },
  "Spans": [
    {
      "SpanID": "40cd8047c2cb44c6",
      "Name": "/v1/chat/completions",
      "Kind": "http.request",
      "...": "(root span repeated as first element)"
    },
    {
      "SpanID": "5bf3c4f42fe448cc",
      "ParentID": "40cd8047c2cb44c6",
      "Name": "plugin.telemetry.prerequesthook",
      "Kind": "plugin",
      "StartTime": "2026-06-29T17:34:38.445065+05:30",
      "EndTime": "2026-06-29T17:34:38.445073+05:30",
      "Status": "ok",
      "StatusMsg": "",
      "Attributes": {},
      "Events": []
    },
    {
      "...": "(additional plugin.*.prerequesthook → plugin.*.prehook spans)"
    },
    {
      "SpanID": "6e51fa290a2e43d0",
      "ParentID": "8e3ac231d1d64907",
      "Name": "chat gpt-4o-mini",
      "Kind": "llm.call",
      "StartTime": "2026-06-29T17:34:38.450939+05:30",
      "EndTime": "2026-06-29T17:34:39.551269+05:30",
      "Status": "ok",
      "StatusMsg": "",
      "Attributes": {
        "gen_ai.provider.name": "openai",
        "gen_ai.request.model": "gpt-4o-mini",
        "gen_ai.response.model": "gpt-4o-mini-2024-07-18",
        "gen_ai.usage.prompt_tokens": 8,
        "gen_ai.usage.completion_tokens": 9,
        "gen_ai.usage.total_tokens": 17,
        "gen_ai.usage.cost": 160.0000054,
        "gen_ai.input.messages": "[{\"role\":\"user\",\"content\":\"hello\"}]",
        "gen_ai.output.messages": "[{\"role\":\"assistant\",\"content\":\"Hello! How can I assist you today?\"}]",
        "gen_ai.response.finish_reason": "stop",
        "bifrost.virtual_key.name": "my-vk",
        "bifrost.retries": 0
      },
      "Events": []
    },
    {
      "...": "(plugin.*.posthook spans follow)"
    }
  ]
}
RequestHeaders is populated only for headers matched by your request_headers patterns — it is null otherwise. If disable_content_logging is true, gen_ai.input.* and gen_ai.output.* attributes are stripped from all spans before publishing.
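On the consumer side, a message value can be decoded with any JSON parser. The sketch below shows one way to pull token usage out of the llm.call spans. It assumes only the field names visible in the example payload; summarize_trace is a hypothetical helper, and fetching the message bytes from Kafka is left to whichever client library you use.

```python
import json
from typing import Any, Dict


def summarize_trace(message_value: bytes) -> Dict[str, Any]:
    """Decode one trace message and total the token usage of its llm.call spans."""
    trace = json.loads(message_value)
    usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    for span in trace.get("Spans") or []:
        if span.get("Kind") != "llm.call":
            continue
        attrs = span.get("Attributes") or {}
        for key in usage:
            usage[key] += attrs.get(f"gen_ai.usage.{key}", 0)
    return {
        "trace_id": trace.get("TraceID"),
        # RequestHeaders is null when nothing matched; normalise to {}.
        "headers": trace.get("RequestHeaders") or {},
        **usage,
    }
```

Because the usage attributes are span metadata rather than message content, this summary should keep working when disable_content_logging is true.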

Troubleshooting

Topic does not exist at startup

Symptom: Plugin fails to initialize with kafka plugin: topic "X" does not exist.
Fix: Either create the topic manually before starting Bifrost, or set auto_create_topic: true to have the connector create it automatically on startup (requires broker admin permissions).

SASL authentication failure

Symptom: kafka plugin: failed to connect/authenticate with broker.
Checks:
  • Confirm sasl_enabled is true and sasl credentials are set.
  • Verify environment variables resolve to non-empty strings.
  • For PLAIN, confirm TLS is also enabled — some brokers reject PLAIN without TLS.

Broker unreachable

Symptom: kafka plugin: failed to connect/authenticate with broker: dial tcp ...
Checks:
  • Verify the broker address is reachable from the Bifrost host: nc -zv <broker-host> <port>
  • If using TLS, confirm tls_enabled is true and the CA certificate matches the broker’s cert.
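Where nc is not installed on the Bifrost host, the same TCP reachability check can be scripted. This is a minimal sketch with a hypothetical broker_reachable helper. It only confirms that a TCP connection can be opened and does not exercise TLS or SASL.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False
```

A False result points at networking (DNS, firewall, wrong port). A True result followed by the same plugin error suggests looking at the TLS or SASL settings instead.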

Messages not appearing in topic

Symptom: Plugin initializes but no messages arrive in the topic.
Checks:
  • Messages are written asynchronously — check Bifrost logs for kafka plugin: failed to write trace ... errors.
  • Confirm the plugin entry has "enabled": true.
  • Messages may be buffered for up to flush_interval_ms milliseconds (default 1000ms) before being flushed.

Next steps

  • OpenTelemetry - Send traces to Grafana, Datadog, New Relic, and other OTLP backends
  • Prometheus - Expose metrics for scraping or push to a Prometheus stack
  • Built-in observability - Query logs directly from the Bifrost dashboard