signed

QiShunwang

“诚信为本、客户至上”

异步与线程池

2021/6/3 16:07:56   来源:

文章目录

  • 一、初始化线程的4种方式
  • 二、new ThreadPoolExecutor()七大参数
  • 三、线程池运行任务的流程
    • 1.流程
    • 2.面试题
  • 四、常见的4种线程池
  • 五、CompletableFuture异步编排
    • 1.CompletableFuture创建异步对象
    • 2.计算完成时回调方法(whenComplete、exceptionally)
    • 3.handle方法
    • 4.线程串行化方法
    • 5.两任务组合(都要完成)
    • 6.两任务组合(一个完成)
    • 7.多任务组合
  • 六、商品详情页,代码使用异步编排优化


一、初始化线程的4种方式

  1. 继承Thread类
  2. 实现Runnable接口
  3. 实现Callable接口+FutureTask(可以拿到返回结果,可以进行异常处理)
  4. 线程池
  • 继承Thread类和实现Runnable接口:主进程无法获取线程的运算结果。
  • 实现Callable接口:主进程可以获取线程的运算结果,但是不利于控制服务器种的线程资源,可能导致服务器资源耗尽。
  • 线程池:初始化方式
 //初始化线程池
 ExecutorService executorService = Executors.newFixedThreadPool(3);

ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                  BlockingQueue<Runnable> workQueue,
                  ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) 

二、new ThreadPoolExecutor()七大参数

在这里插入图片描述

  1. corePoolSize:线程池种一致保持的线程数量。
  2. maximumPoolSize:线程池种允许最大的线程数。
  3. keepAliveTime:当线程数大于核心线程数的时候,超出核心线程数的线程在最大多长时间没有接到新任务就会终止释放,最终线程池维持在corePoolSize大小。
  4. unit:时间单位。
  5. workQueue:阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,就会放在这里等待空闲线程来执行。
  6. threadFactory:创建线程的工厂,比如指定线程名等等。
  7. handler:拒绝策略。如果线程满了,线程池就会使用拒绝策略。

三、线程池运行任务的流程

1.流程

  1. 线程池创建,准备好core数量的核心线程,准备接受任务

  2. 新的任务进来,用core准备好的空闲线程执行。
    1)core满了,就将再来的任务加入到阻塞队列种。空闲的core就会到阻塞队列中获取任务执行。
    2)阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量。
    3)max都执行好了。max-core数量空闲的线程就会在keepAliveTime指定的时间后自行销毁。最终保持到core的大小
    4)如果线程开打max的数量,还是有新任务进来,就会使用reject指定的拒绝策略进行处理。

  3. 所有的线程创建都是由指定的threadFactory创建。

2.面试题

一个线程池 core=7;max=20;queue=50。那100个并发请求进来是怎么分配的?
先有7个请求能直接得到执行,接下来50个进入阻塞队列,在多开13(20-7)个线程继续执行请求任务。现在总过有7+13+50=70个任务被安排上了。剩余的30个默认执行拒绝策略。

四、常见的4种线程池

  1. newCacheThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,如无可回收,则新建线程。
  2. newFixedThreadPool:创建一个定长线程池,可以控制线程最大并发数,超出的线程回在队列种等待。
  3. newScheduledThreadPool:创建一个定长线程池,支持定时以及周期性任务执行。
  4. newSingleThreadExecutor:创建一个单线程的线程池,只会用唯一一个工作线程来执行任务,保证所有任务按照指定顺序执行。

五、CompletableFuture异步编排

在java8中,新增了一个包含50个方法左右的类,CompletableFuture,提供了非常强大的Future扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程的能力。可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。CompletableFuture类实现了Future接口,所以还是以通过get()方法阻塞或者轮询的方式获得结果。

1.CompletableFuture创建异步对象

CompletableFuture有四个静态方法来创建异步操作。
在这里插入图片描述

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

public static CompletableFuture<Void> runAsync(Runnable runnable)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

runAsync方法都是没有返回值的,supplyAsync都是可以获取返回结果的。
都可以传入自定义的线程池,否则就用默认的线程池。

2.计算完成时回调方法(whenComplete、exceptionally)

在这里插入图片描述
在这里插入图片描述

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) 

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) 

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:

  1. whenComplete:是执行当前任务的线程继续执行whenComplete的任务。
  2. whenCompleteAsync:把执行whenCompleteAsync的任务继续提交给线程池来执行。(可能开启新的线程。)
public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
      @Override
      public Object get() {
          System.out.println(Thread.currentThread() + " ---- completableFuture");
          int i = 10 / 0;
          return 1024;
      }
  }).whenComplete(new BiConsumer<Object, Throwable>() {
      @Override
      public void accept(Object o, Throwable throwable) {
          System.out.println("-------o = " + o.toString());
          System.out.println("-------throwable = " + throwable);
      }
  }).exceptionally(new Function<Throwable, Object>() {
      @Override
      public Object apply(Throwable throwable) {
          System.out.println("throwable=" + throwable);
          return 666;
      }
  });
  System.out.println(future.get());
}

在这里插入图片描述
在这里插入图片描述

3.handle方法

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) 

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 

handle方法和complete方法一样,可以对结果进行最后的处理,可以处理异常结果,也可以改变返回值。

4.线程串行化方法

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 


public CompletableFuture<Void> thenAccept(Consumer<? super T> action) 

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) 


public CompletableFuture<Void> thenRun(Runnable action)

public CompletableFuture<Void> thenRunAsync(Runnable action) 

public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) 

  1. thenApply方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前线程的返回值。
  2. thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
  3. thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。
    带有Async默认是异步执行的。

5.两任务组合(都要完成)

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)


public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) 

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) 


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor) 

两个任务必须都完成,触发该任务。
thenCombine:组合两个Future,获取两个future任务的返回结果,并返回当前任务的返回值。
thenAcceptBoth:组合两个Future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个Future,不需要获取future的结果,只需要两个future处理完任务后,处理该任务。

public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture.supplyAsync(() -> {
      return "hello";
  }).thenApplyAsync(t -> {
      return t + " world!";
  }).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
      return t + u;
  }).whenComplete((t, u) -> {
      System.out.println(t);
  });
}

在这里插入图片描述

6.两任务组合(一个完成)

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) 

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) 

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) 


public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)


public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

当两个任务中,任意一个future任务完成的时候,就执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,直接处理任务,也没有返回值。

7.多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf:等待所有任务执行完成。
anyOf:只要有一个任务完成。

六、商品详情页,代码使用异步编排优化

@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
  SkuItemVo skuItemVo = new SkuItemVo();
  //sku基本信息获取
  CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
      SkuInfoEntity skuInfoEntity = getById(skuId);
      skuItemVo.setInfo(skuInfoEntity);
      return skuInfoEntity;
  }, executor);

  //sku图片信息
  CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
      List<SkuImagesEntity> skuImagesEntities = skuImagesService.list(new QueryWrapper<SkuImagesEntity>().eq("sku_id", skuId));
      skuItemVo.setImages(skuImagesEntities);
  },executor);


  //获取spu的销售属性组合
  CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(res -> {
      List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(res.getSpuId());
      skuItemVo.setSaleAttr(saleAttrVos);
  }, executor);

  //获取spu介绍
  CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(res -> {
      SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
      skuItemVo.setDesc(spuInfoDescEntity);
  }, executor);

  //获取spu规格参数
  CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync(res -> {
      List<SpuItemAttrGroup> spuItemAttrGroupVos=attrGroupService.getProductGroupAttrsBySpuId(
              res.getSpuId(), res.getCatalogId());
      skuItemVo.setGroupAttrs(spuItemAttrGroupVos);
  }, executor);

  //查询当前sku是否参与秒杀
//        CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
//            R skuSeckillInfo = seckillFeignService.getSkuSeckillInfo(skuId);
//            if (skuSeckillInfo.getCode() == 0) {
//                SeckillInfoVo seckillInfoVo = skuSeckillInfo.getData(new TypeReference<SeckillInfoVo>() {});
//                skuItemVo.setSeckillInfoVo(seckillInfoVo);
//            }
//        }, executor);


  //等待所有任务都完成再返回
//        CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,baseAttrFuture,secKillFuture).get();

  CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,baseAttrFuture).get();
  return skuItemVo;
}