Summary Tables (Aggregation)
Hydrolix supports aggregate Summary tables. Summary tables lend themselves well to aggregations of metrics such as min, max, count and sum, and in some cases percentiles, unique counts, or other statistical aggregations. The basic principle of a Summary table is that at ingest time an SQL statement is run against the incoming data, with the output written directly to a new target table.
There are two main methods of generating summary or aggregation tables, described below. Batch, Kafka and Kinesis use one methodology; the HTTP Stream API uses another.
Batch, Kafka and Kinesis Ingested Data
Batch ingest, Kafka and AWS Kinesis support Summary tables out of the box. Using the transform's ability to execute SQL, aggregations can be applied to the incoming data while also enriching it.
For example, if there is a requirement to have an enriched raw data table and an aggregated summary table containing hourly roll-ups of the data, a Kafka-ingested environment would look as follows. One pool of Kafka servers would ingest, enrich and encode the incoming data for the raw table. The other pool of Kafka servers would ingest, enrich, aggregate and encode the data for the summary table. Both pools pull data from the customer's Kafka environment.
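As a minimal sketch (the `status_code` and `bytes_sent` columns are hypothetical, and this assumes the same `{STREAM}` placeholder used in the transform examples later on this page), the aggregating SQL used by the summary pool for an hourly roll-up might look like:
-- summary pool sql_transform - hourly roll-up (hypothetical columns)
select toStartOfHour(timestamp) as timestamp, status_code::Nullable(String), count()::Nullable(UInt64) as count, sum(bytes_sent)::Nullable(UInt64) as bytes_sent_sum from {STREAM} GROUP BY timestamp, status_code
The raw pool would use the normal enrichment transform with no aggregating SQL.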

HTTP Stream API Ingested Data
The HTTP Stream API is a special case and uses an alternative methodology for managing summary tables. This is because the data is pushed (via HTTP) into the system by the Stream Head, which in turn puts the message onto a queue for ingest (in Kubernetes this is Redpanda). To create a Summary table, the HTTP Stream API uses an additional service type called a Summary-peer. These services retrieve the same raw message from the queue, apply the enrichment to the raw data (the same way a stream-peer does) and then execute their own aggregating SQL statement.
It is important to understand the flow of data through a Summary-peer, as this is not the same as the other ingest methods. The data has two transforms applied to it within the Summary-peer. The first transform applied is the same one applied to the raw data being ingested by the stream-peer - its table is also called the parent table. Once this transform is applied, the resultant data then has the Summary transform applied, with the output added to the Summary table.

This means it is important to understand the names and types of the data to be summarized, and to use the same names and types when storing the data.
For example, if the raw data coming in has a column called `myColumn` of type string, the Summary transform will need to reference the column `myColumn` within its `transform_sql` and also set the output data type as a string (unless casting has been applied within the SQL).
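As a minimal sketch, a summary `transform_sql` that reuses the parent's `myColumn` column by its original name and type might look like the following; the corresponding output column in the summary transform would then also be declared as a string:
-- summary transform_sql referencing the parent column by the same name and type
select toStartOf5Minute(timestamp) as timestamp, myColumn::Nullable(String), count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp, myColumn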
Stream Summary Tables
Note that when using Summary tables the intermediary "parent" table partitions are not written to the database. It is always expected that a "raw" table will exist to supply the "parent" data to be written.
Setting up Summary Tables
Assuming data is already flowing into your existing "raw" table, three items need to be created for your summary table:
- Create the Summary table.
- Create a transform for the table, containing the SQL you will use to summarize the data.
- Create the infrastructure to generate your Summary data.
Create your Summary table
The first step is to create the table which will receive the summary data; more information can be found here - Projects & Tables. The table created can be the same as any normal table you would create (although a new name needs to be used) and will often have the same characteristics. One item to bear in mind is that the aim is often to "aggregate" rows, which means that hot and cold write settings may need to be altered to get a good level of initial aggregation.
Functions and Dictionaries
If you are using functions and dictionaries to enrich your "raw data" table, these should also be available to the summary table. As functions and dictionaries are "project" bound, it is strongly suggested that your "summary table" be within the same project as your "raw data" table.
Create the Transform for the Summary table
Once the table is created, the transform (write schema) needs to be created. This follows a similar format as for the regular ingestion of data, more information on transforms can be found here Write Transforms.
Hydrolix tables require a primary timestamp field
Even for Summary tables a primary timestamp (datetime or an epoch) field is required. Note this can NOT, however, be a field generated from the `transform_sql`, such as `now()` or similar; it can be an existing column that has a function applied to it, for example `toStartOf5Minute()`.
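For illustration, a minimal sketch contrasting a valid and an invalid primary timestamp in a summary `transform_sql`:
-- valid: the primary timestamp is derived from an existing column
select toStartOf5Minute(timestamp) as timestamp, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp
-- invalid: the primary timestamp is generated by the SQL itself
select now() as timestamp, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp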
The transform includes an additional field `transform_sql` within the `settings` object, which can be used for the aggregation. For example, to get the count by status_code every 5 minutes, the following could be used within your Summary table's `transform_sql`:
select toStartOf5Minute(timestamp) as time, status_code::Nullable(String), count()::Nullable(UInt64) as count from {STREAM} GROUP BY time, status_code
The basic transform for the table would be as follows:
{
  "name": "transform",
  "description": "Demo SQL Transform",
  "type": "json",
  "settings": {
    "is_default": true,
    "sql_transform": "select toStartOf5Minute(timestamp) as time, status_code::Nullable(String), count()::Nullable(UInt64) as count from {STREAM} GROUP BY time, status_code",
    "output_columns": [
      {
        "name": "time",
        "datatype": {
          "primary": true,
          "resolution": "seconds",
          "format": "2006-01-02 15:04:05",
          "type": "datetime"
        }
      },
      {
        "name": "status_code",
        "datatype": {
          "type": "string"
        }
      },
      {
        "name": "count",
        "datatype": {
          "type": "uint64"
        }
      }
    ],
    "compression": "none",
    "format_details": {}
  }
}
Transform settings for Summary table
The transform must be the default one, otherwise the insert will fail. Use the following in the settings:
"is_default": true
At query time the following would be used to retrieve the data:
select time, status_code, sum(count) as count from myProject.summaryTable GROUP BY time, status_code
Basic Aggregations
`min()`, `max()`, `count()`, and `sum()` are all good basic aggregations that can be used in Summary tables.
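As a minimal sketch, a `transform_sql` combining these basic aggregations might look like the following (the `response_ms` and `bytes_sent` columns are hypothetical):
-- sql_transform with basic aggregations (hypothetical columns)
select toStartOf5Minute(timestamp) as timestamp, min(response_ms)::Nullable(UInt64) as response_ms_min, max(response_ms)::Nullable(UInt64) as response_ms_max, sum(bytes_sent)::Nullable(UInt64) as bytes_sent_sum, count()::Nullable(UInt64) as row_count from {STREAM} GROUP BY timestamp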
For more advanced metrics such as averages and percentiles, further consideration is needed. This is due to the way partitions are written: multiple pods or services can each write their own partitions containing aggregations for the same time periods.
For example, it may seem logical to use the `avg()` function as part of your roll-up to get the average value over a period of time. However, as each peer will generate its own average within each partition, at query time the user will need to average those averages to account for multiple partitions - `avg(avg())` - which can lead to unexpected results.
In the above example, a better way of calculating the average is to use two columns in the summary table, one for the `sum()` and one for the `count()`, which at query time can be aggregated and the average then calculated. For example:
// sql_transform
select toStartOf5Minute(timestamp) as timestamp, sum(download_time_sec)::Nullable(UInt64) as dnl_sec_sum, count(download_time_sec)::Nullable(UInt64) as dnl_sec_count from {STREAM} GROUP BY timestamp
//query sql to get average download time
select timestamp, sum(dnl_sec_sum)/sum(dnl_sec_count) as average from myProject.summaryTable where timestamp between '2022-04-01 00:00:00' and '2022-04-30 23:59:59' group by timestamp
Another, more advanced, method is to use the ClickHouse functionality of Intermediate Aggregation States described below.
Intermediate Aggregation States
Intermediate aggregation states are a datatype that holds only the information needed to compute an aggregate function. The simplest example of this is the `avg()` function, where the state holds only sums and counts; a more complex example is the `uniq()` state, which holds the fixed-memory HyperLogLog binary string of values it has already seen.
This stored state allows the aggregation of row data without the danger of aggregating an aggregate, while still providing a performance gain. More information on this functionality can be found on the ClickHouse site - Intermediate Aggregation States.
To use this functionality within Hydrolix, the `transform_sql` should have the functions suffixed with the `State` keyword. This causes the intermediary state to be created. At the time of writing Hydrolix stores this as a `String`, so the data generated should be cast to a `Nullable(String)`. For example, a stored state for the average download time in seconds would be specified as:
avgState(download_time_sec::Nullable(UInt64))::Nullable(String)
`avgState` is the `avg()` function with the `State` suffix attached, storing the intermediary state for the average. The `::Nullable(UInt64)` casts download_time_sec to a Nullable UInt64 type - needed for reading the data out of the source table. The outer `::Nullable(String)` wraps the whole expression so that the state is stored as a `String`.
For example, a complete SQL statement where a calculation for the average and the 50th percentile is required would be specified as follows:
SELECT
toStartOfFiveMinute(timestamp) as timestamp,
...
avgState(download_time_sec::Nullable(UInt64))::Nullable(String) as dnld_time_sec_avg_state,
quantileState(0.50)(download_time_sec::Nullable(UInt64))::Nullable(String) as dnld_time_sec_p50_state,
FROM
{STREAM}
GROUP BY
timestamp,
....
If a query is run directly on this table, the stored states can be seen:
SELECT *
FROM myProject.intStateAvgPerc
LIMIT 1
Row 1:
──────
timestamp: 2022-08-23 11:15:00
dnld_time_sec_avg_state: <opaque binary aggregation state>
dnld_time_sec_p50_state: <opaque binary aggregation state>
To retrieve and calculate the average and percentile from this state, the query should use the `Merge` suffix with an `AggregateFunction()` cast for the datatype. For example:
SELECT
timestamp,
avgMerge(dnld_time_sec_avg_state::AggregateFunction(avg, Nullable(UInt64))) as dnld_time_sec_avg,
quantileMerge(0.50)(dnld_time_sec_p50_state::AggregateFunction(quantile(0.50), Nullable(UInt64))) as dnld_time_sec_p50
FROM
myProject.intStateAvgPerc
WHERE timestamp BETWEEN '2022-04-01 00:00:00' and '2022-04-30 23:59:59'
GROUP BY timestamp
ORDER BY timestamp
LIMIT 10;
`avgMerge` is the merge function that merges the intermediary states at query time. The cast to `AggregateFunction()` is required as the state is stored as a `String`. More information on this datatype can be found here - Aggregate Function.
Aggregation Periods
Each Summary table created should ONLY have one aggregation period, for example `toStartOfFiveMinute()`. A table cannot contain both a `toStartOfHour()` and a `toStartOfDay()` aggregation.
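As a minimal sketch, hourly and daily roll-ups of the same raw data would each need their own summary table and transform (the table names here are hypothetical):
-- sql_transform for an hourly summary table, e.g. myProject.summaryHourly
select toStartOfHour(timestamp) as timestamp, status_code::Nullable(String), count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp, status_code
-- sql_transform for a separate daily summary table, e.g. myProject.summaryDaily
select toStartOfDay(timestamp) as timestamp, status_code::Nullable(String), count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp, status_code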
Creating summary resources
The infrastructure to start loading your Summary tables now needs to be created. For Batch, Kinesis and Kafka this follows the same methods as for the raw data.
HTTP Stream API Summary Source
For the HTTP Streaming API an additional service source is created via the Summary Sources API end-point.
For Kubernetes the following example could be used:
curl --request POST \
  --url https://host/config/v1/orgs/$org_id/projects/$project_id/tables/$table_id/sources/summary/ \
  --header 'Accept: application/json' \
  --header 'Authorization: Bearer <token>' \
  --header 'Content-Type: application/json' \
  --data '
{
  "name": "mySummary",
  "parent_table": "myProject.rawdata",
  "settings": {
    "read_timeout": "5s",
    "max_wait": "500ms"
  },
  "subtype": "summary",
  "table": "myProject.mySummaryTable",
  "transform": "summarytransform",
  "type": "pull",
  "pool_name": "summarypool",
  "k8s_deployment": {
    "service": "summary-peer",
    "replicas": "10",
    "memory": "15Gi"
  }
}'
Setting | Description | Required | Example |
---|---|---|---|
name | Name of the source. | N | mySummarySource |
parent_table | Name of the parent table. | Y | myProject.rawdata |
settings.read_timeout | | N | 5s |
settings.max_wait | | N | 500ms |
subtype | This should always be set to summary. | Y | summary |
table | The name of the target project and table to write data into. | Y | myProject.mySummaryTable |
transform | The name of the transform to be used for the summarisation. | Y | summarytransform |
type | This should always be set to pull. | Y | pull |
pool_name | A name for the summary server pool that will run the summarisation. Must be lower case. | Y | mysummarypool |
k8s_deployment.service | Kubernetes specific. This should always be summary-peer. | Y | summary-peer |
k8s_deployment.cpu | Kubernetes specific. The CPU specification for the pod. | N | 2 |
k8s_deployment.memory | Kubernetes specific. The memory specification for the pod. | N | 12Gi |
k8s_deployment.replicas | Kubernetes specific. The number of pods. | N | 5 |
k8s_deployment.storage | Kubernetes specific. The storage specification for the pod. | N | 10Gi |
instance.disk | CloudFormation specific. The amount of storage on the AWS instance. | N | 10Gi |
instance.count | CloudFormation specific. The number of AWS instances. | N | 5 |
instance.size | CloudFormation specific. The size of the AWS instances. | N | medium |
instance.family | CloudFormation specific. The family of AWS instance. | N | r5 |
instance.service | CloudFormation specific. This should always be summary-peer. | N | summary-peer |
Sizing Stream Summary-Peers
This should be done carefully, as memory and CPU utilisation will likely be higher with higher levels of aggregation. A good initial benchmark is to take the CPU and memory utilisation of the stream-peer and add 10-20% to both. For example, if you have 8GB of RAM specified for your raw table ingestion, 10GB would be a reasonable starting point for the Summary-peers.
Merge (Compaction)
An important aspect of Summary tables that should not be overlooked is the use of Merge. Merge is the Hydrolix compaction service that takes written partitions and evaluates how they can be combined for better compression, faster retrieval and more efficient storage.
Merge is able to make use of an `sql_transform` in the same way as other ingest methods. This provides the ability to recalculate items such as intermediary states, sums and counts, and compact them.
For example, the statement below uses the `MergeState` suffix, which merges existing intermediary states and outputs a new intermediary state, to compact the average and 50th-percentile states.
SELECT
toStartOfFiveMinute(timestamp) as timestamp,
...
avgMergeState(dnld_time_sec_avg_state::AggregateFunction(avg, Nullable(UInt64)))::Nullable(String) as dnld_time_sec_avg_state,
quantileMergeState(0.50)(dnld_time_sec_p50_state::AggregateFunction(quantile(0.50), Nullable(UInt64)))::Nullable(String) as dnld_time_sec_p50_state,
FROM
{STREAM}
GROUP BY
timestamp,
....
Lastly, it is important to have sufficient merge-peer capacity. Information on how your table is being merged can be found within the Catalog Metadata. To scale merge components, more information can be found on the Merge page and the Scaling your Cluster pages.
Gotchas
When working with Summary tables there are some standard (and in some cases temporary) "gotchas" we have found that, once understood, are easily navigated.
transform_sql and sql gotchas
Gotcha | Description |
---|---|
Datetime64 | The toStartOf functions (toStartOfMinute, toStartOfHour, etc.) can sometimes be a little tricky with datatypes when run in your transform_sql. Make sure your toStartOf function's datatype matches the transform; by default it is datetime rather than datetime64. If your table has datetime64 as the type you will need to use something like toDateTime64(toStartOfHour(timestamp)) in your query. |
Column aliases | You may see occasions where you appear to get a "missing field" error. This can occur when you have not aliased your function in the transform_sql. For example count(cname) should be specified in the SQL as count(cname) as cname, because without the alias the query engine names the column count(cname) (see the sketch after this table). |
Nullable errors | One of the most common errors seen is a nullable error. This can often be solved by casting the field to a nullable type using ::Nullable(Type) |
Multiple aggregation periods in a single table | At this time it is not possible to have two different aggregation periods in a single table, e.g. you cannot have toStartOfHour() and toStartOfDay() rows stored in the same table. If this is done, unexpected results are likely. |
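To illustrate the column alias gotcha, a minimal sketch (the `cname` column is hypothetical):
-- missing alias: the query engine names the output column "count(cname)", which will not match the transform's output column
select toStartOf5Minute(timestamp) as timestamp, count(cname) from {STREAM} GROUP BY timestamp
-- aliased: the output column is named cname, matching the transform
select toStartOf5Minute(timestamp) as timestamp, count(cname) as cname from {STREAM} GROUP BY timestamp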
Service Deployment
Gotcha | Description |
---|---|
Pool names must be lower case | For the time being (we are working on this) pool names need to be lower case. The error will show up in the Operator in K8s. |