Skip to content

Kinesis

Hydrolix can ingest data from AWS Kinesis, whether your cluster is running on AWS or any other cloud provider.

👍 The basic steps are:

  1. Create a Project/Table
  2. Create a Transform
  3. Configure a checkpoint database
  4. Configure the Hydrolix Kinesis source and adjust the scale

This document assumes that the project, table and transform are already configured, and proceeds to steps 3 and 4 above. More information on how to perform steps 1 and 2 can be found here - Projects & Tables, Write Transforms.

Configure a Checkpoint Database⚓︎

A checkpoint database table is needed by the Hydrolix kinesis-peers to track their progress. The most common option is to configure an AWS DynamoDB table for this purpose, but it's also possible to use a GCP Datastore. Both options are described below.

Create an AWS DynamoDB table⚓︎

Prerequisites⚓︎

In order to load data into Hydrolix from Kinesis, you will need the following in your AWS Account:

  1. A DynamoDB table to store checkpoint information
  2. An AWS user/role with access to DynamoDB and Kinesis
  3. The Kinesis ARN and region of your Kinesis stream
Create the DynamoDB table⚓︎

The DynamoDB can be created with the AWS Console. The table should be created with the following options.

The following options should be applied:

Option Value
Table Name Hydrolix suggests using your client Id with the string _kinesis_check_point appended for example, hdxcli-123456_kinesis_check_point
Partition Key StreamShard
Settings Select 'Customize settings'
Table Class DynamoDB Standard
Read/Write capacity settings On-demand

Record your DynamoDB ARN

Make sure to record your new DynamoDB ARN for later use in Hydrolix's checkpointer setting.

Create a User/Role for access⚓︎

To access the Kinesis queue and use the DynamoDB for checkpointing, your Hydrolix cluster will need a user/role that can read and write to these services in your account. This can be done within the AWS Console. More information is within AWS's Documentation.

For Kinesis access, make sure the user has permission to perform these actions:

  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:DescribeStream
  • kinesis:ListShards

If you've set up DynamoDB, ensure the user has permission to perform these actions:

  • dynamodb:PutItem
  • dynamodb:DescribeTable
  • dynamodb:GetItem

Make sure to record your AWS Secret ID and AWS Secret

AWS shows this to you only once during key creation.

Record Your Kinesis ARN⚓︎

Retrieve your Kinesis ARN from the AWS console. You will need this later when you configure the Hydrolix platform.

Configure Access to Your Kinesis stream⚓︎

Configure the Hydrolix Kinesis acquisition service using the Kinesis Sources API.

If your cluster uses AWS for storage, the service account needs access to Kinesis and DynamoDB.

Add the Access Key in Kubernetes⚓︎

Kinesis & Non-AWS Storage Providers

Configure access keys to use Kinesis ingest with a cloud provider other than AWS.

Add the access key and DynamoDB ARN in the API call. Hydrolix will add the credentials into a Kubernetes secret:

POST {{base_url}}/v1/orgs/{{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
  "name": "kinesissource",
  "pool_name": "kinesispool",
  "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
   },
  "transform": "transform",
  "settings": {
    "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
    "region": "us-east-2",
    "checkpointer": {
      "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
    },
    "aws_key": "AWS_ACCESS_KEY_ID",
    "aws_secret": "AWS_SECRET_KEY"
  }
}

The Kinesis pool creation will respond with an object:

{
    "uuid": "22b13359-7897-40d7-8e4d-5bcc0e7ce686",
    "name": "kinesissource",
    "url": "{{base_url}}/v1/orgs/{{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/{{kinesis_id}}",
    "type": "pull",
    "subtype": "kinesis",
    "transform": "transform",
    "table": "{{table_id}}",
    "settings": {
        "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
        "region": "us-east-2",
        "organizer": {
            "root_path": "/intake/stream/{{id}}"
        },
        "checkpointer": {
            "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
        },
        "pool": {
            "name": "kinesispool-{{id}}",
            "description": null,
            "uuid": "516b0001-ead9-48ae-b413-3ee03f5e9ff2",
            "created": "2023-03-22T06:45:42.401740Z",
            "modified": "2023-03-22T06:45:42.401766Z",
            "settings": {
                "is_head": false,
                "is_default": false,
                "k8s_deployment": {
                    "replicas": "1",
                    "service": "kinesis-peer"
                }
            },
            "location": "880ecba8-6895-4892-a429-eb6c6d88de64",
            "type": "elastic",
            "tag": "pool"
        },
        "aws_creds_key": "K2447EFAD0B5F4E8C91A13CDD0BD5873D"
    }
}

aws_creds_key is the name of the secret in Kubernetes that stores your credentials.

GCP Datastore⚓︎

Prerequisites⚓︎

Hydrolix can read a Kinesis stream from GCP. The main difference is where the checkpoint is stored. To store checkpoint into a Google Datastore, the service account used by Hydrolix needs to have access to that datastore.

Add Datastore Role⚓︎

Edit the service account created in Google Cloud and add the role Cloud Datastore Owner.

Create a Kinesis source using GCP Datastore⚓︎

You can now create a new Kinesis datasource using the GCP Datastore for storing checkpoints. Here's an API call example:

POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
  "name": "kinesissource",
  "pool_name": "kinesispool",
  "k8s_deployment": {
      "service": "kinesis-peer",
      "replicas": 1
   },
  "transform": "transform",
  "settings": {
    "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
    "region": "us-east-2",
    "checkpointer": {
      "name": "https://datastore.googleapis.com/hdx-kinesis"
    },
    "aws_key": "AWS_ACCESS_KEY_ID",
    "aws_secret": "AWS_SECRET_KEY"
  }
}

This will create a new Datastore table named hdx-kinesis, which will store the checkpoint for the Kinesis stream named test-kinesis.

Configure the Hydrolix Kinesis Source⚓︎

Create your Kinesis source in Hydrolix using the API endpoint Create Kinesis Sources within the API.

POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json

{
   "name": "kinesissource",
   "pool_name": "kinesispool",
   "k8s_deployment": {
      "cpu": 1,
      "memory": "10Gi",
      "service": "kinesis-peer"
   },
   "type": "pull",
   "subtype": "kinesis",
   "transform": "{{transform_name}}",
   "table": "{{project_name}}.{{table_name}}",
   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
      "region": "us-east-2",
      "checkpointer": {
         "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      },
      "aws_key": "AWS_ACCESS_KEY_ID",
      "aws_secret": "AWS_SECRET_KEY"
   }
}

The settings for the source are as follows:

Setting Description Example
name Name of your Kinesis source "myKinesisSource"
pool_name Name for the pool that will service the source "myKinesisPool"
k8s_deployment Object describing the cpu/memory and service for the pool (kinesis-peer) {<br> "cpu": 1,<br> "memory": "10Gi",<br> "service": "kinesis-peer"<br> }
type The method to retrieve stream data. This should be set as pull "pull"
subtype The type of data source. This should be set as kinesis "kinesis"
transform The name of the transform to use in ingesting data "myTransform"
table The project and table to import the data into. "myproject.mytable"
settings
stream_name
ARN for the Kinesis stream "settings": {<br> "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis"<br> }
settings
region
Region for the Kinesis stream "settings": {<br> "region": "us-east-2",<br> }
settings
checkpointer
name
ARN for the DynamoDB for AWS / Datastore table name for GCP "settings": {<br> "checkpointer": {<br> "name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis"<br> }
...or...
"settings": {<br> "checkpointer": {<br> "name": "<https://datastore.googleapis.com/hdx-kinesis>"<br> }
settings
aws_key
AWS Key used to connect to the Kinesis stream
settings
aws_secret
AWS Secret used to connect to the Kinesis stream

Scale the Kinesis Service⚓︎

The default scale profiles for Hydrolix set the scale of the kinesis-peers to zero, since they're not needed for every Hydrolix cluster. To scale the service, edit hydrolixcluster.yaml:

1
2
3
4
5
6
7
8
9
#Either:

 kubectl edit hydrolixcluster

#or

kubectl get hydrolixclusters <NAMESPACE> -o yaml > hydrolix-cluster.yaml
vim hydrolix-cluster.yaml
kubectl apply -f hydrolix-cluster.yaml

Edit the replicas for the number of nodes you wish for Kinesis processing.

  owner: admin
  pools:
  - cpu: "1"
    memory: 10Gi
    name: kinesispool-db9feb26-2a8c-4735-9b8a-6e6b7f207cbe
    replicas: "1"
    service: kinesis-peer
    storage: 10Gi
  region: us-central1
  scale:

See Scale your Cluster and Scale Profiles for more information.

Special Considerations⚓︎

Specify an Alternative AWS Role⚓︎

If your Kinesis configuration and your Hydrolix configuration do not share AWS roles, you need a way to specify an alternative AWS role ARN for accessing your Kinesis stream. Create a field named aws_role_arn within your Kinesis configuration settings. Use the full ARN as the value:

1
2
3
4
5
6
7
8
   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
      "region": "us-east-2",
      "aws_role_arn": "arn:aws:iam::1776422013:role/example",
      "checkpointer": {
         "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      }
   }

You can also store credentials in the aws_key and aws_secret fields of your Kinesis configuration settings. Hydrolix stores these values in a Kubernetes secret as a JSON object. You can access this secret using the aws_creds_key value.

1
2
3
4
5
6
7
8
9
   "settings": {
      "stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
      "region": "us-east-2",
      "checkpointer": {
            "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
      },
      "aws_key": "AWS_ACCESS_KEY_ID",
      "aws_secret": "AWS_SECRET_KEY"
   }

You can also specify the ARN, aws_key, and aws_secret. Simply add all three fields given in the two examples above.

Credential Order

Hydrolix first attempts to connect to your Kinesis stream using the ARN stored in aws_role_arn. If that does not work, Hydrolix attempts to connect using the environment variable stored in aws_creds. If neither method works, Hydrolix attempts to connect to the Kinesis stream without authentication.

AWS CloudWatch Ingest via Kinesis⚓︎

AWS CloudWatch uses a special format that passes multiple logs at a time through the logEvents message field. This requires special handling in Hydrolix.

Hydrolix supports a special subtype of transform to handle this log form. In your transform, assign settings.format_details.subtype a value of "cloudwatch":

{
  "name": "my_transform",
  "type": "json",
  "settings": {
    "format_details": {
      "subtype": "cloudwatch",
      "flattening": ...,
      ...
      }
    },
    ...
  }
}

When you use the "cloudwatch" subtype, each element of the array in the logEvents field becomes a separate ingested row, handled separately by your transform. Hydrolix deserializes the JSON in the "message" field of each logEvents element. This becomes the value of the logEvent field in the new log messages. Consider another example of the CloudWatch format: following logEvents array within a CloudWatch message:

{
  "owner": "012345678901",
  "logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
  "logEvents": [
    {
      "id": "37469574158817812474116640748123909459676719559957151744",
      "message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
      "timestamp": 1680193815400
    },
    {
      "id": "37469574158817812474116640748123909459676719559957151744",
      "message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
      "timestamp": 1680193815285
    }
  ],
  "logStream": "68fe326b9e008e1490dd71496307b647",
  "messageType": "DATA_MESSAGE",
  "subscriptionFilters": [
    "KinesisFilter-NewRelic"
  ]
}

The first index of the logEvents field becomes the following ingested row:

{
  "owner": "012345678901",
  "logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
  "logEvent": {
    "id": "37469574158817812474116640748123909459676719559957151744",
    "message": {
      "apiName": "test-api-gateway-mgt-api",
      "authMS": "18",
      "caller": "-",
      "httpMethod": "POST",
      "ip": "42.188.213.42",
      "jwtActorClientId": "7501d544-6398-4c50-a3fc-b55f53616e54",
      "jwtActorEnvId": "d991999a-0af5-43e4-a4dc-851ec7d2ff9f",
      "jwtActorOrgId": "5d9fbc79-b254-449d-9cb8-6d1f40904cfb",
      "jwtActorUserId": "-",
      "pathEnvId": "a221fb90-1804-4a12-9aa4-8cf63b430ada",
      "pathOrgId": "-",
      "protocol": "HTTP/1.1",
      "proxyRoundTripMS": "365",
      "regionTag": "us-east-2",
      "requestId": "37473422-3316-4622-bef6-d1d1b74734c9",
      "requestPath": "/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments",
      "requestTime": "30/Mar/2023:16:30:15 +0000",
      "resourcePath": "/environments/{environmentid}/users/{userid}/roleAssignments",
      "responseLength": "909",
      "roundTripMS": "392",
      "status": "201",
      "user": "-",
      "userAgent": "Apache-HttpClient/4.5.14 (Java/11.0.18)",
      "xPingTrueClientIP": "42.188.213.42"
    },
    "timestamp": 1680193815400
  },
  "logStream": "68fe326b9e008e1490dd71496307b647",
  "messageType": "DATA_MESSAGE",
  "subscriptionFilters": [
    "KinesisFilter-NewRelic"
  ]
}

Scaling⚓︎

To scale the Kubernetes deployment for the ingest pool used by this source, including the number of replicas, memory, and CPU, see the scaling resource pools page.