Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .fides/db_dataset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3677,6 +3677,8 @@ dataset:
data_categories: [system.operations]
- name: child_resource_urns
data_categories: [system.operations]
- name: group_id
data_categories: [system.operations]
- name: created_at
data_categories: [system.operations]
- name: updated_at
Expand Down
4 changes: 4 additions & 0 deletions changelog/8115-add-group-id-to-monitortask.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: Added
description: Add group_id column to MonitorTask and is_monitor_task_cancelled for task cancellation support
pr: 8115
labels: ["db-migration"]
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import ConnectionTypeLogo, {
} from "~/features/datastore-connections/ConnectionTypeLogo";
import {
ConnectionType,
ExecutionLogStatus,
MonitorTaskResponse,
MonitorTaskType,
} from "~/types/api";
Expand All @@ -32,15 +33,8 @@ import {

const { Paragraph, Text, Title } = Typography;

// Helper function to format status names for display
const formatStatusForDisplay = (status: string): string => {
// Special case: "paused" should display as "Awaiting Processing"
if (status === "paused") {
return "Awaiting Processing";
}

return status.split("_").map(capitalize).join(" ");
};
const formatStatusForDisplay = (status: string) =>
status.split("_").map(capitalize).join(" ");

interface InProgressMonitorTaskItemProps extends ListItemProps {
task: MonitorTaskResponse;
Expand Down Expand Up @@ -102,11 +96,10 @@ export const InProgressMonitorTaskItem = ({

const taskCount = task.staged_resource_urns?.length || 0;
const isInProgress = [
"pending",
"in_processing",
"paused",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the paused state here because it was showing a spinner, which made it look like the process was still running

"retrying",
].includes((task.status || "").toLowerCase());
ExecutionLogStatus.PENDING,
ExecutionLogStatus.IN_PROCESSING,
ExecutionLogStatus.RETRYING,
].some((status) => status === task.status?.toLowerCase());
const fieldCount = task.field_count || taskCount;
const taskTitle = (() => {
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,8 @@ import { DebouncedSearchInput } from "../../../common/DebouncedSearchInput";
import { useInProgressMonitorTasksList } from "../hooks/useInProgressMonitorTasksList";
import { InProgressMonitorTaskItem } from "./InProgressMonitorTaskItem";

// Helper function to format status names for display
const formatStatusForDisplay = (
status: ExecutionLogStatus | string,
): string => {
// Special case: "paused" should display as "Awaiting Processing"
if (status === "paused") {
return "Awaiting Processing";
}

return status.split("_").map(capitalize).join(" ");
};
const formatStatusForDisplay = (status: ExecutionLogStatus | string): string =>
status.split("_").map(capitalize).join(" ");

export const InProgressMonitorTasksList = ({
filters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export const useInProgressMonitorTasksList = ({
() => [
ExecutionLogStatus.PENDING,
ExecutionLogStatus.IN_PROCESSING,
ExecutionLogStatus.PAUSED,
ExecutionLogStatus.RETRYING,
ExecutionLogStatus.ERROR,
],
Expand Down Expand Up @@ -74,7 +73,6 @@ export const useInProgressMonitorTasksList = ({
}, [defaultStatusFilters, resetPagination]);

// All possible status values from ExecutionLogStatus enum
// Note: awaiting_processing displays as "Awaiting Processing" but maps to "paused" in the API
const allPossibleStatuses: ExecutionLogStatus[] = [
ExecutionLogStatus.PENDING,
ExecutionLogStatus.IN_PROCESSING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Add group_id to MonitorTask

Revision ID: 9f21507db078
Revises: e3f4a5b6c7d8
Create Date: 2026-05-04 21:18:44.962348

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '9f21507db078'
down_revision = 'e3f4a5b6c7d8'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('monitortask', sa.Column('group_id', sa.String(255), nullable=True))
op.create_index(op.f('ix_monitortask_group_id'), 'monitortask', ['group_id'], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_monitortask_group_id'), table_name='monitortask')
op.drop_column('monitortask', 'group_id')
# ### end Alembic commands ###
2 changes: 2 additions & 0 deletions src/fides/api/models/detection_discovery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MonitorTaskType,
TaskRunType,
create_monitor_task_with_execution_log,
is_monitor_task_cancelled,
update_monitor_task_with_execution_log,
)
from .staged_resource_error import StagedResourceError
Expand All @@ -39,5 +40,6 @@
"MonitorTaskType",
"TaskRunType",
"create_monitor_task_with_execution_log",
"is_monitor_task_cancelled",
"update_monitor_task_with_execution_log",
]
13 changes: 13 additions & 0 deletions src/fides/api/models/detection_discovery/monitor_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MonitorTask(WorkerTask, Base):
)
staged_resource_urns = Column(ARRAY(String), nullable=True)
child_resource_urns = Column(ARRAY(String), nullable=True)
group_id = Column(String(255), nullable=True, index=True)
dismissed = Column(Boolean, nullable=False, default=False)

monitor_config = relationship(MonitorConfig, cascade="all, delete")
Expand Down Expand Up @@ -161,3 +162,15 @@ def update_monitor_task_with_execution_log(
db.commit()
db.refresh(task_record)
return task_record


def is_monitor_task_cancelled(db: Session, celery_id: str) -> bool:
"""Check if a monitor task has been cancelled by inspecting its status.

Cancellation uses awaiting_processing because partial classification
results are preserved — the task is paused, not discarded.
"""
task = MonitorTask.get_by(db=db, field="celery_id", value=celery_id)
if not task:
return False
return task.status == ExecutionLogStatus.awaiting_processing
74 changes: 74 additions & 0 deletions tests/ops/models/detection_discovery/test_monitor_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
MonitorTaskType,
TaskRunType,
create_monitor_task_with_execution_log,
is_monitor_task_cancelled,
update_monitor_task_with_execution_log,
)
from fides.api.models.worker_task import ExecutionLogStatus
Expand Down Expand Up @@ -508,3 +509,76 @@ def test_update_without_required_params(self, db: Session) -> None:
status=ExecutionLogStatus.in_processing,
)
assert "Either celery_id or task_record must be provided" in str(exc)


class TestMonitorTaskCancellation:
"""Tests for cancellation-related features: group_id and is_cancelled."""

def test_group_id(self, db: Session, monitor_config) -> None:
"""Tasks from the same classify operation share a group_id."""
group_id = "group-abc-123"
task_1 = MonitorTask.create(
db=db,
data={
"celery_id": "test-celery-group-1",
"action_type": MonitorTaskType.LLM_CLASSIFICATION.value,
"status": ExecutionLogStatus.pending.value,
"monitor_config_id": monitor_config.id,
"group_id": group_id,
},
)
task_2 = MonitorTask.create(
db=db,
data={
"celery_id": "test-celery-group-2",
"action_type": MonitorTaskType.LLM_CLASSIFICATION.value,
"status": ExecutionLogStatus.pending.value,
"monitor_config_id": monitor_config.id,
"group_id": group_id,
},
)

assert task_1.group_id == task_2.group_id == group_id

siblings = db.query(MonitorTask).filter(MonitorTask.group_id == group_id).all()
assert len(siblings) == 2

db.delete(task_1)
db.delete(task_2)
db.commit()
Comment thread
vcruces marked this conversation as resolved.

@pytest.mark.parametrize(
"status,expected",
[
pytest.param(
ExecutionLogStatus.awaiting_processing.value,
True,
id="awaiting_processing",
),
pytest.param(ExecutionLogStatus.pending.value, False, id="pending"),
pytest.param(
ExecutionLogStatus.in_processing.value, False, id="in_processing"
),
pytest.param(ExecutionLogStatus.complete.value, False, id="complete"),
pytest.param(ExecutionLogStatus.error.value, False, id="error"),
],
)
def test_is_cancelled(self, db: Session, monitor_config, status, expected) -> None:
celery_id = f"celery-is-cancelled-{status}"
task = MonitorTask.create(
db=db,
data={
"celery_id": celery_id,
"action_type": MonitorTaskType.LLM_CLASSIFICATION.value,
"status": status,
"monitor_config_id": monitor_config.id,
},
)

assert is_monitor_task_cancelled(db, celery_id) is expected

db.delete(task)
db.commit()

def test_is_cancelled_unknown_celery_id(self, db: Session) -> None:
assert is_monitor_task_cancelled(db, "non-existent-celery-id") is False
Loading