Skip to content

Commit 4bec712

Browse files
authored
Tap: prioritize pending repos first (#1329)
When picking up "resync" jobs, process them in the following order: pending, desynchronized, error. In other words: - focus on getting out of "backfill" mode - then focus on fixing repos that have gotten desynchronized but that we haven't failed to resync - only after all those are done, go back and try to fix repos that had an error during resync. By default, tap was selecting desyncrhonized repos first which dramatically reduced tap's throughput and made it look like tap had stopped crawling the full network. I believe this addresses the issue from: #1315
2 parents a86f3ae + 09c5c95 commit 4bec712

1 file changed

Lines changed: 17 additions & 2 deletions

File tree

cmd/tap/resyncer.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,34 @@ func (r *Resyncer) claimResyncJob(ctx context.Context) (string, bool, error) {
9898
r.claimJobMu.Lock()
9999
defer r.claimJobMu.Unlock()
100100

101+
// prioritize pending repos first before moving on to desyncrhonized
102+
// only reprocess repos that errored after resyncing all other repos
103+
for _, state := range []models.RepoState{models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError} {
104+
did, found, err := r.claimResyncJobOfState(ctx, state)
105+
if err != nil {
106+
return "", false, err
107+
}
108+
if found {
109+
return did, true, nil
110+
}
111+
}
112+
return "", false, nil
113+
}
114+
115+
func (r *Resyncer) claimResyncJobOfState(ctx context.Context, state models.RepoState) (string, bool, error) {
101116
var did string
102117
now := time.Now().Unix()
103118
result := r.db.WithContext(ctx).Raw(`
104119
UPDATE repos
105120
SET state = ?
106121
WHERE did = (
107122
SELECT did FROM repos
108-
WHERE state IN (?, ?, ?)
123+
WHERE state = ?
109124
AND (retry_after = 0 OR retry_after < ?)
110125
LIMIT 1
111126
)
112127
RETURNING did
113-
`, models.RepoStateResyncing, models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, now).Scan(&did)
128+
`, models.RepoStateResyncing, state, now).Scan(&did)
114129
if result.Error != nil {
115130
return "", false, result.Error
116131
}

0 commit comments

Comments
 (0)