使用Apache Flink连接TDengine
小 T 导读:想用 Flink 对接 TDengine?保姆级教程来了。
0.前言
TDengine 是由涛思数据开发并开源的一款高性能、分布式、支持 SQL 的时序数据库(Time-Series Database)。
除了核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等大数据平台所需要的系列功能。但是很多小伙伴出于架构的考虑,还是需要将数据导出到 Apache Flink、Apache Spark 等平台进行计算分析。
为了帮助大家对接,我们特别推出了保姆级课程,包学包会。
1.技术实现
Apache Flink 提供了 SourceFunction 和 SinkFunction,用来提供 Flink 和外部数据源的连接,其中 SouceFunction 为从数据源读取数据,SinkFunction 为将数据写入数据源。 与此同时,Flink 提供了 RichSourceFunction 和 RichSinkFunction 这两个类(继承自AbstractRichFunction),提供了额外的初始化(open(Configuration))和销毁方法(close())。 通过重写这两个方法,可以避免每次读写数据时都重新建立连接。
2.代码实现
完整源码:https://github.com/liuyq-617/TD-Flink
代码逻辑:
1) 自定义类 SourceFromTDengine
用途:数据源连接,数据读取
package com.taosdata.flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import com.taosdata.model.Sensor;import java.sql.*;import java.util.Properties;public class SourceFromTDengine extends RichSourceFunction<Sensor> {Statement statement;private Connection connection;private String property;public SourceFromTDengine(){super();}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);String driver = "com.taosdata.jdbc.rs.RestfulDriver";String host = "u05";String username = "root";String password = "taosdata";String prop = System.getProperty("java.library.path");Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);LOG.info("java.library.path:{}", prop);System.out.println(prop);Class.forName( driver );Properties properties = new Properties();connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata", properties);statement = connection.createStatement();}@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}if (statement != null) {statement.close();}}@Overridepublic void run(SourceContext<Sensor> sourceContext) throws Exception {try {String sql = "select * from tt.meters";ResultSet resultSet = statement.executeQuery(sql);while (resultSet.next()) {Sensor sensor = new Sensor( resultSet.getLong(1),resultSet.getInt( "vol" ),resultSet.getFloat( "current" ),resultSet.getString( "location" ).trim());sourceContext.collect( sensor );}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void cancel() {}}2) 自定义类 SinkToTDengine
用途:数据源连接,数据写入
SinkToTDengine
package com.taosdata.flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import com.taosdata.model.Sensor;import java.sql.*;import java.util.Properties;public class SinkToTDengine extends RichSinkFunction<Sensor> {Statement statement;private Connection connection;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);String driver = "com.taosdata.jdbc.rs.RestfulDriver";String host = "TAOS-FQDN";String username = "root";String password = "taosdata";String prop = System.getProperty("java.library.path");System.out.println(prop);Class.forName( driver );Properties properties = new Properties();connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata", properties);statement = connection.createStatement();}@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}if (statement != null) {statement.close();}}@Overridepublic void invoke(Sensor sensor, Context context) throws Exception {try {String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)",sensor.getLocation(),sensor.getLocation(),sensor.getTs(),sensor.getVal(),sensor.getCurrent());statement.executeUpdate(sql);} catch (Exception e) {e.printStackTrace();}}}3) 自定义类 Sensor
用途:定义数据结构,用来接受数据
package com.taosdata.model;public class Sensor {public long ts;public int val;public float current;public String location;public Sensor() {}public Sensor(long ts, int val, float current, String location) {this.ts = ts;this.val = val;this.current = current;this.location = location;}public long getTs() {return ts;}public void setTs(long ts) {this.ts = ts;}public int getVal() {return val;}public void setVal(int val) {this.val = val;}public float getCurrent() {return current;}public void setCurrent(float current) {this.current = current;}public String getLocation() {return location;}public void setLocation(String location) {this.location = location;}@Overridepublic String toString() {return "Sensor{" +"ts=" + ts +", val=" + val +", current=" + current +", location='" + location + '\'' +'}';}}4) 主程序类 ReadFromTDengine
用途:调用 Flink 进行读取和写入数据
package com.taosdata;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import com.taosdata.model.Sensor;import org.slf4j.LoggerFactory;import org.slf4j.Logger;public class ReadFromTDengine {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() );SensorList.print();SensorList.addSink( new com.taosdata.flink.SinkToTDengine() );env.execute();}}3.简单测试 RESTful 接口
1) 环境准备:
a) Flink 安装&启动:
wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
tar zxf flink-1.14.3-bin-scala_2.12.tgz -C /usr/local
/usr/local/flink-1.14.3/bin/start-cluster.sh
b) TDengine Database 环境准备:
创建原始数据:
create database tt;
create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));
insert into beijing using meters tags(‘beijing’) values(now,220,30.2);
创建目标数据库表:
create database sinktest;
create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));
2) 打包编译:
源码位置: https://github.com/liuyq-617/TD-Flink
mvn clean package
3) 程序启动:
flink run target/test-flink-1.0-SNAPSHOT-dist.jar
读取数据
vi log/flink-root-taskexecutor-0-xxxxx.out
查看到数据打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}
写入数据
show sinktest.tables;
已经创建了beijing 子表
select * from sinktest.beijing;
可以查询到刚插入的数据
4.使用 JNI 方式
举一反三的小伙伴此时已经猜到,只要把 JDBC URL 修改一下就可以了。
但是 Flink 每次分派作业时都在使用一个新的 ClassLoader,而我们在计算节点上就会得到“Native library already loaded in another classloader”错误。
为了避免此问题,可以将 JDBC 的 jar 包放到 Flink 的 lib 目录下,不去调用 dist 包就可以了。
cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
flink run target/test-flink-1.0-SNAPSHOT.jar
5.小结
通过在项目中引入 SourceFromTDengine 和 SinkToTDengine 两个类,即可完成在 Flink 中对 TDengine 的读写操作。后面我们会有文章介绍 Spark 和 TDengine 的对接。
注:文中使用的是 JDBC 的 RESTful 接口,这样就不用在 Flink 的节点安装 TDengine,JNI 方式需要在 Flink 节点安装 TDengine Database 的客户端。
关注公众号:拾黑(shiheibook)了解更多
赞助链接:
关注数据与安全,洞悉企业级服务市场:https://www.ijiandao.com/
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
随时掌握互联网精彩