Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions .github/workflows/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,58 @@ jobs:
- name: Init k3s
uses: nolar/setup-k3d-k3s@v1
with:
version: v1.27.16+k3s1
version: v1.35.2+k3s1
k3d-args: -s 1 --network dinky_net --api-port 172.28.0.1:6550
k3d-tag: v5.7.5
- name: Get k3s kube config
run: k3d kubeconfig get --all && mkdir ./kube && k3d kubeconfig get --all > ./kube/k3s.yaml && sed -i 's/0.0.0.0/172.28.0.1/g' ./kube/k3s.yaml
- name: Init k8s RBAC and namespace
run: |
kubectl create namespace dinky
kubectl create serviceaccount dinky -n dinky
kubectl create clusterrolebinding flink-role-binding-dinky --clusterrole=edit --serviceaccount=dinky:dinky
cat <<EOF | kubectl apply -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: dinky-node-reader
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: dinky-node-reader-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: dinky-node-reader
subjects:
- kind: ServiceAccount
name: dinky
namespace: dinky
EOF
mkdir ./kube
cat <<EOF > ./kube/k3s.yaml
apiVersion: v1
kind: Config
clusters:
- name: k3d-default
cluster:
server: https://172.28.0.1:6550
insecure-skip-tls-verify: true
users:
- name: dinky
user:
token: $(kubectl create token dinky -n dinky)
contexts:
- name: dinky
context:
cluster: k3d-default
namespace: dinky
user: dinky
current-context: dinky
EOF
- name: Init k3s main images
run: |
docker exec k3d-k3s-default-server-0 crictl pull library/busybox:latest
Expand Down
7 changes: 7 additions & 0 deletions dinky-admin/src/main/resources/mapper/DocumentMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@
<if test='param.subtype!=null and param.subtype!=""'>
and a.subtype = #{param.subtype}
</if>
<if test='param.enabled != null and param.enabled != ""'>
and a.enabled =
<choose>
<when test='param.enabled == true or param.enabled == 1 or param.enabled == "true" or param.enabled == "1"'>1</when>
<otherwise>0</otherwise>
</choose>
</if>
<if test='param.version!=null and param.version!=""'>
and a.version = #{param.version}
</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.python.PythonOptions;

import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
Expand All @@ -55,7 +53,6 @@
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import io.fabric8.kubernetes.api.model.Pod;
import lombok.Data;
Expand Down Expand Up @@ -224,26 +221,37 @@ public TestResult test() {
// Test mode no jobName, use uuid .
addConfigParas(KubernetesConfigOptions.CLUSTER_ID, UUID.randomUUID().toString());
initConfig();
FlinkKubeClient client = k8sClientHelper.getClient();
if (client instanceof Fabric8FlinkKubeClient) {
Object internalClient = ReflectUtil.getFieldValue(client, "internalClient");
Method method = ReflectUtil.getMethod(internalClient.getClass(), "getVersion");
Object versionInfo = method.invoke(internalClient);
logger.info(
"k8s cluster link successful ; k8s version: {} ; platform: {}",
ReflectUtil.getFieldValue(versionInfo, "gitVersion"),
ReflectUtil.getFieldValue(versionInfo, "platform"));
}
String namespace = configuration.get(KubernetesConfigOptions.NAMESPACE);
k8sClientHelper.getKubernetesClient().pods().inNamespace(namespace).list();
logger.info("k8s cluster link successful ; namespace: {}", namespace);
return TestResult.success();
} catch (Exception e) {
logger.error(Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e);
String errorDetail = extractTestErrorDetail(e);
return TestResult.fail(
StrFormatter.format("{}:{}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e.getMessage()));
StrFormatter.format("{} {}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), errorDetail));
} finally {
close();
}
}

static String extractTestErrorDetail(Throwable throwable) {
Throwable rootCause = throwable;
while (rootCause instanceof InvocationTargetException
&& ((InvocationTargetException) rootCause).getTargetException() != null) {
rootCause = ((InvocationTargetException) rootCause).getTargetException();
}
while (rootCause.getCause() != null && rootCause.getCause() != rootCause) {
rootCause = rootCause.getCause();
}

String message = rootCause.getMessage();
if (StringUtils.isBlank(message)) {
return rootCause.getClass().getName();
}
return StrFormatter.format("{}: {}", rootCause.getClass().getName(), message);
}

@Override
public void killCluster() {
log.info("Start kill cluster: " + config.getFlinkConfig().getJobName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes;

import static org.junit.Assert.assertEquals;

import java.lang.reflect.InvocationTargetException;

import org.junit.Test;

public class KubernetesGatewayTest {

@Test
public void testExtractTestErrorDetailUnwrapsInvocationTargetException() {
IllegalStateException rootCause = new IllegalStateException("connection refused");
InvocationTargetException invocationTargetException = new InvocationTargetException(rootCause);

String errorDetail = KubernetesGateway.extractTestErrorDetail(invocationTargetException);

assertEquals("java.lang.IllegalStateException: connection refused", errorDetail);
}

@Test
public void testExtractTestErrorDetailFallsBackToClassName() {
NullPointerException rootCause = new NullPointerException();

String errorDetail = KubernetesGateway.extractTestErrorDetail(rootCause);

assertEquals("java.lang.NullPointerException", errorDetail);
}
}
40 changes: 20 additions & 20 deletions dinky-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
<npmVersion>10.5.0</npmVersion>
<pnpmVersion>9.15.4</pnpmVersion>
<!-- 国内node下载加速 -->
<!-- <nodeDownloadRoot>https://mirrors.huaweicloud.com/nodejs/</nodeDownloadRoot>-->
<!-- <npmDownloadRoot>https://repo.huaweicloud.com/npm-software/</npmDownloadRoot>-->
<!-- <nodeDownloadRoot>https://mirrors.huaweicloud.com/nodejs/</nodeDownloadRoot>-->
<!-- <npmDownloadRoot>https://repo.huaweicloud.com/npm-software/</npmDownloadRoot>-->
</configuration>
<executions>
<execution>
Expand All @@ -61,24 +61,24 @@
<goal>install-node-and-pnpm</goal>
</goals>
</execution>
<execution>
<id>install</id>
<goals>
<goal>pnpm</goal>
</goals>
<configuration>
<arguments>install --registry ${npm-registry-repo}</arguments>
</configuration>
</execution>
<execution>
<id>build</id>
<goals>
<goal>pnpm</goal>
</goals>
<configuration>
<arguments>run build</arguments>
</configuration>
</execution>
<execution>
<id>install</id>
<goals>
<goal>pnpm</goal>
</goals>
<configuration>
<arguments>install --registry ${npm-registry-repo}</arguments>
</configuration>
</execution>
<execution>
<id>build</id>
<goals>
<goal>pnpm</goal>
</goals>
<configuration>
<arguments>run build</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,12 @@ const DocumentTableList: React.FC = () => {
title: l('rc.doc.functionType'),
sorter: true,
dataIndex: 'type',
filterMultiple: true,
filters: true,
valueEnum: DOCUMENT_TYPE_ENUMS
},
{
title: l('rc.doc.subFunctionType'),
sorter: true,
dataIndex: 'subtype',
filters: true,
filterMultiple: true,
renderFormItem: (item, { type }, form) => {
const currentType = form.getFieldValue('type');
let options = currentType === DOCUMENT_TYPE_ENUMS.FUN_UDF.value ? FUNCTION_TYPES : JOB_TYPE;
Expand All @@ -139,8 +135,6 @@ const DocumentTableList: React.FC = () => {
title: l('rc.doc.category'),
sorter: true,
dataIndex: 'category',
filterMultiple: true,
filters: true,
valueEnum: DOCUMENT_CATEGORY_ENUMS
},
{
Expand Down Expand Up @@ -172,11 +166,11 @@ const DocumentTableList: React.FC = () => {
{
title: l('global.table.isEnable'),
dataIndex: 'enabled',
hideInSearch: true,
filters: STATUS_MAPPING(),
filterMultiple: false,
hideInDescriptions: true,
valueEnum: STATUS_ENUM(),
valueType: 'select',
valueEnum: Object.fromEntries(
STATUS_MAPPING().map(item => [item.value, { text: item.text, status: item.value === 1 ? 'Success' : 'Error' }])
),
render: (_, record) => {
return (
<EnableSwitchBtn
Expand Down
11 changes: 6 additions & 5 deletions e2e_test/tools/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ def addStandaloneCluster(session: Session) -> int:

def addApplicationCluster(session: Session, params: dict) -> Optional[int]:
name = params['name']
test_connection_yarn_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params)
assertRespOk(test_connection_yarn_resp, "Test yarn connectivity")
test_connection_yarn_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params)
assertRespOk(test_connection_yarn_resp, "Add Yarn Application Cluster")
cluster_type = params["type"]
test_connection_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params)
assertRespOk(test_connection_resp, f"Test {cluster_type} connectivity")
save_cluster_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params)
assertRespOk(save_cluster_resp, f"Add {cluster_type} cluster")
get_app_list = session.get(url(f"api/clusterConfiguration/list?keyword={name}"), json=params)
assertRespOk(get_app_list, "Get Yarn Application Cluster")
assertRespOk(get_app_list, f"Get {cluster_type} cluster")
for data in get_app_list.json()["data"]:
if data["name"] == name:
return data['id']
Expand Down
Loading