线程池实现原理及实践

awstan / 2024-11-10 / 原文

线程池的总体设计


ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力:

(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;

(2)提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

    public static void main(String[] args) {
        Executor executor = new Executor() {
            @Override
            public void execute(@NotNull Runnable command) {
                System.out.println("1");
                command.run();
            }
        };
        executor.execute(()-> System.out.println("1"));


        Executor e1 = c->{
            c.run();
        };
        e1.execute(()-> System.out.println("1"));
    }

线程池核心参数:

创建线程池:

是常用的几种方式:

1. ThreadPoolExecutor

最灵活的方式,可以通过自定义参数创建线程池。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, 
    maximumPoolSize, 
    keepAliveTime, 
    TimeUnit.SECONDS, 
    new LinkedBlockingQueue<Runnable>(),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()
);

2. Executors 工具类

Executors 提供了一些便捷的方法来创建常用的线程池类型:

  • 单线程池

    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
  • 固定线程池(指定核心线程数):

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
    
  • 可缓存线程池(线程数不限制,根据需要创建和回收线程):

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
  • 定时任务线程池(可以定期执行任务):

    ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(nThreads);
    

线程池线程执行执行机制

  1. 核心线程数未满
    • 如果当前活动线程数少于核心线程数(corePoolSize),线程池会创建新的线程来执行新提交的任务。
  2. 核心线程数满,但阻塞队列未满
    • 如果当前活动线程数已经达到核心线程数,而阻塞队列尚未满,新的任务会被添加到阻塞队列中,等待已有线程来处理。
  3. 阻塞队列满,最大线程数未满
    • 如果当前活动线程数达到核心线程数,且阻塞队列已满,线程池会创建新的线程来执行新提交的任务,前提是活动线程数未达到最大线程数(maximumPoolSize)。
  4. 拒绝策略
    • 如果当前活动线程数已经达到最大线程数,并且阻塞队列也已满,线程池会根据设置的拒绝策略来处理新提交的任务。常见的拒绝策略包括:
      • AbortPolicy:抛出 RejectedExecutionException(默认策略)。
      • CallerRunsPolicy:由调用者线程执行任务,阻塞调用线程。
      • DiscardPolicy:丢弃新提交的任务。
      • DiscardOldestPolicy:丢弃阻塞队列中最旧的任务,并尝试重新提交当前任务。

线程池在业务中的实践:

场景1:快速响应请求

描述:用户发起的实时请求,服务追求响应时间。比如说用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来,展示给用户。

分析:这种场景最重要的就是获取最大的响应速度,所以不应该设置队列去缓冲并发任务(new SynchronousQueue()),并且应该尽量调大corepoolsize和maximumpollsize

场景2:快速处理批量任务

描述:离线的大量计算任务,需要快速执行,但不追求立即响应。比如上游服务下发的一些任务。

分析:不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务( new LinkedBlockingDeque<>(10000)),调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

TIPS:

线程池的线程数量没有银弹,真正能并发的线程量=CPU核数,所以要注意当线程数量太大时,上下文的切换也会占用相当的资源

业务中也可以通过配置的方式动态配置线程池