Data Enrichment
Use SQL to modify, enrich and filter your data before indexing
Data often needs to be modified before it is stored. Hydrolix supports this through an additional processing step that is invoked by the transform used to load the data.
To modify and enrich data, you can include SQL statements in the transform. These statements have access to all the functions, operators and advanced features available in Hydrolix queries, including Custom Functions and Custom Dictionaries.
These SQL statements are referred to as "SQL Transforms." Include them in the settings section of your transform under the sql_transform key.
Example: Splitting a Message Into Separate Columns
The following example shows how this process works. The raw data below requires enrichment and modification.
{
  "timestamp": "2022-05-31 14:41:52.000000",
  "remote_ip": "114.80.245.62",
  "remote_user": "-",
  "request": "GET /downloads/product_2 HTTP/1.1",
  "response": 200,
  "bytes": 26318005,
  "referrer": "-",
  "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36"
}
The enrichment and modification required includes:
- The request column needs to be split into 3 separate columns: method, path and protocol.
- For privacy purposes, we need to obfuscate the remote_ip by removing the last octet.
With Real-Time Enrichment, we can run a SQL query on this data:
SELECT splitByWhitespace(assumeNotNull(request))[1] as method,
splitByWhitespace(assumeNotNull(request))[2] as path,
splitByWhitespace(assumeNotNull(request))[3] as protocol,
IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
*
FROM {STREAM}
In the query above we use built-in functions to operate on the raw data.
- The splitByWhitespace function splits the request field in our raw data into its component parts, which are separated by white space.
- The IPv4NumToStringClassC and toIPv4 functions mask the last octet of the IPv4 address in the remote_ip field, replacing it with xxx.
In addition, the * has been added to the query to ensure all the original columns are made available on top of the virtual ones that are created with our SELECT query.
Lastly, as the data is being processed as a stream, the FROM portion of the statement uses a special {STREAM} term to reference the incoming data.
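To make the effect of the query concrete, the following Python sketch mimics it on a single record outside Hydrolix. It is an illustration only, not how Hydrolix executes the transform: the function name enrich is hypothetical, str.split() stands in for splitByWhitespace, and the last octet is replaced with xxx to match the indexed result shown later on this page.

```python
import ipaddress


def enrich(record: dict) -> dict:
    """Approximate the SELECT above for one input record (illustration only)."""
    # Stand-in for splitByWhitespace(request)[1..3]; pad so missing parts become "".
    parts = (record["request"].split() + ["", "", ""])[:3]
    method, path, protocol = parts

    # Validate the address, then replace the last octet with "xxx".
    ip = ipaddress.IPv4Address(record["remote_ip"])
    masked_ip = ".".join(str(ip).split(".")[:3] + ["xxx"])

    # "*" keeps the original fields; the aliased columns are added on top,
    # and the masked remote_ip replaces the raw one.
    return {
        **record,
        "method": method,
        "path": path,
        "protocol": protocol,
        "remote_ip": masked_ip,
    }


sample = {
    "request": "GET /downloads/product_2 HTTP/1.1",
    "remote_ip": "114.80.245.62",
    "response": 200,
}
print(enrich(sample))
```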
The SQL statement is then added to the sql_transform key within the settings object in the transform.
{
  "name": "transform",
  "description": "Demo SQL Transform",
  "type": "json",
  "settings": {
    "is_default": true,
    "sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}"
  }
}
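Because sql_transform is a single JSON string, a multi-line query has to be collapsed onto one line before it is embedded. The Python sketch below shows one way to do that. The helper name one_line is hypothetical, and collapsing whitespace like this is only safe when the query contains no string literals with significant whitespace, which is the case here.

```python
import json

# The multi-line query from the example above.
SQL = """
SELECT splitByWhitespace(assumeNotNull(request))[1] as method,
       splitByWhitespace(assumeNotNull(request))[2] as path,
       splitByWhitespace(assumeNotNull(request))[3] as protocol,
       IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
       *
FROM {STREAM}
"""


def one_line(sql: str) -> str:
    """Collapse every run of whitespace (including newlines) to a single space."""
    return " ".join(sql.split())


transform = {
    "name": "transform",
    "description": "Demo SQL Transform",
    "type": "json",
    "settings": {
        "is_default": True,
        "sql_transform": one_line(SQL),
    },
}

# json.dumps takes care of quoting and escaping the SQL string.
print(json.dumps(transform, indent=2))
```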
To ensure the data is written to the table, the transform's output columns must include the new virtual columns generated by the SQL statement. For example:
{
  "name": "path",
  "datatype": {
    "type": "string",
    "source": {
      "from_input_field": "sql_transform"
    }
  }
}
In the example the full transform is as follows:
{
  "name": "transform",
  "description": "Demo SQL Transform",
  "type": "json",
  "settings": {
    "is_default": true,
    "sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}",
    "output_columns": [
      {
        "name": "timestamp",
        "datatype": {
          "type": "datetime",
          "format": "2006-01-02 15:04:05.000000",
          "primary": true,
          "resolution": "ms"
        }
      },
      {
        "name": "remote_ip",
        "datatype": {
          "type": "string"
        }
      },
      {
        "name": "request",
        "datatype": {
          "type": "string"
        }
      },
      {
        "name": "response",
        "datatype": {
          "type": "uint32"
        }
      },
      {
        "name": "bytes",
        "datatype": {
          "type": "uint64"
        }
      },
      {
        "name": "referrer",
        "datatype": {
          "type": "string"
        }
      },
      {
        "name": "agent",
        "datatype": {
          "type": "string"
        }
      },
      {
        "name": "method",
        "datatype": {
          "type": "string",
          "source": {
            "from_input_field": "sql_transform"
          }
        }
      },
      {
        "name": "path",
        "datatype": {
          "type": "string",
          "source": {
            "from_input_field": "sql_transform"
          }
        }
      },
      {
        "name": "protocol",
        "datatype": {
          "type": "string",
          "source": {
            "from_input_field": "sql_transform"
          }
        }
      }
    ],
    "compression": "none",
    "format_details": {
      "flattening": {
        "active": false,
        "map_flattening_strategy": null,
        "slice_flattening_strategy": null
      }
    }
  }
}
After transforming and indexing the data, the result looks like the following:
{
  "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36",
  "bytes": "26318005",
  "method": "GET",
  "path": "/downloads/product_2",
  "protocol": "HTTP/1.1",
  "referrer": "-",
  "remote_ip": "114.80.245.xxx",
  "request": "GET /downloads/product_2 HTTP/1.1",
  "response": 200,
  "timestamp": "2022-05-31 14:41:52.000"
}
CSV file as data source
In the example above, the file type is JSON. When the file type is CSV, leave the source empty when creating the transform. The column name must match the name used in the SQL transform.
{
  "name": "path",
  "datatype": {
    "type": "string"
  }
}
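The difference between the JSON and CSV column definitions can be captured in a small helper. The Python sketch below is hypothetical (the function virtual_column and its file_type parameter are not part of Hydrolix); it only reproduces the two shapes shown on this page: a source pointing at sql_transform for JSON input, and no source for CSV input.

```python
def virtual_column(name: str, datatype: str = "string", file_type: str = "json") -> dict:
    """Build an output column definition for a column produced by the SQL transform."""
    column = {"name": name, "datatype": {"type": datatype}}
    if file_type == "json":
        # JSON input: mark the column as coming from the SQL transform.
        column["datatype"]["source"] = {"from_input_field": "sql_transform"}
    # CSV input: the source is left out, as in the CSV example above.
    return column


# The name must match the column name used in the SQL statement.
json_columns = [virtual_column(n) for n in ("method", "path", "protocol")]
csv_column = virtual_column("path", file_type="csv")
print(json_columns)
print(csv_column)
```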
Example: Removing Rows Based on a Pattern
Another common use case is removing rows that match a specific pattern.
Suppose you want to avoid indexing rows where the user agent (the agent field) contains *bot*. You can express this with the following statement:
SELECT splitByWhitespace(assumeNotNull(request))[1] as method,
splitByWhitespace(assumeNotNull(request))[2] as path,
splitByWhitespace(assumeNotNull(request))[3] as protocol,
IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
*
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'
This performs the same split as before, but rows whose agent contains *bot* (matched case-insensitively) are not selected and therefore not indexed.
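As a rough illustration of what the WHERE clause does, the Python sketch below applies the same test to a few records. The function name keep_row is hypothetical, and the sketch assumes the agent field is always present; ILIKE with a leading and trailing % is treated as a case-insensitive substring match.

```python
def keep_row(record: dict) -> bool:
    """Mirror WHERE agent NOT ILIKE '%bot%' for a record with an agent field."""
    return "bot" not in record["agent"].lower()


rows = [
    {"agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36"},
    {"agent": "Googlebot/2.1"},
    {"agent": "ExampleBOT/1.0"},  # hypothetical agent, matched regardless of case
]

# Only rows that pass the filter would be indexed.
kept = [row for row in rows if keep_row(row)]
print(kept)
```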
Example: Splitting a Key-Value Pair String
Sometimes the data being indexed contains strings made up of key-value pairs. A good example is the query string from a request:
"querystring": "version=1&user=anonymous&cache=false"
Technically we could index the whole querystring as a string directly, but searching it might be difficult.
The following function, extract_key_pair, splits the string into a map of key-value pairs:
(string, pair_delimiters, key_value_pair_delimiter) -> CAST((arrayMap(y -> trimLeft((y[1])), arrayMap(v -> splitByChar(key_value_pair_delimiter, v), flatten(extractAllGroups(assumeNotNull(string), concat('([^', pair_delimiters, ']+)'))))), arrayMap(y -> trimLeft((y[2])), arrayMap(v -> splitByChar(key_value_pair_delimiter, v), flatten(extractAllGroups(assumeNotNull(string), concat('([^', pair_delimiters, ']+)')))))), 'Map(String, String)')
This function takes 3 parameters: the string you want to split, the delimiter between the key-value pairs, and the delimiter between each key and its value.
Going back to the querystring example, we could use the function as follows:
SELECT $project_extract_key_pair(querystring, '&', '=') as querymap
The result will be:
┌─querymap───────────────────────────────────────────┐
│ {'version':'1','user':'anonymous','cache':'false'} │
└────────────────────────────────────────────────────┘
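The logic of extract_key_pair can be hard to read as a single SQL expression. The Python sketch below follows the same steps on the example string, as an approximation rather than an exact port: re.findall stands in for extractAllGroups, str.split for splitByChar, and lstrip(" ") for trimLeft. How the SQL version treats duplicate keys is not covered here; this sketch simply keeps the last value.

```python
import re


def extract_key_pair(string: str, pair_delimiters: str, key_value_pair_delimiter: str) -> dict:
    """Split a delimited key-value string into a dict of string keys and values."""
    # Each token is a run of characters that are not pair delimiters.
    tokens = re.findall("[^" + re.escape(pair_delimiters) + "]+", string)

    result = {}
    for token in tokens:
        parts = token.split(key_value_pair_delimiter)
        key = parts[0].lstrip(" ")
        # A token without a key/value delimiter gets an empty value.
        value = parts[1].lstrip(" ") if len(parts) > 1 else ""
        result[key] = value
    return result


print(extract_key_pair("version=1&user=anonymous&cache=false", "&", "="))
```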
MAP values require the same type
By default, map values are indexed as strings.
Going back to our main example:
{
  "timestamp": "2022-05-31 14:41:52.000000",
  "remote_ip": "114.80.245.62",
  "remote_user": "-",
  "request": "GET /downloads/product_2 HTTP/1.1",
  "response": 200,
  "bytes": 26318005,
  "referrer": "-",
  "querystring": "version=1&user=anonymous&cache=false",
  "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36"
}
We can now use the following SQL transform:
SELECT splitByWhitespace(assumeNotNull(request))[1] as method,
splitByWhitespace(assumeNotNull(request))[2] as path,
splitByWhitespace(assumeNotNull(request))[3] as protocol,
IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
$project_extract_key_pair(querystring, '&', '=') as querymap,
*
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'
To include querymap, add the following output columns to your transform:
{
  "name": "querystring",
  "datatype": {
    "type": "string"
  }
},
{
  "name": "querymap",
  "datatype": {
    "type": "map",
    "index": true,
    "source": {
      "from_input_field": "sql_transform"
    },
    "elements": [
      {
        "type": "string",
        "index": true
      },
      {
        "type": "string",
        "index": true
      }
    ]
  }
}
Import the function
For this to work, you need to import the function into your project and replace the project placeholder in the SQL statement (the $project prefix of $project_extract_key_pair) with the name of your project.