五、消息队列

5.1 异步阻塞队列对秒杀的优化

5.1.1 前言

  前面实现的优惠券秒杀业务流程,从判断库存是否充足,是否有购买资格知道最后于数据库的交互都是线性执行的,但是和数据库的交互会消耗大量时间,降低了效率。

5.1.2 异步思路

  由于与数据库交互浪费时间,因此可以把这部分的代码交给一个新开辟的线程去处理。那么如何取到每一个订单呢?
  在下单前的种种判断逻辑处理比较快,交给主线程处理,若用户拥有购买资格且库存充足,那么生成一个订单ID,并将给订单ID添加到一个阻塞队列里;开辟的子线程时刻监听阻塞队列,一旦取到订单ID就会于数据库交互执行相应的业务代码。
  下图是主线程执行的业务逻辑,其中不包括与数据库打交道的部分。

  与数据库交互的部分:首先开辟一个线程池,线程池的任务就是监听阻塞队列,一旦取到了订单ID就与数据库进行交互。
  由于业务中对多个数据库进行了修改,因此需要添加事务,因此如果直接调用方法又会出现事务失效的问题,因此需要通过获得代理对象,间接调用方法。但是获取代理对象是基于当前线程的,因此无法在开辟的子线程中获得代理对象,需要在主线程中获得代理对象。

5.1.3 效果

  使用非异步方法,在1s内1000个线程的测试下,吞吐率由1000/s提升至1400/s,吞吐率提升了40%;而请求的平均响应时长由400ms缩短至200ms,平均响应时长缩短了100%

5.2 redis消息队列实现异步秒杀

5.2.1 认识消息队列

  消息队列有三个角色组成,分别是:

  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获得消息并处理消息
  • 消息队列:存储和管理消息

  redis实现消息队列有三种方式

  • List:利用LPUSH和RPOP或者RPUSH和LPOP结合实现
    • 优点:
      • 支持数据持久化
      • 可以满足消息的有序性
    • 缺点:
      • 无法避免消息丢失
      • 只支持单消费者
  • PubSub:消费者可以订阅多个channel,生产者相对应channel发送消息后,所有订阅者都能收到消息。
    • 优点:
      • 支持多生产、多消费
    • 缺点:
      • 不支持数据持久化
      • 无法避免消息丢失
      • 消费者缓存区有上限,超出时数据会丢失
  • stream单消费模式:Redis 5.0引入的数据类型,专门为实现消息队列而生
    • 优点:
      • 消息可回溯
      • 支持多消费
      • 可以阻塞读取
    • 缺点:
      • 有消息漏读风险,由于是读取最新的消息,当正在处理一条消息时,有超过1条以上的消息发送到消息队列,此时会漏读消息
  • stream消费者组:将多个消费者划分到同一个组里面,监听同一个队列。
    • 特点:
      • 消息分流:队列中的消息会分配给组内不同的消费者,提高消费效率
      • 消息标示:消费者组内会维护一个消息标示,记录最后一个被处理的消息,确保了消息不会被漏读
      • 消息确认:消费者获取消息后,消息处于pending状态,并存入pending-list中,当消息处理完成后,需要通过ack来确认消息被处理,随后从pending-list移除。
    • 优点:在stream单消费的基础上,还有如下优点
      • 没有消息漏读的风险
      • 有消息确认机制,保证消息至少被消费一次
      • 组内多消费者,提升消费效率

5.2.2 使用stream消费者组实现消息队列的思路

  使用stream消费者组消费消息时,使用’>’来获取消息队列中下一个未消费消息;当该阶段出现异常时,需要使用’0’来获取pending-list中的消息,并处理pending-list中的消息直到结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
while (true) {
try {
// 尝试监听,使用阻塞模式,最多2000ms
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(MQ_KEY, ReadOffset.lastConsumed())
);
if (list == null || list.isEmpty()) {
// 队列不存在消息,continue
continue;
}
// 处理消息,处理完记得ack
stringRedisTemplate.opsForStream().acknowledge(MQ_KEY, "g1", record.getId());
} catch (Exception e) {
// 出现异常,处理pending-list中的消息
while (true) {
try {
// 从pending-list中获取消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(MQ_KEY, ReadOffset.from("0"))
);

// 判断队列中是否存在消息
if (list == null || list.isEmpty()) {
// 队列不存在消息,continue
break;
}

// 3. 消费消息
// 4. ack
stringRedisTemplate.opsForStream().acknowledge(MQ_KEY, "g1", record.getId());
} catch (Exception ex) {
log.error("消费pending-list消息错误!");
}
}
}
}

六、Redis其他数据类型应用

6.1 用set集合实现用户对商铺点赞的记录

6.1.1 背景

  正常情况下,一个用户对一篇博客只能点赞一次;若用户对一片博客已经点过赞了,则再次点赞会取消点赞。

6.1.2 实现

  使用redis的set集合记录每一篇博客点赞的用户ID,每次可通过该集合判断用户是否已经点过赞。随后根据从redis查询的数据进行后续操作

6.1.3 进一步实现点赞排行榜

  使用redis的zset集合,每一位用户点赞的score值是时间戳,zset会根据score值进行排序,最后呈现的结果为点赞顺序为用户点赞的时间

6.2 使用set集合实现共同关注功能

  针对每一位用户,应用redis的set集合保存其所有关注的用户ID,这样想查询两个用户共同关注的用户时,使用交集即可达成目的。

6.3 使用feed流实现自动接收关注博主文章

6.3.1 基础实现

  每当用户发送一篇文章时,针对每一个粉丝,会向redis中的set集合存入带有时间戳的博客ID,当粉丝用户上线后,会从自己的set集合中取出所有的博客。

6.3.2 实现分页收件箱

  根据前端提供的上一次查询的最后一个时间戳以及偏移量实现分页查询

6.4 使用Geo数据类型实现范围内查找功能

6.5 使用bitMap实现用户签到并统计连续签到天数功能

6.6 使用hyperloglog实现UV统计