Kinesis
Hydrolix can ingest data from AWS Kinesis, whether your cluster is running on AWS or any other cloud provider.
The basic steps are:
- Create a Project/Table
- Create a Transform
- Configure a checkpoint database
- Configure the Hydrolix Kinesis source and adjust the scale
This document assumes that the project, table and transform are already configured, and proceeds to steps 3 and 4 above. More information on how to perform steps 1 and 2 can be found here - Projects & Tables, Write Transforms.
Configure a Checkpoint Database
A checkpoint database table is needed by the Hydrolix kinesis-peers to track their progress. The most common option is to configure an AWS DynamoDB table for this purpose, but it's also possible to use a GCP Datastore. Both options are described below.
AWS DynamoDB Table
Prerequisites
In order to load data into Hydrolix from Kinesis, you will need the following in your AWS Account:
- A DynamoDB table to store checkpoint information
- An AWS user/role with access to DynamoDB and Kinesis
- The Kinesis ARN and region of your Kinesis stream
Create the DynamoDB table
The DynamoDB can be created with the AWS Console. The table should be created with the following options.
The following options should be applied:
Option | Value |
---|---|
Table Name | Hydrolix suggests using your client Id with the string _kinesis_check_point appended e.g. hdxcli-123456_kinesis_check_point |
Partition Key | StreamShard |
Settings | Select 'Customize settings' |
Table Class | DynamoDB Standard |
Read/Write capacity settings | On-demand |
Record your DynamoDB ARN
Make sure to record your new DynamoDB ARN for later use in Hydrolix's
checkpointer
setting.
Create a User/Role for access
To access the Kinesis queue and use the DynamoDB for checkpointing, your Hydrolix cluster will need a user/role that can read and write to these services in your account. This can be done within the AWS Console. More information is within AWS's Documentation.
For Kinesis access, make sure the user has permission to perform these actions:
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListShards"
If you've set up DynamoDB, ensure the user has permission to perform these actions:
"dynamodb:PutItem",
"dynamodb:DescribeTable",
"dynamodb:GetItem"
Make sure to record your AWS Secret ID and AWS Secret
AWS shows this to you only once during key creation.
Record Your Kinesis ARN
Retrieve your Kinesis ARN from the AWS console. You will need this later when you configure the Hydrolix platform.
Configure Access to Your Kinesis stream
Configure the Hydrolix Kinesis acquisition service using the Kinesis Sources API.
If your cluster uses AWS for storage, the service account needs access to Kinesis and DynamoDB.
Add the Access Key in Kubernetes
Kinesis & Non-AWS Storage Providers
Configure access keys to use Kinesis ingest with a cloud provider other than AWS.
Add the access key and DynamoDB ARN in the API call. Hydrolix will add the credentials into a Kubernetes secret:
POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
"name": "kinesissource",
"pool_name": "kinesispool",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"aws_key": "AWS_ACCESS_KEY_ID",
"aws_secret": "AWS_SECRET_KEY"
}
}
The Kinesis pool creation will respond with an object:
{
"uuid": "22b13359-7897-40d7-8e4d-5bcc0e7ce686",
"name": "kinesissource",
"url": "{{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/{{kinesis_id}}",
"type": "pull",
"subtype": "kinesis",
"transform": "transform",
"table": "{{table_id}}",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"organizer": {
"root_path": "/intake/stream/{{id}}"
},
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"pool": {
"name": "kinesispool-{{id}}",
"description": null,
"uuid": "516b0001-ead9-48ae-b413-3ee03f5e9ff2",
"created": "2023-03-22T06:45:42.401740Z",
"modified": "2023-03-22T06:45:42.401766Z",
"settings": {
"is_head": false,
"is_default": false,
"k8s_deployment": {
"replicas": "1",
"service": "kinesis-peer"
}
},
"location": "880ecba8-6895-4892-a429-eb6c6d88de64",
"type": "elastic",
"tag": "pool"
},
"aws_creds_key": "K2447EFAD0B5F4E8C91A13CDD0BD5873D"
}
}
aws_creds_key
is the name of the secret in Kubernetes that stores your credentials.
GCP Datastore
Prerequisites
Hydrolix can read a Kinesis stream from GCP. The main difference is where the checkpoint is stored.
To store checkpoint into a Google Datastore, the service account used by Hydrolix needs to have access to that datastore.
Add Datastore Role
Edit the service account created in Google Cloud and add the role Cloud Datastore Owner
.
Create a Kinesis source using GCP Datastore
You can now create a new Kinesis datasource using the GCP Datastore for storing checkpoints. Here's an API call example:
POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
"name": "kinesissource",
"pool_name": "kinesispool",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"checkpointer": {
"name": "https://datastore.googleapis.com/hdx-kinesis"
},
"aws_key": "AWS_ACCESS_KEY_ID",
"aws_secret": "AWS_SECRET_KEY"
}
}
This will create a new Datastore table named hdx-kinesis
, which will store the checkpoint for the Kinesis stream named test-kinesis
.
Configure the Hydrolix Kinesis Source
Create your Kinesis source in Hydrolix using the API endpoint Create Kinesis Sources within the API.
POST {{base_url}}/v1/orgs/{ org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
"name": "kinesissource",
"pool_name": "kinesispool",
"k8s_deployment": {
"cpu": 1,
"memory": "10Gi",
"service": "kinesis-peer"
},
"type": "pull",
"subtype": "kinesis",
"transform": "{{transform_name}}",
"table": "{{project_name}}.{{table_name}}",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"aws_key": "AWS_ACCESS_KEY_ID",
"aws_secret": "AWS_SECRET_KEY"
}
}
The settings for the source are as follows:
Setting | Description | Example |
---|---|---|
name | Name of your Kinesis source | "myKinesisSource" |
pool_name | Name for the pool that will service the source | "myKinesisPool" |
k8s_deployment | Object describing the cpu/memory and service for the pool (kinesis-peer) | { "cpu": 1, "memory": "10Gi", "service": "kinesis-peer" } |
type | The method to retrieve stream data. This should be set as pull | "pull" |
subtype | The type of data source. This should be set as kinesis | "kinesis" |
transform | The name of the transform to use in ingesting data | "myTransform" |
table | The project and table to import the data into. | "myproject.mytable" |
settings stream_name | ARN for the Kinesis stream | "settings": { "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis" } |
settings region | Region for the Kinesis stream | "settings": { "region": "us-east-2", } |
settings checkpointer name | ARN for the DynamoDB for AWS / Datastore table name for GCP | "settings": { "checkpointer": { "name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis" } ...or... "settings": { "checkpointer": { "name": "<https://datastore.googleapis.com/hdx-kinesis>" } |
settings aws_key | AWS Key used to connect to the Kinesis stream | |
settings aws_secret | AWS Secret used to connect to the Kinesis stream |
Scale the Kinesis Service
The default scale profiles for Hydrolix set the scale of the kinesis-peers
to zero, since they're not needed for every Hydrolix cluster. To scale the service, edit hydrolixcluster.yaml
:
#Either:
kubectl edit hydrolixcluster
#or
kubectl get hydrolixclusters <NAMESPACE> -o yaml > hydrolix-cluster.yaml
vim hydrolix-cluster.yaml
kubectl apply -f hydrolix-cluster.yaml
Edit the replicas for the number of nodes you wish for Kinesis processing.
......
owner: admin
pools:
- cpu: "1"
memory: 10Gi
name: kinesispool-db9feb26-2a8c-4735-9b8a-6e6b7f207cbe
replicas: "1"
service: kinesis-peer
storage: 10Gi
region: us-central1
scale:
......
See Scale your Cluster and Scale Profiles for more information.
Special Considerations
Specify an Alternative AWS Role
If your Kinesis configuration and your Hydrolix configuration do not share AWS roles, you need a way to specify an alternative AWS role ARN for accessing your Kinesis stream. Create a field named aws_role_arn
within your Kinesis configuration settings. Use the full ARN as the value:
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
"region": "us-east-2",
"aws_role_arn": "arn:aws:iam::1776422013:role/example",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
}
}
You can also store credentials in the aws_key
and aws_secret
fields of your Kinesis configuration settings. Hydrolix stores these values in a Kubernetes secret as a JSON object. You can access this secret using the aws_creds_key
value.
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
"region": "us-east-2",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"aws_key": "AWS_ACCESS_KEY_ID",
"aws_secret": "AWS_SECRET_KEY"
}
You can also specify the ARN, aws_key
, and aws_secret
. Simply add all three fields given in the two examples above.
Credential Order
Hydrolix first attempts to connect to your Kinesis stream using the ARN stored in
aws_role_arn
. If that does not work, Hydrolix attempts to connect using the environment variable stored inaws_creds
. If neither method works, Hydrolix attempts to connect to the Kinesis stream without authentication.
AWS CloudWatch Ingest via Kinesis
AWS CloudWatch uses a special format that passes multiple logs at a time through the logEvents
message field. This requires special handling in Hydrolix.
Hydrolix supports a special subtype of transform to handle this log form. In your transform, assign settings.format_details.subtype
a value of "cloudwatch":
{
"name": "my_transform",
"type": "json",
"settings": {
"format_details": {
"subtype": "cloudwatch",
"flattening": ...,
...
}
},
...
}
}
When you use the "cloudwatch" subtype, each element of the array in the logEvents
field becomes a separate ingested row, handled separately by your transform. Hydrolix deserializes the JSON in the "message" field of each logEvents
element. This becomes the value of the logEvent
field in the new log messages. Consider another example of the CloudWatch format: following logEvents
array within a CloudWatch message:
{
"owner": "012345678901",
"logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
"logEvents": [
{
"id": "37469574158817812474116640748123909459676719559957151744",
"message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
"timestamp": 1680193815400
},
{
"id": "37469574158817812474116640748123909459676719559957151744",
"message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
"timestamp": 1680193815285
}
],
"logStream": "68fe326b9e008e1490dd71496307b647",
"messageType": "DATA_MESSAGE",
"subscriptionFilters": [
"KinesisFilter-NewRelic"
]
}
The first index of the logEvents
field becomes the following ingested row:
{
"owner": "012345678901",
"logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
"logEvent": {
"id": "37469574158817812474116640748123909459676719559957151744",
"message": {
"apiName": "test-api-gateway-mgt-api",
"authMS": "18",
"caller": "-",
"httpMethod": "POST",
"ip": "42.188.213.42",
"jwtActorClientId": "7501d544-6398-4c50-a3fc-b55f53616e54",
"jwtActorEnvId": "d991999a-0af5-43e4-a4dc-851ec7d2ff9f",
"jwtActorOrgId": "5d9fbc79-b254-449d-9cb8-6d1f40904cfb",
"jwtActorUserId": "-",
"pathEnvId": "a221fb90-1804-4a12-9aa4-8cf63b430ada",
"pathOrgId": "-",
"protocol": "HTTP/1.1",
"proxyRoundTripMS": "365",
"regionTag": "us-east-2",
"requestId": "37473422-3316-4622-bef6-d1d1b74734c9",
"requestPath": "/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments",
"requestTime": "30/Mar/2023:16:30:15 +0000",
"resourcePath": "/environments/{environmentid}/users/{userid}/roleAssignments",
"responseLength": "909",
"roundTripMS": "392",
"status": "201",
"user": "-",
"userAgent": "Apache-HttpClient/4.5.14 (Java/11.0.18)",
"xPingTrueClientIP": "42.188.213.42"
},
"timestamp": 1680193815400
},
"logStream": "68fe326b9e008e1490dd71496307b647",
"messageType": "DATA_MESSAGE",
"subscriptionFilters": [
"KinesisFilter-NewRelic"
]
}
Updated about 2 months ago