一次说透,4大服务性幂等场景架构设计方案!

服务幂等性架构设计

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub地址 (有我精心准备的130本电子书PDF)

只分享干货、不吹水,让我们一起加油!😄

防重表实现幂等

对于防止数据重复提交,还有一种解决方案就是通过防重表实现。

防重表的实现思路也非常简单,首先创建一张表作为防重表,同时在该表中建立一个或多个字段的唯一索引作为防重字段,用于保证并发情况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则表示是重复数据。

为什么不用悲观锁

对于防重表的解决方案,可能有人会说为什么不使用悲观锁,悲观锁在使用的过程中也是会发生死锁的。

悲观锁是通过锁表的方式实现的,假设现在一个用户A访问表A(锁住了表A),然后试图访问表B;

另一个用户B访问表B(锁住了表B),然后试图访问表A。 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须等到用户B释放表B才能访问。

同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须等到用户A释放表A才能访问。此时死锁就已经产生了。

阅读专业类书籍是Java程序员必备的学习方式之一。通过不断学习和积累,可以不断提高自己的技术能力和职业水平,实现职业发展的目标。

非常建议大家注重阅读,并选择一些有深度、有价值的书籍不断提升自己的技术水平和能力。这些书籍包括Java编程语言、数据结构和算法、面向对象设计、设计模式、框架原理与应用等等。

对于一位2-3年的Java程序员来说,阅读专业类书籍是更加的重要,因为它们可以帮助你扩展技术广度和深度,提高你的技术能力和职业水平。以下是我给这些程序员的一些建议:

学会寻找优秀的书籍:在选择书籍时,要选择那些被广泛认可和推荐的经典书籍。可以通过搜索网上的书籍推荐列表,向其他经验丰富的程序员请教,或者参考公司内部的学习计划,来找到好的书籍。
阅读有关设计模式和架构的书籍:对于Java程序员来说,掌握设计模式和架构原则是非常重要的。可以选择阅读《设计模式:可复用面向对象软件的基础》、《大话设计模式》、《Java程序员修炼之道》等书籍来深入学习。
学习新的技术和框架:Java技术不断发展,新的技术和框架也不断涌现。因此,Java程序员应该定期阅读有关新技术和框架的书籍,比如Spring、Spring Boot、MyBatis、Netty等。
学习算法和数据结构:算法和数据结构是编程基础,掌握这些知识可以提高代码的质量和效率。可以选择阅读《算法》、《算法导论》等书籍来学习算法和数据结构。
参考开源项目和源代码:阅读开源项目和源代码是非常有益的,可以学习到其他程序员的编码技巧和设计思路。可以选择一些知名的开源项目,如Spring、MyBatis等来进行学习。
当然,我也知道,光是建议是不足以激发大家学习的动力的,所以,书也我也帮大家整理好了,把饭喂到嘴里了,我只能帮你到这里了,剩下的就靠你自己了。

以下这份包含46本Java程序员必备经典的书单(豆瓣评分8分+),是我花费一个月时间整理的:GitHub:GitHub地址

唯一主键实现幂等

数据库唯一主键的实现主要是利用数据库中主键唯一约束的特性,一般来说唯一主键比较适用于“插入”时的幂等性,其能保证一张表中只能存在一条带该唯一主键的记录。

使用数据库唯一主键完成幂等性时需要注意的是,该主键一般来说并不是使用数据库中自增主键,而是使用分布式 ID 充当主键,这样才能能保证在分布式环境下 ID 的全局唯一性。

对于一些后台系统,并发量并不高的情况下,对于幂等的实现非常简单,通过这种思想即可完成幂等控制。

适用场景

  • 插入操作
  • 删除操作

使用限制

  • 需要生成全局唯一主键 ID;

主要流程

主要流程如下:

  1. 客户端执行创建请求,调用服务端接口。
  2. 服务端执行业务逻辑,生成一个分布式 ID,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的 SQL 语句。
  3. 服务端将该条数据插入数据库中,如果插入成功则表示没有重复调用接口。如果抛出主键重复异常,则表示数据库中已经存在该条记录,返回错误信息到客户端。

在业务执行前,先判断是否已经操作过,如果没有则执行,否则判断为重复操作。

效果演示

在并发下访问时,因为是基于id进行判断,那id值就必须要保证在多次提交时,需要唯一。访问流程如下:

 @Override
@Transactional(rollbackFor = Exception.class)
public String addOrder(Order order) {

    order.setCreateTime(new Date());
    order.setUpdateTime(new Date());

    //查询
    Order orderResult = orderMapper.selectByPrimaryKey(order.getId());

    Optional<Order> orderOptional = Optional.ofNullable(orderResult);
    if (orderOptional.isPresent()){

        return "repeat request";
    }

    int result = orderMapper.insert(order);
    if (result != 1){
        return "fail";
    }

    return "success";
}

对于上述功能实现,在并发下,并不能完成幂等性控制。通过jemeter测试,模拟50个并发,可以发现,插入了重复数据。产生了脏数据。

要解决这个问题,非常简单,在数据库层面添加唯一索引即可,将id设置为唯一索引,也是最容易想到的方式,一旦id出现重复,就会出现异常,避免了脏数据的发生也可以解决永久性幂等。但该方案无法用于分库分表情况,其只适用于单表情况。

乐观锁实现幂等性

数据库乐观锁方案一般只能适用于执行更新操作的过程,我们可以提前在对应的数据表中多添加一个字段,充当当前数据的版本标识。

这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。

适用操作

  • 更新操作

使用限制

  • 需要数据库对应业务表中添加额外字段

问题抛出

扣减库存数据错误

通过jemeter进行测试,可以发现。当模拟一万并发时,最终的库存数量是错误的。这主要是因为当多线程访问时,一个线程读取到了另外线程未提交的数据造成。

synchronized失效问题

对于现在的问题,暂不考虑秒杀设计、队列请求串行化等,只考虑如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized

根据synchronized定义,当多线程并发访问时,会对当前加锁的方法产生阻塞,从而保证线程安全,避免脏数据。但是,真的能如预期的一样吗?

 @Service
public class StockServiceImpl implements StockService {

    @Autowired
    private StockMapper stockMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public synchronized int lessInventory(String goodsId, int num) {
        return stockMapper.lessInventory(goodsId, num);
    }
}

当前已经在在方法上添加了synchronized,对当前方法对象进行了锁定。 通过Jemeter,模拟一万并发对其进行访问。可以发现,仍然出现了脏数据。

事务导致锁失效

该问题的产生原因,就在于在方法上synchronized搭配使用了@Transactional

首先synchronized锁定的是当前方法对象,而@Transactional会对当前方法进行AOP增强,动态代理出一个代理对象,在方法执行前开启事务,执行后提交事务。

所以synchronized@Transactional其实操作的是两个不同的对象,换句话说就是@Transactional的事务操作并不在synchronized锁定范围之内。

假设A线程执行完扣减库存方法,会释放锁并提交事务。但A线程释放锁但还没提交事务前,B线程执行扣减库存方法,B线程执行后,和A线程一起提交事务,就出现了线程安全问题,造成脏数据的出现。

乐观锁保证幂等

基于版本号实现

MySQL乐观锁是基于数据库完成分布式锁的一种实现,实现的方式有两种:

  • 基于版本号
  • 基于条件

但是实现思想都是基于MySQL的行锁思想来实现的。

  1. 修改数据表,添加version字段,默认值为0
  2. 修改StockMapper添加基于版本修改数据方法
 @Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
  1. 测试模拟一万并发进行数据修改,此时可以发现当前版本号从0变为1,且库存量正确。

基于条件实现

通过版本号控制是一种非常常见的方式,适合于大多数场景。

但现在库存扣减的场景来说,通过版本号控制就是多人并发访问购买时,查询时显示可以购买,但最终只有一个人能成功,这也是不可以的。其实最终只要商品库存不发生超卖就可以。那此时就可以通过条件来进行控制。

  1. 修改StockMapper
 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
  1. 修改StockController
 @PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){

    int result = stockService.lessInventoryByVersionOut(goodsId, num);
    if (result == 1){
        System.out.println("购买成功");
        return "success";
    }

    System.out.println("购买失败");
    return "fail";
}
  1. 通过jemeter进行测试,可以发现当多人并发扣减库存时,控制住了商品超卖的问题。

乐观锁实现幂等性

在系统中,不光要保证客户端访问的幂等性,同时还要保证服务间幂等。

比较常见的情况,当服务间进行调用时,因为网络抖动等原因出现超时,则很有可能出现数据错误。此时在分布式环境下,就需要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。

feign超时重试效果演示

以上图为例,当客户端要生成订单时,可以基于token机制保证生成订单的幂等性,接着订单生成成功后,还会基于feign调用库存服务进行库存扣减,此时则很有可能出现,库存服务执行扣减库存成功,但是当结果返回时,出现网络抖动超时了,那么上游的订单服务则很有可能会发起重试,此时如果不进行扣减库存的幂等性保证的话,则出现扣减库存执行多次。

那可以先来演示当下游服务出现延迟,上游服务基于feign进行重试的效果。

  1. 当前是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时时间与重试次数。
 /**
 * 自定义feign超时时间、重试次数
 * 默认超时为10秒,不会进行重试。
 */
@Configuration
public class FeignConfigure {

    //超时时间,时间单位毫秒
    public static int connectTimeOutMillis = 5000;
    public static int readTimeOutMillis = 5000;

    @Bean
    public Request.Options options() {
        return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
    }

    //自定义重试次数
    @Bean
    public Retryer feignRetryer(){
        Retryer retryer = new Retryer.Default(100, 1000, 4);
        return retryer;
    }
}
  1. stock服务的StockController中demo方法会延迟六秒。

    通过这种方式模拟超时效果。此时在order中调用stock服务,可以发现,order服务会对stock服务调用四次。

这里就演示了服务间调用超时的效果,当下游服务超时,上游服务会进行重试。

服务调用超时库存多次扣减

根据上述演示,当下游服务超时,上游服务就会进行重试。

那么结合当前的业务场景,当用户下单成功去调用库存服务扣减库存时, 如果库存服务执行扣减库存成功但返回结果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会出现同一订单商品库存被多次扣减。

  1. 在订单服务中生成订单,并调用库存服务扣减库存
 @Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){

    String orderId = String.valueOf(idWorker.nextId());
    order.setId(orderId);
    order.setCreateTime(new Date());
    order.setUpdateTime(new Date());
    int result = orderService.addOrder(order);

    if (result != 1){
        System.out.println("fail");
        return "fail";
    }

    //生成订单详情信息
    List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);

    goodsIdArray.stream().forEach(goodsId->{
        //插入订单详情
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setId(String.valueOf(idWorker.nextId()));
        orderDetail.setGoodsId(goodsId);
        orderDetail.setOrderId(orderId);
        orderDetail.setGoodsName("heima");
        orderDetail.setGoodsNum(1);
        orderDetail.setGoodsPrice(1);
        orderDetailService.addOrderDetail(orderDetail);

        //扣减库存(不考虑锁)
        stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());

    });


    return "success";
}
  1. 库存服务直接基于商品信息进行库存扣减
 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
 @PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
                                    @PathVariable("num") Integer num) throws InterruptedException {

        System.out.println("reduce stock");
        int result = stockService.reduceStockNoLock(goodsId, num);

        if (result != 1){
            return "reduce stock fail";
        }

        //延迟
        TimeUnit.SECONDS.sleep(6000);
        return "reduce stock success";
    }
  1. 执行生成订单扣减库存,此时可以发现扣减库存方法被执行多次,并且库存数量也被扣减了多次
 {"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}

乐观锁解决服务间重试保证幂等
  1. 修改StockMapper,添加乐观锁控制控制库存
 @Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
  1. 修改StockController,添加乐观锁扣减库存方法
 /**
     * 乐观锁扣减库存
     * @param goodsId
     * @param num
     * @param version
     * @return
     */
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
                       @PathVariable("num") Integer num,
                       @PathVariable("version") Integer version) throws InterruptedException {

    System.out.println("exec reduce stock");
    int result = stockService.reduceStock(goodsId, num, version);
    if (result != 1){
        //扣减失败
        return result;
    }
    //延迟
    TimeUnit.SECONDS.sleep(6000);
    return result;
}
  1. 测试,可以发现虽然发生多次重试,但是库存只会被扣减成功一次。保证了服务间的幂等性。
ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的延迟每一次都是超过超时时间的,所以最终在order服务才会出现read timeout异常提示。

消息幂等

在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。

消息重试演示

消息队列的消息幂等性,主要是由MQ重试机制引起的。

因为消息生产者将消息发送到MQ-Server后,MQ-Server会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。

在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。

数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。

本节中以consumer消费异常为演示主体,因此需要修改RabbitMQ配置文件。

修改配置文件

修改consumer一方的配置文件

 # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
设置消费异常

当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。

消息幂等解决

要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。

  1. 消息防重表,解决思路与服务间幂等的防重表一致。
  2. redis:利用redis防重。

这两种方案是最常见的解决方案。其实现思路其实都是一致的。

代码实现

修改OrderController
 /**
     * 此处为了方便演示,不做基础添加数据库操作
     * @return
     */
@PostMapping("/addOrder")
public String addOrder(){

    String uniqueKey = String.valueOf(idWorker.nextId());

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(uniqueKey);
    messageProperties.setContentType("text/plain");
    messageProperties.setContentEncoding("utf-8");
    Message message = new Message("1271700536000909313".getBytes(),messageProperties);
    rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);

    return "success";
}
修改stockApplication
 @Bean
public JedisPool jedisPool(){
    return new JedisPool("192.168.200.150",6379);
}
新增消息监听类
 @Component
public class ReduceStockListener {

    @Autowired
    private StockService stockService;

    @Autowired
    private JedisPool jedisPool;

    @Autowired
    private StockFlowService stockFlowService;

    @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
    @Transactional
    public void receiveMessage(Message message){

        //获取消息id
        String messageId = message.getMessageProperties().getMessageId();

        Jedis jedis = jedisPool.getResource();

        System.out.println(messageId);
        try {

            //redis锁去重校验
            if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){
                System.out.println("重复请求");
                return;
            }

            //mysql状态校验
            if (!(stockFlowService.findByFlag(messageId).size() == 0)){
                System.out.println("数据已处理");
                return;
            }

            String goodsId = null;
            try {
                //获取消息体中goodsId
                goodsId = new String(message.getBody(),"utf-8");
                stockService.reduceStock(goodsId,messageId);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }

            int nextInt = new Random().nextInt(100);
            System.out.println("随机数:"+nextInt);
            if (nextInt%2 ==0){
                int i= 1/0;
            }


        } catch (RuntimeException e) {
            //解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
            System.out.println("出现异常了");
            System.out.println(messageId+":释放锁");
            throw e;
        }
    }
}

消息缓冲区

对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进行消息的发送。 这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上舍弃结果返回的实时性。

对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。

本文由传智教育博学谷狂野架构师教研团队发布。

如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

转载请注明出处!

热门相关:最强狂兵   重生之至尊千金   重生之至尊千金   夫人,你马甲又掉了!   无限杀路