Skip to content

Commit a579d47

Browse files
committed
add akka dag optimize for TIS ,issue:datavane/tis#486
1 parent d36e619 commit a579d47

File tree

41 files changed

+1491
-1381
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1491
-1381
lines changed

tis-datax/executor/dolphinscheduler-task-tis-datasync/pom.xml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,24 @@
1919

2020
<dependencies>
2121

22-
<dependency>
23-
<groupId>org.apache.dolphinscheduler</groupId>
24-
<artifactId>dolphinscheduler-spi</artifactId>
25-
<version>${dolphinscheduler.version}</version>
26-
<scope>provided</scope>
27-
</dependency>
22+
<!-- <dependency>-->
23+
<!-- <groupId>org.apache.dolphinscheduler</groupId>-->
24+
<!-- <artifactId>dolphinscheduler-spi</artifactId>-->
25+
<!-- <version>${dolphinscheduler.version}</version>-->
26+
<!-- <scope>provided</scope>-->
27+
<!-- </dependency>-->
2828
<dependency>
2929
<groupId>com.qlangtech.tis</groupId>
3030
<artifactId>tis-base-test</artifactId>
3131
<version>${tis.version}</version>
3232
<scope>test</scope>
3333
</dependency>
34-
<dependency>
35-
<groupId>org.apache.dolphinscheduler</groupId>
36-
<artifactId>dolphinscheduler-task-api</artifactId>
37-
<version>${dolphinscheduler.version}</version>
38-
<scope>provided</scope>
39-
</dependency>
34+
<!-- <dependency>-->
35+
<!-- <groupId>org.apache.dolphinscheduler</groupId>-->
36+
<!-- <artifactId>dolphinscheduler-task-api</artifactId>-->
37+
<!-- <version>${dolphinscheduler.version}</version>-->
38+
<!-- <scope>provided</scope>-->
39+
<!-- </dependency>-->
4040
<dependency>
4141
<groupId>org.apache.dolphinscheduler</groupId>
4242
<artifactId>dolphinscheduler-common</artifactId>

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/DataXJobSingleProcessorExecutor.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,22 @@
4545
**/
4646
public abstract class DataXJobSingleProcessorExecutor<T extends IDataXTaskRelevant> {
4747
private static final Logger logger = LoggerFactory.getLogger(DataXJobSingleProcessorExecutor.class);
48-
public static final int DEFAULT_TASK_ID = 999;
48+
//
4949
// 记录当前正在执行的任务<taskid,ExecuteWatchdog>
5050
public final ConcurrentHashMap<Integer, ExecuteWatchdog> runningTask = new ConcurrentHashMap<>();
5151

52-
public void consumeMessage(T msg) throws Exception {
52+
public void consumeMessage(T msg, Integer taskId) throws Exception {
5353
// Integer taskId = PreviewLaunchParam;// msg.getTaskId();
5454
String jobName = msg.getJobName();
5555
DataXName dataxName = msg.getDataXName();
5656
// StoreResourceType resType = Objects.requireNonNull(msg.getResType(), "resType can not be null");
5757
// MDC.put(JobCommon.KEY_TASK_ID, String.valueOf(jobId));
5858
// MDC.put(JobCommon.KEY_COLLECTION, dataxName);
59-
JobCommon.setMDC(DEFAULT_TASK_ID, dataxName.getPipelineName());
59+
JobCommon.setMDC(taskId, dataxName.getPipelineName());
6060

6161

6262
// 查看当前任务是否正在进行中,如果已经终止则要退出
63-
execSystemTask(msg, DEFAULT_TASK_ID, jobName, dataxName.getPipelineName());
63+
execSystemTask(msg, taskId, jobName, dataxName.getPipelineName());
6464
}
6565

6666
protected void execSystemTask(T msg, Integer jobId, String jobName, String dataxName) throws IOException,
@@ -192,11 +192,10 @@ protected String[] getExtraJavaSystemPrams() {
192192
*/
193193
protected abstract String getMainClassName();
194194

195-
/**
196-
* @return
197-
*/
198-
protected abstract File getWorkingDirectory();
199195

196+
protected final File getWorkingDirectory() {
197+
return TisAppLaunch.isTestMock() ? new File("/opt/tis/tis-datax-executor") : DataXJobInfo.getDataXExecutorDir();
198+
}
200199

201200
/**
202201
* Assemble 日志收集器地址

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/DataxPrePostConsumer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.commons.exec.CommandLine;
2525
import org.apache.commons.lang3.StringUtils;
2626

27-
import java.io.File;
2827
import java.util.List;
2928
import java.util.Objects;
3029

@@ -64,8 +63,8 @@ public DataXJobRunEnvironmentParamsSetter.ExtraJavaSystemPramsSuppiler getExtraJ
6463

6564

6665
@Override
67-
public void consumeMessage(DataXLifecycleHookMsg msg) throws Exception {
68-
super.consumeMessage(msg);
66+
public void consumeMessage(DataXLifecycleHookMsg msg, Integer taskId) throws Exception {
67+
super.consumeMessage(msg, taskId);
6968
}
7069

7170
@Override
@@ -147,9 +146,9 @@ protected String getMainClassName() {
147146
return DataxPrePostExecutor.class.getName();
148147
}
149148

150-
public File getWorkingDirectory() {
151-
return DataXJobInfo.getDataXExecutorDir();
152-
}
149+
// public File getWorkingDirectory() {
150+
// return DataXJobInfo.getDataXExecutorDir();
151+
// }
153152

154153

155154
public static final String DEFAULT_CLASSPATH = "./lib/*:./" + IDataXTaskRelevant.KEY_TIS_DATAX_EXECUTOR + ".jar:./conf/";

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/executor/BasicTISTableDumpProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public static IRemoteTaskTrigger createDataXJob(AbstractExecContext execContext
299299
return dataXBatchPost.createPostTask(execContext, entityName, tab,
300300
processor.getDataxCfgFileNames(null, Optional.empty()));
301301
} else if (cycleHook == LifeCycleHook.Prep) {
302-
return dataXBatchPost.createPreExecuteTask(execContext , entityName, tab);
302+
return dataXBatchPost.createPreExecuteTask(execContext, entityName, tab);
303303
} else {
304304
throw new IllegalArgumentException("cycleHook:" + cycleHook);
305305
}
@@ -321,7 +321,7 @@ public String getTaskName() {
321321
@Override
322322
public void run() {
323323
try {
324-
prePostConsumer.consumeMessage(lifecycleHookMsg);
324+
prePostConsumer.consumeMessage(lifecycleHookMsg, execContext.getTaskId());
325325
} catch (Exception e) {
326326
throw new RuntimeException(e);
327327
}

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/executor/BasicTISTableJoinProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ private RpcServiceReference createRpcServiceReference() {
7676

7777

7878
protected void process(ITaskExecutorContext context) throws Exception {
79+
7980
RpcServiceReference feedback = createRpcServiceReference();
8081
SqlTaskNodeMeta sqlTask =
8182
SqlTaskNodeMeta.deserializeTaskNode(ISqlTask.toCfg((context.getJobParams())));
@@ -90,7 +91,7 @@ protected void process(ITaskExecutorContext context) throws Exception {
9091

9192
WorkflowHookMsg wfHookMsg = WorkflowHookMsg.create(sqlTask, execContext, sqlTask.getExportName());
9293

93-
tableJoinConsumer.consumeMessage(wfHookMsg);
94+
tableJoinConsumer.consumeMessage(wfHookMsg,execContext.getTaskId());
9495
}
9596
BasicTISTableDumpProcessor.addSuccessPartition(context, execContext, sqlTask.getExportName());
9697

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/join/DataXJoinProcessConsumer.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.qlangtech.tis.datax.join;
22

33
import com.alibaba.fastjson.JSONObject;
4-
import com.qlangtech.tis.datax.DataXJobInfo;
54
import com.qlangtech.tis.datax.DataXJobRunEnvironmentParamsSetter;
65
import com.qlangtech.tis.datax.DataXJobSingleProcessorExecutor;
76
import com.qlangtech.tis.datax.DataXJobSubmit;
@@ -16,14 +15,12 @@
1615
import org.apache.commons.exec.CommandLine;
1716
import org.apache.commons.lang3.StringUtils;
1817

19-
import java.io.File;
2018
import java.util.List;
2119
import java.util.Objects;
2220

2321
import static com.qlangtech.tis.sql.parser.ISqlTask.KEY_EXECUTE_TYPE;
2422
import static com.qlangtech.tis.sql.parser.ISqlTask.KEY_EXPORT_NAME;
2523
import static com.qlangtech.tis.sql.parser.ISqlTask.KEY_ID;
26-
import static com.qlangtech.tis.sql.parser.ISqlTask.KEY_SQL_SCRIPT;
2724

2825
/**
2926
* @author 百岁 (baisui@qlangtech.com)
@@ -156,10 +153,10 @@ protected String getClasspath() {
156153
return DataxPrePostConsumer.DEFAULT_CLASSPATH;
157154
}
158155

159-
@Override
160-
protected File getWorkingDirectory() {
161-
return DataXJobInfo.getDataXExecutorDir();
162-
}
156+
// @Override
157+
// protected File getWorkingDirectory() {
158+
// return DataXJobInfo.getDataXExecutorDir();
159+
// }
163160

164161
@Override
165162
protected String getIncrStateCollectAddress() {

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/powerjob/SplitTabSync.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@
44
import com.qlangtech.tis.datax.DataXJobInfo;
55
import com.qlangtech.tis.datax.DataXJobRunEnvironmentParamsSetter;
66
import com.qlangtech.tis.datax.DataXJobSubmit;
7+
import com.qlangtech.tis.datax.DataXJobSubmitParams;
78
import com.qlangtech.tis.datax.DataxPrePostConsumer;
89
import com.qlangtech.tis.datax.IDataxProcessor;
910
import com.qlangtech.tis.datax.executor.BasicTISTableDumpProcessor;
1011
import com.qlangtech.tis.exec.AbstractExecContext;
1112
import com.qlangtech.tis.fullbuild.indexbuild.IRemoteTaskTrigger;
12-
import com.qlangtech.tis.web.start.TisAppLaunch;
1313
import com.tis.hadoop.rpc.RpcServiceReference;
1414

15-
import java.util.Optional;
16-
1715
/**
1816
* @author 百岁 (baisui@qlangtech.com)
1917
* @date 2023/11/21
@@ -36,13 +34,15 @@ public IRemoteTaskTrigger createTrigger(final AbstractExecContext execChainConte
3634
if (statusRpc == null) {
3735
throw new IllegalArgumentException("statusRpc can not be null");
3836
}
39-
DataXJobSubmit dataXJobSubmit = getDataXJobSubmit(execChainContext);
37+
DataXJobSubmitParams submitParams = DataXJobSubmitParams.getDftIfEmpty();
38+
DataXJobSubmit dataXJobSubmit = submitParams.getTaskSubmit(execChainContext.isDryRun());
39+
4040
if (dataXJobSubmit instanceof DataXJobRunEnvironmentParamsSetter) {
4141
DataXJobRunEnvironmentParamsSetter runEnvironmentParamsSetter =
4242
(DataXJobRunEnvironmentParamsSetter) dataXJobSubmit;
4343
DataxPrePostConsumer prePostConsumer = BasicTISTableDumpProcessor.createPrePostConsumer();
4444
runEnvironmentParamsSetter.setClasspath(prePostConsumer.getClasspath());
45-
runEnvironmentParamsSetter.setWorkingDirectory(prePostConsumer.getWorkingDirectory());
45+
// runEnvironmentParamsSetter.setWorkingDirectory(prePostConsumer.getWorkingDirectory());
4646
runEnvironmentParamsSetter.setExtraJavaSystemPramsSuppiler(prePostConsumer.getExtraJavaSystemPramsSuppiler());
4747
}
4848

@@ -56,18 +56,18 @@ public void execSync(final AbstractExecContext execChainContext, RpcServiceRefer
5656
createTrigger(execChainContext, statusRpc).run();
5757
}
5858

59-
private static DataXJobSubmit getDataXJobSubmit(AbstractExecContext execChainContext) {
60-
DataXJobSubmit.InstanceType instanceType = TisAppLaunch.isTestMock() ? DataXJobSubmit.InstanceType.EMBEDDED :
61-
DataXJobSubmit.InstanceType.LOCAL;
62-
63-
Optional<DataXJobSubmit> dataXJobSubmit = DataXJobSubmit.getDataXJobSubmit(execChainContext.isDryRun(),
64-
instanceType);
65-
if (dataXJobSubmit.isEmpty()) {
66-
throw new IllegalStateException("dataXJobSubmit must be present ,instanceType:"
67-
+ instanceType + ",isDryRun:" + execChainContext.isDryRun());
68-
}
69-
return dataXJobSubmit.get();
70-
}
59+
// private static DataXJobSubmit getDataXJobSubmit(AbstractExecContext execChainContext) {
60+
// DataXJobSubmit.InstanceType instanceType = (TisAppLaunch.isTestMock() && false) ? DataXJobSubmit.InstanceType.EMBEDDED :
61+
// DataXJobSubmit.InstanceType.LOCAL;
62+
//
63+
// Optional<DataXJobSubmit> dataXJobSubmit = DataXJobSubmit.getDataXJobSubmit(execChainContext.isDryRun(),
64+
// instanceType);
65+
// if (dataXJobSubmit.isEmpty()) {
66+
// throw new IllegalStateException("dataXJobSubmit must be present ,instanceType:"
67+
// + instanceType + ",isDryRun:" + execChainContext.isDryRun());
68+
// }
69+
// return dataXJobSubmit.get();
70+
// }
7171

7272

7373
}

tis-datax/executor/tis-datax-executor/src/test/java/com/qlangtech/tis/datax/TestDataxPrePostConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void testConsumePreExecMessage() throws Exception {
3333

3434
DataXLifecycleHookMsg lifecycleHookMsg = createHookMsg(LifeCycleHook.Prep);
3535

36-
prePostConsumer.consumeMessage(lifecycleHookMsg);
36+
prePostConsumer.consumeMessage(lifecycleHookMsg, taskId);
3737
}
3838

3939

@@ -85,7 +85,7 @@ public void testConsumePostExecMessage() throws Exception {
8585

8686
DataXLifecycleHookMsg lifecycleHookMsg = createHookMsg(LifeCycleHook.Post);
8787

88-
prePostConsumer.consumeMessage(lifecycleHookMsg);
88+
prePostConsumer.consumeMessage(lifecycleHookMsg, taskId);
8989
}
9090

9191
}

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicWorkflowPayload.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import com.alibaba.fastjson.JSONObject;
2323
import com.google.common.collect.Maps;
2424
import com.qlangtech.tis.assemble.FullbuildPhase;
25-
import com.qlangtech.tis.config.k8s.ReplicasSpec;
2625
import com.qlangtech.tis.coredefine.module.action.DistributeJobTriggerBuildResult;
2726
import com.qlangtech.tis.dao.ICommonDAOContext;
2827
import com.qlangtech.tis.datax.IDataxProcessor;
2928
import com.qlangtech.tis.datax.IDataxReader;
3029
import com.qlangtech.tis.datax.StoreResourceType;
3130
import com.qlangtech.tis.datax.impl.DataXCfgGenerator;
32-
import com.qlangtech.tis.datax.job.DataXJobWorker;
3331
import com.qlangtech.tis.exec.ExecutePhaseRange;
3432
import com.qlangtech.tis.exec.IExecChainContext;
3533
import com.qlangtech.tis.fullbuild.indexbuild.IRemoteTaskTrigger;
@@ -433,17 +431,17 @@ protected final JSONObject createInstanceParams(Integer tisTaskId) {
433431
// }
434432
}
435433

436-
protected ReplicasSpec getResourceSeplicasSpec() {
437-
DataXJobWorker worker = getSPIJobWorker();
438-
if (worker != null) {
439-
return worker.getReplicasSpec();
440-
}
441-
return ReplicasSpec.createDftPowerjobServerReplicasSpec();
442-
}
434+
// protected ReplicasSpec getResourceSeplicasSpec() {
435+
// DataXJobWorker worker = getSPIJobWorker();
436+
// if (worker != null) {
437+
// return worker.getReplicasSpec();
438+
// }
439+
// return ReplicasSpec.createDftPowerjobServerReplicasSpec();
440+
// }
443441

444-
protected DataXJobWorker getSPIJobWorker() {
445-
return DataXJobWorker.getK8SDataXPowerJobWorker();
446-
}
442+
// protected DataXJobWorker getSPIJobWorker() {
443+
// return DataXJobWorker.getK8SDataXPowerJobWorker();
444+
// }
447445

448446
public static PhaseStatusCollection createPhaseStatus(
449447
List<Pair<ISelectedTab, SelectedTabTriggers>> triggerCfgs //
@@ -481,7 +479,10 @@ public static PhaseStatusCollection createPhaseStatus(
481479
PhaseStatusCollection statusCollection //
482480
= new PhaseStatusCollection(tisTaskId, Objects.requireNonNull(phaseRange, "phaseRange can not be null"));
483481
statusCollection.setDumpPhase(dumpPhase);
484-
statusCollection.setJoinPhase(joinPhase);
482+
if (containJoinPhaseNodes) {
483+
statusCollection.setJoinPhase(joinPhase);
484+
}
485+
485486

486487
return statusCollection;
487488
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.qlangtech.tis;
2+
3+
import com.qlangtech.tis.dag.TISActorSystem;
4+
import com.qlangtech.tis.datax.DataXJobSubmitParams;
5+
import com.qlangtech.tis.plugin.akka.DAORestDelegateFacade;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
/**
10+
* DataX Worker 任务执行器
11+
* <pre>
12+
* export AKKA_ROLES="worker"
13+
* export AKKA_HOSTNAME="192.168.28.189"
14+
* export AKKA_PORT=2552
15+
* export AKKA_SEED_NODES="akka://TIS-DAG-System@192.168.28.189:2551"
16+
* </pre>
17+
* @author 百岁 (baisui@qlangtech.com)
18+
* @date 2026/3/5
19+
*/
20+
public class DataXWorkerLauncher {
21+
private static final Logger logger = LoggerFactory.getLogger(DataXWorkerLauncher.class);
22+
23+
public static void main(String[] args) {
24+
DataXJobSubmitParams submitParams = DataXJobSubmitParams.getDftIfEmpty();
25+
logger.info("start to launch DataX worker,maxInstancesPerNode:{},maxTotalNrOfInstances:{}", submitParams.maxInstancesPerNode, submitParams.maxTotalNrOfInstances);
26+
DAORestDelegateFacade akkaClusterDependenceDao = DAORestDelegateFacade.createAKKAClusterDependenceDao();
27+
// 3. 创建 TISActorSystem 实例
28+
TISActorSystem tisActorSystem = TISActorSystem.createAndInit(akkaClusterDependenceDao);
29+
30+
// 4. 初始化 Actor System
31+
tisActorSystem.initialize();
32+
logger.info("launch DataX Worker successful");
33+
}
34+
}

0 commit comments

Comments
 (0)