在本文中,让我们谈谈消息队列中最重要的最佳实践之一:消费幂等性
消费幂等性是指当 RocketMQ 消费者重复消费一条消息时,重复消费的结果与一次消费的结果相同,多次消费对业务系统没有任何负面影响。
例如,在支付场景中,消费者消费了一条扣款消息,从订单中扣款100元。
如果由于网络不稳定或其他原因反复下发扣款消息,消费者重复消费扣款消息,但最终业务结果是扣款只进行一次,扣款费用为100元,且用户扣款记录中对应的订单只有一个扣款流程,费用不会多次扣款。 那么这个扣减操作是符合要求的,整个消费过程实现了消费幂等。
重复 rocketmq 消息的场景如下:
发送时会重复发送消息
当消息成功发送到服务器并持久化时,网络中断或客户端关闭,导致服务器无法应答客户端。
如果生产者意识到消息失败并尝试再次发送消息,则使用者随后将收到两条内容相同但消息 ID 不同的消息。
消息在传递时重复
在消息消费场景下,消息已经下发给消费者,服务被处理,客户端响应服务器时网络中断。 为了保证消息至少消费一次,Broker 服务器会在网络恢复后再次尝试投递之前处理过的消息,消费者会收到两条内容相同、消息 ID 相同的消息。
负载均衡期间出现重复消息(包括但不限于网络抖动、代理重启和消费者应用程序重启)。
当代理或客户端重新启动、横向扩展或缩减时,将触发重新平衡,使用者可能会收到少量重复消息。
由于不同消息 ID 对应的消息内容可能相同,可能存在冲突(重复),因此不建议使用消息 ID 作为真正安全的幂等处理的基础。
最好的方法是:以业务的唯一标识作为幂等处理的关键依据,消息必须携带业务的唯一标识
一般来说,有两种方法可以唯一标识消息承载服务:
消息密钥保存业务的唯一标识符
message msg = new message(topic /* topic */, tag /* tag */, ("hello rocketmq " + i).getbytes(remotinghelper.default_charset) /* message body */ );message.setkey("orderid_100");订单号 sendresult sendresult = producersend(message);消息正文包含业务的唯一标识符
message msg = new message(topic /* topic */, tag /* tag */, (json.tojsonstring(orderdto)).getbytes(remotinghelper.default_charset) /* message body */ );message.setkey("orderid_100");订单号 sendresult sendresult = producersend(message);消费者在接收到消息时,从消息中获取订单号,实现消息同义词
consumer.registermessagelistener(new messagelistenerconcurrently() return consumeconcurrentlystatus.consume_success; }为了保证幂等性,必须这样做业务逻辑判断,我认为这是幂等保证的首先,也是最重要的笔者曾经服务过神州豪华轿车,乘客在用户端点开立即叫车,订单服务创建一个订单,先保存到数据库中,然后将订单信息同步保存到缓存中。
在订单的乘客生命周期中,订单的修改操作首先修改缓存,然后向metaq,下单服务消费消息,判断订单信息是否正常(例如是否乱序),订单数据是否正确,则存储在数据库中。
订单状态机器为:创造分配的驱动程序司机已经离开了司机到了司机已经接走了乘客到
这是为了快速提高系统性能,消费者因网络问题而收到乱序消息的概率非常小。
当订单状态为:司机到了,消费者可能会收到司机已经离开了由于网络原因,消息(即最先发送的消息)延迟。
这时,消费者需要判断当前专车订单状态机,保存最合理的订单数据,然后忽略旧消息,打印相关日志。
数据库重复数据删除表有两个要点:
操作前,通过去重表中的唯一服务标识符查询记录是否存在,如果没有,则进行后续的消费过程
为了避免并发出现场景,去重表需要包含服务的唯一键,这样即使并发插入,也无法插入多条记录,如果插入失败,会抛出异常。
以电子商务场景为例:当用户结账购物车时,系统会创建一个付款订单。 用户支付成功后,支付订单的状态会由未支付变为成功,然后系统会给用户加分。
我们可以使用 RocketMQ 事务性消息传递方案,它利用了 MQ:异步跟解耦,以及事务的最终一致性。
在消费的逻辑中,幂等性非常重要
。积分表 sql 如下:
create table `t_points` (id` bigint(20) not null comment '主键', `user_id` bigint(20) not null comment '用户 ID', `order_id` bigint(20) not null comment '订单号', `points` int(4) not null comment '学分', `remarks` varchar(128) collate utf8mb4_bin not null comment '言论', `create_time` datetime not null, primary key (`id`),unique key `unique_order_id` (order_id`) using btree comment '订单是唯一的') engine=innodb default charset=utf8mb4 collate=utf8mb4_bin;收到订单信息后,首先确定订单是否有积分记录,如果没有,则插入积分记录。
即使在极端并发的场景下,订单号是唯一的关键,数据库中也难免不会出现同一订单的多点记录。
public consumeconcurrentlystatus consumemessage(list msgs, consumeconcurrentlycontext context) return consumeconcurrentlystatus.consume_success; }catch (exception e) }消费者收到消息后,首先判断 Redis 中是否存在服务主键的标志位,如果存在标志位,则认为消费成功,否则执行业务逻辑,执行完成后,将标志位添加到缓存中。
public consumeconcurrentlystatus consumemessage(list msgs, consumeconcurrentlycontext context) //2.执行业务逻辑以执行业务 3设置标志 redistemplateopsforvalue().set(rediskeyconstants.waiting_send_lock + bizkey, "1", 72, timeunit.hours); return consumeconcurrentlystatus.consume_success; }catch (exception e) }光有业务逻辑判断是不够的,为了处理并发场景,我们可以使用它分布式锁分布式锁通常有三种方案:
数据库乐观锁。
数据库悲观锁。
Redis 锁。
数据的乐观锁定假设是数据在正常情况下不会引起冲突,因此在提交和更新数据时,会正式检测数据是否存在冲突,如果发现冲突,则返回用户错误的信息,由用户决定如何处理。
由于乐观锁没有锁定等待,因此吞吐量得到提高,因此乐观锁是合适的多读少写场景。
实现乐观锁定:通常在数据表中添加数据版本号version
字段,表示数据被修改的次数,修改数据时,版本值将增加 1。
当线程 A 想要更新数据值时,它会在读取数据的同时读取数据version
值,如果在提交更新时只读取version
该值位于当前数据库中version
在值相等之前不会更新更新,否则将重试更新操作,直到更新成功。
Step 1 : 查询传出入口数据
select version from my_table where id = #步骤二:修改入口数据,传递版本参数
update my_table set n = n + 1, version = version + 1 where id=# and version = #从实现的角度来看,乐观锁定非常容易实现,但它有两个缺点:
对于业务的侵入性,请添加版本字段;
在高并发场景中只有一个线程可以成功修改,然后会出现大量的失败
消费者演示如下:
public consumeconcurrentlystatus consumemessage(list msgs, consumeconcurrentlycontext context) where id = # // and version = # int affectedcount = ordermapper.updateorder(orderpo); if(affectedcount == 0) return consumeconcurrentlystatus.consume_success; }catch (exception e) }当我们想修改数据库中的一段数据时,为了避免同时被其他人修改,最好的办法是直接锁定数据,防止并发。
这种在数据库锁定机制的帮助下修改数据之前锁定数据的方法称为悲观并发控制 (PCC)。
之所以称为悲观锁,是因为它是一种对数据修改持悲观态度的并发控制方法。 我们一般认为数据同时被修改的概率比较高,所以在修改之前需要先锁定。
悲观的并发控制实际上是“先锁定后访问”的保守策略。它为数据处理的安全性提供了保证
以下是使用 MySQL 悲观锁的方法:
begin;--读取数据并锁定选择。for update;--修改数据更新。commit;例如,以下 ** 将改为
t_order
桌子id
是 1 的记录,并且将是该记录status
该字段修改为
begin;select * from t_order where id = 1 for update;update t_order set status = '3' where id = 1;commit;如果
t_order
桌子id
记录 1 正在被另一个事务修改,上面的 ** 等待记录被释放后再继续。
消费者演示如下:
public consumeconcurrentlystatus consumemessage(list msgs, consumeconcurrentlycontext context) return consumeconcurrentlystatus.consume_success; }catch (exception e) }使用数据库锁是一个非常繁重的操作,我们可以使用轻Redis 锁之所以被取代,是因为 Redis 的高性能和非常丰富的生态系统(类库),支持不同类型的分布式锁。
我们选择了 Redisson 框架提供的分布式锁功能,简化示例如下:
public consumeconcurrentlystatus consumemessage(list msgs, consumeconcurrentlycontext context) return consumeconcurrentlystatus.consume_success; }catch (exception e) }在本文中,我们将仔细研究如何实现 RocketMQ 消费幂等性。
1. 消费幂等:当 RocketMQ 消费者重复消费一条消息时,重复消费的结果与一次消费的结果相同,多次消费对业务系统没有任何负面影响。
二、适用场景:在发送、传递或负载平衡时重复发送消息。
3. 企业的唯一标识以业务的唯一标识作为幂等处理的关键依据,消息必须携带业务的唯一标识
4. 幂等策略:业务状态机需要在业务逻辑中确定,根据实际情况进行选择全局处理标识跟分布式锁有两种方法可以处理它。
如果我的文章对你有帮助,也请帮忙喜欢,看着,**您的支持将激励我输出更高质量的文章,非常感谢!
作者:勇哥的编程游记。
*:cnblogs.com/makemylife/p/17910944.html