👉这是一个或许对你有用的社群

🐱一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料:

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 地址:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn

来源:blog.csdn.net/baidu_19473529


前言

请求合并到底有什么意义呢?我们来看下图。

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/****zzq*包装成批量执行的地方,使用queue解决超时问题**/
@Service
public class UserWrapBatchQueueService {
    @Resource
    private UserService userService;
    /***最大任务数**/
    public static int MAX_TASK_NUM = 100;

    /***请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分*CompletableFuture将处理结果返回*/
    public class Request {
        //请求id String requestId;
        // 参数LonguserId;
        // 队列,这个有超时机制
        LinkedBlockingQueue<Users> usersQueue;

        public String getRequestId() {
            returnrequestId;
        }

        public voidsetRequestId(String requestId) {
            this.requestId = requestId;
        }

        public LonggetUserId() {
            returnuserId;
        }

        public voidsetUserId(LonguserId) {
            this.userId = userId;
        }

        public LinkedBlockingQueue<Users> getUsersQueue() {
            return usersQueue;
        }

        public voidsetUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
            this.usersQueue = usersQueue;
        }
    }
    /*LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全LinkedBlockingQueue与ArrayBlockingQueue的区别ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。*/

    private final Queue<Request> queue = new LinkedBlockingQueue();

    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了[" + size + "]个请求");
            //将队列的请求消费到一个集合保存
            for (inti = 0; i < size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合 
            List<Request> userReqs = new ArrayList<>();
            for (Request request :
                    list) {
                userReqs.add(request);
            }//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
            for (RequestuserReq:
                 userReqs) {//这里再把结果放到队列里
                Usersusers = response.get(userReq.getRequestId());
                userReq.usersQueue.offer(users);
            }
        }, 100, 10, TimeUnit.MILLISECONDS);//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次} 
        public Users queryUser (Long userId){
            Request request = new Request();//这里用UUID做请求id
            request.requestId = UUID.randomUUID().to String().replace("-", "");
            request.userId = userId;
            LinkedBlockingQueue<Users> usersQueue = newLinkedBlockingQueue <>();
            request.usersQueue = usersQueue;
            //将对象传入队列
            queue.offer(request);
            //取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
            try {
                return usersQueue.poll(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    @Override
    public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
        //全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>
                ();
        //用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = newHashMap <>
        ();//数据分组 
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                //表示没数据,这里要new,不然加入队列会空指针
                result.put(val.getRequestId(), new Users());
            }
        });
        return result;
    }

}

 

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html

欢迎加入我的知识星球,全面提升技术能力。

👉加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

本篇文章来源于微信公众号: Java基基



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2024-08-02