Batch Ingest

A batch ingestion job enables you to ingest one or more files from your cloud storage. Hydrolix accepts CSV or JSON formats and a variety of compression codecs. A regex filter can also be applied to select only the files you need.

In addition, there is an automatic notification mechanism that can be used in combination with AWS notifications or GCP Pub/Sub notifications. More information about this mechanism can be found below.

Creating a Batch Ingest Job via API

👍

Prerequisite Steps

Batch ingest requires an existing project, table, and transform; create these first if you have not already.

Once the above is complete, the next step is to create a batch job, providing the table, transform, url, and an optional regex_filter.

A batch job runs on the batch-peer service (default scale 1). If you need to ingest faster, consider increasing the number of peers; see Scaling your Kubernetes Cluster or Scaling your AWS Cluster.
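If your cluster runs on Kubernetes, the batch-peer count is typically raised in the scale block of your hydrolixcluster.yaml. The snippet below is a sketch only; the field names assume the scale/replicas convention used in the Hydrolix Kubernetes configuration, so check the scaling guides linked above for your deployment's exact syntax.

spec:
  ...
  scale:
    batch-peer:
      replicas: 3   # assumption: raise from the default of 1 to run more batch peers
  ...

Apply the updated spec with kubectl apply -f hydrolixcluster.yaml, or via your usual rollout process.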

Example Job AWS

🚧

AWS Note:

Make sure the S3 bucket has been added to --bucket-allowlist using the HDXCTL Tool by your Hydrolix administrator.

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on aws",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "url": "s3://mydatatoingest/mypath/"
            },
            "table": "sample.data",
            "type": "batch",
            "subtype": "aws s3",
            "transform": "mytransform"
        },
        "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
    }
}
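To submit a job definition like the one above, POST it to the batch jobs API. The sketch below is illustrative only: it assumes the creation endpoint sits alongside the cancel and status endpoints described later (/v1/orgs/{org_id}/jobs/batch/), that the API uses bearer-token authorization, and that HDX_HOST, ORG_ID, and HDX_TOKEN are placeholders you supply. Check your API reference for the exact path and authentication.

# job_sample_data.json holds the JSON job definition shown above (hypothetical filename).
curl -X POST "https://$HDX_HOST/v1/orgs/$ORG_ID/jobs/batch/" \
     -H "Authorization: Bearer $HDX_TOKEN" \
     -H "Content-Type: application/json" \
     -d @job_sample_data.json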

Example Job GCP

🚧

GCP/GKE Note:

Make sure to add the bucket permissions to your service account. For example:

gsutil iam ch serviceAccount:${GCP_STORAGE_SA}:roles/storage.objectAdmin gs://mybucket

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on gcp",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "url": "gs://mydatatoingest/mypath/"
            },
            "table": "sample.data",
            "type": "batch",
            "subtype": "gcp gs",
            "transform": "mytransform"
        },
        "regex_filter": "^gs://burninbucket/gcp-prod-test/.*.gz"
    }
}

Job Attributes

A job describes how to treat the data set as a whole as it is being ingested. Hydrolix batch jobs can use varying file and path structures to load data: a single file, a directory of files, or a directory of files with a filter.

For example:

  • A single file, e.g. "s3://mybucket/another/file.gz"
  • All files under a path, e.g. "s3://mybucket/another/"
  • All files matching a regex pattern, e.g. "s3://mybucket/" combined with "settings.regex_filter": "^s3://mybucket/.*/.*.gz"

Each job definition has the following top-level elements:

  • name: A unique name for this job in this organization.
  • description: An optional description.
  • type: Only accepts the value batch_import.
  • settings: The settings to use for this particular ingestion job.

The settings object

Some data sets consist of many small files; others consist of fewer, larger files. Hydrolix ultimately writes data into "partitions", and the number and size of those partitions influences query performance.

What is best for each data set depends on the data; however, consider:

  1. A partition is processed as a single unit. This means queries over large partitions cannot be parallelized as much as queries over smaller partitions.
  2. Smaller partitions allow more parallelization, but also mean less efficient use of resources.

Example Settings:

{
    ...
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 10000000,
        "max_minutes_per_partition": 14400,
        "input_concurrency": 1,
        "input_aggregation": 1536000000,
        "max_files": 0,
        "dry_run": false,
        "source": {
            ...
        }
    }
}

The following are the default settings. We suggest starting with the defaults and tuning from there.

  • max_minutes_per_partition: The maximum number of minutes of data to hold in a partition. For dense data sets, five minutes of data may be massive; in other data sets, two weeks of data may be required for the same volume. The velocity of your data will influence this value. Example: 15
  • max_active_partitions: The maximum number of active partitions. Example: 576
  • max_rows_per_partition: Based on the width of your data, you can control the total data size of the partition with max_rows_per_partition. Example: 33554432
  • max_files: The number of files to dispatch to peers. Limiting is typically only used for testing; in general this should not be set, so that the entire bucket is processed. Example: 0 (disabled)
  • input_concurrency: Restricts the number of batch peer processes run on a single instance. Example: 1 (this should be kept at 1; if you wish to change it, please contact Hydrolix)
  • input_aggregation: Controls how much data should be considered a single unit of work, which ultimately drives the size of the partition. Files larger than input_aggregation are processed as a single unit of work. Example: 1536000000
  • dry_run: Whether or not the job is a dry run. If true, all indexing work is done but no results are uploaded; the resulting HDX partitions are effectively thrown away. Example: false

A note on Ingest Parallelization

Batch ingest is performed on compute instances. Batch performance can be improved by:

  1. Adding more batch instances
  2. Adding larger batch instances with more parallelism

Each scenario has the potential to be different. The type and number of instances can be adjusted via the Hydrolix configuration. max_active_partitions tells Hydrolix how many partitions it should work on in parallel at one time.

max_active_partitions is the total number of partitions that should be processing on a single batch peer at a time; it is a balance between speed and memory.

Regex Filter

If data is stored in a complex bucket structure on AWS S3 and cannot be expressed with a simple S3 path, regex_filter allows you to express the structure pattern to search. It is used in conjunction with settings.url, which narrows down the scope.

Given the following example S3 source path
s3://mybucket/level1/2020/01/app1/pattern_xyz.gz
with the setting "url": "s3://mybucket/", possible regex_filter patterns include:

  • ^.*\\.gz$
  • ^s3://mybucket/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_\\w{3}.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_.*.gz

regex_filter filters the files to ingest using a regex match. Note that backslashes ('\') must be escaped within the regex string, and the pattern is matched starting from s3://.
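For example, the second pattern above sits in the job settings alongside source, with each backslash escaped inside the JSON string:

{
    ...
    "settings": {
        ...
        "regex_filter": "^s3://mybucket/level1/2020/\\d{2}/app1/.*.gz"
    }
}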

The source element

The source element specifies information about the data itself, where it is, and how it should be treated.

Example source:

{
    ...
    "source": {
        "table": "sample.trips",
        "type": "batch",
        "subtype": "aws s3",
        "transform": "mytransform",
        "settings": {
            "url": "s3://mydatatoingest"
        }
    }
    ...
}

The source element contains the following fields:

  • table: The table where the data should go, in the format <project_name>.<table_name>. Example: "table": "myproject.mytable"
  • type: Only accepts the value batch. Example: "type": "batch"
  • subtype: Accepts either aws s3 or gcp gs. Example: "subtype": "gcp gs"
  • transform: The name of an existing transform to use for this job. Example: "transform": "mytransform"
  • settings.url: The path of the files to be ingested. The given location is scanned and every file under the path is ingested. Example: "settings": { "url": "gs://mydatatoingest/path/" }

Cancel Jobs

Use the cancel endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/cancel, to cancel a batch ingest job and the tasks associated with the job ID. The cancellation will be reflected in the status output.
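A minimal sketch of calling the cancel endpoint with curl. The POST method, host, and bearer-token authorization are assumptions to verify against your API reference; HDX_HOST, ORG_ID, JOB_ID, and HDX_TOKEN are placeholders.

# POST is assumed here; confirm the expected method in the API reference.
curl -X POST "https://$HDX_HOST/v1/orgs/$ORG_ID/jobs/batch/$JOB_ID/cancel" \
     -H "Authorization: Bearer $HDX_TOKEN"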

Jobs Status

Use the status endpoint, /v1/orgs/{org_id}/jobs/batch/{job_id}/status, to get the status of a job and its tasks. This endpoint is suitable for polling for job completion.
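A minimal sketch of requesting the status with curl, assuming bearer-token authorization and using HDX_HOST, ORG_ID, JOB_ID, and HDX_TOKEN as placeholders:

curl -s "https://$HDX_HOST/v1/orgs/$ORG_ID/jobs/batch/$JOB_ID/status" \
     -H "Authorization: Bearer $HDX_TOKEN"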

Job Response codes

  • 200: Success
  • 404: Job not found
  • 405: Request was not a GET
  • 500: Internal error

Response body on success

{
  "status": "RUNNING",
  "status_detail": {
    "tasks": {
      "INDEX": {
        "READY": 5
      },
      "LIST": {
        "DONE": 1
      }
    },
    "percent_complete": 0.16666667,
    "estimated": false
  }
}

The response body contains the following keys:

  • .status: Status of the job. One of READY, RUNNING, DONE, or CANCELED. Optional: No
  • .status_detail: In-depth task status information, present if tasks exist. Optional: Yes
  • .status_detail.tasks: Aggregations of task types and states. Optional: No
  • .status_detail.percent_complete: Job progress percentage as a float. Optional: No
  • .status_detail.estimated: Whether or not the progress is estimated. Once all listing tasks are complete, the progress percentage is no longer estimated. Optional: No
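Because .status moves from READY or RUNNING to DONE or CANCELED, a simple shell loop can poll the endpoint until the job finishes. This is a sketch that assumes curl and jq are installed and reuses the placeholder variables from the example above:

# Poll every 10 seconds until the job leaves the READY/RUNNING states.
while true; do
  STATUS=$(curl -s "https://$HDX_HOST/v1/orgs/$ORG_ID/jobs/batch/$JOB_ID/status" \
                -H "Authorization: Bearer $HDX_TOKEN" | jq -r '.status')
  echo "job status: $STATUS"
  case "$STATUS" in
    READY|RUNNING) sleep 10 ;;   # still in progress, poll again
    *) break ;;                  # DONE or CANCELED
  esac
done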

AWS Data to GKE - Cross cloud

If you have data in AWS storage and your cluster is in Google GKE, it is possible to load the data from AWS. To do this, you will need to set up a user and role within AWS that has access to the bucket you want to retrieve the data from.

You will then need to add the AWS access key ID and secret access key to your Kubernetes deployment.

This can be done either by using the hkt command to create your hydrolixcluster.yaml and applying it as follows:

./hkt hydrolix-cluster --env AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --env AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY > hydrolixcluster.yaml

or it can be done directly in the hydrolixcluster.yaml:

spec:
  admin_email: .....
  ....
  env:
    AWS_ACCESS_KEY_ID: AWS_ACCESS_KEY_ID_HERE
    AWS_SECRET_ACCESS_KEY: AWS_SECRET_ACCESS_KEY_HERE
  host: ..........
  ip_allowlist:
  - source: ................

To run the job via the batch jobs API, you will need to specify a url with an s3:// path and a subtype of aws s3:

{
    "type": "batch_import",
    "name": "job_sample_data",
    "description": "sample data on aws",
    "settings": {
        "max_active_partitions": 576,
        "max_rows_per_partition": 33554432,
        "max_minutes_per_partition": 20,
        "source": {
            "settings": {
                "url": "s3://mydatatoingest/mypath/"
            },
            "table": "sample.data",
            "type": "batch",
            "subtype": "aws s3",
            "transform": "mytransform"
        },
        "regex_filter": "^s3://mydatatoingest/mypath/.*.gz"
    }
}
