Via Kinesis
You can ingest data into Hydrolix with AWS Kinesis. You can access AWS Kinesis from Hydrolix clusters running on AWS or any other cloud provider.
The basic steps are:
- Create a Project/Table
- Create a Transform
- Configure the Kinesis source AWS account
- Configure and scale the Hydrolix Kinesis service
This guide assumes that the project, table, and transform are already configured. For setup details, see Projects & Tables and Write Transforms.
Prerequisites
In order to load data into Hydrolix from Kinesis you will need the following in your AWS Account:
- A DynamoDB table to store checkpoint information
- An AWS user or role with access to DynamoDB and Kinesis
- The ARN and region of your Kinesis stream
Create the DynamoDB Table
Create the DynamoDB table in the AWS Console with the following options:
Option | Value |
---|---|
Table Name | Hydrolix suggests your client ID with the string _kinesis_check_point appended, for example hdxcli-123456_kinesis_check_point |
Partition Key | StreamShard |
Settings | Select 'Customized' |
Table Class | DynamoDB Standard |
Read/Write capacity settings | On-Demand |
Record the ARN of your DynamoDB table. You will need it when you configure the checkpointer.
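The same table can also be created programmatically. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are already configured. The helper names are hypothetical, and the `S` (string) attribute type for `StreamShard` is an assumption, because the options above specify only the key name.

```python
def checkpoint_table_params(client_id: str) -> dict:
    """Build create_table arguments that mirror the console options above."""
    return {
        "TableName": f"{client_id}_kinesis_check_point",
        "KeySchema": [{"AttributeName": "StreamShard", "KeyType": "HASH"}],
        # Assumption: the partition key is a string attribute.
        "AttributeDefinitions": [
            {"AttributeName": "StreamShard", "AttributeType": "S"}
        ],
        "BillingMode": "PAY_PER_REQUEST",  # On-Demand capacity
        "TableClass": "STANDARD",  # DynamoDB Standard
    }


def create_checkpoint_table(client_id: str, region: str) -> str:
    """Create the table and return its ARN (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the helper above works without boto3

    dynamodb = boto3.client("dynamodb", region_name=region)
    response = dynamodb.create_table(**checkpoint_table_params(client_id))
    return response["TableDescription"]["TableArn"]
```

Calling `create_checkpoint_table("hdxcli-123456", "us-east-2")` would return the table ARN to record for the checkpointer setting.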
Create a User/Role for access
Your Hydrolix cluster needs a user or role that can read from and write to Kinesis and DynamoDB in your account, so that it can access the Kinesis stream and use the DynamoDB table for checkpointing. You can create this in the AWS Console. See the AWS documentation for more information.
Record your AWS Secret ID and Secret Key.
Record your Kinesis ARN
Retrieve your Kinesis ARN from the AWS console. You will need this later when you configure the Hydrolix platform.
Configure Access to your Kinesis
The configuration of the Hydrolix platform comes in two parts:
- Add the Secret ID and Secret Key to the tunables (AWS CloudFormation) or environment variables (Kubernetes).
- Configure the Hydrolix Kinesis acquisition service using the Kinesis Sources API.
If your cluster uses AWS for storage, the service account needs access to Kinesis and DynamoDB.
Add the Access Key in Kubernetes
Kinesis & Non-AWS Storage Providers
Configure access keys to use Kinesis ingest with a cloud provider other than AWS.
You can supply the access key in the API call. Hydrolix then stores the credentials in a Kubernetes secret:
POST {{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
"name": "kinesissource",
"pool_name": "kinesispool",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"aws_key": "XXXXXXXXXXXX",
"aws_secret": "YYYYYYYYYYYYYYYYY"
}
}
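As an illustration, the request above could be assembled with Python's standard library. This is a sketch only: the function name, the base URL, the token, and the ID values are hypothetical placeholders, and the key and secret are the same dummy values used in the example body.

```python
import json
import urllib.request


def build_create_source_request(base_url, org_id, project_id, table_id, token, body):
    """Return a POST request for the Kinesis sources endpoint shown above."""
    url = (
        f"{base_url}/v1/orgs/{org_id}/projects/{project_id}"
        f"/tables/{table_id}/sources/kinesis/"
    )
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


body = {
    "name": "kinesissource",
    "pool_name": "kinesispool",
    "k8s_deployment": {"service": "kinesis-peer", "replicas": 1},
    "transform": "transform",
    "settings": {
        "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
        "region": "us-east-2",
        "checkpointer": {
            "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
        },
        "aws_key": "XXXXXXXXXXXX",
        "aws_secret": "YYYYYYYYYYYYYYYYY",
    },
}

# Placeholder values; substitute your own cluster URL, IDs, and token.
request = build_create_source_request(
    "https://hydrolix.example.com/config",
    "org-1", "proj-1", "table-1", "example-token", body,
)
# To send the request: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the URL and body before anything reaches the cluster.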
The API responds to the source creation request with an object like the following:
{
"uuid": "22b13359-7897-40d7-8e4d-5bcc0e7ce686",
"name": "kinesissource",
"url": "{{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/{{kinesis_id}}",
"type": "pull",
"subtype": "kinesis",
"transform": "transform",
"table": "{{table_id}}",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"organizer": {
"root_path": "/intake/stream/{{id}}"
},
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"pool": {
"name": "kinesispool-{{id}}",
"description": null,
"uuid": "516b0001-ead9-48ae-b413-3ee03f5e9ff2",
"created": "2023-03-22T06:45:42.401740Z",
"modified": "2023-03-22T06:45:42.401766Z",
"settings": {
"is_head": false,
"is_default": false,
"k8s_deployment": {
"replicas": "1",
"service": "kinesis-peer"
}
},
"location": "880ecba8-6895-4892-a429-eb6c6d88de64",
"type": "elastic",
"tag": "pool"
},
"aws_creds_key": "K2447EFAD0B5F4E8C91A13CDD0BD5873D"
}
}
The aws_creds_key value is the name of the Kubernetes secret that stores the credentials.
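Note that `aws_creds_key` is nested inside `settings` in the response. A small sketch of reading it, assuming the response body is available as a JSON string (the function name is hypothetical):

```python
import json
from typing import Optional


def credentials_secret_name(response_text: str) -> Optional[str]:
    """Return settings.aws_creds_key from a source-creation response, if present."""
    source = json.loads(response_text)
    return source.get("settings", {}).get("aws_creds_key")
```

The function returns `None` when the response carries no stored credentials, for example when no key and secret were supplied in the request.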
GCP Pre-configuration steps
Hydrolix can read a Kinesis stream from a cluster running on GCP. The main difference is where the checkpoint is stored.
To store checkpoints in Google Cloud Datastore, the service account used by Hydrolix needs access to Datastore.
Edit the service account created in Google Cloud and add the Cloud Datastore Owner role.
Create a Kinesis Source Using GCP Datastore
You can now create a Kinesis source that uses Datastore for checkpoints. Here is an example API call:
POST {{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
"name": "kinesissource",
"pool_name": "kinesispool",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"checkpointer": {
"name": "https://datastore.googleapis.com/hdx-kinesis"
},
"aws_key": "XXXXXXXXXXXX",
"aws_secret": "YYYYYYYYYYYYYYYYY"
}
}
This creates a new Datastore table, hdx-kinesis, which stores the checkpoint for the Kinesis stream named test-kinesis.
Configure the Hydrolix Kinesis Service
To create your Kinesis source in Hydrolix, use the Create Kinesis Sources API endpoint:
POST {{base_url}}/v1/orgs/{{org_id}}/projects/{{project_id}}/tables/{{table_id}}/sources/kinesis/
Authorization: Bearer {{token}}
Content-Type: application/json
{
"name": "kinesissource",
"pool_name": "kinesispool",
"k8s_deployment": {
"cpu": 1,
"memory": "10Gi",
"service": "kinesis-peer"
},
"type": "pull",
"subtype": "kinesis",
"transform": "{{transform_name}}",
"table": "{{project_name}}.{{table_name}}",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis",
"region": "us-east-2",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis"
},
"aws_key": "XXXXXXXXXXXX",
"aws_secret": "YYYYYYYYYYYYYYYYY"
}
}
The settings for the source are as follows:
Setting | Description | Example |
---|---|---|
name | Name of your Kinesis Source | myKinesisSource |
pool_name | Name for the pool that will service the source | myKinesisPool |
k8s_deployment | Object describing the CPU, memory, and service for the pool (kinesis-peer) | { "cpu": 1, "memory": "10Gi", "service": "kinesis-peer" } |
type | The method to retrieve stream data. This should be set as pull | pull |
subtype | The type of data source. This should be set as kinesis | kinesis |
transform | The name of the transform to use in ingesting data | myTransform |
table | The project and table to import the data into. | myproject.mytable |
settings stream_name | ARN for the Kinesis stream | "settings": { "stream_name": "arn:aws:kinesis:us-east-2:1234567890:stream/test-kinesis" } |
settings region | Region for the Kinesis stream | "settings": { "region": "us-east-2" } |
settings checkpointer name | ARN of the DynamoDB table (AWS) or URL of the Datastore table (GCP) | "settings": { "checkpointer": { "name": "arn:aws:dynamodb:us-east-2:1234567890:table/test-kinesis" } } or "settings": { "checkpointer": { "name": "https://datastore.googleapis.com/hdx-kinesis" } } |
settings aws_key | AWS key used to connect to the Kinesis stream | |
settings aws_secret | AWS secret used to connect to the Kinesis stream | |
Via CloudWatch
CloudWatch passes multiple log events at a time through the logEvents field of each message:
{
"owner": "111111111111",
"logGroup": "CloudTrail/logs",
"logStream": "111111111111_CloudTrail/logs_us-east-1",
"subscriptionFilters": [
"Destination"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
}
]
}
Hydrolix supports a special type of transform to handle this log format. In your transform, assign settings.format_details.subtype a value of "cloudwatch":
{
"name": "my_transform",
"type": "json",
"settings": {
"format_details": {
"subtype": "cloudwatch",
"flattening": ...,
...
}
},
...
}
When you use the "cloudwatch" subtype, each element of the logEvents array becomes a separate ingested row, handled separately by your transform. Hydrolix deserializes the JSON in the "message" field of each logEvents element. The element, with its deserialized message, becomes the value of the logEvent field in the new row. Consider the following logEvents array within a CloudWatch message:
{
"owner": "012345678901",
"logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
"logEvents": [
{
"id": "37469574158817812474116640748123909459676719559957151744",
"message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
"timestamp": 1680193815400
},
{
"id": "37469574158817812474116640748123909459676719559957151744",
"message": "{\"apiName\":\"test-api-gateway-mgt-api\",\"authMS\":\"18\",\"caller\":\"-\",\"httpMethod\":\"POST\",\"ip\":\"42.188.213.42\",\"jwtActorClientId\":\"7501d544-6398-4c50-a3fc-b55f53616e54\",\"jwtActorEnvId\":\"d991999a-0af5-43e4-a4dc-851ec7d2ff9f\",\"jwtActorOrgId\":\"5d9fbc79-b254-449d-9cb8-6d1f40904cfb\",\"jwtActorUserId\":\"-\",\"pathEnvId\":\"a221fb90-1804-4a12-9aa4-8cf63b430ada\",\"pathOrgId\":\"-\",\"protocol\":\"HTTP/1.1\",\"proxyRoundTripMS\":\"365\",\"regionTag\":\"us-east-2\",\"requestId\":\"37473422-3316-4622-bef6-d1d1b74734c9\",\"requestPath\":\"/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments\",\"requestTime\":\"30/Mar/2023:16:30:15 +0000\",\"resourcePath\":\"/environments/{environmentid}/users/{userid}/roleAssignments\",\"responseLength\":\"909\",\"roundTripMS\":\"392\",\"status\":\"201\",\"user\":\"-\",\"userAgent\":\"Apache-HttpClient/4.5.14 (Java/11.0.18)\",\"xPingTrueClientIP\":\"42.188.213.42\"}",
"timestamp": 1680193815285
}
],
"logStream": "68fe326b9e008e1490dd71496307b647",
"messageType": "DATA_MESSAGE",
"subscriptionFilters": [
"KinesisFilter-NewRelic"
]
}
The first element of the logEvents array becomes the following ingested row:
{
"owner": "012345678901",
"logGroup": "/aws/apigateway/test-api-gateway-mgt-api",
"logEvent": {
"id": "37469574158817812474116640748123909459676719559957151744",
"message": {
"apiName": "test-api-gateway-mgt-api",
"authMS": "18",
"caller": "-",
"httpMethod": "POST",
"ip": "42.188.213.42",
"jwtActorClientId": "7501d544-6398-4c50-a3fc-b55f53616e54",
"jwtActorEnvId": "d991999a-0af5-43e4-a4dc-851ec7d2ff9f",
"jwtActorOrgId": "5d9fbc79-b254-449d-9cb8-6d1f40904cfb",
"jwtActorUserId": "-",
"pathEnvId": "a221fb90-1804-4a12-9aa4-8cf63b430ada",
"pathOrgId": "-",
"protocol": "HTTP/1.1",
"proxyRoundTripMS": "365",
"regionTag": "us-east-2",
"requestId": "37473422-3316-4622-bef6-d1d1b74734c9",
"requestPath": "/v1/environments/a221fb90-1804-4a12-9aa4-8cf63b430ada/users/554a405f-4847-44c7-9aed-851206c11ec7/roleAssignments",
"requestTime": "30/Mar/2023:16:30:15 +0000",
"resourcePath": "/environments/{environmentid}/users/{userid}/roleAssignments",
"responseLength": "909",
"roundTripMS": "392",
"status": "201",
"user": "-",
"userAgent": "Apache-HttpClient/4.5.14 (Java/11.0.18)",
"xPingTrueClientIP": "42.188.213.42"
},
"timestamp": 1680193815400
},
"logStream": "68fe326b9e008e1490dd71496307b647",
"messageType": "DATA_MESSAGE",
"subscriptionFilters": [
"KinesisFilter-NewRelic"
]
}
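The expansion described above can be sketched in a few lines of Python. This only illustrates the documented behavior and is not Hydrolix's implementation. The function name is hypothetical, and the sketch assumes every "message" value is valid JSON.

```python
import json


def expand_cloudwatch(envelope: dict) -> list:
    """Turn one CloudWatch message into one row per logEvents element."""
    # Fields other than logEvents are copied onto every row.
    shared = {key: value for key, value in envelope.items() if key != "logEvents"}
    rows = []
    for event in envelope.get("logEvents", []):
        log_event = dict(event)
        # The message arrives as a JSON string; deserialize it into an object.
        log_event["message"] = json.loads(event["message"])
        rows.append({**shared, "logEvent": log_event})
    return rows
```

Each returned row carries the shared envelope fields (owner, logGroup, logStream, and so on) plus a single logEvent object, which matches the shape of the ingested row shown above.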
Specify an AWS Role for Kinesis Streaming
If your Kinesis configuration and your Hydrolix configuration do not share AWS roles, you need a way to specify an alternative AWS role ARN for accessing your Kinesis stream. Create a field named aws_role_arn within your Kinesis configuration settings. Use the full ARN as the value:
{
"name": "kinesis",
"type": "kinesis_source",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"table": "kinesis.kinesis_table",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
"region": "us-east-2",
"aws_role_arn": "arn:aws:iam::1776422013:role/example"
}
}
Alternatively, you can store credentials in the aws_key and aws_secret fields of your Kinesis configuration settings. Hydrolix stores these values in a Kubernetes secret as a JSON object. You can access this secret using the aws_creds_key value.
{
"name": "kinesis",
"type": "kinesis_source",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"table": "kinesis.kinesis_table",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:example:stream/my-example-stream",
"region": "us-east-2",
"aws_key": "<KEY>",
"aws_secret": "<SECRET>"
}
}
Credential Order
Hydrolix first attempts to connect to your Kinesis stream using the ARN stored in aws_role_arn. If that does not work, Hydrolix attempts to connect using the environment variable stored in aws_creds. If neither method works, Hydrolix attempts to connect to the Kinesis stream without authentication.
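The fallback order can be pictured as a simple loop over connection attempts. The sketch below is purely illustrative: the function name and the stand-in connectors are hypothetical, and it does not reflect how Hydrolix implements the logic internally.

```python
def first_working_method(attempts):
    """Try each (label, connect) pair in order; return the first label that succeeds."""
    for label, connect in attempts:
        try:
            connect()
            return label
        except ConnectionError:
            continue  # fall through to the next method
    return None


def fail():
    """Stand-in for a connection attempt that is rejected."""
    raise ConnectionError("simulated failure")


def succeed():
    """Stand-in for a connection attempt that works."""
    return None


# Simulate a stream where the role is rejected but stored credentials work.
order = [
    ("aws_role_arn", fail),
    ("aws_creds", succeed),
    ("unauthenticated", succeed),
]
print(first_working_method(order))  # prints "aws_creds"
```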
Specify an Alternative Checkpoint Datastore
By default, Hydrolix stores Kinesis checkpoint data in a local DynamoDB instance. Use the settings.checkpointer object to specify an alternate checkpoint datastore:
{
"name": "kinesis",
"type": "kinesis_source",
"k8s_deployment": {
"service": "kinesis-peer",
"replicas": 1
},
"transform": "transform",
"table": "kinesis.kinesis_table",
"settings": {
"stream_name": "arn:aws:kinesis:us-east-2:1776422013:stream/my-example-stream",
"region": "us-east-2",
"checkpointer": {
"name": "arn:aws:dynamodb:us-east-2:1776422013:table/my-example-stream",
"datastore_project_id": "example-project-id-54321"
},
"aws_role_arn": "arn:aws:iam::1776422013:role/example"
}
}
Within the name field, specify an ARN or GCP URL. If you specify a resource within GCP, use the datastore_project_id field to specify a project.
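A small helper can make the two forms of the checkpointer object explicit. This is a sketch under assumptions: the function name is hypothetical, and the two prefixes it recognizes are taken from the examples in this guide rather than from a documented list of accepted values.

```python
def build_checkpointer(name, datastore_project_id=None):
    """Return (kind, checkpointer) for a DynamoDB ARN or a Datastore URL."""
    # Assumption: these prefixes match the examples used in this guide.
    if name.startswith("arn:aws:dynamodb:"):
        kind = "dynamodb"
    elif name.startswith("https://datastore.googleapis.com/"):
        kind = "datastore"
    else:
        raise ValueError(f"unrecognized checkpointer name: {name}")

    checkpointer = {"name": name}
    # Include the project only when the caller supplies one.
    if datastore_project_id is not None:
        checkpointer["datastore_project_id"] = datastore_project_id
    return kind, checkpointer
```

The returned dictionary can be placed under `settings.checkpointer` in the source configuration.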
Scale the Kinesis Service
To scale the service, edit hydrolixcluster.yaml:
#Either:
kubectl edit hydrolixcluster
#or
kubectl get hydrolixclusters <NAMESPACE> -o yaml > hydrolix-cluster.yaml
vim hydrolix-cluster.yaml
kubectl apply -f hydrolix-cluster.yaml
Set replicas to the number of nodes you want for the Kinesis pool:
......
owner: admin
pools:
- cpu: "1"
  memory: 10Gi
  name: kinesispool-db9feb26-2a8c-4735-9b8a-6e6b7f207cbe
  replicas: "1"
  service: kinesis-peer
  storage: 10Gi
region: us-central1
scale:
......
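If you script this change instead of editing by hand, the update amounts to rewriting one field in the pools list. The sketch below assumes the YAML has already been loaded into a Python dictionary (for example with a YAML library, not shown) and that you pass in the mapping that directly contains `pools`. The function name is hypothetical.

```python
def set_pool_replicas(cluster_spec: dict, service: str, replicas: int) -> int:
    """Set replicas on every pool running the given service; return pools changed."""
    changed = 0
    for pool in cluster_spec.get("pools", []):
        if pool.get("service") == service:
            # The excerpt above stores replicas as a quoted string.
            pool["replicas"] = str(replicas)
            changed += 1
    return changed
```

For example, `set_pool_replicas(spec, "kinesis-peer", 3)` would update the Kinesis pool shown above and leave pools for other services untouched.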