Intake Spill
A feature to improve ingest reliability by buffering data in an external object store.
Overview
This feature is available as of Hydrolix version 4.20.1 and is disabled by default. See the Configuration section to enable it.
The intake-head service can automatically offload raw and catalog data to a cluster's default external object storage if there is a delay in transferring incoming data buckets to the indexer. When the delay is resolved, the offloaded data is fetched back into the processing pipeline so no data is lost.
This feature ensures that ingest operates continuously under slow response-time conditions, during unusually heavy load, and during temporary catalog outages.
Enabling the intake spill feature can be helpful if your ingest pipeline periodically emits 429 responses that result in data loss, or experiences backpressure from any of the following events:
- Ingest spikes
- High-latency partition creation
- Slow catalog inserts
Spilling raw data or catalog data can be enabled independently. When both are enabled, ingested data may be pushed to object storage prior to indexing (raw data) and when populating the database catalog (catalog data). At the same time, a fetching layer activates and periodically polls for data that has been pushed to object storage.
If data is waiting to be processed, it is fetched back into the processing pipeline once the indexer is able to create partitions again or once the catalog is accepting insertions. Unprocessed data that has been fetched by an intake-head pod for processing is subsequently cleaned up from object storage.
When data is spilled, it is written to the default external object store configured for the cluster. Once written, the spilled data is shared among intake-head pods. This means that an intake-head pod that isn't experiencing delays can fetch and process raw data originally received by a different intake-head pod.
When retrieving spilled data from object storage, newer data is prioritized: retrieval is Last-In, First-Out (LIFO).
Resolve 429 errors
If your ingest pipeline generates 429 Too Many Requests errors, first identify the source of the errors. There are typically two reasons why this error occurs:
- Latency in parsing and applying transforms to the ingested data
- Latency when creating partitions with the ingested data
The hdx_sink_bucket_maint_duration_ns metric is not high
Use the hdx_sink_bucket_maint_duration_ns Prometheus metric to identify the root cause of the error.
For example, if the metric is only showing several seconds at the 99th percentile and the intake-head is returning 429 errors, it may be due to a bottleneck while parsing or applying transforms to the incoming data.
This can mean there are too many requests for the intake-head to process. Rather than use the intake spill feature, scale up the intake-head to allow more requests.
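For example, the following is a minimal sketch of raising the intake-head replica count in hydrolixcluster.yaml. It assumes your version exposes per-service replica counts through the scale tunable, and the replica value is only a placeholder; consult the scaling documentation for your release before applying it.
spec:
  scale:
    intake-head:
      replicas: 5 # placeholder value; increase gradually while watching 429 rates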
The hdx_sink_bucket_maint_duration_ns metric is high
If the hdx_sink_bucket_maint_duration_ns metric is higher than several seconds, it indicates that the partition creation process is the bottleneck. This is typically caused by slowness in one of the downstream services. For example, latency in object storage can slow down the partition creation and ingestion process and generate 429 errors.
Enable the intake spill feature for this type of error.
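To catch this condition early, you can watch the 99th percentile of hdx_sink_bucket_maint_duration_ns. The following is a minimal Prometheus alerting rule sketch; it assumes the metric is exported as a Prometheus histogram (with a _bucket series), and the rule name, threshold, and durations are placeholders.
groups:
  - name: hydrolix-intake-partition-creation
    rules:
      - alert: IntakePartitionCreationSlow
        # p99 partition-creation (bucket maintenance) time over the last 5 minutes, in nanoseconds
        expr: histogram_quantile(0.99, sum by (le) (rate(hdx_sink_bucket_maint_duration_ns_bucket[5m]))) > 5e9
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: Partition creation p99 is above 5s; consider enabling intake spill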
Architecture diagram
Spilled data file paths
The following describes the file paths in which raw and catalog data are buffered. This can be helpful for locating spilled data, but it is not essential for using this feature.
When raw data is spilled to object storage, it is saved within the path:
<bucket>/spill/raw
Each raw data bucket is organized within these folders using the following structured path:
<partition folder>/<reverse timestamp millis>/<project_id>/<table_id>/<shard key hash>/<unique id>.tar.gz
Unprocessed catalog data is spilled to the path:
<bucket>/spill/catalog
Catalog buckets are organized using the same path structure, but in .json format:
<partition folder>/<reverse timestamp millis>/<project_id>/<table_id>/<shard key hash>/<unique id>.json
The <partition folder> structure reduces contention on retrieval, as a fetcher scans only one partition folder at a time, in randomized order.
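For instance, with num_partitions left at its default of 10 (see the Configuration section), spilled raw data is distributed across ten numeric partition folders, 0 through 9, so a spilled object might sit at a path like the following (the bucket name is hypothetical; the remaining placeholders follow the template above):
my-hydrolix-bucket/spill/raw/7/<reverse timestamp millis>/<project_id>/<table_id>/<shard key hash>/<unique id>.tar.gz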
Configuration
The intake spill feature is disabled by default. You can enable it in your hydrolixcluster.yaml config file with the intake_head_raw_data_spill_config and intake_head_catalog_spill_config tunables. These tunables are dictionaries containing the following identical fields and default values:
intake_head_raw_data_spill_config:
  enabled: false
  max_concurrent_fetch: 1
  fetch_lock_expire_duration: 10m # minutes
  max_concurrent_spill: 20
  max_attempts_spill: 5
  num_partitions: 10 # this should never be reduced
  empty_fetch_pause_duration: 30s # seconds
intake_head_catalog_spill_config:
  enabled: false
  max_concurrent_fetch: 1
  fetch_lock_expire_duration: 10m # minutes
  max_concurrent_spill: 20
  max_attempts_spill: 5
  num_partitions: 10 # this should never be reduced
  empty_fetch_pause_duration: 30s # seconds
The fields have the following meanings:
intake_head_raw_data_spill_config:
- enabled: when true, ingested data is spilled to object storage when partition generation is slowed for a particular intake-head pod
- max_concurrent_fetch: the maximum number of raw data buckets that can be fetched for local processing from object storage
- fetch_lock_expire_duration: TTL for locks made on spilled raw data buckets claimed by an intake-head
- max_concurrent_spill: the maximum number of raw spill actions that can execute concurrently
- max_attempts_spill: the maximum number of sequential upload attempts before failing
- num_partitions: the number of partitioned folders to use for spilled data, to help reduce contention. Do not decrement.
- empty_fetch_pause_duration: wait time for the raw data fetcher when it obtains a null result
intake_head_catalog_spill_config:
- enabled: when true, catalog adds are spilled to object storage when catalog interactions are slowed or fail for a particular intake-head pod
- max_concurrent_fetch: the maximum number of spilled catalog data buckets that can be fetched for local processing from object storage
- fetch_lock_expire_duration: TTL for locks made on spilled catalog data buckets claimed by an intake-head
- max_concurrent_spill: the maximum number of catalog spill actions that can execute concurrently
- max_attempts_spill: the maximum number of sequential upload attempts before failing
- num_partitions: the number of partitioned folders to use for spilled data, to help reduce contention. Do not decrement.
- empty_fetch_pause_duration: wait time for the catalog data fetcher when it obtains a null result
The field types are as follows:
intake_head_raw_data_spill_config:
  enabled: boolean
  max_concurrent_fetch: uint
  fetch_lock_expire_duration: duration # valid unit options are listed at https://pkg.go.dev/maze.io/x/duration#pkg-types ("Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h', 'd', 'w', 'y'.")
  max_concurrent_spill: uint
  max_attempts_spill: uint
  num_partitions: uint
  empty_fetch_pause_duration: duration # see duration note above
intake_head_catalog_spill_config:
  enabled: boolean
  max_concurrent_fetch: uint
  fetch_lock_expire_duration: duration # see duration note above
  max_concurrent_spill: uint
  max_attempts_spill: uint
  num_partitions: uint
  empty_fetch_pause_duration: duration # see duration note above
These options apply to each individual intake-head pod.
Do not decrease the value of num_partitions
Decreasing the value of num_partitions could result in data loss. See Risks and limitations for more details.
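As a minimal sketch, enabling both raw and catalog spill in hydrolixcluster.yaml could look like the following. This assumes the tunables sit under spec alongside other cluster tunables and that omitted fields keep the defaults shown above; if you are unsure how partial dictionaries are merged in your version, specify the full dictionaries instead.
spec:
  intake_head_raw_data_spill_config:
    enabled: true # spill raw data when partition creation is delayed
  intake_head_catalog_spill_config:
    enabled: true # spill catalog adds when catalog inserts are slow or failing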
Metrics
The following metrics have been added to support this feature:
hdx_sink_spill_raw_duration_ns # A histogram of the duration in nanoseconds taken to spill raw data buckets
hdx_sink_spill_raw_failure_count # Count of spill failures for raw data resulting in lost data
hdx_sink_spill_raw_accepted_count # Count of spilled raw data entries accepted into the indexing pipeline for processing
hdx_sink_spill_catalog_duration_ns # A histogram of the duration in nanoseconds taken to spill catalog adds
hdx_sink_spill_catalog_failure_count # Count of spill failures for catalog adds resulting in lost data
hdx_sink_spill_catalog_accepted_count # Count of spilled catalog entries accepted into the indexing pipeline for processing
hdx_sink_spill_catalog_race_lost_count # Count of lost races attempting to add spilled data entry to catalog
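Because spill failures indicate lost data, it may be worth alerting on the two failure counters. The following is a minimal Prometheus rule sketch, assuming these metrics are standard counters; the rule name and evaluation window are placeholders.
groups:
  - name: hydrolix-intake-spill-failures
    rules:
      - alert: IntakeSpillFailures
        # any raw or catalog spill failure over the last 10 minutes implies lost data
        expr: increase(hdx_sink_spill_raw_failure_count[10m]) > 0 or increase(hdx_sink_spill_catalog_failure_count[10m]) > 0
        labels:
          severity: critical
        annotations:
          summary: Intake spill failures detected, meaning spilled data has been lost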
Risks and limitations
- Temporarily missing data or gaps in data
  This can occur when data has been spilled and not yet fetched and processed. Subsequent fetching and processing of spilled data is prioritized by most recent data first.
  To determine whether raw or catalog data buckets remain unprocessed in object storage, take the difference between the counter metric for the number of buckets spilled and the counter metric for the number of buckets fetched. A duration histogram metric is captured for each spill occurrence. For raw and catalog data, they are:
  - hdx_sink_spill_raw_duration_ns
  - hdx_sink_spill_catalog_duration_ns
  A histogram metric generates a corresponding counter metric in addition to the durations it captures. To access the counter, append _count to the original histogram metric name. The counts of raw and catalog data spilled can therefore be obtained from the metrics:
  - hdx_sink_spill_raw_duration_ns_count
  - hdx_sink_spill_catalog_duration_ns_count
  The difference between the counts of spilled and fetched data is then the following (see the recording-rule sketch after this list):
  - Raw: hdx_sink_spill_raw_duration_ns_count - hdx_sink_spill_raw_accepted_count
  - Catalog: hdx_sink_spill_catalog_duration_ns_count - hdx_sink_spill_catalog_accepted_count
  Count of fetched data may be higher than count of spilled data
  The "accepted" count (the number of fetched buckets) can be higher than the count of buckets spilled because there may be a race fetching a given spilled item, which can result in it being fetched by more than one intake-head pod. However, this race will not result in duplicated data. Because of this scenario, a positive result from the calculation indicates that at least one bucket of spilled data has not been fetched; however, a zero or negative result does not guarantee that all spilled data has been fetched and processed.
  Because spilled data is cleaned up after being fetched to continue processing, you can manually check for existing records in object storage within the appropriate file paths to determine whether there is spilled data that has yet to backfill gaps in your queryable data.
- Reducing the value of num_partitions can cause data loss
  Decreasing the value of num_partitions could result in data loss because spilled data is written to and read from a numeric partitioned folder structure. For example, if the value of this field starts at 10, spilled data is written to folders partitioned numerically from 0 to 9. If num_partitions were subsequently reduced to 9, spilled data would be written to and read from partitions 0 through 8. Unprocessed data could remain in partition folder 9 unless and until num_partitions is returned to 10 or higher.
- Increased CPU usage on intake-head pods during ingest spikes
  Tarballing and gzipping large amounts of data may notably increase CPU usage.
- Fetching does not respect worker pool isolation
  If you have set up resource pools to keep ingest workloads isolated, enabling this feature means that raw or catalog data can be ingested by an intake-head from one pool, spilled to the shared object storage, then picked up and have its processing completed by an intake-head from another worker pool.
- No multi-bucket targeting
  Data can only be spilled to the default object store for the Hydrolix cluster. It is not currently possible to configure a secondary storage for this feature.
- Wasted resources due to race conditions on fetch
  It is possible that multiple intake-head pods will try to claim the same spilled data for processing. For catalog data, to ensure that data is written exactly once, an intake-head writes to the locks table in the catalog database in Postgres, using the unique ID of the spilled data bucket it is attempting to process as part of the primary key. Upon success, it then writes the generated partition to the catalog. Any subsequent intake-head will encounter this lock and be unable to process the same spilled data bucket, ensuring that only one writer can win any potential race. The metric that measures how often this race condition occurs for catalog data is hdx_sink_spill_catalog_race_lost_count. A corresponding metric does not exist for raw data.
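For convenience, the two differences described under the first risk above can be captured as Prometheus recording rules. The following is a minimal sketch with placeholder rule names, summed across intake-head pods; remember that the result can be zero or negative even while unprocessed spilled data remains.
groups:
  - name: hydrolix-intake-spill-backlog
    rules:
      # spilled-but-not-yet-accepted raw data buckets; may go negative due to fetch races
      - record: hydrolix:intake_spill_raw_backlog
        expr: sum(hdx_sink_spill_raw_duration_ns_count) - sum(hdx_sink_spill_raw_accepted_count)
      # spilled-but-not-yet-accepted catalog buckets; may go negative due to fetch races
      - record: hydrolix:intake_spill_catalog_backlog
        expr: sum(hdx_sink_spill_catalog_duration_ns_count) - sum(hdx_sink_spill_catalog_accepted_count)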