Jonny Press
Databricks is fast becoming the data platform of choice for many organisations, including a number of our clients. It is a Lakehouse: a combination of the best of both data warehouses and data lakes. It provides a wide variety of tooling for both analytics and governance, with the objective of making data easy to access and analyse for a broad user base. It is built on open source technology and adheres to open data format principles, predominantly Parquet under Delta Lake.
kdb+ is used to capture, store and analyse large volumes of data both historically and in real time. It is widely regarded as the highest performing timeseries database.
A number of the institutions we work with in the kdb+ space are deploying Databricks as their enterprise-wide data platform, and wondering if, or how, it can be integrated with kdb+. This is a broad question, so let's narrow the scope with some assumptions:
kdb+ is better for timeseries analytics. Databricks offers timeseries support via Tempo from Databricks Labs, but it is not considered as performant as kdb+
kdb+ captures a large amount of data (multiple TBs per day)
Databricks offers greater accessibility and ease-of-use to end users, particularly those working in machine learning or AI
An organisation’s Databricks instance will contain multiple additional data sets not contained within the kdb+ platform, potentially also very high volume
With these assumptions in mind we have put together some Proof of Concept examples.
In our view, kdb+ will remain the point of capture and data will continue to reside there on a long term basis. kdb+ is the better technology for timeseries analytics.
We can implement Databricks purely as a client of kdb+. However, we believe that creating a copy of some high value or aggregate datasets within the Databricks environment would be beneficial. A high value dataset might be the trade execution data, perhaps enhanced with additional analytics, and an example aggregate dataset might be market data reduced to a 1 minute granularity.
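By way of illustration, a 1 minute aggregate of this kind could be produced from kdb+ via PyKX along the following lines; the port, table and column names are assumed rather than taken from any particular system.
import pykx as kx

# build 1-minute bars from the full granularity trade data (port, table and column names are illustrative)
with kx.SyncQConnection(port=5010) as q:
    bars = q('select open:first price, high:max price, low:min price, close:last price, '
             'volume:sum size by sym, minute:0D00:01 xbar time from trade').pd()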
We believe an optimal solution is one where we:
minimise data duplication
have some data captured by kdb+ available natively to all Databricks users
can still access the full granularity tick data from within the Databricks environment if required
There are multiple ways to move data from any system into Databricks, each with pros and cons. The most common and cost-effective is batch, where data is exported in a common interchange format (e.g. CSV, JSON or Parquet) and uploaded to a storage bucket, and a workflow is triggered to automate the load into Databricks.
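A rough sketch of the batch route using PyKX and boto3 is below; the port, table, bucket and object names are placeholders, and a downstream Databricks load workflow (for example Auto Loader) is assumed to watch the bucket.
import pykx as kx
import boto3

# pull the previous day's trades from kdb+ (port and query are illustrative)
with kx.SyncQConnection(port=5010) as q:
    trades = q('select from trade where date = .z.D - 1').pd()

# write to Parquet and upload to the storage bucket watched by the Databricks load workflow
trades.to_parquet('trade.parquet')
boto3.client('s3').upload_file('trade.parquet', 'my-landing-bucket', 'trade/trade.parquet')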
A second approach is to stream data to Delta Live Tables. The approach we employed was to use PyKX to subscribe to the data flow and publish to a Kinesis stream, with the Delta Live Tables configured to read from that stream. Compared to the batch approach, streaming has the advantage of reducing ingest latency, though kdb+-esque millisecond and sub-millisecond latencies should not be expected; single-digit seconds is a more reasonable expectation. The disadvantage of this approach is cost: the Databricks clusters run for longer, meaning higher DBU (Databricks Unit) cost and higher cloud compute cost.
Example code is below. It creates a subscription from Python to kdb+ and pushes the data to Kinesis, using kinesis_stream.py from the AWS documentation.
import pykx as kx
import pandas as pd
import boto3
import sys
import asyncio
import logging

# establish AWS Kinesis stream
from kinesis_stream import KinesisStream

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
kinesis_client = boto3.client("kinesis")
strm = KinesisStream(kinesis_client)
strm.name = 'trade'

# use PyKX to poll the kdb+ socket in the background 200 times per second
# and push to Kinesis
async def main_loop(q):
    while True:
        await asyncio.sleep(0.005)
        result = q.poll_recv()
        if result is None:
            continue
        data = kx.q('{select time, sym, price, size from x}', result[2]).pd()
        data['time'] = data['time'].dt.strftime('%Y-%m-%d %H:%M:%S')
        for index, row in data.iterrows():
            strm.put_record(row.to_dict(), row['sym'])

# create kdb+ connection, subscribe for data
async def main():
    async with kx.RawQConnection(port=14000, username='admin', password='admin') as q:
        await q('.u.sub', 'trade', '')
        await main_loop(q)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Subscriber suspended')
A view of the Databricks pipeline is outlined below. We build a schema, use Spark to read from Kinesis using our AWS credentials, and finally format our trades table for storage.
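As a representative sketch of that pipeline (not the exact definition we deployed), the Delta Live Tables code might look like the following. The stream name matches the subscriber above; the region, secret scope and key names are placeholders, and an instance profile could be used in place of explicit keys.
import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# schema matching the fields published by the subscriber
trade_schema = StructType([
    StructField("time", StringType()),
    StructField("sym", StringType()),
    StructField("price", DoubleType()),
    StructField("size", LongType()),
])

# raw view over the Kinesis stream; region and secret scope/key names are placeholders
@dlt.table(name="trades_raw")
def trades_raw():
    return (
        spark.readStream.format("kinesis")
        .option("streamName", "trade")
        .option("region", "eu-west-2")
        .option("initialPosition", "latest")
        .option("awsAccessKey", dbutils.secrets.get("kdb", "aws_access_key"))
        .option("awsSecretKey", dbutils.secrets.get("kdb", "aws_secret_key"))
        .load()
    )

# parse the JSON payload and cast the time column, formatting the trades table for storage
@dlt.table(name="trades")
def trades():
    return (
        dlt.read_stream("trades_raw")
        .select(from_json(col("data").cast("string"), trade_schema).alias("r"))
        .select("r.*")
        .withColumn("time", col("time").cast("timestamp"))
    )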
Another school of thought is that storage is cheap, so just ship the historic data to Databricks, whilst keeping some history and the real-time processing in kdb+. Before adopting this approach, consideration should be given to the utility and value of the high volume, full granularity market data to users who don't work directly within the trading environment, as well as to the actual volumes: kdb+ captures regularly run to multiple terabytes of data per day.
It should also be noted that, at the time of writing (January 2025), Databricks only supports microsecond resolution for timestamps in queries. Data can be stored in Delta Lake at higher resolution, but Spark SQL will truncate to microseconds on access.
A rationale for moving more data into Databricks is to allow users to make full use of the native Databricks machine learning tooling directly against the market microstructure data. Some of the wider toolset, such as dashboards and reporting, may prove useful, though it wouldn't be considered sufficient when compared to the visualisation and reporting tools generally deployed in the timeseries domain.
In the “ship it all to Databricks” model, the PyKX Databricks extension, outlined here, should be considered. The aim of this approach is to enhance the timeseries analytics capabilities and performance of Databricks by embedding PyKX within PySpark, operating directly on the data stored in Delta Lake.
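As a rough illustration of that pattern (a hand-rolled sketch rather than the extension itself, with table and column names assumed, and PyKX assumed to be installed on the cluster), a q analytic can be embedded in a PySpark pandas UDF and applied to data read from Delta Lake:
import pandas as pd
import pykx as kx

# hypothetical Delta table of trades; 'spark' is the session provided by Databricks
trades = spark.read.table("market.trades").select("sym", "price", "size")

# run a q calculation (volume weighted average price) over each symbol's partition via PyKX
def q_vwap(pdf: pd.DataFrame) -> pd.DataFrame:
    vwap = kx.q('{exec size wavg price from x}', kx.toq(pdf[["price", "size"]])).py()
    return pd.DataFrame({"sym": [pdf["sym"].iloc[0]], "vwap": [vwap]})

result = trades.groupBy("sym").applyInPandas(q_vwap, schema="sym string, vwap double")
result.show()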