
CnosDB Real-time Streaming: Optimizing Time Series Data Processing and Downsampling Solutions

When processing time series data, the write rate usually follows the sampling frequency of the data collection devices, which can mean ingesting a large number of data points every second. Retaining such large volumes of raw data over long periods quickly drives up storage requirements. An effective solution is to use stream computing to downsample the raw data.

In time series databases, streaming refers to computing over real-time data streams, so that results are available as soon as the data arrives. With streaming, data can be processed in real time through operations such as downsampling, aggregation, and filtering, which reduces storage requirements and provides up-to-date statistics.

The basic principle of streaming is to process the incoming data stream continuously and produce the required results from it. For example, minute-level data points can be aggregated into hour-level data points using aggregate functions such as maximum, average, and sum.
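As an illustration of the minute-to-hour aggregation described above, the following Python sketch groups (timestamp, value) points by the hour they fall in and computes MAX, AVG, SUM, and COUNT for each hour. It is a batch, in-memory model of the computation only, not how CnosDB implements it; the function name downsample_hourly and the synthetic input are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def downsample_hourly(points):
    """Aggregate (timestamp, value) pairs into per-hour MAX/AVG/SUM/COUNT (illustrative only)."""
    buckets = defaultdict(list)
    for ts, value in points:
        # Truncate the timestamp to the start of its hour to get the bucket key.
        hour = ts.replace(minute=0, second=0, microsecond=0)
        buckets[hour].append(value)

    result = {}
    for hour in sorted(buckets):
        values = buckets[hour]
        total = sum(values)
        result[hour] = {
            "max": max(values),
            "avg": total / len(values),
            "sum": total,
            "count": len(values),
        }
    return result


# Hypothetical input: 120 minute-level readings spanning two hours.
start = datetime(2023, 1, 14, 16, 0)
points = [(start + timedelta(minutes=i), float(i)) for i in range(120)]
hourly = downsample_hourly(points)

for hour, stats in hourly.items():
    print(hour.isoformat(), stats)
```

Here 120 raw points collapse into two hourly rows, which is the storage reduction that downsampling aims for.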

CnosDB provides STREAM tables for stream computing. The SELECT statement of a stream query must include a GROUP BY clause on time, such as the date_bin() time bucket used in the example below. Stream queries process data in real time and store the results in a target table, a concept similar to materialized views in other databases.
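To make the materialized-view comparison concrete, here is a toy Python model of a continuously maintained aggregate: each incoming row updates running MAX, SUM, and COUNT for its (hour, station) group, so results are available without rescanning the raw data. This is an illustration of the general idea under simplifying assumptions, not a description of CnosDB's internal stream engine; the class and method names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class WindowState:
    """Running aggregates for one (window, station) group (hypothetical toy model)."""
    maximum: float = float("-inf")
    total: float = 0.0
    count: int = 0

    def update(self, value: float) -> None:
        # Fold one new value into the running state; raw rows are not retained.
        self.maximum = max(self.maximum, value)
        self.total += value
        self.count += 1

    @property
    def avg(self) -> float:
        # AVG is derived from SUM and COUNT rather than stored separately.
        return self.total / self.count


class HourlyView:
    """Toy 'materialized view' keyed by (hour_start_seconds, station)."""

    def __init__(self) -> None:
        # Maps (hour_start_seconds, station) -> WindowState.
        self.windows = {}

    def on_insert(self, ts_seconds: int, station: str, value: float) -> None:
        # Align the epoch-seconds timestamp to the start of its hour.
        hour_start = ts_seconds - ts_seconds % 3600
        self.windows.setdefault((hour_start, station), WindowState()).update(value)


view = HourlyView()
for ts, station, value in [(0, "A", 1.0), (1800, "A", 3.0), (3600, "A", 5.0), (10, "B", 2.0)]:
    view.on_insert(ts, station, value)
```

Keeping only SUM and COUNT (and deriving AVG) is what makes the state mergeable as new rows arrive; an average cannot be updated correctly on its own.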

The syntax for creating a CnosDB STREAM table is shown below. A source table must already exist: db and table identify that source table, and event_time_column names its time column. STREAM tables do not support ALTER.

CREATE STREAM TABLE [IF NOT EXISTS] table_name [(field_definition [, field_definition] ...)]
    WITH (db = 'db_name', table = 'source_table_name', event_time_column = 'time_column')
    engine = tskv;

field_definition:
    column_name data_type

[Diagram: the downsampling process in stream computing]

Implementation Steps

Prerequisites:

Stream computing requires a raw (source) table. Data written to the raw table is picked up in real time by the stream table view. The stream computing task defined on the stream table runs at regular intervals, processes the data from the raw table, and writes the results to the target table.

The following example uses the sample data from the CnosDB Quick Start, imported in the CnosDB CLI with:

\w oceanic_station.txt

 

1. Create a Stream Table View

First, create a stream table view, air_stream, which reads data from the raw table air and serves as the source of the stream query.

CREATE STREAM TABLE air_stream(
    time TIMESTAMP,
    station STRING,
    pressure DOUBLE,
    temperature DOUBLE,
    visibility DOUBLE
) 
WITH (
    db = 'oceanic_station', 
    table = 'air', 
    event_time_column = 'time'
)
engine = tskv;

2. Create a Target Table

Next, create a target table air_down_sampling_1hour to store the downsampled data.

CREATE TABLE air_down_sampling_1hour(
    max_pressure DOUBLE, 
    avg_temperature DOUBLE, 
    sum_temperature DOUBLE, 
    count_pressure BIGINT, 
    TAGS(station)
);

3. Execute a Stream Query and Insert the Results into the Target Table

Use a stream query to insert the downsampled data into the target table. For each station and each one-hour window, this query calculates the maximum pressure, the average and the sum of the temperature, and the number of pressure readings.

INSERT INTO air_down_sampling_1hour(
    time, 
    station, 
    max_pressure, 
    avg_temperature, 
    sum_temperature, 
    count_pressure
) 
SELECT 
    date_bin(INTERVAL '1' HOUR, time, TIMESTAMP '2023-01-14T16:00:00') time, 
    station, 
    MAX(pressure) max_pressure, 
    AVG(temperature) avg_temperature, 
    SUM(temperature) sum_temperature, 
    COUNT(pressure) count_pressure 
FROM air_stream 
GROUP BY date_bin(INTERVAL '1' HOUR, time, TIMESTAMP '2023-01-14T16:00:00'), station;
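The grouping key date_bin(INTERVAL '1' HOUR, time, TIMESTAMP '2023-01-14T16:00:00') assigns each row to a one-hour bin aligned to the given origin timestamp. The Python function below sketches the definition documented for PostgreSQL's date_bin, origin + floor((time - origin) / stride) * stride, assuming floor semantics for timestamps before the origin. It is an illustration under that assumption, not CnosDB's code, so edge cases should be checked against CnosDB's own function reference.

```python
from datetime import datetime, timedelta


def date_bin(stride: timedelta, source: datetime, origin: datetime) -> datetime:
    """Start of the stride-wide bin containing `source`, with bins aligned to `origin`."""
    # timedelta // timedelta floors to a whole number of strides,
    # so timestamps before `origin` land in earlier bins as well.
    n = (source - origin) // stride
    return origin + n * stride


one_hour = timedelta(hours=1)
origin = datetime(2023, 1, 14, 16, 0, 0)

print(date_bin(one_hour, datetime(2023, 1, 14, 17, 42, 5), origin))  # 2023-01-14 17:00:00
print(date_bin(one_hour, datetime(2023, 1, 14, 15, 59, 0), origin))  # 2023-01-14 15:00:00
# Shifting the origin by 30 minutes shifts every bin boundary by 30 minutes.
print(date_bin(one_hour, datetime(2023, 1, 14, 17, 42, 5), origin + timedelta(minutes=30)))  # 2023-01-14 17:30:00
```

Because the origin in the query falls on a whole hour, the bins coincide with calendar hours, which matches the hourly time values in the results below.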

4. Query the Downsampled Results

Once data is written to the raw table, the stream task defined on the stream table view computes the results in real time and writes them to the target table air_down_sampling_1hour. You can then query the downsampled data:

SELECT * FROM air_down_sampling_1hour LIMIT 10;

The above query will return the following results:


    +---------------------+------------+--------------+-----------------+-----------------+----------------+
    | time                | station    | max_pressure | avg_temperature | sum_temperature | count_pressure |
    +---------------------+------------+--------------+-----------------+-----------------+----------------+
    | 2023-01-14T16:00:00 | XiaoMaiDao | 80.0         | 68.05           | 1361.0          | 20             |
    | 2023-01-14T17:00:00 | XiaoMaiDao | 79.0         | 63.75           | 1275.0          | 20             |
    | 2023-01-14T18:00:00 | XiaoMaiDao | 79.0         | 66.35           | 1327.0          | 20             |
    | 2023-01-14T19:00:00 | XiaoMaiDao | 78.0         | 68.05           | 1361.0          | 20             |
    | 2023-01-14T20:00:00 | XiaoMaiDao | 80.0         | 64.35           | 1287.0          | 20             |
    | 2023-01-14T21:00:00 | XiaoMaiDao | 77.0         | 61.05           | 1221.0          | 20             |
    | 2023-01-14T22:00:00 | XiaoMaiDao | 80.0         | 64.8            | 1296.0          | 20             |
    | 2023-01-14T23:00:00 | XiaoMaiDao | 80.0         | 66.35           | 1327.0          | 20             |
    | 2023-01-15T00:00:00 | XiaoMaiDao | 80.0         | 65.15           | 1303.0          | 20             |
    | 2023-01-15T01:00:00 | XiaoMaiDao | 80.0         | 69.55           | 1391.0          | 20             |
    +---------------------+------------+--------------+-----------------+-----------------+----------------+
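A quick sanity check on these results: when no values are missing, each hour's avg_temperature should equal sum_temperature / count_pressure (for example, 1361.0 / 20 = 68.05). The Python snippet below applies that check to the ten rows shown. It assumes there are no NULLs in either column, since COUNT(pressure) counts pressure readings rather than temperature readings; the helper name avg_mismatches is hypothetical.

```python
# Rows copied from the query output above:
# (time, avg_temperature, sum_temperature, count_pressure)
ROWS = [
    ("2023-01-14T16:00:00", 68.05, 1361.0, 20),
    ("2023-01-14T17:00:00", 63.75, 1275.0, 20),
    ("2023-01-14T18:00:00", 66.35, 1327.0, 20),
    ("2023-01-14T19:00:00", 68.05, 1361.0, 20),
    ("2023-01-14T20:00:00", 64.35, 1287.0, 20),
    ("2023-01-14T21:00:00", 61.05, 1221.0, 20),
    ("2023-01-14T22:00:00", 64.8, 1296.0, 20),
    ("2023-01-14T23:00:00", 66.35, 1327.0, 20),
    ("2023-01-15T00:00:00", 65.15, 1303.0, 20),
    ("2023-01-15T01:00:00", 69.55, 1391.0, 20),
]


def avg_mismatches(rows, tol=1e-9):
    """Return the times whose average differs from sum / count by more than `tol`."""
    return [t for t, avg, total, count in rows if abs(total / count - avg) > tol]


print(avg_mismatches(ROWS))  # []
```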