Skip to content

Commit 6791c1d

Browse files
committed
feat: Listener outbox + Lots of fixes
1 parent b27e354 commit 6791c1d

File tree

4 files changed

+122
-26
lines changed

4 files changed

+122
-26
lines changed

listener_creation.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ import (
55
"sync"
66

77
"github.com/Liphium/neogate"
8+
"github.com/bytedance/sonic"
89
"github.com/dgraph-io/ristretto/v2"
910
)
1011

11-
type DatabaseListenerCreate[C Change[C]] struct {
12+
type DatabaseListenerCreate[DB any, C Change[C]] struct {
1213
Identifier string // Identifier for the listener (REQUIRED)
13-
Get func([]string) (map[string]C, error) // Get the base data from results of listeners or just with key (required)
14+
Get func(DB, []string) (map[string]C, error) // Get the base data from results of listeners or just with key (required)
1415
ToEvent func(key string, change Change[C]) neogate.Event // Should convert string and change info into an event that can be sent with Neo (required)
15-
FromEvent func(key string, encodedEvent []byte) Change[C] // Should convert an event back to a change and its key
1616

1717
PoolConfig PoolConfig // Config for the pooling of subscription workers
1818
}
1919

2020
// Helper function for initializing a new listener dictionary properly
21-
func NewListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]](instance *Instance[T, PS], outbox *PubSubOutbox[DB, PS], create DatabaseListenerCreate[C]) *DatabaseListenerDictionary[T, PS, DB, C] {
21+
func NewListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]](instance *Instance[T, PS], outbox *PubSubOutbox[DB, PS], create DatabaseListenerCreate[DB, C]) *DatabaseListenerDictionary[T, PS, DB, C] {
2222
subDict, err := ristretto.NewCache(&ristretto.Config[string, *ListenerSubscriptions[T, PS, C]]{
2323
MaxCost: 10_000, // Maximum 10.000 stored items
2424
NumCounters: 10_000 * 10, // 10x what we want to store
@@ -42,9 +42,12 @@ func NewListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]](instan
4242

4343
// Create the pool to forward messages to the dictionary
4444
dictionary.pool.OnMessage(func(channel, message string) {
45-
key := dictionary.channelToKey(channel)
46-
event := create.FromEvent(key, []byte(message))
47-
dictionary.onChange(dictionary.channelToKey(channel), event)
45+
var change C
46+
if err := sonic.UnmarshalString(message, &change); err != nil {
47+
Log.Println("ERROR: Couln't process event received through pubsub ("+create.Identifier+"):", err)
48+
return
49+
}
50+
dictionary.onChange(dictionary.channelToKey(channel), change)
4851
})
4952

5053
// Just print warning when an error happens in a channel for now

listener_dictionary.go

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ type DatabaseListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]] s
2323
// Dictionary for managing the subscriptions by key
2424
subDict *ristretto.Cache[string, *ListenerSubscriptions[T, PS, C]]
2525

26-
get func([]string) (map[string]C, error)
26+
get func(DB, []string) (map[string]C, error)
2727
toEvent func(string, Change[C]) neogate.Event
28-
toChange func(neogate.Event) Change[C]
2928
createMutex *sync.Mutex
3029
pool *SubPool[PS]
3130
}
@@ -48,7 +47,7 @@ func (ld *DatabaseListenerDictionary[T, PS, DB, C]) channelToKey(channel string)
4847
return values[2]
4948
}
5049

51-
func DictionarySubscribe[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], keys []string, identifier string, subscription S) error {
50+
func DictionarySubscribe[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], db DB, keys []string, identifier string, subscription S) error {
5251

5352
// Find all listeners that are not already available
5453
nonCached := []string{}
@@ -66,12 +65,12 @@ func DictionarySubscribe[T any, PS IPubSubBackend, DB any, C Change[C], S Subscr
6665
}
6766

6867
// Create subscriptions so that they can start receiving stuff already
69-
return createSubscriptions(ld, keys, identifier, subscription)
68+
return createSubscriptions(ld, db, keys, identifier, subscription)
7069
}
7170

7271
// Create subscriptions in the ListenerDictionary
7372
// TODO(unbreathable): How do we fix broken subscriptions in case an error is returned below subscription creation?
74-
func createSubscriptions[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], keys []string, identifier string, subscription S) error {
73+
func createSubscriptions[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], db DB, keys []string, identifier string, subscription S) error {
7574
ctx := context.Background()
7675

7776
toGet := []string{}
@@ -100,7 +99,7 @@ func createSubscriptions[T any, PS IPubSubBackend, DB any, C Change[C], S Subscr
10099
}
101100

102101
// Get the base data for all listeners that were created
103-
results, err := ld.Get(toGet)
102+
results, err := ld.Get(db, toGet)
104103
if err != nil {
105104
return fmt.Errorf("couldn't get from base data: %v", err)
106105
}
@@ -119,9 +118,41 @@ func createSubscriptions[T any, PS IPubSubBackend, DB any, C Change[C], S Subscr
119118
return nil
120119
}
121120

121+
// Reset all values for the keys back to the original value by re-getting them and pushing that update
122+
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Reset(db DB, keys []string) error {
123+
results, err := ld.get(db, keys)
124+
if err != nil {
125+
return err
126+
}
127+
128+
// Build all of the messages for the outbox
129+
messages := make([]OutboxMessage, len(keys))
130+
for i, key := range keys {
131+
change, ok := results[key]
132+
if !ok {
133+
return fmt.Errorf("couldn't find result for key %s", key)
134+
}
135+
136+
// Marshal the event for the reset
137+
event := ld.toEvent(key, change)
138+
bytes, err := sonic.Marshal(event)
139+
if err != nil {
140+
return err
141+
}
142+
143+
messages[i] = OutboxMessage{
144+
Identifier: ld.keyToChannel(key),
145+
Data: bytes,
146+
}
147+
}
148+
149+
// Publish all the messages to the outbox
150+
return ld.outbox.save(db, messages)
151+
}
152+
122153
// Get the value for keys from the listener dictionary (makes sure we can add batching in the future)
123-
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Get(keys []string) (map[string]C, error) {
124-
return ld.get(keys)
154+
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Get(db DB, keys []string) (map[string]C, error) {
155+
return ld.get(db, keys)
125156
}
126157

127158
// Handle a change for a specific key
@@ -131,16 +162,28 @@ func (ld *DatabaseListenerDictionary[T, PS, DB, C]) onChange(key string, change
131162
}
132163
}
133164

165+
// Package a key and change for the outbox
166+
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) packageForOutbox(key string, change Change[C]) (OutboxMessage, error) {
167+
var message OutboxMessage
168+
bytes, err := sonic.Marshal(change)
169+
if err != nil {
170+
return message, err
171+
}
172+
message = OutboxMessage{
173+
Identifier: ld.keyToChannel(key),
174+
Data: bytes,
175+
}
176+
return message, nil
177+
}
178+
134179
// Save a change to the outbox, makes sure all of this stays transactional
135180
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Save(db DB, key string, change Change[C]) error {
136-
event := ld.toEvent(key, change)
137-
bytes, err := sonic.Marshal(event)
181+
message, err := ld.packageForOutbox(key, change)
138182
if err != nil {
139183
return err
140184
}
141185

142-
return ld.outbox.save(db, OutboxMessage{
143-
Identifier: ld.keyToChannel(key),
144-
Data: bytes,
186+
return ld.outbox.save(db, []OutboxMessage{
187+
message,
145188
})
146189
}

listener_dictionary_outbox.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package hydro
2+
3+
// Ignore, needed for type check below
4+
type s struct{}
5+
6+
func (d s) Stack(s Change[s]) Change[s] {
7+
return d
8+
}
9+
10+
// For making sure the thing actually implements the interface
11+
var _ PackedMessage = dbldOutboxMessage[any, IPubSubBackend, any, s]{}
12+
13+
type dbldOutboxMessage[T any, PS IPubSubBackend, DB any, C Change[C]] struct {
14+
dict *DatabaseListenerDictionary[T, PS, DB, C]
15+
key string
16+
change C
17+
}
18+
19+
func (m dbldOutboxMessage[T, PS, DB, C]) convertToOutbox() (OutboxMessage, error) {
20+
return m.dict.packageForOutbox(m.key, m.change)
21+
}
22+
23+
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) ForOutbox(key string, change C) dbldOutboxMessage[T, PS, DB, C] {
24+
return dbldOutboxMessage[T, PS, DB, C]{
25+
dict: ld,
26+
key: key,
27+
change: change,
28+
}
29+
}

pubsub_outbox.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package hydro
22

33
import (
44
"context"
5+
"errors"
56
"time"
67
)
78

@@ -20,7 +21,7 @@ type PubSubOutbox[DB any, PS IPubSubBackend] struct {
2021

2122
closeChan chan struct{} // For closing the outbox
2223

23-
save func(database DB, message OutboxMessage) error
24+
save func(database DB, messages []OutboxMessage) error
2425
}
2526

2627
type OutboxMessage struct {
@@ -34,7 +35,7 @@ type OutboxCreate[DB any] struct {
3435
WaitDuration time.Duration
3536

3637
// This function should save an event with an identifier to the database.
37-
Save func(database DB, message OutboxMessage) error
38+
Save func(database DB, messages []OutboxMessage) error
3839

3940
// This is the main function handling the pulling of messages for the outbox.
4041
//
@@ -80,7 +81,7 @@ func NewOutbox[T any, DB any, PS IPubSubBackend](instance *Instance[T, PS], conn
8081

8182
// Send the encoded event to pub/sub
8283
err := outbox.backend.Publish(context.Background(), message.Identifier, string(message.Data))
83-
if err != nil {
84+
if err != nil && !errors.Is(err, ErrChannelNotRegistered) {
8485
// On publish error, use exponential backoff
8586
backoff *= 2
8687
if backoff > maxBackoff {
@@ -101,9 +102,11 @@ func NewOutbox[T any, DB any, PS IPubSubBackend](instance *Instance[T, PS], conn
101102

102103
// Save an event to the outbox. Use this for transactional pub/sub using the database.
103104
func (o *PubSubOutbox[DB, PS]) Save(db DB, identifier string, data []byte) error {
104-
return o.save(db, OutboxMessage{
105-
Identifier: identifier,
106-
Data: data,
105+
return o.save(db, []OutboxMessage{
106+
{
107+
Identifier: identifier,
108+
Data: data,
109+
},
107110
})
108111
}
109112

@@ -121,3 +124,21 @@ func (o *PubSubOutbox[DB, PS]) CreateWorker() ISubWorker {
121124
func (o *PubSubOutbox[DB, PS]) Close() {
122125
o.closeChan <- struct{}{}
123126
}
127+
128+
type PackedMessage interface {
129+
convertToOutbox() (OutboxMessage, error)
130+
}
131+
132+
// Method for inserting multiple things into the outbox with one query
133+
func (o *PubSubOutbox[DB, PS]) SaveMultiple(db DB, sends []PackedMessage) error {
134+
messages := make([]OutboxMessage, len(sends))
135+
for i, send := range sends {
136+
result, err := send.convertToOutbox()
137+
if err != nil {
138+
return err
139+
}
140+
messages[i] = result
141+
}
142+
143+
return o.save(db, messages)
144+
}

0 commit comments

Comments
 (0)