Skip to content

Commit 7e1240e

Browse files
authored
Add --skip-dids to Hepa (#1353)
2 parents c55a189 + 14c2c14 commit 7e1240e

1 file changed

Lines changed: 13 additions & 0 deletions

File tree

automod/consumer/firehose.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ type FirehoseConsumer struct {
3535
Engine *automod.Engine
3636
Host string
3737

38+
// if set, events from these DIDs will be silently skipped
39+
SkipDIDs map[string]struct{}
40+
3841
// TODO: prefilter record collections; or predicate function?
3942
// TODO: enable/disable event types; or predicate function?
4043

@@ -80,13 +83,19 @@ func (fc *FirehoseConsumer) Run(ctx context.Context) error {
8083
},
8184
RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
8285
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
86+
if _, skip := fc.SkipDIDs[evt.Did]; skip {
87+
return nil
88+
}
8389
if err := fc.Engine.ProcessIdentityEvent(context.Background(), *evt); err != nil {
8490
fc.Logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err)
8591
}
8692
return nil
8793
},
8894
RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
8995
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
96+
if _, skip := fc.SkipDIDs[evt.Did]; skip {
97+
return nil
98+
}
9099
if err := fc.Engine.ProcessAccountEvent(context.Background(), *evt); err != nil {
91100
fc.Logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err)
92101
}
@@ -130,6 +139,10 @@ func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatprot
130139
return nil
131140
}
132141

142+
if _, skip := fc.SkipDIDs[evt.Repo]; skip {
143+
return nil
144+
}
145+
133146
did, err := syntax.ParseDID(evt.Repo)
134147
if err != nil {
135148
logger.Error("bad DID syntax in event", "err", err)

0 commit comments

Comments
 (0)