Introduction to ksqlDB
ksqlDB is a powerful open-source streaming SQL engine that makes it easy to process, analyze, and query data streams in real time. It’s built on top of Apache Kafka, the popular distributed streaming platform, and allows you to use SQL-like statements to perform a wide range of operations on data streams. With ksqlDB, you can filter, aggregate, join, and enrich streaming data in real-time and store the results of your queries in Kafka topics or external systems. It makes ksqlDB a valuable tool for various use cases, such as real-time analytics, event-driven applications, and data integration.
This article will show you how to use Docker Compose to set up a local development environment for ksqlDB. It will allow you to quickly and easily spin up a ksqlDB server and a Kafka broker, as well as a MinIO instance, to provide an S3-compatible object store.
In the end, we will have a data pipeline which looks like this:
Before you begin, make sure you have Docker and Docker Compose installed on your machine. If you don’t already have these tools, you can follow the installation instructions for Docker and Docker Compose.
Next, clone our ksqlDB playground repository from GitHub.
To start the ksqlDB stack, run the following command in the same directory as your docker-compose.yaml
The command will start all of the services.
docker compose up -d # or depending on the version of Docker docker-compose up -d
Ingest fake sensor data into the pipeline
To start the script, we need to set up a virtual environment with Python3 and install dependencies from a requirements.txt file:
# 1. Run the following command in the cloned repository to create the virtual environment: python3 -m venv .venv # 2. Activate the virtual environment by running the following command: source .venv/bin/activate # 3. Once the virtual environment is activated, you can install the dependencies from the requirements.txt file by running the following command: pip install -r requirements.txt # 4. To start the fake sensor script, run the following command: python sensor.py
Remember to activate the virtual environment every time you open a new terminal window and want to use the installed dependencies.
You can now open KafkaUI to look at the messages in the sensor-data topic.
Once the ksqlDB stack and the sensor data ingestion are up and running, you can use the ksqlDB CLI to connect to the ksqlDB server and start issuing queries. To do this, open a new terminal window and run the following command:
docker run --net host -it confluentinc/ksqldb-cli ksql
This will open the ksqlDB CLI and connect to the ksqlDB server in your terminal. Then, you can start issuing SQL-like statements to perform operations on your data streams.
To select data from our input topic, sensor-data, we need to instruct ksqlDB to create a consumer for this topic. We can do that by issuing the following command in our previously started ksql CLI:
CREATE STREAM `sensor-data` ( `temperature` DOUBLE, `humidity` DOUBLE, `pressure` DOUBLE, `sensor` VARCHAR ) WITH ( kafka_topic = 'sensor-data', value_format = 'JSON' );
This will create a queryable STREAM which we can use to create new streams and topics in the following steps.
Create the "high-temperature-alert" topic
We want to create a Kafka topic that contains messages with the sensor ID and temperature value for temperatures higher than 28.0 degrees. To do this, we will use the previously created stream and only select the sensor ID and temperature value from it. It will allow us to monitor and store data from sensors experiencing high temperatures efficiently.
Let’s execute this query in our ksql CLI:
CREATE STREAM `high-temperature-alerts` WITH ( kafka_topic = 'high-temperature-alerts', value_format = 'JSON_SR' ) AS SELECT `temperature`, `sensor` FROM `sensor-data` WHERE `temperature` > 28.0 EMIT CHANGES;
This query creates a new topic named high-temperature alerts containing Messages only with temperatures higher than 28.0 degrees. It also creates a matching JSON schema in the connected schema registry.
Make sure your sensor-data ingestor is still running, and look at the messages in the high-temperature-alerts topic in KAFKA UI.
Create the "sensor-data-berlin" topic
Now we want to create a Kafka topic containing only the messages for the location Berlin. If you look at the messages in the sensor-data topic, there is no location information. To solve this, we will create a new input topic where we can send information about the location and then create a materialized view in ksqlDB. We will then join the `sensor-data` stream with the view and then only forward messages to the Berlin topic where the location is Berlin.
CREATE STREAM `sensor-location` ( `sensor` VARCHAR, `location` VARCHAR ) WITH ( kafka_topic = 'sensor-location', partitions = 1, value_format = 'JSON' );
This statement will create a topic called sensor location. We will produce messages on this topic by using INSERT statements in ksqlDB. In a real-world scenario, you would probably have a location service that sends location information to this topic or connect a location database table via Debezium. Every time someone updates a location for a service, there will be a new message in the location topic.
To create the materialized view in ksqlDB execute the following statement:
CREATE TABLE `sensor-location-table` AS SELECT `sensor`, LATEST_BY_OFFSET(`location`, false) AS `location` FROM `sensor-location` GROUP BY `sensor`;
This will create a table inside ksqlDB containing every sensor's latest location. First, let’s produce some messages to the topic, and then have a look at our new messages in Kafka UI.
INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('7a5d49ae-d49b-47a5-9fbb-153c964165bd', 'Istanbul'); INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('312883eb-4682-4e65-813a-2e1a99954144', 'Cologne'); INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('15e14fa6-fcee-4afc-94f2-de3c84d9aa95', 'Kiel'); INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('1d6af219-4cc3-48fa-81ba-1d4923a13df9', 'London'); INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('a3e94207-de9e-414e-a0d2-0a392aecfed7', 'Corralejo');
Let’s run a SELECT statement on our newly created view:
SELECT * FROM `sensor-location-table`;
We now have a table in ksqlDB we can use to pull the location information while joining it with messages from our sensor-data topic. The view updates every time there is a new message in the sensor-location topic.
Let’s create our topic with all the measurements in Berlin:
CREATE STREAM `sensor-data-berlin` WITH ( kafka_topic = 'sensor-data-berlin', value_format = 'JSON_SR' ) AS SELECT s.`sensor`, s.`temperature` AS `temperature`, s.`humidity` AS `humidity`, s.`pressure` AS `pressure`, l.`location` AS `location`, AS_VALUE(s.`sensor`) AS `sensor` FROM `sensor-data` s JOIN `sensor-location-table` l ON s.`sensor` = l.`sensor` WHERE l.`location` = 'Berlin' EMIT CHANGES;
After some time, ksqlDB will create the sensor-data-berlin topic in Kafka, and if we open Kafka UI, we can see no messages. The reason is that no sensor is located in Berlin. So let’s update the location for two of them by producing new messages to our sensor-location topic:
INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('15e14fa6-fcee-4afc-94f2-de3c84d9aa95', 'Berlin'); INSERT INTO `sensor-location` (`sensor`, `location`) VALUES ('1d6af219-4cc3-48fa-81ba-1d4923a13df9', 'Berlin');
Now there are messages in the sensor-data-berlin topic because ksqlDB updated the materialized view, and through our JOIN, we now have measurements that match the location 'Berlin'.
Create the "sensor-data-s3" topic and write it to MinIO as parquet-files
Kafka is a powerful platform for managing data streams, and a large ecosystem of tools and components has been developed to support it. One such component is Kafka Connect, which allows importing data from various systems into Kafka or exporting them to other services and APIs using a pluggable connector architecture.
In our case, we will create a new topic in Kafka containing all measurements with location information. We will then use the S3 Sink connector integrated into ksqlDB to dump this topic to MinIO, storing the data in Parquet files in a hive-style partitioned structure. This will allow us to easily work with the data using tools like Spark or other data science tools.
Let’s create the topic:
CREATE STREAM `sensor-data-s3` WITH ( kafka_topic = 'sensor-data-s3', value_format = 'JSON_SR' ) AS SELECT s.`sensor`, s.`temperature` AS `temperature`, s.`humidity` AS `humidity`, s.`pressure` AS `pressure`, l.`location` AS `location`, AS_VALUE(s.`sensor`) AS `sensor` FROM `sensor-data` s JOIN `sensor-location-table` l ON s.`sensor` = l.`sensor` EMIT CHANGES;
Now we need to dump the data to our S3-compatible MinIO object store:
CREATE SINK CONNECTOR `s3_sink` WITH ( 'connector.class' = 'io.confluent.connect.s3.S3SinkConnector', 'tasks.max' = '1', 'topics' = 'sensor-data-s3', 's3.region' = 'us-east-1', 's3.bucket.name' = 'sink-bucket', 's3.part.size' = '5242880', 'store.url' = 'http://minio:9000', 'flush.size' = '100', 'storage.class' = 'io.confluent.connect.s3.storage.S3Storage', 'format.class' = 'io.confluent.connect.s3.format.parquet.ParquetFormat', 'partitioner.class' = 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner', 'timestamp.extractor' = 'Record', 'path.format' = '''year''=YYYY/''month''=MM/''day''=dd/''hour''=HH', 'locale' = 'en-US', 'timezone' = 'UTC', 'partition.duration.ms' = '600000', 'value.converter' = 'io.confluent.connect.json.JsonSchemaConverter', 'value.converter.schema.registry.url' = 'http://schema-registry:8081' );
You can see the running connector in KAFKA UI and the created objects in the MinIO Console (Username: admin, Password: password).
This article showed you how to set up a local development environment for ksqlDB and use the ksql CLI to process and query data streams.
ksqlDB is a game-changing technology that allows you to easily and quickly process streaming data in real-time without coding or software maintenance. Instead, you can use simple SQL statements to transform, enrich, and analyze data as it arrives, enabling you to make informed and timely decisions based on the most up-to-date information. ksqlDB is highly scalable and can be deployed on-premises or in the cloud, making it a versatile solution for any organization.
If you’re interested in learning more about how ksqlDB can benefit your organization, don’t hesitate to contact us. Our team of experts would be happy to answer any questions and help you determine if ksqlDB is the right fit for your needs. We look forward to hearing from you!
ABOUT THE AUTHOR
As a Principal Infrastructure Engineer at Hivemind, Max brings over 15 years of experience in the field and is particularly passionate about working with Kafka and Kubernetes.
Max is responsible for infrastructure architecture and implementation in our projects, such as Kafka-based streaming data pipelines and deploying cloud-based systems on clouds like AWS.