How to Create Roll-ups in Apache Druid

Neil Buesing
Author
December 22, 2021
Date

Introduction

If you are not familiar with Apache Druid Rollups, I recommend you check out the documentation and my earlier article on Seeking the Perfect Apache Druid Rollup. Rollups are a powerful feature of Apache Druid, and understanding how they work gives you the power to use them effectively.

These examples are to showcase roll-ups and understand how they relate to other features of Apache Druid, real-time Kafka ingestion, data-sketches, and query granularity.

Setup

The use case for these examples are for a point-of-sale setup tracking orders based on the rewards program of the customer that made the order, and the zip code of the store where the order was purchased. The metrics are number of orders and the number of items purchased.

Orders are generated by a Kafka connector called Datagen [1] at a rate of about 500 orders/second. This includes 100,000 unique customers and 10,000 unique stores. Each user has 1 of 6 reward classifications, and each store has 1 of 300 unique zip codes. A random quantity for each order is from 1 to 10 items purchased. A 4-character CODE is attached to the order; a highly unique number with some duplication.

An order is associated with users and stores by ksqlDB, which does the stream processing. The output of ksqlDB is ingestion into Apache Druid with the Kafka ingestion tasks. The Kafka topic has multiple partitions allowing for concurrent ingestion into Druid.

Example - Rollup Comparison

This example showcases how the composition of dimensions impacts rollup effectiveness.
It captures USER_REWARD and STORE_ZIPCODE in separate datasources as well as both together in a single datasource.

Datasource Structure

ORDERS - Non-rolled data source acting as the control

ORDERS_USER_REWARD - only contains 1 dimension, USER_REWARD

ORDERS_STORE_ZIPCODE - only contains 1 dimension, STORE_ZIPCODE

ORDERS_USER_REWARD_AND_STORE_ZIP_CODE - contains both dimensions

It is simple to measure the performance of rollup. Use aggregate COUNT to determine the physical records and SUM of the rollup “COUNT” to get the logical count.

The rollup effectiveness of each 4 data sources is as follows:

Because the dimensions are finite (7 user rewards and 300 postal codes) and there are many orders with these combinations being generated, the observed measures align with the mathematical calculations.

7 user rewards * 16 minutes = 112 expected records
300 zip codes * 16 minutes = 5,100 records
7 user rewards * 300 zip codes * 16 minutes = 33,600 records

Knowing how to measure observed rollup effectiveness along with mathematical calculations help to plan for additional store impact as the model changes.

Example - Kafka Ingestion

Kafka producers partition data by message key, when provided. Here we are going to see how to leverage that to improve rollup performance.

Kafka is designed for high throughput. Kafka achieves this with partitioning topics and allowing for multiple consumer instances to share in the consumption of those messages by each consumer allocated to a subset of partitions. Usually, consumers leverage topic subscription, where the client library manages partitions for consumers. Druid, however, uses topic assignment and the application (Druid Supervisor) assigns partitions for the consumers. Using assignment ensures the same task always reads from the same assigned partitions.

We have 2 topic—the original ORDERS_ENRICHED topic where the message key is ORDER_ID, a dimension that is not being stored in the Druid data source. The second topic, ORDERS_ENRICHED_REKEYED has a message key of USER_REWARD, which is a dimension being captured in the Druid data source.

Here are example messages within the Kafka topics.

ORDERS_ENRICHED

Two messages each with a user reward of BRONZE end up in separate partitions.

ORDERS_ENRICHED_REKEYED

Two BRONZE events produced to the same partition.

With 4 partitions, the Kafka ingestion specification is set up with 4 tasks. It doesn’t have to be a 1:1 match, but you cannot have more tasks than the number of partitions.

By doing a non-aggregate query to each data source, we see that multiple rollups for USER_REWARD exist for the first dataset, while only a single rollup exists in the second dataset.

ORDERS_USER_REWARD

ORDERS_USER_REWARD_REKEYED

With the sum of the counts identical, they both represent the same aggregation, but the first requires the data from 4 segments to be combined at query time. The second data source was able to perform the complete rollup at time of ingestion, resulting in a 4x physical row reduction storage reduction.

This performance improvement is done with no additional logic, processing, CPU cycles, or memory requirements by Druid, as it leverages the architectural design of Apache Kafka to achieve this rollup effectiveness boost.

Example - Sketches

Traditionally, unique counts on high volume attributes break rollup effectiveness. The capture of all the values prevents rows from being rolled up. Data sketches and acceptancing that an approximate unique count is sufficient, allows unique counts to be obtained without impacting rollup effectiveness.

Apache Druid leverages the Apache DataSketches project with the DataSketches extension. Sketches do approximations, and sketches that support the union, such as the HLL and Theta sketches work well for uniqueness. The extension introduces APPROX_COUNT_DISTINCT_DS_THETA and APPROX_COUNT_DISTINCT_DS_HLL query aggregations to capture their approximations at query time. When deciding on a sketch to use, it is important to understand its accuracy and storage impact. If rollups are rare, adding a sketch could actually increase storage.

The metric specification is simple to add a sketch, each has a unique metric type, and different attributes for settings exist, based on the type selected.

A theta sketch unique count metric

A HLL sketch unique count metric

With around 35 minutes of data and 5 minute rollups, the following data was calculated. The actual uniqueness of the 4 character CODE is 438,435. As part of the data setup the CODE represents a highly unique (but not always unique) value.

In this example of these sketches, a theta with a k=32 results in the exact same approximation as a k=16384. With theta, the k value is the number of unformed hashed values kept within the sketch. While keeping 32 values adds accuracy over keeping 16 values, the 16384 k-value adds no additional accuracy while adding a lot of storage.

Dimensions: USER_REWARD and STORE_ZIPCODE

Query for the non-sketch datasource

Query for the sketch datasources

When using the Druid SQL Console, be sure to select the “...” in the Run button and disable “Use approximate COUNT(DISTINCT)” and “Use approximate TopN” when trying to compare sketch effectiveness against actual distinct count.

Example - Custom Time Granularity

This example is a “because I can” example. I find this useful to demonstrate the ingestion life-cycle of Apache Druid. During ingestion, timestamp specification happens before transformation. That means you can modify (or even replace) the __time value as any other field that can be transformed. The rollup of __time, however, happens during metrics and ingestion, so by rounding __time in a transformation, it would impact operations that benefit from that specific time information (e.g. stringFirst and stringLast).

In the specification, if the minute of the current time is odd, truncate to the nearest second, if the minute of the current time is even, truncate to the nearest 5 second.

Within the ingestion specification, it would appear as the following.

Query the datasource to see the __time values within the data source.

There are 1 second rollups during the odd minute, and 5 second rollups during the even minute.

Because the granularity specification doesn’t roll the time until it is physically stored, it is best to do rollups there. Simple and duration based query granularity are easy to use and effective for all most scenarios. I find the use of transformation with __time used more when the input data doesn’t have the timestamp stored within a single column (join date and time fields as a timestamp).

Conclusion

If you are interested in seeing any of these examples for yourself, check out the druid_rollup_tutorial demonstration. It attempts to provide a turn-key environment to showcase the above examples. See the README for details.

The setup for all the examples is the same which is to start up the docker compose and then run the setup.sh script. Each example is a demo_{name}.sh script where you start, query, and shutdown.

References

Ready for faster dashboards?

Try for free today.