Data Enrichment

Leveraging SQL to modify, enrich and filter your data before indexing

Data often needs to be modified before it is stored. Hydrolix supports this through an additional processing step that is invoked by the transform used to load the data.

To modify and enrich data, use SQL statements within the transform. These statements have access to all the functions, methods, operators and advanced features available through Hydrolix's query capabilities, including Custom Functions and Custom Dictionaries.

Example splitting message into separate columns

The example below shows how this process works. The following raw data is supplied and requires enrichment and modification.

{
  "timestamp": "2022-05-31 14:41:52.000000",
  "remote_ip": "114.80.245.62",
  "remote_user": "-",
  "request": "GET /downloads/product_2 HTTP/1.1",
  "response": 200,
  "bytes": 26318005,
  "referrer": "-",
  "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36"
}

The enrichment and modification required includes:

  • The request column must be split into three separate columns: method, path and protocol.
  • For privacy purposes, the last octet of remote_ip must be obfuscated.

With Real-Time Enrichment we can run an SQL query on this data as it is ingested.

SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        *
FROM {STREAM}

In the query above we use functions already available to operate on the raw data.

  • splitByWhitespace splits the request field of the raw data into its component parts, which are separated by whitespace. The array indexes [1], [2] and [3] are 1-based and select the method, path and protocol.
  • toIPv4 and IPv4NumToStringClassC together replace the last octet of the IPv4 address in the remote_ip field with xxx.

In addition, the * is included in the query so that all the original columns remain available alongside the virtual columns created by the SELECT query.

Lastly, because the data is processed as a stream, the FROM portion of the statement uses the special {STREAM} term to reference the incoming data.
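To make the effect of the query above concrete, the following is a minimal Python sketch that mimics it for a single row. It is an illustration only, not how Hydrolix executes the transform: the function name enrich is hypothetical, and the handling of a missing request (empty strings for absent parts) is an assumption.

```python
import ipaddress


def enrich(row: dict) -> dict:
    """Mimic the SELECT above for one incoming row (illustrative only)."""
    # splitByWhitespace: split on runs of whitespace. SQL arrays are 1-based,
    # so [1], [2], [3] in the query correspond to indexes 0, 1, 2 here.
    parts = (row.get("request") or "").split()
    parts += [""] * (3 - len(parts))  # assumption: absent parts become ""

    # toIPv4 + IPv4NumToStringClassC: parse the address, then replace the
    # last octet with the literal text "xxx".
    ip = ipaddress.IPv4Address(row["remote_ip"])
    masked = ".".join(str(ip).split(".")[:3] + ["xxx"])

    enriched = dict(row)  # the `*`: keep every original column
    enriched.update(
        method=parts[0],
        path=parts[1],
        protocol=parts[2],
        remote_ip=masked,  # the alias takes the place of the raw value
    )
    return enriched


sample = {
    "timestamp": "2022-05-31 14:41:52.000000",
    "remote_ip": "114.80.245.62",
    "request": "GET /downloads/product_2 HTTP/1.1",
    "response": 200,
    "bytes": 26318005,
}
print(enrich(sample))
```

A sketch like this can be handy for checking expectations against a few sample rows before the SQL is placed in a transform.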

The SQL statement is then added, as a single-line string, to the sql_transform property within the settings object of the transform.

{
    "name": "transform",
    "description": "Demo SQL Transform",
    "type": "json",
    "settings": {
       "is_default": true,
       "sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}"
    }
}
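Because the SQL has to be embedded in a JSON string, it can be convenient to keep the readable multi-line statement in a script and collapse it when the transform is generated. The following Python sketch shows one way to do that. The helper name build_transform is hypothetical, and the whitespace collapsing assumes the statement contains no string literals in which runs of whitespace are significant.

```python
import json

SQL = """
SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        *
FROM {STREAM}
"""


def build_transform(sql: str) -> dict:
    """Return the transform body with the SQL collapsed onto one line."""
    # Collapse every run of whitespace (including newlines) to a single space.
    one_line = " ".join(sql.split())
    return {
        "name": "transform",
        "description": "Demo SQL Transform",
        "type": "json",
        "settings": {
            "is_default": True,
            "sql_transform": one_line,
        },
    }


# json.dumps takes care of quoting and escaping the SQL string.
print(json.dumps(build_transform(SQL), indent=4))
```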

To ensure the new data is written to the table, the transform must also declare the virtual columns generated by the SQL statement as output columns. For example:

{
    "name": "path",
    "datatype":{
        "type": "string",
        "source": {
            "from_input_field": "sql_transform"
        }
    }
}
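When an SQL statement produces several virtual columns, the definitions differ only in name and type, so they can be generated rather than written by hand. The sketch below is one possible approach. The helper virtual_column is hypothetical, and it simply reproduces the JSON shape shown above.

```python
import json


def virtual_column(name: str, datatype: str = "string") -> dict:
    """Build an output column that is filled from the SQL transform."""
    return {
        "name": name,
        "datatype": {
            "type": datatype,
            "source": {"from_input_field": "sql_transform"},
        },
    }


# One definition per alias created in the SELECT statement.
columns = [virtual_column(name) for name in ("method", "path", "protocol")]
print(json.dumps(columns, indent=4))
```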

The full transform for this example is as follows:

{
    "name": "transform",
    "description": "Demo SQL Transform",
    "type": "json",
    "settings": {
       "is_default": true,
       "sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}",
       "output_columns": [
          {
             "name": "timestamp",
             "datatype": {
                "type": "datetime",
                "format": "2006-01-02 15:04:05.000000",
                "primary": true,
                "resolution": "ms"
             }
          },
          {
             "name": "remote_ip",
             "datatype": {
                "type": "string"
             }
          },
          {
             "name": "request",
             "datatype": {
                "type": "string"
             }
          },
          {
             "name": "response",
             "datatype": {
                "type": "uint32"
             }
          },
          {
            "name": "bytes",
            "datatype":{
                "type": "uint64"
            }   
          },
          {
            "name": "referrer",
            "datatype":{
                "type": "string"
            }   
          },
          {
            "name": "agent",
            "datatype":{
                "type": "string"
            }   
          },
          {
            "name": "method",
            "datatype":{
                "type": "string",
                "source": {
                    "from_input_field": "sql_transform"
                }
            }
          },
          {
            "name": "path",
            "datatype":{
                "type": "string",
                "source": {
                    "from_input_field": "sql_transform"
                }
            }
          },
          {
            "name": "protocol",
            "datatype":{
                "type": "string",
                "source": {
                    "from_input_field": "sql_transform"
                }
            }
          }
        ],
        "compression": "none",
        "format_details": {
            "flattening": {
                "active": false,
                "map_flattening_strategy": null,
                "slice_flattening_strategy": null
            }
        }
    }
}

After transforming and indexing the data the result looks like the following:

{
      "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36",
      "bytes": "26318005",
      "method": "GET",
      "path": "/downloads/product_2",
      "protocol": "HTTP/1.1",
      "referrer": "-",
      "remote_ip": "114.80.245.xxx",
      "request": "GET /downloads/product_2 HTTP/1.1",
      "response": 200,
      "timestamp": "2022-05-31 14:41:52.000"
}

📘

CSV file as data source

In the example above, the file type is JSON. When the file type is CSV, leave the source out of the output column when creating the transform. The column name must match the name used in the SQL transform.

{
    "name": "path",
    "datatype":{
        "type": "string"
    }
}

Example removing rows based on a pattern

Another common use case is removing rows that match a specific pattern.
For example, to avoid indexing rows where the user agent (the agent field) contains bot, use the following statement:

SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        *
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'

This statement performs the same split as before, but rows whose agent contains bot (in any letter case, because of ILIKE) are not selected and are therefore not indexed.
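The filter can be reasoned about as a case-insensitive substring test. The following Python sketch mimics the WHERE clause on a few sample rows. It is an illustration only: the function name keep_row is hypothetical, and dropping rows whose agent is missing follows standard SQL behavior, where a comparison against NULL does not evaluate to true.

```python
def keep_row(row: dict) -> bool:
    """Mimic WHERE agent NOT ILIKE '%bot%' for one row (illustrative only)."""
    agent = row.get("agent")
    if agent is None:
        # A NULL agent makes the predicate NULL, so the row is not selected.
        return False
    # ILIKE '%bot%' is a case-insensitive "contains bot" test.
    return "bot" not in agent.lower()


rows = [
    {"agent": "Mozilla/5.0 (Windows NT 5.1) Chrome/38.0.2125.104"},
    {"agent": "Mozilla/5.0 (compatible; Googlebot/2.1)"},
    {"agent": "ExampleBOT/1.0"},
]
kept = [row for row in rows if keep_row(row)]
print(kept)
```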

Example splitting a key-value pair string

Sometimes the data to index contains strings made up of key-value pairs. A good example is the query string of a request:
"querystring": "version=1&user=anonymous&cache=false"

Technically, the whole querystring could be indexed directly as a string, but searching it might be difficult.

The following function, extract_key_pair, splits the string into a map of key-value pairs:

(string, pair_delimiters, key_value_pair_delimiter) -> CAST((arrayMap(y -> trimLeft((y[1])), arrayMap(v -> splitByChar(key_value_pair_delimiter, v), flatten(extractAllGroups(assumeNotNull(string), concat('([^', pair_delimiters, ']+)'))))), arrayMap(y -> trimLeft((y[2])), arrayMap(v -> splitByChar(key_value_pair_delimiter, v), flatten(extractAllGroups(assumeNotNull(string), concat('([^', pair_delimiters, ']+)')))))), 'Map(String, String)')

This function takes three parameters: the string to split, the delimiter that separates one key-value pair from the next, and the delimiter that separates a key from its value.

Going back to the querystring example we could use the function:

SELECT $project_extract_key_pair(querystring, '&', '=') as querymap

The result is:

┌─querymap───────────────────────────────────────────┐
│ {'version':'1','user':'anonymous','cache':'false'} │
└────────────────────────────────────────────────────┘
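To see what each step of the function contributes, here is a rough Python equivalent. It is a sketch for understanding the logic, not the function Hydrolix runs. Three details are assumptions of the sketch: the delimiters are escaped before being placed in the regular expression, a pair without a value maps to an empty string, and a repeated key keeps its last value because the result is a Python dict.

```python
import re


def extract_key_pair(string, pair_delimiters, key_value_pair_delimiter):
    """Rough Python counterpart of the SQL lambda above (illustrative only)."""
    # extractAllGroups + flatten: every run of characters that contains
    # none of the pair delimiters is one "key=value" segment.
    pattern = "([^" + re.escape(pair_delimiters) + "]+)"
    segments = re.findall(pattern, string or "")

    result = {}
    for segment in segments:
        # splitByChar: split the segment on the key/value delimiter.
        parts = segment.split(key_value_pair_delimiter)
        # trimLeft: strip leading spaces from the key and from the value.
        key = parts[0].lstrip(" ")
        value = parts[1].lstrip(" ") if len(parts) > 1 else ""
        result[key] = value
    return result


print(extract_key_pair("version=1&user=anonymous&cache=false", "&", "="))
```

Because the pair delimiters end up inside a character class, more than one delimiter character can be passed at once, for example "&;".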

🚧

Map values require the same type

All values in a map must be of the same type. By default, map values are indexed as strings.

Going back to our main example:

{
  "timestamp": "2022-05-31 14:41:52.000000",
  "remote_ip": "114.80.245.62",
  "remote_user": "-",
  "request": "GET /downloads/product_2 HTTP/1.1",
  "response": 200,
  "bytes": 26318005,
  "referrer": "-",
  "querystring": "version=1&user=anonymous&cache=false",
  "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36"
}

We can now use the following SQL transform:

SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        $project_extract_key_pair(querystring, '&', '=') as querymap,
        *
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'

To include querystring and querymap, add the following output columns to your transform:

{
  "name": "querystring",
  "datatype": {
    "type": "string"
  }
},
{
  "name": "querymap",
  "datatype": {
    "type": "map",
    "index": true,
    "source": {
      "from_input_field": "sql_transform"
    },
    "elements": [
      {
        "type": "string",
        "index": true
      },
      {
        "type": "string",
        "index": true
      }
    ]
  }
}

❗️

Import the function

For this to work, import the function into your project and replace $project in the SQL statement with the name of your project.