CHANSHIYU
GITHUBZERO
  • README
  • 時雨
    • 2017
      • 01 网站动态标题的两种方式
      • 02 RN App 外部唤醒踩坑记
    • 2018
      • 01 不一样の烟火
      • 02 Python 之禅
      • 03 Python 文件操作
    • 2019
      • 01 Aurora 食用指南
      • 02 Godaddy 域名找回记事
      • 03 一个接口的诞生
      • 04 SpringMVC 前后端传参协调
      • 05 主题集成友链访问统计
      • 06 Github Style 博客主题
      • 07 字符编码の小常识
      • 08 WSL 安装 Docker 实录
      • 09 Eriri comic reader
      • 10 Aurora 2.0
      • 11 jsDelivr 全站托管
      • 12 两年工作台变迁史
      • 13 春物
      • 14 一种优雅の笔记方式
    • 2020
      • 01 Telegram 电报机器人
      • 02 她的眼里有星辰
      • 03 文心雕龙
      • 04 软萌木子の有趣笔谈
      • 05 Telegram RSS 订阅频道
      • 06 水月雨银色飞船
      • 07 五年前旧照
    • 2021
      • 01 春宵苦短 2020
      • 02 风花
    • 2022
      • 01 小城新貌
      • 02 原神满级纪念
    • 2023
      • 01 2022 逆旅
      • 02 半透明背景图实现
      • 03 新年攒台海景房
  • 前端
    • JavaScript
      • 01 JavaScript 秘密花园
      • 02 JavaScript 正则技巧
      • 03 从浏览器解析 JS 运行机制
      • 04 Canvas 基础用法
      • 05 Blob Url And Data Url
      • 06 函数节流与函数防抖
      • 07 排序算法初探
      • 08 洗牌算法实现数组乱序
      • 09 正则匹配 match 和 exec
      • 10 正则匹配汉字
      • 11 JSX.Element vs ReactElement
      • 12 可选链与空值合并
      • 13 TypeScript 编码规范
      • 14 Typescript 中 interface 和 type 区别
      • 15 TypeScript 高级类型
      • 16 TypeScript 关键字
      • 17 TypeScript 映射类型
    • CSS
      • 01 Flex 弹性布局
      • 02 Position 定位
      • 03 CSS 逻辑属性
    • Node
      • 01 Node Tips
      • 02 七天学会 NodeJS
    • Note
      • 01 Note
      • 02 Code
      • 03 Snippets
      • 04 Git
    • React
      • 01 React Props Children 传值
      • 02 Use a Render Prop!
      • 03 React Hook
      • 04 React Hook 定时器
      • 05 Fetch data with React Hooks
      • 06 React 和 Vue 中 key 的作用
      • 07 useCallback 的正确使用方式
      • 08 useLayoutEffect 和 useEffect 的区别
      • 09 forwardRef 逃生舱
      • 10 React 条件渲染
    • Vue
      • 01 Vue Tips
      • 02 Vue 构建项目写入配置文件
      • 03 Vue 项目引入 SVG 图标
      • 04 Vue 一键导出 PDF
      • 05 动态可响应对象
      • 06 Vue 引入 SCSS
      • 07 Vue 路由权限控制
    • 实战系列
      • 01 WebSocket 心跳重连机制
      • 02 图片加解密二三事
      • 03 优雅实现 BackTop
      • 04 动态加载 JS 文件
      • 05 常用 DOM 方法比较
      • 06 AbortController 中断 fetch
      • 07 计算字符所占字节数
      • 08 Axios 自定义返回值类型
  • 后端
    • Java
      • 01 面向对象基本特征与原则
      • 02 Java 数据类型
      • 03 Java String
      • 04 Java 只有值传递
      • 05 Java final 与 static
      • 06 Java Object 通用方法
      • 07 Java 继承
      • 08 Java 反射
      • 09 Java 异常
      • 10 Java 容器
      • 11 Java 虚拟机
      • 12 Java IO
      • 13 Java HashMap
      • 14 Java List
      • 15 Java Stream
      • 16 Java 枚举
      • 17 Java 日期与时间
      • 18 Java fail fast
      • 19 Java BiFunction 和 BinaryOperator
    • 并发编程
      • 01 Java 并发
      • 02 synchronized
      • 03 volatile
      • 04 ReentrantLock
      • 05 ReadWriteLock
      • 06 StampedLock
      • 07 CompletableFuture
      • 08 ForkJoin
      • 09 ThreadLocal
      • 10 CountDownLatch
      • 11 ThreadPoolExecutor
      • 12 ExecutorService
      • 13 Atom 原子类
      • 14 BlockingQueue
    • 高效编程
      • 01 30 seconds of java8
      • 02 函数式替代 for 循环
      • 03 Java 字符串拼接
      • 04 单例模式的几种实现
      • 05 HashMap 排序
    • 理论概念
      • 01 Java Servlet
      • 02 Java 服务端分层模型
      • 03 经典排序算法
      • 04 LRU 缓存淘汰算法
      • 05 BloomFilter 判断元素存在
      • 06 Java HashMap 面试大全
      • 07 HTTP 状态码详解
      • 08 Cookie 和 Session
      • 09 基于消息队列的分布式事务解决方案
      • 10 微服务之所见
    • 实战系列
      • 01 AES CBC 加解密
      • 02 Magic 魔数获取文件类型
      • 03 获取请求 IP 地址
      • 04 Kaptcha 与数学公式验证码
      • 05 Netty 获取客户端 IP.md
      • 06 高性能无锁队列 Disruptor.md
      • 07 前后端接入阿里云盾
    • Linux
      • 01 Linux 文件权限系统
      • 02 Linux 常用软件安装
      • 03 CentOS 防火墙
    • MySQL
      • 01 MySQL
      • 02 SQL 语句 where 1=1
      • 03 truncate 和 delete
      • 04 事务
      • 05 关系模型
      • 06 Mybatis
      • 07 MySQL 查看数据库表详情
    • Nginx
      • 01 Nginx 指北
      • 02 nginx gzip 压缩
    • Note
      • 01 Vagrant
      • 02 Docker
      • 03 Lombok
      • 04 Swagger
      • 05 Redis
    • Spring
      • 01 Spring Boot
      • 02 Spring Validation
      • 03 Spring Data
      • 04 Spring 容器
      • 05 Spring AOP
      • 06 Spring Transactional 注解
      • 07 Spring Cloud Netflix
      • 08 Spring Cloud Alibaba
      • 09 Spring Security oAuth2
      • 10 Spring Boot 跨域解决方式
      • 11 Spring Boot 请求拦截
      • 12 Spring Boot 异步编程
      • 13 Spring Boot 定时任务
      • 14 Spring Boot 管理 bean
      • 15 Mybatis 逆向代码生成
      • 16 JWT
      • 17 JPA
      • 18 Apache Shiro
      • 19 Spring 异步请求
  • 书斋
    • ES6 标准入门
      • 01 变量声明与解构赋值
      • 02 语法的扩展
      • 03 数据类型与数据结构
      • 04 Proxy 和 Reflect
      • 05 异步编程 Promise
      • 06 Iterator 和 for of 循环
      • 07 Generator 函数
      • 08 Async 函数
      • 09 Class 类
    • JavaScript 设计模式
      • 01 基础知识
      • 02 设计模式(上)
      • 03 设计模式(下)
      • 04 设计原则和编程技巧
  • 纸函
    • 01 Interview
    • 02 Ceph
    • 03 动态规划
    • 04 Document.designMode
    • 2023-01-10
  • 万藏
    • 文档
      • 01 Git 文档
      • 02 Linux 命令大全
      • 03 七天学会 NodeJS
      • 04 Algorithms
    • 工具
      • 01 Nginx Config
      • 02 ProcessOn
      • 03 Flat Icon
      • 04 Regexper
      • 05 TempMail
      • 06 Carbon
由 GitBook 提供支持
在本页
  • 核心构造方法
  • 简单线程沲实现
  • 最佳实践
  • DEMO
  • 可暂停恢复的线程池

这有帮助吗?

  1. 后端
  2. 并发编程

11 ThreadPoolExecutor

上一页10 CountDownLatch下一页12 ExecutorService

最后更新于1年前

这有帮助吗?

本文为个人学习摘要笔记。 原文地址:

ThreadPoolExecutor 作为 java.util.concurrent 包对外提供基础实现,以内部线程池的形式对外提供管理任务执行、线程调度、线程池管理等等服务。

ThreadPoolExecutor 是一个可被继承的线程池实现,包含了用于微调的许多参数和钩子。Executors 方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。

核心构造方法

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

参数说明:

corePoolSize 核心线程数量

  • 即使没有任务执行,核心线程也会一直存活;

  • 线程数小于核心线程时,即使有空闲线程,线程池也会创建新线程执行任务;

  • 设置 allowCoreThreadTimeout=true 时,核心线程会超时关闭。

maximumPoolSize 最大线程数

  • 当所有核心线程都在执行任务,且任务队列已满时,线程池会创建新线程执行任务;

  • 当线程数等于 maxPoolSize,且任务队列已满,此时添加任务时会触发 RejectedExecutionHandler 进行处理。

keepAliveTime TimeUnit 线程空闲时间

  • 如果线程数大于 corePoolSize,且有线程空闲时间达到 keepAliveTime 时,线程会销毁,直到线程数量等于 corePoolSize;

  • 如果设置 allowCoreThreadTimeout=true 时,核心线程执行完任务也会销毁直到数量为 0。

workQueue 任务队列

  • ArrayBlockingQueue 有界队列,需要指定队列大小;

  • LinkedBlockingQueue 若指定大小则和 ArrayBlockingQueue 类似,若不指定大小则默认能存储 Integer.MAX_VALUE 个任务,相当于无界队列,此时 maximumPoolSize 值其实是无意义的;

  • SynchronousQueue 同步阻塞队列,当有任务添加进来后,必须有线程从队列中取出,当前线程才会被释放,相当于一个没有容量的队列,newCachedThreadPool 就使用这种队列。

ThreadFactory 创建线程的工厂

通过他可以创建线程时做一些想做的事,比如自定义线程名称。

RejectedExecutionHandler

线程数和队列都满的情况下,对新添加的任务的处理方式:

  • AbortPolicy 直接抛出异常

  • CallerRunsPolicy 直接调用新添加 runnable.run 函数执行任务

  • DiscardPolicy 直接抛弃任务,什么也不干

  • DiscardOldestPolicy 抛弃队列中最先加入的任务,然后再添加新任务

下面是自定义实现,相当于 DiscardPolicy,只打印异常信息:

private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    private CustomRejectedExecutionHandler() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        Log.e("umeweb", "Task " + r.toString() + " rejected from " + e.toString());
    }
}

简单线程沲实现

private final ThreadPoolExecutor mExecutor;

    private ThreadPoolManager() {
        final int cpu = Runtime.getRuntime().availableProcessors();
        final int corePoolSize = cpu + 1;
        final int maximumPoolSize = cpu * 2 + 1;
        final long keepAliveTime = 1L;
        final TimeUnit timeUnit = TimeUnit.SECONDS;
        final int maxQueueNum = 128;

        mExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                timeUnit,
                new LinkedBlockingQueue<Runnable>(maxQueueNum),
                new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }

    public void executor(@NonNull Runnable runnable) {
        mExecutor.execute(runnable);
    }
}

最佳实践

阿里巴巴 Java 开发手册中强制规定:【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM;

  2. CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

DEMO

可暂停恢复的线程池

/**
 * 描述:     演示每个任务执行前后放钩子函数
 */
public class PauseableThreadPool extends ThreadPoolExecutor {
    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();
    private boolean isPaused;

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = () -> {
            System.out.println("我被执行");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");
    }
}
ThreadPoolExecutor 参数解析
关系