网站/小程序/APP个性化定制开发,二开,改版等服务,加扣:8582-36016

    说在前面

    trivial是根据之前设计RPC框架而来的(还在增进当中),其中较为不同的一个点为,在客户端去掉了业务线程池,因为既然都要等待,不必要再加一层。

     

    进入正题

    有在网上看到这样的信息,“之前有简单提到过, dubbo默认采用了netty做为网络组件,它属于一种NIO的模式。消费端发起远程请求后,线程不会阻塞等待服务端的返回,而是马上得到一个ResponseFuture,消费端通过不断的轮询机制判断结果是否有返回。因为是通过轮询,轮询有个需要特别注要的就是避免死循环,所以为了解决这个问题就引入了超时机制,只在一定时间范围内做轮询,如果超时时间就返回超时异常”。

    我认为这种说法是错误。

    1.以上说法只关注结果,但是如果只关注结果的话何不阻塞等待?还需要轮询判断,耗费cpu资源?超时机制绝不是为了让轮询在一定时间内结束!

    问题1:超时机制有什么作用?

    2.上述说“消费端通过不断的轮询机制判断结果是否有返回”,没有指明是消费端的什么线程,但是容易让人误以为是调用者线程(下称caller)。而事实上是由一个deamon线程去扫描判断所有的caller发起的调用是否超时。

    问题2:为什么不让caller自己去轮询?

     

    问题1个人观点:

    在正常情况下,即caller发起调用,而后只需阻塞等待服务提供方的结果即可,因为在正常情况下是能收到的。

    那要是因为某些原因而收不到呢?比如,服务提供方的处理线程意外结束了,那caller岂不是要一直等下去?

    所以要有超时。

    dubbo中超时后重试的请求是路由到其他机器上的。咋一看合情合理,再细想大有学问(有可能是我想多了)。

    除了刚刚说的因为处理线程意外结束使得caller得不到结果这种情况之外,有些人会想到另一种情况——在网络中丢失?这种情况也是不适合再发送到同一个机器的,因为有tcp的重传,这样你重试的请求若要到同一个机器,便到了协议栈同一个缓冲区,那么最先发送成功的依然是上一次的请求,再按正常情况,首次收到的依然是上一次请求的结果,相当于重试没有作用。

    事实上,这只是我的猜想,对于tcp串行传输,并行传输什么的还没有去了解,这里只算是提出一个问题来思考,如果有错误还望指出!

     

    问题2个人观点:

    假设是由caller自己轮询(有10个),那么每个cpu时间片结束后,都会从运行态转到就绪态(同样有上下文的切换)。适合短时间轮询

    假设是由超时扫描线程扫描,这10个caller直接一次进入java线程的等待状态(linux的阻塞态?),结束后由他人唤醒。适合较长时间轮询

    前者每次状态切换耗费资源少,但次数多。

    后者每次状态切换耗费资源多,但只有一次。

    所以多短算短,多长算长呢?未经测试

    同样我并不知道dubbo是怎么考虑的,但我自己是这样想的,所以再次强调这是个人观点,可能有错误。

     

    dubbo超时细节

    超时扫描线程

    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); //扫描超时
        th.setDaemon(true);
        th.start();
    }

     

    DefaultFuture的get方法

    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }
    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) { // wait应该在循环当中
                    // 在调用的时候需要等待
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    扫描线程细节

    private static class RemotingInvocationTimeoutScan implements Runnable {
            @Override
            public void run() {
                while (true) {
                    try {
                        // 扫描DefaultFuture列表
                        for (DefaultFuture future : FUTURES.values()) {
                            if (future == null || future.isDone()) {
                                continue;
                            }
                            // 如果future未完成且超时
                            if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                                Response timeoutResponse = new Response(future.getId());
                                // 设置超时状态
                                timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                                timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                                DefaultFuture.received(future.getChannel(), timeoutResponse);
                            }
                        }
                        Thread.sleep(30);
                    } catch (Throwable e) {
                        logger.error("Exception when scan the timeout invocation of remoting.", e);
                    }
                }
            }
        }

    可以看到该线程用于扫描所有caller注册的调用信息,检查超时。值得注意的一个细节是,“Thread.sleep(30)”,也是在说明while(true)是不让出cpu的吗?

     

    trivial超时细节

    超时观察者watcher

    private class Watcher extends Thread{
    
            @Override
    
            public void run() {
    
                while(!RPCClient.shutdown){//每次循环检查是否已经关闭,同样会让出cpu
    
                    try {
    
                        CountDownNode head=waiterQueue.take();//阻塞获取头
    
                        if(System.currentTimeMillis()-head.createTime <RPCClient.timeout)
    
                            waiterQueue.add(head);//如果没有超时再加回到队尾
    
                        else{//如果超时了
    
                            long callerId=head.message.getCallerId();
    
                            long count=head.message.getCount();
    
                            if(countMap.get(callerId)==null
    
                                    || countMap.get(callerId)!=count) continue;//实际上已经成功返回
    
                            if(head.retryNum>0){
    
                                head.retryNum--;
    
                                log.error("线程——"+callerId+" 第 "+count +" 次调用超时,即将进行第 "
    
                                        +(RPCClient.retryNum-head.retryNum)+" 次重试");
    
                                context.writeAndFlush(head.message);//重发信息
    
                                continue;
    
                            }
    
                            resultMap.put(callerId,"调用超时");
    
                            log.error("线程—— "+callerId+" 第 "+count
    
                                    +"次调用超时,已重试 "+RPCClient.retryNum+" 次,即将返回超时提示");
    
                            LockSupport.unpark(waiterMap.get(callerId));
    
                            waiterMap.remove(callerId);
    
                            countMap.remove(callerId);
    
                        }
    
                    } catch (InterruptedException e) {
    
                        e.printStackTrace();
    
                    }
    
                }
    
                log.info("超时观察者退出");
    
            }
    
        }

    大致上是差不多的,都是要一个线程去扫描,但有一点较为不同的是,

    dubbo的超时扫描线程虽然每次循环sleep(30),但即使没有caller发起调用也会一直扫描,耗费cpu资源;

    而trivial则会阻塞地从阻塞队列中获取,如果没有caller发起调用则阻塞,不耗费cpu资源。

    在频繁发起调用的时候两者差不多的,因为后者也不会总是进入阻塞,但在偶发调用时,或许trivial较好。当然取决于真实情况。

     

    最后,如果有兴趣的话,可以了解一下这个平凡的RPC框架,https://github.com/AllenDuke/trivial

    评论 0

    暂无评论
    0
    0
    0
    立即
    投稿
    发表
    评论
    返回
    顶部