via Kinesis

Hydrolix can ingest data from AWS Kinesis, whether your cluster is running on AWS or any other cloud provider.

👍

The basic steps are:

  1. Create a Project/Table
  2. Create a Transform
  3. Configure a checkpoint database
  4. Configure the Hydrolix Kinesis source and adjust the scale

This document assumes that the project, table and transform are already configured, and proceeds to steps 3 and 4 above. More information on how to perform steps 1 and 2 can be found here - Projects & Tables, Write Transforms.

Configure a Checkpoint Database

A checkpoint database table is needed by the Hydrolix kinesis-peers to track their progress. The most common option is to configure an AWS DynamoDB table for this purpose, but it's also possible to use a GCP Datastore. Both options are described below.

AWS DynamoDB Table

Prerequisites

In order to load data into Hydrolix from Kinesis, you will need the following in your AWS Account:

  1. A DynamoDB table to store checkpoint information
  2. An AWS user/role with access to DynamoDB and Kinesis
  3. The Kinesis ARN and region of your Kinesis stream

Create the DynamoDB table

The DynamoDB can be created with the AWS Console. The table should be created with the following options.

The following options should be applied:

OptionValue
Table NameHydrolix suggests using your client Id with the string _kinesis_check_point appended e.g. hdxcli-123456_kinesis_check_point
Partition KeyStreamShard
SettingsSelect 'Customize settings'
Table ClassDynamoDB Standard
Read/Write capacity settingsOn-demand

📘

Record your DynamoDB ARN

Make sure to record your new DynamoDB ARN for later use in Hydrolix's checkpointer setting.

Create a User/Role for access

To access the Kinesis queue and use the DynamoDB for checkpointing, your Hydrolix cluster will need a user/role that can read and write to these services in your account. This can be done within the AWS Console. More information is within AWS's Documentation.

For Kinesis access, make sure the user has permission to perform these actions:

"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListShards"

If you've set up DynamoDB, ensure the user has permission to perform these actions:

"dynamodb:PutItem",
"dynamodb:DescribeTable",
"dynamodb:GetItem"

📘

Make sure to record your AWS Secret ID and AWS Secret

AWS shows this to you only once during key creation.

Record Your Kinesis ARN

Retrieve your Kinesis ARN from the AWS console. You will need this later when you configure the Hydrolix platform.

Configure Access to Your Kinesis stream

Configure the Hydrolix Kinesis acquisition service using the Kinesis Sources API.

If your cluster uses AWS for storage, the service account needs access to Kinesis and DynamoDB.

Add the Access Key in Kubernetes

📘

Kinesis & Non-AWS Storage Providers

Configure access keys to use Kinesis ingest with a cloud provider other than AWS.

Add the access key and DynamoDB ARN in the API call. Hydrolix will add the credentials into a Kubernetes secret:

POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
  "name": "kinesissource",
  "pool_name": "kinesispool",
  "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
   },
  "transform": "transform",
  "settings": {
    "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
    "region": "us-east-2",
    "checkpointer": {
      "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
    },
    "aws_key": "AWS_ACCESS_KEY_ID",
    "aws_secret": "AWS_SECRET_KEY"
  }
}

The Kinesis pool creation will respond with an object:

{
	"uuid": "22b13359-7897-40d7-8e4d-5bcc0e7ce686",
	"name": "kinesissource",
	"url": "{{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/{{kinesis_id}}",
	"type": "pull",
	"subtype": "kinesis",
	"transform": "transform",
	"table": "{{table_id}}",
	"settings": {
		"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
		"region": "us-east-2",
		"organizer": {
			"root_path": "/intake/stream/{{id}}"
		},
		"checkpointer": {
			"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
		},
		"pool": {
			"name": "kinesispool-{{id}}",
			"description": null,
			"uuid": "516b0001-ead9-48ae-b413-3ee03f5e9ff2",
			"created": "2023-03-22T06:45:42.401740Z",
			"modified": "2023-03-22T06:45:42.401766Z",
			"settings": {
				"is_head": false,
				"is_default": false,
				"k8s_deployment": {
					"replicas": "1",
					"service": "kinesis-peer"
				}
			},
			"location": "880ecba8-6895-4892-a429-eb6c6d88de64",
			"type": "elastic",
			"tag": "pool"
		},
		"aws_creds_key": "K2447EFAD0B5F4E8C91A13CDD0BD5873D"
	}
}

aws_creds_key is the name of the secret in Kubernetes that stores your credentials.

GCP Datastore

Prerequisites

Hydrolix can read a Kinesis stream from GCP. The main difference is where the checkpoint is stored.
To store checkpoint into a Google Datastore, the service account used by Hydrolix needs to have access to that datastore.

Add Datastore Role

Edit the service account created in Google Cloud and add the role Cloud Datastore Owner.

Create a Kinesis source using GCP Datastore

You can now create a new Kinesis datasource using the GCP Datastore for storing checkpoints. Here's an API call example:

POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
  "name": "kinesissource",
  "pool_name": "kinesispool",
  "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
   },
  "transform": "transform",
  "settings": {
    "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
    "region": "us-east-2",
    "checkpointer": {
      "name": "https://datastore.googleapis.com/hdx-kinesis"
    },
    "aws_key": "AWS_ACCESS_KEY_ID",
    "aws_secret": "AWS_SECRET_KEY"
  }
}

This will create a new Datastore table named hdx-kinesis, which will store the checkpoint for the Kinesis stream named test-kinesis.

Configure the Hydrolix Kinesis Source

Create your Kinesis source in Hydrolix using the API endpoint Create Kinesis Sources within the API.

POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
   "name": "kinesissource",
   "pool_name": "kinesispool",
   "k8s_deployment": {
      "cpu": 1,
      "memory": "10Gi",
      "service": "kinesis-peer"
   },
   "type": "pull",
   "subtype": "kinesis",
   "transform": "{{transform_name}}",
   "table": "{{project_name}}.{{table_name}}",
   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
      "region": "us-east-2",
      "checkpointer": {
         "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      },
      "aws_key": "AWS_ACCESS_KEY_ID",
      "aws_secret": "AWS_SECRET_KEY"
   }
}

The settings for the source are as follows:

SettingDescriptionExample
nameName of your Kinesis source"myKinesisSource"
pool_nameName for the pool that will service the source"myKinesisPool"
k8s_deploymentObject describing the cpu/memory and service for the pool (kinesis-peer){ "cpu": 1, "memory": "10Gi", "service": "kinesis-peer" }
typeThe method to retrieve stream data. This should be set as pull"pull"
subtypeThe type of data source. This should be set as kinesis"kinesis"
transformThe name of the transform to use in ingesting data"myTransform"
tableThe project and table to import the data into."myproject.mytable"
settings
stream_name
ARN for the Kinesis stream"settings": { "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis" }
settings
region
Region for the Kinesis stream"settings": { "region": "us-east-2", }
settings
checkpointer
name
ARN for the DynamoDB for AWS / Datastore table name for GCP"settings": { "checkpointer": { "name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis" }
...or...
"settings": { "checkpointer": { "name": "<https://datastore.googleapis.com/hdx-kinesis>" }
settings
aws_key
AWS Key used to connect to the Kinesis stream
settings
aws_secret
AWS Secret used to connect to the Kinesis stream

Scale the Kinesis Service

The default scale profiles for Hydrolix set the scale of the kinesis-peers to zero, since they're not needed for every Hydrolix cluster. To scale the service, edit hydrolixcluster.yaml:

#Either:
 
 kubectl edit hydrolixcluster
 
#or
 
kubectl get hydrolixclusters <NAMESPACE> -o yaml > hydrolix-cluster.yaml
vim hydrolix-cluster.yaml
kubectl apply -f hydrolix-cluster.yaml

Edit the replicas for the number of nodes you wish for Kinesis processing.

......
  owner: admin
  pools:
  - cpu: "1"
    memory: 10Gi
    name: kinesispool-db9feb26-2a8c-4735-9b8a-6e6b7f207cbe
    replicas: "1"
    service: kinesis-peer
    storage: 10Gi
  region: us-central1
  scale:
......

See Scale your Cluster and Scale Profiles for more information.

Special Considerations

Specify an Alternative AWS Role

If your Kinesis configuration and your Hydrolix configuration do not share AWS roles, you need a way to specify an alternative AWS role ARN for accessing your Kinesis stream. Create a field named aws_role_arn within your Kinesis configuration settings. Use the full ARN as the value:

   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
      "region": "us-east-2",
      "aws_role_arn": "arn:aws:iam::1776422013:role/example",
      "checkpointer": {
         "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      }
   }

Alternatively, you can store credentials in the aws_key and aws_secret fields of your Kinesis configuration settings. Hydrolix stores these values in a Kubernetes secret as a JSON object. You can access this secret using the aws_creds_key value.

   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
      "region": "us-east-2",
      "checkpointer": {
        	"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      },
      "aws_key": "AWS_ACCESS_KEY_ID",
      "aws_secret": "AWS_SECRET_KEY"
   }

📘

Credential Order

Hydrolix first attempts to connect to your Kinesis stream using the ARN stored in aws_role_arn. If that does not work, Hydrolix attempts to connect using the environment variable stored in aws_creds. If neither method works, Hydrolix attempts to connect to the Kinesis stream without authentication.

AWS CloudWatch Ingest via Kinesis

AWS CloudWatch uses a special format that passes multiple logs at a time through the logEvents message field. This requires special handling in Hydrolix.

Hydrolix supports a special subtype of transform to handle this log form. In your transform, assign settings.format_details.subtype a value of "cloudwatch":

{
  "name": "my_transform",
  "type": "json",
  "settings": {
    "format_details": {
      "subtype": "cloudwatch",
      "flattening": ...,
      ...
      }
    },
    ...
  }
}

When you use the "cloudwatch" subtype, each element of the array in the logEvents field becomes a separate ingested row, handled separately by your transform. Hydrolix deserializes the JSON in the "message" field of each logEvents element. This becomes the value of the logEvent field in the new log messages. Consider another example of the CloudWatch format: following logEvents array within a CloudWatch message:

{
  "owner": "012345678901",
  "logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
  "logEvents": [
    {
      "id": "37469574158817812474116640748123909459676719559957151744",
      "message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
      "timestamp": 1680193815400
    },
    {
      "id": "37469574158817812474116640748123909459676719559957151744",
      "message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
      "timestamp": 1680193815285
    }
  ],
  "logStream": "68fe326b9e008e1490dd71496307b647",
  "messageType": "DATA_MESSAGE",
  "subscriptionFilters": [
    "KinesisFilter-NewRelic"
  ]
}

The first index of the logEvents field becomes the following ingested row:

{
  "owner": "012345678901",
  "logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
  "logEvent": {
    "id": "37469574158817812474116640748123909459676719559957151744",
    "message": {
      "apiName": "test-api-gateway-mgt-api",
      "authMS": "18",
      "caller": "-",
      "httpMethod": "POST",
      "ip": "42.188.213.42",
      "jwtActorClientId": "7501d544-6398-4c50-a3fc-b55f53616e54",
      "jwtActorEnvId": "d991999a-0af5-43e4-a4dc-851ec7d2ff9f",
      "jwtActorOrgId": "5d9fbc79-b254-449d-9cb8-6d1f40904cfb",
      "jwtActorUserId": "-",
      "pathEnvId": "a221fb90-1804-4a12-9aa4-8cf63b430ada",
      "pathOrgId": "-",
      "protocol": "HTTP/1.1",
      "proxyRoundTripMS": "365",
      "regionTag": "us-east-2",
      "requestId": "37473422-3316-4622-bef6-d1d1b74734c9",
      "requestPath": "/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments",
      "requestTime": "30/Mar/2023:16:30:15 +0000",
      "resourcePath": "/environments/{environmentid}/users/{userid}/roleAssignments",
      "responseLength": "909",
      "roundTripMS": "392",
      "status": "201",
      "user": "-",
      "userAgent": "Apache-HttpClient/4.5.14 (Java/11.0.18)",
      "xPingTrueClientIP": "42.188.213.42"
    },
    "timestamp": 1680193815400
  },
  "logStream": "68fe326b9e008e1490dd71496307b647",
  "messageType": "DATA_MESSAGE",
  "subscriptionFilters": [
    "KinesisFilter-NewRelic"
  ]
}