diff --git a/libs/wire-subsystems/default.nix b/libs/wire-subsystems/default.nix index cf381ee0da6..f34fcff1924 100644 --- a/libs/wire-subsystems/default.nix +++ b/libs/wire-subsystems/default.nix @@ -7,6 +7,7 @@ , aeson-pretty , amazonka , amazonka-core +, amazonka-dynamodb , amazonka-ses , amazonka-sqs , amqp @@ -68,6 +69,7 @@ , memory , mime , mime-mail +, MonadRandom , network , network-conduit-tls , network-uri @@ -135,6 +137,7 @@ mkDerivation { aeson-pretty amazonka amazonka-core + amazonka-dynamodb amazonka-ses amazonka-sqs amqp @@ -193,6 +196,7 @@ mkDerivation { memory mime mime-mail + MonadRandom network network-conduit-tls network-uri @@ -252,6 +256,7 @@ mkDerivation { aeson-pretty amazonka amazonka-core + amazonka-dynamodb amazonka-ses amazonka-sqs amqp @@ -308,6 +313,7 @@ mkDerivation { memory mime mime-mail + MonadRandom network network-conduit-tls network-uri diff --git a/libs/wire-subsystems/src/Wire/ClientStore.hs b/libs/wire-subsystems/src/Wire/ClientStore.hs new file mode 100644 index 00000000000..6a48f6a9434 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/ClientStore.hs @@ -0,0 +1,39 @@ +{-# LANGUAGE TemplateHaskell #-} + +module Wire.ClientStore where + +import Data.Id +import Data.Json.Util (UTCTimeMillis) +import Data.Time +import Imports +import Polysemy +import Wire.API.MLS.CipherSuite +import Wire.API.User.Client +import Wire.API.User.Client.Prekey +import Wire.API.UserMap + +data DuplicateMLSPublicKey = DuplicateMLSPublicKey + +data ClientStore m a where + -- Lifecycle + Upsert :: UserId -> ClientId -> UTCTimeMillis -> NewClient -> ClientStore m (Maybe DuplicateMLSPublicKey) + Delete :: UserId -> ClientId -> ClientStore m () + UpdateLabel :: UserId -> ClientId -> Maybe Text -> ClientStore m () + UpdateCapabilities :: UserId -> ClientId -> Maybe ClientCapabilityList -> ClientStore m () + UpdateLastActive :: UserId -> ClientId -> UTCTime -> ClientStore m () + -- Lookups + LookupClient :: UserId -> ClientId -> ClientStore m (Maybe Client) + LookupClients :: UserId -> ClientStore m [Client] + LookupClientIds :: UserId -> ClientStore m [ClientId] + LookupClientIdsBulk :: [UserId] -> ClientStore m UserClients + LookupClientsBulk :: [UserId] -> ClientStore m (UserMap (Set Client)) + LookupPubClientsBulk :: [UserId] -> ClientStore m (UserMap (Set PubClient)) + LookupPrekeyIds :: UserId -> ClientId -> ClientStore m [PrekeyId] + -- Proteus + UpdatePrekeys :: UserId -> ClientId -> [UncheckedPrekeyBundle] -> ClientStore m () + ClaimPrekey :: UserId -> ClientId -> ClientStore m (Maybe ClientPrekey) + -- MLS + AddMLSPublicKeys :: UserId -> ClientId -> [(SignatureSchemeTag, ByteString)] -> ClientStore m (Maybe DuplicateMLSPublicKey) + LookupMLSPublicKey :: UserId -> ClientId -> SignatureSchemeTag -> ClientStore m (Maybe LByteString) + +makeSem ''ClientStore diff --git a/libs/wire-subsystems/src/Wire/ClientStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/ClientStore/Cassandra.hs new file mode 100644 index 00000000000..1806dcd8692 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/ClientStore/Cassandra.hs @@ -0,0 +1,358 @@ +{-# OPTIONS_GHC -Wwarn #-} + +module Wire.ClientStore.Cassandra (ClientStoreCassandraEnv (..), interpretClientStoreCassandra) where + +import Cassandra as C hiding (Client) +import Cassandra qualified as C +import Cassandra.Settings as C hiding (Client) +import Control.Error (atMay) +import Control.Monad.Random (randomRIO) +import Data.ByteString.Lazy qualified as LBS +import Data.Id +import Data.Json.Util +import Data.Map qualified as Map +import Data.Set qualified as Set +import Data.Time +import Imports +import Polysemy +import Polysemy.Embed +import Polysemy.Error +import Polysemy.Input +import Polysemy.TinyLog (TinyLog) +import UnliftIO (pooledMapConcurrentlyN) +import Wire.API.MLS.CipherSuite +import Wire.API.User.Auth +import Wire.API.User.Client +import Wire.API.User.Client.Prekey +import Wire.API.UserMap +import Wire.ClientStore (ClientStore (..), DuplicateMLSPublicKey (..)) +import Wire.ClientStore.DynamoDB +import Wire.Sem.Metrics (Metrics) + +data ClientStoreCassandraEnv = ClientStoreCassandraEnv + { casClient :: ClientState, + prekeyLocking :: Either (MVar ()) OptimisticLockEnv + } + +interpretClientStoreCassandra :: + ( Member TinyLog r, + Member (Final IO) r, + Member Metrics r + ) => + ClientStoreCassandraEnv -> InterpreterFor ClientStore r +interpretClientStoreCassandra env = + interpret $ + runInputConst env . \case + -- Lifecycle + Upsert uid cid timestamp nc -> upsertImpl uid cid timestamp nc + Delete uid cid -> deleteImpl uid cid + UpdateLabel uid cid lbl -> runCasClient $ updateLabelImpl uid cid lbl + UpdateCapabilities uid cid caps -> runCasClient $ updateCapabilitiesImpl uid cid caps + UpdateLastActive uid cid timestamp -> runCasClient $ updateLastActiveImpl uid cid timestamp + -- Lookups + LookupClient uid cid -> runCasClient $ lookupClientImpl uid cid + LookupClients uid -> runCasClient $ lookupClientsImpl uid + LookupClientIds uid -> runCasClient $ lookupClientIdsImpl uid + LookupClientIdsBulk uids -> runCasClient $ lookupClientIdsBulkImpl uids + LookupClientsBulk uids -> runCasClient $ lookupClientsBulkImpl uids + LookupPubClientsBulk uids -> runCasClient $ lookupPubClientsBulkImpl uids + LookupPrekeyIds uid cid -> runCasClient $ lookupPrekeyIdsImpl uid cid + -- Proteus + UpdatePrekeys uid cid prekeys -> runCasClient $ updatePrekeysImpl uid cid prekeys + ClaimPrekey uid cid -> claimPrekeyImpl uid cid + -- MLS + AddMLSPublicKeys uid cid keys -> addMLSPublicKeysImpl uid cid keys + LookupMLSPublicKey uid cid scheme -> runCasClient $ lookupMLSPublicKeyImpl uid cid scheme + +runCasClient :: (Member (Input ClientStoreCassandraEnv) r, Member (Final IO) r) => C.Client a -> Sem r a +runCasClient action = do + c <- inputs (.casClient) + embedToFinal . runEmbedded (C.runClient c) . embed $ action + +upsertImpl :: (Member (Input ClientStoreCassandraEnv) r, Member (Final IO) r) => UserId -> ClientId -> UTCTimeMillis -> NewClient -> Sem r (Maybe DuplicateMLSPublicKey) +upsertImpl uid newId now c = do + let keys = unpackLastPrekey (newClientLastKey c) : newClientPrekeys c + runCasClient $ do + updatePrekeysImpl uid newId keys + let prm = (uid, newId, now, newClientType c, newClientLabel c, newClientClass c, newClientCookie c, newClientModel c, C.Set . Set.toList . fromClientCapabilityList <$> newClientCapabilities c) + retry x5 $ write insertClient (params LocalQuorum prm) + addMLSPublicKeysImpl uid newId (Map.assocs (newClientMLSPublicKeys c)) + +lookupClientImpl :: (MonadClient m) => UserId -> ClientId -> m (Maybe Client) +lookupClientImpl u c = do + keys <- retry x1 (query selectMLSPublicKeys (params LocalQuorum (u, c))) + fmap (toClient keys) + <$> retry x1 (query1 selectClient (params LocalQuorum (u, c))) + +lookupClientsBulkImpl :: (MonadClient m) => [UserId] -> m (UserMap (Imports.Set Client)) +lookupClientsBulkImpl uids = liftClient $ do + userClientTuples <- pooledMapConcurrentlyN 50 getClientSetWithUser uids + pure . UserMap $ Map.fromList userClientTuples + where + getClientSetWithUser :: (MonadClient m) => UserId -> m (UserId, Imports.Set Client) + getClientSetWithUser u = fmap ((u,) . Set.fromList) . lookupClientsImpl $ u + +lookupPubClientsBulkImpl :: (MonadClient m) => [UserId] -> m (UserMap (Imports.Set PubClient)) +lookupPubClientsBulkImpl uids = liftClient $ do + userClientTuples <- pooledMapConcurrentlyN 50 getClientSetWithUser uids + pure . UserMap $ Map.fromList userClientTuples + where + getClientSetWithUser :: (MonadClient m) => UserId -> m (UserId, Imports.Set PubClient) + getClientSetWithUser u = (u,) . Set.fromList . map toPubClient <$> executeQuery u + + executeQuery :: (MonadClient m) => UserId -> m [(ClientId, Maybe ClientClass)] + executeQuery u = retry x1 (query selectPubClients (params LocalQuorum (Identity u))) + +lookupClientsImpl :: (MonadClient m) => UserId -> m [Client] +lookupClientsImpl u = do + keys <- + (\(cid, ss, Blob b) -> (cid, [(ss, LBS.toStrict b)])) + <$$> retry x1 (query selectMLSPublicKeysByUser (params LocalQuorum (Identity u))) + let keyMap = Map.fromListWith (<>) keys + updateKeys c = + c + { clientMLSPublicKeys = + Map.fromList $ Map.findWithDefault [] c.clientId keyMap + } + updateKeys . toClient [] + <$$> retry x1 (query selectClients (params LocalQuorum (Identity u))) + +lookupClientIdsImpl :: (MonadClient m) => UserId -> m [ClientId] +lookupClientIdsImpl u = + map runIdentity + <$> retry x1 (query selectClientIds (params LocalQuorum (Identity u))) + +lookupClientIdsBulkImpl :: (MonadClient m) => [UserId] -> m UserClients +lookupClientIdsBulkImpl us = + UserClients . Map.fromList <$> (liftClient $ pooledMapConcurrentlyN 16 getClientIds us) + where + getClientIds u = (u,) <$> fmap Set.fromList (lookupClientIdsImpl u) + +lookupPrekeyIdsImpl :: (MonadClient m) => UserId -> ClientId -> m [PrekeyId] +lookupPrekeyIdsImpl u c = + map runIdentity + <$> retry x1 (query selectPrekeyIds (params LocalQuorum (u, c))) + +deleteImpl :: + (Member (Input ClientStoreCassandraEnv) r, Member (Final IO) r) => + UserId -> + ClientId -> + Sem r () +deleteImpl u c = do + runCasClient $ do + retry x5 $ write removeClient (params LocalQuorum (u, c)) + retry x5 $ write removeClientKeys (params LocalQuorum (u, c)) + inputs (.prekeyLocking) >>= \case + Left _ -> pure () + Right optLockEnv -> + embedToFinal . runInputConst optLockEnv $ deleteOptLock u c + +-- todo "call deleteOptLock" + +updateLabelImpl :: (MonadClient m) => UserId -> ClientId -> Maybe Text -> m () +updateLabelImpl u c l = retry x5 $ write updateClientLabelQuery (params LocalQuorum (l, u, c)) + +updateCapabilitiesImpl :: (MonadClient m) => UserId -> ClientId -> Maybe ClientCapabilityList -> m () +updateCapabilitiesImpl u c fs = retry x5 $ write updateClientCapabilitiesQuery (params LocalQuorum (C.Set . Set.toList . fromClientCapabilityList <$> fs, u, c)) + +-- | If the update fails, which can happen if device does not exist, then ignore the error silently. +updateLastActiveImpl :: (MonadClient m) => UserId -> ClientId -> UTCTime -> m () +updateLastActiveImpl u c t = + void . retry x5 $ + trans + updateClientLastActiveQuery + (params LocalQuorum (t, u, c)) + +-- TODO: Add check to upstream callers of this function +updatePrekeysImpl :: (MonadClient m) => UserId -> ClientId -> [UncheckedPrekeyBundle] -> m () +updatePrekeysImpl u c pks = do + for_ pks $ \k -> do + let args = (u, c, prekeyId k, prekeyKey k) + retry x5 $ write insertClientKey (params LocalQuorum args) + +-- claimPrekeyImpl :: UserId -> ClientId -> m (Maybe ClientPrekey) +-- claimPrekeyImpl = todo "implement ClaimPrekey" + +claimPrekeyImpl :: + forall r. + ( Member (Final IO) r, + Member (Input ClientStoreCassandraEnv) r, + Member Metrics r, + Member TinyLog r + ) => + UserId -> + ClientId -> + Sem r (Maybe ClientPrekey) +claimPrekeyImpl u c = do + cas <- inputs (.casClient) + inputs (.prekeyLocking) >>= \case + -- Use random prekey selection strategy + Left localLock -> embedFinal $ withLocalLock localLock $ do + prekeys <- C.runClient cas $ retry x1 $ query userPrekeys (params LocalQuorum (u, c)) + prekey <- pickRandomPrekey prekeys + C.runClient cas $ traverse removeAndReturnPreKey prekey + -- Use DynamoDB based optimistic locking strategy + Right optLockEnv -> runInputConst optLockEnv . withOptLock u c . embedFinal . C.runClient cas $ do + prekey <- retry x1 $ query1 userPrekey (params LocalQuorum (u, c)) + traverse removeAndReturnPreKey prekey + where + removeAndReturnPreKey :: (PrekeyId, Text) -> C.Client ClientPrekey + removeAndReturnPreKey (i, k) = do + if i /= lastPrekeyId + then retry x1 $ write removePrekey (params LocalQuorum (u, c, i)) + else pure () + -- Log.debug $ + -- field "user" (toByteString u) + -- . field "client" (toByteString c) + -- . msg (val "last resort prekey used") + pure $ ClientPrekey c (UncheckedPrekeyBundle i k) + + pickRandomPrekey :: [(PrekeyId, Text)] -> IO (Maybe (PrekeyId, Text)) + pickRandomPrekey [] = pure Nothing + -- unless we only have one key left + pickRandomPrekey [pk] = pure $ Just pk + -- pick among list of keys, except lastPrekeyId + pickRandomPrekey pks = do + let pks' = filter (\k -> fst k /= lastPrekeyId) pks + ind <- randomRIO (0, length pks' - 1) + pure $ atMay pks' ind + +lookupMLSPublicKeyImpl :: + (MonadClient m) => + UserId -> + ClientId -> + SignatureSchemeTag -> + m (Maybe LByteString) +lookupMLSPublicKeyImpl u c ss = + (fromBlob . runIdentity) <$$> retry x1 (query1 selectMLSPublicKey (params LocalQuorum (u, c, ss))) + +addMLSPublicKeysImpl :: + (Member (Input ClientStoreCassandraEnv) r, Member (Final IO) r) => + UserId -> + ClientId -> + [(SignatureSchemeTag, ByteString)] -> + Sem r (Maybe DuplicateMLSPublicKey) +addMLSPublicKeysImpl u c keys = + runError (traverse_ (uncurry (addMLSPublicKey u c)) keys) >>= \case + Left e -> pure $ Just e + Right () -> pure Nothing + +-- TODO: Add checks to callers of this +addMLSPublicKey :: + (Member (Input ClientStoreCassandraEnv) r, Member (Final IO) r, Member (Error DuplicateMLSPublicKey) r) => + UserId -> + ClientId -> + SignatureSchemeTag -> + ByteString -> + Sem r () +addMLSPublicKey u c ss pk = do + rows <- + runCasClient $ + trans + insertMLSPublicKeys + ( params + LocalQuorum + (u, c, ss, Blob (LBS.fromStrict pk)) + ) + { serialConsistency = Just LocalSerialConsistency + } + case rows of + [row] + | C.fromRow 0 row /= Right (Just True) -> + throw DuplicateMLSPublicKey + _ -> pure () + +------------------------------------------------------------------------------- +-- Queries + +insertClient :: PrepQuery W (UserId, ClientId, UTCTimeMillis, ClientType, Maybe Text, Maybe ClientClass, Maybe CookieLabel, Maybe Text, Maybe (C.Set ClientCapability)) () +insertClient = "INSERT INTO clients (user, client, tstamp, type, label, class, cookie, model, capabilities) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + +updateClientLabelQuery :: PrepQuery W (Maybe Text, UserId, ClientId) () +updateClientLabelQuery = {- `IF EXISTS`, but that requires benchmarking -} "UPDATE clients SET label = ? WHERE user = ? AND client = ?" + +updateClientCapabilitiesQuery :: PrepQuery W (Maybe (C.Set ClientCapability), UserId, ClientId) () +updateClientCapabilitiesQuery = {- `IF EXISTS`, but that requires benchmarking -} "UPDATE clients SET capabilities = ? WHERE user = ? AND client = ?" + +updateClientLastActiveQuery :: PrepQuery W (UTCTime, UserId, ClientId) Row +updateClientLastActiveQuery = "UPDATE clients SET last_active = ? WHERE user = ? AND client = ? IF EXISTS" + +selectClientIds :: PrepQuery R (Identity UserId) (Identity ClientId) +selectClientIds = "SELECT client from clients where user = ?" + +selectClients :: PrepQuery R (Identity UserId) (ClientId, ClientType, UTCTimeMillis, Maybe Text, Maybe ClientClass, Maybe CookieLabel, Maybe Text, Maybe (C.Set ClientCapability), Maybe UTCTime) +selectClients = "SELECT client, type, tstamp, label, class, cookie, model, capabilities, last_active from clients where user = ?" + +selectPubClients :: PrepQuery R (Identity UserId) (ClientId, Maybe ClientClass) +selectPubClients = "SELECT client, class from clients where user = ?" + +selectClient :: PrepQuery R (UserId, ClientId) (ClientId, ClientType, UTCTimeMillis, Maybe Text, Maybe ClientClass, Maybe CookieLabel, Maybe Text, Maybe (C.Set ClientCapability), Maybe UTCTime) +selectClient = "SELECT client, type, tstamp, label, class, cookie, model, capabilities, last_active from clients where user = ? and client = ?" + +insertClientKey :: PrepQuery W (UserId, ClientId, PrekeyId, Text) () +insertClientKey = "INSERT INTO prekeys (user, client, key, data) VALUES (?, ?, ?, ?)" + +removeClient :: PrepQuery W (UserId, ClientId) () +removeClient = "DELETE FROM clients where user = ? and client = ?" + +removeClientKeys :: PrepQuery W (UserId, ClientId) () +removeClientKeys = "DELETE FROM prekeys where user = ? and client = ?" + +userPrekey :: PrepQuery R (UserId, ClientId) (PrekeyId, Text) +userPrekey = "SELECT key, data FROM prekeys where user = ? and client = ? LIMIT 1" + +userPrekeys :: PrepQuery R (UserId, ClientId) (PrekeyId, Text) +userPrekeys = "SELECT key, data FROM prekeys where user = ? and client = ?" + +selectPrekeyIds :: PrepQuery R (UserId, ClientId) (Identity PrekeyId) +selectPrekeyIds = "SELECT key FROM prekeys where user = ? and client = ?" + +removePrekey :: PrepQuery W (UserId, ClientId, PrekeyId) () +removePrekey = "DELETE FROM prekeys where user = ? and client = ? and key = ?" + +selectMLSPublicKey :: PrepQuery R (UserId, ClientId, SignatureSchemeTag) (Identity Blob) +selectMLSPublicKey = "SELECT key from mls_public_keys where user = ? and client = ? and sig_scheme = ?" + +selectMLSPublicKeys :: PrepQuery R (UserId, ClientId) (SignatureSchemeTag, Blob) +selectMLSPublicKeys = "SELECT sig_scheme, key from mls_public_keys where user = ? and client = ?" + +selectMLSPublicKeysByUser :: PrepQuery R (Identity UserId) (ClientId, SignatureSchemeTag, Blob) +selectMLSPublicKeysByUser = "SELECT client, sig_scheme, key from mls_public_keys where user = ?" + +insertMLSPublicKeys :: PrepQuery W (UserId, ClientId, SignatureSchemeTag, Blob) Row +insertMLSPublicKeys = + "INSERT INTO mls_public_keys (user, client, sig_scheme, key) \ + \VALUES (?, ?, ?, ?) IF NOT EXISTS" + +------------------------------------------------------------------------------- +-- Conversions + +toClient :: + [(SignatureSchemeTag, Blob)] -> + ( ClientId, + ClientType, + UTCTimeMillis, + Maybe Text, + Maybe ClientClass, + Maybe CookieLabel, + Maybe Text, + Maybe (C.Set ClientCapability), + Maybe UTCTime + ) -> + Client +toClient keys (cid, cty, tme, lbl, cls, cok, mdl, cps, lastActive) = + Client + { clientId = cid, + clientType = cty, + clientTime = tme, + clientClass = cls, + clientLabel = lbl, + clientCookie = cok, + clientModel = mdl, + clientCapabilities = ClientCapabilityList $ maybe Set.empty (Set.fromList . C.fromSet) cps, + clientMLSPublicKeys = fmap (LBS.toStrict . fromBlob) (Map.fromList keys), + clientLastActive = lastActive + } + +toPubClient :: (ClientId, Maybe ClientClass) -> PubClient +toPubClient = uncurry PubClient diff --git a/libs/wire-subsystems/src/Wire/ClientStore/DynamoDB.hs b/libs/wire-subsystems/src/Wire/ClientStore/DynamoDB.hs new file mode 100644 index 00000000000..7eaa56c75e3 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/ClientStore/DynamoDB.hs @@ -0,0 +1,203 @@ +-- | Best-effort optimistic locking for prekeys via DynamoDB +module Wire.ClientStore.DynamoDB where + +import Amazonka qualified as AWS +import Amazonka.Data.Text qualified as AWS +import Amazonka.DynamoDB qualified as AWS +import Amazonka.DynamoDB.Lens qualified as AWS +import Bilge.Retry (httpHandlers) +import Control.Error +import Control.Exception.Lens qualified as EL +import Control.Lens +import Control.Monad.Catch +import Control.Retry +import Data.ByteString.Conversion (toByteString') +import Data.HashMap.Strict qualified as HashMap +import Data.Id +import Data.Text qualified as Text +import Data.UUID qualified as UUID +import Imports +import Polysemy hiding (run) +import Polysemy.Input +import Polysemy.TinyLog (TinyLog) +import Polysemy.TinyLog qualified as Log +import Prometheus qualified as Prom +import System.Logger.Class (field, msg, val) +import UnliftIO.Resource (runResourceT) +import Wire.Sem.Metrics (Metrics) +import Wire.Sem.Metrics qualified as Metrics + +data OptimisticLockEnv = OptimisticLockEnv + { awsEnv :: AWS.Env, + prekeyTable :: Text + } + +ddbClient :: Text +ddbClient = "client" + +ddbVersion :: Text +ddbVersion = "version" + +ddbKey :: UserId -> ClientId -> AWS.AttributeValue +ddbKey u c = AWS.S (UUID.toText (toUUID u) <> "." <> clientToText c) + +key :: UserId -> ClientId -> HashMap Text AWS.AttributeValue +key u c = HashMap.singleton ddbClient (ddbKey u c) + +deleteOptLock :: + (Member (Embed IO) r, Member (Input OptimisticLockEnv) r) => + UserId -> + ClientId -> + Sem r () +deleteOptLock u c = do + t <- inputs (.prekeyTable) + e <- inputs (.awsEnv) + embed . runResourceT . void $ AWS.send e (AWS.newDeleteItem t & AWS.deleteItem_key .~ key u c) + +withOptLock :: + forall a r. + ( Member (Final IO) r, + Member (Input OptimisticLockEnv) r, + Member Metrics r, + Member TinyLog r + ) => + UserId -> + ClientId -> + Sem r a -> + Sem r a +withOptLock u c ma = go (10 :: Int) + where + go !n = do + v <- (version =<<) <$> execDyn pure get + a <- ma + r <- execDyn pure (put v) + case r of + Nothing | n > 0 -> reportAttemptFailure >> go (n - 1) + Nothing -> reportFailureAndLogError >> pure a + Just _ -> pure a + version :: AWS.GetItemResponse -> Maybe Word32 + version v = conv . HashMap.lookup ddbVersion =<< (view AWS.getItemResponse_item v) + where + conv :: Maybe AWS.AttributeValue -> Maybe Word32 + conv = \case + Just (AWS.N t) -> readMaybe $ Text.unpack t + _ -> Nothing + get :: Text -> AWS.GetItem + get t = + AWS.newGetItem t + & AWS.getItem_key .~ key u c + & AWS.getItem_consistentRead ?~ True + put :: Maybe Word32 -> Text -> AWS.PutItem + put v t = + AWS.newPutItem t + & AWS.putItem_item .~ item v + & AWS.putItem_expected ?~ check v + + check :: Maybe Word32 -> HashMap Text AWS.ExpectedAttributeValue + check Nothing = HashMap.singleton ddbVersion $ AWS.newExpectedAttributeValue & AWS.expectedAttributeValue_comparisonOperator ?~ AWS.ComparisonOperator_NULL + check (Just v) = + HashMap.singleton ddbVersion $ + AWS.newExpectedAttributeValue + & AWS.expectedAttributeValue_comparisonOperator ?~ AWS.ComparisonOperator_EQ + & AWS.expectedAttributeValue_attributeValueList ?~ [toAttributeValue v] + item :: Maybe Word32 -> HashMap Text AWS.AttributeValue + item v = + HashMap.insert ddbVersion (toAttributeValue (maybe (1 :: Word32) (+ 1) v)) $ + key u c + toAttributeValue :: Word32 -> AWS.AttributeValue + toAttributeValue w = AWS.N $ AWS.toText (fromIntegral w :: Int) + + reportAttemptFailure :: Sem r () + reportAttemptFailure = Metrics.incCounter optimisticLockGrabAttemptFailedCounter + + reportFailureAndLogError :: Sem r () + reportFailureAndLogError = do + Log.err $ + field "user" (toByteString' u) + . field "client" (toByteString' c) + . msg (val "PreKeys: Optimistic lock failed") + Metrics.incCounter optimisticLockFailedCounter + execDyn :: + forall s x. + (AWS.AWSRequest s) => + (AWS.AWSResponse s -> Maybe x) -> + (Text -> s) -> + Sem r (Maybe x) + execDyn cnv mkCmd = do + cmd <- mkCmd <$> inputs (.prekeyTable) + e <- inputs (.awsEnv) + embedFinal $ execDyn' e cnv cmd + where + execDyn' :: + forall y p. + (AWS.AWSRequest p) => + AWS.Env -> + (AWS.AWSResponse p -> Maybe y) -> + p -> + IO (Maybe y) + execDyn' e conv cmd = recovering policy handlers (const run) + where + run = runResourceT (AWS.sendEither e cmd) >>= either handleErr (pure . conv) + handlers = httpHandlers ++ [const $ EL.handler_ AWS._ConditionalCheckFailedException (pure True)] + policy = limitRetries 3 <> exponentialBackoff 100000 + handleErr (AWS.ServiceError se) | se ^. AWS.serviceError_code == AWS.ErrorCode "ProvisionedThroughputExceeded" = do + Prom.incCounter dynProvisionedThroughputExceededCounter + pure Nothing + handleErr _ = pure Nothing + +execCatch :: + ( AWS.AWSRequest a, + MonadUnliftIO m, + MonadCatch m + ) => + AWS.Env -> + a -> + m (Either AWS.Error (AWS.AWSResponse a)) +execCatch e cmd = + AWS.runResourceT $ + EL.trying AWS._Error $ + AWS.send e cmd + +exec :: + ( AWS.AWSRequest a, + MonadCatch m, + MonadIO m + ) => + AWS.Env -> + a -> + m (AWS.AWSResponse a) +exec e cmd = liftIO (execCatch e cmd) >>= either throwM pure + +withLocalLock :: MVar () -> IO a -> IO a +withLocalLock l ma = do + (takeMVar l *> ma) `finally` putMVar l () + +{-# NOINLINE optimisticLockGrabAttemptFailedCounter #-} +optimisticLockGrabAttemptFailedCounter :: Prom.Counter +optimisticLockGrabAttemptFailedCounter = + Prom.unsafeRegister $ + Prom.counter + Prom.Info + { Prom.metricName = "client_opt_lock_optimistic_lock_grab_attempt_failed", + Prom.metricHelp = "Number of times grab attempts for optimisitic lock on prekeys failed" + } + +{-# NOINLINE optimisticLockFailedCounter #-} +optimisticLockFailedCounter :: Prom.Counter +optimisticLockFailedCounter = + Prom.unsafeRegister $ + Prom.counter + Prom.Info + { Prom.metricName = "client_opt_lock_optimistic_lock_failed", + Prom.metricHelp = "Number of time optimisitic lock on prekeys failed" + } + +{-# NOINLINE dynProvisionedThroughputExceededCounter #-} +dynProvisionedThroughputExceededCounter :: Prom.Counter +dynProvisionedThroughputExceededCounter = + Prom.unsafeRegister $ + Prom.counter + Prom.Info + { Prom.metricName = "client_opt_lock_provisioned_throughput_exceeded", + Prom.metricHelp = "Number of times provisioned throughput on DynamoDB was exceeded" + } diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index 5dc5e19c770..66a38da9824 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -87,6 +87,7 @@ common common-all , aeson-pretty , amazonka , amazonka-core + , amazonka-dynamodb , amazonka-ses , amazonka-sqs , amqp @@ -162,6 +163,7 @@ common common-all , saml2-web-sso , schema-profunctor , servant + , MonadRandom , servant-client-core , servant-server , singletons @@ -225,6 +227,9 @@ library Wire.BlockListStore.Cassandra Wire.BrigAPIAccess Wire.BrigAPIAccess.Rpc + Wire.ClientStore + Wire.ClientStore.Cassandra + Wire.ClientStore.DynamoDB Wire.CodeStore Wire.CodeStore.Cassandra Wire.CodeStore.Cassandra.Queries diff --git a/services/brig/src/Brig/API/Auth.hs b/services/brig/src/Brig/API/Auth.hs index 962a8d79b98..d551cb7ea27 100644 --- a/services/brig/src/Brig/API/Auth.hs +++ b/services/brig/src/Brig/API/Auth.hs @@ -55,6 +55,7 @@ import Wire.AuthenticationSubsystem.Config import Wire.AuthenticationSubsystem.Error (zauthError) import Wire.AuthenticationSubsystem.ZAuth import Wire.BlockListStore +import Wire.ClientStore (ClientStore) import Wire.DomainRegistrationStore (DomainRegistrationStore) import Wire.EmailSubsystem (EmailSubsystem) import Wire.Error (HttpError (..)) @@ -87,7 +88,8 @@ accessH :: Member Now r, Member AuthenticationSubsystem r, Member Random r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => Maybe ClientId -> [Either Text SomeUserToken] -> @@ -115,7 +117,8 @@ access :: Member Now r, Member AuthenticationSubsystem r, Member Random r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => Maybe ClientId -> NonEmpty (Token u) -> diff --git a/services/brig/src/Brig/API/Client.hs b/services/brig/src/Brig/API/Client.hs index 832aa8d8892..eab7ff4b25a 100644 --- a/services/brig/src/Brig/API/Client.hs +++ b/services/brig/src/Brig/API/Client.hs @@ -31,8 +31,6 @@ module Brig.API.Client lookupPubClients, lookupPubClientsBulk, lookupLocalPubClientsBulk, - Data.lookupPrekeyIds, - Data.lookupUsersClientIds, createAccessToken, -- * Prekeys @@ -43,7 +41,6 @@ module Brig.API.Client claimPrekeyBundle, claimMultiPrekeyBundles, claimMultiPrekeyBundlesV3, - Data.lookupClientIds, ) where @@ -107,6 +104,8 @@ import Wire.API.UserEvent import Wire.API.UserMap (QualifiedUserMap (QualifiedUserMap, qualifiedUserMap), UserMap (userMap)) import Wire.AuthenticationSubsystem (AuthenticationSubsystem) import Wire.AuthenticationSubsystem qualified as Authentication +import Wire.ClientStore (ClientStore, DuplicateMLSPublicKey (..)) +import Wire.ClientStore qualified as ClientStore import Wire.DeleteQueue import Wire.EmailSubsystem (EmailSubsystem, sendNewClientEmail) import Wire.Events (Events) @@ -121,18 +120,18 @@ import Wire.UserSubsystem (UserSubsystem) import Wire.UserSubsystem qualified as User import Wire.VerificationCodeSubsystem (VerificationCodeSubsystem) -lookupLocalClient :: UserId -> ClientId -> (AppT r) (Maybe Client) -lookupLocalClient uid = wrapClient . Data.lookupClient uid +lookupLocalClient :: (Member ClientStore r) => UserId -> ClientId -> AppT r (Maybe Client) +lookupLocalClient uid = liftSem . ClientStore.lookupClient uid -lookupLocalClients :: UserId -> (AppT r) [Client] -lookupLocalClients = wrapClient . Data.lookupClients +lookupLocalClients :: (Member ClientStore r) => UserId -> AppT r [Client] +lookupLocalClients = liftSem . ClientStore.lookupClients -lookupPubClient :: Qualified UserId -> ClientId -> ExceptT ClientError (AppT r) (Maybe PubClient) +lookupPubClient :: (Member ClientStore r) => Qualified UserId -> ClientId -> ExceptT ClientError (AppT r) (Maybe PubClient) lookupPubClient qid cid = do clients <- lookupPubClients qid pure $ find ((== cid) . pubClientId) clients -lookupPubClients :: Qualified UserId -> ExceptT ClientError (AppT r) [PubClient] +lookupPubClients :: (Member ClientStore r) => Qualified UserId -> ExceptT ClientError (AppT r) [PubClient] lookupPubClients qid@(Qualified uid domain) = do getForUser <$> lookupPubClientsBulk [qid] where @@ -141,7 +140,7 @@ lookupPubClients qid@(Qualified uid domain) = do um <- userMap <$> Map.lookup domain (qualifiedUserMap qmap) Set.toList <$> Map.lookup uid um -lookupPubClientsBulk :: [Qualified UserId] -> ExceptT ClientError (AppT r) (QualifiedUserMap (Set PubClient)) +lookupPubClientsBulk :: (Member ClientStore r) => [Qualified UserId] -> ExceptT ClientError (AppT r) (QualifiedUserMap (Set PubClient)) lookupPubClientsBulk qualifiedUids = do loc <- qualifyLocal () let (localUsers, remoteUsers) = partitionQualified loc qualifiedUids @@ -161,8 +160,8 @@ lookupPubClientsBulk qualifiedUids = do ~~ msg (val "Failed to fetch clients for domain") pure $ Map.fromList (rights results) -lookupLocalPubClientsBulk :: [UserId] -> ExceptT ClientError (AppT r) (UserMap (Set PubClient)) -lookupLocalPubClientsBulk = lift . wrapClient . Data.lookupPubClientsBulk +lookupLocalPubClientsBulk :: (Member ClientStore r) => [UserId] -> ExceptT ClientError (AppT r) (UserMap (Set PubClient)) +lookupLocalPubClientsBulk = lift . liftSem . ClientStore.lookupPubClientsBulk addClient :: ( Member GalleyAPIAccess r, @@ -172,7 +171,8 @@ addClient :: Member EmailSubsystem r, Member AuthenticationSubsystem r, Member VerificationCodeSubsystem r, - Member Events r + Member Events r, + Member ClientStore r ) => Local UserId -> Maybe ConnId -> @@ -191,7 +191,8 @@ addClientWithReAuthPolicy :: Member Events r, Member UserSubsystem r, Member AuthenticationSubsystem r, - Member VerificationCodeSubsystem r + Member VerificationCodeSubsystem r, + Member ClientStore r ) => Data.ReAuthPolicy -> Local UserId -> @@ -245,14 +246,14 @@ addClientWithReAuthPolicy policy luid@(tUnqualified -> u) con new = do VerificationCodeNoEmail -> throwE ClientCodeAuthenticationFailed updateClient :: - (Member NotificationSubsystem r) => + (Member NotificationSubsystem r, Member ClientStore r) => UserId -> ClientId -> UpdateClient -> (Handler r) () updateClient uid cid req = do - client <- wrapClientE (lift (Data.lookupClient uid cid) >>= maybe (throwE ClientNotFound) pure) !>> clientError - wrapClientE $ for_ req.updateClientLabel $ lift . Data.updateClientLabel uid cid . Just + client <- (lift (liftSem (ClientStore.lookupClient uid cid)) >>= maybe (throwE ClientNotFound) pure) !>> clientError + lift . liftSem $ for_ req.updateClientLabel $ ClientStore.updateLabel uid cid . Just for_ req.updateClientCapabilities $ \caps -> do if client.clientCapabilities.fromClientCapabilityList `Set.isSubsetOf` caps.fromClientCapabilityList then do @@ -260,13 +261,15 @@ updateClient uid cid req = do let addedCapabilities = caps.fromClientCapabilityList \\ client.clientCapabilities.fromClientCapabilityList when (ClientSupportsConsumableNotifications `Set.member` addedCapabilities) $ lift $ liftSem $ do setupConsumableNotifications uid cid - wrapClientE $ lift . Data.updateClientCapabilities uid cid . Just $ caps + lift . liftSem . ClientStore.updateCapabilities uid cid . Just $ caps else throwE $ clientError ClientCapabilitiesCannotBeRemoved let lk = maybeToList (unpackLastPrekey <$> req.updateClientLastKey) - wrapClientE - ( do - Data.updatePrekeys uid cid (lk ++ req.updateClientPrekeys) - Data.addMLSPublicKeys uid cid (Map.assocs req.updateClientMLSPublicKeys) + ( do + lift . liftSem $ ClientStore.updatePrekeys uid cid (lk ++ req.updateClientPrekeys) + mErr <- lift . liftSem $ ClientStore.addMLSPublicKeys uid cid (Map.assocs req.updateClientMLSPublicKeys) + case mErr of + Just DuplicateMLSPublicKey -> throwE MLSPublicKeyDuplicate + Nothing -> pure () ) !>> ClientDataError !>> clientError @@ -275,7 +278,8 @@ updateClient uid cid req = do -- a superset of the clients known to galley. rmClient :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> ConnId -> @@ -283,7 +287,7 @@ rmClient :: Maybe PlainTextPassword6 -> ExceptT ClientError (AppT r) () rmClient u con clt pw = - maybe (throwE ClientNotFound) fn =<< lift (wrapClient $ Data.lookupClient u clt) + maybe (throwE ClientNotFound) fn =<< lift (liftSem $ ClientStore.lookupClient u clt) where fn client = do case clientType client of @@ -299,7 +303,8 @@ rmClient u con clt pw = claimPrekey :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => LegalholdProtectee -> UserId -> @@ -314,7 +319,8 @@ claimPrekey protectee u d c = do claimLocalPrekey :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => LegalholdProtectee -> UserId -> @@ -323,7 +329,7 @@ claimLocalPrekey :: claimLocalPrekey protectee user client = do guardLegalhold protectee (mkUserClients [(user, [client])]) lift $ do - prekey <- wrapHttpClient $ Data.claimPrekey user client + prekey <- liftSem $ ClientStore.claimPrekey user client when (isNothing prekey) (noPrekeys user client) pure prekey @@ -337,18 +343,18 @@ claimRemotePrekey :: ExceptT ClientError m (Maybe ClientPrekey) claimRemotePrekey quser client = fmapLT ClientFederationError $ Federation.claimPrekey quser client -claimPrekeyBundle :: LegalholdProtectee -> Domain -> UserId -> ExceptT ClientError (AppT r) PrekeyBundle +claimPrekeyBundle :: (Member ClientStore r) => LegalholdProtectee -> Domain -> UserId -> ExceptT ClientError (AppT r) PrekeyBundle claimPrekeyBundle protectee domain uid = do isLocalDomain <- (domain ==) <$> viewFederationDomain if isLocalDomain then claimLocalPrekeyBundle protectee uid else claimRemotePrekeyBundle (Qualified uid domain) -claimLocalPrekeyBundle :: LegalholdProtectee -> UserId -> ExceptT ClientError (AppT r) PrekeyBundle +claimLocalPrekeyBundle :: (Member ClientStore r) => LegalholdProtectee -> UserId -> ExceptT ClientError (AppT r) PrekeyBundle claimLocalPrekeyBundle protectee u = do - clients <- map (.clientId) <$> lift (wrapClient (Data.lookupClients u)) + clients <- map (.clientId) <$> lift (liftSem (ClientStore.lookupClients u)) guardLegalhold protectee (mkUserClients [(u, clients)]) - PrekeyBundle u . catMaybes <$> lift (mapM (wrapHttp . Data.claimPrekey u) clients) + PrekeyBundle u . catMaybes <$> lift (mapM (liftSem . ClientStore.claimPrekey u) clients) claimRemotePrekeyBundle :: Qualified UserId -> ExceptT ClientError (AppT r) PrekeyBundle claimRemotePrekeyBundle quser = do @@ -358,7 +364,8 @@ claimMultiPrekeyBundlesInternal :: forall r. ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => LegalholdProtectee -> QualifiedUserClients -> @@ -388,7 +395,8 @@ claimMultiPrekeyBundlesInternal protectee quc = do claimMultiPrekeyBundlesV3 :: ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => LegalholdProtectee -> QualifiedUserClients -> @@ -424,7 +432,8 @@ claimMultiPrekeyBundles :: forall r. ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => LegalholdProtectee -> QualifiedUserClients -> @@ -454,7 +463,8 @@ claimLocalMultiPrekeyBundles :: forall r. ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => LegalholdProtectee -> UserClients -> @@ -490,7 +500,7 @@ claimLocalMultiPrekeyBundles protectee userClients = do ClientId -> (AppT r) (Maybe UncheckedPrekeyBundle) getClientKeys u c = do - key <- fmap prekeyData <$> wrapHttpClient (Data.claimPrekey u c) + key <- fmap prekeyData <$> liftSem (ClientStore.claimPrekey u c) when (isNothing key) $ noPrekeys u c pure key @@ -499,7 +509,8 @@ claimLocalMultiPrekeyBundles protectee userClients = do -- | Enqueue an orderly deletion of an existing client. execDelete :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> Maybe ConnId -> @@ -508,7 +519,7 @@ execDelete :: execDelete u con c = do for_ (clientCookie c) $ \l -> liftSem $ Auth.revokeCookies u [] [l] liftSem $ enqueueClientDeletion c.clientId u con - wrapClient $ Data.rmClient u c.clientId + liftSem $ ClientStore.delete u c.clientId -- | Defensive measure when no prekey is found for a -- requested client: Ensure that the client does indeed @@ -517,13 +528,14 @@ execDelete u con c = do -- (and possibly duplicated) client data. noPrekeys :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> ClientId -> (AppT r) () noPrekeys u c = do - mclient <- wrapClient $ Data.lookupClient u c + mclient <- liftSem $ ClientStore.lookupClient u c case mclient of Nothing -> do Log.warn $ @@ -558,12 +570,13 @@ legalHoldClientRequested targetUser (LegalHoldClientRequest _requester lastPreke removeLegalHoldClient :: ( Member DeleteQueue r, Member Events r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> AppT r () removeLegalHoldClient uid = do - clients <- wrapClient $ Data.lookupClients uid + clients <- liftSem $ ClientStore.lookupClients uid -- Should only be one; but just in case we'll treat it as a list let legalHoldClients = filter ((== LegalHoldClientType) . clientType) clients -- maybe log if this isn't the case diff --git a/services/brig/src/Brig/API/Federation.hs b/services/brig/src/Brig/API/Federation.hs index 7951cf09cf5..568097886ed 100644 --- a/services/brig/src/Brig/API/Federation.hs +++ b/services/brig/src/Brig/API/Federation.hs @@ -67,6 +67,7 @@ import Wire.API.User.Search hiding (searchPolicy) import Wire.API.UserEvent import Wire.API.UserMap (UserMap) import Wire.AuthenticationSubsystem +import Wire.ClientStore (ClientStore) import Wire.DeleteQueue import Wire.Error import Wire.FederationConfigStore (FederationConfigStore) @@ -89,7 +90,8 @@ federationSitemap :: Member UserSubsystem r, Member UserStore r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => ServerT FederationAPI (Handler r) federationSitemap = @@ -183,7 +185,8 @@ getUsersByIds _ uids = do claimPrekey :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => Domain -> (UserId, ClientId) -> @@ -191,14 +194,15 @@ claimPrekey :: claimPrekey _ (user, client) = do API.claimLocalPrekey LegalholdPlusFederationNotImplemented user client !>> clientError -claimPrekeyBundle :: Domain -> UserId -> (Handler r) PrekeyBundle +claimPrekeyBundle :: (Member ClientStore r) => Domain -> UserId -> (Handler r) PrekeyBundle claimPrekeyBundle _ user = API.claimLocalPrekeyBundle LegalholdPlusFederationNotImplemented user !>> clientError claimMultiPrekeyBundle :: ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => Domain -> UserClients -> @@ -207,7 +211,8 @@ claimMultiPrekeyBundle _ uc = API.claimLocalMultiPrekeyBundles LegalholdPlusFede fedClaimKeyPackages :: ( Member GalleyAPIAccess r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => Domain -> ClaimKeyPackageRequest -> @@ -279,18 +284,18 @@ searchUsers domain (SearchRequest searchTerm mTeam mOnlyInTeams) = do isTeamAllowed (Just _) Nothing = False isTeamAllowed (Just teams) (Just tid) = tid `elem` teams -getUserClients :: Domain -> GetUserClients -> (Handler r) (UserMap (Set PubClient)) +getUserClients :: (Member ClientStore r) => Domain -> GetUserClients -> (Handler r) (UserMap (Set PubClient)) getUserClients _ (GetUserClients uids) = API.lookupLocalPubClientsBulk uids !>> clientError -getMLSClients :: Domain -> MLSClientsRequest -> Handler r (Set ClientInfo) +getMLSClients :: (Member ClientStore r) => Domain -> MLSClientsRequest -> Handler r (Set ClientInfo) getMLSClients _domain mcr = do Internal.getMLSClientsH mcr.userId mcr.cipherSuite -getMLSClient :: Domain -> MLSClientRequest -> Handler r ClientInfo +getMLSClient :: (Member ClientStore r) => Domain -> MLSClientRequest -> Handler r ClientInfo getMLSClient _domain mcr = Internal.getMLSClientH mcr.userId mcr.clientId mcr.cipherSuite -getMLSClientsV0 :: Domain -> MLSClientsRequestV0 -> Handler r (Set ClientInfo) +getMLSClientsV0 :: (Member ClientStore r) => Domain -> MLSClientsRequestV0 -> Handler r (Set ClientInfo) getMLSClientsV0 domain mcr0 = getMLSClients domain (mlsClientsRequestFromV0 mcr0) onUserDeleted :: diff --git a/services/brig/src/Brig/API/Internal.hs b/services/brig/src/Brig/API/Internal.hs index f025c414859..3c823ae1e77 100644 --- a/services/brig/src/Brig/API/Internal.hs +++ b/services/brig/src/Brig/API/Internal.hs @@ -46,7 +46,6 @@ import Brig.Types.Intra import Brig.Types.User import Brig.User.EJPD qualified import Brig.User.Search.Index qualified as Search -import Cassandra qualified as Cas import Control.Error hiding (bool) import Control.Lens (preview, to, _Just) import Control.Lens.Extras (is) @@ -74,7 +73,6 @@ import Polysemy.TinyLog (TinyLog) import Servant hiding (Handler, JSON, addHeader, respond) import Servant.OpenApi.Internal.Orphans () import System.Logger.Class qualified as Log -import UnliftIO.Async (pooledMapConcurrentlyN) import Wire.API.Connection import Wire.API.EnterpriseLogin hiding (domain) import Wire.API.Error @@ -95,10 +93,13 @@ import Wire.API.User.RichInfo import Wire.API.UserEvent import Wire.API.UserGroup (UserGroup) import Wire.API.UserGroup.Pagination +import Wire.API.UserMap import Wire.ActivationCodeStore (ActivationCodeStore) import Wire.AuthenticationSubsystem (AuthenticationSubsystem) import Wire.AuthenticationSubsystem.Config (AuthenticationSubsystemConfig) import Wire.BlockListStore (BlockListStore) +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore import Wire.DeleteQueue (DeleteQueue) import Wire.DomainRegistrationStore hiding (domain) import Wire.EmailSubsystem (EmailSubsystem) @@ -180,7 +181,8 @@ servantSitemap :: Member (Input AuthenticationSubsystemConfig) r, Member Now r, Member CryptoSign r, - Member Random r + Member Random r, + Member ClientStore r ) => ServerT BrigIRoutes.API (Handler r) servantSitemap = @@ -213,7 +215,7 @@ ejpdAPI :: ServerT BrigIRoutes.EJPDRequest (Handler r) ejpdAPI = Named @"ejpd-request" Brig.User.EJPD.ejpdRequest -mlsAPI :: ServerT BrigIRoutes.MLSAPI (Handler r) +mlsAPI :: (Member ClientStore r) => ServerT BrigIRoutes.MLSAPI (Handler r) mlsAPI = Named @"get-mls-clients" getMLSClientsH :<|> Named @"get-mls-client" getMLSClientH @@ -247,7 +249,8 @@ accountAPI :: Member RateLimit r, Member SparAPIAccess r, Member EnterpriseLoginSubsystem r, - Member (Concurrency Unsafe) r + Member (Concurrency Unsafe) r, + Member ClientStore r ) => ServerT BrigIRoutes.AccountAPI (Handler r) accountAPI = @@ -328,7 +331,7 @@ userAPI = :<|> Named @"get-user-export-data" getUserExportDataH :<|> Named @"i-check-admin-get-team-id" checkAdminGetTeamId -clientAPI :: ServerT BrigIRoutes.ClientAPI (Handler r) +clientAPI :: (Member ClientStore r) => ServerT BrigIRoutes.ClientAPI (Handler r) clientAPI = Named @"update-client-last-active" updateClientLastActive authAPI :: @@ -450,36 +453,29 @@ deleteAccountConferenceCallingConfig :: (Member UserStore r) => UserId -> Handle deleteAccountConferenceCallingConfig uid = lift . liftSem $ UserStore.updateFeatureConferenceCalling uid Nothing $> NoContent -getMLSClientH :: UserId -> ClientId -> CipherSuite -> Handler r ClientInfo +getMLSClientH :: (Member ClientStore r) => UserId -> ClientId -> CipherSuite -> Handler r ClientInfo getMLSClientH usr cid suite = do lusr <- qualifyLocal usr suiteTag <- maybe (mlsProtocolError "Unknown ciphersuite") pure (cipherSuiteTag suite) - lift . wrapClient $ getMLSClient lusr cid suiteTag + lift $ getMLSClient lusr cid suiteTag -getMLSClientsH :: UserId -> CipherSuite -> Handler r (Set ClientInfo) +getMLSClientsH :: (Member ClientStore r) => UserId -> CipherSuite -> Handler r (Set ClientInfo) getMLSClientsH usr suite = do lusr <- qualifyLocal usr suiteTag <- maybe (mlsProtocolError "Unknown ciphersuite") pure (cipherSuiteTag suite) - allClients <- lift (wrapClient (API.lookupUsersClientIds (pure usr))) >>= getResult - clientInfos <- lift . wrapClient $ UnliftIO.Async.pooledMapConcurrentlyN 16 (\c -> getMLSClient lusr c suiteTag) (toList allClients) + allClients <- lift (liftSem (ClientStore.lookupClientIds usr)) + clientInfos <- lift $ traverse (\c -> getMLSClient lusr c suiteTag) (toList allClients) pure $ Set.fromList clientInfos - where - getResult [] = pure mempty - getResult ((u, cs') : rs) - | u == usr = pure cs' - | otherwise = getResult rs getMLSClient :: - ( MonadReader Env m, - Cas.MonadClient m - ) => + (Member ClientStore r) => Local UserId -> ClientId -> CipherSuiteTag -> - m ClientInfo + AppT r ClientInfo getMLSClient lusr cid suiteTag = do - numKeyPackages <- Data.countKeyPackages lusr cid suiteTag - mc <- Data.lookupClient (tUnqualified lusr) cid + numKeyPackages <- wrapClient $ Data.countKeyPackages lusr cid suiteTag + mc <- liftSem $ ClientStore.lookupClient (tUnqualified lusr) cid let keys = foldMap (.clientMLSPublicKeys) mc ss = csSignatureScheme suiteTag pure @@ -540,7 +536,8 @@ addClientInternalH :: Member Events r, Member UserSubsystem r, Member VerificationCodeSubsystem r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> Maybe Bool -> @@ -561,21 +558,22 @@ legalHoldClientRequestedH targetUser clientRequest = do removeLegalHoldClientH :: ( Member DeleteQueue r, Member Events r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> (Handler r) NoContent removeLegalHoldClientH uid = do lift $ NoContent <$ API.removeLegalHoldClient uid -internalListClientsH :: UserSet -> (Handler r) UserClients -internalListClientsH (UserSet usrs) = lift $ do - UserClients . Map.fromList - <$> wrapClient (API.lookupUsersClientIds (Set.toList usrs)) +internalListClientsH :: (Member ClientStore r) => UserSet -> (Handler r) UserClients +internalListClientsH (UserSet usrs) = + lift . liftSem $ ClientStore.lookupClientIdsBulk (Set.toList usrs) -internalListFullClientsH :: UserSet -> (Handler r) UserClientsFull -internalListFullClientsH (UserSet usrs) = lift $ do - UserClientsFull <$> wrapClient (Data.lookupClientsBulk (Set.toList usrs)) +internalListFullClientsH :: (Member ClientStore r) => UserSet -> (Handler r) UserClientsFull +internalListFullClientsH (UserSet usrs) = + lift . liftSem $ + UserClientsFull . userMap <$> ClientStore.lookupClientsBulk (Set.toList usrs) createUserNoVerify :: ( Member BlockListStore r, @@ -638,7 +636,8 @@ deleteUserNoAuthH :: Member UserSubsystem r, Member PropertySubsystem r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => UserId -> (Handler r) DeleteUserResponse @@ -934,7 +933,7 @@ getDefaultUserLocale = do checkAdminGetTeamId :: (Member UserSubsystem r) => UserId -> Handler r TeamId checkAdminGetTeamId uid = lift . liftSem $ UserSubsystem.checkUserIsAdmin uid -updateClientLastActive :: UserId -> ClientId -> Handler r () +updateClientLastActive :: (Member ClientStore r) => UserId -> ClientId -> Handler r () updateClientLastActive u c = do sysTime <- liftIO getSystemTime -- round up to the next multiple of a week @@ -945,7 +944,7 @@ updateClientLastActive u c = do { systemSeconds = systemSeconds sysTime + (week - systemSeconds sysTime `mod` week), systemNanoseconds = 0 } - lift . wrapClient $ Data.updateClientLastActive u c now + lift . liftSem $ ClientStore.updateLastActive u c now getRichInfoH :: (Member UserStore r) => UserId -> Handler r RichInfo getRichInfoH uid = diff --git a/services/brig/src/Brig/API/MLS/KeyPackages.hs b/services/brig/src/Brig/API/MLS/KeyPackages.hs index 96d3f1d820e..b6cad1a060f 100644 --- a/services/brig/src/Brig/API/MLS/KeyPackages.hs +++ b/services/brig/src/Brig/API/MLS/KeyPackages.hs @@ -36,7 +36,6 @@ import Brig.API.MLS.KeyPackages.Validation import Brig.API.MLS.Util import Brig.API.Types import Brig.App -import Brig.Data.Client qualified as Data import Brig.Data.MLS.KeyPackage qualified as Data import Brig.Federation.Client import Brig.IO.Intra @@ -57,11 +56,13 @@ import Wire.API.MLS.KeyPackage import Wire.API.MLS.Serialisation import Wire.API.Team.LegalHold import Wire.API.User.Client +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore import Wire.GalleyAPIAccess (GalleyAPIAccess, getUserLegalholdStatus) import Wire.StoredUser import Wire.UserStore (UserStore, getUser) -uploadKeyPackages :: Local UserId -> ClientId -> KeyPackageUpload -> Handler r () +uploadKeyPackages :: (Member ClientStore r) => Local UserId -> ClientId -> KeyPackageUpload -> Handler r () uploadKeyPackages lusr cid kps = do assertMLSEnabled let identity = mkClientIdentity (tUntagged lusr) cid @@ -70,7 +71,8 @@ uploadKeyPackages lusr cid kps = do claimKeyPackages :: ( Member GalleyAPIAccess r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => Local UserId -> Maybe ClientId -> @@ -81,7 +83,8 @@ claimKeyPackages lusr mClient target = claimKeyPackagesV7 lusr mClient target . claimKeyPackagesV7 :: ( Member GalleyAPIAccess r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => Local UserId -> Maybe ClientId -> @@ -101,7 +104,8 @@ claimKeyPackagesV7 lusr mClient target mSuite = do claimLocalKeyPackages :: forall r. ( Member GalleyAPIAccess r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => Qualified UserId -> Maybe ClientId -> @@ -119,7 +123,7 @@ claimLocalKeyPackages qusr skipOwn suite target = do -- skip own client when the target is the requesting user itself let own = guard (qusr == tUntagged target) *> skipOwn - clients <- map (.clientId) <$> wrapClientE (Data.lookupClients (tUnqualified target)) + clients <- map (.clientId) <$> lift (liftSem (ClientStore.lookupClients (tUnqualified target))) foldQualified target ( \lusr -> @@ -158,6 +162,7 @@ claimLocalKeyPackages qusr skipOwn suite target = do UserLegalHoldNoConsent -> pure () claimRemoteKeyPackages :: + (Member ClientStore r) => Local UserId -> CipherSuite -> Remote UserId -> @@ -226,6 +231,7 @@ deleteKeyPackagesV7 lusr c mSuite (unDeleteKeyPackages -> refs) = do lift $ wrapClient (Data.deleteKeyPackages (tUnqualified lusr) c suite refs) replaceKeyPackages :: + (Member ClientStore r) => Local UserId -> ClientId -> CommaSeparatedList CipherSuite -> @@ -234,6 +240,7 @@ replaceKeyPackages :: replaceKeyPackages lusr c = replaceKeyPackagesV7 lusr c . Just replaceKeyPackagesV7 :: + (Member ClientStore r) => Local UserId -> ClientId -> Maybe (CommaSeparatedList CipherSuite) -> diff --git a/services/brig/src/Brig/API/MLS/KeyPackages/Validation.hs b/services/brig/src/Brig/API/MLS/KeyPackages/Validation.hs index 4b7e4f98a85..09a2fc56bc0 100644 --- a/services/brig/src/Brig/API/MLS/KeyPackages/Validation.hs +++ b/services/brig/src/Brig/API/MLS/KeyPackages/Validation.hs @@ -26,7 +26,6 @@ where import Brig.API.Error import Brig.API.Handler import Brig.App -import Brig.Data.Client qualified as Data import Brig.Options import Control.Applicative import Data.ByteString qualified as LBS @@ -34,6 +33,7 @@ import Data.Qualified import Data.Time.Clock import Data.Time.Clock.POSIX import Imports +import Polysemy import Wire.API.Error import Wire.API.Error.Brig import Wire.API.Error.Brig qualified as E @@ -44,8 +44,11 @@ import Wire.API.MLS.Lifetime import Wire.API.MLS.Serialisation import Wire.API.MLS.Validation import Wire.API.MLS.Validation.Error (toText) +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore validateUploadedKeyPackage :: + (Member ClientStore r) => ClientIdentity -> RawMLS KeyPackage -> Handler r (KeyPackageRef, CipherSuiteTag, KeyPackageData) @@ -61,8 +64,8 @@ validateUploadedKeyPackage identity kp = do loc ( \_ -> do mkey :: Maybe LByteString <- - lift . wrapClient $ - Data.lookupMLSPublicKey + lift . liftSem $ + ClientStore.lookupMLSPublicKey (ciUser identity) (ciClient identity) (csSignatureScheme cs) diff --git a/services/brig/src/Brig/API/Public.hs b/services/brig/src/Brig/API/Public.hs index 3d37d91d4e5..bea6ed4e3bd 100644 --- a/services/brig/src/Brig/API/Public.hs +++ b/services/brig/src/Brig/API/Public.hs @@ -162,6 +162,8 @@ import Wire.AppSubsystem qualified as AppSubsystem import Wire.AuthenticationSubsystem as AuthenticationSubsystem import Wire.AuthenticationSubsystem.Config (AuthenticationSubsystemConfig) import Wire.BlockListStore (BlockListStore) +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore import Wire.DeleteQueue import Wire.DomainRegistrationStore (DomainRegistrationStore) import Wire.EmailSending (EmailSending) @@ -414,7 +416,8 @@ servantSitemap :: Member UserGroupSubsystem r, Member TeamCollaboratorsSubsystem r, Member TeamSubsystem r, - Member AppSubsystem r + Member AppSubsystem r, + Member ClientStore r ) => ServerT BrigAPI (Handler r) servantSitemap = @@ -675,7 +678,8 @@ listPropertyKeysAndValuesH u = lift . liftSem $ getAllProperties u getPrekeyUnqualifiedH :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> UserId -> @@ -687,7 +691,8 @@ getPrekeyUnqualifiedH zusr user client = do getPrekeyH :: ( Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> Qualified UserId -> @@ -697,19 +702,20 @@ getPrekeyH zusr (Qualified user domain) client = do mPrekey <- API.claimPrekey (ProtectedUser zusr) user domain client !>> clientError ifNothing (notFound "prekey not found") mPrekey -getPrekeyBundleUnqualifiedH :: UserId -> UserId -> (Handler r) Public.PrekeyBundle +getPrekeyBundleUnqualifiedH :: (Member ClientStore r) => UserId -> UserId -> (Handler r) Public.PrekeyBundle getPrekeyBundleUnqualifiedH zusr uid = do domain <- viewFederationDomain API.claimPrekeyBundle (ProtectedUser zusr) domain uid !>> clientError -getPrekeyBundleH :: UserId -> Qualified UserId -> (Handler r) Public.PrekeyBundle +getPrekeyBundleH :: (Member ClientStore r) => UserId -> Qualified UserId -> (Handler r) Public.PrekeyBundle getPrekeyBundleH zusr (Qualified uid domain) = API.claimPrekeyBundle (ProtectedUser zusr) domain uid !>> clientError getMultiUserPrekeyBundleUnqualifiedH :: ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> Public.UserClients -> @@ -736,7 +742,8 @@ getMultiUserPrekeyBundleHInternal qualUserClients = do getMultiUserPrekeyBundleHV3 :: ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> Public.QualifiedUserClients -> @@ -748,7 +755,8 @@ getMultiUserPrekeyBundleHV3 zusr qualUserClients = do getMultiUserPrekeyBundleH :: ( Member (Concurrency 'Unsafe) r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => UserId -> Public.QualifiedUserClients -> @@ -765,7 +773,8 @@ addClient :: Member AuthenticationSubsystem r, Member VerificationCodeSubsystem r, Member Events r, - Member UserSubsystem r + Member UserSubsystem r, + Member ClientStore r ) => Local UserId -> ConnId -> @@ -780,7 +789,8 @@ addClient lusr con new = do deleteClient :: ( Member AuthenticationSubsystem r, - Member DeleteQueue r + Member DeleteQueue r, + Member ClientStore r ) => UserId -> ConnId -> @@ -790,40 +800,40 @@ deleteClient :: deleteClient usr con clt body = API.rmClient usr con clt (Public.rmPassword body) !>> clientError -listClients :: UserId -> (Handler r) [Public.Client] +listClients :: (Member ClientStore r) => UserId -> (Handler r) [Public.Client] listClients zusr = lift $ API.lookupLocalClients zusr -getClient :: UserId -> ClientId -> (Handler r) (Maybe Public.Client) +getClient :: (Member ClientStore r) => UserId -> ClientId -> (Handler r) (Maybe Public.Client) getClient zusr clientId = lift $ API.lookupLocalClient zusr clientId -getUserClientsUnqualified :: UserId -> (Handler r) [Public.PubClient] +getUserClientsUnqualified :: (Member ClientStore r) => UserId -> (Handler r) [Public.PubClient] getUserClientsUnqualified uid = do localdomain <- viewFederationDomain API.lookupPubClients (Qualified uid localdomain) !>> clientError -getUserClientsQualified :: Qualified UserId -> (Handler r) [Public.PubClient] +getUserClientsQualified :: (Member ClientStore r) => Qualified UserId -> (Handler r) [Public.PubClient] getUserClientsQualified quid = API.lookupPubClients quid !>> clientError -getUserClientUnqualified :: UserId -> ClientId -> (Handler r) Public.PubClient +getUserClientUnqualified :: (Member ClientStore r) => UserId -> ClientId -> (Handler r) Public.PubClient getUserClientUnqualified uid cid = do localdomain <- viewFederationDomain x <- API.lookupPubClient (Qualified uid localdomain) cid !>> clientError ifNothing (notFound "client not found") x -listClientsBulk :: UserId -> Range 1 MaxUsersForListClientsBulk [Qualified UserId] -> (Handler r) (Public.QualifiedUserMap (Set Public.PubClient)) +listClientsBulk :: (Member ClientStore r) => UserId -> Range 1 MaxUsersForListClientsBulk [Qualified UserId] -> (Handler r) (Public.QualifiedUserMap (Set Public.PubClient)) listClientsBulk _zusr limitedUids = API.lookupPubClientsBulk (fromRange limitedUids) !>> clientError -listClientsBulkV2 :: UserId -> Public.LimitedQualifiedUserIdList MaxUsersForListClientsBulk -> (Handler r) (Public.WrappedQualifiedUserMap (Set Public.PubClient)) +listClientsBulkV2 :: (Member ClientStore r) => UserId -> Public.LimitedQualifiedUserIdList MaxUsersForListClientsBulk -> (Handler r) (Public.WrappedQualifiedUserMap (Set Public.PubClient)) listClientsBulkV2 zusr userIds = Public.Wrapped <$> listClientsBulk zusr (Public.qualifiedUsers userIds) -getUserClientQualified :: Qualified UserId -> ClientId -> (Handler r) Public.PubClient +getUserClientQualified :: (Member ClientStore r) => Qualified UserId -> ClientId -> (Handler r) Public.PubClient getUserClientQualified quid cid = do x <- API.lookupPubClient quid cid !>> clientError ifNothing (notFound "client not found") x -getClientCapabilities :: UserId -> ClientId -> (Handler r) Public.ClientCapabilityList +getClientCapabilities :: (Member ClientStore r) => UserId -> ClientId -> (Handler r) Public.ClientCapabilityList getClientCapabilities uid cid = do mclient <- lift (API.lookupLocalClient uid cid) maybe (throwStd (errorToWai @'E.ClientNotFound)) (pure . Public.clientCapabilities) mclient @@ -868,8 +878,8 @@ setUserSearchableH :: Handler r () setUserSearchableH zusr uid searchable = lift $ liftSem $ User.setUserSearchable zusr uid searchable -getClientPrekeys :: UserId -> ClientId -> (Handler r) [Public.PrekeyId] -getClientPrekeys usr clt = lift (wrapClient $ API.lookupPrekeyIds usr clt) +getClientPrekeys :: (Member ClientStore r) => UserId -> ClientId -> Handler r [Public.PrekeyId] +getClientPrekeys usr clt = lift . liftSem $ ClientStore.lookupPrekeyIds usr clt newNonce :: UserId -> ClientId -> (Handler r) (Nonce, CacheControl) newNonce uid cid = do @@ -1435,7 +1445,8 @@ deleteSelfUser :: Member HashPassword r, Member RateLimit r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => Local UserId -> Public.DeleteUser -> @@ -1454,7 +1465,8 @@ verifyDeleteUser :: Member UserSubsystem r, Member Events r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => Public.VerifyDeleteUser -> Handler r () diff --git a/services/brig/src/Brig/API/User.hs b/services/brig/src/Brig/API/User.hs index 051c095150c..0d5dcc4b57b 100644 --- a/services/brig/src/Brig/API/User.hs +++ b/services/brig/src/Brig/API/User.hs @@ -67,7 +67,6 @@ import Brig.API.Util import Brig.App as App import Brig.Data.Activation (ActivationEvent (..), activationErrorToRegisterError) import Brig.Data.Activation qualified as Data -import Brig.Data.Client qualified as Data import Brig.Data.Connection (countConnections) import Brig.Data.Connection qualified as Data import Brig.Data.User @@ -124,6 +123,8 @@ import Wire.ActivationCodeStore import Wire.ActivationCodeStore qualified as ActivationCode import Wire.AuthenticationSubsystem (AuthenticationSubsystem, internalLookupPasswordResetCode) import Wire.BlockListStore as BlockListStore +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore import Wire.DeleteQueue import Wire.EmailSubsystem import Wire.Error @@ -911,7 +912,8 @@ deleteSelfUser :: Member HashPassword r, Member RateLimit r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => Local UserId -> Maybe PlainTextPassword6 -> @@ -984,7 +986,8 @@ verifyDeleteUser :: Member UserSubsystem r, Member PropertySubsystem r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => VerifyDeleteUser -> ExceptT DeleteUserError (AppT r) () @@ -1013,7 +1016,8 @@ ensureAccountDeleted :: Member UserSubsystem r, Member PropertySubsystem r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => Local UserId -> AppT r DeleteUserResult @@ -1024,7 +1028,7 @@ ensureAccountDeleted luid@(tUnqualified -> uid) = do Just acc -> do probs <- liftSem $ getPropertyKeys uid - clients <- wrapClient $ Data.lookupClients uid + clients <- liftSem $ ClientStore.lookupClients uid localUid <- qualifyLocal uid conCount <- wrapClient $ countConnections localUid [(minBound @Relation) .. maxBound] @@ -1063,7 +1067,8 @@ deleteAccount :: Member UserSubsystem r, Member Events r, Member AuthenticationSubsystem r, - Member UserGroupSubsystem r + Member UserGroupSubsystem r, + Member ClientStore r ) => User -> Sem r () @@ -1080,7 +1085,7 @@ deleteAccount user = do traverse_ (removeUserFromAllGroups uid) user.userTeam Intra.rmUser uid (userAssets user) - embed $ Data.lookupClients uid >>= mapM_ (Data.rmClient uid . (.clientId)) + ClientStore.lookupClients uid >>= mapM_ (ClientStore.delete uid . (.clientId)) luid <- embed $ qualifyLocal uid User.internalUpdateSearchIndex uid Events.generateUserEvent uid Nothing (UserDeleted (tUntagged luid)) diff --git a/services/brig/src/Brig/CanonicalInterpreter.hs b/services/brig/src/Brig/CanonicalInterpreter.hs index 727526a406b..84396e56068 100644 --- a/services/brig/src/Brig/CanonicalInterpreter.hs +++ b/services/brig/src/Brig/CanonicalInterpreter.hs @@ -17,7 +17,7 @@ module Brig.CanonicalInterpreter where -import Brig.AWS (amazonkaEnv) +import Brig.AWS (amazonkaEnv, prekeyTable) import Brig.App as App import Brig.DeleteQueue.Interpreter as DQ import Brig.Effects.ConnectionStore (ConnectionStore) @@ -66,6 +66,9 @@ import Wire.BackgroundJobsPublisher (BackgroundJobsPublisher) import Wire.BackgroundJobsPublisher.RabbitMQ (interpretBackgroundJobsPublisherRabbitMQ) import Wire.BlockListStore import Wire.BlockListStore.Cassandra +import Wire.ClientStore (ClientStore) +import Wire.ClientStore.Cassandra +import Wire.ClientStore.DynamoDB (OptimisticLockEnv (..)) import Wire.DeleteQueue import Wire.DomainRegistrationStore import Wire.DomainRegistrationStore.Cassandra @@ -193,6 +196,7 @@ type BrigLowerLevelEffects = DomainRegistrationStore, CryptoSign, HashPassword, + ClientStore, UserKeyStore, UserStore, IndexedUserStore, @@ -306,6 +310,20 @@ runBrigToIO e (AppT ma) = do indexName = additionalIndexName } } + clientStoreCassandraEnv = + ClientStoreCassandraEnv + { prekeyLocking = + maybe + ( Right $ + OptimisticLockEnv + { awsEnv = e.awsEnv ^. amazonkaEnv, + prekeyTable = e.awsEnv ^. Brig.AWS.prekeyTable + } + ) + Left + e.randomPrekeyLocalLock, + casClient = e.casClient + } -- These interpreters depend on each other, we use let recursion to solve that. -- @@ -363,6 +381,7 @@ runBrigToIO e (AppT ma) = do . interpretIndexedUserStoreES indexedUserStoreConfig . interpretUserStoreCassandra e.casClient . interpretUserKeyStoreCassandra e.casClient + . interpretClientStoreCassandra clientStoreCassandraEnv . runHashPassword e.settings.passwordHashingOptions . runCryptoSign . interpretDomainRegistrationStoreToCassandra e.casClient diff --git a/services/brig/src/Brig/Data/Client.hs b/services/brig/src/Brig/Data/Client.hs index 33248f0962d..8cfa9421ae4 100644 --- a/services/brig/src/Brig/Data/Client.hs +++ b/services/brig/src/Brig/Data/Client.hs @@ -26,68 +26,22 @@ module Brig.Data.Client reAuthForNewClients, addClientWithReAuthPolicy, addClient, - rmClient, - lookupClient, - lookupClients, - lookupPubClientsBulk, - lookupClientsBulk, - lookupClientIds, - lookupUsersClientIds, - updateClientLabel, - updateClientCapabilities, - updateClientLastActive, - - -- * Prekeys - claimPrekey, - updatePrekeys, - lookupPrekeyIds, - - -- * MLS public keys - addMLSPublicKeys, - lookupMLSPublicKey, ) where -import Amazonka qualified as AWS -import Amazonka.Data.Text qualified as AWS -import Amazonka.DynamoDB qualified as AWS -import Amazonka.DynamoDB.Lens qualified as AWS -import Bilge.Retry (httpHandlers) -import Brig.AWS import Brig.App -import Cassandra as C hiding (Client) -import Cassandra.Settings as C hiding (Client) import Control.Error -import Control.Exception.Lens qualified as EL -import Control.Lens -import Control.Monad.Catch -import Control.Monad.Random (randomRIO) -import Control.Retry -import Data.ByteString.Conversion (toByteString, toByteString') -import Data.ByteString.Lazy qualified as LBS -import Data.HashMap.Strict qualified as HashMap import Data.Id -import Data.Json.Util (UTCTimeMillis, toUTCTimeMillis) -import Data.Map qualified as Map +import Data.Json.Util (toUTCTimeMillis) import Data.Qualified -import Data.Set qualified as Set -import Data.Text qualified as Text -import Data.Time.Clock -import Data.UUID qualified as UUID import Imports import Polysemy (Member) -import Prometheus qualified as Prom -import System.Logger.Class (field, msg, val) -import System.Logger.Class qualified as Log -import UnliftIO (pooledMapConcurrentlyN) -import Wire.API.MLS.CipherSuite -import Wire.API.User.Auth import Wire.API.User.Client hiding (UpdateClient (..)) -import Wire.API.User.Client.Prekey -import Wire.API.UserMap (UserMap (..)) import Wire.AuthenticationSubsystem (AuthenticationSubsystem) import Wire.AuthenticationSubsystem qualified as Authentication import Wire.AuthenticationSubsystem.Error +import Wire.ClientStore (ClientStore, DuplicateMLSPublicKey (..)) +import Wire.ClientStore qualified as ClientStore data ClientDataError = TooManyClients @@ -114,7 +68,7 @@ reAuthForNewClients :: ReAuthPolicy reAuthForNewClients count upsert = count > 0 && not upsert addClient :: - (Member AuthenticationSubsystem r) => + (Member AuthenticationSubsystem r, Member ClientStore r) => Local UserId -> ClientId -> NewClient -> @@ -124,8 +78,10 @@ addClient :: addClient = addClientWithReAuthPolicy reAuthForNewClients addClientWithReAuthPolicy :: + forall r. ( MonadReader Brig.App.Env (AppT r), - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => ReAuthPolicy -> Local UserId -> @@ -135,7 +91,7 @@ addClientWithReAuthPolicy :: Maybe ClientCapabilityList -> ExceptT ClientDataError (AppT r) (Client, [Client], Word) addClientWithReAuthPolicy reAuthPolicy u newId c maxPermClients caps = do - clients <- wrapClientE $ lookupClients (tUnqualified u) + clients <- lift . liftSem $ ClientStore.lookupClients (tUnqualified u) let typed = filter ((== newClientType c) . clientType) clients let count = length typed let upsert = any exists typed @@ -145,7 +101,7 @@ addClientWithReAuthPolicy reAuthPolicy u newId c maxPermClients caps = do let capacity = fmap (+ (-count)) limit unless (maybe True (> 0) capacity || upsert) $ throwE TooManyClients - new <- wrapClientE $ insert (tUnqualified u) + new <- insert (tUnqualified u) let !total = fromIntegral (length clients + if upsert then 0 else 1) let old = maybe (filter (not . exists) typed) (const []) limit pure (new, old, total) @@ -159,447 +115,24 @@ addClientWithReAuthPolicy reAuthPolicy u newId c maxPermClients caps = do exists :: Client -> Bool exists = (==) newId . (.clientId) - insert :: (MonadClient m, MonadReader Brig.App.Env m) => UserId -> ExceptT ClientDataError m Client + insert :: UserId -> ExceptT ClientDataError (AppT r) Client insert uid = do -- Is it possible to do this somewhere else? Otherwise we could use `MonadClient` instead now <- toUTCTimeMillis <$> (liftIO =<< asks (.currentTime)) - let keys = unpackLastPrekey (newClientLastKey c) : newClientPrekeys c - updatePrekeys uid newId keys - let mdl = newClientModel c - prm = (uid, newId, now, newClientType c, newClientLabel c, newClientClass c, newClientCookie c, mdl, C.Set . Set.toList . fromClientCapabilityList <$> caps) - retry x5 $ write insertClient (params LocalQuorum prm) - addMLSPublicKeys uid newId (Map.assocs (newClientMLSPublicKeys c)) - pure $! - Client - { clientId = newId, - clientType = newClientType c, - clientTime = now, - clientClass = newClientClass c, - clientLabel = newClientLabel c, - clientCookie = newClientCookie c, - clientModel = mdl, - clientCapabilities = fromMaybe mempty caps, - clientMLSPublicKeys = mempty, - clientLastActive = Nothing - } - -lookupClient :: (MonadClient m) => UserId -> ClientId -> m (Maybe Client) -lookupClient u c = do - keys <- retry x1 (query selectMLSPublicKeys (params LocalQuorum (u, c))) - fmap (toClient keys) - <$> retry x1 (query1 selectClient (params LocalQuorum (u, c))) - -lookupClientsBulk :: (MonadClient m) => [UserId] -> m (Map UserId (Imports.Set Client)) -lookupClientsBulk uids = liftClient $ do - userClientTuples <- pooledMapConcurrentlyN 50 getClientSetWithUser uids - pure $ Map.fromList userClientTuples - where - getClientSetWithUser :: (MonadClient m) => UserId -> m (UserId, Imports.Set Client) - getClientSetWithUser u = fmap ((u,) . Set.fromList) . lookupClients $ u - -lookupPubClientsBulk :: (MonadClient m) => [UserId] -> m (UserMap (Imports.Set PubClient)) -lookupPubClientsBulk uids = liftClient $ do - userClientTuples <- pooledMapConcurrentlyN 50 getClientSetWithUser uids - pure $ UserMap $ Map.fromList userClientTuples - where - getClientSetWithUser :: (MonadClient m) => UserId -> m (UserId, Imports.Set PubClient) - getClientSetWithUser u = (u,) . Set.fromList . map toPubClient <$> executeQuery u - - executeQuery :: (MonadClient m) => UserId -> m [(ClientId, Maybe ClientClass)] - executeQuery u = retry x1 (query selectPubClients (params LocalQuorum (Identity u))) - -lookupClients :: (MonadClient m) => UserId -> m [Client] -lookupClients u = do - keys <- - (\(cid, ss, Blob b) -> (cid, [(ss, LBS.toStrict b)])) - <$$> retry x1 (query selectMLSPublicKeysByUser (params LocalQuorum (Identity u))) - let keyMap = Map.fromListWith (<>) keys - updateKeys c = - c - { clientMLSPublicKeys = - Map.fromList $ Map.findWithDefault [] c.clientId keyMap - } - updateKeys . toClient [] - <$$> retry x1 (query selectClients (params LocalQuorum (Identity u))) - -lookupClientIds :: (MonadClient m) => UserId -> m [ClientId] -lookupClientIds u = - map runIdentity - <$> retry x1 (query selectClientIds (params LocalQuorum (Identity u))) - -lookupUsersClientIds :: (MonadClient m) => [UserId] -> m [(UserId, Set.Set ClientId)] -lookupUsersClientIds us = - liftClient $ pooledMapConcurrentlyN 16 getClientIds us - where - getClientIds u = (u,) <$> fmap Set.fromList (lookupClientIds u) - -lookupPrekeyIds :: (MonadClient m) => UserId -> ClientId -> m [PrekeyId] -lookupPrekeyIds u c = - map runIdentity - <$> retry x1 (query selectPrekeyIds (params LocalQuorum (u, c))) - -rmClient :: - ( MonadClient m, - MonadReader Brig.App.Env m, - MonadCatch m - ) => - UserId -> - ClientId -> - m () -rmClient u c = do - retry x5 $ write removeClient (params LocalQuorum (u, c)) - retry x5 $ write removeClientKeys (params LocalQuorum (u, c)) - unlessM (isJust <$> asks (.randomPrekeyLocalLock)) $ deleteOptLock u c - -updateClientLabel :: (MonadClient m) => UserId -> ClientId -> Maybe Text -> m () -updateClientLabel u c l = retry x5 $ write updateClientLabelQuery (params LocalQuorum (l, u, c)) - -updateClientCapabilities :: (MonadClient m) => UserId -> ClientId -> Maybe ClientCapabilityList -> m () -updateClientCapabilities u c fs = retry x5 $ write updateClientCapabilitiesQuery (params LocalQuorum (C.Set . Set.toList . fromClientCapabilityList <$> fs, u, c)) - --- | If the update fails, which can happen if device does not exist, then ignore the error silently. -updateClientLastActive :: (MonadClient m) => UserId -> ClientId -> UTCTime -> m () -updateClientLastActive u c t = - void . retry x5 $ - trans - updateClientLastActiveQuery - (params LocalQuorum (t, u, c)) - -updatePrekeys :: (MonadClient m) => UserId -> ClientId -> [UncheckedPrekeyBundle] -> ExceptT ClientDataError m () -updatePrekeys u c pks = do - unless (all check pks) $ - throwE MalformedPrekeys - for_ pks $ \k -> do - let args = (u, c, prekeyId k, prekeyKey k) - retry x5 $ write insertClientKey (params LocalQuorum args) - where - check pk = parsePrekeyBundlePrekeyId pk == Right (prekeyId pk) - -claimPrekey :: - ( Log.MonadLogger m, - MonadMask m, - MonadClient m, - MonadReader Brig.App.Env m, - Prom.MonadMonitor m - ) => - UserId -> - ClientId -> - m (Maybe ClientPrekey) -claimPrekey u c = - asks (.randomPrekeyLocalLock) >>= \case - -- Use random prekey selection strategy - Just localLock -> withLocalLock localLock $ do - prekeys <- retry x1 $ query userPrekeys (params LocalQuorum (u, c)) - prekey <- pickRandomPrekey prekeys - removeAndReturnPreKey prekey - -- Use DynamoDB based optimistic locking strategy - Nothing -> withOptLock u c $ do - prekey <- retry x1 $ query1 userPrekey (params LocalQuorum (u, c)) - removeAndReturnPreKey prekey - where - removeAndReturnPreKey :: (MonadClient f, Log.MonadLogger f) => Maybe (PrekeyId, Text) -> f (Maybe ClientPrekey) - removeAndReturnPreKey (Just (i, k)) = do - if i /= lastPrekeyId - then retry x1 $ write removePrekey (params LocalQuorum (u, c, i)) - else - Log.debug $ - field "user" (toByteString u) - . field "client" (toByteString c) - . msg (val "last resort prekey used") - pure $ Just (ClientPrekey c (UncheckedPrekeyBundle i k)) - removeAndReturnPreKey Nothing = pure Nothing - - pickRandomPrekey :: (MonadIO f) => [(PrekeyId, Text)] -> f (Maybe (PrekeyId, Text)) - pickRandomPrekey [] = pure Nothing - -- unless we only have one key left - pickRandomPrekey [pk] = pure $ Just pk - -- pick among list of keys, except lastPrekeyId - pickRandomPrekey pks = do - let pks' = filter (\k -> fst k /= lastPrekeyId) pks - ind <- liftIO $ randomRIO (0, length pks' - 1) - pure $ atMay pks' ind - -lookupMLSPublicKey :: - (MonadClient m) => - UserId -> - ClientId -> - SignatureSchemeTag -> - m (Maybe LByteString) -lookupMLSPublicKey u c ss = - (fromBlob . runIdentity) <$$> retry x1 (query1 selectMLSPublicKey (params LocalQuorum (u, c, ss))) - -addMLSPublicKeys :: - (MonadClient m) => - UserId -> - ClientId -> - [(SignatureSchemeTag, ByteString)] -> - ExceptT ClientDataError m () -addMLSPublicKeys u c = traverse_ (uncurry (addMLSPublicKey u c)) - -addMLSPublicKey :: - (MonadClient m) => - UserId -> - ClientId -> - SignatureSchemeTag -> - ByteString -> - ExceptT ClientDataError m () -addMLSPublicKey u c ss pk = do - rows <- - trans - insertMLSPublicKeys - ( params - LocalQuorum - (u, c, ss, Blob (LBS.fromStrict pk)) - ) - { serialConsistency = Just LocalSerialConsistency - } - case rows of - [row] - | C.fromRow 0 row /= Right (Just True) -> - throwE MLSPublicKeyDuplicate - _ -> pure () - -------------------------------------------------------------------------------- --- Queries - -insertClient :: PrepQuery W (UserId, ClientId, UTCTimeMillis, ClientType, Maybe Text, Maybe ClientClass, Maybe CookieLabel, Maybe Text, Maybe (C.Set ClientCapability)) () -insertClient = "INSERT INTO clients (user, client, tstamp, type, label, class, cookie, model, capabilities) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - -updateClientLabelQuery :: PrepQuery W (Maybe Text, UserId, ClientId) () -updateClientLabelQuery = {- `IF EXISTS`, but that requires benchmarking -} "UPDATE clients SET label = ? WHERE user = ? AND client = ?" - -updateClientCapabilitiesQuery :: PrepQuery W (Maybe (C.Set ClientCapability), UserId, ClientId) () -updateClientCapabilitiesQuery = {- `IF EXISTS`, but that requires benchmarking -} "UPDATE clients SET capabilities = ? WHERE user = ? AND client = ?" - -updateClientLastActiveQuery :: PrepQuery W (UTCTime, UserId, ClientId) Row -updateClientLastActiveQuery = "UPDATE clients SET last_active = ? WHERE user = ? AND client = ? IF EXISTS" - -selectClientIds :: PrepQuery R (Identity UserId) (Identity ClientId) -selectClientIds = "SELECT client from clients where user = ?" - -selectClients :: PrepQuery R (Identity UserId) (ClientId, ClientType, UTCTimeMillis, Maybe Text, Maybe ClientClass, Maybe CookieLabel, Maybe Text, Maybe (C.Set ClientCapability), Maybe UTCTime) -selectClients = "SELECT client, type, tstamp, label, class, cookie, model, capabilities, last_active from clients where user = ?" - -selectPubClients :: PrepQuery R (Identity UserId) (ClientId, Maybe ClientClass) -selectPubClients = "SELECT client, class from clients where user = ?" - -selectClient :: PrepQuery R (UserId, ClientId) (ClientId, ClientType, UTCTimeMillis, Maybe Text, Maybe ClientClass, Maybe CookieLabel, Maybe Text, Maybe (C.Set ClientCapability), Maybe UTCTime) -selectClient = "SELECT client, type, tstamp, label, class, cookie, model, capabilities, last_active from clients where user = ? and client = ?" - -insertClientKey :: PrepQuery W (UserId, ClientId, PrekeyId, Text) () -insertClientKey = "INSERT INTO prekeys (user, client, key, data) VALUES (?, ?, ?, ?)" - -removeClient :: PrepQuery W (UserId, ClientId) () -removeClient = "DELETE FROM clients where user = ? and client = ?" - -removeClientKeys :: PrepQuery W (UserId, ClientId) () -removeClientKeys = "DELETE FROM prekeys where user = ? and client = ?" - -userPrekey :: PrepQuery R (UserId, ClientId) (PrekeyId, Text) -userPrekey = "SELECT key, data FROM prekeys where user = ? and client = ? LIMIT 1" - -userPrekeys :: PrepQuery R (UserId, ClientId) (PrekeyId, Text) -userPrekeys = "SELECT key, data FROM prekeys where user = ? and client = ?" - -selectPrekeyIds :: PrepQuery R (UserId, ClientId) (Identity PrekeyId) -selectPrekeyIds = "SELECT key FROM prekeys where user = ? and client = ?" - -removePrekey :: PrepQuery W (UserId, ClientId, PrekeyId) () -removePrekey = "DELETE FROM prekeys where user = ? and client = ? and key = ?" - -selectMLSPublicKey :: PrepQuery R (UserId, ClientId, SignatureSchemeTag) (Identity Blob) -selectMLSPublicKey = "SELECT key from mls_public_keys where user = ? and client = ? and sig_scheme = ?" - -selectMLSPublicKeys :: PrepQuery R (UserId, ClientId) (SignatureSchemeTag, Blob) -selectMLSPublicKeys = "SELECT sig_scheme, key from mls_public_keys where user = ? and client = ?" - -selectMLSPublicKeysByUser :: PrepQuery R (Identity UserId) (ClientId, SignatureSchemeTag, Blob) -selectMLSPublicKeysByUser = "SELECT client, sig_scheme, key from mls_public_keys where user = ?" - -insertMLSPublicKeys :: PrepQuery W (UserId, ClientId, SignatureSchemeTag, Blob) Row -insertMLSPublicKeys = - "INSERT INTO mls_public_keys (user, client, sig_scheme, key) \ - \VALUES (?, ?, ?, ?) IF NOT EXISTS" - -------------------------------------------------------------------------------- --- Conversions - -toClient :: - [(SignatureSchemeTag, Blob)] -> - ( ClientId, - ClientType, - UTCTimeMillis, - Maybe Text, - Maybe ClientClass, - Maybe CookieLabel, - Maybe Text, - Maybe (C.Set ClientCapability), - Maybe UTCTime - ) -> - Client -toClient keys (cid, cty, tme, lbl, cls, cok, mdl, cps, lastActive) = - Client - { clientId = cid, - clientType = cty, - clientTime = tme, - clientClass = cls, - clientLabel = lbl, - clientCookie = cok, - clientModel = mdl, - clientCapabilities = ClientCapabilityList $ maybe Set.empty (Set.fromList . C.fromSet) cps, - clientMLSPublicKeys = fmap (LBS.toStrict . fromBlob) (Map.fromList keys), - clientLastActive = lastActive - } - -toPubClient :: (ClientId, Maybe ClientClass) -> PubClient -toPubClient = uncurry PubClient - -------------------------------------------------------------------------------- --- Best-effort optimistic locking for prekeys via DynamoDB - -ddbClient :: Text -ddbClient = "client" - -ddbVersion :: Text -ddbVersion = "version" - -ddbKey :: UserId -> ClientId -> AWS.AttributeValue -ddbKey u c = AWS.S (UUID.toText (toUUID u) <> "." <> clientToText c) - -key :: UserId -> ClientId -> HashMap Text AWS.AttributeValue -key u c = HashMap.singleton ddbClient (ddbKey u c) - -deleteOptLock :: - ( MonadReader Brig.App.Env m, - MonadCatch m, - MonadIO m - ) => - UserId -> - ClientId -> - m () -deleteOptLock u c = do - t <- asks ((.awsEnv) <&> view prekeyTable) - e <- asks ((.awsEnv) <&> view amazonkaEnv) - void $ exec e (AWS.newDeleteItem t & AWS.deleteItem_key .~ key u c) - -withOptLock :: - forall a m. - ( MonadIO m, - MonadReader Brig.App.Env m, - Log.MonadLogger m, - Prom.MonadMonitor m - ) => - UserId -> - ClientId -> - m a -> - m a -withOptLock u c ma = go (10 :: Int) - where - go !n = do - v <- (version =<<) <$> execDyn pure get - a <- ma - r <- execDyn pure (put v) - case r of - Nothing | n > 0 -> reportAttemptFailure >> go (n - 1) - Nothing -> reportFailureAndLogError >> pure a - Just _ -> pure a - version :: AWS.GetItemResponse -> Maybe Word32 - version v = conv . HashMap.lookup ddbVersion =<< (view AWS.getItemResponse_item v) - where - conv :: Maybe AWS.AttributeValue -> Maybe Word32 - conv = \case - Just (AWS.N t) -> readMaybe $ Text.unpack t - _ -> Nothing - get :: Text -> AWS.GetItem - get t = - AWS.newGetItem t - & AWS.getItem_key .~ key u c - & AWS.getItem_consistentRead ?~ True - put :: Maybe Word32 -> Text -> AWS.PutItem - put v t = - AWS.newPutItem t - & AWS.putItem_item .~ item v - & AWS.putItem_expected ?~ check v - check :: Maybe Word32 -> HashMap Text AWS.ExpectedAttributeValue - check Nothing = HashMap.singleton ddbVersion $ AWS.newExpectedAttributeValue & AWS.expectedAttributeValue_comparisonOperator ?~ AWS.ComparisonOperator_NULL - check (Just v) = - HashMap.singleton ddbVersion $ - AWS.newExpectedAttributeValue - & AWS.expectedAttributeValue_comparisonOperator ?~ AWS.ComparisonOperator_EQ - & AWS.expectedAttributeValue_attributeValueList ?~ [toAttributeValue v] - item :: Maybe Word32 -> HashMap Text AWS.AttributeValue - item v = - HashMap.insert ddbVersion (toAttributeValue (maybe (1 :: Word32) (+ 1) v)) $ - key u c - toAttributeValue :: Word32 -> AWS.AttributeValue - toAttributeValue w = AWS.N $ AWS.toText (fromIntegral w :: Int) - reportAttemptFailure :: m () - reportAttemptFailure = Prom.incCounter optimisticLockGrabAttemptFailedCounter - reportFailureAndLogError :: m () - reportFailureAndLogError = do - Log.err $ - Log.field "user" (toByteString' u) - . Log.field "client" (toByteString' c) - . msg (val "PreKeys: Optimistic lock failed") - Prom.incCounter optimisticLockFailedCounter - execDyn :: - forall r x. - (AWS.AWSRequest r) => - (AWS.AWSResponse r -> Maybe x) -> - (Text -> r) -> - m (Maybe x) - execDyn cnv mkCmd = do - cmd <- mkCmd <$> asks ((.awsEnv) <&> view prekeyTable) - e <- asks ((.awsEnv) <&> view amazonkaEnv) - liftIO $ execDyn' e cnv cmd - where - execDyn' :: - forall y p. - (AWS.AWSRequest p) => - AWS.Env -> - (AWS.AWSResponse p -> Maybe y) -> - p -> - IO (Maybe y) - execDyn' e conv cmd = recovering policy handlers (const run) - where - run = execCatch e cmd >>= either handleErr (pure . conv) - handlers = httpHandlers ++ [const $ EL.handler_ AWS._ConditionalCheckFailedException (pure True)] - policy = limitRetries 3 <> exponentialBackoff 100000 - handleErr (AWS.ServiceError se) | se ^. AWS.serviceError_code == AWS.ErrorCode "ProvisionedThroughputExceeded" = do - Prom.incCounter dynProvisionedThroughputExceededCounter - pure Nothing - handleErr _ = pure Nothing - -withLocalLock :: (MonadMask m, MonadIO m) => MVar () -> m a -> m a -withLocalLock l ma = do - (takeMVar l *> ma) `finally` putMVar l () - -{-# NOINLINE optimisticLockGrabAttemptFailedCounter #-} -optimisticLockGrabAttemptFailedCounter :: Prom.Counter -optimisticLockGrabAttemptFailedCounter = - Prom.unsafeRegister $ - Prom.counter - Prom.Info - { Prom.metricName = "client_opt_lock_optimistic_lock_grab_attempt_failed", - Prom.metricHelp = "Number of times grab attempts for optimisitic lock on prekeys failed" - } - -{-# NOINLINE optimisticLockFailedCounter #-} -optimisticLockFailedCounter :: Prom.Counter -optimisticLockFailedCounter = - Prom.unsafeRegister $ - Prom.counter - Prom.Info - { Prom.metricName = "client_opt_lock_optimistic_lock_failed", - Prom.metricHelp = "Number of time optimisitic lock on prekeys failed" - } - -{-# NOINLINE dynProvisionedThroughputExceededCounter #-} -dynProvisionedThroughputExceededCounter :: Prom.Counter -dynProvisionedThroughputExceededCounter = - Prom.unsafeRegister $ - Prom.counter - Prom.Info - { Prom.metricName = "client_opt_lock_provisioned_throughput_exceeded", - Prom.metricHelp = "Number of times provisioned throughput on DynamoDB was exceeded" - } + mErr <- lift . liftSem $ ClientStore.upsert uid newId now (c {newClientCapabilities = caps}) + case mErr of + Just DuplicateMLSPublicKey -> throwE MLSPublicKeyDuplicate + Nothing -> + pure $! + Client + { clientId = newId, + clientType = newClientType c, + clientTime = now, + clientClass = newClientClass c, + clientLabel = newClientLabel c, + clientCookie = newClientCookie c, + clientModel = newClientModel c, + clientCapabilities = fromMaybe mempty caps, + clientMLSPublicKeys = mempty, + clientLastActive = Nothing + } diff --git a/services/brig/src/Brig/InternalEvent/Process.hs b/services/brig/src/Brig/InternalEvent/Process.hs index 14bc9dfafe9..af018889184 100644 --- a/services/brig/src/Brig/InternalEvent/Process.hs +++ b/services/brig/src/Brig/InternalEvent/Process.hs @@ -36,6 +36,7 @@ import Polysemy.TinyLog as Log import System.Logger.Class (field, msg, val, (~~)) import Wire.API.UserEvent import Wire.AuthenticationSubsystem +import Wire.ClientStore (ClientStore) import Wire.Events (Events) import Wire.NotificationSubsystem import Wire.PropertySubsystem @@ -63,7 +64,8 @@ onEvent :: Member Events r, Member AuthenticationSubsystem r, Member UserGroupSubsystem r, - Member (Concurrency Unsafe) r + Member (Concurrency Unsafe) r, + Member ClientStore r ) => InternalNotification -> Sem r () diff --git a/services/brig/src/Brig/Provider/API.hs b/services/brig/src/Brig/Provider/API.hs index a57186d6a71..78910158e75 100644 --- a/services/brig/src/Brig/Provider/API.hs +++ b/services/brig/src/Brig/Provider/API.hs @@ -121,6 +121,8 @@ import Wire.API.User.Client.Prekey qualified as Public (PrekeyId) import Wire.AuthenticationSubsystem as Authentication import Wire.AuthenticationSubsystem.Config import Wire.AuthenticationSubsystem.ZAuth qualified as ZAuth +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore import Wire.DeleteQueue import Wire.EmailSending (EmailSending) import Wire.Error @@ -154,7 +156,8 @@ botAPI :: Member UserStore r, Member (Embed HttpClientIO) r, Member UserSubsystem r, - Member (Input (Local ())) r + Member (Input (Local ())) r, + Member ClientStore r ) => ServerT BotAPI (Handler r) botAPI = @@ -183,7 +186,8 @@ servicesAPI :: Member (Embed HttpClientIO) r, Member UserStore r, Member (Input (Local ())) r, - Member UserSubsystem r + Member UserSubsystem r, + Member ClientStore r ) => ServerT ServicesAPI (Handler r) servicesAPI = @@ -584,7 +588,8 @@ finishDeleteService :: Member (Embed HttpClientIO) r, Member (Concurrency Unsafe) r, Member (Input (Local ())) r, - Member UserSubsystem r + Member UserSubsystem r, + Member ClientStore r ) => ProviderId -> ServiceId -> @@ -698,7 +703,8 @@ updateServiceWhitelist :: Member (Embed HttpClientIO) r, Member UserStore r, Member (Input (Local ())) r, - Member UserSubsystem r + Member UserSubsystem r, + Member ClientStore r ) => UserId -> ConnId -> @@ -754,7 +760,8 @@ addBot :: Member CryptoSign r, Member UserStore r, Member UserSubsystem r, - Member (Input (Local ())) r + Member (Input (Local ())) r, + Member ClientStore r ) => UserId -> ConnId -> @@ -880,7 +887,8 @@ removeBot :: Member (Embed HttpClientIO) r, Member UserStore r, Member (Input (Local ())) r, - Member UserSubsystem r + Member UserSubsystem r, + Member ClientStore r ) => UserId -> ConnId -> ConvId -> BotId -> Handler r (Maybe Public.RemoveBotResponse) removeBot zusr zcon cid bid = do @@ -916,34 +924,35 @@ botGetSelf bot = do p <- fmap listToMaybe . lift . liftSem $ User.getAccountsBy getBy maybe (throwStd (errorToWai @'E.UserNotFound)) (\u -> pure $ Public.mkUserProfile EmailVisibleToSelf UserTypeBot u UserLegalHoldNoConsent) p -botGetClient :: (Member GalleyAPIAccess r) => BotId -> (Handler r) (Maybe Public.Client) +botGetClient :: (Member GalleyAPIAccess r, Member ClientStore r) => BotId -> (Handler r) (Maybe Public.Client) botGetClient bot = do guardSecondFactorDisabled (Just (botUserId bot)) - lift $ listToMaybe <$> wrapClient (User.lookupClients (botUserId bot)) + lift $ listToMaybe <$> liftSem (ClientStore.lookupClients (botUserId bot)) -botListPrekeys :: (Member GalleyAPIAccess r) => BotId -> (Handler r) [Public.PrekeyId] +botListPrekeys :: (Member GalleyAPIAccess r, Member ClientStore r) => BotId -> (Handler r) [Public.PrekeyId] botListPrekeys bot = do guardSecondFactorDisabled (Just (botUserId bot)) - clt <- lift $ listToMaybe <$> wrapClient (User.lookupClients (botUserId bot)) + clt <- lift $ listToMaybe <$> liftSem (ClientStore.lookupClients (botUserId bot)) case (.clientId) <$> clt of Nothing -> pure [] - Just ci -> lift (wrapClient $ User.lookupPrekeyIds (botUserId bot) ci) + Just ci -> lift . liftSem $ ClientStore.lookupPrekeyIds (botUserId bot) ci -botUpdatePrekeys :: (Member GalleyAPIAccess r) => BotId -> Public.UpdateBotPrekeys -> (Handler r) () +botUpdatePrekeys :: (Member GalleyAPIAccess r, Member ClientStore r) => BotId -> Public.UpdateBotPrekeys -> (Handler r) () botUpdatePrekeys bot upd = do guardSecondFactorDisabled (Just (botUserId bot)) - clt <- lift $ listToMaybe <$> wrapClient (User.lookupClients (botUserId bot)) + clt <- lift $ listToMaybe <$> liftSem (ClientStore.lookupClients (botUserId bot)) case clt of Nothing -> throwStd (errorToWai @'E.ClientNotFound) Just c -> do let pks = updateBotPrekeyList upd - wrapClientE (User.updatePrekeys (botUserId bot) c.clientId pks) !>> clientDataError + (lift . liftSem $ ClientStore.updatePrekeys (botUserId bot) c.clientId pks) !>> clientDataError botClaimUsersPrekeys :: ( Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member DeleteQueue r, - Member AuthenticationSubsystem r + Member AuthenticationSubsystem r, + Member ClientStore r ) => BotId -> Public.UserClients -> @@ -969,10 +978,10 @@ botListUserProfiles _ uids = do } pure (map mkBotUserView us) -botGetUserClients :: (Member GalleyAPIAccess r) => BotId -> UserId -> (Handler r) [Public.PubClient] +botGetUserClients :: (Member GalleyAPIAccess r, Member ClientStore r) => BotId -> UserId -> (Handler r) [Public.PubClient] botGetUserClients _ uid = do guardSecondFactorDisabled (Just uid) - lift $ pubClient <$$> wrapClient (User.lookupClients uid) + lift $ pubClient <$$> liftSem (ClientStore.lookupClients uid) where pubClient c = Public.PubClient c.clientId c.clientClass @@ -981,7 +990,8 @@ botDeleteSelf :: Member (Embed HttpClientIO) r, Member UserStore r, Member UserSubsystem r, - Member (Input (Local ())) r + Member (Input (Local ())) r, + Member ClientStore r ) => BotId -> ConvId -> Handler r () botDeleteSelf bid cid = do @@ -1021,7 +1031,7 @@ activate pid old new = do wrapClientE $ DB.insertKey pid (mkEmailKey <$> old) emailKey deleteBot :: - (Member (Embed HttpClientIO) r, Member UserStore r, Member (Input (Local ())) r, Member UserSubsystem r) => + (Member (Embed HttpClientIO) r, Member UserStore r, Member (Input (Local ())) r, Member UserSubsystem r, Member ClientStore.ClientStore r) => UserId -> Maybe ConnId -> BotId -> @@ -1034,7 +1044,7 @@ deleteBot zusr zcon bid cid = do let buid = botUserId bid getBy <- qualifyLocal' $ getByNoFilters {getByUserId = [buid], includePendingInvitations = NoPendingInvitations} mbUser <- listToMaybe <$> User.getAccountsBy getBy - embed $ User.lookupClients buid >>= mapM_ (User.rmClient buid . (.clientId)) + ClientStore.lookupClients buid >>= mapM_ (ClientStore.delete buid . (.clientId)) for_ (userService =<< mbUser) $ \sref -> do let pid = sref ^. serviceRefProvider sid = sref ^. serviceRefId diff --git a/services/brig/src/Brig/User/Auth.hs b/services/brig/src/Brig/User/Auth.hs index 847e93114a0..d267ec891cf 100644 --- a/services/brig/src/Brig/User/Auth.hs +++ b/services/brig/src/Brig/User/Auth.hs @@ -38,7 +38,6 @@ import Brig.API.Types import Brig.API.User (changeSingleAccountStatus) import Brig.App import Brig.Budget -import Brig.Data.Client import Brig.Options qualified as Opt import Brig.Types.Intra import Brig.User.Auth.Cookie @@ -75,6 +74,8 @@ import Wire.AuthenticationSubsystem import Wire.AuthenticationSubsystem qualified as Authentication import Wire.AuthenticationSubsystem.Config import Wire.AuthenticationSubsystem.ZAuth qualified as ZAuth +import Wire.ClientStore (ClientStore) +import Wire.ClientStore qualified as ClientStore import Wire.Events (Events) import Wire.GalleyAPIAccess (GalleyAPIAccess) import Wire.GalleyAPIAccess qualified as GalleyAPIAccess @@ -234,7 +235,8 @@ renewAccess :: Member Now r, Member AuthenticationSubsystem r, Member Random r, - Member UserStore r + Member UserStore r, + Member ClientStore r ) => NE.NonEmpty (ZAuth.Token u) -> Maybe (ZAuth.Token a) -> @@ -242,7 +244,7 @@ renewAccess :: ExceptT ZAuth.Failure (AppT r) (Access u) renewAccess uts at mcid = do (uid, ck) <- validateTokens uts at - wrapClientE $ traverse_ (checkClientId uid) mcid + traverse_ (checkClientId uid) mcid lift . liftSem . Log.debug $ field "user" (toByteString uid) . field "action" (val "User.renewAccess") catchSuspendInactiveUser uid ZAuth.Expired mapExceptT liftSem $ do @@ -507,6 +509,6 @@ assertLegalHoldEnabled tid = do FeatureStatusDisabled -> throwE LegalHoldLoginLegalHoldNotEnabled FeatureStatusEnabled -> pure () -checkClientId :: (MonadClient m) => UserId -> ClientId -> ExceptT ZAuth.Failure m () +checkClientId :: (Member ClientStore r) => UserId -> ClientId -> ExceptT ZAuth.Failure (AppT r) () checkClientId uid cid = - lookupClient uid cid >>= maybe (throwE ZAuth.Invalid) (const (pure ())) + lift (liftSem (ClientStore.lookupClient uid cid)) >>= maybe (throwE ZAuth.Invalid) (const (pure ()))