Spark 和 Flink 的 JDBC 不可靠

小夏 社会 更新 2024-01-31

不要误会我的意思,这是不可靠的,这意味着 Spark 和 Flink 都无法通过 JDBC 支持真正意义上的单词流式读取,并不是说它不可用。

至少,从目前两者的官方文件来看,或者通过我自己的实践经验来判断。

那么我们来谈谈JDBC的缺点,虽然它是一种通用的数据库连接方式,但在流式读取(或计算)方面。

数据源的读取分类

我们知道,随着企业对数据处理的要求越来越高,这直接导致了我们的数据处理系统,对数据源的读取方式有了更多的可变要求。

从业务端的使用需求来看,数据源的读取方式和频率大致可以分为两类:

类型1:一次性,即一次性读取目标系统(如数据库)的所有数据,我们称之为批处理

类型 2:连续,在类型 1 的基础上,它仍然监视数据源的变化,并继续读取后续的新数据和更改的数据,我们称之为

首先是我们对读取数据源的传统需求,主流计算引擎 Spark 和 Flink 可以满足。

但是,对于第二种类型,虽然 Spark 和 Flink 都支持流式计算的特性,但它们其实这种支持有一个重要的前提,就是数据源端以及与数据源端对应的对接方式可以配合

适用于 Spark 的 JDBC

我之前吹嘘过,任何存储系统或数据库,你可以按名称命名,Spark都有一个接口来读取其中的数据,或者将计算结果存储到其中。

诚然,Spark 是这样做的,但是当我们想从特定数据库流式传输数据源时,这有点让人不知所措。

例如,我希望它从 MySQL 流式传输数据

我能想到的就是用它的结构化流式框架去尝试读取mysql,但是当我打开官网,看到它支持的数据源(最新)时,我不禁感到有些失望:

也就是说,数据源中没有官方支持以流式方式读取的MySQL,也没有提到JDBC。

但是,我以前的实践经验告诉我,有时我们不能完全相信官方的话,所以最好自己尝试一下,以防万一,对吧?

根据以往的经验,我写了如下核心**(记得提前在pom文件中介绍对应的mysql-connector包):

它可能看起来像这样,但一旦你运行它,你就会发现它:

果然不行,官网不会骗我的。

但我知道Spark绝对可以通过JDBC读取MySQL数据源。

所以,把核心**改成这个,就可以运行了:

只是这样一来,违背了我的初衷,这个逻辑已经从我原来想要的流式计算变成了批处理,也就是经过这次修改,就不再是Spark结构化的流式处理,而是普通的Spark。

因此,Spark官员无法(至少到目前为止)以流式方式直接使用JDBC读取数据源。

我在 GitHub 上看到一个开源项目,官方原生支持支持的 JDBC 方法改造后,说可以支持使用 Spark Structured Streaming 增量读取 MySQL 数据源,我暂时还没有验证过,感兴趣的同学可以看看(github.)。com/sutugin/spark-streaming-jdbc-source)。

用于 Flink 的 JDBC

打开 Flink 官网,在 Flink Connector 列,出现 JDBC(没有 MySQL):

在这种情况下,让我们尝试一下。

首先,你需要配置开发环境,与Spark不同,如果Flink想要读取MySQL数据源,就需要引入Flink独有的JDBC Connector(非传统的MySQL-Connector)。

注意这个版本的选择,可能与官网上的描述有所不同,最新官方文档的版本由两部分组成:连接器版本+Flink版本,我的版本稍旧一些。

然后是**部分,如下(像上面的Spark,这里只演示了读取mysql数据,然后打印出来):

package com.anryg.mysql.jdbc

import j**a.time.duration

import org.apache.flink.contrib.streaming.state.embeddedrocksdbstatebackend

import org.apache.flink.streaming.api.checkpointingmode

import org.apache.flink.streaming.api.environment.checkpointconfig.externalizedcheckpointcleanup

import org.apache.flink.streaming.api.scala.streamexecutionenvironment

import org.apache.flink.table.api.bridge.scala.streamtableenvironment

desc:以JDBC模式读取MySQL数据源。

auther: anryg

date: 2023/11/8 10:49

object frommysql2print {

def main(args: array[string]):unit = {

val env = streamexecutionenvironment.getexecutionenvironment

env.enablecheckpointing(10000l)

env.setStateBackend(New EmbeddedRocksDbStateBackend(True)) 一种设置状态后端的新方法。

env.getcheckpointconfig.setcheckpointstorage("hdfs:")

env.getcheckpointconfig.setexternalizedcheckpointcleanup(externalizedcheckpointcleanup.取消时保留)来设置检查点记录的保留策略。

env.getcheckpointconfig.setalignedcheckpointtimeout(duration.ofminutes(1l))

env.getcheckpointconfig.setcheckpointingmode(checkpointingmode.exactly_once)

val tableenv = streamtableenvironment.create(env)

第 1 步:读取 MySQL 数据源*

tableenv.executesql(

create table data_from_mysql(

client_ip` string,`domain` string,`time` string,`target_ip` string,`rcode` int,`query_type` int,`authority_record` string,`add_msg` string,`dns_ip` string,primary key(`client_ip`,`domain`,`time`,`target_ip`,`rcode`,`query_type`) not enforced

with('connector' = 'jdbc','url' = 'jdbc:mysql:', 'username' = '***', 'password' = '***', 'table-name' = 'test02'确定文本数据源的分隔符。

.stripmargin)

结果直接打印*

tableenv.executesql(

select * from data_from_mysql limit 100

.stripmargin).print()

整个**内容与上一篇文章中写的CDC阅读mysql的方式非常相似(有兴趣的可以去我之前的文章对比一下)。

但是,运行后,当程序完成对当前表中数据的读取后,它会突然停止:

也就是说,虽然我们使用了 Flink 流计算的上下文(streamexecutionenvironment),但由于程序使用 JDBC 来读取数据源,所以,它仍然只批量运行

在这方面,Flink 的行为与 Spark 完全相同。

最后

通过以上验证可以确定,无论是 Spark 还是 Flink,都无法以 JDBC 的方式流式读取 MySQL 数据源(或其他数据库),至少不可能直接使用官方正规军的方式。

那么,如果想直接通过计算引擎从某些数据库(比如MySQL)中读取增量数据,似乎最好的解决方案就是Flink CDC。

当然,JDBC也不是一无是处,对于一些较低版本的数据库(CDC暂时不支持),比如MySQL 55及以下版本的历史数据导入,依然可以派上用场。

作者丨anryg

*丨*** Anruige 是一名程序员(ID:GH C12DC29AE2E7)。

DBAPLUS 社区欢迎技术人员的贡献editor@dbapluscn

相似文章

    传音Spark 20:一款新的廉价智能手机已经发布

    Tecno 最近推出了 Spark ,标志着其 Spark 系列智能手机的开始。虽然尚未正式公布,但这款智能手机已在官方TECNO 传音 展示会上首次亮相,展示了其全面的规格和设计。让我们深入了解 Spark 的规格。因此,该设备拥有宽敞的 英寸 LCD 面板提供 x 像素的 HD 分辨率,并拥有流...

    龙眼和龙眼的区别

    龙眼和龙眼其实指的是同一种水果。在不同的地区,这种水果可能有不同的名称。在某些文化或地区,它被称为 龙眼 而在另一些文化或地区,它被称为 龙眼 它们都是指龙眼树 Dimocarpus longan 的果实。同一种植物 不管叫龙眼还是龙眼,它们都来自同一棵热带果树,属于无患子科。外观和味道 这种水果有...

    纯金和黄金的区别

    纯金和 是两种不同的金属材料,它们在成分 纯度 颜色 质地等方面都有一定的差异。下面将详细描述这两种金属。.成分。纯金是指纯度达到超过 而 是指元素周期表中化学符号为Au的金属元素。因此,从成分上看,纯金和 是同一种金属,但纯度不同。.纯度。纯金的纯度非常高,通常达到 以上,故又称千金。的纯度相对较...

    与女人有染的技巧

    没有恋爱经验的小白,跟女人谈恋爱,问题主要体现在 .时机不明确。推进关系太快,目的性太强,让女人不舒服 关系推进太慢,错过了窗口,关系停滞不前。 模棱两可的尺度太大,容易说出来 规模太小,影响不大。针对第一点,时间不明确,我想给弟兄们三点建议。.当一个女人快乐,心情好的时候。主要表现是,例如,女性对...

    大闸蟹和螃蟹的区别

    首先,形状不同。大闸蟹体型较大,身体扁平,头胸部宽度超过身体宽度。它们的体色通常是深绿色或深绿色,而腹部是乳白色。而螃蟹的体型相对较小,身体呈矩形或圆形,头胸部和腹部的宽度相等。它们的体色通常是深红色或棕色,而腹部是淡黄色。其次,习惯不同。大闸蟹是主要生活在河流和湖泊中的淡水生物。它们喜欢在水中游泳...