Summary Tables (Aggregation)

Hydrolix supports aggregate Summary tables. Summary tables lend themselves well to aggregations of metrics such as min, max, count and sum, and in some cases percentiles, unique counts, or other statistical aggregations. The basic principle of a Summary table is that at ingest time a SQL statement is run against the incoming data, with the output written directly to a new target table.

Overview

There are two main methods of generating summary or aggregation tables, described below. Batch, Kafka and Kinesis use one methodology; the HTTP Stream API uses another.

Batch, Kafka and Kinesis Ingested Data

Batch Ingest, Kafka and AWS Kinesis support Summary tables out of the box. Using the [transforms](ref:transforms) ability to execute SQL, aggregations can be applied to the incoming data while also enriching it.

For example, if there is a requirement to have both an enriched raw data table and an aggregated summary table containing hourly roll-ups of the data, a Kafka-ingested environment would look as follows. One pool of Kafka peers ingests, enriches and encodes the incoming data for the raw table. The other pool of Kafka peers ingests, enriches, aggregates and encodes the data for the summary table. Both pools pull data from the customer's Kafka environment.


HTTP Stream API Ingested Data

The HTTP Stream API is a special case and uses an alternative methodology for managing summary tables. This is because data is pushed (via HTTP) into the system by the stream-head, which in turn puts the message onto a queue for ingest (in Kubernetes this is Redpanda). To create a Summary table, the HTTP Stream API uses an additional service type called a summary-peer. These services retrieve the same raw message from the queue, apply the enrichment to the raw data (the same way a stream-peer does) and then execute their own aggregating SQL statement.


It is important to understand the flow of data through a summary-peer, as this is not the same as the other ingest methods. The data has two transforms applied to it within the summary-peer. The first transform applied is the same one applied to the raw data being ingested by the stream-peer - that raw table is also called the parent table. Once this transform is applied, the Summary transform is applied to the resulting data, and the output is written to the Summary table.


This means it is important to know the names and types of the columns to be summarized, as the same names and types must be used when storing the summarized data.

For example:
If the incoming raw data has a column called myColumn of type string, the Summary transform will need to reference the column myColumn within its transform_sql and declare its data type as a string as well (unless casting has been applied within the SQL).
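
As a minimal sketch, assuming (as in the examples further below) that the raw data also has a primary datetime column called timestamp, a Summary transform_sql referencing myColumn might look like the following, with the corresponding output column also declared as a string:

-- Sketch only: myColumn is the string column from the raw data; it keeps the same name and type.
select toStartOf5Minute(timestamp) as timestamp, myColumn::Nullable(String) as myColumn, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp, myColumn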

šŸ“˜

Stream Summary Tables

Note that when using Summary tables the intermediary "parent" table partitions are not written to the database. It is always expected that a "raw" table will exist to supply the "parent" data to be written.

Setting up Summary Tables

On the assumption that data is already flowing into your existing "raw" table, three items need to be created to set up your summary table:

  1. Create the Summary table.
  2. Create a transform for the table, including the SQL you will use to summarize the data.
  3. Create the infrastructure that will populate your Summary table.

Create your Summary table

The first step is to create the table which will receive the summary data; more information can be found here - Projects & Tables. The table is created in the same way as any normal table (although a new name needs to be used) and will often have the same characteristics. One item to bear in mind is that the aim is usually to "aggregate" rows, which means the hot and cold write settings may need to be altered to get a good level of initial aggregation.

šŸš§

Functions and Dictionaries

If you are using functions and dictionaries to enrich your "raw data" table, these should also be available to the summary table. Because functions and dictionaries are bound to a project, it is strongly suggested that your "summary table" be within the same project as your "raw data" table.

Create the Transform for the Summary table

Once the table is created, the transform (write schema) needs to be created. This follows a similar format to the regular ingestion of data; more information on transforms can be found here - Write Transforms.

šŸš§

Hydrolix table requires timestamp as primary field

Even for Summary tables a primary timestamp (datetime or an epoch) field is required. Note this can NOT, however, be a field generated from the transform_sql using now() or similar; it can be an existing column that has a function applied to it - for example toStartOf5Minute().
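
For example (a sketch only), the first statement below satisfies this requirement, while the second does not:

-- Allowed: the primary timestamp is an existing column with a function applied to it
select toStartOf5Minute(timestamp) as timestamp, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp
-- NOT allowed: the primary timestamp is generated by the SQL itself
select now() as timestamp, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp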

The transform includes an additional field, transform_sql, within the settings object, which can be used for the aggregation. For example, to get the count by status_code every 5 minutes, the following could be used within your Summary table's transform_sql:

select toStartOf5Minute(timestamp) as timestamp, status_code::Nullable(String) as status_code, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp, status_code

The basic transform for the table would be as follows:

{
    "name": "transform",
    "description": "Demo SQL Transform",
    "type": "json",
    "settings": {
        "is_default": true,
        "sql_transform": "select toStartOf5Minute(timestamp) as timestamp, status_code::Nullable(String) as status_code, count()::Nullable(UInt64) as count from {STREAM} GROUP BY timestamp, status_code",
        "output_columns": [
            {
                "name": "timestamp",
                "datatype": {
                    "primary": true,
                    "resolution": "seconds",
                    "format": "2006-01-02 15:04:05",
                    "type": "datetime"
                }
            },
            {
                "name": "status_code",
                "datatype": {
                    "type": "string"
                }
            },
            {
                "name": "count",
                "datatype": {
                    "type": "uint64"
                }
            }
        ],
        "compression": "none",
        "format_details": {}
    }
}

šŸš§

Transform settings for Summary table

The transform must be the default transform, otherwise the insert will fail. Use the following in the settings:
"is_default": true

At query time the following would be used to retrieve the data:

select timestamp, status_code, sum(count) as count from myProject.summaryTable GROUP BY timestamp, status_code

Basic Aggregations

min(), max(), count() and sum() are all good basic aggregations that can be used in Summary tables.
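
As a sketch, several of these can be combined in a single transform_sql; here bytes_sent is a hypothetical numeric column used purely for illustration:

-- Sketch only: bytes_sent is a hypothetical numeric column in the raw data.
select toStartOf5Minute(timestamp) as timestamp, status_code::Nullable(String) as status_code, count()::Nullable(UInt64) as requests, sum(bytes_sent)::Nullable(UInt64) as bytes_sum, min(bytes_sent)::Nullable(UInt64) as bytes_min, max(bytes_sent)::Nullable(UInt64) as bytes_max from {STREAM} GROUP BY timestamp, status_code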

For more advanced metrics such as averages and percentiles, further consideration is needed. This is due to the way partitions are written: multiple pods or services can each write their own partitions containing aggregations for the same periods.

For example, it may seem logical to use the avg() function as part of your roll-up to get the average value over a period of time. However, each peer will generate its own average within its own partitions, so at query time the user would need to average those averages - avg(avg()) - to account for multiple partitions, and this can lead to unexpected results. For instance, one partition averaging 10 over 2 rows and another averaging 20 over 8 rows have a true combined average of 18, but avg(avg()) returns 15.

In the above example a better way of calculating the average is to use two columns in the summary table, one for the sum() and one for the count(), which at query time can be aggregated and then the average calculated. For example:

-- sql_transform
select toStartOf5Minute(timestamp) as timestamp, sum(download_time_sec)::Nullable(UInt64) as dnl_sec_sum, count(download_time_sec)::Nullable(UInt64) as dnl_sec_count from {STREAM} GROUP BY timestamp

-- query sql to get average download time
select timestamp, sum(dnl_sec_sum)/sum(dnl_sec_count) as average from myProject.summaryTable where timestamp between '2022-04-01 00:00:00' and '2022-04-30 23:59:59' group by timestamp

Another more advanced method is to use the ClickHouse functionality of Intermediate Aggregation States, described below.

Intermediate Aggregation States

Intermediate aggregation states are a datatype that holds only the information needed to compute an aggregate function. The simplest example is the avg() function, where the state holds only sums and counts; a more complex example is the uniq() state, which holds the fixed-memory HyperLogLog representation of the values it has already seen.

This stored state allows the aggregation of row data without the danger of aggregating an aggregate, while still providing a performance gain. More information on this functionality can be found on the ClickHouse site - Intermediate Aggregation States.

To use this functionality within Hydrolix, the functions in the transform_sql should be suffixed with the State keyword. This causes the intermediary state to be created. At the time of writing Hydrolix stores this state as a String, so the data generated should be cast to Nullable(String). For example, the stored state for an average download time in seconds would be specified as:

avgState(download_time_sec::Nullable(UInt64))::Nullable(String)

avgState is the avg() function with the State suffix attached, storing the intermediary state for the average. The ::Nullable(UInt64) casts download_time_sec to a Nullable UInt64 type - needed for reading the data out of the source table. The outer ::Nullable(String) makes the state itself be stored as a String.

For example, a complete SQL statement where a calculation for the average and the 50th percentile is required is specified as follows:

SELECT
    toStartOfFiveMinute(timestamp) as timestamp,
...
     avgState(download_time_sec::Nullable(UInt64))::Nullable(String) as dnld_time_sec_avg_state,
     quantileState(0.50)(download_time_sec::Nullable(UInt64))::Nullable(String) as dnld_time_sec_p50_state,

FROM
    {STREAM}
GROUP BY
    timestamp,
....

If a query is run directly on this table, we see the stored states:

SELECT *
FROM myProject.intStateAvgPerc
LIMIT 1

Row 1:
ā”€ā”€ā”€ā”€ā”€ā”€
timestamp: 2022-08-23 11:15:00
dnld_time_sec_avg_state: <opaque binary state string>
dnld_time_sec_p50_state: <opaque binary state string>

To retrieve and calculate the average and percentile from this state, the query should use the Merge suffix with an AggregateFunction() cast for the datatype.

For example:

SELECT 
    timestamp, 
    avgMerge(dnld_time_sec_avg_state::AggregateFunction(avg, Nullable(UInt64))) as dnld_time_sec_avg,
    quantileMerge(0.50)(dnld_time_sec_p50_state::AggregateFunction(quantile(0.50), Nullable(UInt64))) as dnld_time_sec_p50
FROM 
    myProject.intStateAvgPerc
WHERE timestamp BETWEEN '2022-04-01 00:00:00' and '2022-04-30 23:59:59'
GROUP BY timestamp
ORDER BY timestamp
LIMIT 10;

avgMerge is the merge function that merges the intermediary states at query time.
The cast to AggregateFunction() is required as the state is stored as a String. More information on this datatype is here - Aggregate Function.

šŸ“˜

Aggregation Periods

Each Summary table created should ONLY have one aggregation period, for example toStartOfFiveMinute(). A table cannot contain both a toStartOfHour() and a toStartOfDay() aggregation.

Create Infrastructure

The infrastructure to start loading your Summary tables now needs to be created. For Batch, Kinesis and Kafka this follows the same methods as are used for the raw data.

HTTP Stream API Summary Source

For the HTTP Stream API an additional summary source is created via the Summary Sources API endpoint.

For Kubernetes the following example could be used:

curl --request POST \
     --url https://host/config/v1/orgs/$org_id/projects/$project_id/tables/$table_id/sources/summary/ \
     --header 'Accept: application/json' \
     --header 'Authorization: Bearer <token>' \
     --header 'Content-Type: application/json' \
     --data '
{
  "name": "mySummary",
  "parent_table": "myProject.rawdata",
  "settings": {
    "read_timeout": "5s",
    "max_wait": "500ms"
  },
  "subtype": "summary",
  "table": "myProject.mySummaryTable",
  "transform": "summarytransform",
  "type": "pull",
  "pool_name": "summarypool",
  "k8s_deployment": {
    "service": "summary-peer",
    "replicas": "10",
    "memory": "15Gi"
  }
}'

| Setting | Description | Required | Example |
| --- | --- | --- | --- |
| name | Name of the source | N | mySummarySource |
| parent_table | Name of the parent table | Y | myProject.rawdata |
| settings.read_timeout | | N | 5s |
| settings.max_wait | | N | 500ms |
| subtype | This should always be set as summary | Y | summary |
| table | The name of the target project and table to write data into. | Y | myProject.mySummaryTable |
| transform | The name of the transform to be used for the summarisation. | Y | summarytransform |
| type | This should always be set to pull. | Y | pull |
| pool_name | A name for the summary server pool that will run the summarisation. Must be lower case. | Y | mysummarypool |
| k8s_deployment.service | K8s specific. This should always be summary-peer. | Y | summary-peer |
| k8s_deployment.cpu | K8s specific. The CPU specification for the K8s Pod. | N | 2 |
| k8s_deployment.memory | K8s specific. The memory specification for the K8s Pod. | N | 12Gi |
| k8s_deployment.replicas | K8s specific. The number of K8s Pods. | N | 5 |
| k8s_deployment.storage | K8s specific. The storage specification for the K8s Pod. | N | 10Gi |
| instance.disk | CloudFormation specific. The amount of storage on the AWS instance. | N | 10Gi |
| instance.count | CloudFormation specific. The number of AWS instances. | N | 5 |
| instance.size | CloudFormation specific. The size of the AWS instance. | N | medium |
| instance.family | CloudFormation specific. The family of the AWS instance. | N | r5 |
| instance.service | CloudFormation specific. This should always be summary-peer. | N | summary-peer |

Sizing Stream Summary-Peers

This should be done carefully, as memory and CPU utilisation will likely be higher with higher levels of aggregation. A good initial benchmark is to take the CPU and memory utilisation used on the stream-peers and add 10-20% to both. For example, if you have 8GB of RAM specified for your raw table ingestion, 10GB is a reasonable starting point for the summary-peers.

Merge (Compaction)

An important aspect of Summary tables that should not be overlooked is the use of Merge. Merge is the Hydrolix compaction service that takes written partitions and evaluates how they can be combined for better compression, faster retrieval and more efficient storage.

Merge can make use of a sql_transform in the same way as the other ingest methods. This provides the ability to recombine and compact items such as intermediary states, sums, counts and so on.
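
For the plain sum and count columns created earlier, a merge sql_transform can simply re-aggregate them so that multiple rows covering the same period collapse into one. A minimal sketch, assuming the column names from the earlier download time example:

-- Sketch only: re-aggregates the dnl_sec_sum / dnl_sec_count summary columns during merge.
select timestamp, sum(dnl_sec_sum)::Nullable(UInt64) as dnl_sec_sum, sum(dnl_sec_count)::Nullable(UInt64) as dnl_sec_count from {STREAM} GROUP BY timestamp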

For intermediary states, the example below uses the MergeState suffix, which merges the intermediary states and stores the result as a new intermediary state - here for the average and 50th percentile states created earlier.

SELECT
    toStartOfFiveMinute(timestamp) as timestamp,
...
     avgMergeState(dnld_time_sec_avg_state::AggregateFunction(avg, Nullable(UInt64)))::Nullable(String) as dnld_time_sec_avg_state,
     quantileMergeState(0.50)(dnld_time_sec_p50_state::AggregateFunction(quantile(0.50), Nullable(UInt64)))::Nullable(String) as dnld_time_sec_p50_state,

FROM
    {STREAM}
GROUP BY
    timestamp,
....

Lastly, it is important to have sufficient merge-peer capacity. To see how your table is being merged, information can be found within the Catalog Metadata. For scaling the merge components, more information can be found on the Merge page and on the Scaling your Cluster page.

Gotchas

In working with Summary tables there are some standard (and in some cases temporary) "gotchas" we have found that, once understood, are easily navigated.

transform_sql and sql gotchas

| Gotcha | Description |
| --- | --- |
| Datetime64 | The toStartOf functions (toStartOfMinute, toStartOfHour, etc.) can sometimes be a little tricky with datatypes when run in your transform_sql. Make sure the datatype your toStartOf function produces matches the transform; by default it is datetime rather than datetime64. If your table has datetime64 as the type then you will need to use something like toDateTime64(toStartOfHour(timestamp)) in your query. |
| Column aliases | You may see occasions where you appear to have a "missing field" error. This can occur when you have not aliased your function in the transform_sql. For example count(cname) should be specified in the sql as count(cname) as cname. This is because, without the alias, the query engine names the column count(cname). |
| Nullable errors | One of the most common errors seen is a nullable error. This can often be solved by casting the field to a nullable type using ::Nullable(Type). |
| Multiple aggregation periods in a single table | At this time it is not possible to have two different aggregation periods in a single table. E.g. you can not have toStartOfHour() and toStartOfDay() rows stored in the same table. If this is done, unexpected results are likely. |

Service Deployment

| Gotcha | Description |
| --- | --- |
| Pool names must be lower case | For the time being (we are working on it) pool names need to be lower case. The error will show up in the Operator in K8s. |
