GCP Storage Notifications

This page describes the use case where a file is uploaded to a Google Cloud Storage bucket and a notification is sent to Hydrolix, which then loads the file automatically.

To create notifications on your bucket path, you can use the gsutil command. More information is available from Google in Pub/Sub notifications for Cloud Storage.

Create a Pub/Sub Topic and Subscription

gcloud pubsub topics create <TOPIC_NAME>

# For Example
gcloud pubsub topics create autoingest_gcs
Created topic [projects/prod-112233/topics/autoingest_gcs].


gcloud pubsub subscriptions create <SUBSCRIPTION_NAME> --topic=<TOPIC_NAME>

# For Example
gcloud pubsub subscriptions create autoingest_gcs_sub --topic=autoingest_gcs

The two commands above create a topic and a subscription with Google's default behaviours. More information on Topic Creation and Subscription Creation is available from Google.

Create Your Notification

Once the topic and subscription exist, the following command creates the Pub/Sub notification on your bucket.

gsutil notification create -f json -e OBJECT_FINALIZE -t <TOPIC ID> -p <path> gs://<bucket>

# For Example
gsutil notification create -f json -e OBJECT_FINALIZE -t projects/prod-112233/topics/autoingest_gcs -p mypath/ gs://mybucket

The flags in the command are as follows:

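-f json delivers the notification payload in JSON format.
-e OBJECT_FINALIZE limits notifications to object creation events, so only newly written objects trigger a notification.
-t <TOPIC ID> is the ID of the topic created above.
-p <path> is the object name prefix within the bucket; only objects whose names begin with this prefix trigger notifications.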
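
As a rough illustration of how the -p prefix narrows which uploads generate notifications, the sketch below mimics the prefix check in Python. The helper name would_notify and the object names are hypothetical. Cloud Storage performs this filtering itself, so this is only a way to reason about which object names qualify.

```python
def would_notify(object_name: str, prefix: str) -> bool:
    """Mimic the -p filter: notify only for object names that begin with the prefix."""
    return object_name.startswith(prefix)


prefix = "mypath/"  # same value as the -p flag in the example command
candidates = [
    "mypath/2020/01/log_a.gz",  # under the prefix
    "otherpath/log_b.gz",       # different prefix
    "mypath_backup/log_c.gz",   # shares leading characters but not the trailing slash
]

for name in candidates:
    print(f"{name}: {would_notify(name, prefix)}")
```

Including the trailing slash in the prefix keeps sibling paths such as mypath_backup/ from producing notifications.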

Add cluster bucket and Pub/Sub Access to the Hydrolix Service Account

Your service account name should be in the env.sh file you created when you built the cluster. Its basic format is:

export GCP_STORAGE_SA=hdx-${CLIENT_ID}-sa@${PROJECT_ID}.iam.gserviceaccount.com

# Subscriber Access to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} --member="serviceAccount:${GCP_STORAGE_SA}" --role='roles/pubsub.subscriber'

# External Source Bucket access
gsutil iam ch serviceAccount:${GCP_STORAGE_SA}:roles/storage.objectAdmin gs://mysourcebucket

Create or update your Table configuration.

{
    "name": "example",
    "description": "autoingest example",
    "settings": {
        "autoingest": {
            "enabled": true,
            "source": "pubsub://<project_id>/<topic-sub>",
            "pattern": "^gs://mybucket/mypath/.*/log_.*.gz",
            "max_rows_per_partition": 4000000,
            "max_minutes_per_partition": 60,
            "max_active_partitions": 576,
            "input_aggregation": 1536000000,
            "dry_run": false
        }
    }
}
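
If the table document is generated by a script rather than written by hand, a minimal sketch along the following lines may help. The function name build_table_config is hypothetical. The value used for <topic-sub> assumes it refers to the subscription created earlier, which you should confirm for your deployment. The regex check uses Python's re module, which is not the same engine as the service's, so it only catches basic syntax errors.

```python
import json
import re


def build_table_config(name, description, project_id, topic_sub, pattern):
    """Return a table document with the settings.autoingest block filled in."""
    re.compile(pattern)  # fail early on an invalid regex (Python syntax check only)
    return {
        "name": name,
        "description": description,
        "settings": {
            "autoingest": {
                "enabled": True,
                "source": f"pubsub://{project_id}/{topic_sub}",
                "pattern": pattern,
                "max_rows_per_partition": 4000000,
                "max_minutes_per_partition": 60,
                "max_active_partitions": 576,
                "input_aggregation": 1536000000,
                "dry_run": False,
            }
        },
    }


config = build_table_config(
    name="example",
    description="autoingest example",
    project_id="prod-112233",        # project used in the earlier examples
    topic_sub="autoingest_gcs_sub",  # assumption: the subscription created earlier
    pattern="^gs://mybucket/mypath/.*/log_.*.gz",
)
print(json.dumps(config, indent=4))
```

Serializing with json.dumps also takes care of any backslash escaping in the pattern.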

Transform

Remember to create a transform so that the batch server knows how to process the data in the files. More information can be found in Write Transforms.

Table Pattern

It is highly recommended to provide a specific regex pattern when setting an auto-ingest pattern on table requests. The auto-ingest service could be handling many tables enabled for auto-ingest, and will dispatch ingest requests to the first matching table pattern.
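
To see why a broad pattern is risky under first-match dispatch, consider the sketch below. It is not the service's implementation: the table names, the order in which patterns are checked, and the dispatch function are all hypothetical, and Python's re module stands in for the actual regex engine.

```python
import re

# Hypothetical tables and patterns, listed in the order they would be checked.
tables = [
    ("all_gz", r"^.*\.gz$"),
    ("app1_logs", r"^gs://mybucket/level1/2020/\d{2}/app1/.*\.gz$"),
]


def dispatch(path, table_patterns):
    """Return the name of the first table whose pattern matches, else None."""
    for name, pattern in table_patterns:
        if re.search(pattern, path):
            return name
    return None


path = "gs://mybucket/level1/2020/01/app1/pattern_xyz.gz"
print(dispatch(path, tables))      # all_gz: the broad pattern claims the file
print(dispatch(path, tables[1:]))  # app1_logs: only the specific pattern remains
```

With the broad pattern present, the file never reaches the table it was intended for.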

Google Cloud Storage event notifications contain the full gs:// path, so the regex match starts from ^gs://. Consider the following example gs path:
gs://mybucket/level1/2020/01/app1/pattern_xyz.gz

Possible patterns could be:

  • ^.*\\.gz$ is not recommended - too wide a match
  • ^gs://mybucket/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/app1/.*.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_\\w{3}.gz
  • ^.*/level1/2020/\\d{2}/.*/pattern_.*.gz

Note that the pattern is submitted in a JSON document, and JSON requires \ characters to be escaped, hence the \\ in the examples above. Patterns can be checked with an online re2 regex tester.
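
The escaping can be checked with a short Python sketch. It takes one of the patterns listed above in its raw form (single backslashes), lets json.dumps add the JSON escaping, and confirms that decoding restores a regex that matches the example path. Python's re module is used here only as a stand-in, on the assumption that it behaves like re2 for a pattern this simple.

```python
import json
import re

example_path = "gs://mybucket/level1/2020/01/app1/pattern_xyz.gz"

# One of the patterns listed above as a raw regex, with single backslashes.
regex = r"^.*/level1/2020/\d{2}/.*/pattern_\w{3}.gz"

# json.dumps adds the escaping that JSON requires.
encoded = json.dumps({"pattern": regex})
print(encoded)  # {"pattern": "^.*/level1/2020/\\d{2}/.*/pattern_\\w{3}.gz"}

# Decoding the JSON restores the original regex.
decoded = json.loads(encoded)["pattern"]
print(decoded == regex)                        # True
print(bool(re.search(decoded, example_path)))  # True
```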

Table Auto-ingest Attributes

Auto-ingest is defined in the settings.autoingest object within the table JSON request.

Element | Purpose
enabled | Whether auto-ingest is enabled for the table. Default is false.
source | The Pub/Sub queue name containing the storage notifications. The name must be prefixed with pubsub://. For example: pubsub://project-1/mytopic
pattern | The gs event notification regex pattern. Default is an empty string. For example: "^gs://mybucket/mypath/.*/log_.*.gz"
max_rows_per_partition | The maximum row count per partition. Default is 33554432.
max_minutes_per_partition | The maximum number of minutes to hold in a partition. Default is 15.
max_active_partitions | The maximum number of active partitions. Default is 576.
input_aggregation | Controls how much data should be considered a single unit of work, which ultimately drives the size of the partition. Default is 1536000000.
dry_run | Default is false.