Skip to content

Commit 23c84eb

Browse files
committed
[lake] Make FlussLakeTiering pluggable to customize tiering job construct
1 parent 8b11e00 commit 23c84eb

3 files changed

Lines changed: 119 additions & 76 deletions

File tree

fluss-flink/fluss-flink-tiering/src/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
# Fluss Flink Tiering
2020

21-
This module contains one class FlussLakeTiering.
21+
This module provides the infrastructure for tiering Fluss data to lake formats (e.g., Apache Paimon),
22+
consisting of FlussLakeTiering which encapsulates the core configuration and job graph logic,
23+
and FlussLakeTieringEntrypoint which serves as the official Flink job main class and entrypoint.
2224

2325
The reason for extracting it as a separate module is that: When executing the Flink jar job, a jar must be specified.
2426
If a `fluss-flink.jar` is specified, it may cause various classloader issues, as there are also `fluss-flink.jar`
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.flink.tiering;
20+
21+
import org.apache.fluss.config.ConfigOptions;
22+
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter;
24+
25+
import org.apache.flink.configuration.JobManagerOptions;
26+
import org.apache.flink.core.execution.JobClient;
27+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
28+
29+
import java.util.Map;
30+
31+
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
32+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
33+
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
34+
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
35+
36+
/**
37+
* The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job.
38+
*
39+
* <p>This class is responsible for parsing configuration parameters, initializing the Flink
40+
* execution environment, and coordinating the construction of the tiering pipeline.
41+
*
42+
* <p>Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this
43+
* class, extensibility is significantly improved. Developers can now extend this class to customize
44+
* configuration extraction (e.g., injecting internal security tokens) without duplicating the core
45+
* entrypoint boilerplate.
46+
*/
47+
public class FlussLakeTiering {
48+
49+
private static final String FLUSS_CONF_PREFIX = "fluss.";
50+
private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
51+
52+
protected final StreamExecutionEnvironment execEnv;
53+
protected final String dataLake;
54+
protected final Map<String, String> flussConfigMap;
55+
protected final Map<String, String> lakeConfigMap;
56+
protected final Map<String, String> lakeTieringConfigMap;
57+
58+
public FlussLakeTiering(String[] args) {
59+
// parse params
60+
final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
61+
Map<String, String> paramsMap = params.toMap();
62+
63+
// extract fluss config
64+
flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
65+
// we need to get bootstrap.servers
66+
String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
67+
if (bootstrapServers == null) {
68+
throw new IllegalArgumentException(
69+
String.format(
70+
"The bootstrap server to fluss is not configured, please configure %s",
71+
FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
72+
}
73+
flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
74+
75+
dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
76+
if (dataLake == null) {
77+
throw new IllegalArgumentException(
78+
ConfigOptions.DATALAKE_FORMAT.key() + " is not configured");
79+
}
80+
81+
// extract lake config
82+
lakeConfigMap =
83+
extractAndRemovePrefix(
84+
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
85+
86+
// extract tiering service config
87+
lakeTieringConfigMap = extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
88+
89+
// now, we must use full restart strategy if any task is failed,
90+
// since committer is stateless, if tiering committer is failover, committer
91+
// will lost the collected committable, and will never collect all committable to do commit
92+
// todo: support region failover
93+
org.apache.flink.configuration.Configuration flinkConfig =
94+
new org.apache.flink.configuration.Configuration();
95+
flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);
96+
97+
execEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
98+
}
99+
100+
protected void run() throws Exception {
101+
// build an run lake tiering job
102+
JobClient jobClient =
103+
LakeTieringJobBuilder.newBuilder(
104+
execEnv,
105+
Configuration.fromMap(flussConfigMap),
106+
Configuration.fromMap(lakeConfigMap),
107+
Configuration.fromMap(lakeTieringConfigMap),
108+
dataLake)
109+
.build();
110+
111+
System.out.printf(
112+
"Starting data tiering service from Fluss to %s, jobId is %s.....%n",
113+
dataLake, jobClient.getJobID());
114+
}
115+
}

fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java

Lines changed: 1 addition & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,84 +17,10 @@
1717

1818
package org.apache.fluss.flink.tiering;
1919

20-
import org.apache.fluss.config.ConfigOptions;
21-
import org.apache.fluss.config.Configuration;
22-
import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter;
23-
24-
import org.apache.flink.configuration.JobManagerOptions;
25-
import org.apache.flink.core.execution.JobClient;
26-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
27-
28-
import java.util.Map;
29-
30-
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
31-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
32-
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
33-
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
34-
3520
/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
3621
public class FlussLakeTieringEntrypoint {
3722

38-
private static final String FLUSS_CONF_PREFIX = "fluss.";
39-
private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
40-
4123
public static void main(String[] args) throws Exception {
42-
43-
// parse params
44-
final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
45-
Map<String, String> paramsMap = params.toMap();
46-
47-
// extract fluss config
48-
Map<String, String> flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
49-
// we need to get bootstrap.servers
50-
String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
51-
if (bootstrapServers == null) {
52-
throw new IllegalArgumentException(
53-
String.format(
54-
"The bootstrap server to fluss is not configured, please configure %s",
55-
FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
56-
}
57-
flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
58-
59-
String dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
60-
if (dataLake == null) {
61-
throw new IllegalArgumentException(
62-
ConfigOptions.DATALAKE_FORMAT.key() + " is not configured");
63-
}
64-
65-
// extract lake config
66-
Map<String, String> lakeConfigMap =
67-
extractAndRemovePrefix(
68-
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
69-
70-
// extract tiering service config
71-
Map<String, String> lakeTieringConfigMap =
72-
extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
73-
74-
// now, we must use full restart strategy if any task is failed,
75-
// since committer is stateless, if tiering committer is failover, committer
76-
// will lost the collected committable, and will never collect all committable to do commit
77-
// todo: support region failover
78-
org.apache.flink.configuration.Configuration flinkConfig =
79-
new org.apache.flink.configuration.Configuration();
80-
flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);
81-
82-
// build tiering source
83-
final StreamExecutionEnvironment execEnv =
84-
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
85-
86-
// build lake tiering job
87-
JobClient jobClient =
88-
LakeTieringJobBuilder.newBuilder(
89-
execEnv,
90-
Configuration.fromMap(flussConfigMap),
91-
Configuration.fromMap(lakeConfigMap),
92-
Configuration.fromMap(lakeTieringConfigMap),
93-
dataLake)
94-
.build();
95-
96-
System.out.printf(
97-
"Starting data tiering service from Fluss to %s, jobId is %s.....%n",
98-
dataLake, jobClient.getJobID());
24+
new FlussLakeTiering(args).run();
9925
}
10026
}

0 commit comments

Comments
 (0)