Skip to content

Commit 5491d45

Browse files
committed
Record hook failures on active spans
- replace standalone junjo.hook_error spans with junjo.hook_error events on the surrounding workflow, subflow, node, or run-concurrent span
- dispatch terminal hooks before span close so terminal hook failures stay attached to the real execution span
- update hook telemetry tests to prove the new timing and event model
- keep public docs focused on user-facing hook behavior while tracking the telemetry change in the changelog
1 parent fc5a157 commit 5491d45

7 files changed

Lines changed: 98 additions & 91 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ execution, state management, and lifecycle observation.
6161
- OpenTelemetry span attributes now use explicit identity names such as `junjo.executable_runtime_id`, `junjo.executable_structural_id`, and `junjo.enclosing_graph_structural_id` instead of the old generic `junjo.id` and `junjo.parent_id` keys.
6262
- OpenTelemetry and hook payloads now use `executable_definition_id` and `parent_executable_definition_id` instead of the older generic `definition_id` naming on those surfaces.
6363
- Workflow telemetry now records `junjo.workflow.execution_graph_snapshot` to make it explicit that the graph payload is an execution-scoped compiled snapshot containing both runtime and structural identities.
64-
- Failed workflow, subflow, node, concurrent, and hook-error spans now set the standard OpenTelemetry `error.type` attribute in addition to Junjo-specific error metadata.
64+
- Failed workflow, subflow, node, and concurrent spans now set the standard OpenTelemetry `error.type` attribute in addition to Junjo-specific error metadata.
6565
- `JunjoOtelExporter` now exposes `shutdown()`, and the docs/examples now teach provider shutdown as the normal OpenTelemetry lifecycle while keeping `flush()` as a targeted manual drain tool.
6666
- `on_state_changed` hook payloads and state-change telemetry context now identify the active executable that performed the mutation, rather than mixing workflow metadata with node or subflow runtime identities.
67+
- Hook callback failures are now recorded as `junjo.hook_error` events on the surrounding workflow, subflow, node, or concurrent span, and terminal hooks now dispatch before span close so those events stay attached to the real execution span.
6768
- Lifecycle observation examples and docs now show hook registration as a separate concern from workflow definition.
6869
- `_NestableWorkflow` remains documented in the generated API reference, but is no longer exported from the top-level `junjo` package for direct consumption.
6970
- Public docstrings and examples were updated to reflect the current execution, hooks, and result APIs.

src/junjo/_lifecycle.py

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import inspect
4+
import traceback
45
from collections.abc import Mapping
56
from contextlib import contextmanager
67
from contextvars import ContextVar
@@ -10,8 +11,7 @@
1011
from opentelemetry import trace
1112

1213
from . import hooks as hook_events
13-
from .telemetry.otel_schema import JUNJO_OTEL_MODULE_NAME, JunjoOtelSpanTypes
14-
from .telemetry.span_lifecycle import mark_span_failed
14+
from .telemetry.otel_schema import JunjoOtelSpanTypes
1515

1616
if TYPE_CHECKING:
1717
from .hooks import Hooks
@@ -94,7 +94,7 @@ async def dispatch(self, prepared: PreparedHookEvent | None) -> None:
9494
if inspect.isawaitable(result):
9595
await result
9696
except Exception as exc:
97-
self._record_hook_error(prepared.event_name, callback, exc, prepared.event)
97+
self._record_hook_error(prepared.event_name, callback, exc)
9898

9999
async def workflow_started(
100100
self,
@@ -621,58 +621,22 @@ def _record_hook_error(
621621
event_name: str,
622622
callback,
623623
exc: Exception,
624-
event: hook_events.LifecycleEvent,
625624
) -> None:
626-
tracer = trace.get_tracer(JUNJO_OTEL_MODULE_NAME)
625+
span = trace.get_current_span()
626+
if not span.is_recording():
627+
return
628+
627629
callback_name = getattr(callback, "__qualname__", callback.__class__.__qualname__)
628630
callback_module = getattr(callback, "__module__", callback.__class__.__module__)
629-
630-
with tracer.start_as_current_span("junjo.hook_error") as span:
631-
span.set_attribute("junjo.hook.event", event_name)
632-
span.set_attribute(
633-
"junjo.hook.callback",
634-
f"{callback_module}.{callback_name}",
635-
)
636-
span.set_attribute("junjo.hook.error.type", type(exc).__name__)
637-
span.set_attribute("junjo.hook.error.message", str(exc))
638-
span.set_attribute("junjo.trace_id", event.trace_id)
639-
span.set_attribute("junjo.span_id", event.span_id)
640-
span.set_attribute("junjo.run_id", event.run_id)
641-
span.set_attribute(
642-
"junjo.executable_definition_id",
643-
event.executable_definition_id,
644-
)
645-
parent_executable_definition_id = getattr(
646-
event,
647-
"parent_executable_definition_id",
648-
None,
649-
)
650-
if parent_executable_definition_id is not None:
651-
span.set_attribute(
652-
"junjo.parent_executable_definition_id",
653-
parent_executable_definition_id,
654-
)
655-
span.set_attribute(
656-
"junjo.executable_runtime_id",
657-
event.executable_runtime_id,
658-
)
659-
span.set_attribute(
660-
"junjo.executable_structural_id",
661-
event.executable_structural_id,
662-
)
663-
span.set_attribute(
664-
"junjo.enclosing_graph_structural_id",
665-
event.enclosing_graph_structural_id,
666-
)
667-
if event.parent_executable_runtime_id is not None:
668-
span.set_attribute(
669-
"junjo.parent_executable_runtime_id",
670-
event.parent_executable_runtime_id,
671-
)
672-
if event.parent_executable_structural_id is not None:
673-
span.set_attribute(
674-
"junjo.parent_executable_structural_id",
675-
event.parent_executable_structural_id,
676-
)
677-
mark_span_failed(span, exc)
678-
span.record_exception(exc)
631+
error_attributes: dict[str, str] = {
632+
"junjo.hook.event": event_name,
633+
"junjo.hook.callback": f"{callback_module}.{callback_name}",
634+
"junjo.hook.error.type": type(exc).__name__,
635+
"junjo.hook.error.message": str(exc),
636+
"exception.type": type(exc).__name__,
637+
"exception.message": str(exc),
638+
"exception.stacktrace": "".join(
639+
traceback.format_exception(type(exc), exc, exc.__traceback__)
640+
),
641+
}
642+
span.add_event("junjo.hook_error", error_attributes)

src/junjo/hooks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ class Hooks:
210210
Registry for optional Junjo lifecycle callbacks.
211211
212212
Hooks are observers. They do not create spans or control workflow execution.
213+
If a hook callback raises, Junjo keeps execution isolated and continues
214+
dispatching the remaining callbacks for that hook.
215+
213216
To use them, register one or more callbacks and pass the registry to a
214217
workflow or subflow.
215218

src/junjo/node.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,8 @@ async def execute(self, store: StoreT, parent_id: str) -> None:
295295
),
296296
)
297297

298-
if lifecycle_context is not None:
299-
await lifecycle_context.dispatcher.dispatch(prepared_terminal_event)
298+
if lifecycle_context is not None:
299+
await lifecycle_context.dispatcher.dispatch(prepared_terminal_event)
300300

301301
if cancellation is not None:
302302
raise cancellation

src/junjo/run_concurrent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,8 @@ async def execute(self, store: BaseStore, parent_id: str) -> None: # noqa: C901
322322
),
323323
)
324324

325-
if lifecycle_context is not None:
326-
await lifecycle_context.dispatcher.dispatch(prepared_terminal_event)
325+
if lifecycle_context is not None:
326+
await lifecycle_context.dispatcher.dispatch(prepared_terminal_event)
327327

328328
if cancellation is not None:
329329
raise cancellation

src/junjo/workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ async def execute( # noqa: C901
473473
),
474474
)
475475

476-
await ctx.dispatcher.dispatch(prepared_terminal_event)
476+
await ctx.dispatcher.dispatch(prepared_terminal_event)
477477

478478
if cancellation is not None:
479479
raise cancellation

tests/test_hooks.py

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -220,24 +220,31 @@ def create_graph() -> Graph:
220220

221221

222222
@pytest.mark.asyncio
223-
async def test_terminal_hooks_run_after_span_close(
223+
async def test_terminal_hooks_run_before_span_close(
224224
span_exporter: InMemorySpanExporter,
225225
) -> None:
226226
hooks = Hooks()
227-
seen_closed: list[bool] = []
227+
span_open: list[bool] = []
228+
current_span_matches_event: list[bool] = []
228229

229230
def on_completed(event) -> None:
230231
finished_span_ids = {
231232
format(span.context.span_id, "016x")
232233
for span in span_exporter.get_finished_spans()
233234
}
234-
seen_closed.append(event.span_id in finished_span_ids)
235+
current_span_id = format(
236+
trace.get_current_span().get_span_context().span_id,
237+
"016x",
238+
)
239+
span_open.append(event.span_id not in finished_span_ids)
240+
current_span_matches_event.append(current_span_id == event.span_id)
235241

236242
hooks.on_workflow_completed(on_completed)
237243

238244
await create_simple_workflow(hooks=hooks).execute()
239245

240-
assert seen_closed == [True]
246+
assert span_open == [True]
247+
assert current_span_matches_event == [True]
241248

242249

243250
@pytest.mark.asyncio
@@ -480,34 +487,66 @@ def failing_hook(event) -> None:
480487
result = await create_simple_workflow(hooks=hooks).execute()
481488

482489
assert observed == ["ran"]
490+
assert result.state.steps == ["HookNode"]
483491

484-
hook_error_spans = [
485-
span for span in span_exporter.get_finished_spans() if span.name == "junjo.hook_error"
492+
finished_spans = span_exporter.get_finished_spans()
493+
assert all(span.name != "junjo.hook_error" for span in finished_spans)
494+
495+
spans = {span.name: span for span in finished_spans}
496+
workflow_span = spans["Hook Workflow"]
497+
hook_error_events = [
498+
event for event in workflow_span.events if event.name == "junjo.hook_error"
486499
]
487-
assert len(hook_error_spans) == 1
488-
489-
hook_error_span = hook_error_spans[0]
490-
assert hook_error_span.attributes["junjo.hook.event"] == "workflow_started"
491-
assert hook_error_span.attributes["error.type"] == "RuntimeError"
492-
assert hook_error_span.attributes["junjo.hook.error.type"] == "RuntimeError"
493-
assert hook_error_span.attributes["junjo.hook.error.message"] == "bad hook"
494-
hook_error_exception = next(
495-
event for event in hook_error_span.events if event.name == "exception"
496-
)
497-
assert hook_error_exception.attributes["exception.type"] == "RuntimeError"
498-
assert hook_error_exception.attributes["exception.message"] == "bad hook"
499-
assert hook_error_span.attributes["junjo.run_id"] == result.run_id
500-
assert (
501-
hook_error_span.attributes["junjo.executable_definition_id"]
502-
== result.definition_id
503-
)
504-
assert "junjo.parent_executable_definition_id" not in hook_error_span.attributes
505-
assert hook_error_span.attributes["junjo.executable_runtime_id"] == result.run_id
506-
assert (
507-
hook_error_span.attributes["junjo.executable_structural_id"]
508-
== hook_error_span.attributes["junjo.enclosing_graph_structural_id"]
509-
)
510-
assert hook_error_span.status.status_code is StatusCode.ERROR
500+
assert len(hook_error_events) == 1
501+
502+
hook_error_event = hook_error_events[0]
503+
assert hook_error_event.attributes["junjo.hook.event"] == "workflow_started"
504+
assert "failing_hook" in hook_error_event.attributes["junjo.hook.callback"]
505+
assert hook_error_event.attributes["junjo.hook.error.type"] == "RuntimeError"
506+
assert hook_error_event.attributes["junjo.hook.error.message"] == "bad hook"
507+
assert hook_error_event.attributes["exception.type"] == "RuntimeError"
508+
assert hook_error_event.attributes["exception.message"] == "bad hook"
509+
assert "exception.stacktrace" in hook_error_event.attributes
510+
assert workflow_span.status.status_code is StatusCode.UNSET
511+
512+
513+
@pytest.mark.asyncio
514+
async def test_terminal_hook_failures_are_recorded_on_the_terminal_span(
515+
span_exporter: InMemorySpanExporter,
516+
) -> None:
517+
hooks = Hooks()
518+
span_open: list[bool] = []
519+
520+
def failing_hook(event) -> None:
521+
finished_span_ids = {
522+
format(span.context.span_id, "016x")
523+
for span in span_exporter.get_finished_spans()
524+
}
525+
span_open.append(event.span_id not in finished_span_ids)
526+
raise RuntimeError("bad terminal hook")
527+
528+
hooks.on_workflow_completed(failing_hook)
529+
530+
result = await create_simple_workflow(hooks=hooks).execute()
531+
532+
assert result.state.steps == ["HookNode"]
533+
assert span_open == [True]
534+
535+
finished_spans = span_exporter.get_finished_spans()
536+
assert all(span.name != "junjo.hook_error" for span in finished_spans)
537+
538+
workflow_span = next(span for span in finished_spans if span.name == "Hook Workflow")
539+
hook_error_events = [
540+
event for event in workflow_span.events if event.name == "junjo.hook_error"
541+
]
542+
assert len(hook_error_events) == 1
543+
hook_error_event = hook_error_events[0]
544+
assert hook_error_event.attributes["junjo.hook.event"] == "workflow_completed"
545+
assert hook_error_event.attributes["junjo.hook.error.type"] == "RuntimeError"
546+
assert hook_error_event.attributes["junjo.hook.error.message"] == "bad terminal hook"
547+
assert hook_error_event.attributes["exception.type"] == "RuntimeError"
548+
assert hook_error_event.attributes["exception.message"] == "bad terminal hook"
549+
assert workflow_span.status.status_code is StatusCode.UNSET
511550

512551

513552
@pytest.mark.asyncio

0 commit comments

Comments
 (0)