From c99ad738c02ee54d16cbca28dd5d139a1f33f8e5 Mon Sep 17 00:00:00 2001 From: OS <29528966+lenboo@users.noreply.github.com> Date: Fri, 8 Oct 2021 14:08:12 +0800 Subject: [PATCH 001/383] [Bug-6455][Master]fix bug 6455: cannot stop sub-task (#6458) * fix bug: cannot stop the task. * fix bug: cannot stop the task. * remove the check thread number --- .../master/processor/StateEventProcessor.java | 6 +++++- .../master/runner/WorkflowExecuteThread.java | 2 +- .../runner/task/CommonTaskProcessor.java | 2 +- .../master/runner/task/SubTaskProcessor.java | 18 ++++++++++++++++-- .../service/process/ProcessService.java | 4 ---- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index d5a8e85b5d..2f9a634250 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -61,8 +61,12 @@ public class StateEventProcessor implements NettyRequestProcessor { StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class); StateEvent stateEvent = new StateEvent(); - stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); stateEvent.setKey(stateEventChangeCommand.getKey()); + if (stateEventChangeCommand.getSourceProcessInstanceId() != stateEventChangeCommand.getDestProcessInstanceId()) { + stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + } else { + stateEvent.setExecutionStatus(stateEventChangeCommand.getSourceStatus()); + } stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId()); stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId()); StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index eae6abe068..43fcbd7b9e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -416,7 +416,7 @@ public class WorkflowExecuteThread implements Runnable { if (stateEvent.getExecutionStatus().typeIsFinished()) { endProcess(); } - if (stateEvent.getExecutionStatus() == ExecutionStatus.READY_STOP) { + if (processInstance.getState() == ExecutionStatus.READY_STOP) { killAllTasks(); } return true; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 4296b85fa4..ee1c548525 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -55,7 +55,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { MasterConfig masterConfig; @Autowired - NettyExecutorManager nettyExecutorManager; + NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); /** * logger of MasterBaseTaskExecThread diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 7a4be5830c..e0cd3e8603 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.concurrent.locks.Lock; @@ -43,6 +46,8 @@ public class SubTaskProcessor extends BaseTaskProcessor { */ private final Lock runLock = new ReentrantLock(); + private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); + @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { this.processInstance = processInstance; @@ -121,8 +126,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { } subProcessInstance.setState(ExecutionStatus.READY_PAUSE); processService.updateProcessInstance(subProcessInstance); - //TODO... - // send event to sub process master + sendToSubProcess(); return true; } @@ -157,9 +161,19 @@ public class SubTaskProcessor extends BaseTaskProcessor { } subProcessInstance.setState(ExecutionStatus.READY_STOP); processService.updateProcessInstance(subProcessInstance); + sendToSubProcess(); return true; } + private void sendToSubProcess() { + StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( + processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0 + ); + String address = subProcessInstance.getHost().split(":")[0]; + int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]); + this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + } + @Override public String getType() { return TaskType.SUB_PROCESS.getDesc(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 4f5058cd1f..50aef45ee7 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -222,10 +222,6 @@ public class ProcessService { moveToErrorCommand(command, "process instance is null"); return null; } - if (!checkThreadNum(command, validThreadNum)) { - logger.info("there is not enough thread for this command: {}", command); - return setWaitingThreadProcess(command, processInstance); - } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); -- Gitee From e5cdd8bd27e6e3add6e252cde70aa7e2896cc95a Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 6 Oct 2021 10:15:36 +0800 Subject: [PATCH 002/383] Run h2 in daemon mode (#6446) --- .../org/apache/dolphinscheduler/server/StandaloneServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java index 94b6ca7bc2..bc4287119d 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java @@ -110,7 +110,7 @@ public class StandaloneServer { System.setProperty(SPRING_DATASOURCE_USERNAME, "sa"); System.setProperty(SPRING_DATASOURCE_PASSWORD, ""); - Server.createTcpServer("-ifNotExists").start(); + Server.createTcpServer("-ifNotExists", "-tcpDaemon").start(); final DataSource ds = ConnectionFactory.getInstance().getDataSource(); final ScriptRunner runner = new ScriptRunner(ds.getConnection(), true, true); -- Gitee From 677a8d427ec27dd719ffc2c5ca0d71db9e6fcefb Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Fri, 8 Oct 2021 11:05:27 +0800 Subject: [PATCH 003/383] Add missing fields in H2 init sql (#6454) --- sql/dolphinscheduler_h2.sql | 44 ++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql index ffa3a3a609..1565ab419e 100644 --- a/sql/dolphinscheduler_h2.sql +++ b/sql/dolphinscheduler_h2.sql @@ -327,8 +327,9 @@ CREATE TABLE t_ds_command process_instance_priority int(11) DEFAULT NULL, worker_group varchar(64), environment_code bigint(20) DEFAULT '-1', + dry_run int NULL DEFAULT 0, PRIMARY KEY (id), - KEY priority_id_index (process_instance_priority, id) + KEY priority_id_index (process_instance_priority, id) ); -- ---------------------------- @@ -379,6 +380,7 @@ CREATE TABLE t_ds_error_command worker_group varchar(64), environment_code bigint(20) DEFAULT '-1', message text, + dry_run int NULL DEFAULT 0, PRIMARY KEY (id) ); @@ -592,6 +594,7 @@ CREATE TABLE t_ds_process_instance timeout int(11) DEFAULT '0', tenant_id int(11) NOT NULL DEFAULT '-1', var_pool longtext, + dry_run int NULL DEFAULT 0, PRIMARY KEY (id) ); @@ -831,11 +834,12 @@ CREATE TABLE t_ds_task_instance task_instance_priority int(11) DEFAULT NULL, worker_group varchar(64) DEFAULT NULL, environment_code bigint(20) DEFAULT '-1', - environment_config text DEFAULT '', + environment_config text DEFAULT '', executor_id int(11) DEFAULT NULL, first_submit_time datetime DEFAULT NULL, delay_time int(4) DEFAULT '0', var_pool longtext, + dry_run int NULL DEFAULT 0, PRIMARY KEY (id), FOREIGN KEY (process_instance_id) REFERENCES t_ds_process_instance (id) ON DELETE CASCADE ); @@ -1002,17 +1006,17 @@ CREATE TABLE t_ds_alert_plugin_instance DROP TABLE IF EXISTS t_ds_environment; CREATE TABLE t_ds_environment ( - id int NOT NULL AUTO_INCREMENT, - code bigint(20) NOT NULL, - name varchar(100) DEFAULT NULL, - config text DEFAULT NULL, - description text, - operator int DEFAULT NULL, - create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (id), - UNIQUE KEY environment_name_unique (name), - UNIQUE KEY environment_code_unique (code) + id int NOT NULL AUTO_INCREMENT, + code bigint(20) NOT NULL, + name varchar(100) DEFAULT NULL, + config text DEFAULT NULL, + description text, + operator int DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE KEY environment_name_unique (name), + UNIQUE KEY environment_code_unique (code) ); -- @@ -1021,12 +1025,12 @@ CREATE TABLE t_ds_environment DROP TABLE IF EXISTS t_ds_environment_worker_group_relation; CREATE TABLE t_ds_environment_worker_group_relation ( - id int NOT NULL AUTO_INCREMENT, - environment_code bigint(20) NOT NULL, - worker_group varchar(255) NOT NULL, - operator int DEFAULT NULL, - create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (id) , + id int NOT NULL AUTO_INCREMENT, + environment_code bigint(20) NOT NULL, + worker_group varchar(255) NOT NULL, + operator int DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), UNIQUE KEY environment_worker_group_unique (environment_code,worker_group) ); -- Gitee From f8b68dccd736acdeac2b49defba98fd674e6d163 Mon Sep 17 00:00:00 2001 From: OS <29528966+lenboo@users.noreply.github.com> Date: Fri, 8 Oct 2021 16:56:33 +0800 Subject: [PATCH 004/383] fix bug: complement data error (#6460) --- .../dolphinscheduler/service/quartz/cron/CronUtils.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java index ab9a97b7ca..f23da2620f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java @@ -185,6 +185,11 @@ public class CronUtils { */ public static List getSelfFireDateList(final Date startTime, final Date endTime, final List schedules) { List result = new ArrayList<>(); + if(startTime.equals(endTime)){ + result.add(startTime); + return result; + } + Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS); -- Gitee From 2460eb6fb9789620d55888267a02bd3273f209fa Mon Sep 17 00:00:00 2001 From: Kirs Date: Mon, 11 Oct 2021 16:41:10 +0800 Subject: [PATCH 005/383] [cherry-pick #6484 #6488][MasterServer] upgrade the druid and mysql connector version (#6494) * [DS-6483][MasterServer] upgrade the druid and mysql connector version (#6484) * [DS-6483][MasterServer] upgrade the druid and mysql connector version * remove mysql connector in known-dependencies.txt * change mysql connector version to 8.0.16 Co-authored-by: caishunfeng <534328519@qq.com> * Add Alibaba Druid to NOTICE file (#6488) Co-authored-by: wind Co-authored-by: caishunfeng <534328519@qq.com> --- .../dao/datasource/SpringConnectionFactory.java | 2 +- dolphinscheduler-dist/release-docs/NOTICE | 8 ++++++++ pom.xml | 4 ++-- tools/dependencies/known-dependencies.txt | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java index ca4a7e20bd..1865c78fd7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java @@ -91,7 +91,7 @@ public class SpringConnectionFactory { druidDataSource.setPoolPreparedStatements(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS, true)); druidDataSource.setTestWhileIdle(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE, true)); druidDataSource.setTestOnBorrow(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW, true)); - druidDataSource.setTestOnReturn(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN, true)); + druidDataSource.setTestOnReturn(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN, false)); druidDataSource.setKeepAlive(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE, true)); druidDataSource.setMinIdle(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE, 5)); diff --git a/dolphinscheduler-dist/release-docs/NOTICE b/dolphinscheduler-dist/release-docs/NOTICE index 11f5a94872..2e638696e1 100644 --- a/dolphinscheduler-dist/release-docs/NOTICE +++ b/dolphinscheduler-dist/release-docs/NOTICE @@ -674,6 +674,14 @@ http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html ======================================================================== +Alibaba Druid NOTICE + +======================================================================== + +Alibaba Druid +Copyright 1999-2021 Alibaba Group Holding Ltd. + +======================================================================== AWS SDK for Java NOTICE diff --git a/pom.xml b/pom.xml index 5d49a974b6..5c3cc6ef72 100644 --- a/pom.xml +++ b/pom.xml @@ -70,14 +70,14 @@ 3.2.0 2.0.1 5.0.5 - 1.1.22 + 1.2.4 1.4.200 1.11 1.1.1 4.4.1 4.4.1 4.12 - 5.1.34 + 8.0.16 1.7.5 1.7.5 3.2.2 diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 8a77f51fe3..b19e8b9d29 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -56,7 +56,7 @@ datanucleus-core-4.1.6.jar datanucleus-rdbms-4.1.7.jar derby-10.14.2.0.jar error_prone_annotations-2.1.3.jar -druid-1.1.22.jar +druid-1.2.4.jar gson-2.8.6.jar guava-24.1-jre.jar guava-retrying-2.0.0.jar -- Gitee From 3702845c1f8676157b8bfbd7c80757a756bd4018 Mon Sep 17 00:00:00 2001 From: Kirs Date: Mon, 11 Oct 2021 16:41:43 +0800 Subject: [PATCH 006/383] fix UI updateProcessInstance interface parameter error (#6487) (#6493) Co-authored-by: wangyizhi --- .../js/conf/home/pages/dag/_source/dag.vue | 15 ++++++++------- .../instance/pages/list/_source/list.vue | 2 +- .../pages/taskInstance/_source/list.vue | 4 ++-- .../src/js/conf/home/store/dag/actions.js | 19 ++++++++----------- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index 37b5e10669..77946e7f74 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -161,7 +161,7 @@ if (this.type === 'instance') { this.instanceId = this.$route.params.id - this.definitionCode = this.$route.query.code + this.definitionCode = this.$route.query.code || this.code } else if (this.type === 'definition') { this.definitionCode = this.$route.params.code } @@ -199,7 +199,8 @@ 'name', 'isDetails', 'projectCode', - 'version' + 'version', + 'code' ]) }, methods: { @@ -311,12 +312,12 @@ .then((res) => { if (this.verifyConditions(res.tasks)) { this.loading(true) - const definitionCode = this.definitionCode - if (definitionCode) { + const isEdit = !!this.definitionCode + if (isEdit) { + const methodName = this.type === 'instance' ? 'updateInstance' : 'updateDefinition' + const methodParam = this.type === 'instance' ? this.instanceId : this.definitionCode // Edit - return this[ - this.type === 'instance' ? 'updateInstance' : 'updateDefinition' - ](definitionCode) + return this[methodName](methodParam) .then((res) => { this.$message({ message: res.msg, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 1112e91af7..2064b04be6 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -298,7 +298,7 @@ * edit */ _reEdit (item) { - this.$router.push({ path: `/projects/${this.projectCode}/instance/list/${item.id}` }) + this.$router.push({ path: `/projects/${this.projectCode}/instance/list/${item.id}`, query: { code: item.processDefinitionCode } }) }, /** * Rerun diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue index 84f396caa2..40b20f8754 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue @@ -149,7 +149,7 @@ this.$emit('on-update') }, _go (item) { - this.$router.push({ path: `/projects/${this.projectId}/instance/list/${item.processInstanceId}` }) + this.$router.push({ path: `/projects/${this.projectCode}/instance/list/${item.processInstanceId}` }) } }, watch: { @@ -166,7 +166,7 @@ this.list = this.taskInstanceList }, computed: { - ...mapState('dag', ['projectId']) + ...mapState('dag', ['projectCode']) }, components: { mLog } } diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index 9f4811d60a..14295c702c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -318,19 +318,16 @@ export default { /** * Process instance update */ - updateInstance ({ state }, payload) { + updateInstance ({ state }, instanceId) { return new Promise((resolve, reject) => { - const data = { - globalParams: state.globalParams, - tasks: state.tasks, - tenantId: state.tenantId, - timeout: state.timeout - } - io.put(`projects/${state.projectCode}/process-instances/${payload}`, { - processInstanceJson: JSON.stringify(data), + io.put(`projects/${state.projectCode}/process-instances/${instanceId}`, { + syncDefine: state.syncDefine, + globalParams: JSON.stringify(state.globalParams), locations: JSON.stringify(state.locations), - connects: JSON.stringify(state.connects), - syncDefine: state.syncDefine + taskDefinitionJson: JSON.stringify(state.tasks), + taskRelationJson: JSON.stringify(state.connects), + tenantCode: state.tenantCode, + timeout: state.timeout }, res => { resolve(res) state.isEditDag = false -- Gitee From 8d18ab510e45072871d24f1bad3e65f23635468c Mon Sep 17 00:00:00 2001 From: Kirs Date: Mon, 11 Oct 2021 16:42:22 +0800 Subject: [PATCH 007/383] dist module configuration generates missing task plugin. (#6473) (#6495) Co-authored-by: Kerwin <37063904+zhuangchong@users.noreply.github.com> --- .../src/main/provisio/dolphinscheduler.xml | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml index 2ed6a3fc58..9ecd9efbd0 100644 --- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml +++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml @@ -81,13 +81,8 @@ - - - - - - - + + @@ -96,19 +91,39 @@ - - + + - - + + - + + + + + + + + + + + + + + + + + + + + + @@ -116,4 +131,9 @@ - \ No newline at end of file + + + + + + -- Gitee From cbc457b1a79866566eb145a8e7c61051b4f06b07 Mon Sep 17 00:00:00 2001 From: Kirs Date: Mon, 11 Oct 2021 16:49:53 +0800 Subject: [PATCH 008/383] [Improvement-6474] [MasterServer] schedule time for process instance optimization (#6477) (#6496) * [DS-6474][MasterServer] change to handle schedule time for process instance in WorkflowExecuteThread * delete all the valid tasks when complement data if id is not null * checkstyle Co-authored-by: caishunfeng <534328519@qq.com> Co-authored-by: wind Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/runner/WorkflowExecuteThread.java | 9 +++++++++ .../service/process/ProcessService.java | 20 +++++++++---------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 43fcbd7b9e..42effe7173 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -573,6 +573,15 @@ public class WorkflowExecuteThread implements Runnable { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate.toString()); + + if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { + processInstance.setScheduleTime(complementListDate.get(0)); + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processService.updateProcessInstance(processInstance); + } } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 50aef45ee7..3c3b41d1b3 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -125,7 +125,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.facebook.presto.jdbc.internal.guava.collect.Lists; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -630,10 +629,8 @@ public class ProcessService { processInstance.setWarningGroupId(warningGroupId); processInstance.setDryRun(command.getDryRun()); - // schedule time - Date scheduleTime = getScheduleTime(command, cmdParam); - if (scheduleTime != null) { - processInstance.setScheduleTime(scheduleTime); + if (command.getScheduleTime() != null) { + processInstance.setScheduleTime(command.getScheduleTime()); } processInstance.setCommandStartTime(command.getStartTime()); processInstance.setLocations(processDefinition.getLocations()); @@ -878,13 +875,14 @@ public class ProcessService { runStatus = processInstance.getState(); break; case COMPLEMENT_DATA: - // delete all the valid tasks when complement data - List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance taskInstance : taskInstanceList) { - taskInstance.setFlag(Flag.NO); - this.updateTaskInstance(taskInstance); + // delete all the valid tasks when complement data if id is not null + if (processInstance.getId() != 0) { + List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance taskInstance : taskInstanceList) { + taskInstance.setFlag(Flag.NO); + this.updateTaskInstance(taskInstance); + } } - initComplementDataParam(processDefinition, processInstance, cmdParam); break; case REPEAT_RUNNING: // delete the recover task names from command parameter -- Gitee From 9942d73db6664a9dc6c9721c9d6447b135b96842 Mon Sep 17 00:00:00 2001 From: Kirs Date: Mon, 11 Oct 2021 20:24:29 +0800 Subject: [PATCH 009/383] [Fix-6478] [Server] Fix the lack of scheduling time in complement parallelism mode (#6491) (#6498) * Fix the lack of scheduling time in complement parallelism mode --- .../api/service/impl/ExecutorServiceImpl.java | 6 ++-- .../api/service/ExecutorServiceTest.java | 31 +++++++++++++++++++ .../service/quartz/cron/CronUtils.java | 1 + 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index b910c4ba01..5042a03714 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -604,15 +604,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ createCount = Math.min(listDate.size(), expectedParallelismNumber); } logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + + listDate.addLast(end); int chunkSize = listDate.size() / createCount; for (int i = 0; i < createCount; i++) { int rangeStart = i == 0 ? i : (i * chunkSize); int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize; - if (rangeEnd == listDate.size()) { - rangeEnd = listDate.size() - 1; - } + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index e308f58443..9766c61dff 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -58,12 +58,15 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * executor service 2 test */ @RunWith(MockitoJUnitRunner.Silent.class) public class ExecutorServiceTest { + private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class); @InjectMocks private ExecutorServiceImpl executorService; @@ -326,4 +329,32 @@ public class ExecutorServiceTest { result.put(Constants.STATUS, Status.SUCCESS); return result; } + + @Test + public void testCreateComplementToParallel() { + List result = new ArrayList<>(); + int expectedParallelismNumber = 3; + LinkedList listDate = new LinkedList<>(); + listDate.add(0); + listDate.add(1); + listDate.add(2); + listDate.add(3); + + int createCount = Math.min(listDate.size(), expectedParallelismNumber); + logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + + listDate.addLast(4); + int chunkSize = listDate.size() / createCount; + for (int i = 0; i < createCount; i++) { + int rangeStart = i == 0 ? i : (i * chunkSize); + int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize; + logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd); + result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd)); + } + + Assert.assertEquals("0,1", result.get(0)); + Assert.assertEquals("1,2", result.get(1)); + Assert.assertEquals("2,4", result.get(2)); + + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java index f23da2620f..f195d62f0f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java @@ -190,6 +190,7 @@ public class CronUtils { return result; } + // support left closed and right open time interval (startDate <= N < endDate) Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS); -- Gitee From 6ffd019683c84b61378a56895f2903ec460311c4 Mon Sep 17 00:00:00 2001 From: OS <29528966+lenboo@users.noreply.github.com> Date: Tue, 12 Oct 2021 17:32:04 +0800 Subject: [PATCH 010/383] [cherry-pick][6471-6502]cherry-pick 6471 && 6502 to 2.0-prepare (#6508) --- .../api/service/impl/ExecutorServiceImpl.java | 16 ++- .../dolphinscheduler/dao/entity/Command.java | 41 +++++- .../dao/mapper/CommandMapper.java | 7 - .../dao/mapper/CommandMapper.xml | 10 -- .../dao/mapper/CommandMapperTest.java | 4 +- .../server/master/config/MasterConfig.java | 12 ++ .../master/runner/MasterSchedulerService.java | 28 +++- .../src/main/resources/master.properties | 3 + .../service/process/ProcessService.java | 133 ++++++++---------- .../service/quartz/ProcessScheduleJob.java | 1 + .../service/process/ProcessServiceTest.java | 48 ++++--- sql/dolphinscheduler_h2.sql | 4 + sql/dolphinscheduler_mysql.sql | 34 +++-- sql/dolphinscheduler_postgre.sql | 63 +++++---- 14 files changed, 235 insertions(+), 169 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 5042a03714..fe5f9101b1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -283,13 +283,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: if (processInstance.getState() == ExecutionStatus.READY_STOP) { @@ -409,10 +409,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param loginUser login user * @param instanceId instance id * @param processDefinitionCode process definition code + * @param version * @param commandType command type * @return insert result code */ - private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) { + private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) { Map result = new HashMap<>(); //To add startParams only when repeat running is needed @@ -427,6 +428,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setProcessDefinitionCode(processDefinitionCode); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setExecutorId(loginUser.getId()); + command.setProcessDefinitionVersion(processVersion); + command.setProcessInstanceId(instanceId); if (!processService.verifyIsNeedCreateCommand(command)) { putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode); @@ -545,6 +548,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setWorkerGroup(workerGroup); command.setEnvironmentCode(environmentCode); command.setDryRun(dryRun); + ProcessDefinition processDefinition = processService.findProcessDefinitionByCode(processDefineCode); + if (processDefinition != null) { + command.setProcessDefinitionVersion(processDefinition.getVersion()); + } + command.setProcessInstanceId(0); Date start = null; Date end = null; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index b1ed217537..ae2ff6258a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -132,6 +132,12 @@ public class Command { @TableField("dry_run") private int dryRun; + @TableField("process_instance_id") + private int processInstanceId; + + @TableField("process_definition_version") + private int processDefinitionVersion; + public Command() { this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -152,7 +158,10 @@ public class Command { String workerGroup, Long environmentCode, Priority processInstancePriority, - int dryRun) { + int dryRun, + int processInstanceId, + int processDefinitionVersion + ) { this.commandType = commandType; this.executorId = executorId; this.processDefinitionCode = processDefinitionCode; @@ -168,6 +177,8 @@ public class Command { this.environmentCode = environmentCode; this.processInstancePriority = processInstancePriority; this.dryRun = dryRun; + this.processInstanceId = processInstanceId; + this.processDefinitionVersion = processDefinitionVersion; } public TaskDependType getTaskDependType() { @@ -298,6 +309,22 @@ public class Command { this.dryRun = dryRun; } + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -353,8 +380,13 @@ public class Command { if (processInstancePriority != command.processInstancePriority) { return false; } + if (processInstanceId != command.processInstanceId) { + return false; + } + if (processDefinitionVersion != command.getProcessDefinitionVersion()) { + return false; + } return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null); - } @Override @@ -375,6 +407,8 @@ public class Command { result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); result = 31 * result + dryRun; + result = 31 * result + processInstanceId; + result = 31 * result + processDefinitionVersion; return result; } @@ -397,7 +431,10 @@ public class Command { + ", workerGroup='" + workerGroup + '\'' + ", environmentCode='" + environmentCode + '\'' + ", dryRun='" + dryRun + '\'' + + ", processInstanceId='" + processInstanceId + '\'' + + ", processDefinitionVersion='" + processDefinitionVersion + '\'' + '}'; } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 2bbfb4b7b1..22913845c3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -31,13 +31,6 @@ import java.util.List; */ public interface CommandMapper extends BaseMapper { - - /** - * get one command - * @return command - */ - Command getOneToRun(); - /** * count command state * @param userId userId diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index 5b2d6b4d8c..b0ea477431 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -18,16 +18,6 @@ - select tepd.code as work_flow_code,tepd.name as work_flow_name, - "" as source_work_flow_code, + '' as source_work_flow_code, tepd.release_state as work_flow_publish_status, tes.start_time as schedule_start_time, tes.end_time as schedule_end_time, @@ -44,7 +44,7 @@ +