使用Rqueue框架基于Redis和Spring Boot执行异步任务 -sonus21

作者: 解道  更新时间:2021-05-20 10:38:00  原文链接


在本文中,我们将学习如何使用Spring Boot 2.x和Redis执行异步任务,最后的代码演示了本文中描述的步骤。

一个典型的API调用包括五件事:

  1. 执行一个或多个数据库(RDBMS / NoSQL)查询。
  2. 在某些缓存系统(内存中,分布式等)上的一项或多项操作。
  3. 一些计算(可能是做一些数学运算的一些数据处理)。
  4. 调用其他服务(内部/外部)。
  5. 安排一个或多个任务在以后或立即在后台执行。

出于多种原因,可以在以后的某个时间安排任务。例如,必须在订单创建或装运后7天生成发票。同样,不需要立即发送电子邮件通知,因此我们可以延迟它们。

考虑到这些实际示例,有时,我们需要异步执行任务以减少API响应时间。例如,我们收到一个立即删除1K +记录的请求,如果我们在同一API调用中删除所有这些记录,那么肯定会增加API响应时间。为了减少API响应时间,我们可以在后台运行一个任务,以删除这些记录。

Cron计划缺点

每当我们计划任务以给定时间或特定间隔运行时,我们就会使用计划在特定时间或间隔进行的cron作业。我们可以使用UNIX风格的crontabs, Chronos 等其他工具来运行计划任务。如果我们使用的是Spring框架,则可以使用现成的Scheduled注释。 

大多数cron作业会查找何时需要采取特定措施的记录,例如,在7天后查找所有发货以及未生成发票的记录。这些调度机制中的大多数都存在缩放问题,因为我们在其中扫描数据库以查找相关的行/记录。

在许多情况下,这会导致 全表扫描 表现很差,想象一下实时应用程序和此批处理系统使用相同数据库的情况。

由于它不可扩展,因此我们需要一些可扩展的系统,该系统可以在给定的时间或间隔执行任务,而不会出现任何性能问题。

有许多以这种方式扩展的方法,例如以批处理方式运行任务或在用户/区域的特定子集上运行任务。另一种方法是在给定时间运行特定任务,而不依赖于其他任务,例如无服务器功能。一个延迟队列可以的情况下,一旦定时器达到预定时间的作业将被触发使用。有许多可用的 排队 系统/软件,但很少有提供此功能的系统,例如 SQS 它提供15分钟的延迟,而不是7个小时或7天之类的任意延迟。

Rqueue框架

Rqueue 是为Spring框架构建的消息代理,将数据存储在Redis中,并提供了一种在任意延迟下执行任务的机制。由于Redis与其他广泛使用的排队系统(例如Kafka或SQS)相比,具有一些优势,因此Rqueue得到了Redis的支持。在大多数Web应用程序的后端中,Redis用于存储缓存的数据或其他目的。在当今世界上,有 8.4%  的Web应用程序正在使用Redis数据库。

通常,对于队列,我们​​使用Kafka,SQS或其他一些系统。这些系统带来了不同维度的额外开销,例如,使用Rqueue和Redis可以将金钱减少为零。

除了成本外,如果我们使用Kafka,那么我们需要进行基础架构设置,维护,即需要更多操作,因为大多数应用程序已经在使用Redis,因此我们不会有操作开销。实际上,相同的Redis服务器/群集可与Rqueue一起使用,因为 Rqueue支持任意延迟。

这篇文章的完整代码可以在我的 GitHub repo中 找到。 

消息传递

Rqueue保证至少一次发送消息,因为长时间的数据不会在数据库中丢失。您可以在这里阅读更多有关此内容: Rqueue简介

我们将需要的工具:

  • Any IDE
  • Gradle 
  • Java
  • Redis 

依赖:

  1. Spring Data Redis
  2. Spring Web
  3. Lombok 

我们将使用 Rqueue 库以任意延迟执行任何任务。Rqueue是基于Spring的异步任务执行器,可以在任何延迟下执行任务。它是由Spring消息传递库构建的,并由Redis支持。

我们将添加Rqueue Spring Boot starter 2.7.0依赖项:

dependencies {

implementation 'org.springframework.boot:spring-boot-starter-data-redis'

implementation 'org.springframework.boot:spring-boot-starter-web'

implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE'

compileOnly 'org.projectlombok:lombok'

annotationProcessor 'org.projectlombok:lombok'

providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'

testImplementation('org.springframework.boot:spring-boot-starter-test') {

exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'

}

}

出于测试目的,我们将启用Spring Web MVC功能,以便我们可以发送测试请求。

创建任务

使用Rqueue添加任务非常简单,我们只需要使用RqueueListener注释一个方法即可。RqueuListener批注具有多个可以根据用例设置的字段,例如,设置deadLetterQueue可以将任务推送到另一个队列,否则在失败时将丢弃该任务。我们还可以使用numRetries字段设置任务应重试多少次。

创建一个Java文件名MessageListener并添加一些方法来执行任务。

import com.github.sonus21.rqueue.annotation.RqueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

  @RqueueListener(value = "${email.queue.name}")
  public void sendEmail(Email email) {
    log.info("Email {}", email);
  }

  @RqueueListener(value = "${invoice.queue.name}")
  public void generateInvoice(Invoice invoice) {
    log.info("Invoice {}", invoice);
  }
}

任务提交

可以使用RqueueMessageEnqueuer bean提交任务。 它有多种方法可以根据用例排队任务,例如重试使用,重试计数和延迟任务的延迟。

我们需要AutoWire RqueueMessageEnqueuer或使用构造函数来注入此bean。

创建用于测试目的的控制器:

我们将计划在接下来的30秒内完成发票生成,为此,我们将提交一个延迟30000(毫秒)的任务。另外,我们将尝试发送将在后台完成的电子邮件。为此,我们将添加两个GET方法sendEmail和generateInvoice,我们也可以使用POST。

@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class Controller {
  private @NonNull RqueueMessageEnqueuer rqueueMessageEnqueuer;

  @Value("${email.queue.name}")
  private String emailQueueName;

  @Value("${invoice.queue.name}")
  private String invoiceQueueName;

  @Value("${invoice.queue.delay}")
  private Long invoiceDelay;

  @GetMapping("email")
  public String sendEmail(
      @RequestParam String email, @RequestParam String subject, @RequestParam String content) {
    log.info("Sending email");
    rqueueMessageEnqueuer.enqueue(emailQueueName, new Email(email, subject, content));
    return "Please check your inbox!";
  }

  @GetMapping("invoice")
  public String generateInvoice(@RequestParam String id, @RequestParam String type) {
    log.info("Generate invoice");
    rqueueMessageEnqueuer.enqueueIn(invoiceQueueName, new Invoice(id, type), invoiceDelay);
    return "Invoice would be generated in " + invoiceDelay + " milliseconds";
  }
}

application.properties:

email.queue.name=email-queue
invoice.queue.name=invoice-queue
# 30 seconds delay for invoice
invoice.queue.delay=300000

运行测: http://localhost:8080/email?email=xample@exampl.com&subject=%22test%20email%22&content=%22testing%20email%22

30秒后发票:

http://localhost:8080/invoice?id=INV-1234&type=PROFORMA

总之,我们可以使用Rqueue调度任务,而无需花费很多锅炉代码。在配置和使用Rqueue库时,我们需要考虑一些事项。重要的一项是任务是否是延迟的任务。默认情况下,假定任务需要尽快执行。

完整的代码可以在我的Github帐户中找到 https://github.com/sonus21/rqueue-task-exector

Rqueue库代码: https://github.com/sonus21/rqueue