diff --git a/.fides/db_dataset.yml b/.fides/db_dataset.yml index f0aa3aba7f4..d964f9dbecf 100644 --- a/.fides/db_dataset.yml +++ b/.fides/db_dataset.yml @@ -3677,6 +3677,8 @@ dataset: data_categories: [system.operations] - name: child_resource_urns data_categories: [system.operations] + - name: group_id + data_categories: [system.operations] - name: created_at data_categories: [system.operations] - name: updated_at diff --git a/changelog/8115-add-group-id-to-monitortask.yaml b/changelog/8115-add-group-id-to-monitortask.yaml new file mode 100644 index 00000000000..9f1cd92f4e7 --- /dev/null +++ b/changelog/8115-add-group-id-to-monitortask.yaml @@ -0,0 +1,4 @@ +type: Added +description: Add group_id column to MonitorTask and is_monitor_task_cancelled for task cancellation support +pr: 8115 +labels: ["db-migration"] diff --git a/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTaskItem.tsx b/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTaskItem.tsx index 445faa0992e..51ae5e8aeb0 100644 --- a/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTaskItem.tsx +++ b/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTaskItem.tsx @@ -20,6 +20,7 @@ import ConnectionTypeLogo, { } from "~/features/datastore-connections/ConnectionTypeLogo"; import { ConnectionType, + ExecutionLogStatus, MonitorTaskResponse, MonitorTaskType, } from "~/types/api"; @@ -32,15 +33,8 @@ import { const { Paragraph, Text, Title } = Typography; -// Helper function to format status names for display -const formatStatusForDisplay = (status: string): string => { - // Special case: "paused" should display as "Awaiting Processing" - if (status === "paused") { - return "Awaiting Processing"; - } - - return status.split("_").map(capitalize).join(" "); -}; +const formatStatusForDisplay = (status: string) => + status.split("_").map(capitalize).join(" "); interface InProgressMonitorTaskItemProps extends ListItemProps { task: MonitorTaskResponse; @@ -102,11 +96,10 @@ export const InProgressMonitorTaskItem = ({ const taskCount = task.staged_resource_urns?.length || 0; const isInProgress = [ - "pending", - "in_processing", - "paused", - "retrying", - ].includes((task.status || "").toLowerCase()); + ExecutionLogStatus.PENDING, + ExecutionLogStatus.IN_PROCESSING, + ExecutionLogStatus.RETRYING, + ].some((status) => status === task.status?.toLowerCase()); const fieldCount = task.field_count || taskCount; const taskTitle = (() => { if ( diff --git a/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTasksList.tsx b/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTasksList.tsx index 0fee1e65080..035e524e463 100644 --- a/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTasksList.tsx +++ b/clients/admin-ui/src/features/data-discovery-and-detection/action-center/components/InProgressMonitorTasksList.tsx @@ -18,17 +18,8 @@ import { DebouncedSearchInput } from "../../../common/DebouncedSearchInput"; import { useInProgressMonitorTasksList } from "../hooks/useInProgressMonitorTasksList"; import { InProgressMonitorTaskItem } from "./InProgressMonitorTaskItem"; -// Helper function to format status names for display -const formatStatusForDisplay = ( - status: ExecutionLogStatus | string, -): string => { - // Special case: "paused" should display as "Awaiting Processing" - if (status === "paused") { - return "Awaiting Processing"; - } - - return status.split("_").map(capitalize).join(" "); -}; +const formatStatusForDisplay = (status: ExecutionLogStatus | string): string => + status.split("_").map(capitalize).join(" "); export const InProgressMonitorTasksList = ({ filters, diff --git a/clients/admin-ui/src/features/data-discovery-and-detection/action-center/hooks/useInProgressMonitorTasksList.tsx b/clients/admin-ui/src/features/data-discovery-and-detection/action-center/hooks/useInProgressMonitorTasksList.tsx index ba02d813c9b..5fdc5249b0c 100644 --- a/clients/admin-ui/src/features/data-discovery-and-detection/action-center/hooks/useInProgressMonitorTasksList.tsx +++ b/clients/admin-ui/src/features/data-discovery-and-detection/action-center/hooks/useInProgressMonitorTasksList.tsx @@ -21,7 +21,6 @@ export const useInProgressMonitorTasksList = ({ () => [ ExecutionLogStatus.PENDING, ExecutionLogStatus.IN_PROCESSING, - ExecutionLogStatus.PAUSED, ExecutionLogStatus.RETRYING, ExecutionLogStatus.ERROR, ], @@ -74,7 +73,6 @@ export const useInProgressMonitorTasksList = ({ }, [defaultStatusFilters, resetPagination]); // All possible status values from ExecutionLogStatus enum - // Note: awaiting_processing displays as "Awaiting Processing" but maps to "paused" in the API const allPossibleStatuses: ExecutionLogStatus[] = [ ExecutionLogStatus.PENDING, ExecutionLogStatus.IN_PROCESSING, diff --git a/src/fides/api/alembic/migrations/versions/xx_2026_05_04_2118_9f21507db078_add_group_id_to_monitortask.py b/src/fides/api/alembic/migrations/versions/xx_2026_05_04_2118_9f21507db078_add_group_id_to_monitortask.py new file mode 100644 index 00000000000..5661202c5b2 --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/xx_2026_05_04_2118_9f21507db078_add_group_id_to_monitortask.py @@ -0,0 +1,30 @@ +"""Add group_id to MonitorTask + +Revision ID: 9f21507db078 +Revises: e3f4a5b6c7d8 +Create Date: 2026-05-04 21:18:44.962348 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '9f21507db078' +down_revision = 'e3f4a5b6c7d8' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('monitortask', sa.Column('group_id', sa.String(255), nullable=True)) + op.create_index(op.f('ix_monitortask_group_id'), 'monitortask', ['group_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_monitortask_group_id'), table_name='monitortask') + op.drop_column('monitortask', 'group_id') + # ### end Alembic commands ### diff --git a/src/fides/api/models/detection_discovery/__init__.py b/src/fides/api/models/detection_discovery/__init__.py index 66fb9c8de2f..487cdd9a78b 100644 --- a/src/fides/api/models/detection_discovery/__init__.py +++ b/src/fides/api/models/detection_discovery/__init__.py @@ -17,6 +17,7 @@ MonitorTaskType, TaskRunType, create_monitor_task_with_execution_log, + is_monitor_task_cancelled, update_monitor_task_with_execution_log, ) from .staged_resource_error import StagedResourceError @@ -39,5 +40,6 @@ "MonitorTaskType", "TaskRunType", "create_monitor_task_with_execution_log", + "is_monitor_task_cancelled", "update_monitor_task_with_execution_log", ] diff --git a/src/fides/api/models/detection_discovery/monitor_task.py b/src/fides/api/models/detection_discovery/monitor_task.py index c119567e603..f6a18ecb779 100644 --- a/src/fides/api/models/detection_discovery/monitor_task.py +++ b/src/fides/api/models/detection_discovery/monitor_task.py @@ -51,6 +51,7 @@ class MonitorTask(WorkerTask, Base): ) staged_resource_urns = Column(ARRAY(String), nullable=True) child_resource_urns = Column(ARRAY(String), nullable=True) + group_id = Column(String(255), nullable=True, index=True) dismissed = Column(Boolean, nullable=False, default=False) monitor_config = relationship(MonitorConfig, cascade="all, delete") @@ -161,3 +162,15 @@ def update_monitor_task_with_execution_log( db.commit() db.refresh(task_record) return task_record + + +def is_monitor_task_cancelled(db: Session, celery_id: str) -> bool: + """Check if a monitor task has been cancelled by inspecting its status. + + Cancellation uses awaiting_processing because partial classification + results are preserved — the task is paused, not discarded. + """ + task = MonitorTask.get_by(db=db, field="celery_id", value=celery_id) + if not task: + return False + return task.status == ExecutionLogStatus.awaiting_processing diff --git a/tests/ops/models/detection_discovery/test_monitor_task.py b/tests/ops/models/detection_discovery/test_monitor_task.py index 1cfd80c0ed0..7bab70ad158 100644 --- a/tests/ops/models/detection_discovery/test_monitor_task.py +++ b/tests/ops/models/detection_discovery/test_monitor_task.py @@ -8,6 +8,7 @@ MonitorTaskType, TaskRunType, create_monitor_task_with_execution_log, + is_monitor_task_cancelled, update_monitor_task_with_execution_log, ) from fides.api.models.worker_task import ExecutionLogStatus @@ -508,3 +509,76 @@ def test_update_without_required_params(self, db: Session) -> None: status=ExecutionLogStatus.in_processing, ) assert "Either celery_id or task_record must be provided" in str(exc) + + +class TestMonitorTaskCancellation: + """Tests for cancellation-related features: group_id and is_cancelled.""" + + def test_group_id(self, db: Session, monitor_config) -> None: + """Tasks from the same classify operation share a group_id.""" + group_id = "group-abc-123" + task_1 = MonitorTask.create( + db=db, + data={ + "celery_id": "test-celery-group-1", + "action_type": MonitorTaskType.LLM_CLASSIFICATION.value, + "status": ExecutionLogStatus.pending.value, + "monitor_config_id": monitor_config.id, + "group_id": group_id, + }, + ) + task_2 = MonitorTask.create( + db=db, + data={ + "celery_id": "test-celery-group-2", + "action_type": MonitorTaskType.LLM_CLASSIFICATION.value, + "status": ExecutionLogStatus.pending.value, + "monitor_config_id": monitor_config.id, + "group_id": group_id, + }, + ) + + assert task_1.group_id == task_2.group_id == group_id + + siblings = db.query(MonitorTask).filter(MonitorTask.group_id == group_id).all() + assert len(siblings) == 2 + + db.delete(task_1) + db.delete(task_2) + db.commit() + + @pytest.mark.parametrize( + "status,expected", + [ + pytest.param( + ExecutionLogStatus.awaiting_processing.value, + True, + id="awaiting_processing", + ), + pytest.param(ExecutionLogStatus.pending.value, False, id="pending"), + pytest.param( + ExecutionLogStatus.in_processing.value, False, id="in_processing" + ), + pytest.param(ExecutionLogStatus.complete.value, False, id="complete"), + pytest.param(ExecutionLogStatus.error.value, False, id="error"), + ], + ) + def test_is_cancelled(self, db: Session, monitor_config, status, expected) -> None: + celery_id = f"celery-is-cancelled-{status}" + task = MonitorTask.create( + db=db, + data={ + "celery_id": celery_id, + "action_type": MonitorTaskType.LLM_CLASSIFICATION.value, + "status": status, + "monitor_config_id": monitor_config.id, + }, + ) + + assert is_monitor_task_cancelled(db, celery_id) is expected + + db.delete(task) + db.commit() + + def test_is_cancelled_unknown_celery_id(self, db: Session) -> None: + assert is_monitor_task_cancelled(db, "non-existent-celery-id") is False