-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathtest.go
More file actions
118 lines (99 loc) · 2.68 KB
/
test.go
File metadata and controls
118 lines (99 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"net/url"
"strings"
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
atp "github.com/bluesky-social/indigo/atproto/repo"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repomgr"
"github.com/gorilla/websocket"
)
// main starts the firehose consumer against the local development host and
// logs the error it terminates with, instead of discarding it.
func main() {
	if err := runFirehoseConsumer("ws://localhost:8080"); err != nil {
		slog.Error("firehose consumer exited", "err", err)
	}
}
// runFirehoseConsumer dials the com.atproto.sync.subscribeRepos WebSocket
// endpoint on relayHost and processes the event stream until
// events.HandleRepoStream returns.
//
// relayHost is the base URL of the host to subscribe to (for example
// "ws://localhost:8080"); it is also handed to the scheduler as its
// identifier. Commit events are printed and forwarded to handleRepoCommit;
// identity events are only printed.
//
// It returns an error when relayHost cannot be parsed, when the dial fails,
// or whatever events.HandleRepoStream returns.
func runFirehoseConsumer(relayHost string) error {
	// Dial the host the caller asked for rather than a hard-coded one.
	u, err := url.Parse(relayHost)
	if err != nil {
		return fmt.Errorf("invalid relayHost: %w", err)
	}
	u.Path = "xrpc/com.atproto.sync.subscribeRepos"

	conn, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{
		"User-Agent": []string{"cocoon-test/0.0.0"},
	})
	if err != nil {
		return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
	}

	rsc := &events.RepoStreamCallbacks{
		RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
			fmt.Println(evt.Repo)
			return handleRepoCommit(evt)
		},
		RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error {
			fmt.Println(evt.Did, evt.Handle)
			return nil
		},
	}

	// NOTE(review): 1000 is presumably the scheduler's queue bound — confirm
	// against parallel.NewScheduler.
	parallelism := 700
	scheduler := parallel.NewScheduler(parallelism, 1000, relayHost, rsc.EventHandler)

	return events.HandleRepoStream(context.TODO(), conn, scheduler, slog.Default())
}
// splitRepoPath breaks a repo record path of the form "<collection>/<rkey>"
// into its parsed collection NSID and record key.
//
// A path that does not consist of exactly two "/"-separated segments is
// rejected with an "invalid record path" error; errors from parsing either
// segment are returned unchanged.
func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) {
	rawCollection, rawKey, found := strings.Cut(path, "/")
	// Reject both a missing separator and a second one in the remainder.
	if !found || strings.Contains(rawKey, "/") {
		return "", "", fmt.Errorf("invalid record path: %s", path)
	}

	nsid, err := syntax.ParseNSID(rawCollection)
	if err != nil {
		return "", "", err
	}

	key, err := syntax.ParseRecordKey(rawKey)
	if err != nil {
		return "", "", err
	}

	return nsid, key, nil
}
// handleRepoCommit checks a single commit event from the firehose.
//
// Events flagged TooBig are skipped. Otherwise the repo DID is parsed, the
// event's blocks are loaded as a CAR, and for every create/update op the
// record is read from the loaded repo and its CID is compared with the CID
// announced in the op.
//
// Malformed input is reported through the returned error rather than a
// panic, so one bad event cannot take down the whole consumer. Ops are
// verified synchronously: no untracked goroutines are started, and the first
// failing op aborts the remaining ones.
func handleRepoCommit(evt *atproto.SyncSubscribeRepos_Commit) error {
	if evt.TooBig {
		return nil
	}

	ctx := context.TODO()

	did, err := syntax.ParseDID(evt.Repo)
	if err != nil {
		return fmt.Errorf("parsing repo DID %q: %w", evt.Repo, err)
	}

	_, rr, err := atp.LoadRepoFromCAR(ctx, bytes.NewReader(evt.Blocks))
	if err != nil {
		return fmt.Errorf("loading commit blocks for %s: %w", did, err)
	}

	for _, op := range evt.Ops {
		collection, rkey, err := splitRepoPath(op.Path)
		if err != nil {
			return fmt.Errorf("splitting op path for %s: %w", did, err)
		}

		// Only creates and updates carry a record to verify; other op
		// kinds are ignored here.
		switch repomgr.EventKind(op.Action) {
		case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
			// The record bytes themselves are not consumed yet; only the
			// CID is checked.
			_, rc, err := rr.GetRecordBytes(ctx, collection, rkey)
			if err != nil {
				return fmt.Errorf("reading record %s/%s for %s: %w", collection, rkey, did, err)
			}
			if op.Cid == nil || rc == nil || lexutil.LexLink(*rc) != *op.Cid {
				return fmt.Errorf("record CID mismatch for %s/%s in %s", collection, rkey, did)
			}
		}
	}

	return nil
}