> ## Documentation Index
> Fetch the complete documentation index at: https://docs.getbifrost.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

> Stream Bifrost request traces as JSON to a Kafka topic for custom analytics, archival, and downstream processing

<Note>
  The Kafka connector is an **Enterprise** feature. It requires a Bifrost Enterprise license.
</Note>

## Overview

The **Kafka connector** publishes completed Bifrost request traces as JSON messages to a configured Kafka topic. Each message is keyed by the trace ID, so all spans for a trace land on the same partition and arrive in order.

Use the Kafka connector when you want to:

* **Stream traces** into your own data platform (ClickHouse, BigQuery, Spark, etc.)
* **Archive LLM request logs** to cold storage via Kafka consumers
* **Build custom dashboards** on top of raw trace data without the built-in log store
* **Fan out** to multiple downstream systems through Kafka consumer groups

***

## How it works

After each request completes, the connector serializes the full trace — including all spans, attributes, and optionally request headers — to JSON and writes it to Kafka as a single message. The message key is the trace ID. Writes are asynchronous so they have zero impact on request latency. Messages are batched internally and flushed based on `batch_size` and `flush_interval_ms`.

***

## Setup

<Tabs group="config-method">
  <Tab title="Web UI">
    1. Navigate to **Observability** in the sidebar.
    2. Select **Kafka** from the connector list.
    3. Add at least one broker address (e.g. `localhost:9092`) and enter a **Topic** name.
    4. Configure optional settings: compression, TLS, SASL, and batch tuning.
    5. Toggle **Enabled** on, then click **Save Kafka Configuration**.

    Use the **Configure Plugin Tracing** button in the top-right to control which plugin spans are included in published trace payloads. See [Filtering plugin spans](#filtering-plugin-spans).
  </Tab>

  <Tab title="config.json">
    Minimal configuration:

    ```json theme={null}
    {
      "plugins": [
        {
          "enabled": true,
          "name": "kafka",
          "config": {
            "brokers": ["localhost:9092"],
            "topic": "bifrost-traces"
          }
        }
      ]
    }
    ```

    Full example with TLS, SASL, and content filtering:

    ```json theme={null}
    {
      "plugins": [
        {
          "enabled": true,
          "name": "kafka",
          "config": {
            "brokers": ["kafka.internal:9093"],
            "topic": "bifrost-traces",
            "tls_enabled": true,
            "ca_cert": "env.KAFKA_CA_CERT",
            "sasl_enabled": true,
            "sasl": {
              "mechanism": "SCRAM-SHA-256",
              "username": "env.KAFKA_USERNAME",
              "password": "env.KAFKA_PASSWORD"
            },
            "compression": "zstd",
            "batch_size": 100,
            "flush_interval_ms": 1000,
            "auto_create_topic": false,
            "disable_content_logging": false,
            "request_headers": ["x-tenant-id", "x-request-source"]
          }
        }
      ]
    }
    ```
  </Tab>
</Tabs>

***

## Configuration reference

| Field                     | Type                  | Required    | Default | Description                                                                                                            |
| ------------------------- | --------------------- | ----------- | ------- | ---------------------------------------------------------------------------------------------------------------------- |
| `brokers`                 | `string[]`            | ✅           | —       | Kafka broker addresses (e.g. `["localhost:9092"]`). At least one required.                                             |
| `topic`                   | `string`              | ✅           | —       | Kafka topic to publish traces to.                                                                                      |
| `tls_enabled`             | `boolean`             | ❌           | `false` | Enable TLS for broker connections.                                                                                     |
| `ca_cert`                 | `string \| SecretVar` | ❌           | —       | PEM-encoded CA certificate to verify the broker's TLS cert. Omit to use the system CA pool. Supports `env.VAR_NAME`.   |
| `sasl_enabled`            | `boolean`             | ❌           | `false` | Enable SASL authentication. Requires `sasl` to also be configured.                                                     |
| `sasl.mechanism`          | `string`              | ✅ (if SASL) | —       | Authentication mechanism: `PLAIN`, `SCRAM-SHA-256`, or `SCRAM-SHA-512`. No default — must be set explicitly.           |
| `sasl.username`           | `string \| SecretVar` | ✅ (if SASL) | —       | SASL username. Supports `env.VAR_NAME`.                                                                                |
| `sasl.password`           | `string \| SecretVar` | ✅ (if SASL) | —       | SASL password. Supports `env.VAR_NAME`.                                                                                |
| `compression`             | `string`              | ❌           | `none`  | Compression codec: `none`, `gzip`, `snappy`, `lz4`, or `zstd`.                                                         |
| `batch_size`              | `integer`             | ❌           | `100`   | Maximum number of messages batched per write.                                                                          |
| `flush_interval_ms`       | `integer`             | ❌           | `1000`  | Maximum milliseconds to wait before flushing a batch.                                                                  |
| `auto_create_topic`       | `boolean`             | ❌           | `false` | Create the topic at startup if it does not exist. Requires broker admin permissions.                                   |
| `disable_content_logging` | `boolean`             | ❌           | `false` | Strip input/output message content from traces before publishing.                                                      |
| `request_headers`         | `string[]`            | ❌           | —       | Request-header patterns to capture and embed in traces. Supports wildcards (e.g. `x-custom-*`).                        |
| `plugin_span_filter`      | `object`              | ❌           | —       | Controls which plugin spans are included in published payloads. See [Filtering plugin spans](#filtering-plugin-spans). |

***

## Security

### TLS

Enable `tls_enabled` to encrypt the connection to your brokers. If your broker uses a certificate signed by a private CA, supply the PEM-encoded CA certificate via `ca_cert`. Omit `ca_cert` to fall back to the system CA pool, which is appropriate for brokers with publicly signed certificates.

```json theme={null}
{
  "tls_enabled": true,
  "ca_cert": "env.KAFKA_CA_CERT"
}
```

<Warning>
  Always reference `ca_cert` via an environment variable (`env.KAFKA_CA_CERT`) rather than embedding PEM text directly in `config.json` or the database.
</Warning>

### SASL authentication

The connector supports three SASL mechanisms:

| Mechanism       | Description                                          |
| --------------- | ---------------------------------------------------- |
| `PLAIN`         | Username/password in plaintext at the protocol layer |
| `SCRAM-SHA-256` | SCRAM challenge-response with SHA-256 digest         |
| `SCRAM-SHA-512` | SCRAM challenge-response with SHA-512 digest         |

```json theme={null}
{
  "sasl_enabled": true,
  "sasl": {
    "mechanism": "PLAIN",
    "username": "env.KAFKA_USERNAME",
    "password": "env.KAFKA_PASSWORD"
  }
}
```

<Note>
  `PLAIN` sends credentials in cleartext at the protocol level. Always enable `tls_enabled: true` alongside `PLAIN` in production to prevent credential exposure.
</Note>

***

## Filtering traces

### Stripping message content

When `disable_content_logging` is `true`, the connector removes all input and output message content from spans before serializing to JSON. Span metadata — timing, token counts, model, provider, cost, and status — is preserved.

This is useful when downstream consumers should not have access to the actual prompt and completion text for compliance or access-control reasons.

### Capturing request headers

By default, no request headers are embedded in the trace payload. Set `request_headers` to a list of header name patterns to include:

```json theme={null}
{
  "request_headers": ["x-tenant-id", "x-request-source", "x-custom-*"]
}
```

Patterns support exact names and wildcards: `x-custom-*` captures all headers with that prefix. `*` captures every header including `Authorization` — use with caution.

Captured headers appear under `RequestHeaders` in the published JSON.

### Filtering plugin spans

By default every plugin hook generates a span in the trace, which can add significant noise (e.g. 8 built-in plugins × 2 hooks = 16 spans per request). Use `plugin_span_filter` to control which plugin spans are published:

```json theme={null}
{
  "plugin_span_filter": {
    "mode": "exclude",
    "plugins": ["logging", "telemetry", "compat", "kafka"]
  }
}
```

| Mode      | Behaviour                                         |
| --------- | ------------------------------------------------- |
| `include` | Publish spans only for the listed plugins         |
| `exclude` | Publish spans for all plugins except those listed |

Plugin names match the `<name>` segment in span names like `plugin.<name>.prerequesthook`, `plugin.<name>.prehook`, and `plugin.<name>.posthook`. Use the **Configure Plugin Tracing** button on the Kafka connector page in the UI to toggle individual plugins instead of editing config directly.

When a plugin span is filtered out, its children are automatically re-parented to the nearest surviving ancestor so the span tree stays connected.

***

## Trace payload format

Each Kafka message value is a JSON-serialized trace. The message key is the `TraceID`.

The `RootSpan` is the inbound HTTP request span. The `Spans` array contains every span in the trace — including the root span as its first element — followed by plugin hook spans and the `llm.call` span. `RequestHeaders` and `PluginLogs` are `null` when no headers are captured and no plugin logs were emitted.

```json theme={null}
{
  "RequestID": "d2791ef1-3386-4ec8-9861-87bdaaac72a8",
  "TraceID": "0e8b9293a69d4652804d2ab61121c1f2",
  "ParentID": "",
  "StartTime": "2026-06-29T17:34:38.435383+05:30",
  "EndTime": "2026-06-29T17:34:39.555003+05:30",
  "Attributes": {},
  "RequestHeaders": null,
  "PluginLogs": [],
  "RootSpan": {
    "SpanID": "40cd8047c2cb44c6",
    "ParentID": "",
    "TraceID": "0e8b9293a69d4652804d2ab61121c1f2",
    "Name": "/v1/chat/completions",
    "Kind": "http.request",
    "StartTime": "2026-06-29T17:34:38.436463+05:30",
    "EndTime": "2026-06-29T17:34:39.554577+05:30",
    "Status": "ok",
    "StatusMsg": "",
    "Attributes": {
      "http.method": "POST",
      "http.url": "/v1/chat/completions",
      "http.status_code": 200,
      "http.user_agent": "bruno-runtime/3.5.0",
      "gen_ai.provider.name": "openai",
      "gen_ai.request.model": "gpt-4o-mini",
      "gen_ai.response.model": "gpt-4o-mini-2024-07-18",
      "gen_ai.input.messages": "hello",
      "gen_ai.output.messages": "Hello! How can I assist you today?",
      "gen_ai.response.finish_reasons": ["stop"]
    },
    "Events": []
  },
  "Spans": [
    {
      "SpanID": "40cd8047c2cb44c6",
      "Name": "/v1/chat/completions",
      "Kind": "http.request",
      "...": "(root span repeated as first element)"
    },
    {
      "SpanID": "5bf3c4f42fe448cc",
      "ParentID": "40cd8047c2cb44c6",
      "Name": "plugin.telemetry.prerequesthook",
      "Kind": "plugin",
      "StartTime": "2026-06-29T17:34:38.445065+05:30",
      "EndTime": "2026-06-29T17:34:38.445073+05:30",
      "Status": "ok",
      "StatusMsg": "",
      "Attributes": {},
      "Events": []
    },
    {
      "...": "(additional plugin.*.prerequesthook → plugin.*.prehook spans)"
    },
    {
      "SpanID": "6e51fa290a2e43d0",
      "ParentID": "8e3ac231d1d64907",
      "Name": "chat gpt-4o-mini",
      "Kind": "llm.call",
      "StartTime": "2026-06-29T17:34:38.450939+05:30",
      "EndTime": "2026-06-29T17:34:39.551269+05:30",
      "Status": "ok",
      "StatusMsg": "",
      "Attributes": {
        "gen_ai.provider.name": "openai",
        "gen_ai.request.model": "gpt-4o-mini",
        "gen_ai.response.model": "gpt-4o-mini-2024-07-18",
        "gen_ai.usage.prompt_tokens": 8,
        "gen_ai.usage.completion_tokens": 9,
        "gen_ai.usage.total_tokens": 17,
        "gen_ai.usage.cost": 160.0000054,
        "gen_ai.input.messages": "[{\"role\":\"user\",\"content\":\"hello\"}]",
        "gen_ai.output.messages": "[{\"role\":\"assistant\",\"content\":\"Hello! How can I assist you today?\"}]",
        "gen_ai.response.finish_reason": "stop",
        "bifrost.virtual_key.name": "my-vk",
        "bifrost.retries": 0
      },
      "Events": []
    },
    {
      "...": "(plugin.*.posthook spans follow)"
    }
  ]
}
```

`RequestHeaders` is populated only for headers matched by your `request_headers` patterns — it is `null` otherwise. If `disable_content_logging` is `true`, `gen_ai.input.*` and `gen_ai.output.*` attributes are stripped from all spans before publishing.

***

## Troubleshooting

### Topic does not exist at startup

**Symptom:** Plugin fails to initialize with `kafka plugin: topic "X" does not exist`.

**Fix:** Either create the topic manually before starting Bifrost, or set `auto_create_topic: true` to have the connector create it automatically on startup (requires broker admin permissions).

### SASL authentication failure

**Symptom:** `kafka plugin: failed to connect/authenticate with broker`.

**Checks:**

* Confirm `sasl_enabled` is `true` and `sasl` credentials are set.
* Verify environment variables resolve to non-empty strings.
* For `PLAIN`, confirm TLS is also enabled — some brokers reject PLAIN without TLS.

### Broker unreachable

**Symptom:** `kafka plugin: failed to connect/authenticate with broker: dial tcp ...`

**Checks:**

* Verify the broker address is reachable from the Bifrost host: `nc -zv <broker-host> <port>`
* If using TLS, confirm `tls_enabled` is `true` and the CA certificate matches the broker's cert.

### Messages not appearing in topic

**Symptom:** Plugin initializes but no messages arrive in the topic.

**Checks:**

* Messages are written asynchronously — check Bifrost logs for `kafka plugin: failed to write trace ...` errors.
* Confirm the plugin entry has `"enabled": true`.
* Messages may be buffered for up to `flush_interval_ms` milliseconds (default 1000ms) before being flushed.

***

## Next steps

* **[OpenTelemetry](./otel)** - Send traces to Grafana, Datadog, New Relic, and other OTLP backends
* **[Prometheus](./prometheus)** - Expose metrics for scraping or push to a Prometheus stack
* **[Built-in observability](./default)** - Query logs directly from the Bifrost dashboard
