Hydrolix HTTP API to Google Pub/Sub in Go

Export a stream of data from Hydrolix to Google Pub/Sub with a small Go program

Overview

This article shows one method to copy a data stream from a Hydrolix table to Google Pub/Sub, which can then output the data to a multitude of systems. Our example uses the Hydrolix HTTP API, and is written in Go. It’s a small program that contains a loop to pull data from the Hydrolix HTTP API, and then send the data to Google Pub/Sub.

The complete example is located here in Hydrolix’s GitHub account. It will most likely need to be changed to accommodate your particular needs. If you’re experienced with Pub/Sub and HTTP APIs, that commented example will be enough to get you started with your own implementation.

The largest raw tables in your Hydrolix cluster might be prohibitively expensive to copy. However, this can be a good approach for low-throughput tables and summary tables. Hydrolix’s summary tables are a powerful tool to provide aggregated data for dashboarding, security analysis, system observability, or other specialized systems. Both raw and summarized data are discussed below.

Setup

Sending data from your Hydrolix cluster to other systems requires planning and configuration.

Hydrolix

You’ll need a running Hydrolix cluster. If you don’t have one, sign up for a trial here: https://hydrolix.io/trial. Also, you’ll need a user account on that cluster with permission to read from tables.

Google Pub/Sub

A Google Cloud account is needed to do anything with Pub/Sub. In the GCP web UI, find the Pub/Sub service. As of this writing, it’s in the left-hand navigation under “More Products → Analytics.” Create a topic, then create a subscription to that topic to monitor incoming messages in the UI.

Set up the Pub/Sub client library for Go by following Google’s setup instructions. This will ensure the Go functions are available to you so you’re ready to send messages.

Exporter

You’ll need a host to run this example. This could be your local workstation (for demo purposes), or other infrastructure, such as Google Compute Engine. It will need the Go runtime installed. We used Go version 1.22.1 to create this example.

You’ll launch the exporter with these command-line switches, so record this information for easy reference:

Command-Line SwitchDescriptionExample
hdx_urlThe URL of your Hydrolix cluster’s HTTP query interfacehttp://your.hostname.com/query
hdx_tableThe name of your source Hydrolix table in “project.table” notationsample_project.my_raw_table
hdx_userThe username with read access to the Hydrolix source table

Alternatively, use the hdx_token below
export_reader
hdx_passwordThe password of the user with read access to the Hydrolix source table

Alternatively, use the hdx_token below
Exp0RT#read3_
hdx_token24-hour token that allows use of the Hydrolix HTTP API

Alternatively, use the hdx_user and hdx_password above
gcp_projectThe name of the Google Cloud Project that holds your Pub/Sub resourcesmy_GCP_project
gcp_topicThe Google Pub/Sub topic namemy_GCP_topic

The sample can be run like this:

% go run main.go \\
    --gcp\_project <your-project-name> \\
    --gcp\_topic <your-topic-name> \\
    —-hdx\_url https://<yourcluster.domain.name>/query \\
    --hdx\_table <your\_project.your\_raw\_table> \\
    --hdx\_token $HDX\_TOKEN

Demo Script Walkthrough

Learn how to pull data from Hydrolix and push data to Pub/Sub by taking a closer look at the demo program. Here’s a description of the parts that might raise questions:

The Hydrolix HTTP API

Hydrolix has an extensive API to allow query, configuration, and ingestion of data. This example uses the HTTP Query API to issue SQL queries and receive the data via normal HTTPS POSTs. (Hydrolix has a ClickHouse-compatible API as well, but that’s for another example.)

The hdxRequest method in our sample holds few surprises; it sets authentication headers and then performs an HTTP POST, returning the response body. Taking a closer look at the authentication:

   // Set the API token or Username:Password
   if hdxAPIToken != "" {
       req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", hdxAPIToken))
   } else {
       req.SetBasicAuth(hdxAPIUser, hdxAPIPassword)
   }

We’ve prioritized the token in this case, since it’s more compatible with good security practices. The Hydrolix OAuth2-compatible Bearer token expires every 24 hours, unlike usernames and passwords.

📘

Need to get a fresh token?

Here’s a bash/zsh one-liner that will populate an environment variable called $HDX_TOKEN for later use on the command line.

Get the bearer token, which is good for the next 24 hours, to authenticate future API calls. This command assumes you've set the $HDX_HOSTNAME, $HDX_USER and $HDX_PASSWORD environment variables:

export HDX_TOKEN=$(
  curl -v -X POST -H "Content-Type: application/json" \
  https://$HDX_HOSTNAME/config/v1/login/ \
  -d "{
    \"username\":\"$HDX_USER\",
    \"password\":\"$HDX_PASSWORD\"  
  }" | jq -r ".auth_token.access_token"
)

If there is no token available, it falls back to basic HTTP authentication, using the username and password supplied.

Getting the Pub/Sub Topic

The code in the example looks for the topic using the client.Topic function from the Go client library:

   topic := client.Topic(gcpTopicName)

If it can’t find that topic, it will attempt to create it using client.CreateTopic:

   topic, err := client.CreateTopic(ctx, gcpTopicName)

If it creates a new topic, remember that a new subscription is needed to receive messages from that topic.

The SQL Query

We need the table name and the duration of each loop to create the SQL queries necessary. For this example, we’ve chosen to pull the amount of data that will be sent every five seconds.

For non-summary tables, the example constructs queries like this:

SELECT * FROM <your_project>.<your_raw_table>
  WHERE timestamp >= '2024-04-03 17:25:10.0509'
  AND timestamp < '2024-04-03 17:25:15.0509'
  ORDER BY timestamp DESC
  FORMAT JSONEachRow

The FORMAT JSONEachRow specifier produces a compact message that can be interpreted by the Google Pub/Sub UI.

Note that in high-volume scenarios, the duration of the time window should be made shorter. This is controlled by the optional loop_delay command-line switch. The calculateSQLQuery function makes sure the dates are formatted properly and returns the SQL query string.

// calculateSQLQuery creates the SQL query needed for the current time frame
func calculateSQLQuery(start, end time.Time, table string) (string, time.Time) {
   startString := start.Format(dateFormatString)
   endString := end.Format(dateFormatString)
   queryString := fmt.Sprintf("SELECT \* FROM %s WHERE timestamp >= '%s' AND timestamp < '%s' ORDER BY timestamp DESC FORMAT JSONEachRow", table, startString, endString)
   log.Println(queryString)
   return queryString, end
}

That last line returns the string and the end time so it’s easy to set the next time windows start time from this call.

Also notice that the SQL query above selects all fields from the table. That’s fine for this demonstration, but for a high-volume table, you should narrow your query down to specific fields to increase the performance of the query and to economize when sending data to Google Pub/Sub. Naming fields is required for summary tables, described below.

Running the Example

The example is now ready to go. Give the Pub/Sub client library the login information it needs with this command:

gcloud auth application-default login

The Google Pub/Sub client library will use information generated by the above command to authenticate with Google when you run the example.

Then, make sure you have data flowing into your table and run the program:

go run main.go \
        --gcp_project <your-project-name> \
        --gcp_topic <your-topic-name> \
        --hdx_url https://<yourcluster.domain.name>/query \
        --hdx_table <your_project.your_raw_table> \
        --hdx_token $HDX_TOKEN

Summarized Data

Transferring summarized data rather than your raw data can reduce the amount of data needed for transfer, saving costs at the destination while still giving your data consumers what they need. The costs saved can include;

  • data egress from the cloud provider you use to host your Hydrolix database,
  • VPC transit costs, depending on your infrastructure,
  • storage costs at the destination, and
  • CPU costs at the destination to support querying and processing the extra data.

SQL queries for summarized data are different from raw table queries. The query depends on the summary table aliases and the summary transform belonging to the table.

The summary table and transform used for this demonstration is from the Aggregating Data page. To support the minute-based aggregation, it requires specific names for fields, a GROUP BY clause, and a different date format. Two modifications needed to be made to the example Go program.

Modify the “timestamp” field to “minute” in the query string, and list the field aliases you want:

      queryString := fmt.Sprintf("SELECT minute, sumcost, avgtax, \_95th FROM %s WHERE minute >= '%s' AND minute < '%s' GROUP BY minute ORDER BY minute DESC FORMAT JSONEachRow", table, startString, endString)

To accommodate the new time format, adjust the date formatting, changing:

   dateFormatString            = "2006-01-02 15:04:05.0000"

...to:

   dateFormatString            = "2006-01-02 15:04:05"

In this case, set the loop_delay to one minute or higher via the –loop-delay parameter. This will keep the program from sending empty Pub/Sub messages to Google.

% go run main.go \
    --gcp_project <your-project-name> \
    --gcp_topic <your-topic-name> \
    --hdx_url https://yourcluster.domain.name/query \
    --hdx_table <your_project>.<your_summary_table> \
    --hdx_token $HDX_TOKEN \
    --loop_delay 60s

Other Considerations

Duplicate Data

Hydrolix tables don’t guarantee a unique timestamp or unique monotonically increasing ID. There’s no way to have a ‘cursor’ that’s consistent between queries. This means that it’s difficult to guarantee that no duplicate data will be returned from these queries.

The solution here is not to use the handy SQL BETWEEN logical operator, but explicitly state the right end of the date range will be “right-closed” with comparison operators – it will include data from the start time, but not include data from the end time. In the example below, there’s a possibility that the timestamp could equal the start time '2024-04-04 00:00:22.8127', but not the end time  '2024-04-04 00:00:27.8127'.

    WHERE timestamp >= '2024-04-04 00:00:22.8127'

       AND timestamp < '2024-04-04 00:00:27.8127'

The end time will be included in the next query.

Gaps in Data

These SQL queries should be performed after you’re certain the data in this time frame actually exists in the database. Often this is difficult to pinpoint – data from distributed systems can arrive much later than expected. One of Hydrolix’s best features is the ability to handle late-arriving data while maintaining a short query response time.

Once you have an estimate of the maximum delay incoming data can have, subtract this time offset to your SQL timestamps. This will help ensure your data is complete, but will add latency to any reports at your destination.

   // Back-date the dates in our query to ensure all data has been received
   // before the query is executed.
   start := time.Now().Add(-queryInitialDelay).UTC()

Our example code uses the --initial_delay parameter to specify this offset, which defaults to ten seconds, or 10s. Using the raw table example above, this command give the data an extra 42 seconds to be written to the Hydrolix database:

% go run main.go \
    --gcp_project <your-project-name> \
    --gcp_topic <your-topic-name> \
    —-hdx_url https://<yourcluster.domain.name>/query \
    --hdx_table <your_project.your_raw_table> \
    --hdx_token $HDX_TOKEN \
    —-initial_delay=42s

Overflow

The main loop of the sample creates the query, pulls the data, and publishes it once during the duration specified in the --loop-delay parameter. What happens if all that work takes longer than the loop duration? There are a few options:

  • Warn the user that this is happening and do nothing (the approach we used for this demonstration)
  • Handle the work asynchronously. You could remove the blocking result.Get(ctx) call and work out a way for the ticker loop to spawn goroutines to actually do the work. In this case, you’d need to monitor the number of goroutines.

Viewing the Data

The data will be sent to your Google Pub/Sub topic for viewing and forwarding to consumers. View the messages containing the data in the Google UI by navigating to the subscription you created, selecting the “MESSAGES” tab, then clicking on the “PULL” button.

Note that with Pub/Sub, there is no guarantee that the messages will be delivered immediately, or in order.

Once your messages are in Google Pub/Sub, they can be sent to your destination. If you need to transform your messages before they get to the source, Pub/Sub allows user-defined JavaScript functions to modify message formats before transmission.