fix(redis_admin): restore pre-#899 pipeline-LSET clear; drop fragile verify guard #908

Open
thomasrockhu-codecov wants to merge 3 commits into main from redis-admin/clear-drain-diagnostics

Conversation

@thomasrockhu-codecov
Contributor

thomasrockhu-codecov commented May 1, 2026

Summary

Operator directive that drove this PR:

I'm ok if we lose a few tasks along the way and we make secondary
passes, but I do care that if I'm trying to remove 50% of the
queue, that there is a significant drop.

Restore PR #895's pipelined-LSET clear pattern inside _streaming_celery_clear, replacing PR #899's per-match LINDEX / verify / LSET sequence. PR #899's streaming architecture is retained (we still walk past CELERY_BROKER_DISPLAY_LIMIT to drain deep filters); this PR drops only the verify-before-LSET guard. Bisect: PR #895 (da3a7e071) was the working clear path the operator was happy with; PR #899 (aa56fc351) replaced it with verify-before-LSET and is the regression source.

Why the verify guard was killing the queue

On the operator's bundles_analysis broker (211k items, ~78k matching the filter, consumers actively BLPOPing the head):

  • Pass 1 reported MATCHED=2038, DRIFT=77475.
  • LLEN did not drop measurably between passes.

The verify-before-LSET path turned every queued LSET into a LINDEX + LSET round-trip per match (reconstructed below). Between the chunk's LRANGE snapshot and the per-match LINDEX, even a moderately busy consumer pop rate shifts enough indexes to mismatch the bytes in ~97% of slots, so matches_drifted absorbed essentially every match and very few LSETs survived to be LREM'd.
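
For concreteness, a minimal reconstruction of that per-match verify loop (a sketch only: the helper name, arguments, and counter locals are inferred from this description, not copied from the source):

    # Hypothetical reconstruction of PR #899's verify-before-LSET path.
    def _verify_then_lset(redis_conn, queue, matches, tombstone):
        matches_lset = matches_drifted = 0
        for idx, raw in matches:  # (index, bytes) from the chunk's LRANGE snapshot
            current = redis_conn.lindex(queue, idx)  # extra round-trip per match
            if current != raw:
                # Any consumer pop since the snapshot shifts indexes down, so
                # the bytes at idx no longer match; the match is dropped here.
                matches_drifted += 1
                continue
            redis_conn.lset(queue, idx, tombstone)  # only reached if nothing moved
            matches_lset += 1
        return matches_lset, matches_drifted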

The plateau exit (if prev_lset is not None and matches_lset == prev_lset: break) then fired against pass 2's identical matches_lset, declaring convergence at a queue that hadn't moved.

What changed

  • _streaming_celery_clear collects per-chunk (idx, raw) matches into a list, then issues a single pipeline(transaction=False) of LSET key idx tombstone ops with raise_on_error=False; see the sketch after this list. Each Exception reply (LSET out-of-range, i.e. a consumer drained that slot) increments matches_drifted; everything else increments matches_lset.

  • _CELERY_MAX_PASSES 3 → 10.

  • Plateau exit (prev_lset == matches_lset) removed. With verify-before-LSET gone, matches_lset only plateaus when the queue has converged, which is already covered by matches_found == 0.

  • keep_one early-exit retained. With verify gone, matches_drifted only counts genuine LSET out-of-range failures, so keep_one and matches_drifted == 0 still means "non-keeper matches all tombstoned, queue converged for keep_one mode."

  • Per-pass diagnostic logging so on-call can grep the LSET → LREM → LLEN delta from structured logs alone:

    redis_admin.streaming_clear: queue=<q> pass=<n> depth=<d>
    redis_admin.streaming_clear: queue=<q> pass=<n> found=<f> lset=<l> drifted=<dr> pass_first_kept=<bool>
    redis_admin.streaming_clear: queue=<q> pass=<n> lrem_removed=<r>
    

    On a quiet queue lrem_removed should equal lset. The new regression test asserts that round-trip explicitly (stats.total_lset == matches, LLEN(queue) == 0).
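
A sketch of the restored pattern under redis-py semantics (the helper name and signature are illustrative; the real logic is inlined in _streaming_celery_clear):

    # Illustrative sketch of the restored pipeline-LSET clear (redis-py).
    def _pipeline_lset_chunk(redis_conn, queue, matches, tombstone):
        pipe = redis_conn.pipeline(transaction=False)
        for idx, _raw in matches:  # per-chunk (idx, raw) list from LRANGE
            pipe.lset(queue, idx, tombstone)
        # raise_on_error=False returns per-op errors inline instead of raising.
        replies = pipe.execute(raise_on_error=False)
        matches_lset = matches_drifted = 0
        for reply in replies:
            if isinstance(reply, Exception):  # LSET out of range: slot drained
                matches_drifted += 1
            else:
                matches_lset += 1
        # A single LREM sweep later removes every surviving tombstone:
        #     redis_conn.lrem(queue, 0, tombstone)
        return matches_lset, matches_drifted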

Trade-off (data-loss surface area)

Documented in the _streaming_celery_clear docstring. Concretely: between the chunk LRANGE snapshot and the pipelined LSET, K consumer pops shift indexes down by K. Our LSET at slot 50 then overwrites whatever's at slot 50 now (which used to be at 50+K). That overwritten message is destroyed by the subsequent LREM. Bounded by consumer_pops_per_chunk × chunks_per_pass; with chunk_size=1000 and a consumer popping ~25/sec the bound is a handful of slots per chunk. Operator has explicitly accepted this trade-off.
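
A toy demonstration of that index shift against a scratch key (assumes a local Redis reachable via redis-py; not the production code path):

    # Toy demo: K consumer pops between snapshot and LSET shift targets by K.
    import redis

    r = redis.Redis()
    key = "scratch:lset_drift_demo"
    r.delete(key)
    r.rpush(key, *[f"msg{i}" for i in range(6)])  # list is msg0 .. msg5

    snapshot_idx = 3          # the LRANGE snapshot saw msg3 at index 3
    for _ in range(2):        # a consumer pops K=2 items from the head
        r.lpop(key)
    r.lset(key, snapshot_idx, "__tombstone__")
    # Index 3 now holds msg5 (formerly at index 3+K=5), so the tombstone
    # destroyed msg5, not msg3; msg3 survives at index 1 for a later pass.
    print(r.lrange(key, 0, -1))  # [b'msg2', b'msg3', b'msg4', b'__tombstone__']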

The keep_one semantic still survives the trade-off because pass 1's keeper is the lowest-index match in pass 1's LRANGE snapshot. Even if that snapshot is stale by the time we LSET, the new lowest-index match in pass 2 becomes the next keeper and the queue retains at least one matching message after a successful keep_one clear.

Tests

  • New test_streaming_clear_drops_50pct_of_queue_in_first_pass (test_celery_broker_clear_job.py): seeds 100 envelopes (50 matching the filter, 50 not), runs the chunked clear job with no concurrency, asserts LLEN drops from 100 to 50 and that every survivor parses as a non-matching envelope. This is the headline operator invariant; a condensed sketch follows this list.
  • New test_streaming_clear_makes_progress_under_concurrent_drift: monkey-patches LINDEX to always return non-matching bytes (the worst-case drift scenario the verify-before-LSET path was designed to catch); confirms the pipelined path tombstones every match and drains the queue to zero, because LINDEX is no longer in the loop.
  • Removed test_streaming_clear_verify_before_lset_skips_drifted_entry in test_celery_broker_queue.py (asserted a semantic that no longer exists).
  • Existing drain tests in test_celery_broker_clear_drain_verification.py continue to pass: the chunked-job worker still LSET-tombstones every match in the chunk and the LREM sweep still removes them.
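
A condensed sketch of that headline test (fakeredis, the envelope shape, and the run_streaming_clear entry point are illustrative assumptions; the real test drives the chunked clear job):

    # Condensed sketch of the headline invariant (assumed shapes, fakeredis).
    import json
    import fakeredis

    def test_half_the_queue_drops_in_first_pass():
        r = fakeredis.FakeStrictRedis()
        for i in range(100):  # 50 matching envelopes, 50 not
            task = "bundle_analysis.process" if i % 2 == 0 else "other.task"
            r.rpush("bundles_analysis", json.dumps({"headers": {"task": task}}))
        assert r.llen("bundles_analysis") == 100

        # run_streaming_clear is a hypothetical stand-in for the job entry point.
        run_streaming_clear(r, queue="bundles_analysis",
                            task_name="bundle_analysis.process")

        assert r.llen("bundles_analysis") == 50
        for raw in r.lrange("bundles_analysis", 0, -1):  # survivors all non-matching
            assert json.loads(raw)["headers"]["task"] != "bundle_analysis.process"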

Local run (subset that doesn't require Postgres):

RUN_ENV=TESTING DJANGO_SETTINGS_MODULE=codecov.settings_test \
  uv run pytest -q -m "not django_db" \
    apps/codecov-api/redis_admin/tests/test_celery_broker_clear_drain_verification.py \
    apps/codecov-api/redis_admin/tests/test_celery_broker_clear_job.py \
    apps/codecov-api/redis_admin/tests/test_celery_broker_queue.py
# 78 passed, 30 errors (all django_db / Postgres unavailable locally), 16 deselected

Operator one-liner to verify in manage.py shell_plus

from redis_admin import conn as _c
r = _c.get_connection(kind="broker")
before = r.llen("bundles_analysis")
r.rpush("bundles_analysis", "__test_diag_value__")
removed = r.lrem("bundles_analysis", 0, "__test_diag_value__")
print(f"before={before} removed={removed} after={r.llen('bundles_analysis')}")
# expect: removed=1 and after==before (we pushed one then removed one)

Test plan

  • Unit tests pass locally (-m "not django_db")
  • CI green
  • Bugbot review addressed
  • Operator can confirm queue drops on bundles_analysis after deploy



Note

High Risk
Changes the Redis admin celery-broker clear algorithm to use pipelined LSET without verify checks, intentionally increasing potential for accidental task deletion under concurrent consumers. It also alters convergence behavior (more passes, different exit conditions), which affects how aggressively queues are drained in production.

Overview
Restores _streaming_celery_clear to the pre-#899 approach: collect matching indexes per LRANGE chunk and issue batched pipeline(...).lset(...) tombstones, dropping the per-match LINDEX verify step and treating per-op pipeline Exception replies as drift.

Adjusts convergence to better drain saturated queues (max passes 3→10, removes the prev_lset plateau exit, keeps an early-exit for keep_one when no drift occurs) and adds structured per-pass logging including LLEN and LREM removed counts.

Updates tests to match the new behavior: adds regression coverage that the queue actually shrinks (including under simulated drift and out-of-range LSET replies), removes the verify-before-LSET drift-skip test, and makes clear-by-filter view tests join the background clear thread to avoid races.

Reviewed by Cursor Bugbot for commit cb5c070.

…verify guard

Restore PR #895's `_execute_celery_clear` pattern (pipelined
`LSET key idx tombstone` per chunk via
`pipeline(transaction=False).execute(raise_on_error=False)`) inside
`_streaming_celery_clear`, replacing the per-match LINDEX / verify /
LSET sequence introduced by PR #899. PR #899's streaming
architecture is retained (we still walk past the display-limit cap
to drain deep filters); this PR only drops the verify step.

Trade-off (operator directive):

> I'm ok if we lose a few tasks along the way and we make secondary
> passes, but I do care that if I'm trying to remove 50% of the
> queue, that there is a significant drop.

Verify-before-LSET turned every pipelined LSET into LINDEX + LSET
RTTs per match and, on a busy `bundles_analysis` broker (78k matches
with consumers actively BLPOPing), pushed the LSET success rate to
~3% (`matched=2038`, `drift=77475`). The operator saw the queue stay
full because most matches were silently swallowed by the drift
counter. Pipelined unconditional LSET trades this safety guard for
actual queue drain: the bounded data-loss surface area (consumer pops
between LRANGE snapshot and pipelined LSET shift indexes; our LSET
overwrites the new occupant of the target slot, which then gets
LREM'd) is documented in the docstring and explicitly accepted.

Same change set:

* `_CELERY_MAX_PASSES` 3 → 10. The `prev_lset == matches_lset`
  plateau exit is removed; on a busy queue it fired against
  legitimate per-pass progress (each pass cleared ~the same number
  of matches). Convergence is now bounded by `matches_found == 0`
  plus the pass cap.
* `keep_one` early-exit retained. With verify gone, `matches_drifted`
  only counts genuine LSET out-of-range failures, so
  `keep_one and matches_drifted == 0` still means "non-keeper
  matches all tombstoned, queue converged".
* Per-pass diagnostic logging:
    redis_admin.streaming_clear: queue=<q> pass=<n> depth=<d>
    redis_admin.streaming_clear: queue=<q> pass=<n> found=<f> lset=<l> drifted=<dr> pass_first_kept=<bool>
    redis_admin.streaming_clear: queue=<q> pass=<n> lrem_removed=<r>
  On a quiet queue `lrem_removed` should equal `lset`, giving on-call
  a single grep that ties `LSET → LREM → LLEN delta` together.
* Docstring documents the data-loss surface area and references PR
  #895 / #899 for the audit trail.

Tests:

* New `test_streaming_clear_drops_50pct_of_queue_in_first_pass` in
  `test_celery_broker_clear_job.py`: seeds 100 envelopes (50
  matching, 50 not), runs the chunked clear job with no concurrency,
  asserts `LLEN` drops from 100 to 50 and that every survivor parses
  as a non-matching envelope.
* New `test_streaming_clear_makes_progress_under_concurrent_drift`:
  monkey-patches `LINDEX` to always return non-matching bytes (the
  worst-case scenario the verify-before-LSET path was designed to
  catch); confirms the pipelined path tombstones every match and
  drains the queue to zero, because LINDEX is no longer in the loop.
* Removed `test_streaming_clear_verify_before_lset_skips_drifted_entry`
  in `test_celery_broker_queue.py` — it asserted a semantic that no
  longer exists.

Operator one-liner to verify the LSET → LREM round-trip in a live
broker connection (paste into `python manage.py shell_plus`):

    from redis_admin import conn as _c
    r = _c.get_connection(kind="broker")
    before = r.llen("bundles_analysis")
    r.rpush("bundles_analysis", "__test_diag_value__")
    removed = r.lrem("bundles_analysis", 0, "__test_diag_value__")
    print(f"before={before} removed={removed} after={r.llen('bundles_analysis')}")
…ueue

`CeleryBrokerClearByFilterViewTest`'s `test_clear_all_*` /
`test_clear_keep_one_*` / `test_legacy_confirm_*` /
`test_task_name_*` tests were racing the chunked clear's daemon
worker. The view returns 302 the moment
`start_celery_broker_clear_job` spawns the thread, but the LSET /
LREM that actually drains the queue runs in that worker; the tests
were asserting on
`self.redis.lrange("celery", ...)` immediately and getting the
pre-clear queue back when the runner was a few ms slower than
usual.

Add a `_post_clear` helper on the test class that submits the POST,
parses the redirect URL's `<job_id>` segment, and `thread.join`s
the matching worker thread (10s timeout, with a fallback to joining
every entry in `_clear_job_threads` if the redirect target isn't a
progress page). All six racy tests are now driven through this
helper.
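
A sketch of what such a helper can look like (the UUID-style job-id
regex is an assumption; `_clear_job_threads` and the 10s timeout
follow the description above, and exact code in the repo may differ):

    import re

    def _post_clear(self, url, data):
        # POST the clear form, then join the spawned worker before asserting.
        resp = self.client.post(url, data)
        assert resp.status_code == 302
        m = re.search(r"/([0-9a-f-]+)/?$", resp["Location"])  # <job_id> segment
        thread = _clear_job_threads.get(m.group(1)) if m else None
        if thread is not None:
            thread.join(timeout=10)
        else:
            # Redirect wasn't a progress page: join every known worker instead.
            for t in list(_clear_job_threads.values()):
                t.join(timeout=10)
        return resp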

Surfaced when the pre-#899 pipeline-LSET restoration in this
branch tightened the worker's per-pass timing enough for the race
to bite reliably in CI.
@sentry
Contributor

sentry Bot commented May 1, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 91.89%. Comparing base (6fb832f) to head (cb5c070).
✅ All tests successful. No failed tests found.

Additional details and impacted files
@@           Coverage Diff           @@
##             main     #908   +/-   ##
=======================================
  Coverage   91.89%   91.89%           
=======================================
  Files        1316     1316           
  Lines       50586    50590    +4     
  Branches     1625     1625           
=======================================
+ Hits        46485    46491    +6     
+ Misses       3795     3793    -2     
  Partials      306      306           
Flag      Coverage Δ
apiunit   94.91% <100.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.


@codecov-notifications

codecov-notifications Bot commented May 1, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ All tests successful. No failed tests found.


The pipelined `LSET` reply loop in `_streaming_celery_clear`
counts every `Exception` reply (out-of-range — consumer drained
the slot mid-clear) as `matches_drifted`. That branch was the
only patch line left uncovered after this PR's restoration
(codecov reported 85% patch coverage, target 90%, 3 missing
lines).

Add `test_streaming_clear_counts_pipelined_lset_out_of_range_
as_drift`: wraps `redis.pipeline()` to `LTRIM` the queue down
to half its length right before the pipeline flush, so every
LSET against an index past the new tail comes back as an
out-of-range Exception reply. Asserts `total_lset == 25` /
`total_drifted == 25` against a 50-match seed, exercising the
drift counter directly without monkey-patching individual
replies (which would diverge from real fakeredis pipeline
semantics).

Also tag the `(TypeError, ValueError)` fallback around the
`int(removed or 0)` cast on `LREM`'s return as `# pragma: no
cover - defensive`. Real Redis `LREM` always returns int; that
branch only fires for misbehaving test doubles and isn't
worth a contrived test.
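
A sketch of that pipeline-wrapping trick (fakeredis, the module-level
one-shot flag, and the "celery" queue name are illustrative):

    # Sketch: make every LSET past the new tail return an out-of-range
    # Exception reply by halving the queue just before the first flush.
    import fakeredis

    r = fakeredis.FakeStrictRedis()
    real_pipeline = r.pipeline
    fired = False

    def trimming_pipeline(*args, **kwargs):
        pipe = real_pipeline(*args, **kwargs)
        real_execute = pipe.execute

        def execute(raise_on_error=True):
            global fired
            if not fired:  # one-shot: only sabotage the first flush
                fired = True
                r.ltrim("celery", 0, r.llen("celery") // 2 - 1)
            return real_execute(raise_on_error=raise_on_error)

        pipe.execute = execute
        return pipe

    r.pipeline = trimming_pipeline
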
thomasrockhu-codecov added a commit that referenced this pull request May 2, 2026
…t numbers)

Conflict resolution notes:
- services.py docstrings for `_substitute_filter_any` and the
  chunked clear job: combined #908's "dry-run vs live-clear must
  agree" framing with #909's more general "rule survives a future
  field/truthiness change" framing. Both are accurate for the
  post-merge call sites; the merged wording covers both modes.
- test_celery_broker_queue.py: took #909's structure (no preview
  test class) wholesale because the preview surface is gone.
  #908's `_post_clear` thread-join helper is unnecessary in the
  view tests because the post-#909 view tests only assert on the
  302 redirect URL, not on post-worker queue state. End-to-end
  drain assertions live in `test_celery_broker_clear_job.py`,
  which #908 already wired up with a thread-join helper that
  survives this merge.
@thomasrockhu-codecov
Contributor Author

Folded into #905; keeping this open as the per-feature review record.

thomasrockhu-codecov added a commit that referenced this pull request May 2, 2026
… maths

Two CI failures on the consolidated branch HEAD:

1. `test_streaming_clear_verify_before_lset_skips_drifted_entry`
   was a left-over from PR #899 that asserted the verify-before-
   LSET guard skipped drifted slots. PR #908 deletes that guard
   in `_streaming_celery_clear` (the whole point of the
   restoration), so the test now sees `total_lset == 1` for a
   drifted slot and fails. The merge picked up #909's copy of
   the test file (it was the conflict-resolution baseline) which
   re-introduced the test. Apply #908's deletion verbatim and
   keep its replacement comment in place.

2. `test_typed_confirm_failure_preserves_chart_hint_callout`
   asserted `round(78500 / 20000 * 211052) == 828381`. The
   correct value is 828379 (the docstring arithmetic was off-
   by-two). The test logic and the production behaviour are
   both correct; the expected literal is what was wrong.
