Intake Spill

A feature to improve ingest reliability by buffering data in an external object store.

Overview

This feature is available as of Hydrolix version 4.20.1 and is disabled by default. See the Configuration section to enable it.

The intake-head service can automatically offload raw and catalog data to a cluster's default external object storage if there is a delay in transferring incoming data buckets to the indexer. When the delay is resolved, the offloaded data will be fetched back into the processing pipeline so no data is lost.

This feature helps ensure that ingest continues to operate during slow response times, unusually heavy load, and temporary catalog outages.

Enabling the intake spill feature can be helpful if your ingest pipeline periodically emits 429 responses resulting in data loss or experiences backpressure from any of the following events:

  • Ingest spikes
  • High-latency partition creation
  • Slow catalog inserts

Spilling of raw data and catalog data can be enabled independently. When both are enabled, ingested data may be pushed to object storage before indexing (raw data) and while populating the database catalog (catalog data). At the same time, a fetching layer activates and periodically polls for data that has been pushed to object storage.

If data is waiting to be processed, it is fetched back into the processing pipeline once the indexer is able to create partitions again or once the catalog is accepting insertions. After spilled data has been fetched by an intake-head pod for processing, it is cleaned up from object storage.

When data is spilled, it is written to the default external object store configured for the cluster. Once written, the spilled data is shared among intake-head pods. This means that an intake-head pod that isn't experiencing delays can fetch and process raw data originally received by a different intake-head pod.

When retrieving spilled data from object storage, newer data is prioritized: buckets are fetched Last-In, First-Out (LIFO).

Resolve 429 Errors

If your ingest pipeline generates 429 Too Many Requests errors, first identify the source of the errors. There are typically two reasons why this error occurs:

  • Latency in parsing and applying transforms to the ingested data
  • Latency when creating partitions with the ingested data

Use the hdx_sink_bucket_maint_duration_ns Prometheus metric to identify which of these is the root cause.

The hdx_sink_bucket_maint_duration_ns metric is not high

For example, if the 99th percentile of this metric is only a few seconds while the intake-head is returning 429 errors, the bottleneck is likely in parsing or applying transforms to the incoming data.

This can mean there are too many requests for the intake-head to process. Rather than using the intake spill feature, scale up the intake-head service so it can accept more requests, as sketched below.
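
The following is a minimal sketch of raising the intake-head replica count in hydrolixcluster.yaml. The scale block and key names shown here are assumptions for illustration; verify the exact keys against your cluster spec before applying them.

scale:
  intake-head:
    replicas: 5  # assumed key for scaling out the intake-head service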

The hdx_sink_bucket_maint_duration_ns metric is high

If the hdx_sink_bucket_maint_duration_ns metric is higher than several seconds, it indicates the partition creation process is the bottleneck. This is typically caused by slowness in one of the downstream services. For example, latency in object storage can slow down the partition creation and ingestion process and generate 429 errors.

Enable the intake spill feature for this type of error.
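
As a minimal sketch, enabling spill for both raw and catalog data in hydrolixcluster.yaml looks like the following; fields not listed here are assumed to keep their defaults (see the Configuration section for all fields and default values):

intake_head_raw_data_spill_config:
  enabled: true
intake_head_catalog_spill_config:
  enabled: true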

Architecture diagram

[Architecture diagram of the intake spill feature]

Spilled data file paths

The following describes the file paths where raw and catalog data are buffered. This can help you locate spilled data, but it is not essential for using the feature.

When raw data is spilled to object storage, it is saved within the path:

<bucket>/spill/raw

Each raw data bucket is organized within these folders using the following structured path:

<partition folder>/<reverse timestamp millis>/<project_id>/<table_id>/<shard key hash>/<unique id>.tar.gz

Unprocessed catalog data is spilled to the path:

<bucket>/spill/catalog

Catalog buckets are organized using the same path structure, but are saved in .json format:

<partition folder>/<reverse timestamp millis>/<project_id>/<table_id>/<shard key hash>/<unique id>.json

The <partition folder> structure reduces contention on retrieval: a fetcher scans only one partition folder at a time, selecting folders in a randomized order.

Configuration

The intake spill feature is disabled by default. You can enable it in your hydrolixcluster.yaml config file with the intake_head_raw_data_spill_config and intake_head_catalog_spill_config tunables. These tunables are dictionaries containing the following identical fields and default values:

intake_head_raw_data_spill_config:  
  enabled: false
  max_concurrent_fetch: 1
  fetch_lock_expire_duration: 10m #minutes
  max_concurrent_spill: 20
  max_attempts_spill: 5
  num_partitions: 10 #this should never be reduced
  empty_fetch_pause_duration: 30s #seconds

intake_head_catalog_spill_config:  
  enabled: false
  max_concurrent_fetch: 1
  fetch_lock_expire_duration: 10m #minutes
  max_concurrent_spill: 20
  max_attempts_spill: 5
  num_partitions: 10 #this should never be reduced
  empty_fetch_pause_duration: 30s #seconds

The fields control the following behavior:

intake_head_raw_data_spill_config:
  enabled: boolean. When true, ingested raw data is spilled to object storage when partition generation is slowed for a particular intake-head pod.
  max_concurrent_fetch: uint. Maximum number of spilled raw data buckets that can be fetched from object storage for local processing.
  fetch_lock_expire_duration: duration. TTL for locks placed on spilled raw data buckets claimed by an intake-head.
  max_concurrent_spill: uint. Maximum number of raw spill actions that can execute concurrently.
  max_attempts_spill: uint. Maximum number of sequential upload attempts before failing.
  num_partitions: uint. Number of partitioned folders to use for spilled data to help reduce contention. Do not decrease.
  empty_fetch_pause_duration: duration. Wait time for the raw data fetcher when it obtains an empty result.

intake_head_catalog_spill_config:
  enabled: boolean. When true, catalog adds are spilled to object storage when catalog interactions are slowed or fail for a particular intake-head pod.
  max_concurrent_fetch: uint. Maximum number of spilled catalog data buckets that can be fetched from object storage for local processing.
  fetch_lock_expire_duration: duration. TTL for locks placed on spilled catalog data buckets claimed by an intake-head.
  max_concurrent_spill: uint. Maximum number of catalog spill actions that can execute concurrently.
  max_attempts_spill: uint. Maximum number of sequential upload attempts before failing.
  num_partitions: uint. Number of partitioned folders to use for spilled data to help reduce contention. Do not decrease.
  empty_fetch_pause_duration: duration. Wait time for the catalog data fetcher when it obtains an empty result.

Fields of type duration accept the units documented at https://pkg.go.dev/maze.io/x/duration#pkg-types: 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h', 'd', 'w', and 'y'.

These options apply to each individual intake-head pod.
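
For instance, the following sketch enables raw data spill and overrides two of the fields above; the values are illustrative rather than recommendations, and unlisted fields are assumed to keep their defaults:

intake_head_raw_data_spill_config:
  enabled: true
  max_concurrent_fetch: 2           # fetch up to two spilled raw buckets at a time
  empty_fetch_pause_duration: 15s   # pause 15 seconds after an empty fetch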

🚧

Do not decrease the value of num_partitions

Decreasing the value of num_partitions could result in data loss. See Risks and Limitations for more details.

Metrics

The following metrics have been added to support this feature:

hdx_sink_spill_raw_duration_ns # A histogram of the duration in nanoseconds taken to spill raw data buckets
hdx_sink_spill_raw_failure_count # Count of spill failures for raw data resulting in lost data
hdx_sink_spill_raw_accepted_count # Count of spilled raw data entries accepted into the indexing pipeline for processing
hdx_sink_spill_catalog_duration_ns # A histogram of the duration in nanoseconds taken to spill catalog adds
hdx_sink_spill_catalog_failure_count # Count of spill failures for catalog adds resulting in lost data
hdx_sink_spill_catalog_accepted_count # Count of spilled catalog entries accepted into the indexing pipeline for processing
hdx_sink_spill_catalog_race_lost_count # Count of lost races attempting to add spilled data entry to catalog

Risks and limitations

  1. Temporary gaps or missing data

    Gaps can occur while data has been spilled but not yet fetched and processed. Fetching and processing of spilled data prioritizes the most recent data first.
    To determine whether raw or catalog data buckets remain unprocessed in object storage, take the difference between the counter for the number of buckets spilled and the counter for the number of buckets fetched. A duration histogram metric is captured for each spill occurrence. For raw and catalog data these are:

  • hdx_sink_spill_raw_duration_ns  

  • hdx_sink_spill_catalog_duration_ns

    A histogram metric generates a corresponding counter metric in addition to the duration it captures. To access the counter, append _count to the original histogram metric name. The counts of raw and catalog data spilled can therefore be obtained from the metrics:

  • hdx_sink_spill_raw_duration_ns_count 

  • hdx_sink_spill_catalog_duration_ns_count 

    The difference between the counts of spilled and fetched data is therefore:

  • Raw: hdx_sink_spill_raw_duration_ns_count - hdx_sink_spill_raw_accepted_count

  • Catalog: hdx_sink_spill_catalog_duration_ns_count - hdx_sink_spill_catalog_accepted_count

🚧

Count of fetched data may be higher than count of spilled data

The "accepted" count (number of fetched buckets) can be higher than the count of buckets spilled because there may be a race fetching a given spilled item, which can result in it being fetched by more than one intake-head pod. However, this race will not result in duplicated data. Because of this, a positive result from this calculation indicates at least one bucket of spilled data has not been fetched; however, a zero or negative result does not guarantee that all spilled data has been fetched and processed.

Because spilled data is cleaned up after being fetched to continue processing, you can manually check for existing records in object storage within the appropriate file paths to determine if there is spilled data that has yet to backfill gaps in your queryable data.

  2. Reducing the value for num_partitions can cause data loss

    Decreasing the value of num_partitions can result in data loss because spilled data is written to and read from a numeric partitioned folder structure. For example, if the value of this field starts at 10, spilled data is written to folders partitioned numerically from 0 to 9. If num_partitions is subsequently reduced to 9, spilled data is written to and read from partitions 0 through 8. Unprocessed data may remain in partition folder 9 unless and until num_partitions is returned to a value of 10 or higher.

  3. Increased CPU usage on intake-head pods during ingest spikes

    Tarballing and gzipping large amounts of data may notably increase CPU usage.

  4. Fetching does not respect worker pool isolation

    If you have set up resource pools to keep ingest workloads isolated, enabling this feature means that raw or catalog data can be ingested by an intake-head in one pool, spilled to the shared object storage, then picked up and processed to completion by an intake-head in another worker pool.

  5. No multi-bucket targeting

    Data can only be spilled to the default object store for the Hydrolix cluster. It is not currently possible to configure a secondary storage for this feature.

  6. Wasted resources due to race conditions on fetch

    It is possible that multiple intake-head pods will try to claim the same spilled data for processing. For catalog data, to ensure that data is written exactly once, an intake-head will write to the locks table in the catalog database in Postgres using the unique ID for the spilled data bucket it is attempting to process as part of the primary key. Upon success, it will then write the generated partition to the catalog. Any subsequent intake-head will encounter this lock and be unable to process the same spilled data bucket, ensuring that only one writer can win any potential race. The metric to measure how often this race condition occurs for catalog data is hdx_sink_spill_catalog_race_lost_count. A corresponding metric does not exist for raw data.