Project Metamorphosis: Unveiling the next-gen event streaming platformLearn More

How Real-Time Stream Processing Works with ksqlDB, Animated

ksqlDB, the event streaming database, is becoming one of the most popular ways to work with Apache Kafka®. Every day, we answer many questions about the project, but here’s a question with an answer that we are always trying to improve: How does ksqlDB work?

The mechanics behind stream processing can be challenging to grasp. The concepts are abstract, and many of them involve motion—two things that are hard for the mind’s eye to visualize. Let’s pop open the hood of ksqlDB to explore its essential concepts, how each works, and how it all relates to Kafka.

If you like, you can follow along by executing the example code yourself. ksqlDB’s quickstart makes it easy to get up and running.

Declaring a stream

Stream processing is a programming paradigm for computing over events as they arrive. But where do those events come from? In Kafka, you store a collection of events in a topic. Each event can contain any raw bytes that you want. In ksqlDB, you store events in a stream. A stream is a topic with a strongly defined schema. You declare it like this:

CREATE STREAM readings (
    sensor VARCHAR KEY,
    location VARCHAR,
    reading INT
) WITH (
    kafka_topic = 'readings',
    partitions = 3,
    value_format = 'json'

When you fire off this statement from ksqlDB’s client to its server, what actually happens? If the topic that backs this stream doesn’t exist, the server issues a call to the Kafka brokers to make a new topic with the specified number of partitions. The stream metadata, like the column layout, serialization scheme, and other information, is placed into ksqlDB’s command topic, which is its internal cluster communication channel. Each ksqlDB server materializes the command topic information to a local metadata store, giving it a global catalog of objects.

A newly declared stream has no data in it:

Inserting rows

Empty collections aren’t terribly interesting. You need to write events to them to make something happen. In Kafka, you model an event as a record and put it into a topic. In ksqlDB, you model an event as a row and put it into a stream. A row is just a record with additional metadata. You insert rows like this:

INSERT INTO readings (sensor, location, reading) VALUES ('sensor-1', 'wheel', 45);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-2', 'motor', 41);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-1', 'wheel', 42);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-3', 'muffler', 42);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-3', 'muffler', 40);

INSERT INTO readings (sensor, location, reading) VALUES ('sensor-4', 'motor', 43);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-6', 'muffler', 43);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-5', 'wheel', 41);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-5', 'wheel', 42);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-4', 'motor', 41);

INSERT INTO readings (sensor, location, reading) VALUES ('sensor-7', 'muffler', 43);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-8', 'wheel', 40);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-9', 'motor', 40);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-9', 'motor', 44);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-7', 'muffler', 41);

Each time you invoke an INSERT statement, a request with the payload is sent to a ksqlDB server. The server checks that the shape of the data is coherent with respect to the stream’s schema—malformed rows are rejected. If the row’s data types are sane, the server creates a record and automatically serializes its content using the format of choice as defined in the stream’s declaration. It uses the Kafka producer client to insert that record into the backing Kafka topic. All of the stream’s data is persisted on directly on the broker. None of it lives in ksqlDB’s servers.

After the inserts complete, the stream now looks like what you see below. Hover over each row to see its contents—the data displayed describes the underlying Kafka record. Notice how the rows are ordered by offset from right to left. In the animations you’ll see below, time is depicted as flowing rightward.

Why does some of the row data end up in the key of the record and some in the value? ksqlDB superimposes a flat column abstraction on top of Kafka’s key/value model. Here’s how it works in this case.

In the declaration of the stream, sensor is qualified with the KEY keyword. That piece of syntax tells ksqlDB to look for the data for this column in the key portion of the record. The data for other columns is read from the record’s value. When ksqlDB produces the record to the underlying topic, its key content is hashed to select a partition for it to reside in. This causes all rows with the same key to be written to the same partition, which is a useful ordering guarantee.

Transforming a stream

No one ever sends data to Kafka just to let it sit there. You always want to do something with it. And most often, the data isn’t yet in the exact form that you need in order to work with it. You need to change it in some way.

The most elementary way you could do this is by writing a program that uses the Kafka producer and consumer clients. The program would read from the source topic whose data you want to change, apply a function to each record, and write the new record to the output topic. It would loop and run forever. This works, but it is rather low-level. You need to manage schemas, serializers, partitioning strategies, and other pieces of configuration.

In ksqlDB, you issue a persistent query to transform one stream into another using its SQL programming model. You derive a new stream from an existing one by selecting and manipulating columns of interest:

-- process from the beginning of each stream
set 'auto.offset.reset' = 'earliest';
    SELECT sensor,
           UCASE(location) AS location
    FROM readings

Persistent queries are little stream processing programs that run indefinitely. In this case, it continually reads rows from readings, applies the transformation logic, and writes rows to clean. You are relieved of all data janitorial work: There are no schemas to manage, no serializers to configure, no partitioning strategies to choose. But what is actually happening when you launch this query?

Each time you run a persistent query, ksqlDB’s server compiles the query’s textual representation to a physical execution plan as a Kafka Streams topology. The topology runs as a daemon, reacting to new topic records as soon as they become available. This means that all of the processing work happens on ksqlDB server; no processing work happens on the Kafka brokers. If you run ksqlDB as a cluster, the topology scales horizontally across the nodes by internally using Kafka Streams application IDs.

When everything is connected together and the data is flowing, it looks like this. Take it in for a few moments—we’ll walk through it in detail below.

What is going on here? What do the moving arrows mean? Why are those numbers changing? And what is pq1?

When a persistent query is created, it is assigned a generated name (in this case, we call it pq1). Rows are read from the stream partitions that the query selects from. As each row passes through the persistent query, the transformation logic is applied to create a new row, which is what the change of color signifies. Reading a record from Kafka does not delete it—you effectively receive a copy of it. That is why the leftmost rows remain in place, and clones of them appear to the right of each partition before they are sent to the persistent query box.

Persistent queries completely manage their own processing progression, even in the presence of faults. ksqlDB durably maintains the highest offset of each input partition. The incrementing numbers underneath the query box describe those values at each point in time. Moreover, the arrows that move from right to left on the input streams show the corresponding offsets currently being processed, giving you a spatial sense of progress. (If you’re an experienced Kafka user, note that these aren’t the committed offsets.)

Pause the animation and hover over the output rows. Notice how the column that the transformation targets has changed, while all the other columns remain intact. ksqlDB has taken care of all the bookkeeping for you.

As you watch the data flowing through the topology, you might be wondering how ksqlDB chooses which input partition it will read from next. Is it random? Is it round robin? The answer to that question is the foundation of how ksqlDB deals with out-of-order data, and it’s something that we’ll describe in a future blog post all on its own. (Spoiler: It picks the smallest timestamp available.)

Filtering rows out of a stream

Let’s look at another simple operation: filtering. Filters are used to discard rows that you do not need or want. Just like transforms, filters are specified using simple SQL syntax.

CREATE STREAM high_readings AS
    SELECT sensor, reading, location
    FROM clean
    WHERE reading > 41

When you write ksqlDB programs, you chain streams (and tables) together. You create a figurative pathway for your data to traverse, with each step in the way performing a step of processing. ksqlDB handles the mechanics of how your data is propagated through the chain.

Combining many operations into one

A crucial rule of thumb in data processing is that you should get rid of data that you don’t need as early as possible. The longer you keep irrelevant data around, the higher the cost to repeatedly store, process, and transfer it. If you use the Kafka client to process data, it is up to you to manage where each processing step takes place.

In ksqlDB, you can combine a wide range of operations into a single query. Its composable query syntax allows you to fuse discrete, yet logically conjoined operations into one.

    SELECT sensor,
           UCASE(location) AS location
    FROM readings
    WHERE reading > 41

This persistent query supplants the previous two that we wrote. It has the advantage of performing all of the computation in one physical place, discarding rows as early as possible.

Rekeying a stream

No account of data processing is complete without a discussion of data locality. When you use a distributed system, data and computation are spread over a cluster of machines, each performing a small task that adds up to a larger operation. But even though your entire data corpus is available, you often want to perform processing over some smaller slice of it. Imagine that you’re building an analytics service. You might want to see what percentage of your users who live in Philadelphia are registered to vote. To do that, you need to gather only the records of users who live in the City of Brotherly Love. But where do you gather them when the data resides in different devices?

In Kafka, partitioning controls data locality. Each partition lives in its entirety on a broker. That is why the choice of how you key your records is such a crucial one. If you use the Kafka clients to process your data, you need to be careful that you’ve set this up right. But in ksqlDB, this is just another SQL clause.

CREATE STREAM by_location AS
    SELECT *
    FROM high_pri
    PARTITION BY location

When you execute this statement, ksqlDB creates a new persistent query. It continually reads from high_pri, applies any additional logic (none in this case, as it simply selects everything), and writes a new record to by_location with a new key. The value of each key is the content of location. This has the effect of co-locating all rows with the same location in the same partition. This co-location property is essential for stateful operations like streaming join and incremental aggregations.

Observe how all circles of the same color end up on the same partition.

Processing with multiple consumers

One of the most important properties that Kafka offers is the ability for multiple consumers to read from the same topic in a conflict-free manner. Your program can read a series of records and do whatever it likes with them, and so can mine. We don’t need to agree about how to access the data before, during, or after.

ksqlDB inherits this property. Every persistent query is transparently assigned a group ID, which means that multiple persistent queries can read from the same stream. They are guaranteed to receive the same rows in the same order for each partition. But this also means that when you add nodes to your cluster and scale an individual persistent query across them, they will collectively load balance the incoming data across them. The work will be shared, and no rows will be dropped or duplicated.

Below is what it would look like if we added another persistent query (pq3, query omitted for brevity), which reads from the stream high_pri, shared by pq2. The persistent queries do not consume rows in lockstep. If pq3 consumes messages more slowly than pq2, it doesn’t inhibit pq2 in any way.

Learn more about ksqlDB

We’ve only scratched the surface of how ksqlDB works, but we’ve seen that its key constructs are concise, composable, and elegant. They offer a higher-productivity interface for working with Kafka without diluting its core concepts.

In future posts, we’ll dive into how tables, joins, scaling, fault tolerance, and time work. Each is a fascinating world in its own right. Until then, there’s no substitute for trying ksqlDB yourself.

Michael Drogalis is Confluent’s stream processing product lead, where he works on the direction and strategy behind all things compute related. Before joining Confluent, Michael served as the CEO of Distributed Masonry, a software startup that built a streaming-native data warehouse. He is also the author of several popular open source projects, most notably the Onyx Platform.

Did you like this blog post? Share it now

Subscribe to the Confluent blog

More Articles Like This

How to Manage Secrets for Confluent with Kubernetes and HashiCorp Vault

This blog post walks through an end-to-end demo that uses the Confluent Operator to deploy Confluent Platform to Kubernetes. We will deploy a connector that watches for commits to a

Keys in ksqlDB, Unlocked

One of the most highly requested enhancements to ksqlDB is here! Apache Kafka® messages may contain data in message keys as well as message values. Until now, ksqlDB could only

Announcing ksqlDB 0.15

We’re pleased to announce ksqlDB 0.15, our first release of 2021! This version adds rich support for message key columns and long-awaited improvement to interactive development with the command line