Data Enrichment
Leveraging SQL to modify, enrich and filter your data before indexing
Very often you need to modify data before it is stored. Hydrolix supports this through an additional process that can be invoked via the transform used for loading data.
To modify and enrich data, SQL statements can be used within the transform. These statements have access to all the functions, methods, operators, and advanced features offered by Hydrolix's query capabilities, including more advanced features such as Custom Functions and Custom Dictionaries.
Example: splitting a message into separate columns
To best explain how this process works, an example is provided below. The following raw data is supplied and requires enrichment/modification.
{
"timestamp": "2022-05-31 14:41:52.000000",
"remote_ip": "114.80.245.62",
"remote_user": "-",
"request": "GET /downloads/product_2 HTTP/1.1",
"response": 200,
"bytes": 26318005,
"referrer": "-",
"agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36"
}
The enrichment and modification required includes:
- The `request` column is to be split into three separate columns: method, path, and protocol.
- For privacy purposes, we need to obfuscate the `remote_ip` by removing the last octet.
With Real-Time Enrichment we have the ability to run an SQL query on this data.
SELECT splitByWhitespace(assumeNotNull(request))[1] as method,
splitByWhitespace(assumeNotNull(request))[2] as path,
splitByWhitespace(assumeNotNull(request))[3] as protocol,
IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
*
FROM {STREAM}
In the query above we use functions already available to operate on the raw data.
- The `splitByWhitespace` function splits the `request` object in our raw data into its component parts, separated by whitespace.
- The `IPv4NumToStringClassC` and `toIPv4` functions remove the last octet from the IPv4 address in the `remote_ip` object.
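As a sketch of what these functions do in isolation, the following hypothetical standalone queries apply them to values taken from the sample record above (assuming ClickHouse-compatible SQL, which the functions used here follow):

```sql
-- splitByWhitespace returns an array of the whitespace-separated tokens:
SELECT splitByWhitespace('GET /downloads/product_2 HTTP/1.1');
-- -> ['GET', '/downloads/product_2', 'HTTP/1.1']

-- toIPv4 parses the string into an IPv4 value, and
-- IPv4NumToStringClassC renders it with the last octet masked:
SELECT IPv4NumToStringClassC(toIPv4('114.80.245.62'));
-- -> '114.80.245.xxx'
```

Array indexing in these functions is 1-based, which is why the enrichment query selects elements `[1]`, `[2]`, and `[3]`.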
In addition, the `*` has been added to the query to ensure all the original columns are made available on top of the virtual ones created by our `SELECT` query.
Lastly, as the data is being processed as a stream, the `FROM` portion of the statement uses a special `{STREAM}` term to reference the incoming data.
The SQL statement is then added to the `sql_transform` object within the `settings` object in the transform.
{
"name": "transform",
"description": "Demo SQL Transform",
"type": "json",
"settings": {
"is_default": true,
"sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}"
}
}
To ensure the enriched data is written to the table, the transform needs to be updated with the new virtual columns generated by the SQL statement. For example:
{
"name": "path",
"datatype":{
"type": "string",
"source": {
"from_input_field": "sql_transform"
}
}
}
The full transform for this example is as follows:
{
"name": "transform",
"description": "Demo SQL Transform",
"type": "json",
"settings": {
"is_default": true,
"sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}",
"output_columns": [
{
"name": "timestamp",
"datatype": {
"type": "datetime",
"format": "2006-01-02 15:04:05.000000",
"primary": true,
"resolution": "ms"
}
},
{
"name": "remote_ip",
"datatype": {
"type": "string"
}
},
{
"name": "request",
"datatype": {
"type": "string"
}
},
{
"name": "response",
"datatype": {
"type": "uint32"
}
},
{
"name": "bytes",
"datatype":{
"type": "uint64"
}
},
{
"name": "referrer",
"datatype":{
"type": "string"
}
},
{
"name": "agent",
"datatype":{
"type": "string"
}
},
{
"name": "method",
"datatype":{
"type": "string",
"source": {
"from_input_field": "sql_transform"
}
}
},
{
"name": "path",
"datatype":{
"type": "string",
"source": {
"from_input_field": "sql_transform"
}
}
},
{
"name": "protocol",
"datatype":{
"type": "string",
"source": {
"from_input_field": "sql_transform"
}
}
}
],
"compression": "none",
"format_details": {
"flattening": {
"active": false,
"map_flattening_strategy": null,
"slice_flattening_strategy": null
}
}
}
}
After transforming and indexing the data, the result looks like the following:
{
"agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36",
"bytes": "26318005",
"method": "GET",
"path": "/downloads/product_2",
"protocol": "HTTP/1.1",
"referrer": "-",
"remote_ip": "114.80.245.xxx",
"request": "GET /downloads/product_2 HTTP/1.1",
"response": 200,
"timestamp": "2022-05-31 14:41:52.000"
}
Example: removing rows based on a pattern
Another common use case is removing rows that match a specific pattern.
Let's say you want to avoid indexing rows where the User-Agent contains `bot`. We can express this with the following statement:
SELECT splitByWhitespace(assumeNotNull(request))[1] as method,
splitByWhitespace(assumeNotNull(request))[2] as path,
splitByWhitespace(assumeNotNull(request))[3] as protocol,
IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
*
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'
This performs the same split as before, but won't select (and therefore won't index) rows whose `agent` contains `bot`.
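The `WHERE` clause can combine any supported conditions, so filtering is not limited to a single pattern. As a hypothetical sketch (assuming the same sample columns as above), the following drops both bot traffic and server-error responses before indexing:

```sql
-- Keep only rows that are neither bot traffic nor 5xx errors.
SELECT *
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'
  AND response < 500
```

Rows excluded this way are never indexed, so filtering at this stage also reduces storage for data you never intend to query.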