余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

秒杀场景下异步下单实现

xiyangw 2023-10-04 18:03 12 浏览 0 评论

背景

异步削峰,目的是削峰,方式是异步。面对瞬时压力,都需要异步削峰,其关键都在于拉长时间线,削平毛刺,最终整体提升吞吐量

核心流程


提交下单任务:用户发出下单请求后,先要获取订单许可,若获取成功才能将下单任务提交给消息队列进行处理;

下单任务轮询:异步处理,不会立刻返回结果。所以用户需要一定频率来轮询处理结果

实现原理

下单许可

所谓下单许可,指的是在秒杀品开始秒杀前,根据秒杀品的库存配置特定比例(可调节) 的下单许可,只有获得下单许可的用户才能提交下单任务。个例子,假如秒杀品A有库存10000个,那么我们可以将秒杀许可设定为10000*1.210000**1.5

  1. 降低不必要的资源竞争与浪费。限制订单任务放入消息队列,如果不设置12000的下单许可,则可能有10,000甚至100,0000的用户将请求提交到任务队列,但是我们只有区区12000的库存,这会给队列和相关计算方造成巨大压力。
  2. 提升用户体验。有了下单许可之后,当许可被抢完的时候,我们即可立即向用户展示“售罄”或“暂无库存”等更为友好且贴近事实的提示,不存在忽悠,是真没有库存了。
  3. 确保所有库存可以卖出。按照1.2或1.5等比例设置高出库存数量的下单许可,就是为了预留一定的Buffer,允许一些无效提交但不会影响整体售卖。

提交下单任务

异步处理过程中,提交下单任务是第一步,注意事项

  • 提交下单任务之前,应通过基础且必要的账号、安全相关的校验
  • 下单要加锁,防止抖动、连续点击等导致用户层面出现重复提交问题
  • 所提交的下单任务,应该具有明确且唯一的编码以便于跟踪,即 placeOrderTaskId,当用户获得提交许可时,应向用户提供placeOrderTaskId用以后续的结果轮询;
  • 同一用户、同一秒杀品,不应出现重复提交(可以通过placeOrderTaskId判断)
  • 下单许可的设计应结合本地缓存+中心化缓存,以降低网络请求负载并提高处理效率;
  • 在处理本地缓存和中心化缓存时,要着重注意过期时间的设置和更新时的锁竞争问题;

生成placeOrderTaskId 代码

通过userIditemId 生成 placeOrderTaskId 可以保证同一用户只能购买一个商品

/**
 * 订单任务id
 *
 * @param userId
 * @param itemId
 * @return
 */
private String generatePlaceOrderTaskId(Long userId, Long itemId) {
	String toEncrypt = userId + "_" + itemId;
	return DigestUtils.md5DigestAsHex(toEncrypt.getBytes());
}
复制代码

生成下单许可代码

结合本地缓存 + 分布式缓存的实现

private final static Cache<Long, Integer> availableOrderTokensLocalCache = CacheBuilder.newBuilder().initialCapacity(20).concurrencyLevel(5).expireAfterWrite(20, TimeUnit.MILLISECONDS).build();

// 从本地获取orderToken
private Integer getAvailableOrderTokens(Long itemId) {
	Integer localAvailableToken = localAvailableTokens.getIfPresent(itemId);
	if (localAvailableToken != null) {
		logger.info("本地缓存命中|{}", itemId);
		return localAvailableToken;
	}

	return refreshLocalAvailableTokens(itemId);
}

// 从本地获取远程的缓存并更新
private synchronized Integer refreshLocalAvailableTokens(Long itemId) {
	// 再次从本地缓存获取
	Integer localAvailableToken = localAvailableTokens.getIfPresent(itemId);
	if (localAvailableToken != null) {
		logger.info("本地缓存命中|{}", itemId);
		return localAvailableToken;
	}

	String goodAvailableTokensKey = getGoodAvailableTokensKey(itemId);
	Integer goodAvailableTokensInRedis = redisCacheService.getInteger(goodAvailableTokensKey);
	if (goodAvailableTokensInRedis != null) {
		logger.info("远程缓存命中|{}", itemId);
		localAvailableTokens.put(itemId, goodAvailableTokensInRedis);
		return goodAvailableTokensInRedis;
	}

	return refreshLatestAvailableTokens(itemId);
}

// 获取最新的库存信息
private Integer refreshLatestAvailableTokens(Long itemId) {
	DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(getRefreshTokensLockKey(itemId));
	try {

		boolean isSuccessLock = distributedLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);
		if (!isSuccessLock) {
			return null;
		}

		GoodStockCache availableItemStock = goodStockCacheService.getAvailableItemStock(-1L, itemId);
		if (availableItemStock == null || availableItemStock.getAvailableStock() == null || !availableItemStock.isSuccess()) {
			logger.info("库存不存在|{}", itemId);
			return null;
		}

		Integer availableOrderTokens = (int) Math.ceil(availableItemStock.getAvailableStock() * 1.5);
		// 更新远程的下单许可
		redisCacheService.put(getGoodAvailableTokensKey(itemId), availableOrderTokens, 24, TimeUnit.HOURS);
		// 存入本地缓存
		localAvailableTokens.put(itemId, availableOrderTokens);
		return availableOrderTokens;
	} catch (InterruptedException e) {
		logger.info("刷新tokens失败|{}", itemId, e);
		return null;
	}
}
复制代码

扣减下单许可代码

使用 Lua代码扣减 远程缓存中存储的 许可数量(本地缓存保存的时间非常短,基本都会到分布式缓存中读取)

if (redis.call('exists', KEYS[1]) == 1) then
    local availableTokensCount = tonumber(redis.call('get', KEYS[1]))
    if availableTokensCount == 0 then
        return -1
    end
    if availableTokensCount > 0 then
        redis.call('incrby', KEYS[1], -1)
        return 1
    end
end

return -100
复制代码

使用Java代码多次调用Lua脚本

为了保证可用下单许可数量的有效性,我们给下单许可设置了过期时间,这会导致在执行LUA脚本时数据不存在,所以为了应对这种情况,在数据不存在时当前线程会主动尝试刷新数据,然后继续执行LUA脚本。也就是说,当用户抢到了下单许可但是下单失败或取消订单时,系统会定时对数据进行纠正,腾出来空余的许可给后面需要的用户,确保所有库存均可对外销售

// 扣减许可
private boolean takeOrRecoverToken(PlaceOrderTask placeOrderTask) {
	ArrayList<String> keys = new ArrayList<>();
	keys.add(getGoodAvailableTokensKey(placeOrderTask.getItemId()));

	for (int i = 0; i < 3; i++) {
		Long result = redisCacheService.getRedisTemplate().execute(TAKE_ORDER_TOKEN_LUA, keys);
		if (result == null) {
			return false;
		}
		// 没有库存
		if (result == -1L) {
			logger.info("库存为0|{}", JSON.toJSONString(placeOrderTask));
			return false;
		}
		// 数据在缓存中不存在,先去更新
		if (result == -100L) {
			refreshLatestAvailableTokens(placeOrderTask.getItemId());
			continue;
		}
		return result == 1L;
	}
	return false;
}
复制代码

异步处理下单任务

通过异步提交到MQ中的订单,MQ会根据订阅将消息推送给订阅方处理。

  • 异步处理下单任务时,应先确定TOPIC和订阅关系
  • 任务处理结果存储到缓存中,方便客户端轮询;
  • 任务处理成功后,将订单ID返回给客户端,方便查看订单详情;

异步发送消息

使用 RabbitMQ 发送消息

@Override
public boolean post(PlaceOrderTask placeOrderTask) {
	logger.info("投递下单任务|{}", JSON.toJSONString(placeOrderTask));
	if (placeOrderTask == null) {
		logger.info("下单任务参数为空");
		return false;
	}
	String placeOrderTaskString = JSON.toJSONString(placeOrderTask);

	try {
		rabbitTemplate.convertAndSend(MqConfig.SECKILL_EXCHANGE_NAME, MqConfig.SECKILL_ROUTING_KEY, placeOrderTaskString);
		logger.info("OrderTaskPostServiceImpl|任务投递成功|{}", placeOrderTaskString);
		return true;
	} catch (AmqpException e) {
		logger.info("OrderTaskPostServiceImpl|任务投递失败|{}", placeOrderTaskString);
		return false;
	}
}
复制代码

库存扣减

异步下单不存在库存扣减逻辑,由于异步并发可控,将直接在数据库层面进行竞争扣减。任务处理结束后,会将结果写入到缓存中以供查询。

/**
 * 处理下单任务
 *
 * @param placeOrderTask
 */
@Transactional
public void handlePlaceOrderTask(PlaceOrderTask placeOrderTask) {
	Long userId = placeOrderTask.getUserId();

	SeckillGoodResponse seckillGoodResponse = seckillGoodClient
			.getSeckillGood(userId, placeOrderTask.getActivityId(), placeOrderTask.getItemId())
			.getData();
	// 构造 实体类
	SeckillOrder seckillOrder = SeckillOrderBuilder.toDomain(placeOrderTask);
	seckillOrder.setItemTitle(seckillGoodResponse.getItemTitle());
	seckillOrder.setFlashPrice(seckillGoodResponse.getFlashPrice());
	seckillOrder.setUserId(userId);

	StockDeduction stockDeduction = new StockDeduction()
			.setItemId(placeOrderTask.getItemId())
			.setUserId(userId)
			.setQuantity(seckillOrder.getQuantity());
	Long orderId = null;
	try {

		// 正式扣减库存
		boolean isDecreaseStock = goodStockDeductionService.decreaseItemStock(stockDeduction);
		if (!isDecreaseStock) {
			logger.info("正式库存失败|{}, {}", userId, JSON.toJSONString(placeOrderTask));
			return;
		}

		// 创建订单
		logger.info("placeOrder|下单|{},{}", userId, JSON.toJSONString(placeOrderTask));
		seckillOrder.setStatus(SeckillOrderStatus.CREATED.getCode());

		boolean isSuccessSave = seckillOrderMapper.insert(seckillOrder) > 0;
		if (!isSuccessSave) {
			logger.info("订单创建失败|{},{}", userId, JSON.toJSONString(seckillOrder));
			throw new BusinessException(PLACE_ORDER_FAILED);
		}
		orderId = seckillOrder.getId();
		redisCacheService.put(PLACE_ORDER_TASK_ORDER_ID_KEY + placeOrderTask.getPlaceOrderTaskId(), orderId, 24, TimeUnit.HOURS);
		placeOrderTaskService.updateTaskHandleResult(placeOrderTask.getPlaceOrderTaskId(), true);
		logger.info("订单已创建成功|{},{}", userId, JSON.toJSONString(seckillOrder));
	} catch (Exception e) {
		// 扣减成功了才恢复
		logger.error("下单失败|{},{}", userId, JSON.toJSONString(placeOrderTask), e);
		placeOrderTaskService.updateTaskHandleResult(placeOrderTask.getPlaceOrderTaskId(), false);
		throw new BusinessException(ErrorCode.PLACE_ORDER_FAILED);
	}
}
复制代码

客户端轮询结果

用户提交下单任务后,会返回任务ID,随后就可以通过任务ID来查询该任务的结果:

  • 初始提交: SUBMITTED,即尚未处理;
  • 下单成功:SUCCESS,完成处理并成功下单入库;
  • 下单失败:FAILED,完成处理但下单失败;
  • 任务不存在:错误的任务ID或任务缓存已过期被删除。
    /**
     * 获取订单结果
     * @param userId
     * @param itemId
     * @param placeOrderTaskId
     * @return
     */
    public BaseResponse<SeckillOrderMessageResponse> getPlaceOrderResult(Long userId, Long itemId, String placeOrderTaskId) {
        String orderTaskId = generatePlaceOrderTaskId(userId, itemId);
        if (!orderTaskId.equals(placeOrderTaskId)) {
            logger.info("下单ID错误|{}, {}, {}", userId, itemId, placeOrderTaskId);
            return ResultUtils.error(ErrorCode.PLACE_ORDER_TASK_ID_INVALID);
        }

        // 获取订单的状态
        OrderTaskStatus taskStatus = placeOrderTaskService.getTaskStatus(placeOrderTaskId);
        if (taskStatus == null) {
            logger.info("任务状态为空|{}", placeOrderTaskId);
            return ResultUtils.error(ErrorCode.PLACE_ORDER_TASK_ID_INVALID);
        }

        if (!taskStatus.getStatus().equals(OrderTaskStatus.SUCCESS.getStatus())) {
            logger.info("订单任务尚未成功|{}", placeOrderTaskId);
            return ResultUtils.success(SeckillOrderMessageResponse.ok().setCode(taskStatus.getStatus()));
        }
        // 获取订单号
        Long orderId = redisCacheService.getLong(PLACE_ORDER_TASK_ORDER_ID_KEY + placeOrderTaskId);
        if (orderId == null) {
            logger.info("订单id尚未存在|{}, {}, {}", userId, itemId, placeOrderTaskId);
            return null;
        }
        // 封装对象
        SeckillOrderMessageResponse seckillOrderMessageResponse = SeckillOrderMessageResponse.ok().setOrderId(orderId)
                .setPlaceOrderTaskId(orderTaskId)
                .setCode(OrderTaskStatus.SUCCESS.getStatus());
        return ResultUtils.success(seckillOrderMessageResponse);
    }
复制代码

提供接口实现轮询

在接口方面,本次方案仅增加了任务结果轮询接口。需要注意的是,该接口用于轮询结果,且没有复杂计算和数据状态变更,在限流方面的阈值可以相对调高些

/**
 * 获取订单结果
 * @param userId
 * @param itemId
 * @param placeOrderTaskId
 * @return
 */
@GetMapping("/result/{itemId}/{placeOrderTaskId}")
public BaseResponse<SeckillOrderMessageResponse> getPlaceOrderResult(@RequestHeader("TokenInfo") Long userId,
																	 @PathVariable Long itemId,
																	 @PathVariable String placeOrderTaskId) {
	return seckillOrderService.getPlaceOrderResult(userId, itemId, placeOrderTaskId);
}
}
复制代码

总结

单异步削峰的目标,即提升用户体验和提升系统吞吐能力。在核心流程上,我们将异步下单过程拆分为三个环节:提交下单任务处理下单任务轮询下单结果

相关推荐

华为交换机配置命令总结

1、配置文件相关命令[Quidway]displaycurrent-configuration显示当前生效的配置[Quidway]displaysaved-configuration显示fla...

解决账户无法登录的故障
解决账户无法登录的故障

在优化系统时错误地根据网上的提示,将唯一的Administrator账户设置为禁用,导致重启后无法进入系统。类似的故障还有使用组策略限制本地账户登录,导致重启后...

2023-10-11 17:16 xiyangw

S5720交换机登录提示初始密码存在安全风险
S5720交换机登录提示初始密码存在安全风险

问题描述客户每次登录输密码时,提示初始密码不安全,现在客户嫌麻烦想要去掉:Username:huaweiPassword:Warning:Theinitia...

2023-10-11 17:15 xiyangw

Springboot,Mybatis修改登录用户的密码
Springboot,Mybatis修改登录用户的密码

一、Mybatis.xml<updateid="changePassword"parameterType="string...

2023-10-11 17:15 xiyangw

PHP理论知识之沐浴更衣重看PHP基础(二)
PHP理论知识之沐浴更衣重看PHP基础(二)

接上篇,咱们继续讲解PHP基础八、标准PHP组件和框架的数量很多,随之产生的问题就是:单独开发的框架没有考虑到与其他框架的通信。这样对开发者和框架本身都是不利的...

2023-10-11 17:15 xiyangw

新鲜出炉UCloud云主机“数据方舟”评测报告(5)— — 关其城
新鲜出炉UCloud云主机“数据方舟”评测报告(5)— — 关其城

2015年10月29日,UCloud云主机黑科技——“数据方舟”功能正式上线,首轮内测随即开放。截止至2015年12月6日,我们共收到了534位用户的评测申...

2023-10-11 17:14 xiyangw

业余无线电Q简语及英文缩语
业余无线电Q简语及英文缩语

Q简语:语音通信及CW通信通用(加粗为常用)QRA电台何台QRB电台间之距离QRG告之正确频率QRH频率是否变动QRI发送音调QRJ能否收到QRK信号之可...

2023-10-11 17:14 xiyangw

非常详细!如何理解表格存储的多版本、生命周期和有效版本偏差
非常详细!如何理解表格存储的多版本、生命周期和有效版本偏差

表格存储在8月份推出了容量型实例,直接支持了表级别最大版本号和生命周期,高性能实例也将会在9月中旬支持这两个特性。那么,最大版本号和生命周期以及特有的...

2023-10-11 17:14 xiyangw

H3C交换机恢复出厂和各种基本配置,这20个要点你知道吗?
H3C交换机恢复出厂和各种基本配置,这20个要点你知道吗?

私信“干货”二字,即可领取138G伺服与机器人专属及电控资料!H3C交换机不知道密码如何恢复出厂设置1、开机启动,Ctrl+B进入bootrom菜单,选择恢复出...

2023-10-11 17:13 xiyangw

在使用移动支付系统的时候如何保护信息安全?

移动支付的方式近年来不断被更新,使得Venmo(据嘉丰瑞德理财师了解,此为美国的“支付宝”)之类的支付方式已经可以某种意义上代替随身携带现金了。但是你必须防范那些第三方应用程序轻松地获取你的银行卡以及...

界面控件DevExpress WinForms MVVM入门指南——登录表单(下)

从本文档中,您将了解如何向应用程序添加登录表单。在本节教程中着重讨论了如何实现此任务,这基本上是附加应用程序功能的一部分。DevExpressUniversalSubscription官方最新版免...

linux基础命令(一)
linux基础命令(一)

为啥要学linux?您可能熟悉WindowsXP、Windows7、Windows10和MacOSX等操作系统。Linux就是这样一种强大的操...

2023-10-11 17:13 xiyangw

MySQL数据库密码忘记了,怎么办?

#头条创作挑战赛#MySQL数据库密码忘记了且没有其他可以修改账号密码的账户时怎么办呢?登录MySQL,密码输入错误/*密码错误,报如下错误*/[root@TESTDB~]#mysql-u...

MobaXterm忘记Session密码,如何查看已保存的密码
MobaXterm忘记Session密码,如何查看已保存的密码

MobaXterm工具登录过SSH终端后,如果存储了Session(存储后再连接ssh的时候只需要输入账号不需要输入密码就可以直接连接上ssh),则可以...

2023-10-11 17:12 xiyangw

华为交换机密码丢失修改方法
华为交换机密码丢失修改方法

华为S2300交换机找回密码设置一、目的交换机的console和telnet密码丢失,无法登录设备。交换机已进行过数据配置,要把密码恢复而数据配置不能丢失。二、...

2023-10-11 17:12 xiyangw

取消回复欢迎 发表评论: