How to Create Roll-ups in Apache Druid
Neil Buesing
December 22, 2021


If you are not familiar with Apache Druid Rollups, I recommend you check out the documentation and my earlier article on Seeking the Perfect Apache Druid Rollup. Rollups are a powerful feature of Apache Druid, and understanding how they work gives you the power to use them effectively.

These examples showcase rollups and how they relate to other features of Apache Druid: real-time Kafka ingestion, data sketches, and query granularity.


The use case for these examples is a point-of-sale system that tracks orders by the rewards program of the customer who placed the order and by the zip code of the store where the order was purchased. The metrics are the number of orders and the number of items purchased.

Orders are generated by a Kafka connector called Datagen [1] at a rate of about 500 orders/second. The data includes 100,000 unique customers and 10,000 unique stores. Each user has 1 of 6 reward classifications, and each store has 1 of 300 unique zip codes. Each order has a random quantity of 1 to 10 items purchased. A 4-character CODE is also attached to each order; it is a highly unique value with some duplication.

ksqlDB does the stream processing, associating each order with its user and store. The output of ksqlDB is ingested into Apache Druid with Kafka ingestion tasks. The Kafka topic has multiple partitions, allowing for concurrent ingestion into Druid.

Example - Rollup Comparison

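This example shows how the composition of dimensions impacts rollup effectiveness.
It captures USER_REWARD and STORE_ZIPCODE in separate datasources, as well as both together in a single datasource. In the sample rows below, each row shows __time, the dimension value(s), the rollup COUNT, and the summed quantity of items.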

Datasource Structure

ORDERS - Non-rolled data source acting as the control

01:02:00.000Z GOLD 55511 1 3
01:02:00.000Z GOLD 55511 1 5
01:02:00.000Z GOLD 55600 1 4
01:02:00.000Z BRONZE 55600 1 8

ORDERS_USER_REWARD - only contains 1 dimension, USER_REWARD

01:02:00.000Z GOLD 3 12
01:02:00.000Z BRONZE 1 8

ORDERS_STORE_ZIPCODE - only contains 1 dimension, STORE_ZIPCODE

01:02:00.000Z 55511 2 8
01:02:00.000Z 55600 2 12

ORDERS_USER_REWARD_AND_STORE_ZIP_CODE - contains both dimensions

01:02:00.000Z GOLD 55511 2 8
01:02:00.000Z GOLD 55600 1 4
01:02:00.000Z BRONZE 55600 1 8
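To make the sample rows concrete, here is a minimal Python sketch of what rollup does at ingestion: group rows by __time plus whichever dimensions the datasource keeps, and sum the metrics. It uses the four ORDERS sample rows above and calls the quantity metric QTY after the Kafka message field; the actual Druid metric name is an assumption.

```python
from collections import defaultdict

# The four raw ORDERS sample rows shown above:
# (__time, USER_REWARD, STORE_ZIPCODE, COUNT, QTY)
ORDERS = [
    ("01:02:00.000Z", "GOLD", "55511", 1, 3),
    ("01:02:00.000Z", "GOLD", "55511", 1, 5),
    ("01:02:00.000Z", "GOLD", "55600", 1, 4),
    ("01:02:00.000Z", "BRONZE", "55600", 1, 8),
]
DIMENSIONS = ("USER_REWARD", "STORE_ZIPCODE")


def rollup(rows, keep):
    """Group rows by __time plus the kept dimensions, summing COUNT and QTY."""
    positions = [DIMENSIONS.index(d) + 1 for d in keep]  # column index of each kept dimension
    totals = defaultdict(lambda: [0, 0])  # key -> [COUNT, QTY]
    for row in rows:
        key = (row[0],) + tuple(row[p] for p in positions)
        totals[key][0] += row[3]
        totals[key][1] += row[4]
    return [key + tuple(sums) for key, sums in totals.items()]


for keep in [("USER_REWARD",), ("STORE_ZIPCODE",), DIMENSIONS]:
    print(keep, rollup(ORDERS, keep))
```

Dropping a dimension merges rows that differed only in that dimension, which is why the single-dimension datasources end up with the fewest rows.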

Measuring rollup effectiveness is simple: use the COUNT aggregate to determine the number of physical rows, and the SUM of the rollup “COUNT” metric to get the logical count.

SELECT 'datasource'                   "Datasource",
  SUM("COUNT")                       "Logical Count",
  COUNT("COUNT")                     "Physical Count",
  SUM("COUNT")/(COUNT("COUNT")*1.0)  "Rollup Factor"
FROM datasource

The rollup effectiveness of each of the 4 datasources is as follows:

Datasource                          Logical Count   Physical Count   Rollup Factor
orders (non-rolled datasource)      1,099,821       1,099,821        1.00
orders_user_reward                  1,099,821       108              10,183.52
orders_store_zipcode                1,099,821       5,400            203.67
orders_user_reward_store_zipcode    1,099,821       33,600           32.73

Because the dimensions have a small, fixed number of values (7 user rewards and 300 zip codes) and many orders are generated for each combination, the observed measures align with the mathematical calculations.

7 user rewards * 16 minutes = 112 records
300 zip codes * 16 minutes = 4,800 records
7 user rewards * 300 zip codes * 16 minutes = 33,600 records

Knowing how to measure the observed rollup effectiveness, along with the mathematical calculations, helps you plan for the storage impact as the model changes.
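The expected counts follow one rule: with perfect rollup, the physical row count is the product of the dimension cardinalities times the number of query-granularity buckets. A small Python sketch of that arithmetic, assuming 16 one-minute buckets as in the calculations above, is:

```python
from math import prod


def expected_physical_rows(cardinalities, time_buckets):
    """Rows expected under perfect rollup: one per dimension combination per bucket.

    Assumes every combination appears in every bucket, which holds when
    cardinality is low and event volume is high.
    """
    return prod(cardinalities) * time_buckets


def rollup_factor(logical_count, physical_count):
    """Logical rows (SUM of COUNT) divided by physical rows (COUNT)."""
    return logical_count / physical_count


LOGICAL = 1_099_821  # orders ingested, from the comparison table

print(expected_physical_rows([7], 16))       # user rewards only
print(expected_physical_rows([300], 16))     # zip codes only
print(expected_physical_rows([7, 300], 16))  # both dimensions
print(round(rollup_factor(LOGICAL, 33_600), 2))
```

Observed physical counts can come in above this figure, for example when the same combination is written by more than one ingestion task.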

Example - Kafka Ingestion

Kafka producers partition data by message key, when provided. Here we are going to see how to leverage that to improve rollup performance.

Kafka is designed for high throughput. It achieves this by partitioning topics and allowing multiple consumer instances to share the consumption of those messages, with each consumer allocated a subset of the partitions. Usually, consumers use topic subscription, where the client library manages partition allocation for the consumers. Druid, however, uses topic assignment: the application (the Druid supervisor) assigns partitions to the consumers. Using assignment ensures the same task always reads from the same assigned partitions.
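Producer-side partitioning by key comes down to hashing the key bytes and taking the result modulo the partition count, so records with the same key always land in the same partition. The minimal Python sketch below shows that idea. It uses zlib.crc32 as a stand-in hash; Kafka's default partitioner uses murmur2 for keyed records, so the partition numbers printed here will not match a real topic.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Pick a partition for a keyed record by hashing the key bytes.

    Stand-in only: Kafka's default partitioner uses murmur2, not crc32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


for key in ["BRONZE", "BRONZE", "PLATINUM", "83", "129"]:
    print(key, partition_for(key, 4))
```

Whatever the hash function, the property that matters for rollup is determinism: both BRONZE records map to the same partition.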

There are 2 topics. The original ORDERS_ENRICHED topic has a message key of ORDER_ID, which is not stored as a dimension in the Druid datasource. The second topic, ORDERS_ENRICHED_REKEYED, has a message key of USER_REWARD, which is a dimension captured in the Druid datasource.

Here are example messages within the Kafka topics.


ORDERS_ENRICHED (keyed by ORDER_ID)

partition key value
3 83 {"TS":1639876924374,"ID":83,"USER_REWARD":"BRONZE","QTY":8}
2 129 {"TS":1639876924695,"ID":129,"USER_REWARD":"BRONZE","QTY":2}

Two messages, each with a user reward of BRONZE, end up in separate partitions.


ORDERS_ENRICHED_REKEYED (keyed by USER_REWARD)

partition key value
2 BRONZE {"TS":1639876963405,"ID":448,"USER_REWARD":"BRONZE","QTY":8}
1 PLATINUM {"TS":1639876987990,"ID":449,"USER_REWARD":"PLATINUM","QTY":7}
2 BRONZE {"TS":1639876964001,"ID":451,"USER_REWARD":"BRONZE","QTY":3}

Two BRONZE events are produced to the same partition.

With 4 partitions, the Kafka ingestion specification is set up with 4 tasks. It doesn’t have to be a 1:1 match, but you cannot have more tasks than the number of partitions.
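How partitions map to tasks can be sketched as placing partition p into task group p % taskCount. This is an assumption about the Druid Kafka supervisor's behavior, offered as a sketch rather than its actual code; it also shows why tasks beyond the partition count have nothing to read.

```python
from collections import defaultdict


def assign_partitions(num_partitions: int, task_count: int) -> dict:
    """Group partitions into task groups using partition % task_count.

    Assumption: mirrors the modulo scheme Druid's Kafka supervisor is
    understood to use; only groups that receive a partition get work.
    """
    groups = defaultdict(list)
    for partition in range(num_partitions):
        groups[partition % task_count].append(partition)
    return dict(groups)


print(assign_partitions(4, 4))  # one partition per task
print(assign_partitions(4, 2))  # two partitions per task
print(assign_partitions(4, 6))  # still only 4 groups with partitions
```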

Running a non-aggregate query against each datasource shows that multiple rolled-up rows for a USER_REWARD exist in the first datasource, while only a single rolled-up row exists in the second.

SELECT __time, USER_REWARD, "COUNT"
FROM datasource


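Ingested from ORDERS_ENRICHED (keyed by ORDER_ID):

__time                      USER_REWARD   COUNT
2021-12-19T01:35:00.000Z    DIAMOND       2,837
2021-12-19T01:35:00.000Z    DIAMOND       2,869
2021-12-19T01:35:00.000Z    DIAMOND       2,828
2021-12-19T01:35:00.000Z    DIAMOND       2,815

Ingested from ORDERS_ENRICHED_REKEYED (keyed by USER_REWARD):

__time                      USER_REWARD   COUNT
2021-12-19T01:35:00.000Z    DIAMOND       11,349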

Since the counts sum to the same total, both datasources represent the same aggregation, but the first requires data from 4 segments to be combined at query time. The second datasource performed the complete rollup at ingestion time, resulting in a 4x reduction in physical rows.

This improvement requires no additional logic, processing, CPU cycles, or memory from Druid; it leverages the architectural design of Apache Kafka to achieve the rollup effectiveness boost.
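A small simulation makes the 4x difference concrete. It assumes one ingestion task per partition, a single time bucket, hypothetical sequential order IDs, and zlib.crc32 as a stand-in for Kafka's murmur2 key hashing. Because each task can only roll up the records it reads, every distinct (partition, USER_REWARD) pair becomes its own physical row.

```python
import zlib

NUM_PARTITIONS = 4
# Only the reward names that appear in this article; the full set is not listed.
REWARDS = ["BRONZE", "GOLD", "PLATINUM", "DIAMOND"]


def partition_for(key: str) -> int:
    """Stand-in for Kafka's key hashing (real Kafka uses murmur2, not crc32)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def physical_rows(orders, key_of) -> int:
    """Count rolled-up rows for one time bucket, one ingestion task per partition.

    A task can only roll up the records it reads, so each distinct
    (partition, reward) pair becomes a separate physical row.
    """
    rows = set()
    for order_id, reward in orders:
        partition = partition_for(key_of(order_id, reward))
        rows.add((partition, reward))
    return len(rows)


# Hypothetical orders in a single time bucket: sequential IDs, rewards cycling.
orders = [(i, REWARDS[i % len(REWARDS)]) for i in range(10_000)]

by_order_id = physical_rows(orders, lambda order_id, reward: str(order_id))
by_reward = physical_rows(orders, lambda order_id, reward: reward)
print(by_order_id, by_reward)
```

Keyed by order ID, each reward is spread across the partitions, giving up to 4 rows per reward; keyed by USER_REWARD, each reward lands in exactly one partition and produces a single row.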

Example - Sketches

Traditionally, unique counts on high-cardinality attributes break rollup effectiveness: capturing every value as a dimension prevents rows from being rolled up. Data sketches, combined with accepting that an approximate unique count is sufficient, allow unique counts to be obtained without impacting rollup effectiveness.

Apache Druid uses the Apache DataSketches library through its DataSketches extension. Sketches produce approximations, and sketches that support union, such as HLL and Theta sketches, work well for unique counts. The extension introduces the APPROX_COUNT_DISTINCT_DS_THETA and APPROX_COUNT_DISTINCT_DS_HLL SQL aggregations to compute their approximations at query time. When deciding which sketch to use, it is important to understand its accuracy and storage impact. If rows rarely roll up, adding a sketch could actually increase storage.
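The union property is what makes a sketch rollup-friendly: each rolled-up row stores a small sketch at ingestion, and the sketches from many rows are merged at query time. As an illustration only, here is a toy K-Minimum-Values (KMV) sketch in Python, the basic idea behind theta sketches. It is not the Apache DataSketches implementation, and the blake2b-based hash and the code-{i} values are assumptions made for the example.

```python
import hashlib


def unit_hash(value: str) -> float:
    """Map a value to a pseudo-random point in [0, 1) using a 64-bit hash."""
    digest = hashlib.blake2b(value.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big") / 2**64


class KMVSketch:
    """Toy K-Minimum-Values sketch: keeps only the k smallest distinct hashes."""

    def __init__(self, k: int):
        self.k = k
        self.hashes = set()

    def update(self, value: str) -> None:
        self.hashes.add(unit_hash(value))
        if len(self.hashes) > self.k:
            self.hashes.remove(max(self.hashes))  # evict the largest hash

    def union(self, other: "KMVSketch") -> "KMVSketch":
        """Merge two sketches by keeping the k smallest hashes of both."""
        merged = KMVSketch(min(self.k, other.k))
        merged.hashes = set(sorted(self.hashes | other.hashes)[: merged.k])
        return merged

    def estimate(self) -> float:
        if len(self.hashes) < self.k:
            return float(len(self.hashes))  # exact while under capacity
        return (self.k - 1) / max(self.hashes)  # classic KMV estimator


a, b = KMVSketch(256), KMVSketch(256)
for i in range(6_000):
    a.update(f"code-{i}")  # codes seen in one rolled-up row
for i in range(4_000, 10_000):
    b.update(f"code-{i}")  # codes in another row, overlapping the first
print(round(a.union(b).estimate()))  # approximates the 10,000 distinct codes
```

Merging two sketches with the same k yields exactly the sketch that would have been built from all the values at once, so combining per-row sketches at query time loses nothing relative to one big sketch.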

Adding a sketch to the metrics specification is simple: each sketch has its own metric type, and the available settings depend on the type selected.

A theta sketch unique count metric

{
  "type" : "thetaSketch",
  "name" : "UNIQUE_CODES",
  "fieldName" : "CODE",
  "size" : 32
}

An HLL sketch unique count metric

{
  "type" : "HLLSketchBuild",
  "name" : "UNIQUE_CODES",
  "fieldName" : "CODE",
  "lgK" : 12,
  "tgtHllType" : "HLL_8"
}

With around 35 minutes of data and 5-minute rollups, the following results were calculated. The actual number of unique 4-character CODE values is 438,435; as part of the data setup, CODE is a highly unique (but not always unique) value. In the table, the number in parentheses is the theta sketch size (k) or the HLL lgK.

In this example, a theta sketch with k=32 produces exactly the same approximation as one with k=16384. For theta, k is the number of hashed values retained in the sketch. While keeping 32 values adds accuracy over keeping 16, k=16384 adds no additional accuracy while adding a lot of storage.


Dataset          Count       Rollup Factor   Unique Codes   Row Size (Avg)   Storage (KB)
w/out sketch     1,456,025   1.00015         438,435        29 bytes         16,060
theta (16)       1,456,025   134.88          439,229        153 bytes        1,368
theta (32)       1,456,025   134.88          438,600        252 bytes        2,412
theta (16384)    1,456,025   134.88          438,600        1,107 bytes      11,496
hll_4 (4)        1,456,025   134.88          361,278        60 bytes         304
hll_4 (12)       1,456,025   134.88          439,669        564 bytes        5,564
hll_8 (12)       1,456,025   134.88          439,714        564 bytes        5,564
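The large miss in the hll_4 (4) row is in line with the usual HyperLogLog error approximation. As a rough guide only, assuming the classic estimate of about 1.04/sqrt(m) for m = 2^lgK buckets (the DataSketches implementation's exact constants differ somewhat), the expected relative standard error can be compared with the observed error from the table:

```python
from math import sqrt


def hll_relative_standard_error(lg_k: int) -> float:
    """Classic HLL approximation: 1.04 / sqrt(m) with m = 2**lg_k buckets."""
    return 1.04 / sqrt(2 ** lg_k)


def relative_error(estimate: float, actual: float) -> float:
    return abs(estimate - actual) / actual


ACTUAL = 438_435  # exact distinct CODE count from the table
for label, lg_k, estimate in [("hll_4 (4)", 4, 361_278), ("hll_4 (12)", 12, 439_669)]:
    print(label,
          f"expected ~{hll_relative_standard_error(lg_k):.1%}",
          f"observed {relative_error(estimate, ACTUAL):.1%}")
```

At lgK=4 the approximation gives about 26%, and the observed error is about 18%; at lgK=12 it gives about 1.6%, and the observed error is under 0.3%.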

Query for the non-sketch datasource

select 'datasource' "Datasource",
  TIME_FLOOR(__time, 'PT1H'),
  ROUND(SUM("COUNT")/(COUNT("COUNT")*1.0), 2) "Rollup Factor",
  count(distinct CODE) "UNIQUE CODES"
from datasource
group by 1,2

Query for the sketch datasources

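select 'datasource' "Datasource",
  TIME_FLOOR(__time, 'PT1H'),
  ROUND(SUM("COUNT")/(COUNT("COUNT")*1.0), 2) "Rollup Factor",
  -- theta datasources; use APPROX_COUNT_DISTINCT_DS_HLL("UNIQUE_CODES") for the HLL datasources
  APPROX_COUNT_DISTINCT_DS_THETA("UNIQUE_CODES") "UNIQUE CODES"
from datasource
group by 1,2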

When using the Druid SQL console, click the “...” next to the Run button and disable “Use approximate COUNT(DISTINCT)” and “Use approximate TopN” when comparing sketch estimates against the actual distinct count.

Example - Custom Time Granularity

This example is a “because I can” example, but I find it useful for demonstrating the ingestion life cycle of Apache Druid. During ingestion, the timestamp specification is applied before transformation. That means you can modify (or even replace) the __time value like any other field that can be transformed. The rollup of __time by the granularity specification, however, happens later, when metrics are aggregated during ingestion. Rounding __time in a transformation discards the precise time information before that point, which impacts operations that rely on it (e.g., stringFirst and stringLast).

In the specification, if the minute of the row's __time is odd, the time is truncated to the second; if the minute is even, it is truncated to 5 seconds.

if(
  timestamp_extract(__time, 'MINUTE') % 2,
  __time / 1000 * 1000,
  __time / 5000 * 5000)
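The expression's logic can be checked outside Druid. Here is a Python mirror of it, assuming UTC and epoch-millisecond __time values; the helper names are hypothetical and not part of Druid.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def custom_granularity(millis: int) -> int:
    """Python mirror of the if(...) expression, assuming UTC.

    Odd minute: floor to the second. Even minute: floor to 5 seconds.
    """
    minute_of_hour = millis // 60_000 % 60
    if minute_of_hour % 2:
        return millis // 1000 * 1000
    return millis // 5000 * 5000


def to_millis(iso: str) -> int:
    """Parse an ISO-8601 timestamp with an explicit offset into epoch millis."""
    return (datetime.fromisoformat(iso) - EPOCH) // timedelta(milliseconds=1)


for ts in ["2021-12-19T18:19:57.400+00:00", "2021-12-19T18:20:03.700+00:00"]:
    floored = custom_granularity(to_millis(ts))
    print(ts, "->", (EPOCH + timedelta(milliseconds=floored)).isoformat())
```

Integer division floors non-negative values, so `millis // 5000 * 5000` truncates to the start of the 5-second window rather than rounding to the nearest one.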

Within the ingestion specification, it appears as follows:

"transformSpec": {
  "transforms": [
    {
      "type": "expression",
      "name": "__time",
      "expression": "if(timestamp_extract(\"__time\", 'MINUTE') % 2, \"__time\" / 1000 * 1000, \"__time\" / 5000 * 5000)"
    }
  ]
}

Query the datasource to see the resulting __time values.

select __time,
  sum("COUNT") "Logical Count"
from orders_custom
where __time >= TIMESTAMP '2021-12-19 18:19:57'
  and __time <= TIMESTAMP '2021-12-19 18:20:10'
group by 1

There are 1-second rollups during the odd minute and 5-second rollups during the even minute.

__time                      Logical Count
2021-12-19T18:19:57.000Z    1,150
2021-12-19T18:19:58.000Z    1,120
2021-12-19T18:19:59.000Z    1,105
2021-12-19T18:20:00.000Z    5,724
2021-12-19T18:20:05.000Z    4,780
2021-12-19T18:20:10.000Z    4,907

Because the granularity specification doesn’t roll up the time until the data is physically stored, it is best to do rollups there. Simple and duration-based query granularities are easy to use and effective for almost all scenarios. I find transformations on __time more useful when the input data doesn’t store the timestamp in a single column (for example, joining separate date and time fields into a timestamp).


If you are interested in seeing any of these examples for yourself, check out the druid_rollup_tutorial demonstration. It attempts to provide a turn-key environment to showcase the above examples. See the README for details.

The setup for all the examples is the same: start the Docker Compose environment, then run the script. Each example has a demo_{name}.sh script that you use to start, query, and shut down the example.


[1] Data is generated with DataGen, which generates test data from templates and rules.
