Batch Ingest
Batch ingestion loads data from a storage bucket into a target table. We provide the following mechanisms:
- Batch Job API: A one-off task that loads files based on the job configuration and stops.
- Batch autoingest: A continuous task that uses a combination of table settings, cloud provider pub/sub, and cloud storage notifications to ingest files as they arrive into an origin storage bucket.
Batch ingest supports CSV and JSON data formats. Hydrolix requires read permissions to access external storage buckets.
The majority of the work is done by the batch-peer and batch-indexer containers in a pod called batch-peer. By default, batch ingest processes one file at a time, with the replicas tunable set to a value of 1. Increase the value of the replicas tunable to increase parallelism. The containers use a predefined resource profile, but you can override memory or storage settings if needed. For more details, see Scale your Kubernetes Cluster.
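For example, on Kubernetes you might raise the batch-peer replica count by patching the HydrolixCluster resource. The following is a minimal sketch; the resource name (hdx), the namespace variable, and the scale field layout are assumptions, so confirm the exact schema against Scale your Kubernetes Cluster before applying it:
# Sketch: bump batch-peer replicas to 3 on the "hdx" HydrolixCluster resource.
# The namespace, resource name, and scale schema are assumptions; verify them
# against your own cluster configuration.
kubectl -n "$HDX_KUBERNETES_NAMESPACE" patch hydrolixcluster hdx --type merge \
  -p '{"spec": {"scale": {"batch-peer": {"replicas": 3}}}}'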
As of Hydrolix v4.14, summary tables that draw from the tables you load into are also populated during batch ingest.
You can configure cloud storage to notify Hydrolix when new data is available for ingest; see Batch autoingest below for details.
Batch Job
A batch job is a task that runs once to load files from an S3-compatible object store into the Hydrolix cluster's object store.
Create a batch ingest job with the API
Prerequisite Steps
- Create a Project providing a name and description (a sample request follows this list)
- Create a Table providing a name and ingest settings
- If the source storage bucket requires authentication, create a Credential providing read access to the cloud storage bucket containing the source files.
- Create a Transform (Write Schema)
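For example, the first prerequisite, creating a project, is a single API call. This is a hedged sketch using the same config API base as the table examples later on this page; the project name and description are placeholders, and your cluster may also require an authorization header:
curl --request POST \
  --url https://{hdx-hostname}/config/v1/orgs/{org_id}/projects/ \
  --header 'accept: application/json' \
  --header 'content-type: application/json' \
  --data '{"name": "sample", "description": "Sample project for batch ingest"}'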
Once you've completed the prerequisites, Create a Batch Job. You must specify the following:
- the table where Hydrolix will store the data
- the transform Hydrolix should use to process the data
- the source where Hydrolix should fetch the data, including the bucket credential ID if needed
- the regex filter Hydrolix should use to limit ingestion to a subset of data (optional)
Note that the batch job format was updated in Hydrolix v4.19.1. Hydrolix v4.19.1 and later will not accept the old format.
AWS example
{
  "type": "batch_import",
  "name": "job_sample_data",
  "description": "sample data on aws",
  "settings": {
    "max_active_partitions": 576,
    "max_rows_per_partition": 33554432,
    "max_minutes_per_partition": 20,
    "source": {
      "settings": {
        "bucket_name": "mydatatoingest",
        "bucket_path": "/mypath",
        "region": "us-east-1",
        "cloud": "aws",
        "credential_id": "11111111-1111-1111-1111-111111111111"
      },
      "table": "sample.data",
      "type": "batch",
      "transform": "mytransform"
    },
    "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
  }
}
Note that you can omit the credential ID above if the bucket doesn't require credentials.
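To submit a job definition like the one above, POST it to the batch jobs endpoint. The sketch below assumes the job JSON is saved to a local file named job_sample_data.json and that the jobs endpoint sits under the same config API base used by the table examples later on this page; adjust the path for your cluster if needed:
# Sketch: submit the job definition stored in job_sample_data.json.
curl --request POST \
  --url https://{hdx-hostname}/config/v1/orgs/{org_id}/jobs/batch/ \
  --header 'accept: application/json' \
  --header 'content-type: application/json' \
  --data @job_sample_data.json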
GCP example
GCP/GKE Note:
Make sure to add the bucket permissions to your service account. For example:
gsutil iam ch serviceAccount:${GCP_STORAGE_SA}:roles/storage.objectAdmin gs://my-bucket
{
  "type": "batch_import",
  "name": "job_sample_data",
  "description": "sample data on gcp",
  "settings": {
    "max_active_partitions": 576,
    "max_rows_per_partition": 33554432,
    "max_minutes_per_partition": 20,
    "source": {
      "settings": {
        "bucket_name": "burninbucket",
        "bucket_path": "/gcp-prod-test",
        "region": "us-east1",
        "cloud": "gcp",
        "credential_id": "11111111-1111-1111-1111-111111111111"
      },
      "table": "sample.data",
      "type": "batch",
      "transform": "mytransform"
    },
    "regex_filter": "^gs://burninbucket/gcp-prod-test/.*.gz"
  }
}
Linode example
Limitations
The Kubernetes cluster must use the same account as the Linode bucket. Linode storage does not support autoingest.
{
  "type": "batch_import",
  "name": "job_sample_data",
  "description": "sample data on Linode",
  "settings": {
    "max_active_partitions": 576,
    "max_rows_per_partition": 33554432,
    "max_minutes_per_partition": 20,
    "source": {
      "settings": {
        "bucket_name": "mydatatoingest",
        "bucket_path": "/mypath",
        "region": "us-southeast-1",
        "cloud": "linode",
        "credential_id": "11111111-1111-1111-1111-111111111111"
      },
      "table": "sample.data",
      "type": "batch",
      "transform": "mytransform"
    },
    "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
  }
}
Batch autoingest
A batch autoingest job runs a recurring task which ingests data from an S3-compatible object store into the configured Hydrolix cluster when notified.
Register batch autoingest sources with the API
When creating or updating a table, you can optionally register an object store and notification queue. Hydrolix consumes data from the object store in batches into the table when a message is received in the notification queue.
To register a notification queue, set it as the source when configuring autoingest on a table. Valid notification queues include AWS Simple Queue Service (SQS), Google Cloud Pub/Sub, and Azure Service Bus.
To use only a subset of files from the origin object store, configure the pattern setting. See Ingest File Paths for further explanation and examples for this setting.
The origin bucket and the notification queue must be within the same region.
Example: Single autoingest source and bucket
The following cURL request registers a table with a configured notification source in AWS Simple Queue Service. The pattern setting indicates the originating S3 object store and the path to the files that should be ingested, using a regex pattern to match a subset of files. The source setting determines the queue in which notifications are received. Notifications from the queue prompt the creation of batch jobs.
curl --request POST \
  --url https://{hdx-hostname}/config/v1/orgs/{org_id}/projects/{project_id}/tables/ \
  --header 'accept: application/json' \
  --header 'content-type: application/json' \
  --data '
{
  "name": "autoingest_table",
  "settings": {
    "autoingest": [
      {
        "enabled": true,
        "source": "sqs://queue-to-test-batch-autoingest",
        "source_region": "us-west-2",
        "pattern": "^s3://autoingest_bucket/path/to/logs/.*.gz",
        "max_rows_per_partition": 12000000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50
      }
    ]
  }
}
'
One region for the origin bucket and the source queue
Note that there is a single region specified for two entities: the SQS queue and the origin S3 bucket. Both entities must exist within the same region.
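For the AWS setup above, the origin bucket must also publish object-created events to the SQS queue. The following AWS CLI call is a minimal sketch; the queue ARN and account ID are placeholders, and the queue's access policy must already allow S3 to send messages to it:
# Sketch: have the origin bucket send ObjectCreated events to the autoingest queue.
# The queue ARN and account ID below are placeholders.
aws s3api put-bucket-notification-configuration \
  --bucket autoingest_bucket \
  --notification-configuration '{
    "QueueConfigurations": [
      {
        "QueueArn": "arn:aws:sqs:us-west-2:123456789012:queue-to-test-batch-autoingest",
        "Events": ["s3:ObjectCreated:*"]
      }
    ]
  }'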
Example: Multiple autoingest sources and buckets ingested to the same table and transform
You can also register multiple autoingest sources and object stores for a table using the same transform. A GCP configuration sources its notifications from a Pub/Sub topic, Azure sources notifications from a Service Bus, and AWS sources notifications from a Simple Queue Service. Each autoingest task continuously ingests data in batches into the table using the same shared_transform.
The following cURL request creates a table and registers three sources for notifications and object stores for ingestion: one each for GCP, Azure, and AWS.
curl --request POST \
  --url https://{hdx-hostname}/config/v1/orgs/{org_id}/projects/{project_id}/tables/ \
  --header 'accept: application/json' \
  --header 'content-type: application/json' \
  --data '
{
  "name": "{table_name}",
  "settings": {
    ...
    "autoingest": [
      {
        "enabled": true,
        "source": "pubsub://{project_id}/{subscription_id}",
        "source_region": "{region}",
        "pattern": "^gs://{source_bucket}/path/to/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "gcp",
        "transform": "shared_transform",
        "source_credential_id": "{gcp_source_topic_credential}",
        "bucket_credential_id": "{gcp_destination_bucket_credential}"
      },
      {
        "enabled": true,
        "source": "azservicebus://{servicebus_host}/{path}",
        "source_region": "{region}",
        "pattern": "^https://{source_bucket}/path/to/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "azure",
        "transform": "shared_transform",
        "source_credential_id": "{azure_source_topic_credential}",
        "bucket_credential_id": "{azure_destination_bucket_credential}"
      },
      {
        "enabled": true,
        "source": "sqs://{notification_queue}",
        "source_region": "{region}",
        "pattern": "s3://{source_bucket}/path/to/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "aws",
        "transform": "shared_transform",
        "source_credential_id": "{aws_source_topic_credential}",
        "bucket_credential_id": "{aws_destination_bucket_credential}"
      }
    ]
  }
}
'
Job attributes
A job describes how to treat the data set as a whole as it's being ingested.
The top level attributes of a batch job are:
Element | Purpose |
---|---|
name | A unique name for this job in this organization. |
description | An optional description. |
type | Only accepts the value batch_import. |
settings | The settings to use for this particular ingestion job. |
Ingest file paths
Hydrolix batch jobs can use varying file and path structures to determine which data to load. When creating a batch job through the API, configure the file and path structures by combining the parameters bucket_name, bucket_path, and regex_filter.
When creating a batch autoingest job, configure the file and path structures using the pattern parameter. Specify a single file, a directory of files, or a directory of files with a regex filter.
For example:
- A single file, e.g. "s3://mybucket/another/file.gz"
- All files in a single bucket path, e.g. "s3://mybucket/another/"
- All files matching a regex pattern, e.g. "^s3://mybucket/.*/.*.gz"
The settings object
Some data sets consist of many small files; other data sets consist of fewer, larger files. Hydrolix ultimately writes data into "partitions". The number and size of partitions influence the performance of your queries.
What is best for each data set is an "it depends" answer; however, consider the following:
- Partitions are a single unit to be processed. This means that queries of large partitions cannot be parallelized as much as smaller partitions.
- Smaller partitions mean more parallelization, but also mean less efficient use of resources.
Example Settings:
{ ...
  "settings": {
    "max_active_partitions": 576,
    "max_rows_per_partition": 10000000,
    "max_minutes_per_partition": 14400,
    "input_concurrency": 1,
    "input_aggregation": 1536000000,
    "max_files": 0,
    "dry_run": false,
    "source": {
      ...
    }
  }
}
The following are the default settings; we suggest starting with the defaults and then tuning.
Setting | Description | Example |
---|---|---|
max_minutes_per_partition | The maximum number of minutes of data to hold in a partition. For dense data sets, five minutes of data may be massive; in other data sets, two weeks of data may be required for the same volume. The velocity of your data will influence this value. | 15 |
max_active_partitions | Maximum number of active partitions. | 576 |
max_rows_per_partition | Based on the width of your data, you can control the total data size of the partition with max_rows_per_partition. | 64000 |
max_files | Number of files to dispatch to peers. Limiting is typically only used for testing. In general, this should not be set, so that the entire bucket is processed. | 0 (disabled) |
input_concurrency | Restricts the number of batch peer processes that run on a single instance. | 1 (This should be kept at 1. If you wish to change this, please contact Hydrolix.) |
input_aggregation | Controls how much data should be considered a single unit of work, which ultimately drives the size of the partition. Files larger than the input_aggregation value are processed as a single unit of work. | 1536000000 |
dry_run | Whether or not the job is a dry run. If true, all indexing work is done but no results are uploaded; resulting HDX partitions are effectively thrown away. | false |
A note on Ingest Parallelization
Batch ingest is performed on compute instances. Batch performance can be improved by:
- Increasing your batch-peer replica count
- Vertically scaling your batch-peer instances
- Increasing the value of max_active_partitions. This configuration sets a limit on the maximum number of partitions that a batch peer can process simultaneously.
Regex Filter
If data is stored in a complex bucket structure on cloud storage and cannot be expressed with a simple S3 path, regex_filter allows you to express the structure pattern to search.
Given the following example S3 source path:
s3://mybucket/level1/2020/01/app1/pattern_xyz.gz
Possible regex_filter patterns could be:
^.*\\.gz$
^s3://mybucket/level1/2020/\\d{2}/app1/.*.gz
^.*/level1/2020/\\d{2}/app1/.*.gz
^.*/level1/2020/\\d{2}/.*/pattern_\\w{3}.gz
^.*/level1/2020/\\d{2}/.*/pattern_.*.gz
Element | Description |
---|---|
regex_filter | Filters the files to ingest using a regex match. Note that a literal backslash '\' needs to be escaped within the regex string. For example, on AWS, the pattern starts with s3://. |
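To sanity-check a pattern against a sample object path before submitting a job, you can test it locally. The following is a quick sketch, assuming GNU grep with Perl-compatible regex support (-P); note that the doubled backslash in the JSON string corresponds to a single backslash in the regex itself:
# Prints the path if the regex matches, and nothing otherwise.
echo "s3://mybucket/level1/2020/01/app1/pattern_xyz.gz" \
  | grep -P '^.*/level1/2020/\d{2}/app1/.*.gz'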
The source element
The source element specifies information about the data itself, where it is, and how it should be treated.
Example source:
{
  ...
  "source": {
    "table": "sample_project.sample_table",
    "type": "batch",
    "transform": "mytransform",
    "settings": {
      "bucket_name": "string",
      "bucket_path": "/",
      "region": "string",
      "endpoint": "string",
      "cloud": "string",
      "credential_id": "11111111-1111-1111-1111-111111111111"
    }
  }
  ...
}
Element | Optional | Purpose | Example |
---|---|---|---|
table | No | The Hydrolix table where the data should be stored. The format is <project_name>.<table_name>. | myproject.mytable |
type | No | Only accepts the value batch. | batch |
transform | No | The name of a registered transform that is compatible with the data to be ingested. | mytransform |
settings.bucket_name | No | The name of the cloud storage bucket that contains the source files. | mybucket |
settings.bucket_path | Yes | The path inside the bucket that contains the source files. | /mypath |
settings.region | Yes | The cloud provider's region where the bucket is stored. | us-east-1 |
settings.endpoint | Yes | The cloud provider's specific endpoint URL. This is useful if you're providing your own storage, such as Minio. | https://storage.us-east1.rep.googleapis.com |
settings.cloud | No | The name of the cloud provider. | aws, gcp, linode, or azure |
settings.credential_id | Yes | The UUID of the credentials needed to access the storage bucket. | 3fa85f64-5717-4562-b3fc-2c963f66afa6 |
Cancel Jobs
Use the cancel jobs endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/cancel, to cancel the batch ingest job and the tasks associated with the job ID. The cancellation will be reflected in the status output.
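A sketch of the cancellation request follows. The HTTP method (POST here) and the /config/v1 prefix are assumptions based on the other examples on this page; consult your cluster's API reference if the request is rejected:
# Sketch: cancel a running batch job by ID (method and path prefix are assumptions).
curl --request POST \
  --url https://{hdx-hostname}/config/v1/orgs/{org_id}/jobs/batch/{job_id}/cancel \
  --header 'accept: application/json'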
Jobs Status
Get the status of a job and its tasks from /v1/orgs/{org_id}/jobs/batch/{job_id}/status. This endpoint is suitable for polling for job completion.
Job Response codes
Code | Description |
---|---|
201 | Created |
404 | Job not found |
405 | Request was not a GET |
500 | Internal error – check your Hydrolix instance’s logs or contact your support representative |
Response body on success
{
  "status": "RUNNING",
  "status_detail": {
    "tasks": {
      "INDEX": {
        "READY": 5
      },
      "LIST": {
        "DONE": 1
      }
    },
    "percent_complete": 0.16666667,
    "estimated": false
  }
}
Key | Description | Optional |
---|---|---|
.status | Status of the job. One of READY, RUNNING, DONE, or CANCELED. | No |
.status_detail | In-depth task status information, if tasks exist. | Yes |
.status_detail.tasks | Aggregations of task types and states. | No |
.status_detail.percent_complete | Job progress percentage as a float. | No |
.status_detail.estimated | Whether or not the progress is estimated. Once all listing tasks are complete progress percentage is no longer estimated. | No |
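Because the status endpoint is suitable for polling, you can wait for a terminal state in a small shell loop. The following is a hedged sketch using curl and jq; the /config/v1 prefix is an assumption based on the other examples on this page, and your cluster may also require an authorization header:
# Poll the job status every 10 seconds until the job finishes or is canceled.
while true; do
  STATUS=$(curl --silent \
    --url "https://{hdx-hostname}/config/v1/orgs/{org_id}/jobs/batch/{job_id}/status" \
    --header 'accept: application/json' | jq -r '.status')
  echo "Job status: $STATUS"
  if [ "$STATUS" = "DONE" ] || [ "$STATUS" = "CANCELED" ]; then
    break
  fi
  sleep 10
done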