Seeking the Perfect Apache Druid Rollup

Neil Buesing
December 16, 2021

Introduction

The goal of an Apache Druid rollup is simple—compress multiple entries into a single physical row where facts (dimensions) are identical and aggregations (metrics) can be pre-computed. By leveraging rollups, the amount of storage for the data-source decreases and query performance improves. To use them effectively and avoid mistakes, let’s walk through eight important concepts.

Concepts

1 - Get Your Facts Together

Rollups combine rows whose facts are all identical. Omitting unneeded facts and generalizing the rest improves the rollup potential. For example, an area code is more likely to be rolled up than a complete phone number. Druid provides transformations, which make it easy to apply these generalizations.
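As a sketch of that generalization, assuming a hypothetical raw input field named phone_number, an expression transform in the ingestion spec's transformSpec could derive the area code:

    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "area_code",
          "expression": "substring(phone_number, 0, 3)"
        }
      ]
    }

Ingesting area_code instead of phone_number lets far more rows share identical facts.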

2 - It’s About Time

For rollup considerations, treat the timestamp as just another fact. Two rows with different timestamps, even if all other dimensions are the same, will not be rolled up. Reducing the granularity of the timestamp increases the opportunities for the data to be rolled up. Apache Druid provides a rich set of granularity options; common settings include second, minute, and hour. See the Druid documentation on simple granularities for the complete list.
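For example, a granularitySpec that truncates timestamps to the minute while keeping rollup enabled might look like this (the segmentGranularity value is only illustrative):

    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "hour",
      "queryGranularity": "minute",
      "rollup": true
    }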

Duration-based granularities also exist (even though they are not currently exposed in the UI). The following specification creates a 6-second rollup.
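A sketch of the relevant queryGranularity, with the duration expressed in milliseconds:

    "queryGranularity": {
      "type": "duration",
      "duration": 6000
    }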

If these options are not right for your use case, there are still options available. See section 6 - Advanced Time Granularity below for details.

3 - Not All Aggregations Can be Rolled

Rollups are pre-computed aggregations. The aggregations count, sum, min, and max are all available at ingestion time, whereas avg is not. Not being able to pre-compute the average, however, is not an issue, since you can pre-compute count and sum and compute the actual average at query time.
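As a sketch, assuming a hypothetical numeric input field named duration and a data-source named phone_calls, the ingestion metrics capture count and sum, and the average is derived at query time:

Metric specification:

    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "doubleSum", "name": "sum_duration", "fieldName": "duration" }
    ]

Query-time average:

    SELECT
      area_code,
      SUM(sum_duration) / SUM("count") AS avg_duration
    FROM phone_calls
    GROUP BY area_code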

The positional numerical aggregates first (the earliest keyword in SQL) and last (the latest keyword in SQL) are not available at ingestion. For ingestion scenarios that result in multiple segments for the same time period (e.g., parallelized ingestion of real-time data, late-arriving data), there is no guarantee that first or last would be accurate. Also, sometimes ingestion silently ignores these aggregations without warning, and other times it aborts with an invalid aggregation error.

Druid, however, has specialized first and last aggregations for strings, which can be used during ingestion. These aggregates preserve and store the timestamp within the aggregate itself. Querying one of these metrics without aggregating it reveals this internal structure: the returned JSON contains the actual timestamp of the aggregated value in the lhs (left-hand side) element and the value itself in the rhs (right-hand side) element.

Metric Specification:
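A minimal sketch, assuming a hypothetical string input field named status:

    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "stringFirst", "name": "first_status", "fieldName": "status", "maxStringBytes": 1024 },
      { "type": "stringLast", "name": "last_status", "fieldName": "status", "maxStringBytes": 1024 }
    ]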

SQL Query with aggregation:
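A sketch, using the hypothetical columns above (the second argument is the maximum string size in bytes):

    SELECT
      area_code,
      EARLIEST(first_status, 1024) AS first_status,
      LATEST(last_status, 1024) AS last_status
    FROM phone_calls
    GROUP BY area_code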

SQL Query without aggregation (for demonstration):
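A sketch; the value shown in the comment is only illustrative of the stored pair:

    SELECT __time, area_code, last_status
    FROM phone_calls
    LIMIT 5

    -- last_status returns the stored pair, e.g. {"lhs":1639673582000,"rhs":"COMPLETED"}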

Notice the lhs element is the parsed timestamp and not the rolled timestamp.

4 - You Can Count on it

If counting records is important, and it usually is, you need a count ingestion metric, which is incremented for every source row represented in the rolled-up record. At query time, be sure to use sum on the count metric, not count, unless you are actually interested in the number of physical rows.

With a data-source created with a count metric called “COUNT”, you can see rollup effectiveness as follows. This query is also a reminder of how rollups work.
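A sketch against a hypothetical phone_calls data-source:

    SELECT
      SUM("COUNT") AS ingested_rows,
      COUNT(*) AS stored_rows,
      SUM("COUNT") * 1.0 / COUNT(*) AS rollup_ratio
    FROM phone_calls

SUM("COUNT") returns the number of rows ingested, while COUNT(*) returns the number of physical rows actually stored; the ratio between them measures rollup effectiveness.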

5 - Sketches are anything but Sketchy

Druid leverages the Apache DataSketches project to solve problems that would otherwise require storing high-cardinality data. Traditionally, the unique values are kept with each record, which dramatically reduces rollups. Sketches capture an approximation of uniqueness without adding any cardinality to the data-source. The storage size of each row will increase, and there are settings available to balance accuracy and storage; see the DataSketches extension documentation.

The theta sketch works by uniformly hashing values, keeping the smallest K distinct hashed values, and then estimating what the count would be if the discarded values (beyond K) were evenly distributed. For example, if 2 values fell within [0.0000, 0.4218], statistically there would be 2.73 values within (0.4218, 1.0000], giving an estimated unique total of 4.73. The higher the K factor, the more accurate the estimate (but also the more storage needed for the theta sketch). The Better Estimator example in the Apache DataSketches documentation is a great place to start if you want to explore the theta sketch in more detail. Additional sketch types are available as well.
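As a sketch, assuming a hypothetical caller_id input field and with the DataSketches extension loaded, a theta sketch metric and a corresponding approximate-distinct query might look like this:

Metric specification:

    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "thetaSketch", "name": "caller_sketch", "fieldName": "caller_id", "size": 16384 }
    ]

SQL query:

    SELECT
      area_code,
      APPROX_COUNT_DISTINCT_DS_THETA(caller_sketch) AS unique_callers
    FROM phone_calls
    GROUP BY area_code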

The key to using them is ensuring that the approximation and the additional row storage are acceptable.

6 - Advanced Time Granularity

When the granularity options of Druid are not what you need, you can perform a custom adjustment of the __time dimension by modifying it within a transformation. In the granularitySpec, enable rollup but omit the queryGranularity, since granularity adjustments to __time are now done with the transformation.

Using an expression transform, apply the custom granularity to the __time element. For example, to create a one-tenth-of-a-minute (every 6 seconds) granularity with the timestamp at the middle of the time period, not the beginning, try this:
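A sketch using Druid's timestamp_floor expression; the 3000-millisecond offset centers the timestamp within the 6-second bucket:

    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "__time",
          "expression": "timestamp_floor(__time, 'PT6S') + 3000"
        }
      ]
    }

Because the transform is named __time, it replaces the row's timestamp before rollup occurs.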

However, this affects positional operations. Inspecting the stored stringLast aggregation, as earlier, shows that the stored lhs timestamp is now the transformed __time value, not the original.

It is preferable to let Druid do the granularity adjustment when possible, so the duration-based rollup shown earlier is the preferred method for non-standard rollup durations; still, this shows how transformations can be leveraged against the __time attribute.

7 - Best-Effort Rollups are the Best

Do not design your queries assuming rollups are perfect. For real-time ingestion, rollups are not perfect; this trade-off makes it easier to handle late-arriving events and parallel real-time ingestion of topic partitions.

The design decision to only support best-effort rollups for real-time streaming is a good thing. Apache Druid consumes Kafka topics at scale by leveraging Kafka partitioning and by manually assigning partitions to the task workers. This keeps each Kafka partition within a segment, giving Druid the throughput needed to handle large amounts of data coming from Kafka. To avoid duplicate rollups between segments, leverage the common practice of producing a meaningful key for each Kafka message. If the Kafka producer keys messages by a dimension, all messages for that dimension are processed by the same task worker, making best-effort rollups nearly perfect.

8 - Compaction

Late-arriving messages or many messages over a short period can lead to multiple segments for the same time period. While data sources are typically configured to minimize this, it happens. Druid handles this with compaction. In addition to reducing the number of segments for a time period, compaction will also perform additional rollups.
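A minimal sketch of a compaction task (the data-source name and interval are hypothetical; auto-compaction can also be configured on the coordinator):

    {
      "type": "compact",
      "dataSource": "phone_calls",
      "ioConfig": {
        "type": "compact",
        "inputSpec": {
          "type": "interval",
          "interval": "2021-12-16/2021-12-17"
        }
      }
    }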

Conclusion

Data rollup is a powerful feature of Apache Druid. Understanding how it works will allow you to increase its effectiveness, avoid mistakes, and understand the cost when additional facts are added to a data-source.

The combination of time granularity, data sketches, best-effort rollup at ingestion, and using an internal Kafka client with manual partition assignment gives Apache Druid the scalability and performance needed to leverage rollups to their full potential.
