diff --git a/changelog.d/0-release-notes/WPB-22959 b/changelog.d/0-release-notes/WPB-22959 new file mode 100644 index 00000000000..959c52b0ec5 --- /dev/null +++ b/changelog.d/0-release-notes/WPB-22959 @@ -0,0 +1,6 @@ +Team features can now be migrated from Cassandra to Postgres. To migrate: +- Set galley `postgresMigration.teamFeatures` to `migration-to-postgresql`. +- Enable the background-worker flag `migrateTeamFeatures=true` to run the backfill. +- Monitor the `wire_team_features_migration_finished` metric to confirm completion. +- Switch `postgresMigration.teamFeatures` to `postgresql` and restart Galley and background-worker. +- Once fully cut over, drop the Cassandra `team_features_dyn` table. diff --git a/changelog.d/5-internal/WPB-22959 b/changelog.d/5-internal/WPB-22959 index c073df6a257..2bdc60132ac 100644 --- a/changelog.d/5-internal/WPB-22959 +++ b/changelog.d/5-internal/WPB-22959 @@ -1,3 +1 @@ -- Generalized the migration lock for better reuse -- Move logic from `TeamFeatureStore` interpreter to `FeatureConfigSubsystem` -(#4982, #4983) +Migration from Cassandra to Postgres of Team Features (#4982, #4983, #4979) diff --git a/charts/background-worker/templates/configmap.yaml b/charts/background-worker/templates/configmap.yaml index 6c84d808767..3253bcfe3c1 100644 --- a/charts/background-worker/templates/configmap.yaml +++ b/charts/background-worker/templates/configmap.yaml @@ -91,6 +91,7 @@ data: migrateConversations: {{ .migrateConversations }} migrateConversationCodes: {{ .migrateConversationCodes }} + migrateTeamFeatures: {{ .migrateTeamFeatures }} migrateConversationsOptions: {{toYaml .migrateConversationsOptions | indent 6 }} diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index 2896d749e89..d5283a64269 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -73,6 +73,10 @@ config: # It's important to set `settings.postgresMigration.conversationCodes` to `migration-to-postgresql` # before starting the migration. migrateConversationCodes: false + # This will start the migration of team features. + # It's important to set `settings.postgresMigration.teamFeatures` to `migration-to-postgresql` + # before starting the migration. + migrateTeamFeatures: false backendNotificationPusher: pushBackoffMinWait: 10000 # in microseconds, so 10ms @@ -92,6 +96,7 @@ config: postgresMigration: conversation: cassandra conversationCodes: cassandra + teamFeatures: cassandra secrets: {} diff --git a/charts/galley/values.yaml b/charts/galley/values.yaml index f4ac3331c59..6d718b75e8e 100644 --- a/charts/galley/values.yaml +++ b/charts/galley/values.yaml @@ -72,6 +72,7 @@ config: postgresMigration: conversation: cassandra conversationCodes: cassandra + teamFeatures: cassandra settings: httpPoolSize: 128 maxTeamSize: 10000 diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index 92f52cbf46b..3e81d7d1ccb 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1812,11 +1812,13 @@ galley: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql background-worker: config: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql migrateConversations: false ``` @@ -1847,13 +1849,16 @@ pattern below applies per store. Use it for `conversation` and postgresMigration: conversation: migration-to-postgresql conversationCodes: migration-to-postgresql + teamFeatures: migration-to-postgresql background-worker: config: - postgresMigration: - conversation: migration-to-postgresql - conversationCodes: migration-to-postgresql - migrateConversations: false - migrateConversationCodes: false + postgresMigration: + conversation: migration-to-postgresql + conversationCodes: migration-to-postgresql + teamFeatures: migration-to-postgresql + migrateConversations: false + migrateConversationCodes: false + migrateTeamFeatures: false ``` This change should restart all the galley pods, and new writes will follow @@ -1866,8 +1871,14 @@ pattern below applies per store. Use it for `conversation` and config: migrateConversations: true migrateConversationCodes: true + migrateTeamFeatures: true ``` + During migration, Cassandra rows are not deleted. Writes and migration share + per-row locks to avoid races, so there is no need to delete early. Deletion is + deferred to keep rollback options and to remove Cassandra only after a full + cutover to PostgreSQL-only. + Wait for the store-specific migration metrics to reach `1.0`. For conversations: `wire_local_convs_migration_finished` and `wire_user_remote_convs_migration_finished`. For conversation codes: @@ -1882,13 +1893,16 @@ pattern below applies per store. Use it for `conversation` and postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql background-worker: config: - postgresMigration: - conversation: postgresql - conversationCodes: postgresql - migrateConversations: false - migrateConversationCodes: false + postgresMigration: + conversation: postgresql + conversationCodes: postgresql + teamFeatures: postgresql + migrateConversations: false + migrateConversationCodes: false + migrateTeamFeatures: false ``` **How to run migrations independently or in batches** @@ -1956,6 +1970,8 @@ postgresqlPool: postgresMigration: # Valid: cassandra | migration-to-postgresql | postgresql conversation: postgresql + conversationCodes: postgresql + teamFeatures: postgresql # Start the migration worker when true migrateConversations: false @@ -1978,7 +1994,7 @@ Notes - `postgresql` values follow libpq keywords; password is sourced via `secrets.pgPassword`. - RabbitMQ admin fields (`adminHost`, `adminPort`) are templated only when `config.enableFederation` is true. -- `postgresMigration.conversation` must match `galley.config.postgresMigration.conversation` during migration phases. -- `migrateConversations: true` triggers the migration job; leave it `false` for new installs and after migration. +- `postgresMigration.` must match between `galley` and `background-worker` during migration phases. +- `migrateConversations: true` triggers the conversation migration job; leave it `false` for new installs and after migration. - `concurrency`, `jobTimeout`, and `maxAttempts` control parallelism and retry behavior of the consumer. - `brig` and `gundeck` endpoints default to in-cluster services; override via `background-worker.config.brig` and `.gundeck` if your service DNS/ports differ. diff --git a/hack/helm_vars/common.yaml.gotmpl b/hack/helm_vars/common.yaml.gotmpl index e1974ced474..7cd9bb5fac5 100644 --- a/hack/helm_vars/common.yaml.gotmpl +++ b/hack/helm_vars/common.yaml.gotmpl @@ -16,6 +16,7 @@ dynBackendDomain3: dynamic-backend-3.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster {{- $preferredStore := default "cassandra" (env "PREFERRED_STORE") }} conversationStore: {{ $preferredStore }} conversationCodesStore: {{ $preferredStore }} +teamFeaturesStore: {{ $preferredStore }} {{- if (eq (env "UPLOAD_XML_S3_BASE_URL") "") }} uploadXml: {} diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 5efdf49dbdc..3745be436a9 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -307,6 +307,7 @@ galley: postgresMigration: conversation: {{ .Values.conversationStore }} conversationCodes: {{ .Values.conversationCodesStore }} + teamFeatures: {{ .Values.teamFeaturesStore }} settings: maxConvAndTeamSize: 16 maxTeamSize: 32 @@ -675,6 +676,7 @@ background-worker: postgresMigration: conversation: {{ .Values.conversationStore }} conversationCodes: {{ .Values.conversationCodesStore }} + teamFeatures: {{ .Values.teamFeaturesStore }} rabbitmq: port: 5671 adminPort: 15671 diff --git a/integration/integration.cabal b/integration/integration.cabal index fae812e6f52..c6664e7de1d 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -175,6 +175,7 @@ library Test.MessageTimer Test.Migration.Conversation Test.Migration.ConversationCodes + Test.Migration.TeamFeatures Test.Migration.Util Test.MLS Test.MLS.Clients diff --git a/integration/test/Test/Migration/TeamFeatures.hs b/integration/test/Test/Migration/TeamFeatures.hs new file mode 100644 index 00000000000..b25e09bb174 --- /dev/null +++ b/integration/test/Test/Migration/TeamFeatures.hs @@ -0,0 +1,178 @@ +module Test.Migration.TeamFeatures where + +import qualified API.Galley as Public +import qualified API.GalleyInternal as Internal +import Control.Monad.Codensity +import Control.Monad.Reader +import SetupHelpers +import Test.FeatureFlags.Util +import Test.Migration.Util (waitForMigration) +import Testlib.Prelude hiding (pairs) +import Testlib.ResourcePool + +testTeamFeaturesMigration :: (HasCallStack) => App () +testTeamFeaturesMigration = do + resourcePool <- asks (.resourcePool) + runCodensity (acquireResources 1 resourcePool) $ \[backend] -> do + let preMigration = runCodensity (startDynamicBackend backend (conf "cassandra" False)) . const + switchToMigratingInterpreter = runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" False)) . const + startMigration = runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" True)) . const + stopMigration = runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" False)) . const + switchToPostgresInterpreter = runCodensity (startDynamicBackend backend (conf "postgresql" False)) . const + domain = backend.berDomain + + (teams0, teams1) <- + preMigration $ do + teams0 <- replicateM 3 $ createTeam domain 2 + teams1@(team1 : _) <- replicateM 5 $ createTeam domain 1 + for_ teams0 $ \(owner, tid, _) -> enableFeatures owner tid unlockableFeatures + testSetFeatures team1 + testGetFeatures team1 + pure (teams0, teams1) + + team1 : team2 : team3 : team4 : team5 : _ <- pure teams1 + + switchToMigratingInterpreter $ do + assertModifiedFeatures domain teams0 + testSetFeatures team2 + testGetFeatures team1 + testGetFeatures team2 + + startMigration $ do + assertModifiedFeatures domain teams0 + testSetFeatures team3 + testGetFeatures team1 + testGetFeatures team2 + testGetFeatures team3 + waitForMigration domain counterName + + stopMigration $ do + assertModifiedFeatures domain teams0 + testSetFeatures team4 + testGetFeatures team1 + testGetFeatures team2 + testGetFeatures team3 + testGetFeatures team4 + + switchToPostgresInterpreter $ do + assertModifiedFeatures domain teams0 + testSetFeatures team5 + testGetFeatures team1 + testGetFeatures team2 + testGetFeatures team3 + testGetFeatures team4 + testGetFeatures team5 + where + unlockableFeatures :: [String] + unlockableFeatures = + [ "fileSharing", + "conferenceCalling", + "selfDeletingMessages", + "conversationGuestLinks", + "sndFactorPasswordChallenge", + "mls", + "outlookCalIntegration", + "mlsE2EId", + "mlsMigration", + "enforceFileDownloadLocation", + "domainRegistration", + "channels", + "cells", + "consumableNotifications", + "chatBubbles", + "apps", + "simplifiedUserConnectionRequestQRCode", + "stealthUsers", + "meetings", + "meetingsPremium" + ] + + assertModifiedFeatures :: String -> [(Value, String, [Value])] -> App () + assertModifiedFeatures domain teams = do + expectedModifiedFeatures <- + mkExpectedModifiedFeatures defAllFeatures + >>= setField "classifiedDomains.config.domains" [domain] + for_ teams $ \(owner, tid, _) -> + bindResponse (Public.getTeamFeatures owner tid) $ \resp -> do + resp.status `shouldMatchInt` 200 + for_ unlockableFeatures $ \feat -> do + resp.json %. feat %. "status" `shouldMatch` "enabled" + resp.json %. feat %. "lockStatus" `shouldMatch` "unlocked" + resp.json `shouldMatch` expectedModifiedFeatures + + enableFeatures :: Value -> String -> [String] -> App () + enableFeatures owner tid features = do + for_ features $ \name -> do + Internal.setTeamFeatureLockStatus owner tid name "unlocked" + assertSuccess =<< Internal.setTeamFeatureStatus owner tid name "enabled" + + testSetFeatures :: (HasCallStack) => (Value, String, [Value]) -> App () + testSetFeatures (owner, tid, _) = do + Internal.setTeamFeatureLockStatus owner tid "channels" "unlocked" + Internal.setTeamFeatureLockStatus owner tid "enforceFileDownloadLocation" "unlocked" + assertSuccess =<< Internal.setTeamFeatureConfig owner tid "channels" channelsConfig + assertSuccess =<< Internal.setTeamFeatureConfig owner tid "enforceFileDownloadLocation" enforceDownloadLocationConfig + where + channelsConfig :: Value + channelsConfig = + object + [ "status" .= "enabled", + "config" + .= object + [ "allowed_to_create_channels" .= "team-members", + "allowed_to_open_channels" .= "admins" + ] + ] + + enforceDownloadLocationConfig :: Value + enforceDownloadLocationConfig = + object + [ "status" .= "enabled", + "config" .= object ["enforcedDownloadLocation" .= "/tmp/migration-test"] + ] + + testGetFeatures :: (HasCallStack) => (Value, String, [Value]) -> App () + testGetFeatures (owner, tid, _) = do + expectedChannels <- expectedChannelsConfig + expectedDownloadLocation <- expectedEnforceDownloadLocationConfig + bindResponse (Public.getTeamFeature owner tid "channels") $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json `shouldMatch` expectedChannels + bindResponse (Public.getTeamFeature owner tid "enforceFileDownloadLocation") $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json `shouldMatch` expectedDownloadLocation + where + expectedChannelsConfig :: App Value + expectedChannelsConfig = do + defChannels <- defAllFeatures %. "channels" + defChannels + & setField "lockStatus" "unlocked" + >>= setField "status" "enabled" + >>= setField "config.allowed_to_create_channels" "team-members" + >>= setField "config.allowed_to_open_channels" "admins" + + expectedEnforceDownloadLocationConfig :: App Value + expectedEnforceDownloadLocationConfig = do + defFeature <- defAllFeatures %. "enforceFileDownloadLocation" + defFeature + & setField "lockStatus" "unlocked" + >>= setField "status" "enabled" + >>= setField "config.enforcedDownloadLocation" "/tmp/migration-test" + + mkExpectedModifiedFeatures :: Value -> App Value + mkExpectedModifiedFeatures features = + foldl (flip update) (pure features) unlockableFeatures + where + update feat = + setField (feat <> ".status") "enabled" + >=> setField (feat <> ".lockStatus") "unlocked" + + conf :: String -> Bool -> ServiceOverrides + conf db runMigration = + def + { galleyCfg = setField "postgresMigration.teamFeatures" db, + backgroundWorkerCfg = setField "migrateTeamFeatures" runMigration + } + +counterName :: String +counterName = "^wire_team_features_migration_finished" diff --git a/libs/wire-api/src/Wire/API/Team/Feature.hs b/libs/wire-api/src/Wire/API/Team/Feature.hs index 37d226b9b16..d36a0a585af 100644 --- a/libs/wire-api/src/Wire/API/Team/Feature.hs +++ b/libs/wire-api/src/Wire/API/Team/Feature.hs @@ -174,6 +174,7 @@ import Test.QuickCheck.Gen (suchThat) import URI.ByteString.QQ qualified as URI.QQ import Wire.API.Conversation.Protocol import Wire.API.MLS.CipherSuite +import Wire.API.PostgresMarshall import Wire.API.Routes.Named hiding (unnamed) import Wire.API.Routes.Version import Wire.API.Routes.Versioned @@ -325,8 +326,15 @@ resolveDbFeature defFeature dbFeature = LockStatusUnlocked -> feat newtype DbConfig = DbConfig {unDbConfig :: A.Value} + deriving newtype (Arbitrary) deriving (Eq, Show) +instance PostgresMarshall A.Value DbConfig where + postgresMarshall = unDbConfig + +instance PostgresUnmarshall A.Value DbConfig where + postgresUnmarshall = Right . DbConfig + instance Default DbConfig where def = DbConfig (A.object []) @@ -629,6 +637,15 @@ instance Cass.Cql LockStatus where toCql LockStatusLocked = Cass.CqlInt 0 toCql LockStatusUnlocked = Cass.CqlInt 1 +instance PostgresMarshall Int32 LockStatus where + postgresMarshall LockStatusLocked = 0 + postgresMarshall LockStatusUnlocked = 1 + +instance PostgresUnmarshall Int32 LockStatus where + postgresUnmarshall 0 = Right LockStatusLocked + postgresUnmarshall 1 = Right LockStatusUnlocked + postgresUnmarshall _ = Left "invalid lockStatus" + newtype LockStatusResponse = LockStatusResponse {_unlockStatus :: LockStatus} deriving stock (Eq, Show, Generic) deriving (Arbitrary) via (GenericUniform LockStatus) @@ -2172,6 +2189,15 @@ instance Cass.Cql FeatureStatus where toCql FeatureStatusDisabled = Cass.CqlInt 0 toCql FeatureStatusEnabled = Cass.CqlInt 1 +instance PostgresMarshall Int32 FeatureStatus where + postgresMarshall FeatureStatusEnabled = 1 + postgresMarshall FeatureStatusDisabled = 0 + +instance PostgresUnmarshall Int32 FeatureStatus where + postgresUnmarshall 1 = Right FeatureStatusEnabled + postgresUnmarshall 0 = Right FeatureStatusDisabled + postgresUnmarshall _ = Left "invalid feature status" + -- | list of available features config types type Features :: [Type] type Features = diff --git a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs index 31ce898c43e..191f55bffd0 100644 --- a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs +++ b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs @@ -20,6 +20,7 @@ module Test.Wire.API.Roundtrip.PostgresMarshall (tests) where import Crypto.Error (CryptoFailable (..)) import Crypto.KDF.Argon2 qualified as Argon2 +import Data.Aeson as A import Data.ByteString.Char8 qualified as BS8 import Data.Code qualified as Code import Data.Misc (PlainTextPassword8, fromPlainTextPassword) @@ -32,6 +33,7 @@ import Wire.API.Password as Password import Wire.API.Password.Argon2id (Argon2HashedPassword (..), encodeArgon2HashedPassword) import Wire.API.Password.Scrypt (encodeScryptPassword) import Wire.API.PostgresMarshall +import Wire.API.Team.Feature import Wire.Arbitrary qualified as Arbitrary () tests :: T.TestTree @@ -39,7 +41,10 @@ tests = T.localOption (T.Timeout (60 * 1000000) "60s") . T.testGroup "PostgresMarshall roundtrip tests" $ [ testRoundTrip @Text @Code.Key, testRoundTrip @Text @Code.Value, - testRoundTrip @ByteString @Password.Password + testRoundTrip @ByteString @Password.Password, + testRoundTrip @Int32 @FeatureStatus, + testRoundTrip @Int32 @LockStatus, + testRoundTrip @A.Value @DbConfig ] testRoundTrip :: diff --git a/libs/wire-subsystems/postgres-migrations/20260123124917-team-features.sql b/libs/wire-subsystems/postgres-migrations/20260123124917-team-features.sql new file mode 100644 index 00000000000..a95e85d90ba --- /dev/null +++ b/libs/wire-subsystems/postgres-migrations/20260123124917-team-features.sql @@ -0,0 +1,8 @@ +CREATE TABLE team_features ( + team uuid NOT NULL, + feature text NOT NULL, + config jsonb, + lock_status int, + status int, + PRIMARY KEY (team, feature) +); diff --git a/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs b/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs index 112b61a91ed..1f7d263b8ae 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs @@ -31,6 +31,7 @@ import Wire.CodeStore.Cassandra qualified as Cassandra import Wire.CodeStore.Postgres qualified as Postgres import Wire.Postgres (PGConstraints) +-- | Cassandra is the source of truth during migration; writes are mirrored to Postgres. interpretCodeStoreToCassandraAndPostgres :: ( Member (Input ClientState) r, Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r, @@ -38,8 +39,6 @@ interpretCodeStoreToCassandraAndPostgres :: ) => Sem (CodeStore ': r) a -> Sem r a - --- | Cassandra is the source of truth during migration; writes are mirrored to Postgres. interpretCodeStoreToCassandraAndPostgres = interpret $ \case GetCode k -> do Cassandra.interpretCodeStoreToCassandra $ CodeStore.getCode k diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs b/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs index d260e9cb02f..8f14bc5423e 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs @@ -15,13 +15,10 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.CodeStore.Migration - ( MigrationOptions (..), - migrateCodesLoop, - ) -where +module Wire.CodeStore.Migration (migrateCodesLoop) where import Cassandra hiding (Value) +import Data.ByteString.Conversion import Data.Code (Key, Value) import Data.Conduit import Data.Conduit.List qualified as C @@ -30,7 +27,6 @@ import Data.Misc (HttpsUrl) import Hasql.Pool qualified as Hasql import Imports import Polysemy -import Polysemy.Error import Polysemy.Input import Polysemy.State import Polysemy.TinyLog @@ -101,25 +97,7 @@ migrateAllCodes migOpts migCounter = do lift $ info $ Log.msg (Log.val "migrateAllCodes") withCount (paginateSem Cql.selectAllCodes (paramsP LocalQuorum () migOpts.pageSize) x5) .| logRetrievedPage migOpts.pageSize id - .| C.mapM_ (traverse_ (handleErrors (migrateCodeRow migCounter))) - -handleErrors :: - ( Member (State Int) r, - Member TinyLog r - ) => - ((Key, Value, Int32, ConvId, Maybe Password) -> Sem (Error Hasql.UsageError : r) ()) -> - (Key, Value, Int32, ConvId, Maybe Password) -> - Sem r () -handleErrors action row@(k, _, _, _, _) = do - eithErr <- runError (action row) - case eithErr of - Right _ -> pure () - Left e -> do - warn $ - Log.msg (Log.val "error occurred during migration") - . Log.field "key" (show k) - . Log.field "error" (show e) - modify (+ 1) + .| C.mapM_ (traverse_ (\row@(key, _, _, _, _) -> handleErrors (toByteString' key) (migrateCodeRow migCounter row))) migrateCodeRow :: ( Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r, diff --git a/libs/wire-subsystems/src/Wire/ConversationStore.hs b/libs/wire-subsystems/src/Wire/ConversationStore.hs index 9b7b4b97e24..5992eaf23d2 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore.hs @@ -20,14 +20,12 @@ module Wire.ConversationStore where import Control.Error (lastMay) -import Data.Aeson import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.Id import Data.Misc import Data.Qualified import Data.Range -import Data.Text qualified as Text import Data.Time.Clock import Imports import Polysemy @@ -204,37 +202,6 @@ getConversationIds lusr maxIds pagingState = do } } -data StorageLocation - = -- | Use when solely using Cassandra - CassandraStorage - | -- | Use while migration to postgresql. Using this option does not trigger - -- the migration. Newly created conversations are stored in Postgresql. - -- Once this has been turned on, it MUST NOT be made CassandraStorage ever - -- again. - MigrationToPostgresql - | -- | Use after migrating to postgresql - PostgresqlStorage - deriving (Show) - -instance FromJSON StorageLocation where - parseJSON = withText "StorageLocation" $ \case - "cassandra" -> pure CassandraStorage - "migration-to-postgresql" -> pure MigrationToPostgresql - "postgresql" -> pure PostgresqlStorage - x -> fail $ "Invalid storage location: " <> Text.unpack x <> ". Valid options: cassandra, postgresql, migration-to-postgresql" - -data PostgresMigrationOpts = PostgresMigrationOpts - { conversation :: StorageLocation, - conversationCodes :: StorageLocation - } - deriving (Show) - -instance FromJSON PostgresMigrationOpts where - parseJSON = withObject "PostgresMigrationOpts" $ \o -> - PostgresMigrationOpts - <$> o .: "conversation" - <*> o .: "conversationCodes" - getConvOrSubGroupInfo :: (Member ConversationStore r) => ConvOrSubConvId -> diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs index 44cd9195115..4b32b9d0ccc 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs @@ -67,7 +67,7 @@ import Wire.ConversationStore.Cassandra (interpretConversationStoreToCassandra) import Wire.ConversationStore.MLS.Types import Wire.ConversationStore.Migration.Cleanup import Wire.ConversationStore.Migration.Types -import Wire.Migration +import Wire.Migration hiding (handleErrors) import Wire.MigrationLock import Wire.Postgres import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (..), unsafePooledMapConcurrentlyN_) diff --git a/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs index 852e4d57c87..92d56436d31 100644 --- a/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs @@ -22,11 +22,10 @@ import Wire.FeaturesConfigSubsystem import Wire.FeaturesConfigSubsystem.Types import Wire.FeaturesConfigSubsystem.Utils import Wire.TeamFeatureStore +import Wire.TeamFeatureStore.Error (TeamFeatureStoreError (..)) import Wire.TeamSubsystem (TeamSubsystem) import Wire.TeamSubsystem qualified as TeamSubsystem -data TeamFeatureStoreError = TeamFeatureStoreErrorInternalError LText - runFeaturesConfigSubsystem :: forall r a. ( Member TeamFeatureStore r, diff --git a/libs/wire-subsystems/src/Wire/Migration.hs b/libs/wire-subsystems/src/Wire/Migration.hs index 3a1d6503d3c..d2ae1573e33 100644 --- a/libs/wire-subsystems/src/Wire/Migration.hs +++ b/libs/wire-subsystems/src/Wire/Migration.hs @@ -24,9 +24,12 @@ import Data.Conduit import Data.Conduit.Internal (zipSources) import Data.Conduit.List qualified as C import GHC.Generics (Generically (..)) +import Hasql.Pool qualified as Hasql import Imports import Polysemy +import Polysemy.Error import Polysemy.Input +import Polysemy.State import Polysemy.TinyLog import Prometheus qualified import System.Logger qualified as Log @@ -127,3 +130,22 @@ paginateSem q p r = do getNextPage page = do client <- input embedClient client $ retry r (nextPage page) + +handleErrors :: + forall r. + ( Member (State Int) r, + Member TinyLog r + ) => + ByteString -> + (Sem (Error Hasql.UsageError : r) ()) -> + Sem r () +handleErrors key action = do + eithErr <- runError action + case eithErr of + Right _ -> pure () + Left e -> do + warn $ + Log.msg (Log.val "error occurred during migration") + . Log.field "key" (show key) + . Log.field "error" (show e) + modify (+ 1) diff --git a/libs/wire-subsystems/src/Wire/MigrationLock.hs b/libs/wire-subsystems/src/Wire/MigrationLock.hs index d76448fb285..140d7342bba 100644 --- a/libs/wire-subsystems/src/Wire/MigrationLock.hs +++ b/libs/wire-subsystems/src/Wire/MigrationLock.hs @@ -20,6 +20,7 @@ module Wire.MigrationLock where import Data.Bits +import Data.Hashable (hash) import Data.Id import Data.UUID qualified as UUID import Data.Vector (Vector) @@ -28,6 +29,9 @@ import Hasql.Session qualified as Session import Hasql.Statement qualified as Hasql import Hasql.TH import Imports +import Network.HTTP.Types.Status (status500) +import Network.Wai.Utilities.Error qualified as WaiError +import Network.Wai.Utilities.JSONResponse import Polysemy import Polysemy.Async import Polysemy.Conc.Effect.Race @@ -37,6 +41,7 @@ import Polysemy.Time.Data.TimeUnit import Polysemy.TinyLog (TinyLog) import Polysemy.TinyLog qualified as TinyLog import System.Logger.Message qualified as Log +import Wire.API.Error import Wire.API.PostgresMarshall import Wire.Postgres @@ -56,6 +61,9 @@ data LockType data MigrationLockError = TimedOutAcquiringLock deriving (Show) +instance APIError MigrationLockError where + toResponse _ = waiErrorToJSONResponse $ WaiError.mkError status500 "internal-server-error" "Internal Server Error" + withMigrationLocks :: forall x a u r. ( PGConstraints r, @@ -137,6 +145,15 @@ withMigrationLocks lockType maxWait lockables action = do -------------------------------------------------------------------------------- -- INSTANCES +-- Combines team id and feature name into one lock key to keep per-feature locks distinct within a team +-- without introducing a separate lock table; rotate+xor mixes the two hashes to reduce collisions. +instance MigrationLockable (TeamId, Text) where + lockKey (team, featureName) = + let teamHash = hashUUID team + featureHash = fromIntegral (hash featureName) + in teamHash `xor` rotateL featureHash 1 + lockScope = "team_feature" + instance MigrationLockable ConvId where lockKey = hashUUID lockScope = "conv" diff --git a/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs b/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs new file mode 100644 index 00000000000..86fa90c878b --- /dev/null +++ b/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs @@ -0,0 +1,57 @@ +{-# OPTIONS_GHC -fforce-recomp #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2025 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.PostgresMigrationOpts where + +import Data.Aeson +import Data.Text qualified as Text +import Imports + +data StorageLocation + = -- | Use when solely using Cassandra + CassandraStorage + | -- | Use while migration to postgresql. Using this option does not trigger + -- the migration. Newly created data is stored in Postgresql. + -- Once this has been turned on, it MUST NOT be made CassandraStorage ever + -- again. + MigrationToPostgresql + | -- | Use after migrating to postgresql + PostgresqlStorage + deriving (Show) + +instance FromJSON StorageLocation where + parseJSON = withText "StorageLocation" $ \case + "cassandra" -> pure CassandraStorage + "migration-to-postgresql" -> pure MigrationToPostgresql + "postgresql" -> pure PostgresqlStorage + x -> fail $ "Invalid storage location: " <> Text.unpack x <> ". Valid options: cassandra, postgresql, migration-to-postgresql" + +data PostgresMigrationOpts = PostgresMigrationOpts + { conversation :: StorageLocation, + conversationCodes :: StorageLocation, + teamFeatures :: StorageLocation + } + deriving (Show) + +instance FromJSON PostgresMigrationOpts where + parseJSON = withObject "PostgresMigrationOpts" $ \o -> + PostgresMigrationOpts + <$> o .: "conversation" + <*> o .: "conversationCodes" + <*> o .: "teamFeatures" diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs index 29ed1db7f26..e21a4555c34 100644 --- a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs @@ -22,17 +22,16 @@ module Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra) wh import Cassandra import Data.Constraint import Data.Id -import Data.Map qualified as M +import Data.Map qualified as Map import Data.Proxy import Data.SOP (K (..), hcpure) -import Data.SOP.Constraint qualified as SOP import Imports import Polysemy import Polysemy.Input import Wire.API.Team.Feature import Wire.API.Team.Feature.TH -import Wire.ConversationStore.Cassandra.Instances () import Wire.TeamFeatureStore (AllDbFeaturePatches, DbFeaturePatch, TeamFeatureStore (..)) +import Wire.TeamFeatureStore.Cassandra.Queries import Wire.Util interpretTeamFeatureStoreToCassandra :: @@ -63,12 +62,8 @@ getDbFeatureImpl :: Sem r (Maybe DbFeaturePatch) getDbFeatureImpl sing tid = case featureSingIsFeature sing of Dict -> do - let q :: PrepQuery R (TeamId, Text) (Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) - q = "select status, lock_status, config from team_features_dyn where team = ? and feature = ?" - (embedClientInput (retry x1 $ query1 q (params LocalQuorum (tid, featureName @cfg)))) >>= \case - Nothing -> pure Nothing - Just (status, lockStatus, config) -> - pure $ Just LockableFeaturePatch {..} + mRow <- (embedClientInput (retry x1 $ query1 select (params LocalQuorum (tid, featureName @cfg)))) + pure $ (\(status, lockStatus, config) -> LockableFeaturePatch {..}) <$> mRow setDbFeatureImpl :: forall cfg r. @@ -107,15 +102,6 @@ patchDbFeatureImpl sing tid patch = case featureSingIsFeature sing of for_ patch.status $ \featureStatus -> addPrepQuery writeStatus (featureStatus, tid, featureName @cfg) for_ patch.lockStatus $ \lockStatus -> addPrepQuery writeLockStatus (lockStatus, tid, featureName @cfg) for_ patch.config $ \config -> addPrepQuery writeConfig (serialiseDbConfig config, tid, featureName @cfg) - where - writeStatus :: PrepQuery W (FeatureStatus, TeamId, Text) () - writeStatus = "update team_features_dyn set status = ? where team = ? and feature = ?" - - writeLockStatus :: PrepQuery W (LockStatus, TeamId, Text) () - writeLockStatus = "update team_features_dyn set lock_status = ? where team = ? and feature = ?" - - writeConfig :: PrepQuery W (DbConfig, TeamId, Text) () - writeConfig = "update team_features_dyn set config = ? where team = ? and feature = ?" setFeatureLockStatusImpl :: forall cfg r. @@ -128,11 +114,9 @@ setFeatureLockStatusImpl :: Sem r () setFeatureLockStatusImpl sing tid (Tagged lockStatus) = case featureSingIsFeature sing of Dict -> do - let q :: PrepQuery W (LockStatus, TeamId, Text) () - q = "update team_features_dyn set lock_status = ? where team = ? and feature = ?" embedClientInput $ retry x5 $ - write q (params LocalQuorum (lockStatus, tid, featureName @cfg)) + write writeLockStatus (params LocalQuorum (lockStatus, tid, featureName @cfg)) getAllDbFeaturesImpl :: ( Member (Embed IO) r, @@ -141,19 +125,14 @@ getAllDbFeaturesImpl :: TeamId -> Sem r AllDbFeaturePatches getAllDbFeaturesImpl tid = do - let q :: PrepQuery R (Identity TeamId) (Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) - q = "select feature, status, lock_status, config from team_features_dyn where team = ?" - rows <- embedClientInput $ retry x1 $ query q (params LocalQuorum (Identity tid)) - let m = M.fromList $ do + rows <- embedClientInput $ retry x1 $ query selectAllByTeam (params LocalQuorum (Identity tid)) + let m = Map.fromList $ do (name, status, lockStatus, config) <- rows pure (name, LockableFeaturePatch {..}) pure $ mkAllDbFeaturePatches m - -mkAllDbFeaturePatches :: - (SOP.All IsFeatureConfig Features) => - M.Map Text DbFeaturePatch -> - AllDbFeaturePatches -mkAllDbFeaturePatches m = hcpure (Proxy @IsFeatureConfig) get where - get :: forall cfg. (IsFeatureConfig cfg) => K (Maybe DbFeaturePatch) cfg - get = K (M.lookup (featureName @cfg) m) + mkAllDbFeaturePatches :: Map Text DbFeaturePatch -> AllDbFeaturePatches + mkAllDbFeaturePatches m = hcpure (Proxy @IsFeatureConfig) $ get m + + get :: forall cfg. (IsFeatureConfig cfg) => Map Text DbFeaturePatch -> K (Maybe DbFeaturePatch) cfg + get m = K (Map.lookup (featureName @cfg) m) diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra/Queries.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra/Queries.hs new file mode 100644 index 00000000000..f2eb9537973 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra/Queries.hs @@ -0,0 +1,41 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Cassandra.Queries where + +import Cassandra +import Data.Id +import Imports +import Wire.API.Team.Feature + +select :: PrepQuery R (TeamId, Text) (Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) +select = "select status, lock_status, config from team_features_dyn where team = ? and feature = ?" + +writeStatus :: PrepQuery W (FeatureStatus, TeamId, Text) () +writeStatus = "update team_features_dyn set status = ? where team = ? and feature = ?" + +writeLockStatus :: PrepQuery W (LockStatus, TeamId, Text) () +writeLockStatus = "update team_features_dyn set lock_status = ? where team = ? and feature = ?" + +writeConfig :: PrepQuery W (DbConfig, TeamId, Text) () +writeConfig = "update team_features_dyn set config = ? where team = ? and feature = ?" + +selectAllByTeam :: PrepQuery R (Identity TeamId) (Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) +selectAllByTeam = "select feature, status, lock_status, config from team_features_dyn where team = ?" + +selectAll :: PrepQuery R () (TeamId, Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) +selectAll = "select team, feature, status, lock_status, config from team_features_dyn" diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Error.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Error.hs new file mode 100644 index 00000000000..234e2700117 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Error.hs @@ -0,0 +1,35 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . +module Wire.TeamFeatureStore.Error where + +import Data.Aeson.Types qualified as A +import Data.Text.Lazy qualified as LT +import Imports +import Polysemy +import Polysemy.Error + +data TeamFeatureStoreError = TeamFeatureStoreErrorInternalError LText + +runFeatureParser :: + forall r a. + (Member (Error TeamFeatureStoreError) r) => + A.Parser a -> + Sem r a +runFeatureParser p = + mapError (TeamFeatureStoreErrorInternalError . LT.pack) + . fromEither + $ A.parseEither (const p) () diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs new file mode 100644 index 00000000000..e85b883c9ed --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs @@ -0,0 +1,191 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Migrating where + +import Cassandra +import Data.Constraint +import Data.Id +import Data.SOP (K (..), hzipWith) +import Imports +import Polysemy +import Polysemy.Async +import Polysemy.Conc.Effect.Race +import Polysemy.Error +import Polysemy.Input +import Polysemy.Time +import Polysemy.TinyLog +import Wire.API.Team.Feature +import Wire.API.Team.Feature.TH +import Wire.MigrationLock +import Wire.Postgres +import Wire.TeamFeatureStore +import Wire.TeamFeatureStore.Cassandra +import Wire.TeamFeatureStore.Cassandra.Queries as Cql +import Wire.TeamFeatureStore.Postgres +import Wire.TeamFeatureStore.Postgres.Queries as Psql +import Wire.Util + +interpretTeamFeatureStoreToCassandraAndPostgres :: + ( PGConstraints r, + Member (Input ClientState) r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Error MigrationLockError) r + ) => + Sem (TeamFeatureStore ': r) a -> + Sem r a +interpretTeamFeatureStoreToCassandraAndPostgres = interpret $ \case + GetDbFeature sing tid -> getDbFeatureImpl sing tid + GetAllDbFeatures tid -> getAllDbFeaturesImpl tid + SetDbFeature sing tid feat -> setDbFeatureImpl sing tid feat + SetFeatureLockStatus sing tid lock -> setFeatureLockStatusImpl sing tid lock + PatchDbFeature sing tid feat -> patchDbFeatureImpl sing tid feat + +-- Read path under lock: +-- - Prefer Postgres; fallback to Cassandra; if neither exists → Nothing. +getDbFeatureImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + Sem r (Maybe DbFeaturePatch) +getDbFeatureImpl sing tid = case featureSingIsFeature sing of + Dict -> + withSharedLock (tid, featureName @cfg) $ do + mFeature <- interpretTeamFeatureStoreToPostgres $ send (GetDbFeature sing tid) + maybe + (interpretTeamFeatureStoreToCassandra $ send (GetDbFeature sing tid)) + (pure . Just) + mFeature + +-- Read all feature, no lock: +-- - Read all features from Postgres. +-- - Read all features from Cassandra. +-- - Merge per‑feature with precedence: Postgres wins, fallback to Cassandra, otherwise Nothing. +getAllDbFeaturesImpl :: + forall r. + ( PGConstraints r, + Member (Input ClientState) r + ) => + TeamId -> + Sem r AllDbFeaturePatches +getAllDbFeaturesImpl tid = do + mergeDbFeaturePatches + <$> interpretTeamFeatureStoreToPostgres (send (GetAllDbFeatures tid)) + <*> interpretTeamFeatureStoreToCassandra (send (GetAllDbFeatures tid)) + where + mergeDbFeaturePatches = hzipWith $ \(K psqlPatch) (K cassPatch) -> K (psqlPatch <|> cassPatch) + +setDbFeatureImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + LockableFeature cfg -> + Sem r () +setDbFeatureImpl sing tid feat = case featureSingIsFeature sing of + Dict -> withWritePathUnderLock sing tid $ send (SetDbFeature sing tid feat) + +setFeatureLockStatusImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + LockStatus -> + Sem r () +setFeatureLockStatusImpl sing tid lock = case featureSingIsFeature sing of + Dict -> withWritePathUnderLock sing tid $ send (SetFeatureLockStatus sing tid lock) + +patchDbFeatureImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + LockableFeaturePatch cfg -> + Sem r () +patchDbFeatureImpl sing tid feat = case featureSingIsFeature sing of + Dict -> withWritePathUnderLock sing tid $ send (PatchDbFeature sing tid feat) + +-- Write path under lock: +-- 1. Check Postgres for row. +-- 2. If exists -> write Postgres. +-- 3. Else check Cassandra. +-- 4. If exists -> write Cassandra. +-- 5. Else -> write Postgres (new canonical row). +withWritePathUnderLock :: + forall cfg r a. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r, + IsFeatureConfig cfg + ) => + FeatureSingleton cfg -> + TeamId -> + Sem (TeamFeatureStore ': r) a -> + Sem r a +withWritePathUnderLock _ tid action = + withSharedLock (tid, featureName @cfg) $ do + isMigrated <- runStatement (tid, featureName @cfg) Psql.exists + if isMigrated + then interpretTeamFeatureStoreToPostgres action + else do + existsInCassandra <- isJust <$> runSelectCql + if existsInCassandra + then interpretTeamFeatureStoreToCassandra action + else interpretTeamFeatureStoreToPostgres action + where + runSelectCql = embedClientInput (retry x1 $ query1 Cql.select (params LocalQuorum (tid, featureName @cfg))) + +withSharedLock :: + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Error MigrationLockError) r, + MigrationLockable x + ) => + x -> Sem r a -> Sem r a +withSharedLock lockable = withMigrationLocks LockShared (MilliSeconds 500) [lockable] diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs new file mode 100644 index 00000000000..d072cc590ca --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs @@ -0,0 +1,149 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Migration where + +import Cassandra hiding (Value) +import Data.ByteString.Conversion +import Data.Conduit +import Data.Conduit.List qualified as C +import Data.Id +import Hasql.Pool qualified as Hasql +import Imports +import Polysemy +import Polysemy.Async +import Polysemy.Conc +import Polysemy.Error +import Polysemy.Input +import Polysemy.State +import Polysemy.Time +import Polysemy.TinyLog +import Prometheus qualified +import System.Logger qualified as Log +import Wire.API.Team.Feature +import Wire.Migration hiding (handleErrors) +import Wire.MigrationLock +import Wire.Postgres +import Wire.Sem.Logger (mapLogger) +import Wire.Sem.Logger.TinyLog (loggerToTinyLog) +import Wire.TeamFeatureStore.Cassandra.Queries qualified as Cql +import Wire.TeamFeatureStore.Postgres.Queries qualified as Psql + +migrateAllTeamFeatures :: + ( Member (Input Hasql.Pool) r, + Member (Embed IO) r, + Member (Input ClientState) r, + Member TinyLog r, + Member (State Int) r, + Member Async r, + Member Race r + ) => + MigrationOptions -> + Prometheus.Counter -> + ConduitM () Void (Sem r) () +migrateAllTeamFeatures migOpts migCounter = do + lift $ info $ Log.msg (Log.val "migrateAllTeamFeatures ") + withCount (paginateSem Cql.selectAll (paramsP LocalQuorum () migOpts.pageSize) x5) + .| logRetrievedPage migOpts.pageSize id + .| C.mapM_ (traverse_ (\row@(tid, feat, _, _, _) -> handleErrors (toByteString' (idToText tid <> " - " <> feat)) (migrateTeamFeature migCounter row))) + +type EffectStack = + [ State Int, + Input ClientState, + Input Hasql.Pool, + Async, + Race, + TinyLog, + Embed IO, + Final IO + ] + +migrateTeamFeaturesLoop :: + MigrationOptions -> + ClientState -> + Hasql.Pool -> + Log.Logger -> + Prometheus.Counter -> + Prometheus.Counter -> + Prometheus.Counter -> + IO () +migrateTeamFeaturesLoop migOpts cassClient pgPool logger migCounter migFinished migFailed = + migrationLoop + logger + "team features" + migFinished + migFailed + (interpreter cassClient pgPool logger "team features") + (migrateAllTeamFeatures migOpts migCounter) + +interpreter :: ClientState -> Hasql.Pool -> Log.Logger -> ByteString -> Sem EffectStack a -> IO (Int, a) +interpreter cassClient pgPool logger name = + runFinal + . embedToFinal + . loggerToTinyLog logger + . mapLogger (Log.field "migration" name .) + . raiseUnder + . interpretRace + . asyncToIOFinal + . runInputConst pgPool + . runInputConst cassClient + . runState 0 + +migrateTeamFeature :: + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member (Error MigrationLockError) r, + Member Race r + ) => + Prometheus.Counter -> + (TeamId, Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) -> + Sem r () +migrateTeamFeature migCounter (tid, name, status, lockStatus, dbConfig) = do + -- We do not delete Cassandra rows during migration. Writes and migration use + -- the same per-row lock, so we avoid races without deleting early. Deletion is + -- deferred to keep rollback options and to remove the Cassandra table only after + -- a full cutover to Postgres-only. + void . withMigrationLocks LockExclusive (Seconds 10) [(tid, name)] $ do + isMigrated <- runStatement (tid, name) Psql.exists + unless isMigrated $ do + runStatement (tid, name, status, lockStatus, dbConfig) Psql.upsertPatch + liftIO $ Prometheus.incCounter migCounter + +handleErrors :: + ( Member (State Int) r, + Member TinyLog r + ) => + ByteString -> + (Sem (Error MigrationLockError : Error Hasql.UsageError : r) ()) -> + Sem r () +handleErrors key action = do + eithErr <- runError (runError action) + case eithErr of + Right (Right _) -> pure () + Right (Left e) -> do + warn $ + Log.msg (Log.val "error occurred during migration") + . Log.field "key" (show key) + . Log.field "error" (show e) + modify (+ 1) + Left e -> do + warn $ + Log.msg (Log.val "error occurred during migration") + . Log.field "key" (show key) + . Log.field "error" (show e) + modify (+ 1) diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres.hs new file mode 100644 index 00000000000..81b059cd333 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres.hs @@ -0,0 +1,125 @@ +{-# LANGUAGE RecordWildCards #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Postgres (interpretTeamFeatureStoreToPostgres) where + +import Data.Constraint +import Data.Id +import Data.Map qualified as Map +import Data.Proxy +import Data.SOP (K (..), hcpure) +import Data.Vector qualified as Vector +import Imports +import Polysemy +import Wire.API.Team.Feature +import Wire.API.Team.Feature.TH +import Wire.Postgres +import Wire.TeamFeatureStore +import Wire.TeamFeatureStore.Postgres.Queries + +interpretTeamFeatureStoreToPostgres :: + (PGConstraints r) => + Sem (TeamFeatureStore ': r) a -> + Sem r a +interpretTeamFeatureStoreToPostgres = interpret $ \case + GetDbFeature sing tid -> do + getDbFeatureImpl sing tid + SetDbFeature sing tid feat -> do + setDbFeatureImpl sing tid feat + SetFeatureLockStatus sing tid lock -> do + setFeatureLockStatusImpl sing tid lock + GetAllDbFeatures tid -> do + getAllDbFeaturesImpl tid + PatchDbFeature sing tid feat -> do + patchDbFeatureImpl sing tid feat + +getDbFeatureImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + Sem r (Maybe DbFeaturePatch) +getDbFeatureImpl sing tid = case featureSingIsFeature sing of + Dict -> do + mRow <- runStatement (tid, featureName @cfg) select + pure $ (\(status, lockStatus, config) -> LockableFeaturePatch {..}) <$> mRow + +setDbFeatureImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + LockableFeature cfg -> + Sem r () +setDbFeatureImpl sing tid feat = + patchDbFeatureImpl + sing + tid + ( LockableFeaturePatch + { status = Just feat.status, + lockStatus = Just feat.lockStatus, + config = Just feat.config + } + ) + +patchDbFeatureImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + LockableFeaturePatch cfg -> + Sem r () +patchDbFeatureImpl sing tid patch = case featureSingIsFeature sing of + Dict -> do + runStatement + ( tid, + featureName @cfg, + patch.status, + patch.lockStatus, + serialiseDbConfig <$> patch.config + ) + upsertPatch + +setFeatureLockStatusImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + LockStatus -> + Sem r () +setFeatureLockStatusImpl sing tid lockStatus = case featureSingIsFeature sing of + Dict -> do + runStatement (tid, featureName @cfg, lockStatus) writeLockStatus + +getAllDbFeaturesImpl :: + (PGConstraints r) => + TeamId -> + Sem r AllDbFeaturePatches +getAllDbFeaturesImpl tid = do + rows <- runStatement tid selectAll + let m = Map.fromList $ do + (name, status, lockStatus, config) <- Vector.toList rows + pure (name, LockableFeaturePatch {..}) + pure $ mkAllDbFeaturePatches m + where + mkAllDbFeaturePatches :: Map Text DbFeaturePatch -> AllDbFeaturePatches + mkAllDbFeaturePatches m = hcpure (Proxy @IsFeatureConfig) $ get m + + get :: forall cfg. (IsFeatureConfig cfg) => Map Text DbFeaturePatch -> K (Maybe DbFeaturePatch) cfg + get m = K (Map.lookup (featureName @cfg) m) diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres/Queries.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres/Queries.hs new file mode 100644 index 00000000000..0e778f2a20f --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres/Queries.hs @@ -0,0 +1,77 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Postgres.Queries where + +import Data.Id +import Data.Vector (Vector) +import Hasql.Statement qualified as Hasql +import Hasql.TH +import Imports +import Wire.API.PostgresMarshall +import Wire.API.Team.Feature + +select :: Hasql.Statement (TeamId, Text) (Maybe (Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig)) +select = + dimapPG + [maybeStatement|SELECT + status :: int?, + lock_status :: int?, + config :: jsonb? + FROM team_features + WHERE team = ($1 :: uuid) AND feature = ($2 :: text) + |] + +exists :: Hasql.Statement (TeamId, Text) Bool +exists = + dimapPG + [singletonStatement|SELECT EXISTS ( + SELECT 1 + FROM team_features + WHERE team = ($1 :: uuid) AND feature = ($2 :: text) + ) :: bool|] + +upsertPatch :: Hasql.Statement (TeamId, Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) () +upsertPatch = + lmapPG + [resultlessStatement|INSERT INTO team_features (team, feature, status, lock_status, config) + VALUES ($1 :: uuid, $2 :: text, $3 :: int?, $4 :: int?, $5 :: jsonb?) + ON CONFLICT (team, feature) DO UPDATE + SET status = COALESCE(EXCLUDED.status, team_features.status), + lock_status = COALESCE(EXCLUDED.lock_status, team_features.lock_status), + config = COALESCE(EXCLUDED.config, team_features.config) + |] + +writeLockStatus :: Hasql.Statement (TeamId, Text, LockStatus) () +writeLockStatus = + lmapPG + [resultlessStatement|INSERT INTO team_features (team, feature, lock_status) + VALUES ($1 :: uuid, $2 :: text, $3 :: int) + ON CONFLICT (team, feature) DO UPDATE + SET lock_status = EXCLUDED.lock_status + |] + +selectAll :: Hasql.Statement TeamId (Vector (Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig)) +selectAll = + dimapPG + [vectorStatement|SELECT (feature :: text), + (status :: int?), + (lock_status :: int?), + (config :: jsonb?) + FROM team_features + WHERE team = ($1 :: uuid) + |] diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index 000f00229ed..d77d2512a12 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -307,6 +307,7 @@ library Wire.PasswordStore Wire.PasswordStore.Cassandra Wire.Postgres + Wire.PostgresMigrationOpts Wire.PostgresMigrations Wire.PropertyStore Wire.PropertyStore.Cassandra @@ -334,6 +335,12 @@ library Wire.TeamCollaboratorsSubsystem.Interpreter Wire.TeamFeatureStore Wire.TeamFeatureStore.Cassandra + Wire.TeamFeatureStore.Cassandra.Queries + Wire.TeamFeatureStore.Error + Wire.TeamFeatureStore.Migrating + Wire.TeamFeatureStore.Migration + Wire.TeamFeatureStore.Postgres + Wire.TeamFeatureStore.Postgres.Queries Wire.TeamInvitationSubsystem Wire.TeamInvitationSubsystem.Error Wire.TeamInvitationSubsystem.Interpreter diff --git a/postgres-schema.sql b/postgres-schema.sql index a0f4b619b33..fe339796c87 100644 --- a/postgres-schema.sql +++ b/postgres-schema.sql @@ -242,6 +242,21 @@ CREATE TABLE public.subconversation ( ALTER TABLE public.subconversation OWNER TO "wire-server"; +-- +-- Name: team_features; Type: TABLE; Schema: public; Owner: wire-server +-- + +CREATE TABLE public.team_features ( + team uuid NOT NULL, + feature text NOT NULL, + config jsonb, + lock_status integer, + status integer +); + + +ALTER TABLE public.team_features OWNER TO "wire-server"; + -- -- Name: user_group; Type: TABLE; Schema: public; Owner: wire-server -- @@ -369,6 +384,14 @@ ALTER TABLE ONLY public.subconversation ADD CONSTRAINT subconversation_pkey PRIMARY KEY (conv_id, subconv_id); +-- +-- Name: team_features team_features_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server +-- + +ALTER TABLE ONLY public.team_features + ADD CONSTRAINT team_features_pkey PRIMARY KEY (team, feature); + + -- -- Name: user_group_channel user_group_channel_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server -- diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index db49fb502d5..11090aab961 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -69,6 +69,7 @@ migrateConversationsOptions: pageSize: 10000 parallelism: 2 migrateConversationCodes: false +migrateTeamFeatures: false # Background jobs consumer configuration for integration backgroundJobs: @@ -79,3 +80,4 @@ backgroundJobs: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index ab20dd7c8be..c5e16331ede 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -63,18 +63,27 @@ run opts = do withNamedLogger "migrate-conversation-codes" $ Migrations.conversationCodes (MigrationOptions 1000 1) else pure $ pure () + cleanupTeamFeaturesMigration <- + if opts.migrateTeamFeatures + then + runAppT env $ + withNamedLogger "migrate-team-features" $ + Migrations.teamFeatures (MigrationOptions 1000 1) + else pure $ pure () cleanupJobs <- runAppT env $ withNamedLogger "background-job-consumer" $ Jobs.startWorker amqpEP let cleanup = - void . runConcurrently $ - (,,,,) - <$> Concurrently cleanupDeadUserNotifWatcher - <*> Concurrently cleanupBackendNotifPusher - <*> Concurrently cleanupConvMigration - <*> Concurrently cleanUpConvCodesMigration - <*> Concurrently cleanupJobs + void $ + runConcurrently $ + (,,,,,) + <$> Concurrently cleanupDeadUserNotifWatcher + <*> Concurrently cleanupBackendNotifPusher + <*> Concurrently cleanupConvMigration + <*> Concurrently cleanUpConvCodesMigration + <*> Concurrently cleanupTeamFeaturesMigration + <*> Concurrently cleanupJobs let server = defaultServer (T.unpack opts.backgroundWorker.host) opts.backgroundWorker.port env.logger let settings = newSettings server diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 11787f105c6..bccd415be19 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -44,7 +44,7 @@ import System.Logger.Class (Logger, MonadLogger (..)) import System.Logger.Extended qualified as Log import Util.Options import Wire.BackgroundWorker.Options -import Wire.ConversationStore (PostgresMigrationOpts) +import Wire.PostgresMigrationOpts type IsWorking = Bool diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs index 1c9b3416bd5..b537e1b6c24 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs @@ -40,7 +40,6 @@ import Wire.BackgroundJobsRunner (runJob) import Wire.BackgroundJobsRunner.Interpreter hiding (runJob) import Wire.BackgroundWorker.Env (AppT, Env (..)) import Wire.BrigAPIAccess.Rpc -import Wire.ConversationStore import Wire.ConversationStore.Cassandra import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) import Wire.ConversationSubsystem.Interpreter (interpretConversationSubsystem) @@ -49,6 +48,7 @@ import Wire.FireAndForget (interpretFireAndForget) import Wire.GundeckAPIAccess import Wire.NotificationSubsystem.Interpreter import Wire.ParseException +import Wire.PostgresMigrationOpts import Wire.Rpc import Wire.Sem.Delay (runDelay) import Wire.Sem.Logger (mapLogger) diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index 6dc18f03a2b..217d992935f 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -27,8 +27,8 @@ import Imports import Network.AMQP.Extended import System.Logger.Extended import Util.Options -import Wire.ConversationStore (PostgresMigrationOpts) -import Wire.Migration (MigrationOptions) +import Wire.Migration +import Wire.PostgresMigrationOpts data Opts = Opts { logLevel :: !Level, @@ -53,6 +53,7 @@ data Opts = Opts migrateConversations :: !Bool, migrateConversationsOptions :: !MigrationOptions, migrateConversationCodes :: !Bool, + migrateTeamFeatures :: !Bool, backgroundJobs :: BackgroundJobsConfig, federationDomain :: Domain } diff --git a/services/background-worker/src/Wire/PostgresMigrations.hs b/services/background-worker/src/Wire/PostgresMigrations.hs index a4c1cbff2d1..541716d0aec 100644 --- a/services/background-worker/src/Wire/PostgresMigrations.hs +++ b/services/background-worker/src/Wire/PostgresMigrations.hs @@ -25,6 +25,8 @@ import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Util import Wire.CodeStore.Migration import Wire.ConversationStore.Migration +import Wire.Migration (MigrationOptions) +import Wire.TeamFeatureStore.Migration conversations :: MigrationOptions -> AppT IO CleanupAction conversations migOpts = do @@ -65,3 +67,20 @@ conversationCodes migOpts = do pure $ do Log.info logger $ Log.msg (Log.val "cancelling conversation codes migration") cancel migrationLoop + +teamFeatures :: MigrationOptions -> AppT IO CleanupAction +teamFeatures migOpts = do + cassClient <- asks (.cassandraGalley) + pgPool <- asks (.hasqlPool) + logger <- asks (.logger) + Log.info logger $ Log.msg (Log.val "starting team features migration") + count <- register $ counter $ Prometheus.Info "wire_team_features_migrated_to_pg" "Number of team features migrated to Postgresql" + finished <- register $ counter $ Prometheus.Info "wire_team_features_migration_finished" "Whether the team features migration to Postgresql is finished successfully" + failed <- register $ counter $ Prometheus.Info "wire_team_features_migration_failed" "Whether the team features migration to Postgresql has failed" + + migrationLoop <- async . lift $ migrateTeamFeaturesLoop migOpts cassClient pgPool logger count finished failed + + Log.info logger $ Log.msg (Log.val "started team features migration") + pure $ do + Log.info logger $ Log.msg (Log.val "cancelling team features migration") + cancel migrationLoop diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index e10ab43123d..01d658122e6 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -69,7 +69,7 @@ import Wire.BackendNotificationPusher import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util -import Wire.ConversationStore +import Wire.PostgresMigrationOpts spec :: Spec spec = do @@ -364,7 +364,8 @@ spec = do postgresMigration = PostgresMigrationOpts { conversation = CassandraStorage, - conversationCodes = CassandraStorage + conversationCodes = CassandraStorage, + teamFeatures = CassandraStorage } gundeckEndpoint = undefined brigEndpoint = undefined @@ -402,7 +403,8 @@ spec = do postgresMigration = PostgresMigrationOpts { conversation = CassandraStorage, - conversationCodes = CassandraStorage + conversationCodes = CassandraStorage, + teamFeatures = CassandraStorage } gundeckEndpoint = undefined brigEndpoint = undefined diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index cdb020a2223..ef2d18321c4 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -30,7 +30,7 @@ import Util.Options (Endpoint (..)) import Wire.BackgroundWorker.Env hiding (federatorInternal) import Wire.BackgroundWorker.Env qualified as E import Wire.BackgroundWorker.Options -import Wire.ConversationStore +import Wire.PostgresMigrationOpts testEnv :: IO Env testEnv = do @@ -42,7 +42,8 @@ testEnv = do postgresMigration = PostgresMigrationOpts { conversation = CassandraStorage, - conversationCodes = CassandraStorage + conversationCodes = CassandraStorage, + teamFeatures = CassandraStorage } statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index e2106c63e67..1f19ccbee04 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -248,3 +248,4 @@ journal: # if set, journals; if not set, disables journaling postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index 9f7798f85d4..701d0b72f82 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -123,6 +123,7 @@ import Wire.GundeckAPIAccess (runGundeckAPIAccess) import Wire.HashPassword.Interpreter import Wire.LegalHoldStore.Cassandra (interpretLegalHoldStoreToCassandra) import Wire.LegalHoldStore.Env (LegalHoldEnv (..)) +import Wire.MigrationLock import Wire.NotificationSubsystem.Interpreter (runNotificationSubsystemGundeck) import Wire.ParseException import Wire.ProposalStore.Cassandra @@ -139,6 +140,9 @@ import Wire.SparAPIAccess.Rpc import Wire.TeamCollaboratorsStore.Postgres (interpretTeamCollaboratorsStoreToPostgres) import Wire.TeamCollaboratorsSubsystem.Interpreter import Wire.TeamFeatureStore.Cassandra +import Wire.TeamFeatureStore.Error (TeamFeatureStoreError (..)) +import Wire.TeamFeatureStore.Migrating +import Wire.TeamFeatureStore.Postgres import Wire.TeamJournal.Aws import Wire.TeamStore.Cassandra (interpretTeamStoreToCassandra) import Wire.TeamSubsystem.Interpreter @@ -150,6 +154,7 @@ type GalleyEffects0 = Input Hasql.Pool, Input Env, Input ConversationSubsystemConfig, + Error MigrationLockError, Error TeamFeatureStoreError, Error MigrationError, Error InvalidInput, @@ -298,6 +303,11 @@ evalGalley e = CassandraStorage -> interpretCodeStoreToCassandra MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres PostgresqlStorage -> interpretCodeStoreToPostgres + teamFeatureStoreInterpreter = + case (e ^. options . postgresMigration).teamFeatures of + CassandraStorage -> interpretTeamFeatureStoreToCassandra + MigrationToPostgresql -> interpretTeamFeatureStoreToCassandraAndPostgres + PostgresqlStorage -> interpretTeamFeatureStoreToPostgres localUnit = toLocalUnsafe (e ^. options . settings . federationDomain) () teamSubsystemConfig = TeamSubsystemConfig @@ -348,6 +358,7 @@ evalGalley e = . mapError toResponse . logAndMapError toResponse (Text.pack . show) "migration error" . mapError mapTeamFeatureStoreError + . mapError toResponse . runInputConst conversationSubsystemConfig . runInputConst e . runInputConst (e ^. hasqlPool) @@ -369,7 +380,7 @@ evalGalley e = . interpretTeamListToCassandra . interpretTeamMemberStoreToCassandraWithPaging lh . interpretTeamMemberStoreToCassandra lh - . interpretTeamFeatureStoreToCassandra + . teamFeatureStoreInterpreter . interpretMLSCommitLockStoreToCassandra (e ^. cstate) . convStoreInterpreter . interpretTeamNotificationStoreToCassandra diff --git a/services/galley/src/Galley/Options.hs b/services/galley/src/Galley/Options.hs index a435707a7e6..0ccfa71b9a6 100644 --- a/services/galley/src/Galley/Options.hs +++ b/services/galley/src/Galley/Options.hs @@ -85,7 +85,7 @@ import Util.Options.Common import Wire.API.Conversation.Protocol import Wire.API.Routes.Version import Wire.API.Team.Member -import Wire.ConversationStore +import Wire.PostgresMigrationOpts import Wire.RateLimit.Interpreter (RateLimitConfig) newtype GuestLinkTTLSeconds = GuestLinkTTLSeconds