diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 448a4346f947..c70ecb8cd7df 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -136,6 +136,8 @@ jobs:
class: org.apache.dolphinscheduler.e2e.cases.HiveDataSourceE2ETest
- name: DolphinDBDataSourceE2ETest
class: org.apache.dolphinscheduler.e2e.cases.DolphinDBDataSourceE2ETest
+ - name: DatavinesTaskE2ETest
+ class: org.apache.dolphinscheduler.e2e.cases.tasks.DatavinesTaskE2ETest
env:
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
steps:
diff --git a/docs/docs/en/guide/task/datavines.md b/docs/docs/en/guide/task/datavines.md
new file mode 100644
index 000000000000..32a71f80e73c
--- /dev/null
+++ b/docs/docs/en/guide/task/datavines.md
@@ -0,0 +1,30 @@
+# Datavines
+
+## Overview
+
+Use `Datavines Task` to create a datavines-type task and support data quality job in Datavines. When the worker executes `Datavines Task`,
+it will call `Datavines API` to trigger datavines job. Click [here](https://datavane.github.io/datavines-website/) for details about `Datavines`.
+
+## Create Task
+
+- Click Project Management-Project Name-Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
+- Drag
from the toolbar to the canvas.
+
+## Task Parameter
+
+- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
+
+| **Parameter** | **Description** |
+|-------------------|-------------------------------------------------------------------------------------------------------|
+| Datavines Address | The URL for the Datavines service, e.g., http://localhost:5600. |
+| Datavines Job ID | The unique job id for a datavines job. |
+| Datavines token | The Datavines service access token can be obtained through token management on the Datavines service. |
+| Block on Failure | When turned on, if the data quality check result is failed, the task result will be set as failed. |
+
+## Task Example
+
+This example illustrates how to create a datavines task node.
+
+
+
+
diff --git a/docs/docs/zh/guide/task/datavines.md b/docs/docs/zh/guide/task/datavines.md
new file mode 100644
index 000000000000..af4a3ca47805
--- /dev/null
+++ b/docs/docs/zh/guide/task/datavines.md
@@ -0,0 +1,31 @@
+# Datavines
+
+## 综述
+
+`Datavines`任务类型,用于创建并执行 `Datavines` 类型任务来执行 Datavines 中的数据质量检查作业。Worker 执行该任务的时候,会通过 `Datavines API` 触发 `Datavines 的作业`。
+点击 [这里](https://datavane.github.io/datavines-website/) 获取更多关于 `Datavines` 的信息。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
+- 工具栏中拖动
到画板中,即可完成创建。
+
+## 任务参数
+
+- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
+
+| **任务参数** | **描述** |
+|-----------------|------------------------------------------------------|
+| Datavines 地址 | Datavines 服务的 url,例如:`http://localhost:5600`。 |
+| Datavines 作业 ID | Datavines 作业对应的唯一ID。 |
+| Datavines token | Datavines 服务访问 token, 可在 Datavines 服务上的 token 管理中取得。 |
+| 检查失败时阻塞 | 开启时,数据质量检查结果为失败时会将任务结果置为失败。 |
+
+## 例子
+
+这个示例展示了如何创建 Datavines 任务节点:
+
+
+
+
+
diff --git a/docs/img/tasks/demo/datavines_job_id.png b/docs/img/tasks/demo/datavines_job_id.png
new file mode 100644
index 000000000000..c850a63c631a
Binary files /dev/null and b/docs/img/tasks/demo/datavines_job_id.png differ
diff --git a/docs/img/tasks/demo/datavines_task.png b/docs/img/tasks/demo/datavines_task.png
new file mode 100644
index 000000000000..0184c743883d
Binary files /dev/null and b/docs/img/tasks/demo/datavines_task.png differ
diff --git a/docs/img/tasks/icons/datavines.png b/docs/img/tasks/icons/datavines.png
new file mode 100644
index 000000000000..375d9da823a8
Binary files /dev/null and b/docs/img/tasks/icons/datavines.png differ
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index e95d4190fc60..bf3271559493 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
@@ -47,6 +47,8 @@ task:
- 'SEATUNNEL'
- 'DATAX'
- 'SQOOP'
+ dataQuality:
+ - 'DATAVINES'
machineLearning:
- 'JUPYTER'
- 'MLFLOW'
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/tasks/DatavinesTaskE2ETest.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/tasks/DatavinesTaskE2ETest.java
new file mode 100644
index 000000000000..d46296e5799b
--- /dev/null
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/tasks/DatavinesTaskE2ETest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.e2e.cases.tasks;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.dolphinscheduler.e2e.cases.workflow.BaseWorkflowE2ETest;
+import org.apache.dolphinscheduler.e2e.core.DolphinScheduler;
+import org.apache.dolphinscheduler.e2e.core.WebDriverHolder;
+import org.apache.dolphinscheduler.e2e.pages.LoginPage;
+import org.apache.dolphinscheduler.e2e.pages.project.ProjectPage;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.TaskInstanceTab;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.WorkflowDefinitionTab;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.WorkflowForm;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.WorkflowInstanceTab;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.task.DatavinesTaskForm;
+import org.apache.dolphinscheduler.e2e.pages.security.SecurityPage;
+import org.apache.dolphinscheduler.e2e.pages.security.TenantPage;
+import org.apache.dolphinscheduler.e2e.pages.security.UserPage;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junitpioneer.jupiter.DisableIfTestFails;
+
+@TestMethodOrder(MethodOrderer.MethodName.class)
+@DolphinScheduler(composeFiles = "docker/datavines-task/docker-compose.yaml")
+@DisableIfTestFails
+public class DatavinesTaskE2ETest extends BaseWorkflowE2ETest {
+
+ private static final String mockServerAddress = "http://mockServer:1080";
+
+ private static final String jobId = "1";
+
+ private static final String token = "test-token";
+
+ @BeforeAll
+ public static void setup() {
+ browser = WebDriverHolder.getWebDriver();
+
+ TenantPage tenantPage = new LoginPage(browser)
+ .login(adminUser)
+ .goToNav(SecurityPage.class)
+ .goToTab(TenantPage.class);
+
+ if (tenantPage.tenants().stream().noneMatch(tenant -> tenant.tenantCode().equals(adminUser.getTenant()))) {
+ tenantPage
+ .create(adminUser.getTenant())
+ .goToNav(SecurityPage.class)
+ .goToTab(UserPage.class)
+ .update(adminUser);
+ }
+
+ tenantPage
+ .goToNav(ProjectPage.class)
+ .createProjectUntilSuccess(projectName);
+ }
+
+ @Test
+ @Order(10)
+ void testRunDatavinesTasks_SuccessCase() {
+ WorkflowDefinitionTab workflowDefinitionPage =
+ new ProjectPage(browser)
+ .goToNav(ProjectPage.class)
+ .goTo(projectName)
+ .goToTab(WorkflowDefinitionTab.class);
+
+ String workflowName = "DatavinesSuccessCase";
+ String taskName = "DatavinesSuccessTask";
+ workflowDefinitionPage
+ .createWorkflow()
+ .addTask(WorkflowForm.TaskType.DATAVINES)
+ .address(mockServerAddress)
+ .jobId(jobId)
+ .token(token)
+ .name(taskName)
+ .submit()
+
+ .submit()
+ .name(workflowName)
+ .submit();
+
+ untilWorkflowDefinitionExist(workflowName);
+
+ workflowDefinitionPage.publish(workflowName);
+
+ runWorkflow(workflowName);
+ untilWorkflowInstanceExist(workflowName);
+ WorkflowInstanceTab.Row workflowInstance = untilWorkflowInstanceSuccess(workflowName);
+ assertThat(workflowInstance.executionTime()).isEqualTo(1);
+
+ TaskInstanceTab.Row taskInstance = untilTaskInstanceSuccess(workflowName, taskName);
+ assertThat(taskInstance.retryTimes()).isEqualTo(0);
+ }
+
+ @Test
+ @Order(20)
+ void testRunDatavinesTasks_FailureBlockDisabledCase() {
+ WorkflowDefinitionTab workflowDefinitionPage =
+ new ProjectPage(browser)
+ .goToNav(ProjectPage.class)
+ .goTo(projectName)
+ .goToTab(WorkflowDefinitionTab.class);
+
+ String workflowName = "DatavinesFailureBlockDisabledCase";
+ String taskName = "DatavinesFailureBlockDisabledTask";
+ workflowDefinitionPage
+ .createWorkflow()
+ .addTask(WorkflowForm.TaskType.DATAVINES)
+ .address(mockServerAddress)
+ .jobId(jobId)
+ .token(token)
+ .name(taskName)
+ .submit()
+
+ .submit()
+ .name(workflowName)
+ .submit();
+
+ untilWorkflowDefinitionExist(workflowName);
+
+ workflowDefinitionPage.publish(workflowName);
+
+ runWorkflow(workflowName);
+ untilWorkflowInstanceExist(workflowName);
+ WorkflowInstanceTab.Row workflowInstance = untilWorkflowInstanceSuccess(workflowName);
+ assertThat(workflowInstance.executionTime()).isEqualTo(1);
+
+ TaskInstanceTab.Row taskInstance = untilTaskInstanceSuccess(workflowName, taskName);
+ assertThat(taskInstance.retryTimes()).isEqualTo(0);
+ }
+
+ @Test
+ @Order(30)
+ void testRunDatavinesTasks_FailureCase() {
+ WorkflowDefinitionTab workflowDefinitionPage =
+ new ProjectPage(browser)
+ .goToNav(ProjectPage.class)
+ .goTo(projectName)
+ .goToTab(WorkflowDefinitionTab.class);
+
+ String workflowName = "DatavinesFailureCase";
+ String taskName = "DatavinesFailureTask";
+ // Job ID "2" is configured in MockServer to return execution status FAILURE
+ workflowDefinitionPage
+ .createWorkflow()
+ .addTask(WorkflowForm.TaskType.DATAVINES)
+ .address(mockServerAddress)
+ .jobId("2")
+ .token(token)
+ .name(taskName)
+ .submit()
+
+ .submit()
+ .name(workflowName)
+ .submit();
+
+ untilWorkflowDefinitionExist(workflowName);
+
+ workflowDefinitionPage.publish(workflowName);
+
+ runWorkflow(workflowName);
+ untilWorkflowInstanceExist(workflowName);
+ WorkflowInstanceTab.Row workflowInstance = untilWorkflowInstanceFailed(workflowName);
+ assertThat(workflowInstance.executionTime()).isEqualTo(1);
+ }
+}
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/WorkflowForm.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/WorkflowForm.java
index 7c47eb36a1dd..9845909c6858 100644
--- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/WorkflowForm.java
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/WorkflowForm.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.e2e.pages.project.workflow;
import org.apache.dolphinscheduler.e2e.core.WebDriverWaitFactory;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.task.DatavinesTaskForm;
import org.apache.dolphinscheduler.e2e.pages.project.workflow.task.HttpTaskForm;
import org.apache.dolphinscheduler.e2e.pages.project.workflow.task.JavaTaskForm;
import org.apache.dolphinscheduler.e2e.pages.project.workflow.task.PythonTaskForm;
@@ -90,6 +91,8 @@ public T addTask(TaskType type) {
return (T) new JavaTaskForm(this);
case PYTHON:
return (T) new PythonTaskForm(this);
+ case DATAVINES:
+ return (T) new DatavinesTaskForm(this);
}
throw new UnsupportedOperationException("Unknown task type");
}
@@ -129,6 +132,7 @@ public enum TaskType {
SWITCH,
HTTP,
JAVA,
- PYTHON
+ PYTHON,
+ DATAVINES
}
}
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/task/DatavinesTaskForm.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/task/DatavinesTaskForm.java
new file mode 100644
index 000000000000..e684f455c64a
--- /dev/null
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/workflow/task/DatavinesTaskForm.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.e2e.pages.project.workflow.task;
+
+import org.apache.dolphinscheduler.e2e.core.WebDriverWaitFactory;
+import org.apache.dolphinscheduler.e2e.pages.project.workflow.WorkflowForm;
+
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+
+public final class DatavinesTaskForm extends TaskNodeForm {
+
+ private final WebDriver driver;
+
+ @FindBys({
+ @FindBy(className = "input-datavines-address"),
+ @FindBy(tagName = "input")
+ })
+ private WebElement addressInput;
+
+ @FindBys({
+ @FindBy(className = "input-datavines-job-id"),
+ @FindBy(tagName = "input")
+ })
+ private WebElement jobIdInput;
+
+ @FindBys({
+ @FindBy(className = "input-datavines-token"),
+ @FindBy(tagName = "input")
+ })
+ private WebElement tokenInput;
+
+ public DatavinesTaskForm(WorkflowForm parent) {
+ super(parent);
+ this.driver = parent.driver();
+ PageFactory.initElements(driver, this);
+ }
+
+ public DatavinesTaskForm address(String address) {
+ WebDriverWaitFactory.createWebDriverWait(driver)
+ .until(ExpectedConditions.elementToBeClickable(addressInput));
+ addressInput.sendKeys(address);
+ return this;
+ }
+
+ public DatavinesTaskForm jobId(String jobId) {
+ WebDriverWaitFactory.createWebDriverWait(driver)
+ .until(ExpectedConditions.elementToBeClickable(jobIdInput));
+ jobIdInput.sendKeys(jobId);
+ return this;
+ }
+
+ public DatavinesTaskForm token(String token) {
+ WebDriverWaitFactory.createWebDriverWait(driver)
+ .until(ExpectedConditions.elementToBeClickable(tokenInput));
+ tokenInput.sendKeys(token);
+ return this;
+ }
+}
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/datavines-task/docker-compose.yaml b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/datavines-task/docker-compose.yaml
new file mode 100644
index 000000000000..e3afb251a5b7
--- /dev/null
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/datavines-task/docker-compose.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: "3.8"
+
+services:
+ dolphinscheduler:
+ image: apache/dolphinscheduler-standalone-server:ci
+ environment:
+ MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ WORKER_SERVER_LOAD_PROTECTION_MAX_JVM_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ WORKER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.95
+ WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED: 'true'
+ ports:
+ - "12345:12345"
+ networks:
+ - e2e
+ healthcheck:
+ test: [ "CMD", "curl", "http://localhost:12345/dolphinscheduler/actuator/health" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ mockServer:
+ image: mockserver/mockserver:5.14.0
+ environment:
+ MOCKSERVER_INITIALIZATION_JSON_PATH: /config/mockserver-config.json
+ ports:
+ - "1080:1080"
+ networks:
+ - e2e
+ volumes:
+ - type: bind
+ source: ./
+ target: /config
+ healthcheck:
+ test: [ "CMD", "curl", "http://localhost:1080/" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+
+networks:
+ e2e:
diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/datavines-task/mockserver-config.json b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/datavines-task/mockserver-config.json
new file mode 100644
index 000000000000..c6bce3f24960
--- /dev/null
+++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/datavines-task/mockserver-config.json
@@ -0,0 +1,62 @@
+[
+ {
+ "httpRequest": {
+ "method": "POST",
+ "path": "/api/v1/openapi/job/execute/1"
+ },
+ "httpResponse": {
+ "body": "{\"code\":200,\"msg\":\"success\",\"data\":\"1\"}",
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "GET",
+ "path": "/api/v1/openapi/job/execution/status/1"
+ },
+ "httpResponse": {
+ "body": "{\"code\":200,\"msg\":\"success\",\"data\":\"SUCCESS\"}",
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "GET",
+ "path": "/api/v1/openapi/job/execution/result/1"
+ },
+ "httpResponse": {
+ "body": "{\"code\":200,\"msg\":\"success\",\"data\":\"SUCCESS\"}",
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "POST",
+ "path": "/api/v1/openapi/job/execute/2"
+ },
+ "httpResponse": {
+ "body": "{\"code\":200,\"msg\":\"success\",\"data\":\"2\"}",
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "GET",
+ "path": "/api/v1/openapi/job/execution/status/2"
+ },
+ "httpResponse": {
+ "body": "{\"code\":200,\"msg\":\"success\",\"data\":\"FAILURE\"}",
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+ }
+]
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index f3e6f8855568..e664aea80b17 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -160,6 +160,12 @@
${project.version}
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-datavines
+ ${project.version}
+
+
org.apache.dolphinscheduler
dolphinscheduler-task-java
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml
new file mode 100644
index 000000000000..a79331dfaf43
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml
@@ -0,0 +1,78 @@
+
+
+
+ 4.0.0
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-plugin
+ dev-SNAPSHOT
+
+
+ dolphinscheduler-task-datavines
+ jar
+
+
+ task.datavines
+
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-api
+ ${project.version}
+ provided
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-common
+ ${project.version}
+ provided
+
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ shade
+
+ package
+
+
+
+
+
+
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesParameters.java
new file mode 100644
index 000000000000..76ad1d6f3f0b
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesParameters.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class DatavinesParameters extends AbstractParameters {
+
+ private String address;
+
+ private String jobId;
+
+ private String token;
+
+ private boolean failureBlock;
+
+ @Override
+ public boolean checkParameters() {
+ return StringUtils.isNotEmpty(this.address)
+ && StringUtils.isNotEmpty(this.jobId)
+ && StringUtils.isNotEmpty(this.token);
+ }
+
+ @Override
+ public List getResourceFilesList() {
+ return Collections.emptyList();
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTask.java
new file mode 100644
index 000000000000..d84872d87f21
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTask.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.datavines.utils.RequestUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+
+@Slf4j
+public class DatavinesTask extends AbstractRemoteTask {
+
+ private final TaskExecutionContext taskExecutionContext;
+
+ private DatavinesParameters datavinesParameters;
+ private String jobExecutionId;
+ private boolean executionStatus;
+
+ protected DatavinesTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ }
+
+ @Override
+ public List getApplicationIds() throws TaskException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void init() {
+ final String taskParams = taskExecutionContext.getTaskParams();
+ this.datavinesParameters = JSONUtils.parseObject(taskParams, DatavinesParameters.class);
+ log.info("initialize datavines task params : {}", JSONUtils.toPrettyJsonString(datavinesParameters));
+ if (this.datavinesParameters == null || !this.datavinesParameters.checkParameters()) {
+ throw new DatavinesTaskException("datavines task params is not valid");
+ }
+ }
+
+ @Override
+ public void submitApplication() throws TaskException {
+ executeJob();
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+ trackApplicationStatusInner();
+ }
+
+ private void executeJob() {
+ try {
+ String address = this.datavinesParameters.getAddress();
+ String jobId = this.datavinesParameters.getJobId();
+ String token = this.datavinesParameters.getToken();
+ String apiResultDataKey = DatavinesTaskConstants.API_RESULT_DATA;
+ JsonNode result = RequestUtils.executeJob(address, jobId, token);
+ if (checkResult(result)) {
+ jobExecutionId = result.get(apiResultDataKey).asText();
+ executionStatus = true;
+ }
+ } catch (Exception ex) {
+ Thread.currentThread().interrupt();
+ log.error(DatavinesTaskConstants.SUBMIT_FAILED_MSG, ex);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException(DatavinesTaskConstants.SUBMIT_FAILED_MSG, ex);
+ }
+ }
+
+ public void trackApplicationStatusInner() throws TaskException {
+ try {
+ String address = this.datavinesParameters.getAddress();
+ if (executionStatus && jobExecutionId == null) {
+ // Use address-taskId as app id
+ setAppIds(String.format(DatavinesTaskConstants.APPIDS_FORMAT, address, this.jobExecutionId));
+ setExitStatusCode(mapStatusToExitCode(false));
+ log.info("datavines task failed.");
+ return;
+ }
+ String apiResultDataKey = DatavinesTaskConstants.API_RESULT_DATA;
+ boolean finishFlag = false;
+ while (!finishFlag) {
+ JsonNode jobExecutionStatus =
+ RequestUtils.getJobExecutionStatus(address, jobExecutionId,
+ this.datavinesParameters.getToken());
+ if (!checkResult(jobExecutionStatus)) {
+ break;
+ }
+ String jobExecutionStatusStr = jobExecutionStatus.get(apiResultDataKey).asText();
+ switch (jobExecutionStatusStr) {
+ case DatavinesTaskConstants.STATUS_SUCCESS:
+ setAppIds(String.format(DatavinesTaskConstants.APPIDS_FORMAT, address, this.jobExecutionId));
+ JsonNode jobExecutionResult =
+ RequestUtils.getJobExecutionResult(address, jobExecutionId,
+ this.datavinesParameters.getToken());
+ if (!checkResult(jobExecutionResult)) {
+ break;
+ }
+
+ String jobExecutionResultStr = jobExecutionResult.get(apiResultDataKey).asText();
+ boolean checkResult = true;
+ if (this.datavinesParameters.isFailureBlock()) {
+ checkResult = DatavinesTaskConstants.STATUS_SUCCESS.equals(jobExecutionResultStr);
+ }
+
+ setExitStatusCode(mapStatusToExitCode(checkResult));
+ log.info("datavines task finished, execution status is {} and check result is {}",
+ jobExecutionStatusStr, jobExecutionResultStr);
+ finishFlag = true;
+ break;
+ case DatavinesTaskConstants.STATUS_FAILURE:
+ case DatavinesTaskConstants.STATUS_KILL:
+ errorHandle("task execution status: " + jobExecutionStatusStr);
+ finishFlag = true;
+ break;
+ default:
+ Thread.sleep(DatavinesTaskConstants.SLEEP_MILLIS);
+ }
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ log.error(DatavinesTaskConstants.TRACK_FAILED_MSG, ex);
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ throw new TaskException(DatavinesTaskConstants.TRACK_FAILED_MSG, ex);
+ }
+ }
+
+ /**
+ * map datavines task status to exitStatusCode
+ *
+ * @param status datavines job status
+ * @return exitStatusCode
+ */
+ private int mapStatusToExitCode(boolean status) {
+ if (status) {
+ return EXIT_CODE_SUCCESS;
+ } else {
+ return EXIT_CODE_FAILURE;
+ }
+ }
+
+ private boolean checkResult(JsonNode result) {
+ boolean isCorrect = true;
+ if (result == null
+ || result instanceof MissingNode
+ || result instanceof NullNode
+ || !result.hasNonNull(DatavinesTaskConstants.API_RESULT_CODE)) {
+ errorHandle(DatavinesTaskConstants.API_ERROR_MSG);
+ isCorrect = false;
+ } else if (result.get(DatavinesTaskConstants.API_RESULT_CODE)
+ .asInt() != DatavinesTaskConstants.API_RESULT_CODE_SUCCESS) {
+ JsonNode errorMsg = result.get(DatavinesTaskConstants.API_RESULT_MSG);
+ errorHandle(errorMsg == null || errorMsg instanceof NullNode
+ ? DatavinesTaskConstants.API_ERROR_MSG
+ : errorMsg);
+ isCorrect = false;
+ }
+ return isCorrect;
+ }
+
+ private void errorHandle(Object msg) {
+ setExitStatusCode(EXIT_CODE_FAILURE);
+ log.error("datavines task execute failed with error: {}", msg);
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return datavinesParameters;
+ }
+
+ @Override
+ public void cancelApplication() throws TaskException {
+ String address = this.datavinesParameters.getAddress();
+ log.info("trying terminate datavines task, taskId: {}, address: {}, taskId: {}",
+ this.taskExecutionContext.getTaskInstanceId(),
+ address,
+ jobExecutionId);
+ RequestUtils.killJobExecution(address, jobExecutionId, this.datavinesParameters.getToken());
+ log.warn("datavines task terminated, taskId: {}, address: {}, jobExecutionId: {}",
+ this.taskExecutionContext.getTaskInstanceId(),
+ address,
+ jobExecutionId);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannel.java
new file mode 100644
index 000000000000..be9f807f6f30
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannel.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+public class DatavinesTaskChannel implements TaskChannel {
+
+ @Override
+ public AbstractTask createTask(TaskExecutionContext taskRequest) {
+ return new DatavinesTask(taskRequest);
+ }
+
+ @Override
+ public AbstractParameters parseParameters(String taskParams) {
+ return JSONUtils.parseObject(taskParams, DatavinesParameters.class);
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactory.java
new file mode 100644
index 000000000000..0ce2e6e66bd6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class DatavinesTaskChannelFactory implements TaskChannelFactory {
+
+ @Override
+ public String getName() {
+ return "DATAVINES";
+ }
+
+ @Override
+ public TaskChannel create() {
+ return new DatavinesTaskChannel();
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskConstants.java
new file mode 100644
index 000000000000..6c29e497dd9f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+/**
+ * Constants for Datavines tasks.
+ */
+public class DatavinesTaskConstants {
+
+ private DatavinesTaskConstants() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ private static final String API_ROUTE = "/api/v1/openapi";
+ public static final String EXECUTE_JOB = API_ROUTE + "/job/execute/";
+ public static final String GET_JOB_EXECUTION_STATUS = API_ROUTE + "/job/execution/status/";
+ public static final String GET_JOB_EXECUTION_RESULT = API_ROUTE + "/job/execution/result/";
+ public static final String JOB_EXECUTION_KILL = API_ROUTE + "/job/execution/kill/";
+ public static final String API_RESULT_CODE = "code";
+ public static final int API_RESULT_CODE_SUCCESS = 200;
+ public static final String API_RESULT_MSG = "msg";
+ public static final String API_RESULT_DATA = "data";
+ public static final String API_ERROR_MSG = "please check url or params";
+
+ public static final String STATUS_SUCCESS = "SUCCESS";
+ public static final String STATUS_KILL = "KILL";
+ public static final String STATUS_FAILURE = "FAILURE";
+
+ public static final String SUBMIT_FAILED_MSG = "Submit datavinesTask failed:";
+ public static final String TRACK_FAILED_MSG = "Track datavinesTask failed:";
+ public static final String APPIDS_FORMAT = "%s-%s";
+
+ public static final long SLEEP_MILLIS = 3000;
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskException.java
new file mode 100644
index 000000000000..da17d49ea456
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+/**
+ * Custom DatavinesTaskException
+ */
+public class DatavinesTaskException extends RuntimeException {
+
+ public DatavinesTaskException() {
+ super();
+ }
+
+ public DatavinesTaskException(String message) {
+ super(message);
+ }
+
+ public DatavinesTaskException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/utils/RequestUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/utils/RequestUtils.java
new file mode 100644
index 000000000000..474736d2bc5b
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/org/apache/dolphinscheduler/plugin/task/datavines/utils/RequestUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.datavines.DatavinesTaskConstants;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import java.net.URI;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+@Slf4j
+public class RequestUtils {
+
+ private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
+
+ private RequestUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static JsonNode executeJob(String address, String jobId, String token) {
+ return parse(doPost(address + DatavinesTaskConstants.EXECUTE_JOB + jobId, token));
+ }
+
+ public static JsonNode getJobExecutionStatus(String address, String jobExecutionId, String token) {
+ return parse(doGet(address + DatavinesTaskConstants.GET_JOB_EXECUTION_STATUS + jobExecutionId, token));
+ }
+
+ public static JsonNode getJobExecutionResult(String address, String jobExecutionId, String token) {
+ return parse(doGet(address + DatavinesTaskConstants.GET_JOB_EXECUTION_RESULT + jobExecutionId, token));
+ }
+
+ public static void killJobExecution(String address, String jobExecutionId, String token) {
+ parse(doPost(address + DatavinesTaskConstants.JOB_EXECUTION_KILL + jobExecutionId, token));
+ }
+
+ public static JsonNode parse(String res) {
+ try {
+ return JSONUtils.parseObject(res, JsonNode.class);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ public static String doGet(String url, String token) {
+ String result = "";
+ HttpGet httpGet = null;
+ try {
+ URIBuilder uriBuilder = new URIBuilder(url);
+ URI uri = uriBuilder.build();
+ httpGet = new HttpGet(uri);
+ httpGet.setHeader("Authorization", "Bearer " + token);
+ log.info("access url: {}", uri);
+ try (CloseableHttpResponse response = HTTP_CLIENT.execute(httpGet)) {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ result = EntityUtils.toString(response.getEntity());
+ log.info("datavines task succeed with results: {}", result);
+ } else {
+ log.error("datavines task terminated,response: {}", response);
+ }
+ }
+ } catch (IllegalArgumentException ie) {
+ log.error("datavines task terminated: {}", ie.getMessage());
+ } catch (Exception e) {
+ log.error("datavines task terminated: ", e);
+ } finally {
+ if (null != httpGet) {
+ httpGet.releaseConnection();
+ }
+ }
+ return result;
+ }
+
+ public static String doPost(String url, String token) {
+ String result = "";
+ HttpPost httpPost = new HttpPost(url);
+ try {
+ httpPost.setHeader("Authorization", "Bearer " + token);
+ log.info("access url: {}", url);
+ try (CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost)) {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ result = EntityUtils.toString(response.getEntity());
+ log.info("datavines task succeed with results: {}", result);
+ } else {
+ log.error("datavines task terminated, response: {}", response);
+ }
+ }
+ } catch (IllegalArgumentException ie) {
+ log.error("datavines task terminated: {}", ie.getMessage());
+ } catch (Exception he) {
+ log.error("datavines task terminated: ", he);
+ }
+ return result;
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactoryTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactoryTest.java
new file mode 100644
index 000000000000..dc012fee1f64
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class DatavinesTaskChannelFactoryTest {
+
+ private DatavinesTaskChannelFactory factory;
+
+ @BeforeEach
+ void setUp() {
+ factory = new DatavinesTaskChannelFactory();
+ }
+
+ @Test
+ void getNameReturnsDatavines() {
+ Assertions.assertEquals("DATAVINES", factory.getName());
+ }
+
+ @Test
+ void createReturnsDatavinesTaskChannel() {
+ TaskChannel channel = factory.create();
+ Assertions.assertNotNull(channel);
+ Assertions.assertInstanceOf(DatavinesTaskChannel.class, channel);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelTest.java
new file mode 100644
index 000000000000..e46c77ce5156
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskChannelTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DatavinesTaskChannelTest {
+
+ @Mock
+ private TaskExecutionContext taskExecutionContext;
+
+ private DatavinesTaskChannel channel;
+
+ @BeforeEach
+ void setUp() {
+ channel = new DatavinesTaskChannel();
+ }
+
+ @Test
+ void createTaskReturnsDatavinesTask() {
+ AbstractTask task = channel.createTask(taskExecutionContext);
+ Assertions.assertNotNull(task);
+ Assertions.assertInstanceOf(DatavinesTask.class, task);
+ }
+
+ @Test
+ void parseParametersReturnsDatavinesParametersWithCorrectFields() {
+ String taskParams =
+ "{\"address\":\"http://datavines:9090\",\"jobId\":\"42\",\"token\":\"my-token\",\"failureBlock\":true}";
+ AbstractParameters params = channel.parseParameters(taskParams);
+ Assertions.assertNotNull(params);
+ Assertions.assertInstanceOf(DatavinesParameters.class, params);
+ DatavinesParameters datavinesParams = (DatavinesParameters) params;
+ Assertions.assertEquals("http://datavines:9090", datavinesParams.getAddress());
+ Assertions.assertEquals("42", datavinesParams.getJobId());
+ Assertions.assertEquals("my-token", datavinesParams.getToken());
+ Assertions.assertTrue(datavinesParams.isFailureBlock());
+ }
+
+ @Test
+ void parseParametersWithEmptyJsonReturnsParametersObject() {
+ AbstractParameters params = channel.parseParameters("{}");
+ Assertions.assertNotNull(params);
+ Assertions.assertInstanceOf(DatavinesParameters.class, params);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskTest.java
new file mode 100644
index 000000000000..82c37d39fd8f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/org/apache/dolphinscheduler/plugin/task/datavines/DatavinesTaskTest.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.datavines;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.datavines.utils.RequestUtils;
+
+import java.lang.reflect.Field;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class DatavinesTaskTest {
+
+ @Mock
+ private TaskExecutionContext taskExecutionContext;
+
+ private DatavinesTask datavinesTask;
+
+ @BeforeEach
+ void setUp() {
+ MockitoAnnotations.openMocks(this);
+ datavinesTask = new DatavinesTask(taskExecutionContext);
+ }
+
+ @Test
+ void initValidParametersInitializesSuccessfully() {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+ Assertions.assertNotNull(datavinesTask.getParameters());
+ }
+
+ @Test
+ void initInvalidParametersThrowsException() {
+ when(taskExecutionContext.getTaskParams()).thenReturn("{}");
+ assertThrows(DatavinesTaskException.class, () -> datavinesTask.init());
+ }
+
+ @Test
+ void submitApplicationExecutesJobSuccessfully() {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ assertDoesNotThrow(() -> datavinesTask.submitApplication());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusJobExecutionSuccessSetsExitCodeSuccess() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":\"200\",\"data\":\"1\"}");
+ JsonNode executeStatus = RequestUtils.parse("{\"code\":\"200\",\"data\":\"SUCCESS\"}");
+ JsonNode executeResult = RequestUtils.parse("{\"code\":\"200\",\"data\":\"1\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeStatus);
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionResult(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeResult);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_SUCCESS, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusJobExecutionFailureSetsExitCodeFailure() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":500,\"msg\":\"error\",\"data\":\"error\"}");
+ JsonNode failureStatus = RequestUtils.parse("{\"code\":200,\"data\":\"FAILURE\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(failureStatus);
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void cancelApplicationTerminatesJobSuccessfully() {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+ assertDoesNotThrow(() -> datavinesTask.cancelApplication());
+ }
+
+ @Test
+ void checkParametersValidParametersReturnsTrue() {
+ DatavinesParameters parameters = new DatavinesParameters();
+ parameters.setAddress("http://localhost");
+ parameters.setJobId("1");
+ parameters.setToken("token");
+ Assertions.assertTrue(parameters.checkParameters());
+ }
+
+ static Stream invalidCheckParametersInputs() {
+ return Stream.of(
+ Arguments.of("", "1", "token"),
+ Arguments.of("http://localhost", "", "token"),
+ Arguments.of(null, "1", "token"),
+ Arguments.of("http://localhost", null, "token"),
+ Arguments.of("http://localhost", "1", ""),
+ Arguments.of("http://localhost", "1", null));
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidCheckParametersInputs")
+ void checkParametersInvalidInputsReturnsFalse(String address, String jobId, String token) {
+ DatavinesParameters parameters = new DatavinesParameters();
+ parameters.setAddress(address);
+ parameters.setJobId(jobId);
+ parameters.setToken(token);
+ Assertions.assertFalse(parameters.checkParameters());
+ }
+
+ @Test
+ void getResourceFilesListReturnsEmptyList() {
+ DatavinesParameters parameters = new DatavinesParameters();
+ Assertions.assertTrue(parameters.getResourceFilesList().isEmpty());
+ }
+
+ @Test
+ void killJobExecutionValidParametersExecutesSuccessfully() {
+ String address = "http://localhost";
+ String jobExecutionId = "1";
+ String token = "token";
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ requestUtilsStatic
+ .when(() -> RequestUtils
+ .doPost(address + DatavinesTaskConstants.JOB_EXECUTION_KILL + jobExecutionId, token))
+ .thenReturn("");
+ RequestUtils.killJobExecution(address, jobExecutionId, token);
+ requestUtilsStatic.verify(() -> RequestUtils.killJobExecution(address, jobExecutionId, token));
+ }
+ }
+
+ @Test
+ void parseValidJsonStringReturnsJsonNode() {
+ String jsonString = "{\"code\":\"200\",\"data\":\"SUCCESS\"}";
+ JsonNode result = RequestUtils.parse(jsonString);
+ Assertions.assertEquals("200", result.get("code").asText());
+ Assertions.assertEquals("SUCCESS", result.get("data").asText());
+ }
+
+ @Test
+ void parseInvalidJsonStringReturnsNull() {
+ String jsonString = "invalid json";
+ JsonNode result = RequestUtils.parse(jsonString);
+ Assertions.assertNull(result);
+ }
+
+ @Test
+ void getApplicationIdsReturnsEmptyList() throws TaskException {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+ Assertions.assertTrue(datavinesTask.getApplicationIds().isEmpty());
+ }
+
+ @Test
+ void trackApplicationStatusExecutionStatusTrueWithNullJobExecutionIdSetsExitCodeFailure() throws TaskException, NoSuchFieldException, IllegalAccessException {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+
+ Field executionStatusField = DatavinesTask.class.getDeclaredField("executionStatus");
+ executionStatusField.setAccessible(true);
+ executionStatusField.set(datavinesTask, true);
+ // jobExecutionId remains null (default)
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+
+ @Test
+ void trackApplicationStatusJobStatusFailureSetsExitCodeFailure() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ JsonNode failureStatus = RequestUtils.parse("{\"code\":200,\"data\":\"FAILURE\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(failureStatus);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusJobStatusKillSetsExitCodeFailure() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ JsonNode killStatus = RequestUtils.parse("{\"code\":200,\"data\":\"KILL\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(killStatus);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusGetJobStatusApiErrorSetsExitCodeFailure() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ JsonNode errorStatus = RequestUtils.parse("{\"code\":500,\"msg\":\"error\",\"data\":\"error\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(errorStatus);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusGetJobResultApiErrorSetsExitCodeFailure() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ JsonNode successStatus = RequestUtils.parse("{\"code\":200,\"data\":\"SUCCESS\"}");
+ JsonNode errorResult = RequestUtils.parse("{\"code\":500,\"msg\":\"error\",\"data\":\"error\"}");
+ JsonNode killStatus = RequestUtils.parse("{\"code\":200,\"data\":\"KILL\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ // First call to getJobExecutionStatus returns SUCCESS; second call returns KILL to exit the loop
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(successStatus, killStatus);
+ // getJobExecutionResult returns an API error, checkResult fails, break from switch
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionResult(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(errorResult);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusFailureBlockTrueWithSuccessResultSetsExitCodeSuccess() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ JsonNode successStatus = RequestUtils.parse("{\"code\":200,\"data\":\"SUCCESS\"}");
+ JsonNode successResult = RequestUtils.parse("{\"code\":200,\"data\":\"SUCCESS\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn(
+ "{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\",\"failureBlock\":true}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(successStatus);
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionResult(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(successResult);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_SUCCESS, datavinesTask.getExitStatusCode());
+ }
+ }
+
+ @Test
+ void trackApplicationStatusFailureBlockTrueWithNonSuccessResultSetsExitCodeFailure() throws TaskException {
+ JsonNode executeJobResult = RequestUtils.parse("{\"code\":200,\"data\":\"1\"}");
+ JsonNode successStatus = RequestUtils.parse("{\"code\":200,\"data\":\"SUCCESS\"}");
+ JsonNode failureResult = RequestUtils.parse("{\"code\":200,\"data\":\"FAILURE\"}");
+ try (MockedStatic requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
+ when(taskExecutionContext.getTaskParams())
+ .thenReturn(
+ "{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\",\"failureBlock\":true}");
+ datavinesTask.init();
+
+ requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(executeJobResult);
+ datavinesTask.submitApplication();
+
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(successStatus);
+ requestUtilsStatic
+ .when(() -> RequestUtils.getJobExecutionResult(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(failureResult);
+
+ datavinesTask.trackApplicationStatus();
+ Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
+ }
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index 830650f50f55..804b135220ab 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -63,6 +63,7 @@
dolphinscheduler-task-remoteshell
dolphinscheduler-task-aliyunserverlessspark
dolphinscheduler-task-emr-serverless
+ dolphinscheduler-task-datavines
diff --git a/dolphinscheduler-ui/public/images/task-icons/datavines.png b/dolphinscheduler-ui/public/images/task-icons/datavines.png
new file mode 100644
index 000000000000..7716d54990f2
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/datavines.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/datavines_hover.png b/dolphinscheduler-ui/public/images/task-icons/datavines_hover.png
new file mode 100644
index 000000000000..375d9da823a8
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/datavines_hover.png differ
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 1e8f23bba55f..bd1dfbcc13c3 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -902,6 +902,14 @@ export default {
dinky_task_id: 'Dinky task id',
dinky_task_id_tips: 'Please enter the task id of your dinky',
dinky_online: 'Online task',
+ datavines_address: 'Datavines address',
+ datavines_address_tips:
+ 'Please enter the url of your datavines, eg: http://localhost:5600',
+ datavines_job_id: 'Datavines job id',
+ datavines_job_id_tips: 'Please enter the job id of your datavines',
+ datavines_token: 'Datavines token',
+ datavines_token_tips: 'Please enter the token of your datavines',
+ datavines_failure_block: 'Block on failure',
factory_name: 'Factory Name',
resource_group_name: 'Resource Group Name',
pipeline_name: 'Pipeline Name',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 8876969f72d0..737c2ba02e34 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -867,6 +867,13 @@ export default {
dinky_task_id: 'dinky 作业ID',
dinky_task_id_tips: '请输入作业 ID',
dinky_online: '是否上线作业',
+ datavines_address: 'Datavines 地址',
+ datavines_address_tips: '请输入 Datavines 地址, 比如 http://localhost:5600',
+ datavines_job_id: 'Datavines 作业ID',
+ datavines_job_id_tips: '请输入作业 ID',
+ datavines_token: 'Datavines token',
+ datavines_token_tips: '请输入 token',
+ datavines_failure_block: '检查失败时阻塞',
factory_name: '工厂名称',
resource_group_name: '资源组名称',
pipeline_name: 'pipeline名称',
diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts b/dolphinscheduler-ui/src/store/project/task-type.ts
index 597340750ae3..dc2a25f9b4ce 100644
--- a/dolphinscheduler-ui/src/store/project/task-type.ts
+++ b/dolphinscheduler-ui/src/store/project/task-type.ts
@@ -70,6 +70,10 @@ export const TASK_TYPES_MAP = {
CONDITIONS: {
alias: 'CONDITIONS'
},
+ DATAVINES: {
+ alias: 'DATAVINES',
+ helperLinkDisable: true
+ },
SWITCH: {
alias: 'SWITCH'
},
diff --git a/dolphinscheduler-ui/src/store/project/types.ts b/dolphinscheduler-ui/src/store/project/types.ts
index a58135b3e741..11ef34b6769f 100644
--- a/dolphinscheduler-ui/src/store/project/types.ts
+++ b/dolphinscheduler-ui/src/store/project/types.ts
@@ -58,6 +58,7 @@ type TaskType =
| 'DATA_FACTORY'
| 'REMOTESHELL'
| 'ALIYUN_SERVERLESS_SPARK'
+ | 'DATAVINES'
type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index da4cce0763e5..4ad151224f90 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -77,6 +77,7 @@ export { useMlflowModels } from './use-mlflow-models'
export { useOpenmldb } from './use-openmldb'
export { useDvc } from './use-dvc'
export { useDinky } from './use-dinky'
+export { useDatavines } from './use-datavines'
export { useSagemaker } from './use-sagemaker'
export { useJava } from './use-java'
export { useChunjun } from './use-chunjun'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts
new file mode 100644
index 000000000000..fee9de6233f7
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useI18n } from 'vue-i18n'
+import { useCustomParams } from '.'
+import type { IJsonItem } from '../types'
+
+export function useDatavines(model: { [field: string]: any }): IJsonItem[] {
+ const { t } = useI18n()
+
+ return [
+ {
+ type: 'input',
+ field: 'address',
+ class: 'input-datavines-address',
+ name: t('project.node.datavines_address'),
+ props: {
+ placeholder: t('project.node.datavines_address_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(_validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.datavines_address_tips'))
+ }
+ }
+ }
+ },
+ {
+ type: 'input',
+ field: 'jobId',
+ class: 'input-datavines-job-id',
+ name: t('project.node.datavines_job_id'),
+ props: {
+ placeholder: t('project.node.datavines_job_id_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(_validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.datavines_job_id_tips'))
+ }
+ }
+ }
+ },
+ {
+ type: 'input',
+ field: 'token',
+ class: 'input-datavines-token',
+ name: t('project.node.datavines_token'),
+ props: {
+ placeholder: t('project.node.datavines_token_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(_validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.datavines_token_tips'))
+ }
+ }
+ }
+ },
+ {
+ type: 'switch',
+ field: 'failureBlock',
+ name: t('project.node.datavines_failure_block')
+ },
+ ...useCustomParams({ model, field: 'localParams', isSimple: false })
+ ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index a7754cb24d13..6e176fa27504 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -410,6 +410,13 @@ export function formatParams(data: INodeData): {
taskParams.online = data.online
}
+ if (data.taskType === 'DATAVINES') {
+ taskParams.address = data.address
+ taskParams.jobId = data.jobId
+ taskParams.token = data.token
+ taskParams.failureBlock = data.failureBlock
+ }
+
if (data.taskType === 'OPENMLDB') {
taskParams.zk = data.zk
taskParams.zkPath = data.zkPath
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 89fa49e17b4a..819eb520a7fb 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -52,6 +52,7 @@ import { useLinkis } from './use-linkis'
import { useDataFactory } from './use-data-factory'
import { useRemoteShell } from './use-remote-shell'
import { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'
+import { useDatavines } from './use-datavines'
export default {
SHELL: useShell,
@@ -79,6 +80,7 @@ export default {
OPENMLDB: useOpenmldb,
DVC: useDvc,
DINKY: useDinky,
+ DATAVINES: useDatavines,
SAGEMAKER: userSagemaker,
CHUNJUN: useChunjun,
FLINK_STREAM: useFlinkStream,
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts
new file mode 100644
index 000000000000..7314504d273a
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+export function useDatavines({
+ projectCode,
+ from = 0,
+ readonly,
+ data
+}: {
+ projectCode: number
+ from?: number
+ readonly?: boolean
+ data?: ITaskData
+}) {
+ const model = reactive({
+ name: '',
+ taskType: 'DATAVINES',
+ flag: 'YES',
+ description: '',
+ timeoutFlag: false,
+ localParams: [],
+ environmentCode: null,
+ failRetryInterval: 1,
+ failRetryTimes: 0,
+ workerGroup: 'default',
+ delayTime: 0,
+ timeout: 30,
+ timeoutNotifyStrategy: ['WARN']
+ } as INodeData)
+
+ return {
+ json: [
+ Fields.useName(from),
+ ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
+ Fields.useRunFlag(),
+ Fields.useDescription(),
+ Fields.useTaskPriority(),
+ Fields.useWorkerGroup(projectCode),
+ Fields.useEnvironmentName(model, !data?.id),
+ ...Fields.useTaskGroup(model, projectCode),
+ ...Fields.useFailed(),
+ Fields.useDelayTime(model),
+ ...Fields.useTimeoutAlarm(model),
+ ...Fields.useDatavines(model),
+ Fields.usePreTasks()
+ ] as IJsonItem[],
+ model
+ }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 218f318db0d0..3217ff3c77ba 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -476,6 +476,9 @@ interface ITaskParams {
yarnQueue?: string
awsRegion?: string
kubeConfig?: string
+ jobId?: string
+ token?: string
+ failureBlock?: boolean
}
interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index d0eb36b277a8..ef20cea6324d 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -52,6 +52,7 @@ export type TaskType =
| 'DATA_FACTORY'
| 'REMOTESHELL'
| 'ALIYUN_SERVERLESS_SPARK'
+ | 'DATAVINES'
export type TaskExecuteType = 'STREAM' | 'BATCH'
@@ -104,6 +105,10 @@ export const TASK_TYPES_MAP = {
CONDITIONS: {
alias: 'CONDITIONS'
},
+ DATAVINES: {
+ alias: 'DATAVINES',
+ helperLinkDisable: true
+ },
SWITCH: {
alias: 'SWITCH'
},
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 809bc6fae3cf..e871f14a8b1b 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -107,6 +107,9 @@ $bgLight: #ffffff;
&.icon-sub_workflow {
background-image: url('/images/task-icons/sub_workflow.png');
}
+ &.icon-datavines {
+ background-image: url('/images/task-icons/datavines.png');
+ }
&.icon-procedure {
background-image: url('/images/task-icons/procedure.png');
}
@@ -217,6 +220,9 @@ $bgLight: #ffffff;
&.icon-sub_workflow {
background-image: url('/images/task-icons/sub_workflow_hover.png');
}
+ &.icon-datavines {
+ background-image: url('/images/task-icons/datavines_hover.png');
+ }
&.icon-procedure {
background-image: url('/images/task-icons/procedure_hover.png');
}