DolphinScheduler Fault Tolerance Source Code Analysis: Worker
mycodes · Published 2023-02-06

This article walks through the source code of the Worker fault-tolerance mechanism in DolphinScheduler, for readers who want a reference.

Introduction

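The previous article covered the Master fault-tolerance mechanism in DolphinScheduler. Because DolphinScheduler uses a decentralized architecture in which multiple Masters and multiple Workers act as peers, the Worker fault-tolerance mechanism also deserves a close look.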

As with the Master analysis, the source code is based on version 3.1.3.

Worker Fault-Tolerance Source Code Analysis

Worker startup and registration

The Worker's entry point is WorkerServer. Once the Worker starts, its run method is executed:

@PostConstruct
public void run() {
    this.workerRpcServer.start();
    this.workerRpcClient.start();
    this.taskPluginManager.loadPlugin();
    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.start();
    this.workerManagerThread.start();
    this.messageRetryRunner.start();
    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("WorkerServer shutdown hook");
        }
    }));
}

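Here we only care about what this.workerRegistryClient.start() does: it registers the current worker's information in ZooKeeper and starts a heartbeat task that periodically refreshes that information in ZooKeeper.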

/**
 * registry
 */
private void registry() {
    WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
    String workerZKPath = workerConfig.getWorkerRegistryPath();
    // remove before persist
    registryClient.remove(workerZKPath);
    registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
    log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(),
     workerZKPath);
    while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
    }
    // sleep 1s, waiting master failover remove
    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
    workerHeartBeatTask.start();
    log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
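The key property behind worker failover is that the worker's node is ephemeral: it lives only as long as the worker's session. The sketch below illustrates the order used in registry() (remove a stale node, persist an ephemeral node, then start a periodic heartbeat) against a hypothetical in-memory registry. `InMemoryRegistry`, `WorkerRegistrar` and `expireSession` are invented for illustration and are not DolphinScheduler or ZooKeeper APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for ZooKeeper: an ephemeral node is tied to the
// session that created it and disappears when that session expires.
class InMemoryRegistry {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final Map<String, String> owner = new ConcurrentHashMap<>();

    void remove(String path) {
        data.remove(path);
        owner.remove(path);
    }

    void persistEphemeral(String session, String path, String value) {
        data.put(path, value);
        owner.put(path, session);
    }

    boolean exists(String path) {
        return data.containsKey(path);
    }

    // Simulates session expiry (e.g. the worker process dies): its ephemeral nodes vanish.
    void expireSession(String session) {
        owner.forEach((path, s) -> {
            if (s.equals(session)) {
                remove(path);
            }
        });
    }
}

// Hypothetical registrar following the order of registry():
// remove stale node -> persist ephemeral node -> start periodic heartbeat.
class WorkerRegistrar {
    private final InMemoryRegistry registry;
    private final String session;
    private final String path;
    private final ScheduledExecutorService heartbeat = Executors.newSingleThreadScheduledExecutor();

    WorkerRegistrar(InMemoryRegistry registry, String session, String path) {
        this.registry = registry;
        this.session = session;
        this.path = path;
    }

    private String heartBeatJson() {
        return "{\"reportTime\":" + System.currentTimeMillis() + "}";
    }

    void register(long heartbeatIntervalMillis) {
        registry.remove(path); // clear a leftover node from a previous run
        registry.persistEphemeral(session, path, heartBeatJson());
        // Rewrite the node content periodically so masters see fresh health data.
        heartbeat.scheduleAtFixedRate(
                () -> registry.persistEphemeral(session, path, heartBeatJson()),
                heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void close() {
        heartbeat.shutdownNow();
    }
}
```

When the session expires, the node disappears without the worker doing anything. That disappearance is exactly the REMOVE event the master reacts to later.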

This is essentially the same as the master's registration flow. Let's look at the path the worker registers under.

The worker registers at the following ZK path. It shares the parent directory /nodes with the master:

// /nodes/worker/ + ip:listenPort
workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
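Because the worker's address is the last path segment, the master can recover the host from an event path alone. The hypothetical helper below builds the path in the `/nodes/worker/<ip:port>` layout and extracts the host again. This is roughly the role registryClient.getHostByEventDataPath plays later in removeWorkerNodePath, but `RegistryPaths` and `hostOf` are illustrative names, and the real implementation may differ.

```java
// Hypothetical helpers mirroring the "/nodes/worker/<ip:port>" layout described above.
final class RegistryPaths {
    static final String NODES = "/nodes";
    static final String WORKERS = NODES + "/worker";

    private RegistryPaths() {
    }

    static String workerPath(String workerAddress) {
        return WORKERS + "/" + workerAddress;
    }

    // Returns the last path segment (ip:port), or null if the path has no child segment.
    static String hostOf(String eventPath) {
        if (eventPath == null) {
            return null;
        }
        int idx = eventPath.lastIndexOf('/');
        if (idx < 0 || idx == eventPath.length() - 1) {
            return null;
        }
        return eventPath.substring(idx + 1);
    }
}
```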

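The registered content is the current worker node's health status, including CPU, memory, load and disk information. From this information the master can tell whether the worker is healthy enough to receive and execute tasks.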

@Override
public WorkerHeartBeat getHeartBeat() {
    double loadAverage = OSUtils.loadAverage();
    double cpuUsage = OSUtils.cpuUsage();
    int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
    double reservedMemory = workerConfig.getReservedMemory();
    double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
    int execThreads = workerConfig.getExecThreads();
    int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
    int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, 
    reservedMemory,
            execThreads, workerWaitingTaskCount);
    return WorkerHeartBeat.builder()
            .startupTime(ServerLifeCycleManager.getServerStartupTime())
            .reportTime(System.currentTimeMillis())
            .cpuUsage(cpuUsage)
            .loadAverage(loadAverage)
            .availablePhysicalMemorySize(availablePhysicalMemorySize)
            .maxCpuloadAvg(maxCpuLoadAvg)
            .memoryUsage(OSUtils.memoryUsage())
            .reservedMemory(reservedMemory)
            .diskAvailable(OSUtils.diskAvailable())
            .processId(processId)
            .workerHostWeight(workerConfig.getHostWeight())
            .workerWaitingTaskCount(this.workerWaitingTaskCount.get())
            .workerExecThreadCount(workerConfig.getExecThreads())
            .serverStatus(serverStatus)
            .build();
}
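getServerStatus is not shown above. The sketch below is a guess at the kind of decision it makes from the same inputs: an overloaded CPU or too little free memory marks the node as unusable, and more waiting tasks than execution threads marks it as busy. The thresholds, the `NodeStatus` enum and the `HeartBeatStatus.evaluate` name are assumptions for illustration, not the 3.1.3 implementation. Verify against the source before relying on them.

```java
// Hypothetical status values; the real code encodes the status as an int constant.
enum NodeStatus { NORMAL, BUSY, ABNORMAL }

final class HeartBeatStatus {
    private HeartBeatStatus() {
    }

    // Memory arguments are assumed to use the same unit as each other.
    static NodeStatus evaluate(double loadAverage, int maxCpuLoadAvg,
                               double availableMemory, double reservedMemory,
                               int execThreads, int waitingTaskCount) {
        // Overloaded CPU or too little free memory: the node should not receive tasks.
        if (loadAverage > maxCpuLoadAvg || availableMemory < reservedMemory) {
            return NodeStatus.ABNORMAL;
        }
        // More queued tasks than execution threads: healthy but saturated.
        if (waitingTaskCount > execThreads) {
            return NodeStatus.BUSY;
        }
        return NodeStatus.NORMAL;
    }
}
```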

Master watches the worker's node in ZK

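Next, the master monitors the registered worker nodes. As covered in the previous article, after the master starts and registers, it already subscribes to the /nodes node. Whenever a child path under /nodes/master or /nodes/worker changes, the callback fires: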

// /nodes
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());

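So when a worker's ephemeral node disappears, the master detects the change. This eventually invokes the notify method of MasterRegistryDataListener, which uses the changed path to decide whether the event concerns a master or a worker: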

@Override
public void notify(Event event) {
    final String path = event.path();
    if (Strings.isNullOrEmpty(path)) {
        return;
    }
    //monitor master
    if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
        handleMasterEvent(event);
    } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
        //monitor worker
        handleWorkerEvent(event);
    }
}

We already saw this code when discussing master failover. For worker failover, the handleWorkerEvent method is triggered.

private void handleWorkerEvent(Event event) {
    final String path = event.path();
    switch (event.type()) {
        case ADD:
            logger.info("worker node added : {}", path);
            break;
        case REMOVE:
            logger.info("worker node deleted : {}", path);
            masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
            break;
        default:
            break;
    }
}
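The routing in notify and handleWorkerEvent can be condensed into the sketch below. It uses hypothetical names (`NodeEventRouter`, `onEvent`, `EventType`) and plain strings instead of the registry event classes. It shows why the prefix check appends a slash: only child nodes match, not the `/nodes/worker` parent itself. It also shows that only a REMOVE on a worker path triggers failover.

```java
import java.util.ArrayList;
import java.util.List;

enum EventType { ADD, REMOVE, UPDATE }

// Hypothetical listener mirroring the notify/handleWorkerEvent split shown above.
class NodeEventRouter {
    static final String MASTERS = "/nodes/master";
    static final String WORKERS = "/nodes/worker";

    final List<String> failedOverWorkers = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void onEvent(String path, EventType type) {
        if (path == null || path.isEmpty()) {
            return;
        }
        // The trailing "/" ensures only children match, not the parent node itself.
        if (path.startsWith(MASTERS + "/")) {
            log.add("master " + type + " " + path);
        } else if (path.startsWith(WORKERS + "/")) {
            handleWorkerEvent(path, type);
        }
    }

    private void handleWorkerEvent(String path, EventType type) {
        log.add("worker " + type + " " + path);
        if (type == EventType.REMOVE) {
            // Only a removed (ephemeral) worker node triggers failover for that host.
            failedOverWorkers.add(path.substring(path.lastIndexOf('/') + 1));
        }
    }
}
```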

Next, the master obtains the host of the offline worker node and proceeds with failover:

public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
    logger.info("{} node deleted : {}", nodeType, path);
    try {
        // get the host of the removed node
        String serverHost = null;
        if (!StringUtils.isEmpty(path)) {
            serverHost = registryClient.getHostByEventDataPath(path);
            if (StringUtils.isEmpty(serverHost)) {
                logger.error("server down error: unknown path: {}", path);
                return;
            }
            if (!registryClient.exists(path)) {
                logger.info("path: {} not exists", path);
            }
        }
        // failover server
        if (failover) {
            failoverService.failoverServerWhenDown(serverHost, nodeType);
        }
    } catch (Exception e) {
        logger.error("{} server failover failed", nodeType, e);
    }
}

The overall worker failover process is roughly as follows:

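1- Get the startup time of the worker being failed over. This is used later to determine whether the worker is still offline or has already restarted.

2- Using the failed worker's information, query the task instances that need failover, keeping only those that belong to the current master. This differs from master failover, and it is why worker failover does not take a lock.

3- Iterate over all task instances that need failover. Note that only tasks from before the worker restarted need failover. New tasks assigned to the worker after it restarted must not be failed over.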
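The per-task check in step 3 (checkTaskInstanceNeedFailover, whose body is not shown here) can be approximated as follows. This sketch assumes three rules: a task whose workflow this master does not run (processInstance is null) is skipped, a finished task is skipped, and a task submitted after the worker's new startup time is skipped. `TaskSnapshot` and `WorkerFailoverFilter` are hypothetical types, and the exact fields the real method compares may differ.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical, trimmed-down view of a task instance; the real TaskInstance has many more fields.
final class TaskSnapshot {
    final int id;
    final Instant submitTime;
    final boolean finished;

    TaskSnapshot(int id, Instant submitTime, boolean finished) {
        this.id = id;
        this.submitTime = submitTime;
        this.finished = finished;
    }
}

final class WorkerFailoverFilter {
    private WorkerFailoverFilter() {
    }

    /**
     * @param workerStartTime   startup time of the worker if it has registered again, empty if still offline
     * @param ownedByThisMaster whether this master caches the task's workflow instance
     */
    static boolean needFailover(Optional<Instant> workerStartTime, boolean ownedByThisMaster, TaskSnapshot task) {
        if (!ownedByThisMaster) {
            return false; // another master is responsible, so no lock is needed
        }
        if (task.finished) {
            return false; // nothing left to recover
        }
        // Tasks submitted after the worker restarted were dispatched to the new process: leave them alone.
        if (workerStartTime.isPresent() && task.submitTime != null
                && task.submitTime.isAfter(workerStartTime.get())) {
            return false;
        }
        return true;
    }
}
```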

/**
 * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
 * and failover these tasks.
 * <p>
 * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
 *
 * @param workerHost worker host
 */
public void failoverWorker(@NonNull String workerHost) {
    LOGGER.info("Worker[{}] failover starting", workerHost);
    final StopWatch failoverTimeCost = StopWatch.createStarted();
    // 1- get the startup time of the worker being failed over, used later to tell
    // whether the worker is still offline or has already restarted
    // we query the task instance from cache, so that we can directly update the cache
    final Optional<Date> needFailoverWorkerStartTime =
            getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
    // 2- query the task instances to fail over for the failed worker, keeping only those that
    // belong to the current master; this differs from master failover and is why no lock is taken
    final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
    if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
        LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
        return;
    }
    LOGGER.info(
            "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
            workerHost,
            needFailoverTaskInstanceList.size(),
            needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
    final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
    for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
        try {
            ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
                    taskInstance.getProcessInstanceId(), k -> {
                        WorkflowExecuteRunnable workflowExecuteRunnable =
                                cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
                        if (workflowExecuteRunnable == null) {
                            return null;
                        }
                        return workflowExecuteRunnable.getProcessInstance();
                    });
            // 3- only tasks from before the worker restarted need failover;
            // new tasks assigned after the worker restarted must not be failed over
            if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
                LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
                continue;
            }
            LOGGER.info(
                    "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
                    workerHost);
            failoverTaskInstance(processInstance, taskInstance);
            LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
        } catch (Exception ex) {
            LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
    failoverTimeCost.stop();
    LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
            workerHost,
            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}

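4- For a non-master task, first kill the related YARN job if killYarnJobWhenTaskFailover is enabled. Then update the taskInstance's state to TaskExecutionStatus.NEED_FAULT_TOLERANCE (with its flag set to Flag.NO) and save it. Next, build a TaskStateEvent whose status is TaskExecutionStatus.NEED_FAULT_TOLERANCE and whose type is TASK_STATE_CHANGE. Finally, submit the event.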

private void failoverTaskInstance(@NonNull ProcessInstance processInstance,
 @NonNull TaskInstance taskInstance) {
    TaskMetrics.incTaskInstanceByState("failover");
    boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
    taskInstance.setProcessInstance(processInstance);
    if (!isMasterTask) {
        LOGGER.info("The failover taskInstance is not master task");
        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(processInstance)
                .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                .create();
        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
            // only kill yarn job if exists , the local thread has exited
            LOGGER.info("TaskInstance failover begin kill the task related yarn job");
            ProcessUtils.killYarnJob(logClient, taskExecutionContext);
        }
    } else {
        LOGGER.info("The failover taskInstance is a master task");
    }
    taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
    taskInstance.setFlag(Flag.NO);
    processService.saveTaskInstance(taskInstance);
    // submit the event
    TaskStateEvent stateEvent = TaskStateEvent.builder()
            .processInstanceId(processInstance.getId())
            .taskInstanceId(taskInstance.getId())
            .status(TaskExecutionStatus.NEED_FAULT_TOLERANCE)
            .type(StateEventType.TASK_STATE_CHANGE)
            .build();
    workflowExecuteThreadPool.submitStateEvent(stateEvent);
}

When the event is submitted, the WorkflowExecuteRunnable of the workflow instance it belongs to is looked up, and the event is handed to it:

public void submitStateEvent(StateEvent stateEvent) {
    WorkflowExecuteRunnable workflowExecuteThread =
            processInstanceExecCacheManager.getByProcessInstanceId
            (stateEvent.getProcessInstanceId());
    if (workflowExecuteThread == null) {
        logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
                stateEvent);
        return;
    }
    workflowExecuteThread.addStateEvent(stateEvent);
    logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
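Steps 4 and the submission above amount to "mark the task, wrap it in an event, route the event to the owning workflow's queue." The sketch below models that with hypothetical types (`TaskState`, `StateEvent`, `WorkflowRunner`, `StateEventDispatcher`). A map keyed by process instance id stands in for processInstanceExecCacheManager, and an unknown id is dropped, just as submitStateEvent logs a warning and returns.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

enum TaskState { RUNNING_EXECUTION, NEED_FAULT_TOLERANCE }

// Hypothetical simplification of TaskStateEvent: which workflow, which task, new status.
final class StateEvent {
    final int processInstanceId;
    final int taskInstanceId;
    final TaskState status;

    StateEvent(int processInstanceId, int taskInstanceId, TaskState status) {
        this.processInstanceId = processInstanceId;
        this.taskInstanceId = taskInstanceId;
        this.status = status;
    }
}

// Stand-in for a WorkflowExecuteRunnable: each workflow instance owns its own event queue.
final class WorkflowRunner {
    final Queue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
}

final class StateEventDispatcher {
    private final Map<Integer, WorkflowRunner> cache = new ConcurrentHashMap<>();

    void register(int processInstanceId, WorkflowRunner runner) {
        cache.put(processInstanceId, runner);
    }

    // Returns false when this master does not run the workflow; the event is then dropped.
    boolean submit(StateEvent event) {
        WorkflowRunner runner = cache.get(event.processInstanceId);
        if (runner == null) {
            return false;
        }
        runner.stateEvents.add(event);
        return true;
    }
}
```

Keeping one queue per workflow instance means events for the same workflow are handled in order by the runnable that owns it.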

Handling the failover event

The code above submits an event for each task that needs failover, so some thread must actually process it. Let's look at the WorkflowExecuteRunnable class. submitStateEvent puts the event into this class's stateEvents queue:

private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();

Once the master is running, handleEvents of each WorkflowExecuteRunnable is invoked repeatedly to take events from stateEvents and process them:

/**
 * handle event
 */
public void handleEvents() {
    if (!isStart()) {
        logger.info(
                "The workflow instance is not started, will not handle its state event, current state event size: {}",
                stateEvents);
        return;
    }
    StateEvent stateEvent = null;
    while (!this.stateEvents.isEmpty()) {
        try {
            stateEvent = this.stateEvents.peek();
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
                    stateEvent.getTaskInstanceId());
            // if state handle success then will remove this state, otherwise will retry this state next time.
            // The state should always handle success except database error.
            checkProcessInstance(stateEvent);
            StateEventHandler stateEventHandler =
                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                            .orElseThrow(() -> new StateEventHandleError(
                                    "Cannot find handler for the given state event"));
            logger.info("Begin to handle state event, {}", stateEvent);
            if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } catch (StateEventHandleError stateEventHandleError) {
            logger.error("State event handle error, will remove this event: {}", stateEvent,
             stateEventHandleError);
            this.stateEvents.remove(stateEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (StateEventHandleException stateEventHandleException) {
            logger.error("State event handle error, will retry this event: {}",
                    stateEvent,
                    stateEventHandleException);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            // we catch the exception here, since if the state event handle failed, the state event will still keep
            // in the stateEvents queue.
            logger.error("State event handle error, get a unknown exception, will retry this event: {}",
                    stateEvent,
                    e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}
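The loop relies on a peek-then-remove pattern. An event leaves the queue only after it has been handled successfully or judged impossible to handle. Otherwise it stays at the head and is retried. The generic sketch below reproduces that contract with hypothetical exception types (`UnrecoverableEventError`, `RetryableEventException`) standing in for StateEventHandleError and StateEventHandleException. One deliberate simplification: instead of sleeping and retrying inside the same loop as the source does, `drain` returns and leaves the event for the next call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Hypothetical exception types mirroring the two failure categories in handleEvents().
class UnrecoverableEventError extends RuntimeException {
    UnrecoverableEventError(String msg) {
        super(msg);
    }
}

class RetryableEventException extends RuntimeException {
    RetryableEventException(String msg) {
        super(msg);
    }
}

final class EventLoop<E> {
    final Queue<E> events = new ConcurrentLinkedQueue<>();
    final List<E> handled = new ArrayList<>();

    /** Drains events in order; stops early, keeping the head event, when it cannot be handled yet. */
    void drain(Predicate<E> handler) {
        while (!events.isEmpty()) {
            E event = events.peek(); // peek, not poll: the event stays queued until resolved
            try {
                if (!handler.test(event)) {
                    return; // not handled yet: keep it at the head for the next drain()
                }
                events.remove(event);
                handled.add(event);
            } catch (UnrecoverableEventError e) {
                events.remove(event); // can never succeed: drop it
            } catch (RetryableEventException e) {
                return; // transient failure: keep it at the head and retry later
            }
        }
    }
}
```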

Based on the event type StateEventType.TASK_STATE_CHANGE, the concrete StateEventHandler is TaskStateEventHandler. Its handleStateEvent method handles a task that needs failover as follows:

if (task.getState().isFinished()) {
    if (completeTaskMap.containsKey(task.getTaskCode())
            && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
        logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
        return true;
    }
    workflowExecuteRunnable.taskFinished(task);
    if (task.getTaskGroupId() > 0) {
        logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
        workflowExecuteRunnable.releaseTaskGroup(task);
    }
    return true;
}

The "finished" check treats the fault-tolerance state as finished, because isFailure() includes NEED_FAULT_TOLERANCE:

public boolean isFinished() {
    return isSuccess() || isKill() || isFailure() || isPause();
}
public boolean isFailure() {
    return this == TaskExecutionStatus.FAILURE || this == NEED_FAULT_TOLERANCE;
}
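To make the consequence concrete, here is a hypothetical stand-in enum (`TaskStatus`) that reproduces only the predicates quoted above. A task marked NEED_FAULT_TOLERANCE reports isFinished() == true, which is what routes it into the "finished" branch of TaskStateEventHandler and then into taskFinished.

```java
// Hypothetical subset of TaskExecutionStatus, reproducing only the predicates quoted above.
enum TaskStatus {
    SUCCESS, FAILURE, KILL, PAUSE, RUNNING_EXECUTION, NEED_FAULT_TOLERANCE;

    boolean isSuccess() {
        return this == SUCCESS;
    }

    boolean isKill() {
        return this == KILL;
    }

    boolean isPause() {
        return this == PAUSE;
    }

    // The failover state counts as a failure...
    boolean isFailure() {
        return this == FAILURE || this == NEED_FAULT_TOLERANCE;
    }

    // ...and therefore as finished.
    boolean isFinished() {
        return isSuccess() || isKill() || isFailure() || isPause();
    }
}
```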

It then calls workflowExecuteRunnable.taskFinished(task) to handle the follow-up work for the task instance's state change. Here we only look at the failover-related branch:

} else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
    // retry task
    logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
    retryTaskInstance(taskInstance);
}
// checks for the fault-tolerance state, which was set earlier during failover
public boolean taskCanRetry() {
    if (this.isSubProcess()) {
        return false;
    }
    if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
        return true;
    }
    return this.getState() == TaskExecutionStatus.FAILURE && 
    (this.getRetryTimes() < this.getMaxRetryTimes());
}
/**
 * create new task instance to retry, different objects from the original
 *
 */
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
    if (!taskInstance.taskCanRetry()) {
        return;
    }
    TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
    if (newTaskInstance == null) {
        logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
                taskInstance.getTaskCode(),
                taskInstance.getId());
        return;
    }
    waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
    if (!taskInstance.retryTaskIntervalOverTime()) {
        logger.info(
                "Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
                processInstance.getId(), newTaskInstance.getTaskCode(),
                newTaskInstance.getState(), newTaskInstance.getRetryTimes(), 
                newTaskInstance.getMaxRetryTimes(),
                newTaskInstance.getRetryInterval());
        stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
        stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
    } else {
        addTaskToStandByList(newTaskInstance);
        submitStandByTask();
        waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
    }
}
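The retry decision combines taskCanRetry() with the interval check in retryTaskInstance(). A NEED_FAULT_TOLERANCE task can be retried regardless of how many retries it has already used. The sketch below folds both checks into one hypothetical helper (`RetryDecision.decide`). It additionally assumes, based on our reading of retryTaskIntervalOverTime() in 3.1.3 (not shown in this article, so verify it), that only FAILURE tasks wait for the retry interval, while a failover task is resubmitted immediately.

```java
import java.time.Duration;
import java.time.Instant;

enum RetryState { FAILURE, NEED_FAULT_TOLERANCE, SUCCESS }

// Hypothetical helper combining taskCanRetry() with the interval check in retryTaskInstance().
final class RetryDecision {
    enum Action { NO_RETRY, SUBMIT_NOW, WAIT_FOR_INTERVAL }

    private RetryDecision() {
    }

    static Action decide(RetryState state, boolean subProcess, int retryTimes, int maxRetryTimes,
                         Duration retryInterval, Instant endTime, Instant now) {
        // Mirrors taskCanRetry(): sub-processes never retry; failover ignores the retry budget.
        boolean canRetry = !subProcess
                && (state == RetryState.NEED_FAULT_TOLERANCE
                        || (state == RetryState.FAILURE && retryTimes < maxRetryTimes));
        if (!canRetry) {
            return Action.NO_RETRY;
        }
        // Assumption: only FAILURE tasks honor the retry interval; failover tasks go straight back.
        if (state != RetryState.FAILURE) {
            return Action.SUBMIT_NOW;
        }
        boolean intervalOver = Duration.between(endTime, now).compareTo(retryInterval) > 0;
        return intervalOver ? Action.SUBMIT_NOW : Action.WAIT_FOR_INTERVAL;
    }
}
```

In terms of the code above, SUBMIT_NOW corresponds to addTaskToStandByList plus submitStandByTask, and WAIT_FOR_INTERVAL corresponds to handing the new instance to stateWheelExecuteThread.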

Finally, a new copy of the task instance that needs failover is added back to the readyToSubmitTaskQueue and submitted again:

addTaskToStandByList(newTaskInstance);
submitStandByTask();

From here on it is handled like any normal task: the submitTaskExec method dispatches it to a worker for execution.

Summary

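The Worker failover flow is roughly as follows:

1- The Master uses ZK watches to detect which Worker nodes need failover.

2- Each Master only fails over the workflow instances it schedules itself. Before failing over, it compares each task instance's start time with the Worker's startup time and skips tasks that started after the Worker came back up.

3- Task instances that need failover are put back into readyToSubmitTaskQueue and submitted to run again.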

