Via Kinesis

You can ingest data into Hydrolix with AWS Kinesis. You can access AWS Kinesis from Hydrolix clusters running on AWS or any other cloud provider.

The basic steps are:

  1. Create a project and table.
  2. Create a transform.
  3. Configure the AWS account that hosts the Kinesis source.
  4. Configure and scale the Hydrolix Kinesis service.

This guide assumes that the project, table, and transform already exist. For setup instructions, see Projects & Tables and Write Transforms.

Prerequisites

In order to load data into Hydrolix from Kinesis you will need the following in your AWS Account:

  1. A DynamoDB table to store checkpoint information
  2. An AWS user/role with access to DynamoDB and Kinesis.
  3. Kinesis ARN and region of your Kinesis stream

Create the DynamoDB Table

Create the DynamoDB table in the AWS Console with the following options:

Option | Value
Table Name | Hydrolix suggests using your client ID with the string _kinesis_check_point appended, e.g. hdxcli-123456_kinesis_check_point
Partition Key | StreamShard
Settings | Select 'Customized'
Table Class | DynamoDB Standard
Read/Write capacity settings | On-Demand

Note: Record the ARN of your DynamoDB table. You will need it when you configure the Kinesis source.
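If you prefer to script the table creation rather than use the console, the options above map onto the parameters of the boto3 `create_table` call. The following is a minimal sketch, not an official Hydrolix procedure. It assumes that the StreamShard partition key is a string (`"S"`), which the options table does not state, and the function names are hypothetical.

```python
def checkpoint_table_params(client_id: str) -> dict:
    """Build create_table arguments that mirror the console options above."""
    return {
        "TableName": f"{client_id}_kinesis_check_point",
        # Partition key named StreamShard. The string type "S" is an assumption.
        "AttributeDefinitions": [
            {"AttributeName": "StreamShard", "AttributeType": "S"}
        ],
        "KeySchema": [{"AttributeName": "StreamShard", "KeyType": "HASH"}],
        "BillingMode": "PAY_PER_REQUEST",  # On-Demand capacity
        "TableClass": "STANDARD",          # DynamoDB Standard
    }


def create_checkpoint_table(client_id: str, region: str) -> str:
    """Create the table and return its ARN. Needs boto3 and AWS credentials."""
    import boto3  # imported here so the parameter builder works without boto3

    client = boto3.client("dynamodb", region_name=region)
    response = client.create_table(**checkpoint_table_params(client_id))
    return response["TableDescription"]["TableArn"]
```

Calling `create_checkpoint_table("hdxcli-123456", "us-east-2")` returns the table ARN that the Kinesis source configuration expects in `settings.checkpointer.name`.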

Create a User/Role for access

To read the Kinesis stream and use the DynamoDB table for checkpointing, your Hydrolix cluster needs a user or role that can read and write to both services in your account. You can create one in the AWS Console. See the AWS documentation for details.

Note: Record the AWS access key ID and secret access key for this user.
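As an illustration of what such a user or role might be allowed to do, the sketch below builds an IAM policy document scoped to one stream and one checkpoint table. The action list is an assumption based on what a typical checkpointing Kinesis consumer needs. It is not a published Hydrolix requirement, so adjust it to your own security policy. The function name is hypothetical.

```python
import json


def kinesis_reader_policy(stream_arn: str, table_arn: str) -> dict:
    """Build an IAM policy document for reading a stream and checkpointing."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Read-only access to the Kinesis stream (assumed action set).
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": stream_arn,
            },
            {
                # Read and write access to the checkpoint table (assumed action set).
                "Effect": "Allow",
                "Action": [
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:DeleteItem",
                    "dynamodb:Scan",
                ],
                "Resource": table_arn,
            },
        ],
    }


def policy_json(stream_arn: str, table_arn: str) -> str:
    """Serialize the policy so it can be pasted into the IAM console."""
    return json.dumps(kinesis_reader_policy(stream_arn, table_arn), indent=2)
```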

Record your Kinesis ARN

Retrieve your Kinesis ARN from the AWS console. You will need this later when you configure the Hydrolix platform.

Configure Access to your Kinesis Stream

The configuration of the Hydrolix platform comes in two parts:

  1. Add the AWS access key ID and secret access key to the tunables (AWS CloudFormation) or environment variables (Kubernetes).
  2. Configure the Hydrolix Kinesis acquisition service using the Kinesis Sources API.

If your cluster uses AWS for storage, the service account needs access to Kinesis and DynamoDB.

Add the Access Key in Kubernetes

Kinesis & Non-AWS Storage Providers

Configure access keys to use Kinesis ingest with a cloud provider other than AWS.

You can pass the access key in the API call. Hydrolix stores the credentials in a Kubernetes secret:

POST {{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
  "name": "kinesissource",
  "pool_name": "kinesispool",
  "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
   },
  "transform": "transform",
  "settings": {
    "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
    "region": "us-east-2",
    "checkpointer": {
      "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
    },
    "aws_key": "XXXXXXXXXXXX",
    "aws_secret": "YYYYYYYYYYYYYYYYY"
  }
}

The API responds with an object that describes the new Kinesis source and its pool:

{
	"uuid": "22b13359-7897-40d7-8e4d-5bcc0e7ce686",
	"name": "kinesissource",
	"url": "{{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/{{kinesis_id}}",
	"type": "pull",
	"subtype": "kinesis",
	"transform": "transform",
	"table": "{{table_id}}",
	"settings": {
		"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
		"region": "us-east-2",
		"organizer": {
			"root_path": "/intake/stream/{{id}}"
		},
		"checkpointer": {
			"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
		},
		"pool": {
			"name": "kinesispool-{{id}}",
			"description": null,
			"uuid": "516b0001-ead9-48ae-b413-3ee03f5e9ff2",
			"created": "2023-03-22T06:45:42.401740Z",
			"modified": "2023-03-22T06:45:42.401766Z",
			"settings": {
				"is_head": false,
				"is_default": false,
				"k8s_deployment": {
					"replicas": "1",
					"service": "kinesis-peer"
				}
			},
			"location": "880ecba8-6895-4892-a429-eb6c6d88de64",
			"type": "elastic",
			"tag": "pool"
		},
		"aws_creds_key": "K2447EFAD0B5F4E8C91A13CDD0BD5873D"
	}
}

The aws_creds_key value is the name of the Kubernetes secret that stores the credentials.
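The request above can also be assembled programmatically. The sketch below uses only the Python standard library to build the same POST request. The function name and its parameters are hypothetical, and the endpoint path is copied from the example request rather than from an API reference.

```python
import json
import urllib.request


def build_kinesis_source_request(base_url, token, org_id, project_id,
                                 table_id, source):
    """Return a urllib Request that creates a Kinesis source."""
    url = (
        f"{base_url}/v1/orgs/{org_id}/projects/{project_id}"
        f"/tables/{table_id}/sources/kinesis/"
    )
    return urllib.request.Request(
        url,
        data=json.dumps(source).encode("utf-8"),  # JSON body as bytes
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_kinesis_source(request):
    """Send the request and return the decoded response object."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

The object returned by `create_kinesis_source` should contain the `aws_creds_key` field described above, under `settings`. Reading the key and secret from environment variables instead of hard-coding them keeps credentials out of scripts.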

GCP Pre-configuration steps

Hydrolix can read a Kinesis stream from a cluster running on GCP. The main difference is where the checkpoint is stored.
To store checkpoints in Google Cloud Datastore, the service account used by Hydrolix needs access to Datastore.
Edit the service account created in Google Cloud and add the Cloud Datastore Owner role.

Create a Kinesis source using GCP Datastore

You can now create a Kinesis source that uses Datastore for checkpointing. Here is an example API call:

POST {{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
  "name": "kinesissource",
  "pool_name": "kinesispool",
  "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
   },
  "transform": "transform",
  "settings": {
    "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
    "region": "us-east-2",
    "checkpointer": {
      "name": "https://datastore.googleapis.com/hdx-kinesis"
    },
    "aws_key": "XXXXXXXXXXXX",
    "aws_secret": "YYYYYYYYYYYYYYYYY"
  }
}

This creates a Datastore table named hdx-kinesis, which stores the checkpoint for the Kinesis stream test-kinesis.

Configure the Hydrolix Kinesis Service

Create your Kinesis source in Hydrolix with the Create Kinesis Sources API endpoint:

POST {{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
   "name": "kinesissource",
   "pool_name": "kinesispool",
   "k8s_deployment": {
      "cpu": 1,
      "memory": "10Gi",
      "service": "kinesis-peer"
   },
   "type": "pull",
   "subtype": "kinesis",
   "transform": "{{transform_name}}",
   "table": "{{project_name}}.{{table_name}}",
   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
      "region": "us-east-2",
      "checkpointer": {
         "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      },
      "aws_key": "XXXXXXXXXXXX",
      "aws_secret": "YYYYYYYYYYYYYYYYY"
   }
}

The settings for the source are as follows:

Setting | Description | Example
name | Name of your Kinesis source | myKinesisSource
pool_name | Name for the pool that will service the source | myKinesisPool
k8s_deployment | Object describing the CPU, memory, and service (kinesis-peer) for the pool | {"cpu": 1, "memory": 1, "service": "kinesis-peer"}
type | The method used to retrieve stream data. Set this to pull. | pull
subtype | The type of data source. Set this to kinesis. | kinesis
transform | The name of the transform to use when ingesting data | myTransform
table | The project and table to import the data into | myproject.mytable
settings.stream_name | ARN of the Kinesis stream | "settings": {"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis"}
settings.region | Region of the Kinesis stream | "settings": {"region": "us-east-2"}
settings.checkpointer.name | ARN of the DynamoDB table (AWS) or Datastore table name (GCP) | AWS: "settings": {"checkpointer": {"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"}}; GCP: "settings": {"checkpointer": {"name": "https://datastore.googleapis.com/hdx-kinesis"}}
settings.aws_key | AWS key used to connect to the Kinesis stream |
settings.aws_secret | AWS secret used to connect to the Kinesis stream |
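Before sending a source definition to the API, it can help to check it against the settings described above. The sketch below is a hypothetical pre-flight check, not part of Hydrolix. Which fields it treats as required is an assumption drawn from the examples in this guide, and the comparison between `region` and the region embedded in the stream ARN is a convenience check of my own.

```python
def check_kinesis_source(source: dict) -> list:
    """Return a list of problems found in a Kinesis source definition."""
    problems = []
    for field in ("name", "transform", "settings"):
        if field not in source:
            problems.append(f"missing field: {field}")

    settings = source.get("settings", {})
    for field in ("stream_name", "region"):
        if field not in settings:
            problems.append(f"missing setting: {field}")

    stream = settings.get("stream_name")
    if stream is not None:
        # ARN layout: arn:partition:service:region:account-id:resource
        parts = stream.split(":")
        if len(parts) < 6 or parts[0] != "arn" or parts[2] != "kinesis":
            problems.append("stream_name is not a Kinesis ARN")
        elif "region" in settings and parts[3] != settings["region"]:
            problems.append("region does not match the region in stream_name")

    # Supplying only one half of the key pair is almost certainly a mistake.
    if ("aws_key" in settings) != ("aws_secret" in settings):
        problems.append("aws_key and aws_secret must be set together")
    return problems
```

An empty list means the definition passed every check. Anything else is a human-readable description of what to fix before calling the API.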

Via CloudWatch

CloudWatch passes multiple log events at a time in the logEvents message field:

{
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": [
        "Destination"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]
}

Hydrolix supports a special type of transform to handle this log form. In your transform, assign settings.format_details.subtype a value of "cloudwatch":

{
  "name": "my_transform",
  "type": "json",
  "settings": {
    "format_details": {
      "subtype": "cloudwatch",
      "flattening": ...,
      ...
    },
    ...
  }
}

When you use the "cloudwatch" subtype, each element of the logEvents array becomes a separate ingested row, handled separately by your transform. Hydrolix deserializes the JSON in the "message" field of each element, and the element becomes the value of the logEvent field in the new row. The other top-level fields are copied to every row. Consider the following logEvents array within a CloudWatch message:

{
  "owner": "012345678901",
  "logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
  "logEvents": [
    {
      "id": "37469574158817812474116640748123909459676719559957151744",
      "message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
      "timestamp": 1680193815400
    },
    {
      "id": "37469574158817812474116640748123909459676719559957151744",
      "message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
      "timestamp": 1680193815285
    }
  ],
  "logStream": "68fe326b9e008e1490dd71496307b647",
  "messageType": "DATA_MESSAGE",
  "subscriptionFilters": [
    "KinesisFilter-NewRelic"
  ]
}

The first element of the logEvents array becomes the following ingested row:

{
  "owner": "012345678901",
  "logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
  "logEvent": {
    "id": "37469574158817812474116640748123909459676719559957151744",
    "message": {
      "apiName": "test-api-gateway-mgt-api",
      "authMS": "18",
      "caller": "-",
      "httpMethod": "POST",
      "ip": "42.188.213.42",
      "jwtActorClientId": "7501d544-6398-4c50-a3fc-b55f53616e54",
      "jwtActorEnvId": "d991999a-0af5-43e4-a4dc-851ec7d2ff9f",
      "jwtActorOrgId": "5d9fbc79-b254-449d-9cb8-6d1f40904cfb",
      "jwtActorUserId": "-",
      "pathEnvId": "a221fb90-1804-4a12-9aa4-8cf63b430ada",
      "pathOrgId": "-",
      "protocol": "HTTP/1.1",
      "proxyRoundTripMS": "365",
      "regionTag": "us-east-2",
      "requestId": "37473422-3316-4622-bef6-d1d1b74734c9",
      "requestPath": "/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments",
      "requestTime": "30/Mar/2023:16:30:15 +0000",
      "resourcePath": "/environments/{environmentid}/users/{userid}/roleAssignments",
      "responseLength": "909",
      "roundTripMS": "392",
      "status": "201",
      "user": "-",
      "userAgent": "Apache-HttpClient/4.5.14 (Java/11.0.18)",
      "xPingTrueClientIP": "42.188.213.42"
    },
    "timestamp": 1680193815400
  },
  "logStream": "68fe326b9e008e1490dd71496307b647",
  "messageType": "DATA_MESSAGE",
  "subscriptionFilters": [
    "KinesisFilter-NewRelic"
  ]
}
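To preview what your transform will receive, you can reproduce this expansion locally. The sketch below imitates the behavior described above in plain Python. It is an illustration, not Hydrolix's implementation, and it assumes every message field contains valid JSON. The function name is hypothetical.

```python
import json


def expand_cloudwatch(message: dict) -> list:
    """Split one CloudWatch message into one row per logEvents element."""
    # Every top-level field except logEvents is copied to each row.
    envelope = {key: value for key, value in message.items()
                if key != "logEvents"}
    rows = []
    for event in message.get("logEvents", []):
        log_event = dict(event)
        # The message field arrives as a JSON string; decode it to an object.
        log_event["message"] = json.loads(event["message"])
        rows.append({**envelope, "logEvent": log_event})
    return rows
```

Running `expand_cloudwatch` on the two-element example above yields two rows, each with the shared owner, logGroup, and logStream fields and its own logEvent object.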

Specify an AWS Role for Kinesis Streaming

If your Kinesis configuration and your Hydrolix configuration do not share AWS roles, you need a way to specify an alternative AWS role ARN for accessing your Kinesis stream. Create a field named aws_role_arn within your Kinesis configuration settings. Use the full ARN as the value:

{
    "name": "kinesis",
    "type": "kinesis_source",
    "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
    },
    "transform": "transform",
    "table": "kinesis.kinesis_table",
    "settings": {
        "stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
        "region": "us-east-2",
        "aws_role_arn": "arn:aws:iam::1776422013:role/example"
    }
}

Alternatively, you can store credentials in the aws_key and aws_secret fields of your Kinesis configuration settings. Hydrolix stores these values in a Kubernetes secret as a JSON object. You can access this secret using the aws_creds_key value.

{
    "name": "kinesis",
    "type": "kinesis_source",
    "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
    },
    "transform": "transform",
    "table": "kinesis.kinesis_table",
    "settings": {
        "stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
        "region": "us-east-2",
        "aws_key": "<KEY>",
        "aws_secret": "<SECRET>"
    }
}

Credential Order

Hydrolix first attempts to connect to your Kinesis stream using the ARN stored in aws_role_arn. If that does not work, Hydrolix attempts to connect using the environment variable stored in aws_creds. If neither method works, Hydrolix attempts to connect to the Kinesis stream without authentication.
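The order described above is a try-in-sequence pattern: each method is attempted only if the previous one fails. The sketch below illustrates that pattern with placeholder connection functions. It is not Hydrolix's code, and the labels and function name are hypothetical.

```python
def connect_with_fallback(attempts):
    """Try each (label, connect) pair in order; return the first success.

    attempts is a list of (label, zero-argument callable) pairs. A callable
    signals failure by raising an exception.
    """
    failures = []
    for label, connect in attempts:
        try:
            return label, connect()
        except Exception as exc:
            # Remember why this method failed, then move on to the next one.
            failures.append(f"{label}: {exc}")
    raise ConnectionError("all methods failed: " + "; ".join(failures))
```

A caller would pass the methods in the documented order: the role ARN first, then the stored credentials, then an unauthenticated attempt.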

Specify an Alternative Checkpoint Datastore

By default, Hydrolix stores Kinesis checkpoint data in a local DynamoDB instance. Use the settings.checkpointer object to specify an alternate checkpoint datastore:

{
    "name": "kinesis",
    "type": "kinesis_source",
    "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
    },
    "transform": "transform",
    "table": "kinesis.kinesis_table",
    "settings": {
        "stream_name": "arn:aws:kinesis:us-east-2:1776422013:stream/my-example-stream",
        "region": "us-east-2",
        "checkpointer": {
            "name": "arn:aws:dynamodb:us-east-2:1776422013:table/my-example-stream",
            "datastore_project_id": "example-project-id-54321"
        },
        "aws_role_arn": "arn:aws:iam::1776422013:role/example"
    }
}

Within the name field, specify an ARN or GCP URL. If you specify a resource within GCP, use the datastore_project_id field to specify a project.
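Because the name field accepts two different formats, a small helper can tell them apart before you submit the configuration. The sketch below is a hypothetical classifier. It assumes the DynamoDB ARN and Datastore URL shapes shown in the examples in this guide and rejects anything else.

```python
from urllib.parse import urlparse


def describe_checkpointer(checkpointer: dict) -> tuple:
    """Return (backend, table_name) for a settings.checkpointer object."""
    name = checkpointer["name"]
    if name.startswith("arn:"):
        # arn:partition:service:region:account-id:table/<table-name>
        parts = name.split(":", 5)
        if (len(parts) == 6 and parts[2] == "dynamodb"
                and parts[5].startswith("table/")):
            return "dynamodb", parts[5][len("table/"):]
        raise ValueError(f"not a DynamoDB table ARN: {name}")

    parsed = urlparse(name)
    if parsed.scheme == "https" and parsed.netloc == "datastore.googleapis.com":
        return "datastore", parsed.path.lstrip("/")
    raise ValueError(f"unrecognized checkpointer name: {name}")
```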

Scale the Kinesis Service

To scale the service, edit the hydrolixcluster resource:

# Either edit the resource in place:

kubectl edit hydrolixcluster

# or export it to a file, edit the file, and apply it:

kubectl get hydrolixclusters -n <NAMESPACE> -o yaml > hydrolix-cluster.yaml
vim hydrolix-cluster.yaml
kubectl apply -f hydrolix-cluster.yaml

In the pools section, set replicas on the Kinesis pool to the number of kinesis-peer instances you want to run.

......
  owner: admin
  pools:
  - cpu: "1"
    memory: 10Gi
    name: kinesispool-db9feb26-2a8c-4735-9b8a-6e6b7f207cbe
    replicas: "1"
    service: kinesis-peer
    storage: 10Gi
  region: us-central1
  scale:
......
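If you manage the cluster definition from a script, the same change can be made on the parsed document. The sketch below operates on a Python dictionary that already holds the section containing pools, as shown in the excerpt above. Loading and saving the YAML is left out, and the function name is hypothetical.

```python
def set_pool_replicas(section: dict, pool_prefix: str, replicas: int) -> int:
    """Set replicas on matching kinesis-peer pools; return how many changed."""
    changed = 0
    for pool in section.get("pools", []):
        is_kinesis = pool.get("service") == "kinesis-peer"
        # Pool names carry a generated suffix, so match on the prefix.
        if is_kinesis and pool.get("name", "").startswith(pool_prefix):
            pool["replicas"] = str(replicas)  # the excerpt stores a string
            changed += 1
    return changed
```

For the excerpt above, `set_pool_replicas(section, "kinesispool", 3)` changes one pool and leaves pools for other services untouched.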