Fast Path to Streaming Data Analysis
Geospatial Tutorial
Neil Buesing
on
October 12, 2021

Introduction

While standing up event-driven services is challenging in its own right, many companies struggle to go from streaming data directly to analysis, which adds significant latency and limits potential use cases. Often, the existing architecture (event streaming service > data warehouse/lake > BI tool) is a barrier to achieving those results.

At Rill Data, we work with clients to create interactive operational analytics on their event streams: sub-second online analytical processing (OLAP) that lets their business answer questions in real time. We have found that the challenges of analyzing streaming data stem from two issues: inefficient data modeling and low-performance processing. Our platform provides both modeling capabilities to put the right data in the hands of the right teams at the right time (while assessing cost/benefit trade-offs) and a processing engine that empowers individuals to analyze massive amounts of data in real time, either via our own Explore UI or through full integration with BI tools such as Tableau or Looker. When data enrichment is needed for analytics, a platform like ksqlDB pairs nicely with Apache Druid.

For this blog, we will focus on the performance challenge with an example of taking streaming data into Rill’s platform for sub-second analysis. To get a sense for bridging the real-time gap, we selected aircraft telemetry data to answer time-series analytical questions. Our goal is to show:

  • Ease of set-up from streaming service to analysis via Rill 
  • ksqlDB basics for enriching Kafka streams, including user-defined functions
  • Illustrative analyses common to our streaming clients that highlight the difference between business intelligence and operational intelligence use cases

Technology Stack

When working with clients, the exact technology stack varies, but a few elements are common: a streaming service, a database platform, and an analysis tool. For this particular use case, we leverage these technologies:

[Figure: OpenSky and Rill Data real-time data demo]


The Questions

The questions for today’s exercise using the flight data are:

  • How many aircraft are over a given region at a given minute?
  • How many aircraft have flown less than 10,000 meters so far at that given minute?

Although this tutorial uses test data, these questions resemble many logistics and marketplace optimization use cases, including:

  • Forecasting models to improve fleet utilization and/or personnel 
  • Cost analysis across and within regions
  • Real-time allocation of resources (e.g. marketing spend, supply chain adjustments)
  • Dynamic pricing updates

The Data

To answer these questions, we are using data from the OpenSky Network, which provides point-in-time information on aircraft collected through its research sensor network.

Bringing up OpenSky: A large-scale ADS-B sensor network for research, Matthias Schäfer, Martin Strohmeier, Vincent Lenders, Ivan Martinovic, & Matthias Wilhelm. ACM/IEEE International Conference on Information Processing in Sensor Networks, April 2014

The questions cannot be answered directly from this data, so we need to enhance it. For this, we are going to use ksqlDB, which provides a SQL-like syntax on top of Kafka Streams. It is also very extensible, and we will take advantage of that extensibility for this particular use case.

The Exercise

Step 1: Get the Data Into Kafka With Kafka Connect

Getting OpenSky Network aircraft information data into Kafka is easy as there is an open-source connector (I should know since I wrote it a few years ago!). 

First, deploy the connector to pull the data into a Kafka topic. Here we will make a few structural changes, leveraging Single Message Transforms (SMT).

{
  "connector.class": "com.github.nbuesing.kafka.connect.opensky.OpenSkySourceConnector",
  "tasks.max": "1",
  "topic": "flightdata",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "interval": "60",
  "transforms": "flatten,rename",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_",
  "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "location_lat:latitude,location_lon:longitude"
}
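
To make the effect of the two transforms concrete, here is a minimal Python sketch that mimics what Flatten and ReplaceField do to a single record. It is not the Kafka Connect implementation, and the shape of the raw record (a nested `location` object with `lat` and `lon`) is an assumption inferred from the rename mapping in the configuration.

```python
def flatten(record, delimiter="_", prefix=""):
    """Flatten nested dicts, joining key paths with the delimiter."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat


def rename_fields(record, renames):
    """Rename top-level fields; fields not listed pass through unchanged."""
    return {renames.get(key, key): value for key, value in record.items()}


# Hypothetical raw record; the real connector output has more fields.
raw = {"id": "a21b74", "callsign": "MLN235", "location": {"lat": 40.7, "lon": -74.0}}

flattened = flatten(raw)
renamed = rename_fields(
    flattened, {"location_lat": "latitude", "location_lon": "longitude"}
)
print(renamed)
```

After both steps the record is flat and carries `latitude` and `longitude` fields, which is the shape the ksqlDB stream in the next step expects.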

Step 2: Get the Data Into ksqlDB

The OpenSky Network provides 17 attributes of collected state vectors. We will bring that data into ksqlDB to prepare for enhancements.

To move the data, we define the datatype for the JSON elements on the topic and select only the fields of interest.

create stream FLIGHTS (
  id varchar KEY,
  callsign varchar,
  timePosition bigint,
  lastContact bigint,
  latitude double,
  longitude double,
  barometricAltitude double,
  geometricAltitude double,
  onGround boolean,
  velocity double
) with (kafka_topic='flightdata', value_format='json', timestamp='lastContact');

That is all that is needed to get the raw JSON data captured by the connector into ksqlDB.

Step 3: Enhance ksqlDB with User-Defined Functions

ksqlDB has a rich set of functions covering a wide array of basic transformations. However, this use-case needs some specific functionality not available in the base installation, so we will need to write our own. 

The user-defined functions (UDFs) demonstrated below are available in the ksqldb-udf-geospatial GitHub repository and can be deployed on the client’s ksqlDB server. 

The following functions are used for this demonstration.

geopoint

This user-defined function takes a set of 4 fields and groups them as a structure. Since aggregates can only take a single argument, this makes the distance function possible.

See GeoPointUdf for the implementation. It captures the latitude, longitude, altitude, and time of the collection.

geopoint_segment_distance

This user-defined aggregate keeps a running calculation of distance traveled. The stored structure holds the distance and the latest geopoint; the emitted value is the distance alone. ksqlDB provides a great abstraction in the aggregate function: the data stored to perform the aggregation can be more than (or different from) the data returned. This enhancement, part of ksqlDB for nearly two years now (since 0.6.0), makes user-defined aggregate functions extremely powerful.

See GeoPointSegmentDistanceUdaf for the implementation. Also, check out the GeoPointSegmentUdaf aggregate function which allows for generating a complete path of the aircraft.
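
The split between stored state and emitted value can be sketched in a few lines of Python. This is an illustration of the idea only, not the code in GeoPointSegmentDistanceUdaf: the names `GeoPoint`, `SegmentState`, `aggregate`, and `emit` are hypothetical, and the haversine great-circle formula (which ignores altitude) is an assumption about how the distance could be computed.

```python
import math
from dataclasses import dataclass
from typing import Optional

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in meters


@dataclass(frozen=True)
class GeoPoint:
    latitude: float
    longitude: float
    altitude: float
    timestamp: int


@dataclass(frozen=True)
class SegmentState:
    """Stored state: running distance plus the latest point seen."""
    distance: float = 0.0
    last: Optional[GeoPoint] = None


def haversine_m(a, b):
    """Great-circle distance in meters between two points (altitude ignored)."""
    lat1, lat2 = math.radians(a.latitude), math.radians(b.latitude)
    dlat = lat2 - lat1
    dlon = math.radians(b.longitude - a.longitude)
    h = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(h))


def aggregate(point, state):
    """Fold one new point into the stored state."""
    if state.last is None:
        return SegmentState(0.0, point)
    return SegmentState(state.distance + haversine_m(state.last, point), point)


def emit(state):
    """Only the distance is returned to the query, not the whole state."""
    return state.distance


state = SegmentState()
for p in [GeoPoint(44.0, -93.0, 10000.0, 0), GeoPoint(45.0, -93.0, 10000.0, 60_000)]:
    state = aggregate(p, state)
print(round(emit(state)))  # roughly 111 km for one degree of latitude
```

The state carries the last point so the next update only needs to add one segment, while the query sees a single number.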

nearest_city

This UDF loads a small database of city data into a KdTree for distance lookup. The information obtained is the city name, country code, and region code (state/province). The codes follow the ISO 3166 standard, with regions expressed as ISO 3166-2 codes (for example, US-NY). Using this standard makes the data easier to display in Superset, since its country map display uses the same standard.

See NearestCityUdf for the implementation.
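
As a rough illustration of the lookup, the sketch below finds the nearest city by scanning a tiny in-memory list. It deliberately swaps the KdTree for a linear scan to stay short, so it shows the result of the lookup rather than the data structure the UDF uses. The city list and the `name` field are hypothetical; `iso` matches the field selected in the query in Step 4.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in meters

# Hypothetical sample; the real UDF loads a larger city database.
CITIES = [
    {"name": "New York", "iso": "US-NY", "lat": 40.7128, "lon": -74.0060},
    {"name": "Atlanta", "iso": "US-GA", "lat": 33.7490, "lon": -84.3880},
    {"name": "Los Angeles", "iso": "US-CA", "lat": 34.0522, "lon": -118.2437},
    {"name": "Minneapolis", "iso": "US-MN", "lat": 44.9778, "lon": -93.2650},
]


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two latitude/longitude pairs."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dlat = p2 - p1
    dlon = math.radians(lon2 - lon1)
    h = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(h))


def nearest_city(lat, lon, cities=CITIES):
    """Linear scan; a k-d tree avoids checking every city on each lookup."""
    return min(cities, key=lambda c: haversine_m(lat, lon, c["lat"], c["lon"]))


print(nearest_city(40.64, -73.78)["iso"])
```

A linear scan is fine for four cities; with a full city database evaluated on every event, a spatial index such as the KdTree is the more sensible choice.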

Step 4: Enrich with ksqlDB

With the FLIGHTS stream available within ksqlDB and the user-defined functions deployed, all the enrichment needed to answer the analytical questions is done with a single table.

create table FLIGHTS_SUMMARY as
  select
     id,
     latest_by_offset(callsign) callsign,
     max(timeposition) timeposition,
     max(lastcontact) lastcontact,
     latest_by_offset(velocity) velocity,
     latest_by_offset(latitude) latitude,
     latest_by_offset(longitude) longitude,
     latest_by_offset(geometricAltitude) altitude,
     geopoint_segment_distance(geopoint(latitude, longitude, geometricaltitude, timeposition)) distance,
     latest_by_offset(nearest_city(latitude, longitude)->iso) nearest_region
  from flights
  window session(30 minutes)
  where onGround = false
  group by id;


We leverage session windows to separate back-to-back flights. When an aircraft's position is not updated within 30 minutes, the session window closes. Filtering on onGround = false ensures that sessions are not kept alive by updates from landed aircraft. The nearest_city UDF gives us the region the aircraft is over (e.g., Minnesota, United States), and geopoint_segment_distance provides the current distance calculation for the flight.
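
The session-window behavior can be pictured with a small Python sketch that groups one aircraft's update timestamps into sessions using a 30-minute inactivity gap. This is a simplified batch model, not how Kafka Streams implements session windows, and treating a gap of exactly 30 minutes as part of the same session is an assumption.

```python
SESSION_GAP_MS = 30 * 60 * 1000  # 30 minutes of inactivity closes a session


def sessionize(timestamps_ms, gap_ms=SESSION_GAP_MS):
    """Group timestamps so that consecutive events in a session are at most gap_ms apart."""
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][-1] <= gap_ms:
            sessions[-1].append(ts)  # still within the inactivity gap
        else:
            sessions.append([ts])  # gap exceeded: a new flight starts
    return sessions


minute = 60 * 1000
# Updates at minutes 0, 1, 2, then silence until minutes 40 and 41.
updates = [0, 1 * minute, 2 * minute, 40 * minute, 41 * minute]
flights = sessionize(updates)
print(len(flights))
```

The two groups correspond to two separate flights by the same aircraft, each of which gets its own row and its own distance total in FLIGHTS_SUMMARY.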

Using the ksqlDB CLI is a great way to validate that everything looks as expected. The following query shows events streaming to the table.

ksql> select id, callsign, timeposition, distance, nearest_region from FLIGHTS_SUMMARY
  where distance > 0
    and nearest_region like 'US-%'
  emit changes limit 5;

+-------+---------+--------------+-------------------+---------------+
|ID     |CALLSIGN |TIMEPOSITION  |DISTANCE           |NEAREST_REGION |
+-------+---------+--------------+-------------------+---------------+
|a21b74 |MLN235   |1631466509000 |16843.32162744286  |US-NY          |
|aa56d8 |UAL674   |1631466509000 |15837.892668911063 |US-GA          |
|a6fe41 |N55SC    |1631466502000 |13107.356498930902 |US-NM          |
|aa56b8 |UAL1138  |1631466509000 |14645.780565564006 |US-CA          |
|ac2320 |N881SD   |1631466509000 |399.42843220333845 |US-CA          |
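
To connect this output back to the two questions, here is a small Python sketch that answers them for the five rows shown. It assumes each aircraft appears once (its latest row), that DISTANCE is in meters, and that TIMEPOSITION is in epoch milliseconds; in the tutorial itself these aggregations run in Druid, not in Python.

```python
from collections import Counter

# Rows copied from the query output: (id, timeposition_ms, distance, region)
rows = [
    ("a21b74", 1631466509000, 16843.32162744286, "US-NY"),
    ("aa56d8", 1631466509000, 15837.892668911063, "US-GA"),
    ("a6fe41", 1631466502000, 13107.356498930902, "US-NM"),
    ("aa56b8", 1631466509000, 14645.780565564006, "US-CA"),
    ("ac2320", 1631466509000, 399.42843220333845, "US-CA"),
]


def minute_bucket(ts_ms):
    """Truncate an epoch-millisecond timestamp to the start of its minute."""
    return ts_ms // 60_000 * 60_000


# Question 1: aircraft per region per minute.
aircraft_per_region = Counter(
    (minute_bucket(ts), region) for _, ts, _, region in rows
)

# Question 2: aircraft that have traveled less than 10,000 meters.
short_flights = Counter(
    (minute_bucket(ts), region) for _, ts, dist, region in rows if dist < 10_000
)

for (bucket, region), count in sorted(aircraft_per_region.items()):
    print(bucket, region, count, short_flights[(bucket, region)])
```

All five rows fall in the same minute, with two aircraft over US-CA, one of which has traveled less than 10,000 meters.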

Step 5: Capture data for OLAP queries

Now that our event data is properly streaming, we can move into data capture for querying. Apache Druid is a perfect time-series database for this type of data. 

First, the configuration spec for Druid needs to be created and deployed through a RESTful API.

The Configure Schema step samples the topic to validate that the data parses; the result is then used to help build the ingestion process into Druid.

 


Apache Druid classifies strings as dimensions and numeric values as metrics by default. For this demonstration, however, all fields are dimensions, with a count metric added.

Call out: Be sure to set or validate data types when creating your schema

See Schema Design Tips for understanding how Druid handles the ingestion of data.
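
As a rough guide to what such a spec contains, the sketch below builds a Kafka supervisor spec as a Python dictionary and prints it as JSON. It is not the spec used for this demo: the topic and data source names, the column names, the broker address, and the granularity settings are all assumptions. A spec like this would typically be submitted with an HTTP POST to the `/druid/indexer/v1/supervisor` endpoint.

```python
import json


def build_supervisor_spec(topic, datasource, bootstrap_servers):
    """Build a Kafka supervisor spec in which every field is a dimension."""
    # Assumed column names, taken from the FLIGHTS_SUMMARY select list.
    dimensions = [
        "CALLSIGN",
        "NEAREST_REGION",
        {"type": "long", "name": "TIMEPOSITION"},
        {"type": "double", "name": "VELOCITY"},
        {"type": "double", "name": "LATITUDE"},
        {"type": "double", "name": "LONGITUDE"},
        {"type": "double", "name": "ALTITUDE"},
        {"type": "double", "name": "DISTANCE"},
    ]
    return {
        "type": "kafka",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": {"column": "LASTCONTACT", "format": "millis"},
                "dimensionsSpec": {"dimensions": dimensions},
                "metricsSpec": [{"type": "count", "name": "count"}],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "HOUR",
                    "queryGranularity": "MINUTE",
                    "rollup": False,
                },
            },
            "ioConfig": {
                "type": "kafka",
                "topic": topic,
                "inputFormat": {"type": "json"},
                "consumerProperties": {
                    "bootstrap.servers": bootstrap_servers,
                    # Read only committed messages from transactional producers.
                    "isolation.level": "read_committed",
                },
                "useEarliestOffset": True,
            },
            "tuningConfig": {"type": "kafka"},
        },
    }


# Hypothetical names: topic, data source, and broker address.
spec = build_supervisor_spec("FLIGHTS_SUMMARY", "flight_summary", "broker:9092")
print(json.dumps(spec, indent=2))
```

Declaring the numeric columns explicitly as typed dimensions is one way to act on the data type call-out above, since it keeps Druid from treating them as plain strings.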

Step 6: Visualization

Apache Druid provides JDBC integration, allowing Tableau, Looker, Superset, Turnilo, and other visualization tools to query it for OLAP. At Rill Data, your Druid instance integrates with each of these platforms via standard connectors. For this tutorial, we chose Superset, another open-source option.

Apache Superset has many options for displaying data, including many built with deck.gl. The connection is made with a SQLAlchemy URI, such as druid://druid-router:8888/druid/v2/sql. Connecting the data sources is as simple as mapping a Druid data source to a dataset within Superset.



The Country Based Visualizer will zoom the map when a region is selected, as shown. It is a great way to see a specific value. Additionally, Superset provides the ability to view the SQL.



For the sake of real-time aircraft information, I wanted a dynamic time range based on the latest minute (or near the latest minute).  I found the following statement does the trick.

DATEADD(DATETRUNC(DATETIME("now"), MINUTE), -1, minute)
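
For readers less familiar with this expression syntax, the same calculation in plain Python looks like the sketch below: truncate the current time to the minute, then step back one minute. The function name `previous_minute` is hypothetical and is only meant to show what the expression evaluates to.

```python
from datetime import datetime, timedelta, timezone


def previous_minute(now):
    """Truncate to the start of the minute, then subtract one minute."""
    return now.replace(second=0, microsecond=0) - timedelta(minutes=1)


sample = datetime(2021, 9, 12, 17, 8, 29, tzinfo=timezone.utc)
print(previous_minute(sample).isoformat())
```

Stepping back one minute targets the most recent complete minute rather than the one still being filled.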

We then created a chart_template.json and used the API to stamp out charts for each country of interest, so there is no need to enter this rather lengthy time range value by hand.
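
The templating idea can be sketched as follows. This is not the chart_template.json from the demo: the template contents, the field names, the dataset id, and the country list are all assumptions for illustration, and the payload shape expected by the Superset chart API should be checked against your Superset version before use.

```python
import copy
import json

# The time range expression shown above, kept in one place.
TIME_RANGE = 'DATEADD(DATETRUNC(DATETIME("now"), MINUTE), -1, minute)'

# Hypothetical template; field names and values are assumptions.
CHART_TEMPLATE = {
    "slice_name": "Aircraft over {country}",
    "viz_type": "country_map",
    "datasource_id": 1,
    "datasource_type": "table",
    "params": {"select_country": None, "time_range": TIME_RANGE},
}


def stamp_chart(template, country, country_key):
    """Return a chart payload for one country without mutating the template."""
    chart = copy.deepcopy(template)
    chart["slice_name"] = chart["slice_name"].format(country=country)
    chart["params"]["select_country"] = country_key
    # Chart parameters are serialized into a JSON string in the payload.
    chart["params"] = json.dumps(chart["params"])
    return chart


countries = [("United States", "usa"), ("Germany", "germany")]
payloads = [stamp_chart(CHART_TEMPLATE, name, key) for name, key in countries]
for payload in payloads:
    print(payload["slice_name"])
```

Each payload could then be sent to Superset in a loop, so adding a country is a one-line change to the list.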

Optional: Exploration within Rill

To navigate data within Rill without additional integrations, we can auto-generate a fully integrated dashboard with access to dimensions and metrics with no configuration. 

The Explore dashboard is a simple UI focused on ad hoc exploration and intuitive filtering. 

More details on the basics of Explore can be found in our documentation.

Additional Considerations & Next Steps

To prevent duplicates within Apache Druid, enable exactly-once semantics in ksqlDB and configure the Apache Druid Kafka consumer to read only committed messages. The Kafka consumer within Druid manages Kafka offset positioning within Druid itself; this approach suits databases well because it ensures offsets are tracked within the transactional operations of the database.

Additionally, the OpenSky Connector is configured to collect data only once per minute, which avoids double-counting an aircraft within our desired collection granularity. Once ksqlDB adds suppression, this restriction can be lifted by adding additional logic to the pipeline.

Also, the infrastructure and applications still need deployment.  At Rill, we manage Apache Druid for clients with a self-managed control plane for database access and any service account integrations. Additionally, we have a real-time Kafka infrastructure for supporting clients that need us to manage and run their real-time pipelines.

Try out the demo

All of the open-source components are available in the Kafka Local GitHub repository. This repository provides many docker-compose files for starting up services, including Apache Kafka, Kafka Connect, ksqlDB, Apache Druid, and Superset. The demo/opensky directory provides scripts for all of the pieces demonstrated, along with scripts to start and stop containers and additional scripts to pull, build, and deploy the OpenSky Network Connector and the ksqlDB User Defined Geospatial Functions.

This OpenSky demonstration uses Apache Kafka, Kafka Connect, ksqlDB, Apache Druid, and Apache Superset. Scripts are provided for each step, allowing you to run the complete demonstration or explore a single technology. If you want to see Druid used as a dataset without building the data source yourself, use the provided druid script and flight_summary.json to create it for you. If you are unsure how to create a Superset database connection, dataset, and charts, let the superset setup script create them all for you.

You can even try Rill Data’s managed service for free.

About Me

I am a Principal Solutions Architect at Rill Data, Inc., working with our clients to stream their data into the Rill Data analytics platform powered by Apache Druid. I am actively involved with the Apache Kafka community, am a Confluent Community Catalyst for 2020-21 and 2021-22, and have presented at multiple Kafka Summits.

If you have a question, reach out to me in the following Slack communities: Confluent Community, Apache Kafka, or Apache Druid. I am also @nbuesing on both Twitter and LinkedIn.
