diff --git a/src/fides/api/tasks/__init__.py b/src/fides/api/tasks/__init__.py index f5f90663d25..6ed39b15357 100644 --- a/src/fides/api/tasks/__init__.py +++ b/src/fides/api/tasks/__init__.py @@ -1,3 +1,4 @@ +import os from typing import Any, ContextManager, Dict, List, Optional import celery_redis_cluster_backend # type: ignore[import-untyped] # noqa: F401 - registers redis+cluster/rediss+cluster backends @@ -35,6 +36,19 @@ NEW_SESSION_RETRIES = 5 + +def _honor_configured_celery_eager_execution() -> bool: + """True when CONFIG-driven eager execution may run in the current process. + + The web API always publishes tasks to the broker. Celery worker entrypoints + set ``FIDES_CELERY_WORKER`` so eager settings apply there; tests keep + ``FIDES__TEST_MODE`` behavior unchanged. + """ + if CONFIG.test_mode: + return True + return os.environ.get("FIDES_CELERY_WORKER") == "1" + + autodiscover_task_locations: List[str] = [ "fides.api.tasks", "fides.api.tasks.scheduled", @@ -65,12 +79,17 @@ def apply_async( **options: Any, ) -> Any: # type: ignore[override] """Dispatch the task, running it eagerly if the target queue is in the configured - eager_task_queues or if task_always_eager is enabled globally.""" + eager_task_queues or if task_always_eager is enabled globally. + + Eager routing is only honored in Celery worker processes (and in test mode), + never in the API server process.""" effective_queue = ( queue or getattr(self, "queue", None) or self.app.conf.task_default_queue ) eager_queues = CONFIG.celery.eager_task_queues - if effective_queue in eager_queues or self.app.conf.task_always_eager: + if _honor_configured_celery_eager_execution() and ( + effective_queue in eager_queues or self.app.conf.task_always_eager + ): return self.apply(args, kwargs, **options) return super().apply_async(args, kwargs, queue=queue, **options) @@ -187,6 +206,11 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: app.autodiscover_tasks(autodiscover_task_locations) + # Non-worker processes (e.g. the FastAPI app) must not execute tasks eagerly in-process; + # ``fides.api.worker`` sets ``FIDES_CELERY_WORKER`` and syncs ``task_always_eager`` from CONFIG. + if not config.test_mode and os.environ.get("FIDES_CELERY_WORKER") != "1": + app.conf.task_always_eager = False + return app diff --git a/src/fides/api/v1/endpoints/health.py b/src/fides/api/v1/endpoints/health.py index 1b86e8f491e..881b10a3e62 100644 --- a/src/fides/api/v1/endpoints/health.py +++ b/src/fides/api/v1/endpoints/health.py @@ -323,6 +323,9 @@ async def workers_health() -> Dict: workers_enabled=False, workers=[], queue_counts={} ).model_dump(mode="json") + # Use the Celery app runtime flag (tests may flip it via ``enable_celery_worker``); the + # API process also forces ``task_always_eager`` false in ``_create_celery`` when not in + # test mode so this reflects broker dispatch vs in-process execution. fides_is_using_workers = not celery_app.conf["task_always_eager"] if fides_is_using_workers: response["workers_enabled"] = True diff --git a/src/fides/api/worker/__init__.py b/src/fides/api/worker/__init__.py index c437c5b412e..a370f06f18e 100644 --- a/src/fides/api/worker/__init__.py +++ b/src/fides/api/worker/__init__.py @@ -1,3 +1,4 @@ +import os from typing import Any, Dict, List, Optional from celery import VERSION_BANNER @@ -68,49 +69,60 @@ def _parse_prefetch_map(known_queues: List[str]) -> Dict[str, int]: def _run_celery_worker(worker_queues: str, prefetch_map: Dict[str, int]) -> None: """Run the Celery worker process. Extracted so it can be used as a watchfiles target.""" - active_queues = [q.strip() for q in worker_queues.split(",")] - - # Resolve prefetch multiplier — first matching queue wins - prefetch: Optional[int] = None - for queue in active_queues: - if queue in prefetch_map: - prefetch = prefetch_map[queue] - break - - argv = [ - "--quiet", # Disable Celery startup banner - "worker", - "--loglevel=info", - f"--concurrency={CONFIG.celery.worker_concurrency}", - f"--queues={worker_queues}", - ] - if prefetch is not None: - argv.append(f"--prefetch-multiplier={prefetch}") - - without_flags = [] - if CONFIG.celery.worker_disable_heartbeat: - without_flags.append("--without-heartbeat") - if CONFIG.celery.worker_disable_gossip: - without_flags.append("--without-gossip") - if CONFIG.celery.worker_disable_mingle: - without_flags.append("--without-mingle") - if without_flags: - argv += without_flags + previous_worker_flag = os.environ.get("FIDES_CELERY_WORKER") + previous_task_always_eager = celery_app.conf.task_always_eager + os.environ["FIDES_CELERY_WORKER"] = "1" + celery_app.conf.task_always_eager = CONFIG.celery.task_always_eager + try: + active_queues = [q.strip() for q in worker_queues.split(",")] + + # Resolve prefetch multiplier — first matching queue wins + prefetch: Optional[int] = None + for queue in active_queues: + if queue in prefetch_map: + prefetch = prefetch_map[queue] + break + + argv = [ + "--quiet", # Disable Celery startup banner + "worker", + "--loglevel=info", + f"--concurrency={CONFIG.celery.worker_concurrency}", + f"--queues={worker_queues}", + ] + if prefetch is not None: + argv.append(f"--prefetch-multiplier={prefetch}") + + without_flags = [] + if CONFIG.celery.worker_disable_heartbeat: + without_flags.append("--without-heartbeat") + if CONFIG.celery.worker_disable_gossip: + without_flags.append("--without-gossip") + if CONFIG.celery.worker_disable_mingle: + without_flags.append("--without-mingle") + if without_flags: + argv += without_flags + logger.info( + f"Worker started with {' '.join(without_flags)} " + f"(FIDES__CELERY__WORKER_DISABLE_HEARTBEAT={CONFIG.celery.worker_disable_heartbeat}, " + f"FIDES__CELERY__WORKER_DISABLE_GOSSIP={CONFIG.celery.worker_disable_gossip}, " + f"FIDES__CELERY__WORKER_DISABLE_MINGLE={CONFIG.celery.worker_disable_mingle})" + ) + + eager = CONFIG.celery.task_always_eager logger.info( - f"Worker started with {' '.join(without_flags)} " - f"(FIDES__CELERY__WORKER_DISABLE_HEARTBEAT={CONFIG.celery.worker_disable_heartbeat}, " - f"FIDES__CELERY__WORKER_DISABLE_GOSSIP={CONFIG.celery.worker_disable_gossip}, " - f"FIDES__CELERY__WORKER_DISABLE_MINGLE={CONFIG.celery.worker_disable_mingle})" + f"Worker starting | queues={worker_queues} | " + f"task_always_eager={eager} | " + f"prefetch_multiplier={prefetch if prefetch is not None else 'default (4)'}" ) - eager = CONFIG.celery.task_always_eager - logger.info( - f"Worker starting | queues={worker_queues} | " - f"task_always_eager={eager} | " - f"prefetch_multiplier={prefetch if prefetch is not None else 'default (4)'}" - ) - - celery_app.worker_main(argv=argv) + celery_app.worker_main(argv=argv) + finally: + if previous_worker_flag is None: + os.environ.pop("FIDES_CELERY_WORKER", None) + else: + os.environ["FIDES_CELERY_WORKER"] = previous_worker_flag + celery_app.conf.task_always_eager = previous_task_always_eager def start_worker( diff --git a/src/fides/config/celery_settings.py b/src/fides/config/celery_settings.py index c47314d0d28..e2aa26dbfa9 100644 --- a/src/fides/config/celery_settings.py +++ b/src/fides/config/celery_settings.py @@ -26,7 +26,8 @@ class CelerySettings(FidesSettings): task_always_eager: bool = Field( default=True, description="If true, tasks are executed locally instead of being sent to the queue. " - "If False, tasks are sent to the queue.", + "If False, tasks are sent to the queue. The web API server always publishes to the " + "broker regardless; this flag is honored in Celery worker processes (and in test mode).", ) eager_task_queues: Set[str] = Field( default_factory=set,