Skip to content

Commit 25ac120

Browse files
rayharnettqwencoderjohnewart
authored andcommitted
Configurable reduction in Celery chatter on Redis queues (#8077)
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com> Co-authored-by: John Ewart <john@johnewart.net>
1 parent f94a81b commit 25ac120

5 files changed

Lines changed: 162 additions & 9 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Copy this file and rename it to <pr_number>-<short-description>.yaml (e.g., 1234-add-user-endpoint.yaml)
2+
# Fill in the required fields and delete this comment block
3+
4+
type: Changed
5+
description: Replaced the single worker_disable_gossip_heartbeat_mingle config flag with three independent flags (worker_disable_heartbeat, worker_disable_gossip, worker_disable_mingle) for granular control over Celery worker flags
6+
pr: 8077
7+
labels: []

src/fides/api/worker/__init__.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
PRIVACY_PREFERENCES_QUEUE_NAME,
2323
celery_app,
2424
)
25+
from fides.config import CONFIG
2526

2627

2728
class _PythonAndYamlFilter(DefaultFilter):
@@ -37,15 +38,29 @@ def __call__(self, change: Any, path: str) -> bool:
3738

3839
def _run_celery_worker(worker_queues: str) -> None:
3940
"""Run the Celery worker process. Extracted so it can be used as a watchfiles target."""
40-
celery_app.worker_main(
41-
argv=[
42-
"--quiet", # Disable Celery startup banner
43-
"worker",
44-
"--loglevel=info",
45-
"--concurrency=2",
46-
f"--queues={worker_queues}",
47-
]
48-
)
41+
argv = [
42+
"--quiet", # Disable Celery startup banner
43+
"worker",
44+
"--loglevel=info",
45+
f"--concurrency={CONFIG.celery.worker_concurrency}",
46+
f"--queues={worker_queues}",
47+
]
48+
without_flags = []
49+
if CONFIG.celery.worker_disable_heartbeat:
50+
without_flags.append("--without-heartbeat")
51+
if CONFIG.celery.worker_disable_gossip:
52+
without_flags.append("--without-gossip")
53+
if CONFIG.celery.worker_disable_mingle:
54+
without_flags.append("--without-mingle")
55+
if without_flags:
56+
argv += without_flags
57+
logger.info(
58+
f"Worker started with {' '.join(without_flags)} "
59+
f"(FIDES__CELERY__WORKER_DISABLE_HEARTBEAT={CONFIG.celery.worker_disable_heartbeat}, "
60+
f"FIDES__CELERY__WORKER_DISABLE_GOSSIP={CONFIG.celery.worker_disable_gossip}, "
61+
f"FIDES__CELERY__WORKER_DISABLE_MINGLE={CONFIG.celery.worker_disable_mingle})"
62+
)
63+
celery_app.worker_main(argv=argv)
4964

5065

5166
def start_worker(

src/fides/config/celery_settings.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,26 @@ class CelerySettings(FidesSettings):
3333
healthcheck_ping_timeout: float = Field(
3434
default=2.0, description="The timeout in seconds for the health check ping"
3535
)
36+
worker_disable_heartbeat: bool = Field(
37+
default=False,
38+
description="If true, starts the worker with --without-heartbeat. "
39+
"Use this as a workaround for Celery BRPOP connection drop issues (celery/celery#7276).",
40+
)
41+
worker_disable_gossip: bool = Field(
42+
default=False,
43+
description="If true, starts the worker with --without-gossip. "
44+
"Use this as a workaround for Celery BRPOP connection drop issues (celery/celery#7276).",
45+
)
46+
worker_disable_mingle: bool = Field(
47+
default=False,
48+
description="If true, starts the worker with --without-mingle. "
49+
"Use this as a workaround for Celery BRPOP connection drop issues (celery/celery#7276).",
50+
)
51+
worker_concurrency: int = Field(
52+
default=2,
53+
ge=1,
54+
description="Number of worker processes/threads passed to `celery worker --concurrency`.",
55+
)
3656
broker_url: Optional[str] = Field(
3757
default=None,
3858
description="Celery broker URL. When set, overrides the default. With redis.cluster_enabled, "

tests/ctl/api/test_worker.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,113 @@ def test_no_reload_calls_worker_main_directly(self, worker_main_mock: MagicMock)
132132
start_worker()
133133

134134
worker_main_mock.assert_called_once()
135+
136+
137+
class TestWorkerDisableFlags:
138+
"""Tests for the individual worker_disable_heartbeat/gossip/mingle config flags."""
139+
140+
@pytest.fixture(autouse=True)
141+
def mock_celery_config(self):
142+
"""Provide a mock CONFIG.celery with the three disable flags."""
143+
mock_config = MagicMock()
144+
mock_config.celery.worker_disable_heartbeat = False
145+
mock_config.celery.worker_disable_gossip = False
146+
mock_config.celery.worker_disable_mingle = False
147+
mock_config.celery.worker_concurrency = 2
148+
with patch("fides.api.worker.CONFIG", mock_config):
149+
yield mock_config
150+
151+
@pytest.fixture
152+
def worker_main_mock(self):
153+
with patch("fides.api.worker.celery_app.worker_main") as mock:
154+
yield mock
155+
156+
def test_no_without_flags_when_all_disabled_false(
157+
self, mock_celery_config, worker_main_mock: MagicMock
158+
):
159+
"""Default: no --without-* flags when all flags are False."""
160+
start_worker()
161+
162+
call_args = worker_main_mock.call_args
163+
argv = call_args.kwargs["argv"]
164+
assert "--without-heartbeat" not in argv
165+
assert "--without-gossip" not in argv
166+
assert "--without-mingle" not in argv
167+
168+
@pytest.mark.parametrize(
169+
"flag_name",
170+
[
171+
"worker_disable_heartbeat",
172+
"worker_disable_gossip",
173+
"worker_disable_mingle",
174+
],
175+
)
176+
def test_single_flag_enabled(
177+
self, mock_celery_config, worker_main_mock: MagicMock, flag_name: str
178+
):
179+
"""When a disable flag is True, the matching --without-* appears in argv."""
180+
setattr(mock_celery_config.celery, flag_name, True)
181+
start_worker()
182+
183+
call_args = worker_main_mock.call_args
184+
argv = call_args.kwargs["argv"]
185+
flag_to_arg = {
186+
"worker_disable_heartbeat": "--without-heartbeat",
187+
"worker_disable_gossip": "--without-gossip",
188+
"worker_disable_mingle": "--without-mingle",
189+
}
190+
assert flag_to_arg[flag_name] in argv
191+
192+
def test_all_three_flags_enabled(
193+
self, mock_celery_config, worker_main_mock: MagicMock
194+
):
195+
"""All three flags together produces all --without-* flags."""
196+
mock_celery_config.celery.worker_disable_heartbeat = True
197+
mock_celery_config.celery.worker_disable_gossip = True
198+
mock_celery_config.celery.worker_disable_mingle = True
199+
200+
start_worker()
201+
202+
call_args = worker_main_mock.call_args
203+
argv = call_args.kwargs["argv"]
204+
assert "--without-heartbeat" in argv
205+
assert "--without-gossip" in argv
206+
assert "--without-mingle" in argv
207+
208+
209+
class TestWorkerConcurrency:
210+
"""Tests for CONFIG.celery.worker_concurrency."""
211+
212+
@pytest.fixture(autouse=True)
213+
def mock_celery_config(self):
214+
mock_config = MagicMock()
215+
mock_config.celery.worker_disable_heartbeat = False
216+
mock_config.celery.worker_disable_gossip = False
217+
mock_config.celery.worker_disable_mingle = False
218+
mock_config.celery.worker_concurrency = 2
219+
with patch("fides.api.worker.CONFIG", mock_config):
220+
yield mock_config
221+
222+
@pytest.fixture
223+
def worker_main_mock(self):
224+
with patch("fides.api.worker.celery_app.worker_main") as mock:
225+
yield mock
226+
227+
def test_default_concurrency_in_argv(
228+
self, mock_celery_config: MagicMock, worker_main_mock: MagicMock
229+
):
230+
start_worker()
231+
argv = worker_main_mock.call_args.kwargs["argv"]
232+
assert "--concurrency=2" in argv
233+
234+
@pytest.mark.parametrize("concurrency", [1, 4, 16])
235+
def test_custom_concurrency_in_argv(
236+
self,
237+
mock_celery_config: MagicMock,
238+
worker_main_mock: MagicMock,
239+
concurrency: int,
240+
):
241+
mock_celery_config.celery.worker_concurrency = concurrency
242+
start_worker()
243+
argv = worker_main_mock.call_args.kwargs["argv"]
244+
assert f"--concurrency={concurrency}" in argv

tests/ops/tasks/test_celery.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def test_celery_default_config() -> None:
4040
assert config.celery.task_always_eager
4141
assert config.celery.event_queue_prefix == "fides_worker"
4242
assert config.celery.task_default_queue == "fides"
43+
assert config.celery.worker_concurrency == 2
4344

4445
celery_app = _create_celery(config)
4546
expected_broker = (

0 commit comments

Comments
 (0)