fix(redis_admin): restore pre-#899 pipeline-LSET clear; drop fragile verify guard #908
Open
thomasrockhu-codecov wants to merge 3 commits into
Conversation
…verify guard

Restore PR #895's `_execute_celery_clear` pattern (pipelined `LSET key idx tombstone` per chunk via `pipeline(transaction=False).execute(raise_on_error=False)`) inside `_streaming_celery_clear`, replacing the per-match LINDEX / verify / LSET sequence introduced by PR #899. PR #899 retains its streaming architecture (we still walk past the display-limit cap to drain deep filters); this PR only drops the verify step.

Trade-off (operator directive):

> I'm ok if we lose a few tasks along the way and we make secondary
> passes, but I do care that if I'm trying to remove 50% of the
> queue, that there is a significant drop.

Verify-before-LSET turned every pipelined LSET into LINDEX + LSET RTTs per match and, on a busy `bundles_analysis` broker (78k matches with consumers actively BLPOPing), pushed the LSET success rate to ~3% (`matched=2038`, `drift=77475`). The operator saw the queue stay full because most matches were silently swallowed by the drift counter.

Pipelined unconditional LSET trades this safety guard for actual queue drain: the bounded data-loss surface area (consumer pops between the LRANGE snapshot and the pipelined LSET shift indexes; our LSET overwrites the new occupant of the target slot, which then gets LREM'd) is documented in the docstring and explicitly accepted.

Same change set:

* `_CELERY_MAX_PASSES` 3 → 10. The `prev_lset == matches_lset` plateau exit is removed; on a busy queue it fired against legitimate per-pass progress (each pass cleared roughly the same number of matches). Convergence is now bounded by `matches_found == 0` plus the pass cap.
* `keep_one` early-exit retained. With verify gone, `matches_drifted` only counts genuine LSET out-of-range failures, so `keep_one and matches_drifted == 0` still means "non-keeper matches all tombstoned, queue converged".
* Per-pass diagnostic logging:

  ```
  redis_admin.streaming_clear: queue=<q> pass=<n> depth=<d>
  redis_admin.streaming_clear: queue=<q> pass=<n> found=<f> lset=<l> drifted=<dr> pass_first_kept=<bool>
  redis_admin.streaming_clear: queue=<q> pass=<n> lrem_removed=<r>
  ```

  On a quiet queue `lrem_removed` should equal `lset`, giving on-call a single grep that ties `LSET → LREM → LLEN delta` together.
* Docstring documents the data-loss surface area and references PR #895 / #899 for the audit trail.

Tests:

* New `test_streaming_clear_drops_50pct_of_queue_in_first_pass` in `test_celery_broker_clear_job.py`: seeds 100 envelopes (50 matching, 50 not), runs the chunked clear job with no concurrency, asserts `LLEN` drops from 100 to 50 and that every survivor parses as a non-matching envelope.
* New `test_streaming_clear_makes_progress_under_concurrent_drift`: monkey-patches `LINDEX` to always return non-matching bytes (the worst-case scenario the verify-before-LSET path was designed to catch); confirms the pipelined path tombstones every match and drains the queue to zero, because LINDEX is no longer in the loop.
* Removed `test_streaming_clear_verify_before_lset_skips_drifted_entry` in `test_celery_broker_queue.py` (it asserted a semantic that no longer exists).

Operator one-liner to verify the LSET → LREM round-trip on a live broker connection (paste into `python manage.py shell_plus`):

```python
from redis_admin import conn as _c
r = _c.get_connection(kind="broker")
before = r.llen("bundles_analysis")
r.rpush("bundles_analysis", "__test_diag_value__")
removed = r.lrem("bundles_analysis", 0, "__test_diag_value__")
print(f"before={before} removed={removed} after={r.llen('bundles_analysis')}")
```
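To make the restored clear pattern concrete, here is a minimal in-memory sketch: a plain Python list stands in for the Redis list, `tombstone_and_sweep` and the tombstone value are illustrative names (not the repo's actual code), and out-of-range indexes are counted as drift the way `raise_on_error=False` Exception replies are described above.

```python
TOMBSTONE = b"__cleared__"

def tombstone_and_sweep(queue, match_indexes):
    """Model the pipelined unconditional-LSET clear on a plain list.

    Real Redis answers LSET past the tail with an out-of-range error;
    with raise_on_error=False those come back as Exception replies,
    modeled here by counting them as drift instead of applying them.
    """
    lset = drifted = 0
    for idx in match_indexes:          # one pipelined LSET per match
        if idx < len(queue):
            queue[idx] = TOMBSTONE     # unconditional: no verify read
            lset += 1
        else:
            drifted += 1               # out-of-range reply == drift
    removed = queue.count(TOMBSTONE)
    queue[:] = [m for m in queue if m != TOMBSTONE]  # LREM key 0 tombstone
    return lset, drifted, removed

queue = [b"keep0", b"drop1", b"keep2", b"drop3"]
print(tombstone_and_sweep(queue, [1, 3, 9]))  # (2, 1, 2)
```

Note how the drift counter only reacts to structurally impossible writes (index past the tail), never to content mismatches; that is the whole behavioral difference from the verify-before-LSET path.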
…ueue
`CeleryBrokerClearByFilterViewTest`'s `test_clear_all_*` /
`test_clear_keep_one_*` / `test_legacy_confirm_*` /
`test_task_name_*` tests were racing the chunked clear's daemon
worker. The view returns 302 the moment `start_celery_broker_clear_job`
spawns the thread, but the LSET / LREM that actually drains
the queue runs in that worker; the tests were asserting on
`self.redis.lrange("celery", ...)` immediately and getting the
pre-clear queue back when the runner was a few ms slower than
usual.
Add a `_post_clear` helper on the test class that submits the POST,
parses the redirect URL's `<job_id>` segment, and `thread.join`s
the matching worker thread (10s timeout, with a fallback to joining
every entry in `_clear_job_threads` if the redirect target isn't a
progress page). All six racy tests are now driven through this
helper.
Surfaced when the pre-#899 pipeline-LSET restoration in this
branch tightened the worker's per-pass timing enough for the race
to bite reliably in CI.
Contributor
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```diff
@@           Coverage Diff           @@
##             main     #908   +/-   ##
=======================================
  Coverage   91.89%   91.89%
=======================================
  Files        1316     1316
  Lines       50586    50590    +4
  Branches     1625     1625
=======================================
+ Hits        46485    46491    +6
+ Misses       3795     3793    -2
  Partials      306      306
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
The pipelined `LSET` reply loop in `_streaming_celery_clear` counts every `Exception` reply (out-of-range, i.e. a consumer drained the slot mid-clear) as `matches_drifted`. That branch was the only patch line left uncovered after this PR's restoration (codecov reported 85% patch coverage, target 90%, 3 missing lines).

Add `test_streaming_clear_counts_pipelined_lset_out_of_range_as_drift`: wraps `redis.pipeline()` to `LTRIM` the queue down to half its length right before the pipeline flush, so every LSET against an index past the new tail comes back as an out-of-range Exception reply. Asserts `total_lset == 25` / `total_drifted == 25` against a 50-match seed, exercising the drift counter directly without monkey-patching individual replies (which would diverge from real fakeredis pipeline semantics).

Also tag the `(TypeError, ValueError)` fallback around the `int(removed or 0)` cast on `LREM`'s return as `# pragma: no cover - defensive`. Real Redis `LREM` always returns an int; that branch only fires for misbehaving test doubles and isn't worth a contrived test.
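The branch under test amounts to classifying the mixed reply list a `raise_on_error=False` pipeline returns. A minimal sketch (the helper name and the literal reply values are illustrative; real redis-py returns each command's reply or the raised exception instance per slot):

```python
def classify_lset_replies(replies):
    """Split raise_on_error=False pipeline replies into successes and
    drift: an Exception reply means the LSET hit an out-of-range index
    because a consumer drained that slot mid-clear."""
    lset = sum(1 for r in replies if not isinstance(r, Exception))
    drifted = len(replies) - lset
    return lset, drifted

# Half the queue trimmed away right before the flush: half the LSETs
# come back as out-of-range errors, mirroring the test's 25/25 split.
replies = [True] * 25 + [IndexError("index out of range")] * 25
print(classify_lset_replies(replies))  # (25, 25)
```

Trimming the whole queue via `LTRIM` before the flush, rather than faking individual replies, keeps the test aligned with real pipeline semantics, as the commit message argues.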
calvin-codecov
approved these changes
May 2, 2026
thomasrockhu-codecov added a commit that referenced this pull request on May 2, 2026
thomasrockhu-codecov added a commit that referenced this pull request on May 2, 2026
…t numbers)

Conflict resolution notes:

* services.py docstrings for `_substitute_filter_any` and the chunked clear job: combined #908's "dry-run vs live-clear must agree" framing with #909's more general "rule survives a future field/truthiness change" framing. Both are accurate for the post-merge call sites; the merged wording covers both modes.
* test_celery_broker_queue.py: took #909's structure (no preview test class) wholesale because the preview surface is gone. #908's `_post_clear` thread-join helper is unnecessary in the view tests because the post-#909 view tests only assert on the 302 redirect URL, not on post-worker queue state. End-to-end drain assertions live in `test_celery_broker_clear_job.py`, which #908 already wired up with a thread-join helper that survives this merge.
Contributor
Author
Folded into #905; keeping this open as the per-feature review record.
thomasrockhu-codecov added a commit that referenced this pull request on May 2, 2026
… maths

Two CI failures on the consolidated branch HEAD:

1. `test_streaming_clear_verify_before_lset_skips_drifted_entry` was a left-over from PR #899 that asserted the verify-before-LSET guard skipped drifted slots. PR #908 deletes that guard in `_streaming_celery_clear` (the whole point of the restoration), so the test now sees `total_lset == 1` for a drifted slot and fails. The merge picked up #909's copy of the test file (it was the conflict-resolution baseline), which re-introduced the test. Apply #908's deletion verbatim and keep its replacement comment in place.
2. `test_typed_confirm_failure_preserves_chart_hint_callout` asserted `round(78500 / 20000 * 211052) == 828381`. The correct value is 828379 (the docstring arithmetic was off by two). The test logic and the production behaviour are both correct; the expected literal is what was wrong.
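The corrected literal is easy to sanity-check directly:

```python
# Re-run the docstring arithmetic from the failing test above.
value = round(78500 / 20000 * 211052)
print(value)  # 828379 (the old expected literal 828381 was off by two)
```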
Summary
Operator directive that drove this PR:

> I'm ok if we lose a few tasks along the way and we make secondary passes, but I do care that if I'm trying to remove 50% of the queue, that there is a significant drop.

Restore PR #895's pipelined-LSET clear pattern inside `_streaming_celery_clear`, replacing PR #899's per-match LINDEX / verify / LSET sequence. PR #899 retains its streaming architecture (we still walk past `CELERY_BROKER_DISPLAY_LIMIT` to drain deep filters); this PR only drops the verify-before-LSET guard.

Bisect: PR #895 (da3a7e071) was the working clear path the operator was happy with. PR #899 (aa56fc351) replaced it with verify-before-LSET. PR #899 is the regression source.

Why the verify guard was killing the queue
On the operator's `bundles_analysis` broker (211k items, ~78k matching the filter, consumers actively BLPOPing the head): `MATCHED=2038`, `DRIFT=77475`. `LLEN` did not drop measurably between passes.

The verify-before-LSET path turned every queued LSET into `LINDEX + LSET` round-trips per match. Between the chunk's LRANGE snapshot and the per-match LINDEX, even a moderately busy consumer pop rate is enough to mismatch the bytes in ~97% of slots, so `matches_drifted` absorbed essentially every match and very few LSETs survived to be LREM'd.

The plateau exit (`if prev_lset is not None and matches_lset == prev_lset: break`) then fired against pass 2's identical `matches_lset`, declaring convergence at a queue that hadn't moved.

What changed
`_streaming_celery_clear` collects per-chunk `(idx, raw)` matches into a list, then issues a single `pipeline(transaction=False)` of `LSET key idx tombstone` ops with `raise_on_error=False`. Each `Exception` reply (LSET out-of-range == consumer drained that slot) increments `matches_drifted`; everything else increments `matches_lset`.

`_CELERY_MAX_PASSES` 3 → 10.

Plateau exit (`prev_lset == matches_lset`) removed. With verify-before-LSET gone, `matches_lset` only plateaus when the queue has converged, which is already covered by `matches_found == 0`.

`keep_one` early-exit retained. With verify gone, `matches_drifted` only counts genuine LSET out-of-range failures, so `keep_one and matches_drifted == 0` still means "non-keeper matches all tombstoned, queue converged for keep_one mode."

Per-pass diagnostic logging so on-call can grep the LSET → LREM → LLEN delta from structured logs alone. On a quiet queue `lrem_removed` should equal `lset`. The new regression test asserts that round-trip explicitly (`stats.total_lset == matches`, `LLEN(queue) == 0`).

Trade-off (data-loss surface area)
Documented in the `_streaming_celery_clear` docstring. Concretely: between the chunk LRANGE snapshot and the pipelined LSET, K consumer pops shift indexes down by K. Our LSET at slot 50 then overwrites whatever's at slot 50 now (which used to be at 50+K). That overwritten message is destroyed by the subsequent LREM. Bounded by `consumer_pops_per_chunk × chunks_per_pass`; with `chunk_size=1000` and a consumer popping ~25/sec the bound is a handful of slots per chunk. Operator has explicitly accepted this trade-off.

The `keep_one` semantic still survives the trade-off because pass 1's keeper is the lowest-index match in pass 1's LRANGE snapshot. Even if that snapshot is stale by the time we LSET, the new lowest-index match in pass 2 becomes the next keeper and the queue retains at least one matching message after a successful keep_one clear.

Tests
* `test_streaming_clear_drops_50pct_of_queue_in_first_pass` (`test_celery_broker_clear_job.py`): seeds 100 envelopes (50 matching the filter, 50 not), runs the chunked clear job with no concurrency, asserts `LLEN` drops from 100 to 50 and that every survivor parses as a non-matching envelope. This is the headline operator invariant.
* `test_streaming_clear_makes_progress_under_concurrent_drift`: monkey-patches `LINDEX` to always return non-matching bytes (the worst-case drift scenario the verify-before-LSET path was designed to catch); confirms the pipelined path tombstones every match and drains the queue to zero, because LINDEX is no longer in the loop.
* `test_streaming_clear_verify_before_lset_skips_drifted_entry` in `test_celery_broker_queue.py` removed (asserted a semantic that no longer exists).
* `test_celery_broker_clear_drain_verification.py` tests continue to pass: the chunked-job worker still LSET-tombstones every match in the chunk and the LREM sweep still removes them.

Local run (subset that doesn't require Postgres):

Operator one-liner to verify in `manage.py shell_plus` (shown in the commit message above).

Test plan

* … (`-m "not django_db"`)
* … `bundles_analysis` after deploy

Made with Cursor
Note
High Risk
Changes the Redis admin celery-broker clear algorithm to use pipelined `LSET` without verify checks, intentionally increasing the potential for accidental task deletion under concurrent consumers. It also alters convergence behavior (more passes, different exit conditions), which affects how aggressively queues are drained in production.

Overview
Restores `_streaming_celery_clear` to the pre-#899 approach: collect matching indexes per LRANGE chunk and issue batched `pipeline(...).lset(...)` tombstones, dropping the per-match `LINDEX` verify step and treating per-op pipeline `Exception` replies as drift.

Adjusts convergence to better drain saturated queues (max passes 3 → 10, removes the `prev_lset` plateau exit, keeps an early-exit for `keep_one` when no drift occurs) and adds structured per-pass logging including `LLEN` and `LREM` removed counts.

Updates tests to match the new behavior: adds regression coverage that the queue actually shrinks (including under simulated drift and out-of-range `LSET` replies), removes the verify-before-LSET drift-skip test, and makes clear-by-filter view tests join the background clear thread to avoid races.

Reviewed by Cursor Bugbot for commit cb5c070.