# Testing Your Pipeline
KSML includes a test runner that lets you verify your pipeline logic without a running Kafka broker. You write a YAML test definition that describes what data to send and what to assert, and the test runner handles the rest using Kafka's `TopologyTestDriver`.
## How It Works
The test runner:
- Parses your KSML pipeline definition and builds a Kafka Streams topology
- Sends test messages into the topology's input topics
- Runs Python assertions against output topics and/or state stores
- Reports pass/fail results
No Kafka broker, no Schema Registry, no infrastructure required.
## Test Definition Format

A test definition is a YAML file with a `test` root element:

```yaml
test:
  name: "Human-readable test name"
  pipeline: path/to/pipeline.yaml
  schemaDirectory: path/to/schemas    # optional, for Avro schemas

  produce:
    - topic: input-topic-name
      keyType: string                 # optional, defaults to "string"
      valueType: "avro:SensorData"    # optional, defaults to "string"
      messages:
        - key: "my-key"
          value: { field: "value" }
          timestamp: 1709200000000    # optional, epoch millis

  assert:
    - topic: output-topic-name
      code: |
        assert len(records) == 1
        assert records[0]["key"] == "my-key"
```
### Produce Blocks

Each `produce` block targets one input topic. You can define multiple `produce` blocks to feed data into different topics (e.g. for join tests).

| Field | Required | Default | Description |
|---|---|---|---|
| `topic` | yes | | Kafka topic name |
| `keyType` | no | `string` | Key serialization type (e.g. `string`, `avro:MySchema`) |
| `valueType` | no | `string` | Value serialization type |
| `messages` | yes | | List of messages with `key`, `value`, and optional `timestamp` |
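A join test is simply a test with one `produce` block per side of the join. A sketch of such a pair of blocks; the topic names, value types, and fields below are hypothetical, not part of any shipped example:

```yaml
produce:
  - topic: orders_input          # hypothetical left side of the join
    keyType: string
    valueType: json
    messages:
      - key: "cust-1"
        value: { orderId: "order-1", amount: 42 }
  - topic: customers_input       # hypothetical right side of the join
    keyType: string
    valueType: json
    messages:
      - key: "cust-1"
        value: { name: "Alice" }
```

Both blocks feed the same topology run, so the join operator sees records from both topics in the order the messages are listed.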
### Assert Blocks

Each `assert` block runs Python code with injected variables. At least one of `topic` or `stores` must be specified.

| Field | Required | Description |
|---|---|---|
| `topic` | no | Output topic to read records from. Injects a `records` list variable |
| `stores` | no | List of state store names to inject as Python variables |
| `code` | yes | Python assertion code using `assert` statements |

When `topic` is set, `records` is a list of dicts with `key`, `value`, and `timestamp` fields.

When `stores` is set, each store is available as a Python variable with the same API as in pipeline functions (e.g. `store.get(key)`).
## Example: Testing a Filter Pipeline

Let's walk through testing a pipeline that filters sensor data, keeping only sensors with color `"blue"`.
### The Pipeline

Pipeline definition: `test-filter.yaml`

```yaml
# This example shows how to filter messages from a simple stream. Here we only let "blue sensors"
# pass and discard other messages after logging.
streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value is None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Unknown color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered
```

This pipeline reads from `ksml_sensordata_avro`, filters messages where the sensor color is `"blue"`, and writes the matching messages to `ksml_sensordata_filtered`.
### The Test

Test definition: `sample-filter-test.yaml`

```yaml
test:
  name: "Filter pipeline passes blue sensors"
  pipeline: pipelines/test-filter.yaml
  schemaDirectory: schemas

  produce:
    - topic: ksml_sensordata_avro
      keyType: string
      valueType: "avro:SensorData"
      messages:
        - key: "sensor-1"
          value:
            name: "sensor-1"
            timestamp: 1000
            value: "25.0"
            type: "TEMPERATURE"
            unit: "celsius"
            color: "blue"
        - key: "sensor-2"
          value:
            name: "sensor-2"
            timestamp: 2000
            value: "60.0"
            type: "HUMIDITY"
            unit: "percent"
            color: "red"
        - key: "sensor-3"
          value:
            name: "sensor-3"
            timestamp: 3000
            value: "10.0"
            type: "LENGTH"
            unit: "meters"
            color: "blue"

  assert:
    - topic: ksml_sensordata_filtered
      code: |
        assert len(records) == 2, f"Expected 2 filtered records, got {len(records)}"
        assert records[0]["key"] == "sensor-1", f"Expected key 'sensor-1', got {records[0]['key']}"
        assert records[1]["key"] == "sensor-3", f"Expected key 'sensor-3', got {records[1]['key']}"
```
The test sends three sensor messages (two blue, one red) and asserts that only the two blue sensors appear in the output topic.
## Running Tests with Docker

The KSML Docker image includes the test runner at `/opt/ksml/ksml-test.jar`. Mount your test files and override the entrypoint:

```bash
docker run --rm \
  -v ./my-tests:/tests \
  --entrypoint java \
  axual/ksml:latest \
  -Djava.security.manager=allow -jar /opt/ksml/ksml-test.jar \
  /tests/my-test.yaml
```

You can pass multiple test files:

```bash
docker run --rm \
  -v ./my-tests:/tests \
  --entrypoint java \
  axual/ksml:latest \
  -Djava.security.manager=allow -jar /opt/ksml/ksml-test.jar \
  /tests/filter-test.yaml /tests/join-test.yaml /tests/store-test.yaml
```
## Example Output

The exit code is 0 when all tests pass and 1 otherwise, which makes the runner easy to integrate into CI/CD pipelines.
## Writing Assertions

Assertions use Python's `assert` statement. Some common patterns:
#### Check record count
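Inside an assert block, `records` already holds everything read from the output topic, so counting is a plain length check. A minimal sketch; the sample `records` list here stands in for the variable the test runner injects:

```python
# Sample data standing in for the runner-injected "records" variable
records = [
    {"key": "sensor-1", "value": {"color": "blue"}, "timestamp": 1000},
    {"key": "sensor-3", "value": {"color": "blue"}, "timestamp": 3000},
]

# Exact count
assert len(records) == 2, f"Expected 2 records, got {len(records)}"

# Lower bound, when the exact count does not matter
assert len(records) >= 1, "Expected at least one output record"
```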
#### Check specific record values
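Each entry in `records` is a dict with `key`, `value`, and `timestamp`, so field checks are ordinary dict lookups. A sketch with sample data in place of the injected variable:

```python
# Sample data standing in for the runner-injected "records" variable
records = [
    {"key": "sensor-1",
     "value": {"name": "sensor-1", "color": "blue", "value": "25.0"},
     "timestamp": 1000},
]

first = records[0]
assert first["key"] == "sensor-1", f"Unexpected key: {first['key']}"
assert first["value"]["color"] == "blue"
assert first["value"]["value"] == "25.0", "Reading should pass through unchanged"
```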
#### Check timestamps
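Because each record carries a `timestamp` field, you can assert on exact values or on ordering. A sketch with sample data in place of the injected variable, assuming the pipeline preserves the timestamps of the produced messages:

```python
# Sample data standing in for the runner-injected "records" variable
records = [
    {"key": "sensor-1", "value": {"color": "blue"}, "timestamp": 1000},
    {"key": "sensor-3", "value": {"color": "blue"}, "timestamp": 3000},
]

# Exact timestamp carried over from the produced message
assert records[0]["timestamp"] == 1000

# Timestamps should be non-decreasing across the output
timestamps = [r["timestamp"] for r in records]
assert timestamps == sorted(timestamps), f"Out-of-order timestamps: {timestamps}"
```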
#### Check state store contents

```python
# With stores: [my_store] in the assert block
value = my_store.get("sensor-1")
assert value is not None, "Expected sensor-1 in store"
assert value["temperature"] == "25.0"
```
## Schema Validation for Test Files

A JSON Schema for test definition files is available at `docs/ksml-test-spec.json`. See the Schema Validation page for instructions on setting up editor auto-completion and validation.
## Logging

The test runner ships with a default Logback configuration that keeps output quiet: WARN for everything, but INFO for the test runner itself, so you still see the `Running test: ...` progress lines and the final results table.

To get verbose output for a single run, point Logback at a custom configuration file at invocation time: