Data Enrichment

Leveraging SQL to modify, enrich and filter your data before indexing

Very often you need to modify data before it is stored. Hydrolix supports this through an additional processing step that is invoked via the transform used to load the data.

To modify and enrich data, SQL statements can be used within the transform. These statements have access to all the functions, operators and advanced features available through the query capabilities offered by Hydrolix, including more advanced features such as functions and dictionaries.
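As a sketch of the dictionary capability, a `sql_transform` statement could enrich each row with a value looked up from a dictionary. Note that `status_dict` and its `description` attribute are hypothetical names used only for illustration; they are not part of the example below.

```sql
-- Hypothetical sketch: look up a human-readable description for each
-- response code from an assumed dictionary named "status_dict".
SELECT dictGet('status_dict', 'description', toUInt64(assumeNotNull(response))) AS status_text,
       *
FROM {STREAM}
```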


Example: splitting a message into separate columns

To best explain how this process works, an example is provided below. The following raw data is supplied and requires enrichment and modification.

{
  "timestamp": "2022-05-31 14:41:52.000000",
  "remote_ip": "114.80.245.62",
  "remote_user": "-",
  "request": "GET /downloads/product_2 HTTP/1.1",
  "response": 200,
  "bytes": 26318005,
  "referrer": "-",
  "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36"
}

The enrichment and modification required includes:

  • Split the request column into three separate columns: method, path and protocol.
  • For privacy purposes, obfuscate remote_ip by removing the last octet.

With Real-Time Enrichment we have the ability to run an SQL query on this data.

SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        *
FROM {STREAM}

In the query above we use functions already available to operate on the raw data.

  • splitByWhitespace splits the request field in the raw data into its component parts, separated by whitespace.
  • IPv4NumToStringClassC and toIPv4 together replace the last octet of the IPv4 address in the remote_ip field.

In addition, * is included in the query so that all the original columns remain available alongside the virtual columns created by the SELECT.

Lastly, as the data is processed as a stream, the FROM clause uses the special {STREAM} term to reference the incoming data.
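Evaluated in isolation against the sample record (with literal values substituted for the stream columns), the functions behave as follows. Note that array indexing is 1-based:

```sql
SELECT splitByWhitespace('GET /downloads/product_2 HTTP/1.1')[1] AS method,    -- 'GET'
       splitByWhitespace('GET /downloads/product_2 HTTP/1.1')[2] AS path,      -- '/downloads/product_2'
       splitByWhitespace('GET /downloads/product_2 HTTP/1.1')[3] AS protocol,  -- 'HTTP/1.1'
       IPv4NumToStringClassC(toIPv4('114.80.245.62'))           AS remote_ip   -- '114.80.245.xxx'
```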

The SQL statement is then added to the sql_transform object within the settings object in the transform.

{
    "name": "transform",
    "description": "Demo SQL Transform",
    "type": "json",
    "settings": {
       "is_default": true,
       "sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}"
    }
}

To ensure the data is written to the table, the transform needs to be updated with the new virtual columns generated by the SQL statement. For example:

{
    "name": "path",
    "datatype":{
        "type": "string",
        "source": {
            "from_input_field": "sql_transform"
        }
    }
}

The full transform for this example is as follows:

{
    "name": "transform",
    "description": "Demo SQL Transform",
    "type": "json",
    "settings": {
       "is_default": true,
       "sql_transform": "SELECT splitByWhitespace(assumeNotNull(request))[1] as method, splitByWhitespace(assumeNotNull(request))[2] as path, splitByWhitespace(assumeNotNull(request))[3] as protocol, IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip, * FROM {STREAM}",
       "output_columns": [
          {
             "name": "timestamp",
             "datatype": {
                "type": "datetime",
                "format": "2006-01-02 15:04:05.000000",
                "primary": true,
                "resolution": "ms"
             }
          },
          {
             "name": "remote_ip",
             "datatype": {
                "type": "string"
             }
          },
          {
             "name": "request",
             "datatype": {
                "type": "string"
             }
          },
          {
             "name": "response",
             "datatype": {
                "type": "uint32"
             }
          },
          {
            "name": "bytes",
            "datatype":{
                "type": "uint64"
            }   
          },
          {
            "name": "referrer",
            "datatype":{
                "type": "string"
            }   
          },
          {
            "name": "agent",
            "datatype":{
                "type": "string"
            }   
          },
          {
            "name": "method",
            "datatype":{
                "type": "string",
                "source": {
                    "from_input_field": "sql_transform"
                }
            }
          },
          {
            "name": "path",
            "datatype":{
                "type": "string",
                "source": {
                    "from_input_field": "sql_transform"
                }
            }
          },
          {
            "name": "protocol",
            "datatype":{
                "type": "string",
                "source": {
                    "from_input_field": "sql_transform"
                }
            }
          }
        ],
        "compression": "none",
        "format_details": {
            "flattening": {
                "active": false,
                "map_flattening_strategy": null,
                "slice_flattening_strategy": null
            }
        }
    }
}

After transforming and indexing the data the result looks like the following:

{
      "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36",
      "bytes": "26318005",
      "method": "GET",
      "path": "/downloads/product_2",
      "protocol": "HTTP/1.1",
      "referrer": "-",
      "remote_ip": "114.80.245.xxx",
      "request": "GET /downloads/product_2 HTTP/1.1",
      "response": 200,
      "timestamp": "2022-05-31 14:41:52.000"
}

Example removing rows based on pattern

Another common use case is removing rows that match a specific pattern.
Let's say you want to avoid indexing rows where the User-Agent contains bot. We can express this with the following statement:

SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        *
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'

This performs the same split as before, but rows whose agent contains bot are neither selected nor indexed.
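The WHERE clause can combine multiple predicates in the usual way. As a hypothetical variation on the statement above, the following would also drop rows that have no referrer (the `-` placeholder in this log format):

```sql
SELECT  splitByWhitespace(assumeNotNull(request))[1] as method,
        splitByWhitespace(assumeNotNull(request))[2] as path,
        splitByWhitespace(assumeNotNull(request))[3] as protocol,
        IPv4NumToStringClassC(toIPv4(assumeNotNull(remote_ip))) as remote_ip,
        *
FROM {STREAM}
WHERE agent NOT ILIKE '%bot%'
  AND referrer != '-'
```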