Batch Ingest

Batch ingestion loads data from a storage bucket into a target table. We provide the following mechanisms:

  • Batch Job API. A one-off task that will load one or more files based on the job configuration and then stop.
  • Batch auto-ingest. A continuous task that ingests new files as they arrive in a storage bucket, using a combination of table settings, the cloud provider's pub/sub service, and cloud storage notifications.

Batch ingest supports CSV and JSON data formats. Hydrolix requires read permissions to access external storage buckets.

The majority of the work is done by the batch-peer and batch-indexer containers, which run in a pod called batch-peer. By default, batch ingest processes one file at a time, with the replicas tunable set to 1; increase replicas to increase parallelism. The containers use a predefined resource profile, but you can override memory or storage settings if needed. For more details, see Scale your Kubernetes Cluster.
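
For example, here is a minimal sketch of raising batch-peer parallelism by patching the HydrolixCluster resource. The resource name (hdx), the namespace, and the spec.scale field layout are assumptions for illustration; consult Scale your Kubernetes Cluster for the exact keys used by your deployment.

# Hypothetical sketch: raise batch-peer replicas from the default of 1 to 3.
# The resource name "hdx", namespace, and spec.scale layout are assumptions.
kubectl patch hydrolixcluster hdx -n hydrolix --type merge \
  -p '{"spec": {"scale": {"batch-peer": {"replicas": 3}}}}'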

As of Hydrolix v4.14, summary tables that draw from the tables you load data into are also populated during batch ingest.

You can configure cloud storage to notify Hydrolix when new data is available for ingest. For more information, see the batch auto-ingest documentation for your cloud provider.

Create a Batch Ingest Job via the API

👍

Prerequisite Steps

Before creating a batch job, make sure the target table, a compatible transform, and (if the bucket is private) a bucket credential already exist.

Once you've completed the prerequisites, Create a Batch Job. You must specify the following:

  • the table where Hydrolix will store the data
  • the transform Hydrolix should use to process the data
  • the source where Hydrolix should fetch the data, including the bucket credential ID if needed
  • the regex filter Hydrolix should use to limit ingestion to a subset of data (optional)

Note that the batch job format was updated in Hydrolix version 4.19.1. Hydrolix v4.19.1 and later will not accept the old format.

AWS Example

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on aws",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "bucket_name": "mydatatoingest",  
                "bucket_path": "/mypath",  
                "region": "us-east-1",  
                "cloud": "aws",  
                "credential_id": "11111111-1111-1111-1111-111111111111"
            },
            "table": "sample.data",
            "type": "batch",
            "transform": "mytransform"
        },
        "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
    }
}

Note that you can omit the credential ID above if the bucket you're using doesn't require credentials.
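
If you're driving this from the command line, submitting the AWS job body above might look like the following sketch. The hostname, the exact endpoint path, and the bearer-token authentication header are assumptions for illustration; adapt them to your Hydrolix instance and API reference.

# Hypothetical sketch: submit the job body above (saved locally as job.json).
# Hostname, endpoint path, and the auth header are illustrative assumptions.
curl -s -X POST "https://my-hydrolix.example.com/v1/orgs/${ORG_ID}/jobs/batch/" \
  -H "Authorization: Bearer ${HDX_TOKEN}" \
  -H "Content-Type: application/json" \
  -d @job.json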

GCP Example

🚧

GCP/GKE Note

Make sure to add the bucket permissions to your service account. For example:

gsutil iam ch serviceAccount:${GCP_STORAGE_SA}:roles/storage.objectAdmin gs://my-bucket

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on gcp",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "bucket_name": "burninbucket",  
                "bucket_path": "/gcp-prod-test",  
                "region": "us-east1",  
                "cloud": "gcp",  
                "credential_id": "11111111-1111-1111-1111-111111111111"
            },
            "table": "sample.data",
            "type": "batch",
            "transform": "mytransform"
        },
        "regex_filter": "^gs://burninbucket/gcp-prod-test/.*.gz"
    }
}

Linode Example

🚧

Limitations

The Kubernetes cluster must run under the same Linode account as the storage bucket. Linode Object Storage does not support auto-ingest.

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on Linode",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "bucket_name": "mydatatoingest",  
                "bucket_path": "/mypath",  
                "region": "us-southeast-1",  
                "cloud": "linode",  
                "credential_id": "11111111-1111-1111-1111-111111111111"
            },
            "table": "sample.data",
            "type": "batch",
            "transform": "mytransform"
        },
        "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
    }
}

Job Attributes

A job describes how to treat the data set as a whole as it is being ingested. Hydrolix batch jobs can load data from a variety of file and path structures: a single file, a directory of files, or a directory of files narrowed by a regex filter.

For example:

  • A single file, e.g. "s3://mybucket/another/file.gz"
  • All files under a single path, e.g. "s3://mybucket/another/"
  • All files matching a regex pattern, e.g. "s3://mybucket/" along with "settings.regex_filter": "^s3://mybucket/.*/.*.gz"

The top-level elements of a job body are:

  • name: A unique name for this job in this organization.
  • description: An optional description.
  • type: Only accepts the value batch_import.
  • settings: The settings to use for this particular ingestion job.

The settings object

Some data sets consist of many small files; others consist of fewer, larger files. Hydrolix ultimately writes data into "partitions", and the number and size of those partitions influence the performance of your queries.

The best settings depend on the data set, but consider the following:

  1. Partitions are a single unit to be processed. This means that queries of large partitions cannot be parallelized as much as smaller partitions.
  2. Smaller partitions mean more parallelization, but also mean less efficient use of resources.

Example Settings:

{
    ...
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 10000000,
        "max_minutes_per_partition": 14400,
        "input_concurrency": 1,
        "input_aggregation": 1536000000,
        "max_files": 0,
        "dry_run": false,
        "source": {
            ...
        }
    }
}

The following are the default settings. We suggest starting with the defaults and then tuning.

  • max_minutes_per_partition: The maximum number of minutes of data to hold in a partition. For dense data sets, five minutes of data may be massive; in other data sets, two weeks of data may be required for the same volume. The velocity of your data will influence this value. Example: 15
  • max_active_partitions: The maximum number of active partitions. Example: 576
  • max_rows_per_partition: Based on the width of your data, you can control the total data size of a partition with max_rows_per_partition. Example: 64000
  • max_files: The number of files to dispatch to peers. Limiting is typically only used for testing; in general this should not be set, so that the entire bucket is processed. Example: 0 (disabled)
  • input_concurrency: Restricts the number of batch peer processes that run on a single instance. This should be kept at 1; if you wish to change it, please contact Hydrolix. Example: 1
  • input_aggregation: Controls how much data should be considered a single unit of work, which ultimately drives the size of the partition. Files larger than input_aggregation are processed as a single unit of work. Example: 1536000000
  • dry_run: Whether or not the job is a dry run. If true, all indexing work is done but no results are uploaded; the resulting HDX partitions are effectively thrown away.
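
As a rough, hypothetical illustration of how these settings interact: with the default input_aggregation of 1536000000 bytes (~1.5 GB), a prefix containing 1,500 files of roughly 1 MB each would be grouped into approximately one unit of work, while a single 5 GB file would be processed as a unit of work on its own because it exceeds input_aggregation. The rows within each unit of work are then cut into partitions, bounded by max_rows_per_partition and max_minutes_per_partition.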

A note on Ingest Parallelization

Batch ingest is performed on compute instances. Batch performance can be improved by:

  1. Increasing your batch-peer replica count
  2. Vertically scaling your batch-peer instances
  3. Increasing max_active_partitions. This setting limits the number of partitions a batch peer can process simultaneously.

Regex Filter

If data is stored in a complex bucket structure on cloud storage and cannot be expressed with a simple S3 path, regex_filter allows you to express the structure pattern to search.

Given the following example S3 source path:
s3://mybucket/level1/2020/01/app1/pattern_xyz.gz

Possible regex_filter patterns could be:

  • ^.*\\.gz$
  • ^s3://mybucket/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_\\w{3}.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_.*.gz

regex_filter filters the files to ingest using a regex match. Note that a literal backslash '\' must be escaped within the regex string. On AWS, for example, the pattern starts from s3://.
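
For instance, the second pattern from the list above would appear in a job body like this (surrounding fields elided):

{
    ...
    "regex_filter": "^s3://mybucket/level1/2020/\\d{2}/app1/.*.gz"
    ...
}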

The source element

The source element specifies information about the data itself, where it is, and how it should be treated.

Example source:

{
    ...
    "source": {
        "table": "sample_project.sample_table",
        "type": "batch",
        "transform": "mytransform",
        "settings": {
            "bucket_name": "string",
            "bucket_path": "/",
            "region": "string",
            "endpoint": "string",
            "cloud": "string",
            "credential_id": "11111111-1111-1111-1111-111111111111"
        }
    }
    ...
}

The source elements are:

  • table (required): The Hydrolix table where the data should be stored. The format is <project_name>.<table_name>. Example: myproject.mytable
  • type (required): Only accepts the value batch. Example: batch
  • transform (required): The name of a registered transform that is compatible with the data to be ingested. Example: mytransform
  • settings.bucket_name (required): The name of the cloud storage bucket that contains the source files. Example: mybucket
  • settings.bucket_path (optional): The path inside the bucket that contains the source files. Example: /mypath
  • settings.region (optional): The cloud provider's region where the bucket is stored. Example: us-east-1
  • settings.endpoint (optional): The cloud provider's specific endpoint URL. This is useful if you're providing your own storage, such as MinIO. Example: https://storage.us-east1.rep.googleapis.com
  • settings.cloud (required): The name of the cloud provider. Example: aws, gcp, linode, or azure
  • settings.credential_id (optional): The UUID of the credentials needed to access the storage bucket. Example: 3fa85f64-5717-4562-b3fc-2c963f66afa6

Cancel Jobs

Use the cancel endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/cancel, to cancel a batch ingest job and the tasks associated with the job ID. The cancellation is reflected in the status output.
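
A minimal sketch of calling the cancel endpoint follows. The hostname and bearer-token authentication are assumptions, and the sketch assumes the endpoint takes a POST; confirm the verb in your API reference.

# Hypothetical sketch: cancel a running batch job by its job ID.
# Hostname, auth header, and the POST verb are illustrative assumptions.
curl -s -X POST \
  "https://my-hydrolix.example.com/v1/orgs/${ORG_ID}/jobs/batch/${JOB_ID}/cancel" \
  -H "Authorization: Bearer ${HDX_TOKEN}"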

Jobs Status

Use the status endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/status, to get the status of a job and its tasks. This endpoint is suitable for polling for job completion.
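
A simple polling loop could look like the following sketch. The hostname, the bearer-token auth header, and the use of jq are illustrative assumptions.

# Hypothetical sketch: poll the status endpoint until the job leaves READY/RUNNING.
# Hostname, auth header, and jq usage are illustrative assumptions.
while true; do
  status=$(curl -s "https://my-hydrolix.example.com/v1/orgs/${ORG_ID}/jobs/batch/${JOB_ID}/status" \
    -H "Authorization: Bearer ${HDX_TOKEN}" | jq -r '.status')
  echo "job status: ${status}"
  [ "${status}" = "READY" ] || [ "${status}" = "RUNNING" ] || break
  sleep 10
done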

Job Response codes

  • 201: Created
  • 404: Job not found
  • 405: Request was not a GET
  • 500: Internal error – check your Hydrolix instance’s logs or contact your support representative

Response body on success

{
  "status": "RUNNING",
  "status_detail": {
    "tasks": {
      "INDEX": {
        "READY": 5
      },
      "LIST": {
        "DONE": 1
      }
    },
    "percent_complete": 0.16666667,
    "estimated": false
  }
}

The keys in the response body are:

  • .status: Status of the job. One of READY, RUNNING, DONE, or CANCELED. Always present.
  • .status_detail: In-depth task status information, present if tasks exist. Optional.
  • .status_detail.tasks: Aggregations of task types and states. Always present.
  • .status_detail.percent_complete: Job progress percentage as a float. Always present.
  • .status_detail.estimated: Whether or not the progress is estimated. Once all listing tasks are complete, the progress percentage is no longer estimated. Always present.