书接上回,在上一章的结尾,我跟大家简单讨论了一下 ThreadPoolExecutor 的扩展点,当然,具体的扩展方法我并没有讲解,只是提出了几个功能。比如可以通过线程池的扩展点得到线程池执行每一个任务的耗时;或者得到线程池拒绝过的任务的数量等等。这一章,我就为大家一一实现这些功能,在实践中为大家展示 ThreadPoolExecutor 中可扩展的方法。

实现计算线程池执行任务耗时功能

不知道大家有没有这样一种感觉:很多听起来比较高深,甚至令人生畏的概念或者名词,其背后的本质往往都特别简单;那些令人惊叹的技术,其背后的原理可能连初学者都觉得没有难度。我自己常常有这种感觉,尤其是前段时间整理响应式系列课程的代码时,很多代码表面上看上去非常神奇,奇妙的功能令人不可思议,但抽丝剥茧,解开一层层神秘的面纱后,我发现功能得以实现的方式非常简单。也许响应式本身的概念有点复杂和晦涩,但是当我看完代码之后,我相信我可以让任何一个编程初学者(掌握了并发编程就算初学者了,当然,长得再漂亮一点更有利于学习)都能轻而易举地理解响应式,并且开发一个自己的响应式框架。好了,话题扯远了,我在这里之所以啰嗦了一会,是因为想到我们即将实现的这个功能,如果这个功能放到动态线程池框架中,我们完全可以给这个功能取一个高深的名字,比如就叫做:线程池执性能监测功能。但真正实现起来是怎么样的呢?很简单,就是在线程池执行每一个任务之前得到任务开始执行的时间,任务执行完毕之后得到当前时间,然后让当前时间减去任务开始执行的时间,这样就得到了线程池执行任务的耗时。接下来就更简单了,如果耗时较短,那线程池的工作性能就不错,如果耗时较长,那性能就一般般。当然这个性能和线程池本身的关系可能不大,在线程池线程数量合适的情况下,程序的性能反应的是任务本身是否合理。

现在我们已经明确了计算线程池执行任务耗时的方式,接下来就应该在代码层面实现一下了。不过在真正开始实现之前请大家想一下,计算线程池执行任务耗时无非就是用执行结束的时间减去执行之前的时间,那这个操作应该定义在哪里?要定义在任务本身中吗?就像下面代码块展示的这样。

/***@课程描述:从零带你写框架系列中的课程,整个系列包含netty,xxl-job,rocketmq,nacos,sofajraft,spring,springboot,disruptor,编译器,虚拟机等等。*@author:陈清风扬,个人微信号:chenqingfengyangjj。*@date:2024/4/27*@方法描述:扩展线程池类,这个类为jdk的原生线程池提供了非常多的扩展点,基本上每一个重要操作都提供了拓展点*/publicclassExtensibleThreadPoolExecutorextendsThreadPoolExecutor{//插件管理器对象,当前线程池用到的所有插件都会注册到这个管理器中@GetterprivatefinalThreadPoolPluginManagerthreadPoolPluginManager;//构造方法publicExtensibleThreadPoolExecutor(ThreadPoolPluginManagerthreadPoolPluginManager,intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);this.threadPoolPluginManager=threadPoolPluginManager;}//该方法会在线程执行任务之前被调用@OverrideprotectedvoidbeforeExecute(Threadthread,Runnablerunnable){//这里从插件处理器中得到了ExecuteAwarePlugin类型的所有插件Collection<ExecuteAwarePlugin>executeAwarePluginList=threadPoolPluginManager.getExecuteAwarePluginList();//执行了插件对象中的beforeExecute方法executeAwarePluginList.forEach(aware->aware.beforeExecute(thread,runnable));}//该方法会在任务执行完毕之后被调用@OverrideprotectedvoidafterExecute(Runnablerunnable,Throwablethrowable){//这里从插件处理器中再次得到了ExecuteAwarePlugin类型的所有插件Collection<ExecuteAwarePlugin>executeAwarePluginList=threadPoolPluginManager.getExecuteAwarePluginList();//执行插件对象中的afterExecute方法executeAwarePluginList.forEach(aware->aware.afterExecute(runnable,throwable));}//该方法会在线程池被关闭的时候调用@Overridepublicvoidshutdown(){//得到ShutdownAwarePlugin类型的插件Collection<ShutdownAwarePlugin>shutdownAwarePluginList=threadPoolPluginManager.getShutdownAwarePluginList();//执行插件中的前置扩展方法shutdownAwarePluginList.forEach(aware->aware.beforeShutdown(this));//执行目标方法super.shutdown();//执行插件中的后置扩展方法shutdownAwarePluginList.forEach(aware->aware.afterShutdown(this,Collections.emptyList()));}//同上,只不过这个方法是立即关闭线程池@OverridepublicList<Runnable>shutdownNow(){Collection<ShutdownAwarePlugin>shutdownAwarePluginList=threadPoolPluginManager.getShutdownAwarePluginList();//执行插件中的前置方法shutdownAwarePluginList.forEach(aware->aware.beforeShutdown(this));//这里执行的就是立即停止线程池工作的方法,该方法会把还未执行的任务封装到list中返回给用户List<Runnable>tasks=super.shutdownNow();//执行插件中的后置扩展方法shutdownAwarePluginList.forEach(aware->aware.afterShutdown(this,tasks));returntasks;}//这个也是线程池中的一个拓展方法,在ThreadPoolExecutor中并没有实现,用户可以自己实现这个方法//该方法会在线程池转变为TERMINATED状态时被调用@Overrideprotectedvoidterminated(){super.terminated();Collection<ShutdownAwarePlugin>shutdownAwarePluginList=threadPoolPluginManager.getShutdownAwarePluginList();shutdownAwarePluginList.forEach(aware->aware.afterTerminated(this));}}

上面代码块中的逻辑可以说是一目了然,注释也很详细,我就不再解释了。总之,看到这里,我就为大家把插件体系展示得比较完整了。当然,这里我还要多解释一句,我在文章中展示的 ExtensibleThreadPoolExecutor 类的代码并不完整,比如对 execute 方法、newTaskFor 方法、拒绝策略功能扩展等等都没有展示。在我为大家提供的第一版本代码中,ExtensibleThreadPoolExecutor 类中的代码是完整的,和源码一致。我之所以没有在文章中把 ExtensibleThreadPoolExecutor 类中的代码完全展示出来,是因为我已经讲完了插件体系的核心知识,大家凭借这些知识足以去我提供的代码或者源码中自行掌握剩余的几个功能扩展方法。正如我在本套课程一开始说的那样,这套课程非常简单,没必要在逻辑相同的方法上耗费过多时间,我相信大家看明白文章后,绝对可以看懂我提供的代码和源码。

如果要再进一步拓展的话,其实就可以和 SpringBoot 结合起来了。我忽然提到 SpringBoot,是因为在之前讲解的内容中,我一直跟大家强调,用户可以自己定义功能扩展插件,然后在程序中使用,同时动态线程池框架中还内置了一些功能扩展插件,比如线程池执行任务耗时功能插件就是内置的,只要用户使用了动态线程池框架,动态线程池框架就会自动在内部为用户收集任务执行信息。那么用户自己定义的功能扩展插件怎么交给动态线程池使用呢?用户如何定义功能扩展插件,我们已经掌握了,但是怎么把功能扩展插件交给动态线程池使用我还没有为大家讲解。当然,之前我也给大家提供了一个测试类,在测试类中手动把插件注册到插件管理器中,然后把插件管理器交给 ExtensibleThreadPoolExecutor 对象使用了,但是在真正开发中,我们不可能真的让用户手动处理这一切,让动态线程池帮用户处理好这一切才是最优雅的方法。这时候就要轮到 SpringBoot 登场了,或者是使用 SPI,这些也都是 nacos 中的套路。实际上,这应该就是很多框架通用的套路。我在此罗嗦了这么多,并不是立刻就要为大家引入 SpringBoot,这些都是后面要实现的功能了,我只是想先让大家对动态线程池某些功能的实现有一个大概思路而已。好了,不管怎么说,我们终于稍微前进了一步,动态线程池框架的雏形终于完成了。

目前 ExtensibleThreadPoolExecutor 类、以及功能扩展插件体系已经被我们重构完整了,那么接下来应该做什么呢?如果大家一时间没有头绪,接下来就让我为大家梳理一下思路。我们最终要开发的是一个动态线程池框架,框架开发完毕后,用户就可以直接在业务中使用这个框架。使用的方式也很简单,只要用户在业务中创建了一个 ExtensibleThreadPoolExecutor 线程池对象,并且使用这个线程池对象执行任务,那么动态线程池框架就会在线程池运行的过程中一直收集线程池的运行信息,并且把这些信息展示给用户,用户可以根据这些信息随时动态调控 ExtensibleThreadPoolExecutor 线程池的配置。这就是我们要开发的动态线程池的大概功能。当然,随着这些功能的确定,这个动态线程池的内部组件也能确定下来了。

首先我们需要一个客户端,然后再开发一个对应的服务端;客户端被集成在用户开发的项目中,只要用户在项目中创建了线程池,客户端就会把线程池的配置信息上报到服务端,并且会在线程池运行期间一直收集线程池的运行信息,把这些信息也上报给服务端;最后还需要开发一个前端组件,也就是 web 页面,这个页面会访问服务端,把线程池的运行信息和配置信息展示给用户;用户根据这些信息可以随时动态调整线程池的配置,调整的信息会发送给服务端,服务端判断线程池的配置发生了变化,就会通知客户端修改线程池配置信息。到此为止,动态线程池内部组件的工作流程我就为大家讲解完毕了。当然这其中有很多细节,这些细节知识都会在后面的文章中为大家一一讲解。

按照动态线程池各个组件的难易程度,我决定先开发动态线程池的客户端。刚才我也说了,只要用户在项目中创建了 ExtensibleThreadPoolExecutor 线程池对象,客户端就会将这个线程池的配置信息以及运行时信息上报到服务端,而在这一章,我们已经把 ExtensibleThreadPoolExecutor 类给重构完毕了,所以紧接着开发客户端,直接就可以使用这个 ExtensibleThreadPoolExecutor 类。不过开发客户端的内容,这一章显然是讲不完了,就留到下一章再为大家讲解吧。好了朋友们,我们下一章见!

本篇文章来源于微信公众号: 陈清风扬写框架



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2024-08-01