谷粒商城-高级-77 -商城业务-订单服务-RMQ 延时队列处理关单及库存解锁整合

一、流程分析

file

步骤:

  • 1、订单创建成功,发送消息给MQ
  • 2、订单服务订单关单监听器(死信之后判断是否关单)
  • 3、订单关单成功后发消息给MQ(订单释放直接和库存释放进行绑定)
  • 4、库存服务是释放库存监听器监听是否解锁库存队列(stock.release.stock.queue)
  • 5、库存解锁处理逻辑

这里出现了两个交换机绑定同一个队列的情况,即订单的交换机和库存的队列绑定在一起了。

二、订单关单

1、订单释放直接和库存释放进行绑定
gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;

import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.AMQP;

/**
 * @author: kaiyi
 * @create: 2020-09-16 13:53
 */
@Configuration
public class MyMQConfig {

  /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

  /**
   * 客户端监听队列(测试)
   * @param orderEntity
   * @param channel
   * @param message
   * @throws IOException
   */
  /*
  @RabbitListener(queues = "order.release.order.queue")
  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {

    System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

  }
   */

  /**
   * 死信队列
   *
   * @return
   */
  @Bean
  public Queue orderDelayQueue(){

     /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
    HashMap<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "order-event-exchange");   // 信死了交给哪个交换机
    arguments.put("x-dead-letter-routing-key", "order.release.order"); // 信死了交给哪个路由key
    arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟

    Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    return queue;
  }

  /**
   * 普通队列
   *
   * @return
   */
  @Bean
  public Queue orderReleaseQueue(){

    Queue queue = new Queue("order.release.order.queue", true, false, false);
    return queue;
  }

  /**
   * TopicExchange
   *
   * @return
   */
  @Bean
  public Exchange orderEventExchange(){
    /*
     *   String name,
     *   boolean durable,
     *   boolean autoDelete,
     *   Map<String, Object> arguments
     * */

    return new TopicExchange("order-event-exchange", true, false);
  }

  @Bean
  public Binding orderCreateBinding() {
    /*
     * String destination, 目的地(队列名或者交换机名字)
     * DestinationType destinationType, 目的地类型(Queue、Exhcange)
     * String exchange,
     * String routingKey,
     * Map<String, Object> arguments
     * */
    return new Binding("order.delay.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.create.order",  // 路由key一般为事件名
        null);
  }

  @Bean
  public Binding orderReleaseBinding() {

    return new Binding("order.release.order.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.release.order",
        null);
  }

  /**
   * 订单释放直接和库存释放进行绑定
   * @return
   */
  @Bean
  public Binding orderReleaseOtherBinding() {

    return new Binding("stock.release.stock.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.release.other.#",
        null);
  }
}

2、提交订单增加订单创建成功,发送消息给MQ
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

 /**
     * 提交订单
     * @param vo
     * @return
     */
    // @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别
    // @Transactional(propagation = Propagation.REQUIRED)   设置事务的传播级别
    @Transactional(rollbackFor = Exception.class)
    // @GlobalTransactional(rollbackFor = Exception.class)
    @Override
    public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {

        confirmVoThreadLocal.set(vo);

        SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();
        //去创建、下订单、验令牌、验价格、锁定库存...

        //获取当前用户登录的信息
        MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();
        responseVo.setCode(0);

        //1、验证令牌是否合法【令牌的对比和删除必须保证原子性】
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        String orderToken = vo.getOrderToken();

        //通过lua脚本原子验证令牌和删除令牌
        Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
            Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()),
            orderToken);

        if (result == 0L) {
            //令牌验证失败
            responseVo.setCode(1);
            return responseVo;
        } else {
            //令牌验证成功
            //1、创建订单、订单项等信息
            OrderCreateTo order = createOrder();

            //2、验证价格
            BigDecimal payAmount = order.getOrder().getPayAmount();
            BigDecimal payPrice = vo.getPayPrice();

            if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
                //金额对比
                //TODO 3、保存订单
                saveOrder(order);

                //4、库存锁定,只要有异常,回滚订单数据
                //订单号、所有订单项信息(skuId,skuNum,skuName)
                WareSkuLockVo lockVo = new WareSkuLockVo();
                lockVo.setOrderSn(order.getOrder().getOrderSn());

                //获取出要锁定的商品数据信息
                List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {
                    OrderItemVo orderItemVo = new OrderItemVo();
                    orderItemVo.setSkuId(item.getSkuId());
                    orderItemVo.setCount(item.getSkuQuantity());
                    orderItemVo.setTitle(item.getSkuName());
                    return orderItemVo;
                }).collect(Collectors.toList());
                lockVo.setLocks(orderItemVos);

                //TODO 调用远程锁定库存的方法
                //出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata)
                //为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务
                R r = wmsFeignService.orderLockStock(lockVo);
                if (r.getCode() == 0) {
                    //锁定成功
                    responseVo.setOrder(order.getOrder());
                    // int i = 10/0;   // 抛出异常,测试远程回滚

                    //TODO 订单创建成功,发送消息给MQ
                    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());

                    //删除购物车里的数据
                    redisTemplate.delete(CartConstant.CART_PREFIX + memberResponseVo.getId());
                    return responseVo;
                } else {
                    //锁定失败
                    String msg = (String) r.get("msg");
                    throw new NoStockException(msg);
                    // responseVo.setCode(3);
                    // return responseVo;
                }

            } else {
                responseVo.setCode(2);
                return responseVo;
            }
        }
    }

3、关单监听
gulimall-order/xxx/order/listener/OrderCloseListener.java

package com.atguigu.gulimall.order.listener;

import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.service.OrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * 关单监听
 *
 * @author: kaiyi
 * @create: 2020-09-17 11:01
 */
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {

  @Autowired
  private OrderService orderService;

  @RabbitHandler
  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
    System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
    try {
      orderService.closeOrder(orderEntity);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }

  }

}

4、关单成功给库存服务发送MQ消息
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/**
     * 关闭订单
     * @param orderEntity
     */
    @Override
    public void closeOrder(OrderEntity orderEntity) {

        //关闭订单之前先查询一下数据库,判断此订单状态是否已支付
        OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().
            eq("order_sn",orderEntity.getOrderSn()));

        if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
            //代付款状态进行关单
            OrderEntity orderUpdate = new OrderEntity();
            orderUpdate.setId(orderInfo.getId());
            orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());
            this.updateById(orderUpdate);

            // 发送消息给MQ
            OrderTo orderTo = new OrderTo();
            BeanUtils.copyProperties(orderInfo, orderTo);

            try {
                //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息
                rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
            } catch (Exception e) {
                //TODO 定期扫描数据库,重新发送失败的消息
            }
        }
    }

三、订单释放和库存释放进行绑定

1、库存服务-库存释放监听器增加订单关单库存释放处理方法
gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;

import com.atguigu.common.to.OrderTo;
import com.atguigu.common.to.mq.StockLockedTo;
/**
 * 库存解锁监听
 *
 * @desc
 * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。
 *
 * @author: kaiyi
 * @create: 2020-09-16 19:01
 */
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {

  @Autowired
  private WareSkuService wareSkuService;

  /**
   * 1、库存自动解锁
   *  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁
   *
   *  2、订单失败
   *      库存锁定失败
   *
   *   只要解锁库存的消息失败,一定要告诉服务解锁失败
   */
  @RabbitHandler
  public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
    log.info("******收到解锁库存的信息******");
    try {

      //当前消息是否被第二次及以后(重新)派发过来了
      // Boolean redelivered = message.getMessageProperties().getRedelivered();

      //解锁库存
      wareSkuService.unlockStock(to);
      // 手动删除消息
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      // 解锁失败 将消息重新放回队列,让别人消费
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
  }

  /**
   * 订单关单库存释放
   *
   * @param orderTo
   * @param message
   * @param channel
   * @throws IOException
   */
  @RabbitHandler
  public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {

    log.info("******收到订单关闭,准备解锁库存的信息******");

    try {
      wareSkuService.unlockStock(orderTo);

      // 手动删除消息
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      // 解锁失败 将消息重新放回队列,让别人消费
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
  }
}

这个监听器既可以处理库存解锁又可以处理订单关单的处理业务,根据参数来决定具体调用哪一个,这是一个重载。

2、具体解锁库存实现
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

 /**
     * 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理
     * 导致卡顿的订单,永远都不能解锁库存
     * @param orderTo
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void unlockStock(OrderTo orderTo) {

        String orderSn = orderTo.getOrderSn();
        //查一下最新的库存解锁状态,防止重复解锁库存
        WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);

        //按照工作单的id找到所有 没有解锁的库存,进行解锁
        Long id = orderTaskEntity.getId();
        List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
            .eq("task_id", id).eq("lock_status", 1));

        for (WareOrderTaskDetailEntity taskDetailEntity : list) {
            unLockStock(taskDetailEntity.getSkuId(),
                taskDetailEntity.getWareId(),
                taskDetailEntity.getSkuNum(),
                taskDetailEntity.getId());
        }

    }

为者常成,行者常至