背景介绍
我们的业务需求是批量处理7702条数据,调用抖音开放平台的接口进行广告主的批量操作。最初,我们采用的是同步方式处理数据,导致处理时间过长,用户体验极差。为了提升效率,我们决定引入多线程和异步任务。
第一步:发现问题 —— 同步处理太慢
初始代码示例
for (Item item : list) {
engineService.blockListBatch(...); // 一个一个调,太慢!
}这种同步处理方式显然无法满足实时性要求,我们需要一种更高效的方式来处理这些任务。
第二步:引入异步 —— @Async 启动异步线程池
为了提高处理速度,我们首先引入了Spring的@Async注解,将其配置为异步执行:
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
private ThreadPoolTaskExecutor executor;
@PostConstruct
public void init() {
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-block-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
}
@Override
public Executor getAsyncExecutor() {
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> System.out.println("异步任务异常 in " + method.getName() + ": " + ex.getMessage());
}
// 添加 getter,供其他类复用此线程池
public Executor getExecutor() {
return this.executor;
}
}同时,在需要异步执行的方法上添加@Async注解:
@Async
@Transactional
public void blockListBatch(...) { ... }这样做的好处是,方法一调用,立刻返回,不阻塞主线程,任务交给后台线程池执行,大大提升了用户体验。
第三步:并发爆炸 —— “抖音接口报 40110,被限流了!”
虽然异步处理提高了速度,但由于没有对请求频率进行控制,导致我们频繁触发抖音接口的QPS限流(每秒请求数),收到错误码40110。
解决方案:引入限流器
为了避免这种情况,我们引入了Guava的RateLimiter来进行限流控制:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>private static final RateLimiter rateLimiter = RateLimiter.create(20.0); // 20 QPS
CompletableFuture.runAsync(() -> {
rateLimiter.acquire(); // 获取令牌
BlockAdvertiserResult result = engineService.blockListBatch(batchId, item.getAppId(), item.getAdvertiserId());
// 其他逻辑...
}, getAsyncExecutor());这样,我们可以保证每秒最多发出20个请求,避免触发抖音接口的限流机制。
第四步:滑动窗口 vs 令牌桶 —— 算法选择
尽管RateLimiter允许短时突发,但在我们的实际场景中,由于任务是逐步提交的,并不会出现瞬间大量请求的情况,因此RateLimiter仍然适用。
最终实现
@Async
public void blockListBatch(...) {
List<CompletableFuture<Void>> futures = list.stream()
.map(item -> CompletableFuture.runAsync(() -> {
rateLimiter.acquire(); // 获取令牌
engineService.blockListBatch(...); // 调用接口
}, getAsyncExecutor()))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}// 获取当前使用的异步执行器(复用 AsyncConfig 中的逻辑)
private Executor getAsyncExecutor() {
return asyncConfig.getExecutor();
}
默认评论
Halo系统提供的评论