Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/fides/api/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Any, ContextManager, Dict, List, Optional

import celery_redis_cluster_backend # type: ignore[import-untyped] # noqa: F401 - registers redis+cluster/rediss+cluster backends
Expand Down Expand Up @@ -35,6 +36,19 @@

NEW_SESSION_RETRIES = 5


def _honor_configured_celery_eager_execution() -> bool:
"""True when CONFIG-driven eager execution may run in the current process.

The web API always publishes tasks to the broker. Celery worker entrypoints
set ``FIDES_CELERY_WORKER`` so eager settings apply there; tests keep
``FIDES__TEST_MODE`` behavior unchanged.
"""
if CONFIG.test_mode:
return True
return os.environ.get("FIDES_CELERY_WORKER") == "1"


autodiscover_task_locations: List[str] = [
"fides.api.tasks",
"fides.api.tasks.scheduled",
Expand Down Expand Up @@ -65,12 +79,17 @@ def apply_async(
**options: Any,
) -> Any: # type: ignore[override]
"""Dispatch the task, running it eagerly if the target queue is in the configured
eager_task_queues or if task_always_eager is enabled globally."""
eager_task_queues or if task_always_eager is enabled globally.

Eager routing is only honored in Celery worker processes (and in test mode),
never in the API server process."""
effective_queue = (
queue or getattr(self, "queue", None) or self.app.conf.task_default_queue
)
eager_queues = CONFIG.celery.eager_task_queues
if effective_queue in eager_queues or self.app.conf.task_always_eager:
if _honor_configured_celery_eager_execution() and (
effective_queue in eager_queues or self.app.conf.task_always_eager
):
return self.apply(args, kwargs, **options)
return super().apply_async(args, kwargs, queue=queue, **options)

Expand Down Expand Up @@ -187,6 +206,11 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:

app.autodiscover_tasks(autodiscover_task_locations)

# Non-worker processes (e.g. the FastAPI app) must not execute tasks eagerly in-process;
# ``fides.api.worker`` sets ``FIDES_CELERY_WORKER`` and syncs ``task_always_eager`` from CONFIG.
if not config.test_mode and os.environ.get("FIDES_CELERY_WORKER") != "1":
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Post-merge nit: couldn't we just reuse `_honor_configured_celery_eager_execution` here? It seems odd that the same criteria/logic is defined in two places — but maybe I'm missing something obvious.

Clearly not a blocker; I just noticed it in passing and wanted to flag it.

app.conf.task_always_eager = False

return app


Expand Down
3 changes: 3 additions & 0 deletions src/fides/api/v1/endpoints/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ async def workers_health() -> Dict:
workers_enabled=False, workers=[], queue_counts={}
).model_dump(mode="json")

# Use the Celery app runtime flag (tests may flip it via ``enable_celery_worker``); the
# API process also forces ``task_always_eager`` false in ``_create_celery`` when not in
# test mode so this reflects broker dispatch vs in-process execution.
fides_is_using_workers = not celery_app.conf["task_always_eager"]
if fides_is_using_workers:
response["workers_enabled"] = True
Expand Down
92 changes: 52 additions & 40 deletions src/fides/api/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Any, Dict, List, Optional

from celery import VERSION_BANNER
Expand Down Expand Up @@ -68,49 +69,60 @@ def _parse_prefetch_map(known_queues: List[str]) -> Dict[str, int]:

def _run_celery_worker(worker_queues: str, prefetch_map: Dict[str, int]) -> None:
"""Run the Celery worker process. Extracted so it can be used as a watchfiles target."""
active_queues = [q.strip() for q in worker_queues.split(",")]

# Resolve prefetch multiplier — first matching queue wins
prefetch: Optional[int] = None
for queue in active_queues:
if queue in prefetch_map:
prefetch = prefetch_map[queue]
break

argv = [
"--quiet", # Disable Celery startup banner
"worker",
"--loglevel=info",
f"--concurrency={CONFIG.celery.worker_concurrency}",
f"--queues={worker_queues}",
]
if prefetch is not None:
argv.append(f"--prefetch-multiplier={prefetch}")

without_flags = []
if CONFIG.celery.worker_disable_heartbeat:
without_flags.append("--without-heartbeat")
if CONFIG.celery.worker_disable_gossip:
without_flags.append("--without-gossip")
if CONFIG.celery.worker_disable_mingle:
without_flags.append("--without-mingle")
if without_flags:
argv += without_flags
previous_worker_flag = os.environ.get("FIDES_CELERY_WORKER")
previous_task_always_eager = celery_app.conf.task_always_eager
os.environ["FIDES_CELERY_WORKER"] = "1"
celery_app.conf.task_always_eager = CONFIG.celery.task_always_eager
try:
active_queues = [q.strip() for q in worker_queues.split(",")]

# Resolve prefetch multiplier — first matching queue wins
prefetch: Optional[int] = None
for queue in active_queues:
if queue in prefetch_map:
prefetch = prefetch_map[queue]
break

argv = [
"--quiet", # Disable Celery startup banner
"worker",
"--loglevel=info",
f"--concurrency={CONFIG.celery.worker_concurrency}",
f"--queues={worker_queues}",
]
if prefetch is not None:
argv.append(f"--prefetch-multiplier={prefetch}")

without_flags = []
if CONFIG.celery.worker_disable_heartbeat:
without_flags.append("--without-heartbeat")
if CONFIG.celery.worker_disable_gossip:
without_flags.append("--without-gossip")
if CONFIG.celery.worker_disable_mingle:
without_flags.append("--without-mingle")
if without_flags:
argv += without_flags
logger.info(
f"Worker started with {' '.join(without_flags)} "
f"(FIDES__CELERY__WORKER_DISABLE_HEARTBEAT={CONFIG.celery.worker_disable_heartbeat}, "
f"FIDES__CELERY__WORKER_DISABLE_GOSSIP={CONFIG.celery.worker_disable_gossip}, "
f"FIDES__CELERY__WORKER_DISABLE_MINGLE={CONFIG.celery.worker_disable_mingle})"
)

eager = CONFIG.celery.task_always_eager
logger.info(
f"Worker started with {' '.join(without_flags)} "
f"(FIDES__CELERY__WORKER_DISABLE_HEARTBEAT={CONFIG.celery.worker_disable_heartbeat}, "
f"FIDES__CELERY__WORKER_DISABLE_GOSSIP={CONFIG.celery.worker_disable_gossip}, "
f"FIDES__CELERY__WORKER_DISABLE_MINGLE={CONFIG.celery.worker_disable_mingle})"
f"Worker starting | queues={worker_queues} | "
f"task_always_eager={eager} | "
f"prefetch_multiplier={prefetch if prefetch is not None else 'default (4)'}"
)

eager = CONFIG.celery.task_always_eager
logger.info(
f"Worker starting | queues={worker_queues} | "
f"task_always_eager={eager} | "
f"prefetch_multiplier={prefetch if prefetch is not None else 'default (4)'}"
)

celery_app.worker_main(argv=argv)
celery_app.worker_main(argv=argv)
finally:
if previous_worker_flag is None:
os.environ.pop("FIDES_CELERY_WORKER", None)
else:
os.environ["FIDES_CELERY_WORKER"] = previous_worker_flag
celery_app.conf.task_always_eager = previous_task_always_eager


def start_worker(
Expand Down
3 changes: 2 additions & 1 deletion src/fides/config/celery_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class CelerySettings(FidesSettings):
task_always_eager: bool = Field(
default=True,
description="If true, tasks are executed locally instead of being sent to the queue. "
"If False, tasks are sent to the queue.",
"If False, tasks are sent to the queue. The web API server always publishes to the "
"broker regardless; this flag is honored in Celery worker processes (and in test mode).",
)
eager_task_queues: Set[str] = Field(
default_factory=set,
Expand Down
Loading