不要误会我的意思,这是不可靠的,这意味着 Spark 和 Flink 都无法通过 JDBC 支持真正意义上的单词流式读取,并不是说它不可用。
至少,从目前两者的官方文件来看,或者通过我自己的实践经验来判断。
那么我们来谈谈JDBC的缺点,虽然它是一种通用的数据库连接方式,但在流式读取(或计算)方面。
数据源的读取分类
我们知道,随着企业对数据处理的要求越来越高,这直接导致了我们的数据处理系统,对数据源的读取方式有了更多的可变要求。
从业务端的使用需求来看,数据源的读取方式和频率大致可以分为两类:
类型1:一次性,即一次性读取目标系统(如数据库)的所有数据,我们称之为批处理
类型 2:连续,在类型 1 的基础上,它仍然监视数据源的变化,并继续读取后续的新数据和更改的数据,我们称之为流
首先是我们对读取数据源的传统需求,主流计算引擎 Spark 和 Flink 可以满足。
但是,对于第二种类型,虽然 Spark 和 Flink 都支持流式计算的特性,但它们其实这种支持有一个重要的前提,就是数据源端以及与数据源端对应的对接方式可以配合
适用于 Spark 的 JDBC
我之前吹嘘过,任何存储系统或数据库,你可以按名称命名,Spark都有一个接口来读取其中的数据,或者将计算结果存储到其中。
诚然,Spark 是这样做的,但是当我们想从特定数据库流式传输数据源时,这有点让人不知所措。
例如,我希望它从 MySQL 流式传输数据
我能想到的就是用它的结构化流式框架去尝试读取mysql,但是当我打开官网,看到它支持的数据源(最新)时,我不禁感到有些失望:
也就是说,数据源中没有官方支持以流式方式读取的MySQL,也没有提到JDBC。
但是,我以前的实践经验告诉我,有时我们不能完全相信官方的话,所以最好自己尝试一下,以防万一,对吧?
根据以往的经验,我写了如下核心**(记得提前在pom文件中介绍对应的mysql-connector包):
它可能看起来像这样,但一旦你运行它,你就会发现它:
果然不行,官网不会骗我的。
但我知道Spark绝对可以通过JDBC读取MySQL数据源。
所以,把核心**改成这个,就可以运行了:
只是这样一来,违背了我的初衷,这个逻辑已经从我原来想要的流式计算变成了批处理,也就是经过这次修改,就不再是Spark结构化的流式处理,而是普通的Spark。
因此,Spark官员无法(至少到目前为止)以流式方式直接使用JDBC读取数据源。
我在 GitHub 上看到一个开源项目,官方原生支持支持的 JDBC 方法改造后,说可以支持使用 Spark Structured Streaming 增量读取 MySQL 数据源,我暂时还没有验证过,感兴趣的同学可以看看(github.)。com/sutugin/spark-streaming-jdbc-source)。
用于 Flink 的 JDBC
打开 Flink 官网,在 Flink Connector 列,出现 JDBC(没有 MySQL):
在这种情况下,让我们尝试一下。
首先,你需要配置开发环境,与Spark不同,如果Flink想要读取MySQL数据源,就需要引入Flink独有的JDBC Connector(非传统的MySQL-Connector)。
注意这个版本的选择,可能与官网上的描述有所不同,最新官方文档的版本由两部分组成:连接器版本+Flink版本,我的版本稍旧一些。
然后是**部分,如下(像上面的Spark,这里只演示了读取mysql数据,然后打印出来):
package com.anryg.mysql.jdbc
import j**a.time.duration
import org.apache.flink.contrib.streaming.state.embeddedrocksdbstatebackend
import org.apache.flink.streaming.api.checkpointingmode
import org.apache.flink.streaming.api.environment.checkpointconfig.externalizedcheckpointcleanup
import org.apache.flink.streaming.api.scala.streamexecutionenvironment
import org.apache.flink.table.api.bridge.scala.streamtableenvironment
desc:以JDBC模式读取MySQL数据源。
auther: anryg
date: 2023/11/8 10:49
object frommysql2print {
def main(args: array[string]):unit = {
val env = streamexecutionenvironment.getexecutionenvironment
env.enablecheckpointing(10000l)
env.setStateBackend(New EmbeddedRocksDbStateBackend(True)) 一种设置状态后端的新方法。
env.getcheckpointconfig.setcheckpointstorage("hdfs:")
env.getcheckpointconfig.setexternalizedcheckpointcleanup(externalizedcheckpointcleanup.取消时保留)来设置检查点记录的保留策略。
env.getcheckpointconfig.setalignedcheckpointtimeout(duration.ofminutes(1l))
env.getcheckpointconfig.setcheckpointingmode(checkpointingmode.exactly_once)
val tableenv = streamtableenvironment.create(env)
第 1 步:读取 MySQL 数据源*
tableenv.executesql(
create table data_from_mysql(
client_ip` string,`domain` string,`time` string,`target_ip` string,`rcode` int,`query_type` int,`authority_record` string,`add_msg` string,`dns_ip` string,primary key(`client_ip`,`domain`,`time`,`target_ip`,`rcode`,`query_type`) not enforced
with('connector' = 'jdbc','url' = 'jdbc:mysql:', 'username' = '***', 'password' = '***', 'table-name' = 'test02'确定文本数据源的分隔符。
.stripmargin)
结果直接打印*
tableenv.executesql(
select * from data_from_mysql limit 100
.stripmargin).print()
整个**内容与上一篇文章中写的CDC阅读mysql的方式非常相似(有兴趣的可以去我之前的文章对比一下)。
但是,运行后,当程序完成对当前表中数据的读取后,它会突然停止:
也就是说,虽然我们使用了 Flink 流计算的上下文(streamexecutionenvironment),但由于程序使用 JDBC 来读取数据源,所以,它仍然只批量运行
在这方面,Flink 的行为与 Spark 完全相同。
最后
通过以上验证可以确定,无论是 Spark 还是 Flink,都无法以 JDBC 的方式流式读取 MySQL 数据源(或其他数据库),至少不可能直接使用官方正规军的方式。
那么,如果想直接通过计算引擎从某些数据库(比如MySQL)中读取增量数据,似乎最好的解决方案就是Flink CDC。
当然,JDBC也不是一无是处,对于一些较低版本的数据库(CDC暂时不支持),比如MySQL 55及以下版本的历史数据导入,依然可以派上用场。
作者丨anryg
*丨*** Anruige 是一名程序员(ID:GH C12DC29AE2E7)。
DBAPLUS 社区欢迎技术人员的贡献editor@dbapluscn