Real-time data processing has become indispensable for many enterprises. Open-source tools such as Apache Flink, Apache Kafka, and CnosDB can be combined to make the collection, processing, and storage of real-time data streams more efficient and reliable. This article shows how to use Flink, Kafka, and CnosDB to build a robust real-time data processing pipeline.
Flink, Kafka and CnosDB
- Flink: a powerful stream processing engine that supports event-driven, distributed, and fault-tolerant processing. Flink can handle high-throughput, low-latency real-time data streams, making it suitable for use cases such as data analysis, real-time reporting, and recommendation systems.
- Kafka: a high-throughput distributed stream data platform used for collecting, storing, and transmitting real-time data streams. Kafka offers strong durability, scalability, and fault tolerance, making it suitable for building reliable pipelines for real-time data streams.
- CnosDB: an open-source database designed for time-series data. It features high performance, high availability, and ease of use, making it an excellent choice for storing time-series data generated in real time, such as sensor data, logs, and monitoring data.
Use Case Scenario
In this use case, assume a network of Internet of Things (IoT) devices, each of which regularly generates sensor data such as temperature, humidity, and pressure. We want to collect, process, and store this data in real time for monitoring and analysis.
The data flows through the pipeline in three stages:
- Set up a data collector that retrieves sensor data and sends it to a Kafka topic. This can be done by writing a producer application that publishes the generated sensor data to Kafka.
- Use Flink to process the sensor data in real time. Write a Flink application that subscribes to the data stream in the Kafka topic and processes and transforms the data, for example by calculating the average temperature and the maximum humidity.
- Store the processed data in CnosDB for subsequent queries. To do this, configure a CnosDB Sink so that the Flink application can write the processed data to CnosDB.
Building the Pipeline
1. Data Collection and Transmission
Write a producer application that reads sensor data and sends it to the Kafka topic.
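import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SensorDataProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes the producer if the loop is interrupted
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                // generateSensorData() and SensorData.toJson() are application-specific helpers not shown here
                SensorData data = generateSensorData(); // Generate sensor data
                // The producer is typed <String, String>, so the reading is sent as a JSON string
                producer.send(new ProducerRecord<>("sensor-data-topic", data.toJson()));
                Thread.sleep(1000); // Send one reading per second
            }
        }
    }
}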
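The producer relies on a SensorData type and a generateSensorData() helper that are not shown in this article. The sketch below is one hypothetical shape for them, using only the JDK. The field names (deviceId, timestamp, temperature, humidity, pressure), the value ranges, and the hand-rolled toJson() format are assumptions made for illustration; they are not part of any Kafka or CnosDB API, and the JSON builder assumes device IDs contain no characters that need escaping.

```java
import java.time.Instant;
import java.util.Locale;
import java.util.Random;

// Hypothetical sensor reading; field names and units are assumptions.
final class SensorData {
    final String deviceId;
    final Instant timestamp;
    final double temperature; // degrees Celsius
    final double humidity;    // relative humidity, percent
    final double pressure;    // hectopascals

    SensorData(String deviceId, Instant timestamp,
               double temperature, double humidity, double pressure) {
        this.deviceId = deviceId;
        this.timestamp = timestamp;
        this.temperature = temperature;
        this.humidity = humidity;
        this.pressure = pressure;
    }

    // Serializes the reading as a flat JSON object, usable as a Kafka String value.
    String toJson() {
        return String.format(Locale.ROOT,
            "{\"deviceId\":\"%s\",\"timestamp\":\"%s\","
                + "\"temperature\":%.2f,\"humidity\":%.2f,\"pressure\":%.2f}",
            deviceId, timestamp, temperature, humidity, pressure);
    }

    // Generates a random reading within assumed, plausible ranges.
    static SensorData generate(String deviceId, Random random) {
        return new SensorData(deviceId, Instant.now(),
            15.0 + random.nextDouble() * 15.0,    // 15 to 30 degrees Celsius
            40.0 + random.nextDouble() * 50.0,    // 40 to 90 percent
            980.0 + random.nextDouble() * 50.0);  // 980 to 1030 hPa
    }
}
```

With a type like this, generateSensorData() could simply delegate to SensorData.generate(...) with a fixed device ID, and the JSON string becomes the record value sent to the topic.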
2. Real-Time Processing and Transformation
Write a Flink application that subscribes to the data stream in the Kafka topic, processes it in real time, and transforms the data.
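// Flink application example
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SensorDataProcessingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");
        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));
        DataStream<ProcessedData> processedData = sensorData
            .map(json -> parseJson(json)) // Parse JSON data (parseJson is an application-specific helper)
            // Event-time windows fire only as watermarks advance; the 5-second bound is an assumed value
            .assignTimestampsAndWatermarks(WatermarkStrategy
                .<ProcessedData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.getTimestamp().toEpochMilli()))
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10-second tumbling window
            .apply(new SensorDataProcessor()); // User-defined processing logic
        processedData.print(); // Print the processed data; this can be replaced by writing to CnosDB
        env.execute("SensorDataProcessingJob");
    }
}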
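To make the windowing step concrete, here is a plain-Java sketch (no Flink dependency) of what a 10-second tumbling window computes per device: every reading falls into exactly one window, whose start is the timestamp rounded down to a multiple of the window size, and each window yields an average temperature and a maximum humidity. The Reading and WindowResult types and the TumblingWindowSketch class are hypothetical stand-ins introduced for this example; in the Flink job, the equivalent logic would live inside SensorDataProcessor, which this article does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {
    static final long WINDOW_MILLIS = 10_000L; // 10-second windows

    // Hypothetical input: one raw sensor reading.
    static final class Reading {
        final String deviceId;
        final long timestampMillis;
        final double temperature;
        final double humidity;

        Reading(String deviceId, long timestampMillis, double temperature, double humidity) {
            this.deviceId = deviceId;
            this.timestampMillis = timestampMillis;
            this.temperature = temperature;
            this.humidity = humidity;
        }
    }

    // Hypothetical output: one aggregate per device and window.
    static final class WindowResult {
        final String deviceId;
        final long windowStart;
        final double averageTemperature;
        final double maxHumidity;

        WindowResult(String deviceId, long windowStart, double averageTemperature, double maxHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.averageTemperature = averageTemperature;
            this.maxHumidity = maxHumidity;
        }
    }

    // A tumbling window covers the half-open interval [start, start + WINDOW_MILLIS).
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    static List<WindowResult> aggregate(List<Reading> readings) {
        // Group by device, then by window start; TreeMap keeps the output order deterministic.
        Map<String, Map<Long, List<Reading>>> groups = new TreeMap<>();
        for (Reading r : readings) {
            groups.computeIfAbsent(r.deviceId, k -> new TreeMap<>())
                  .computeIfAbsent(windowStart(r.timestampMillis), k -> new ArrayList<>())
                  .add(r);
        }
        List<WindowResult> results = new ArrayList<>();
        for (Map.Entry<String, Map<Long, List<Reading>>> device : groups.entrySet()) {
            for (Map.Entry<Long, List<Reading>> window : device.getValue().entrySet()) {
                double sum = 0.0;
                double maxHumidity = Double.NEGATIVE_INFINITY;
                for (Reading r : window.getValue()) {
                    sum += r.temperature;
                    maxHumidity = Math.max(maxHumidity, r.humidity);
                }
                results.add(new WindowResult(device.getKey(), window.getKey(),
                        sum / window.getValue().size(), maxHumidity));
            }
        }
        return results;
    }
}
```

Unlike this batch-style sketch, Flink evaluates windows incrementally and emits each result once the watermark passes the end of the window, but the per-window arithmetic is the same.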
3. Data Writing and Storage
Configure the CnosDB Sink so that the job writes to CnosDB instead of calling 'processedData.print()'. First, create a database in CnosDB with a data retention period of 30 days. For the syntax for creating a database in CnosDB, refer to Creating a Database [https://docs.cnosdb.com/zh/latest/reference/sql.html#创建数据库]:
CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;
In Maven [https://maven.apache.org/], add the CnosDB Sink [https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html] package as a dependency:
<dependency>
<groupId>com.cnosdb</groupId>
<artifactId>flink-connector-cnosdb</artifactId>
<version>1.0</version>
</dependency>
Write the application:
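import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
// CnosDBPoint, CnosDBConfig and CnosDBSink come from the flink-connector-cnosdb dependency

public class WriteToCnosDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");
        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));
        DataStream<ProcessedData> processedData = sensorData
            .map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // Parse JSON data
            // Event-time windows fire only as watermarks advance; the 5-second bound is an assumed value
            .assignTimestampsAndWatermarks(WatermarkStrategy
                .<ProcessedData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.getTimestamp().toEpochMilli()))
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10-second tumbling window
            .apply(new SensorDataProcessor()); // User-defined processing logic
        // Convert each aggregate into a CnosDB point: one tag (device_id) and two fields
        DataStream<CnosDBPoint> cnosDBDataStream = processedData.map(
            new RichMapFunction<ProcessedData, CnosDBPoint>() {
                @Override
                public CnosDBPoint map(ProcessedData value) throws Exception {
                    return new CnosDBPoint("sensor_metric")
                        .time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)
                        .tag("device_id", value.getDeviceId())
                        .field("average_temperature", value.getAverageTemperature())
                        .field("max_humidity", value.getMaxHumidity());
                }
            }
        );
        CnosDBConfig cnosDBConfig = CnosDBConfig.builder()
            .url("http://localhost:8902")
            .database("db_flink_test")
            .username("root")
            .password("")
            .build();
        cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig));
        env.execute("WriteToCnosDBJob");
    }
}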
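To make the point mapping more tangible, the following plain-Java sketch renders one aggregate as a line-protocol-style string: measurement name, tags, fields, then a timestamp. It only illustrates the data model used here (sensor_metric as the table, device_id as a tag, the two aggregates as fields). It does not show how the connector serializes points, and the class name LineProtocolSketch, its methods, and the choice of nanosecond timestamps are assumptions made for this example.

```java
import java.time.Instant;

final class LineProtocolSketch {
    // Escapes the characters that separate elements in a tag value: comma, equals sign, space.
    static String escapeTag(String value) {
        return value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Renders one aggregate as "<measurement>,<tags> <fields> <timestamp in nanoseconds>".
    static String toLine(String deviceId, Instant time,
                         double averageTemperature, double maxHumidity) {
        long nanos = Math.addExact(
                Math.multiplyExact(time.getEpochSecond(), 1_000_000_000L), time.getNano());
        return "sensor_metric,device_id=" + escapeTag(deviceId)
                + " average_temperature=" + averageTemperature
                + ",max_humidity=" + maxHumidity
                + " " + nanos;
    }
}
```

Keeping device_id as a tag rather than a field reflects how it is used: it identifies the series, while the temperature and humidity aggregates are the measured values.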
Run the job, then query CnosDB to check the result:
db_flink_test ❯ select * from sensor_metric limit 10;
+---------------------+---------------+---------------------+--------------+
| time                | device_id     | average_temperature | max_humidity |
+---------------------+---------------+---------------------+--------------+
| 2023-01-14T17:00:00 | OceanSensor1  | 23.5                | 79.0         |
| 2023-01-14T17:05:00 | OceanSensor2  | 21.8                | 68.0         |
| 2023-01-14T17:10:00 | OceanSensor1  | 25.2                | 75.0         |
| 2023-01-14T17:15:00 | OceanSensor3  | 24.1                | 82.0         |
| 2023-01-14T17:20:00 | OceanSensor2  | 22.7                | 71.0         |
| 2023-01-14T17:25:00 | OceanSensor1  | 24.8                | 78.0         |
| 2023-01-14T17:30:00 | OceanSensor3  | 23.6                | 80.0         |
| 2023-01-14T17:35:00 | OceanSensor4  | 22.3                | 67.0         |
| 2023-01-14T17:40:00 | OceanSensor2  | 25.9                | 76.0         |
| 2023-01-14T17:45:00 | OceanSensor4  | 23.4                | 70.0         |
+---------------------+---------------+---------------------+--------------+
Summary
By combining Flink, Kafka, and CnosDB, you can build a robust real-time data processing pipeline that spans data collection, real-time processing, and storage. Each step involves specific configuration and code, so it helps to be familiar with the features and operation of each tool. This architecture suits a range of real-time data applications such as IoT monitoring, real-time reporting, and dashboards. Adjust the configuration and code to your needs and context to create a real-time data processing solution tailored to your business.