Spark消费Kafka如何实现精准一次性消费?-程序员宅基地

技术标签: spark  kafka  Spark  大数据  

在这里插入图片描述

1.定义

  • 精确一次消费(Exactly-once) 是指消息一定会被处理且只会被处理一次。不多不少就一次处理。

如果达不到精确一次消费,可能会达到另外两种情况:

  • 至少一次消费(at least once),主要是保证数据不会丢失,但有可能存在数据重复问题。

  • 最多一次消费 (at most once),主要是保证数据不会重复,但有可能存在数据丢失问题。

如果同时解决了数据丢失和数据重复的问题,那么就实现了精确一次消费的语义了。

2. 问题如何产生

数据何时会丢失: 比如实时计算任务进行计算,到数据结果存盘之前,进程崩溃,假设在进程崩溃前kafka调整了偏移量,那么kafka就会认为数据已经被处理过,即使进程重启,kafka也会从新的偏移量开始,所以之前没有保存的数据就被丢失掉了。

数据何时会重复: 如果数据计算结果已经存盘了,在kafka调整偏移量之前,进程崩溃,那么kafka会认为数据没有被消费,进程重启,会重新从旧的偏移量开始,那么数据就会被2次消费,又会被存盘,数据就被存了2遍,造成数据重复。

3.解决方案

方案一:利用关系型数据库的事务进行处理

出现丢失或者重复的问题,核心就是偏移量的提交与数据的保存,不是原子性的。如果能做成要么数据保存和偏移量都成功,要么两个失败。那么就不会出现丢失或者重复了。

这样的话可以把存数据和偏移量放到一个事务里。这样就做到前面的成功,如果后面做失败了,就回滚前面那么就达成了原子性。

问题与限制

  1. 数据库选型受限, 只能使用支持事务的关系型数据库 ,如mysql, oracle ,无法使用其他功能强大的nosql数据库。

  2. 如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务的问题。做分布式事务,结构复杂,拖慢性能。

  3. 如果做本地事务 ,只能把分区数据提取到driver中进行保存,降低并发 ,增加executor到driver的数据传递io。

方案二:手动提交偏移量+幂等性处理

咱们知道如果能够同时解决数据丢失和数据重复问题,就等于做到了精确一次消费。

那咱们就各个击破。

首先解决数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工来控制偏移量的提交时机。

但是如果数据保存了,没等偏移量提交进程挂了,数据会被重复消费。怎么办?那就要把数据的保存做成幂等性保存。即同一批数据反复保存多次,数据不会翻倍,保存一次和保存一百次的效果是一样的。如果能做到这个,就达到了幂等性保存,就不用担心数据会重复了。

难点

话虽如此,在实际的开发中手动提交偏移量其实不难,难的是幂等性的保存,有的时候并不一定能保证。所以有的时候只能优先保证的数据不丢失。数据重复难以避免。即只保证了至少一次消费的语义

文章已收录于https://github.com/wangsl123/dzblog,欢迎start。微信关注【鄙人王道长】,也可第一时间看到文章。

在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wangsl754/article/details/107479977

智能推荐

未处理System.BadImageFormatException”类型的未经处理的异常在 xxxxxxx.exe 中发生_“system.badimageformatexception”类型的未经处理的异常在 未知模块。 -程序员宅基地

文章浏览阅读2.4k次。“System.BadImageFormatException”类型的未经处理的异常在 xxxx.exe 中发生其他信息: 未能加载文件或程序集“xxxxxxx, Version=xxxxxx,xxxxxxx”或它的某一个依赖项。试图加载格式不正确的程序。此原因是由于 ” 目标程序的目标平台与 依赖项的目标编译平台不一致导致,把所有的项目都修改到同一目标平台下(X86、X64或AnyCPU)进行编译,一般即可解决问题“。若果以上方式不能解决,可采用如下方式:右键选择配置管理器,在这里修改平台。_“system.badimageformatexception”类型的未经处理的异常在 未知模块。 中发生

PC移植安卓---2018/04/26_电脑软件移植安卓-程序员宅基地

文章浏览阅读2.4k次。记录一下碰到的问题:1.Assetbundle加载问题: 原PC打包后的AssetBundle导入安卓工程后,加载会出问题。同时工程打包APK时,StreamingAssets中不能有中文。解决方案: (1).加入PinYinConvert类,用于将中文转换为拼音(多音字可能会出错,例如空调转换为KongDiao||阿拉伯数字不支持,如Ⅰ、Ⅱ、Ⅲ、Ⅳ(IIII)、Ⅴ、Ⅵ、Ⅶ、Ⅷ、Ⅸ、Ⅹ..._电脑软件移植安卓

学习笔记(03):高并发下的Nginx性能优化实战-Nginx优势特点总结-程序员宅基地

文章浏览阅读87次。 高并发下的Nginx性能优化实战、解读Nginx的核心知识、掌握nginx核心原理?通过本期课程将解答我们的疑惑。_随着 nginx 发展,高并发、轻量的优势,近几年

Linux环境 docker启动redis命令_linux docker 重启 redis-程序员宅基地

文章浏览阅读1.1k次。docker启动redis命令_linux docker 重启 redis

【总结】插头DP-bzoj1210/2310/2331/2595_dp插头模型-程序员宅基地

文章浏览阅读325次。插头DP小结_dp插头模型

关于测试工作效率低的一些思考和改进方法_测试人员不足与改进-程序员宅基地

文章浏览阅读3.5k次。关于测试工作效率低的一些思考和改进方法引子  汇总统计了一下项目组近期测试项目实际工作量与基线工作量的对比,发现一个严重问题。就是工作效率特别低下。下面简单列举一下几个项目预期工作量和实际工作量以及时间耗费严重的地方、项目简要背景。  1、B版本测试。版本预期工作量15人天,实际耗费工作量在30人天。更为严重的是测试人员并没有因为测试周期延长和工作量投入加大而测试的更轻松,反而是测试期..._测试人员不足与改进

随便推点

有了这6款浏览器插件,浏览器居然“活了”?!媳妇儿直呼“大开眼界”_浏览器插件助手-程序员宅基地

文章浏览阅读901次,点赞20次,收藏23次。浏览器是每台电脑的必装软件,去浏览器搜索资源和信息已经成为我们的日常,我媳妇儿原本也以为浏览器就是上网冲浪而已,哪有那么强大,但经过我的演示之后她惊呆了,直接给我竖起大拇指道:“原来浏览器还能这么用?大开眼界!今天来给大家介绍几款实用的浏览器插件,学会之后让你的浏览器“活过来”!_浏览器插件助手

NumPy科学数学库_数学中常用的环境有numpy-程序员宅基地

文章浏览阅读101次。NumPy是Python中最常用的科学数学计算库之一,它提供了高效的多维数组对象以及对这些数组进行操作的函数NumPy的核心是ndarray(N-dimensional array)对象,它是一个用于存储同类型数据的多维数组Numpy通常与SciPy(Scientific Python)和 Matplotlib(绘图库)一起使用,用于替代MatLabSciPy是一个开源的Python算法库和数学工具包;Matplotlib是Python语言及其Numpy的可视化操作界面'''_数学中常用的环境有numpy

dind(docker in docker)学习-程序员宅基地

文章浏览阅读1.1w次。docker in docker说白了,就是在docker容器内启动一个docker daemon,对外提供服务。优点在于:镜像和容器都在一个隔离的环境,保持操作者的干净环境。想到了再补充 :)一:低版本启动及访问启动1.12.6-dinddocker run --privileged -d --name mydocker docker:1.12.6-dind在其他容器访问d..._dind

com.sun.org.apache.xerces.internal.dom.DeferredTextImpl cannot be cast to org.w3c.dom.Element-程序员宅基地

文章浏览阅读1.4k次。代码 List<book> list = new ArrayList<book>(); Document doc = new DOCUntil().getDocument("src/ww/t/qp/books.xml"); Element root = doc.getDocumentElement(); NodeList books = root.getElementsByTagName("book"); for _com.sun.org.apache.xerces.internal.dom.deferredtextimpl cannot be cast to or

计算机网络 (ISP、计算机网络体系结构 拓扑图)-程序员宅基地

文章浏览阅读2.9k次。计算机网络定义: 一些互相连接的,自治的计算机或者智能硬件设备集合信息是如何交换的?1、电路交换(电话)线路被某个终端占用后不会被释放 除非主动释放2、分组交换分组交换是 把大数据拆分成小数据进行分组传输 提高传输效率计算机上的数据是突发的,所以不采用电路交换,而采用分组交换3、ISP(internet service provider)网络服务提供商终端通过交换机连接一级ISP(国际性区域)之间互相连接 某一条线路断后可以通过其他设备进行通信二级ISP(国家/区域性规模)与一级相互连接三级ISP(本地区域

推荐文章

热门文章

相关标签