5050import org .apache .fluss .rpc .gateway .AdminReadOnlyGateway ;
5151import org .apache .fluss .rpc .gateway .CoordinatorGateway ;
5252import org .apache .fluss .rpc .gateway .TabletServerGateway ;
53+ import org .apache .fluss .rpc .messages .AdjustIsrRequest ;
54+ import org .apache .fluss .rpc .messages .CommitKvSnapshotRequest ;
55+ import org .apache .fluss .rpc .messages .CommitLakeTableSnapshotRequest ;
56+ import org .apache .fluss .rpc .messages .CommitRemoteLogManifestRequest ;
5357import org .apache .fluss .rpc .messages .ControlledShutdownRequest ;
5458import org .apache .fluss .rpc .messages .GetKvSnapshotMetadataRequest ;
5559import org .apache .fluss .rpc .messages .InitWriterRequest ;
5660import org .apache .fluss .rpc .messages .InitWriterResponse ;
61+ import org .apache .fluss .rpc .messages .LakeTieringHeartbeatRequest ;
5762import org .apache .fluss .rpc .messages .MetadataRequest ;
63+ import org .apache .fluss .rpc .messages .PrepareLakeTableSnapshotRequest ;
5864import org .apache .fluss .rpc .metrics .TestingClientMetricGroup ;
5965import org .apache .fluss .security .acl .AccessControlEntry ;
6066import org .apache .fluss .security .acl .AccessControlEntryFilter ;
@@ -825,6 +831,125 @@ void testDynamicConfigs() throws ExecutionException, InterruptedException {
825831 ConfigEntry .ConfigSource .INITIAL_SERVER_CONFIG ));
826832 }
827833
834+ @ Test
835+ void testAdjustIsr () throws Exception {
836+ AdjustIsrRequest request = new AdjustIsrRequest ().setServerId (-1 );
837+
838+ try (RpcClient rpcClient =
839+ RpcClient .create (guestConf , TestingClientMetricGroup .newInstance (), false )) {
840+ CoordinatorGateway guestGateway =
841+ GatewayClientProxy .createGatewayProxy (
842+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("CLIENT" ),
843+ rpcClient ,
844+ CoordinatorGateway .class );
845+
846+ // test adjustIsr with external connection (CLIENT listener)
847+ // External connections should be rejected
848+ assertThatThrownBy (() -> guestGateway .adjustIsr (request ).get ())
849+ .rootCause ()
850+ .isInstanceOf (AuthorizationException .class )
851+ .hasMessageContaining ("Only internal requests are permitted." );
852+ }
853+
854+ // test adjustIsr with internal connection (FLUSS listener)
855+ // Internal connections should bypass authorization check
856+ CoordinatorGateway internalGateway =
857+ GatewayClientProxy .createGatewayProxy (
858+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("FLUSS" ),
859+ FLUSS_CLUSTER_EXTENSION .getRpcClient (),
860+ CoordinatorGateway .class );
861+
862+ // Even without any ACL permission, internal connection should succeed
863+ // (won't throw AuthorizationException)
864+ // The request may fail for other reasons (e.g., empty tables),
865+ // but it should not fail due to authorization
866+ internalGateway .adjustIsr (request ).get ();
867+ }
868+
869+ @ Test
870+ void testCommitKvSnapshot () throws Exception {
871+ CommitKvSnapshotRequest request =
872+ new CommitKvSnapshotRequest ()
873+ .setCoordinatorEpoch (-1 )
874+ .setBucketLeaderEpoch (-1 )
875+ .setCompletedSnapshot (new byte [0 ]);
876+
877+ try (RpcClient rpcClient =
878+ RpcClient .create (guestConf , TestingClientMetricGroup .newInstance (), false )) {
879+ CoordinatorGateway guestGateway =
880+ GatewayClientProxy .createGatewayProxy (
881+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("CLIENT" ),
882+ rpcClient ,
883+ CoordinatorGateway .class );
884+
885+ // test commitKvSnapshot with external connection (CLIENT listener)
886+ // External connections should be rejected
887+ assertThatThrownBy (() -> guestGateway .commitKvSnapshot (request ).get ())
888+ .rootCause ()
889+ .isInstanceOf (AuthorizationException .class )
890+ .hasMessageContaining ("Only internal requests are permitted." );
891+ }
892+
893+ // test commitKvSnapshot with internal connection (FLUSS listener)
894+ // Internal connections should bypass authorization check
895+ CoordinatorGateway internalGateway =
896+ GatewayClientProxy .createGatewayProxy (
897+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("FLUSS" ),
898+ FLUSS_CLUSTER_EXTENSION .getRpcClient (),
899+ CoordinatorGateway .class );
900+
901+ // Even without any ACL permission, internal connection should succeed
902+ // (won't throw AuthorizationException)
903+ // The request may fail for other reasons (e.g., invalid snapshot data),
904+ // but it should not fail due to authorization
905+ assertThatThrownBy (() -> internalGateway .commitKvSnapshot (request ).get ())
906+ .rootCause ()
907+ .isNotInstanceOf (AuthorizationException .class );
908+ }
909+
910+ @ Test
911+ void testCommitRemoteLogManifest () throws Exception {
912+ CommitRemoteLogManifestRequest request =
913+ new CommitRemoteLogManifestRequest ()
914+ .setTableId (-1 )
915+ .setBucketId (-1 )
916+ .setRemoteLogManifestPath ("test-path" )
917+ .setRemoteLogStartOffset (0 )
918+ .setRemoteLogEndOffset (0 )
919+ .setCoordinatorEpoch (-1 )
920+ .setBucketLeaderEpoch (-1 );
921+
922+ try (RpcClient rpcClient =
923+ RpcClient .create (guestConf , TestingClientMetricGroup .newInstance (), false )) {
924+ CoordinatorGateway guestGateway =
925+ GatewayClientProxy .createGatewayProxy (
926+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("CLIENT" ),
927+ rpcClient ,
928+ CoordinatorGateway .class );
929+
930+ // test commitRemoteLogManifest with external connection (CLIENT listener)
931+ // External connections should be rejected
932+ assertThatThrownBy (() -> guestGateway .commitRemoteLogManifest (request ).get ())
933+ .rootCause ()
934+ .isInstanceOf (AuthorizationException .class )
935+ .hasMessageContaining ("Only internal requests are permitted." );
936+ }
937+
938+ // test commitRemoteLogManifest with internal connection (FLUSS listener)
939+ // Internal connections should bypass authorization check
940+ CoordinatorGateway internalGateway =
941+ GatewayClientProxy .createGatewayProxy (
942+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("FLUSS" ),
943+ FLUSS_CLUSTER_EXTENSION .getRpcClient (),
944+ CoordinatorGateway .class );
945+
946+ // Even without any ACL permission, internal connection should succeed
947+ // (won't throw AuthorizationException)
948+ // The request may fail for other reasons (e.g., invalid manifest data),
949+ // but it should not fail due to authorization
950+ internalGateway .commitRemoteLogManifest (request ).get ();
951+ }
952+
828953 @ Test
829954 void testControlledShutdown () throws Exception {
830955 ControlledShutdownRequest request =
@@ -838,14 +963,12 @@ void testControlledShutdown() throws Exception {
838963 rpcClient ,
839964 CoordinatorGateway .class );
840965
841- // test controlledShutdown without ALTER permission on cluster resource
966+ // test controlledShutdown with external connection (CLIENT listener)
967+ // External connections should be rejected
842968 assertThatThrownBy (() -> guestGateway .controlledShutdown (request ).get ())
843969 .rootCause ()
844970 .isInstanceOf (AuthorizationException .class )
845- .hasMessageContaining (
846- String .format (
847- "Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}" ,
848- guestPrincipal ));
971+ .hasMessageContaining ("Only internal requests are permitted." );
849972 }
850973
851974 // test controlledShutdown with internal connection (FLUSS listener)
@@ -1020,6 +1143,126 @@ void testCancelRebalance() throws Exception {
10201143 guestAdmin .cancelRebalance (null ).get ();
10211144 }
10221145
1146+ @ Test
1147+ void testPrepareLakeTableSnapshot () throws Exception {
1148+ PrepareLakeTableSnapshotRequest request = new PrepareLakeTableSnapshotRequest ();
1149+
1150+ try (RpcClient rpcClient =
1151+ RpcClient .create (guestConf , TestingClientMetricGroup .newInstance (), false )) {
1152+ CoordinatorGateway guestGateway =
1153+ GatewayClientProxy .createGatewayProxy (
1154+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("CLIENT" ),
1155+ rpcClient ,
1156+ CoordinatorGateway .class );
1157+
1158+ // test prepareLakeTableSnapshot without WRITE permission on cluster resource
1159+ assertThatThrownBy (() -> guestGateway .prepareLakeTableSnapshot (request ).get ())
1160+ .rootCause ()
1161+ .isInstanceOf (AuthorizationException .class )
1162+ .hasMessageContaining (
1163+ String .format (
1164+ "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}" ,
1165+ guestPrincipal ));
1166+
1167+ // add WRITE permission to guest user on cluster resource
1168+ rootAdmin
1169+ .createAcls (
1170+ Collections .singletonList (
1171+ new AclBinding (
1172+ Resource .cluster (),
1173+ new AccessControlEntry (
1174+ guestPrincipal ,
1175+ "*" ,
1176+ OperationType .WRITE ,
1177+ PermissionType .ALLOW ))))
1178+ .all ()
1179+ .get ();
1180+
1181+ // test prepareLakeTableSnapshot with WRITE permission should succeed
1182+ guestGateway .prepareLakeTableSnapshot (request ).get ();
1183+ }
1184+ }
1185+
1186+ @ Test
1187+ void testCommitLakeTableSnapshot () throws Exception {
1188+ CommitLakeTableSnapshotRequest request = new CommitLakeTableSnapshotRequest ();
1189+
1190+ try (RpcClient rpcClient =
1191+ RpcClient .create (guestConf , TestingClientMetricGroup .newInstance (), false )) {
1192+ CoordinatorGateway guestGateway =
1193+ GatewayClientProxy .createGatewayProxy (
1194+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("CLIENT" ),
1195+ rpcClient ,
1196+ CoordinatorGateway .class );
1197+
1198+ // test commitLakeTableSnapshot without WRITE permission on cluster resource
1199+ assertThatThrownBy (() -> guestGateway .commitLakeTableSnapshot (request ).get ())
1200+ .rootCause ()
1201+ .isInstanceOf (AuthorizationException .class )
1202+ .hasMessageContaining (
1203+ String .format (
1204+ "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}" ,
1205+ guestPrincipal ));
1206+
1207+ // add WRITE permission to guest user on cluster resource
1208+ rootAdmin
1209+ .createAcls (
1210+ Collections .singletonList (
1211+ new AclBinding (
1212+ Resource .cluster (),
1213+ new AccessControlEntry (
1214+ guestPrincipal ,
1215+ "*" ,
1216+ OperationType .WRITE ,
1217+ PermissionType .ALLOW ))))
1218+ .all ()
1219+ .get ();
1220+
1221+ // test commitLakeTableSnapshot with WRITE permission should succeed
1222+ guestGateway .commitLakeTableSnapshot (request ).get ();
1223+ }
1224+ }
1225+
1226+ @ Test
1227+ void testLakeTieringHeartbeat () throws Exception {
1228+ LakeTieringHeartbeatRequest request = new LakeTieringHeartbeatRequest ();
1229+
1230+ try (RpcClient rpcClient =
1231+ RpcClient .create (guestConf , TestingClientMetricGroup .newInstance (), false )) {
1232+ CoordinatorGateway guestGateway =
1233+ GatewayClientProxy .createGatewayProxy (
1234+ () -> FLUSS_CLUSTER_EXTENSION .getCoordinatorServerNode ("CLIENT" ),
1235+ rpcClient ,
1236+ CoordinatorGateway .class );
1237+
1238+ // test lakeTieringHeartbeat without READ permission on cluster resource
1239+ assertThatThrownBy (() -> guestGateway .lakeTieringHeartbeat (request ).get ())
1240+ .rootCause ()
1241+ .isInstanceOf (AuthorizationException .class )
1242+ .hasMessageContaining (
1243+ String .format (
1244+ "Principal %s have no authorization to operate READ on resource Resource{type=CLUSTER, name='fluss-cluster'}" ,
1245+ guestPrincipal ));
1246+
1247+ // add READ permission to guest user on cluster resource
1248+ rootAdmin
1249+ .createAcls (
1250+ Collections .singletonList (
1251+ new AclBinding (
1252+ Resource .cluster (),
1253+ new AccessControlEntry (
1254+ guestPrincipal ,
1255+ "*" ,
1256+ READ ,
1257+ PermissionType .ALLOW ))))
1258+ .all ()
1259+ .get ();
1260+
1261+ // test lakeTieringHeartbeat with READ permission should succeed
1262+ guestGateway .lakeTieringHeartbeat (request ).get ();
1263+ }
1264+ }
1265+
10231266 private static Configuration initConfig () {
10241267 Configuration conf = new Configuration ();
10251268 conf .setInt (ConfigOptions .DEFAULT_REPLICATION_FACTOR , 3 );
0 commit comments