Batch Ingest

Batch ingestion loads data from a storage bucket into a target table using one of the following mechanisms:

  • Batch job API: A one-off task that loads files based on the job configuration and stops.
  • Batch autoingest: A continuous task that combines table settings, cloud provider pub/sub, and cloud storage notifications to ingest files as they arrive in an origin storage bucket.

Batch ingest supports CSV and JSON data formats. Hydrolix requires read permissions to access external storage buckets.

The batch-peer and batch-indexer containers do the majority of the work in a pod called batch-peer. By default, batch ingest processes one file at a time, with the replicas tunable set to 1. Increase the replicas tunable to increase parallelism. The containers use a predefined resource profile, but you can override memory or storage settings if needed. For more details, see Scale your Kubernetes Cluster.
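
To check how many batch-peer pods are currently running, you can use kubectl. This is a minimal sketch; the namespace name is an assumption that may differ in your deployment, and the durable way to change the replica count is the replicas tunable described in Scale your Kubernetes Cluster, not editing Kubernetes objects directly.

kubectl get pods --namespace hydrolix | grep batch-peer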

As of Hydrolix v4.14, tables populated by batch ingest will also populate any summary tables for which they're a data source.

You can configure cloud storage to notify Hydrolix when new data is available for ingest. For more information, see Batch autoingest later on this page.

Batch job

A batch job is a task that runs once to load files from an S3-compatible object store into the Hydrolix cluster's object store.

Create a batch ingest job with the API

👍

Prerequisite Steps

Once you've completed the prerequisites, Create a Batch Job. You must specify the following:

  • the table where Hydrolix will store the data
  • the transform Hydrolix should use to process the data
  • the source where Hydrolix should fetch the data, including the bucket credential ID if needed
  • the regex filter Hydrolix should use to limit ingestion to a subset of data (optional)

Note that Hydrolix version 4.19.1 updated the batch job format. Hydrolix v4.19.1 and later won't accept the old format.

AWS example

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on aws",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "bucket_name": "mydatatoingest",  
                "bucket_path": "/mypath",  
                "region": "us-east-1",  
                "cloud": "aws",  
                "credential_id": "11111111-1111-1111-1111-111111111111"
            },
            "table": "sample.data",
            "type": "batch",
            "transform": "mytransform"
        },
        "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
    }
}

Note that you can omit the credential ID above if the bucket doesn't require credentials.
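
To submit a job body like the one above, POST it to the batch jobs API. The following is a sketch only: the endpoint path mirrors the cancel and status endpoints described later on this page and assumes the same /config prefix used by the table endpoints below, and the hostname, organization ID, and bearer token are placeholders for your own values.

curl --request POST \
     --url https://{hdx-hostname}/config/v1/orgs/{org_id}/jobs/batch/ \
     --header 'accept: application/json' \
     --header 'authorization: Bearer {access_token}' \
     --header 'content-type: application/json' \
     --data @batch_job.json

Here batch_job.json is a local file containing the job body shown above.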

GCP example

🚧

GCP/GKE Note

Make sure to add the bucket permissions to your service account. For example:

gsutil iam ch serviceAccount:${GCP_STORAGE_SA}:roles/storage.objectAdmin gs://my-bucket

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on gcp",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "bucket_name": "burninbucket",  
                "bucket_path": "/gcp-prod-test",  
                "region": "us-east1",  
                "cloud": "gcp",  
                "credential_id": "11111111-1111-1111-1111-111111111111"
            },
            "table": "sample.data",
            "type": "batch",
            "transform": "mytransform"
        },
        "regex_filter": "^gs://burninbucket/gcp-prod-test/.*.gz"
    }
}

Linode example

🚧

Limitations

The Kubernetes cluster must use the same account as the Linode bucket. Linode storage doesn't support autoingest.

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on Linode",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "bucket_name": "mydatatoingest",  
                "bucket_path": "/mypath",  
                "region": "us-southeast-1",  
                "cloud": "linode",  
                "credential_id": "11111111-1111-1111-1111-111111111111"
            },
            "table": "sample.data",
            "type": "batch",
            "transform": "mytransform"
        },
        "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
    }
}

Batch autoingest

A batch autoingest job is a recurring task that ingests data from an S3-compatible object store into the configured Hydrolix cluster whenever it receives a notification.

Register batch autoingest sources with the API

When creating or updating a table, you can optionally register an object store and notification queue. When the notification queue receives a message, Hydrolix consumes data from the object store into the table in batches.

To register a notification queue, set it as the source when configuring autoingest on a table. Valid notification queues include AWS Simple Queue Service (SQS), Google Cloud Pub/Sub, and Azure Service Bus.

To use only a subset of files from the origin object store, configure the pattern setting. See Ingest File Paths for further explanation and examples for this setting.

The origin bucket and the notification queue must be within the same region.

Example: Configure single autoingest source in Azure

In the following example, an existing Azure Blob Storage container periodically receives new data, and a user wants Hydrolix to consume from it as new data arrives. To do so, the user should do the following:

  1. Configure the Azure blob storage account to send events to an Azure Service Bus queue as new data arrives in the blob storage.
  2. Configure a Hydrolix table with autoingest which will ingest data from the blob storage after the autoingest service reads events from the queue.

Configure Azure

  1. Create a storage account in Azure.
  2. Create an Azure Service Bus queue according to the directions described here.
  3. Go to the Azure storage account and select “Events” in the sidebar.
  4. Choose the “More Options” tab and then select the “Service Bus Queue” option.
  5. Configure the blob storage to send events to the Service Bus queue from step 2 using these instructions (a CLI sketch follows this list). You can do so through the UI, the Azure CLI, or Azure PowerShell.
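
As a reference for step 5, the following Azure CLI commands sketch an Event Grid subscription that routes Blob Created events from the storage account to the Service Bus queue. The resource names, resource group, and subscription name are placeholders for illustration only; the linked Azure instructions remain the authoritative reference.

STORAGE_ID=$(az storage account show --name mystorageaccount --resource-group my-rg --query id --output tsv)
QUEUE_ID=$(az servicebus queue show --namespace-name autoingest-test --resource-group my-rg --name test-queue --query id --output tsv)

az eventgrid event-subscription create \
  --name hydrolix-autoingest \
  --source-resource-id "$STORAGE_ID" \
  --endpoint-type servicebusqueue \
  --endpoint "$QUEUE_ID" \
  --included-event-types Microsoft.Storage.BlobCreated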

Register a Hydrolix table with an autoingest source

The following cURL request registers a table with a configured notification source in an Azure Service Bus queue. The pattern setting indicates the originating Azure blob storage and the path to the files to ingest, using a regex pattern to match a subset of files. The source setting specifies the queue that receives notifications. Notifications from the queue prompt the creation of batch jobs.

curl --request POST \
     --url https://{hdx-hostname}/config/v1/orgs/{org_id}/projects/{project_id}/tables/ \
     --header 'accept: application/json' \
     --header 'content-type: application/json' \
     --data '
{
  "name": "autoingest_table",
  "settings": {
    "autoingest": [ {
        "enabled": true,
        "source": "azservicebus://autoingest-test.servicebus.windows.net/test-queue",
        "source_region": "westus2",
        "pattern": "^https://autoingesttestbucket.blob.core.windows.net/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "azure",
        "transform": "shared_transform",
        "source_credential_id": "cred1",
        "bucket_credential_id": "cred2"
    } ]
  }
}
'

📘

One region for the origin bucket and the source queue

Note that there is a single region specified for two entities: the Service Bus queue and the origin blob storage. Both entities must exist within the same region.

Example: Register multiple autoingest sources and buckets for the same table and transform

You can also register multiple autoingest sources and object stores for a table using the same transform. In the following configuration, GCP sources its notifications from a Pub/Sub topic, Azure from a Service Bus queue, and AWS from a Simple Queue Service (SQS) queue. Each autoingest task continuously ingests data in batches into the same table using the shared_transform transform.

The following cURL request creates a table and registers three sources for notifications and object stores for ingestion: one each for GCP, Azure, and AWS.

curl --request POST \
     --url https://{hdx-hostname}/config/v1/orgs/{org_id}/projects/{project_id}/tables/ \
     --header 'accept: application/json' \
     --header 'content-type: application/json' \
     --data '
{
  "name": "{table_name}",
  "settings": {
    ...
    "autoingest": [
      {
        "enabled": true,
        "source": "pubsub://{project_id}/{subscription_id}",
        "source_region": "{region}",
        "pattern": "^gs://{source_bucket}/path/to/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "gcp",
        "transform": "shared_transform",
        "source_credential_id": "{gcp_source_topic_credential}",
        "bucket_credential_id": "{gcp_destination_bucket_credential}"
      },
      {
        "enabled": true,
        "source": "azservicebus://{servicebus_host}/{path}",
        "source_region": "{region}",
        "pattern": "^https://{source_bucket}/path/to/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "azure",
        "transform": "shared_transform",
        "source_credential_id": "{azure_source_topic_credential}",
        "bucket_credential_id": "{azure_destination_bucket_credential}"
      },
      {
        "enabled": true,
        "source": "sqs://{notification_queue}",
        "source_region": "{region}",
        "pattern": "s3://{source_bucket}/path/to/data/.*.json",
        "max_rows_per_partition": 12288000,
        "max_minutes_per_partition": 60,
        "max_active_partitions": 50,
        "dry_run": false,
        "name": "aws",
        "transform": "shared_transform",
        "source_credential_id": "{aws_source_topic_credential}",
        "bucket_credential_id": "{aws_destination_bucket_credential}"
      }
    ]
  }
}
'

Job attributes

A job describes how to treat the data set as a whole as it's ingested.

The top level attributes of a batch job are:

| Element | Purpose |
|---|---|
| name | A unique name for this job in this organization. |
| description | An optional description. |
| type | Only accepts the value batch_import. |
| settings | The settings to use for this particular ingestion job. |

Ingest file paths

Hydrolix batch jobs can use varying file and path structures to determine which data to load. When creating a batch job through the API, configure the file and path structures by combining the parameters bucket_name, bucket_path, and regex_filter.

When creating a batch autoingest job, configure the file and path structures using the pattern parameter. Specify a single file, a directory of files, or a directory of files with a regex filter.

For example:

  • A single file, for example: "s3://mybucket/another/file.gz"
  • All files under a single path prefix, for example: "s3://mybucket/another/"
  • All files matching regex pattern "^s3://mybucket/.*/.*.gz"

The settings object

Some data sets consist of many small files; others consist of fewer, larger files. Hydrolix writes data into "partitions," and the number and size of those partitions influence the performance of your queries.

What's best for each data set depends on the data. However, consider the following:

  1. Each partition is a single unit of processing. This means that Hydrolix can't parallelize queries of large partitions as effectively as queries of smaller partitions.
  2. Smaller partitions mean more parallelization, but also less efficient use of resources.
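
As a rough illustration (the row width here is an assumption, not a measured value): if rows average about 200 bytes, a max_rows_per_partition of 33,554,432 corresponds to roughly 33,554,432 × 200 bytes ≈ 6.7 GB of raw input per partition, while a limit of 10,000,000 rows corresponds to about 2 GB. Use estimates like this, together with max_minutes_per_partition, to keep partitions in a size range that parallelizes well for your queries.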

Example Settings:

{
    ...
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 10000000,
        "max_minutes_per_partition": 14400,
        "input_concurrency": 1,
        "input_aggregation": 1536000000,
        "max_files": 0,
        "dry_run": false,
        "source": {
            ...
        }
    }
}

The following are the default settings. It's recommended to start with the defaults and then tune.

| Setting | Description | Example |
|---|---|---|
| max_minutes_per_partition | The maximum number of minutes of data to hold in a partition. For dense data sets, five minutes of data may be massive; in other data sets, two weeks of data may be needed to reach the same volume. The velocity of your data will influence this value. | 15 |
| max_active_partitions | Maximum number of active partitions. | 576 |
| max_rows_per_partition | Based on the width of your data, you can control the total data size of the partition with max_rows_per_partition. | 64000 |
| max_files | Number of files to dispatch to peers. Limit the use of this setting to test environments, as it may prevent Hydrolix from processing the entire bucket. | 0 (disabled) |
| input_concurrency | Restricts the number of batch peer processes to run on a single instance. | 1 (This should remain at 1. If you wish to change this, contact Hydrolix.) |
| input_aggregation | Controls how much data constitutes a single unit of work, which drives the size of the partition. Hydrolix processes files larger than input_aggregation as a single unit of work. | 1536000000 |
| dry_run | Whether the job is a dry run. If true, Hydrolix performs indexing but doesn't upload results; it throws away the resulting partitions. | false |

A note on ingest scaling

Compute instances run batch ingest jobs. Batch performance improves with:

  1. Increasing your batch-peer replica count
  2. Vertically scaling your batch-peer instances
  3. Increasing max_active_partitions. This setting limits the maximum number of partitions that a batch peer can process simultaneously.

Regex filter

If the data lives in a complex bucket structure on cloud storage or spans multiple paths, regex_filter lets you express a pattern for the paths to search.

Given the following example S3 source path:
s3://mybucket/level1/2020/01/app1/pattern_xyz.gz

Possible regex_filter patterns could be:

  • ^.*\\.gz$
  • ^s3://mybucket/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_\\w{3}.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_.*.gz

| Element | Description |
|---|---|
| regex_filter | Filters the files to ingest using a regex match. In the JSON job body, escape backslashes with an additional backslash (for example, \\d). On AWS, the pattern starts from s3://. |
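
To sanity-check a pattern against a sample object path before submitting the job, you can test it locally. This is a sketch only: it uses grep -E, which doesn't support the \d shorthand, so the digit class is written as [0-9]; in the JSON job body the backslashes must still be doubled as shown above.

echo "s3://mybucket/level1/2020/01/app1/pattern_xyz.gz" | \
  grep -E '^.*/level1/2020/[0-9]{2}/app1/.*\.gz'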

The source element

The source element specifies information about the data itself, where it exists, and how to structure it in Hydrolix storage.

Example source:

{
    ...
    "source": {
        "table": "sample_project.sample_table",
        "type": "batch",
        "transform": "mytransform",
        "settings": {
            "bucket_name": "string",
            "bucket_path": "/",
            "region": "string",
            "endpoint": "string",
            "cloud": "string",
            "credential_id": "11111111-1111-1111-1111-111111111111"
        }
    }
    ...
}

| Element | Optional | Purpose | Example |
|---|---|---|---|
| table | No | The Hydrolix table which stores the data. The format is <project_name>.<table_name>. | myproject.mytable |
| type | No | Only accepts the value batch. | batch |
| transform | No | The name of a registered transform that's compatible with the data to be ingested. | mytransform |
| settings.bucket_name | No | The name of the cloud storage bucket that contains the source files. | mybucket |
| settings.bucket_path | Yes | The path inside the bucket that contains the source files. | /mypath |
| settings.region | Yes | The cloud provider's region containing the bucket. | us-east-1 |
| settings.endpoint | Yes | The cloud provider's specific endpoint URL. This is useful if you're providing your own storage, such as MinIO. | https://storage.us-east1.rep.googleapis.com |
| settings.cloud | No | The name of the cloud provider. | aws, gcp, linode, or azure |
| settings.credential_id | Yes | The UUID of the credentials needed to access the storage bucket. | 3fa85f64-5717-4562-b3fc-2c963f66afa6 |

Cancel jobs

Use the cancel jobs endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/cancel, to cancel the batch ingest job and the tasks associated with the job ID. The job status output will show the cancellation.
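
The following request is a sketch of a cancellation call; the /config prefix, HTTP method, and bearer token are assumptions, so check the API reference for your Hydrolix version.

curl --request POST \
     --url https://{hdx-hostname}/config/v1/orgs/{org_id}/jobs/batch/{job_id}/cancel \
     --header 'accept: application/json' \
     --header 'authorization: Bearer {access_token}'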

Jobs status

Use the job status endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/status, to get the status of a job and its tasks. This endpoint is suitable for polling for job completion.

Job response codes

| Code | Description |
|---|---|
| 201 | Created |
| 404 | Job not found |
| 405 | Request wasn't a GET |
| 500 | Internal error. Check your Hydrolix instance's logs or contact your support representative. |

Response body on success

{
  "status": "RUNNING",
  "status_detail": {
    "tasks": {
      "INDEX": {
        "READY": 5
      },
      "LIST": {
        "DONE": 1
      }
    },
    "percent_complete": 0.16666667,
    "estimated": false
  }
}

| Key | Description | Optional |
|---|---|---|
| .status | Status of the job. One of READY, RUNNING, DONE, or CANCELED. | No |
| .status_detail | In-depth task status information, if tasks exist. | Yes |
| .status_detail.tasks | Aggregations of task types and states. | No |
| .status_detail.percent_complete | Job progress percentage as a float. | No |
| .status_detail.estimated | Indicates if the progress is an estimate. Once all listing tasks are complete, the progress percentage is no longer estimated. | No |
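
As an example of how this endpoint and response fit together, the following loop polls the job status until the job reports DONE. This is a sketch, not a supported tool: the /config prefix, bearer token, and the use of jq are assumptions, and production scripts should also handle the CANCELED state and HTTP errors.

while true; do
  status=$(curl --silent \
    --url "https://{hdx-hostname}/config/v1/orgs/{org_id}/jobs/batch/{job_id}/status" \
    --header 'accept: application/json' \
    --header 'authorization: Bearer {access_token}' | jq -r '.status')
  echo "job status: $status"
  if [ "$status" = "DONE" ]; then
    break
  fi
  sleep 10
done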