From 934adee271e9cca5dfaadf5b246f15f769ef0436 Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 12:31:14 -0600 Subject: [PATCH 1/8] feat: multithreading implementation --- CONTRIBUTING.md | 6 +- .../Flowthru.Core/Flows/ExecutionOptions.cs | 20 +- src/core/Flowthru.Core/Flows/Flow.cs | 127 ++++---- .../Flowthru.Core/Graph/TaskGraphExecutor.cs | 284 ++++++++++++++++++ .../Flowthru.Core/Services/FlowthruService.cs | 32 +- 5 files changed, 392 insertions(+), 77 deletions(-) create mode 100644 src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 541958b0..df062e5a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -104,9 +104,9 @@ This doesn't mean *ignoring* these concerns — it just means extending the API The project uses NX for task orchestration. Common commands: ```bash -nx run flowthru:build # Build the solution -nx run flowthru:test # Run all tests with coverage -nx run flowthru:format:csharp # Format code with CSharpier +dotnet build # Confirm solution builds fully +nx run affected -t test # Run all test projects affected by current changes +dotnet test # Run all tests for the project. ``` To run a subset of tests by category: diff --git a/src/core/Flowthru.Core/Flows/ExecutionOptions.cs b/src/core/Flowthru.Core/Flows/ExecutionOptions.cs index ee999ed4..1175d100 100644 --- a/src/core/Flowthru.Core/Flows/ExecutionOptions.cs +++ b/src/core/Flowthru.Core/Flows/ExecutionOptions.cs @@ -1,5 +1,5 @@ -using Flowthru.Core.Results; using Flowthru.Core.Graph; +using Flowthru.Core.Results; namespace Flowthru.Core.Flows; @@ -34,13 +34,23 @@ public class ExecutionOptions public bool StopOnFirstError { get; set; } = true; /// - /// Whether to enable parallel execution of nodes within the same layer. + /// Maximum number of steps that may execute concurrently. /// /// - /// Phase 2 feature - currently not implemented. - /// When true, nodes in the same execution layer run concurrently. + /// + /// Controls the degree of parallelism in the task-graph scheduler. Steps whose + /// dependencies are all satisfied are dispatched immediately, up to this limit. + /// + /// + /// + /// 1 (default) — sequential execution; one step at a time in topological order. + /// N > 1 — up to N independent steps run concurrently. + /// -1 or — unbounded parallelism; all ready steps + /// are dispatched immediately. + /// + /// /// - public bool EnableParallelExecution { get; set; } = false; + public int MaxDegreeOfParallelism { get; set; } = 1; /// /// The result formatter to use for displaying execution results. diff --git a/src/core/Flowthru.Core/Flows/Flow.cs b/src/core/Flowthru.Core/Flows/Flow.cs index 29dc490f..ce31c5ae 100644 --- a/src/core/Flowthru.Core/Flows/Flow.cs +++ b/src/core/Flowthru.Core/Flows/Flow.cs @@ -553,20 +553,40 @@ public DagMetadata ExportDag() } /// - /// /// Builds and executes the flow, returning comprehensive execution results. + /// Builds and executes the flow, returning comprehensive execution results. /// /// Cancellation token to signal graceful shutdown - /// FlowResult containing execution status, timing, and Flow results + /// FlowResult containing execution status, timing, and step results /// - /// /// This is the primary high-level API for executing flows. It automatically - /// calls Build() if the Flow hasn't been built yet, then executes and tracks results. + /// calls Build() if the Flow hasn't been built yet, then executes via the + /// task-graph scheduler with default options (sequential, stop on first error). + /// + public Task RunAsync(CancellationToken cancellationToken) => + RunAsync(new ExecutionOptions(), cancellationToken); + + /// + /// Builds and executes the flow with the supplied execution options. + /// + /// Controls parallelism, error policy, and other execution behaviour. + /// Cancellation token to signal graceful shutdown + /// FlowResult containing execution status, timing, and step results + /// + /// + /// Steps are dispatched by the task-graph scheduler as soon as all their dependencies + /// complete, up to concurrent steps. + /// + /// + /// With MaxDegreeOfParallelism = 1 (default) execution is sequential and + /// behaviourally equivalent to the previous layer-by-layer loop. /// /// - public async Task RunAsync(CancellationToken cancellationToken) + public async Task RunAsync( + ExecutionOptions options, + CancellationToken cancellationToken + ) { var stopwatch = Stopwatch.StartNew(); - var stepResults = new Dictionary(); try { @@ -577,33 +597,34 @@ public async Task RunAsync(CancellationToken cancellationToken) Build(); } - Logger?.LogInformation("Starting flow execution via RunAsync()"); + var stepList = (IReadOnlyList)(_slicedSteps ?? _steps); - // Execute all layers - foreach (var layer in ExecutionLayers!) - { - Logger?.LogInformation("Executing layer with {StepCount} steps", layer.Count); + Logger?.LogInformation( + "Starting flow execution via RunAsync() ({StepCount} steps, parallelism={Parallelism})", + stepList.Count, + options.MaxDegreeOfParallelism + ); - foreach (var flowStep in layer) - { - // Check for cancellation before starting each step - cancellationToken.ThrowIfCancellationRequested(); + var executor = new Graph.TaskGraphExecutor( + stepList, + options.MaxDegreeOfParallelism, + ExecuteStepWithTrackingAsync, + Logger + ); - var stepResult = await ExecuteStepWithTrackingAsync(flowStep, cancellationToken); - stepResults[flowStep.Label] = stepResult; + var stepResults = await executor.RunAsync(options.StopOnFirstError, cancellationToken); - // If step failed, stop execution - if (!stepResult.Success) - { - stopwatch.Stop(); - return FlowResult.CreateFailure( - stopwatch.Elapsed, - stepResult.Exception!, - stepResults, - Name - ); - } - } + // Surface the first failure when StopOnFirstError is true. + var firstFailure = stepResults.Values.FirstOrDefault(r => !r.Success); + if (firstFailure != null) + { + stopwatch.Stop(); + return FlowResult.CreateFailure( + stopwatch.Elapsed, + firstFailure.Exception!, + stepResults, + Name + ); } stopwatch.Stop(); @@ -616,8 +637,7 @@ public async Task RunAsync(CancellationToken cancellationToken) } catch (OperationCanceledException) { - // Re-throw cancellation exceptions so they propagate to the caller - // Cancellation is not a failure but a requested abort + // Re-throw — cancellation is not a failure but a requested abort. stopwatch.Stop(); throw; } @@ -625,31 +645,23 @@ public async Task RunAsync(CancellationToken cancellationToken) { stopwatch.Stop(); Logger?.LogError(ex, "Flow execution failed: {ErrorMessage}", ex.Message); - return FlowResult.CreateFailure(stopwatch.Elapsed, ex, stepResults, Name); + return FlowResult.CreateFailure( + stopwatch.Elapsed, + ex, + new Dictionary(), + Name + ); } } /// - /// Executes the Flow sequentially, layer by layer. + /// Executes the flow in topological order, throwing on the first step failure. /// /// Cancellation token to signal graceful shutdown - /// Task representing the Flow execution - /// Thrown if Flow has not been built + /// Task representing the flow execution + /// Thrown if the flow has not been built /// - /// - /// This method executes Flow in topological order: - /// 1. Execute all Flow in layer 0 sequentially - /// 2. Execute all Flow in layer 1 sequentially - /// 3. Continue until all layers are complete - /// - /// - /// Note: This method throws exceptions on failure. For result-based - /// execution with error handling, use RunAsync() instead. - /// - /// - /// In Phase 2, this will be replaced with a parallel executor that can run - /// steps within the same layer concurrently. - /// + /// For structured result-based execution (including parallel), use . /// public async Task ExecuteAsync(CancellationToken cancellationToken) { @@ -664,22 +676,17 @@ public async Task ExecuteAsync(CancellationToken cancellationToken) try { - foreach (var layer in ExecutionLayers!) - { - Logger?.LogInformation("Executing layer with {StepCount} steps", layer.Count); + var result = await RunAsync(cancellationToken); - foreach (var flowStep in layer) - { - // Check for cancellation before starting each step - cancellationToken.ThrowIfCancellationRequested(); - - await ExecuteStepAsync(flowStep, cancellationToken); - } + if (!result.Success) + { + throw result.Exception + ?? new InvalidOperationException("Flow execution failed with no exception details."); } Logger?.LogInformation("Flow execution completed successfully"); } - catch (Exception ex) + catch (Exception ex) when (ex is not OperationCanceledException) { Logger?.LogError(ex, "Flow execution failed: {ErrorMessage}", ex.Message); throw; diff --git a/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs new file mode 100644 index 00000000..79a79fab --- /dev/null +++ b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs @@ -0,0 +1,284 @@ +using System.Collections.Concurrent; +using Flowthru.Core.Flows; +using Microsoft.Extensions.Logging; + +namespace Flowthru.Core.Graph; + +/// +/// Executes a built flow DAG using a task-graph scheduler. +/// +/// +/// +/// Steps are dispatched as soon as all their dependencies complete, up to a configurable +/// maximum degree of parallelism. With MaxDegreeOfParallelism = 1 the scheduler +/// produces a valid topological order and is behaviourally equivalent to the previous +/// sequential layer-by-layer loop, though the specific ordering within a layer may differ +/// (depth-first rather than breadth-first). +/// +/// +/// Correctness guarantees provided by the DAG: +/// +/// Single Producer Rule — no two steps write to the same catalog entry, so +/// concurrent steps in the same "layer" cannot have write conflicts. +/// Dependency edges — a step is never dispatched before all its producers have +/// written their outputs. +/// +/// +/// +internal sealed class TaskGraphExecutor +{ + private readonly IReadOnlyList _steps; + private readonly int _maxDegreeOfParallelism; + private readonly ILogger? _logger; + private readonly Func> _executeStep; + + /// + /// The flat list of steps to execute (after slicing). Dependencies must have been + /// resolved by before calling + /// . + /// + /// + /// Maximum concurrent steps. Pass 1 for sequential execution. + /// Pass for unbounded parallelism. + /// + /// Per-step execution delegate (matches ExecuteStepWithTrackingAsync). + /// Optional logger. + internal TaskGraphExecutor( + IReadOnlyList steps, + int maxDegreeOfParallelism, + Func> executeStep, + ILogger? logger = null + ) + { + if (maxDegreeOfParallelism < 1 && maxDegreeOfParallelism != -1) + { + throw new ArgumentOutOfRangeException( + nameof(maxDegreeOfParallelism), + "Must be -1 (unbounded) or a positive integer." + ); + } + + _steps = steps; + _maxDegreeOfParallelism = maxDegreeOfParallelism == -1 ? int.MaxValue : maxDegreeOfParallelism; + _executeStep = executeStep; + _logger = logger; + } + + /// + /// Runs all steps in dependency order, returning per-step results. + /// + /// + /// When true, cancels in-flight steps and stops dispatch on the first failure. + /// When false, only the failed step and its transitive dependents are skipped; + /// independent branches continue executing. + /// + /// External cancellation token. + /// + /// Dictionary of step label → . Includes results for every step + /// that was dispatched (not steps skipped due to a failed dependency). + /// + internal async Task> RunAsync( + bool stopOnFirstError, + CancellationToken cancellationToken + ) + { + // --- Build scheduling state --------------------------------------------------- + + // How many unfinished dependencies each step is still waiting on. + var pendingDeps = new ConcurrentDictionary( + _steps.Select(s => new KeyValuePair(s, s.Dependencies.Count)) + ); + + // Reverse adjacency: for each step, which steps depend on it? + var dependents = _steps.ToDictionary(s => s, _ => new List()); + foreach (var step in _steps) + { + foreach (var dep in step.Dependencies) + { + // dep must be in dependents because BuildDependencyGraph only adds + // steps that are in _steps (sliced or full set). + if (dependents.TryGetValue(dep, out var list)) + { + list.Add(step); + } + } + } + + // Steps whose upstream is fully satisfied (or has no dependencies). + // Channel is unbounded write / bounded dispatch (controlled by the semaphore). + var readyQueue = new ConcurrentQueue(_steps.Where(s => s.Dependencies.Count == 0)); + + // Tracks which steps were skipped because an upstream dependency failed. + var skipped = new HashSet(); + + var results = new ConcurrentDictionary(); + var semaphore = new SemaphoreSlim(_maxDegreeOfParallelism, _maxDegreeOfParallelism); + + // Linked source: either the external token or our own stop-on-error cancellation. + using var internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var dispatchToken = internalCts.Token; + + // --- Dispatch loop ------------------------------------------------------------ + + // We spin tasks until every step has either produced a result or been skipped. + var inFlight = new List(); + var totalSteps = _steps.Count; + + while (results.Count + skipped.Count < totalSteps) + { + // Drain all currently runnable steps into in-flight tasks. + while (readyQueue.TryDequeue(out var step)) + { + if (skipped.Contains(step)) + { + // Already marked skipped (a dependency failed after this was enqueued). + continue; + } + + await semaphore.WaitAsync(dispatchToken).ConfigureAwait(false); + + var capturedStep = step; + var task = Task.Run( + async () => + { + try + { + _logger?.LogDebug("Dispatching step: {StepName}", capturedStep.Label); + var result = await _executeStep(capturedStep, dispatchToken).ConfigureAwait(false); + results[capturedStep.Label] = result; + + if (!result.Success) + { + _logger?.LogWarning("Step failed: {StepName}", capturedStep.Label); + + if (stopOnFirstError) + { + // Cancel everything else. + internalCts.Cancel(); + } + else + { + // Only skip transitive dependents of this step. + SkipDownstream(capturedStep, skipped, dependents); + } + } + else + { + // Notify dependents — decrement their pending count. + EnqueueReadyDependents(capturedStep, pendingDeps, dependents, skipped, readyQueue); + } + } + finally + { + semaphore.Release(); + } + }, + dispatchToken + ); + + inFlight.Add(task); + } + + if (inFlight.Count == 0) + { + // Nothing dispatched and nothing running. If not all steps accounted for, + // the dependency graph has a cycle that AssignLayers should have caught. + if (results.Count + skipped.Count < totalSteps) + { + var unaccounted = _steps + .Where(s => !results.ContainsKey(s.Label) && !skipped.Contains(s)) + .Select(s => s.Label); + throw new InvalidOperationException( + $"Task graph stalled — possible undetected cycle. Unresolved steps: " + + string.Join(", ", unaccounted) + ); + } + break; + } + + // Wait for at least one in-flight task to finish, then re-check the ready queue. + var completed = await Task.WhenAny(inFlight).ConfigureAwait(false); + + // Propagate any unhandled exceptions from the task wrapper. + // (Step failures are encoded in StepResult, not thrown — but defensive.) + try + { + await completed.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Expected when stopOnFirstError cancels the token. + } + + inFlight.Remove(completed); + + // If the dispatch token was cancelled (stop-on-first-error), drain remaining + // in-flight tasks before surfacing. + if (dispatchToken.IsCancellationRequested && !cancellationToken.IsCancellationRequested) + { + // Wait for any tasks that are already running to finish (they may still complete + // normally since they hold the semaphore slot — they only check the token at I/O + // boundaries within ExecuteStepWithTrackingAsync). + try + { + await Task.WhenAll(inFlight).ConfigureAwait(false); + } + catch (OperationCanceledException) { } + + break; + } + } + + // Propagate external cancellation to the caller. + cancellationToken.ThrowIfCancellationRequested(); + + return new Dictionary(results); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static void EnqueueReadyDependents( + FlowStep completedStep, + ConcurrentDictionary pendingDeps, + Dictionary> dependents, + HashSet skipped, + ConcurrentQueue readyQueue + ) + { + foreach (var dependent in dependents[completedStep]) + { + if (skipped.Contains(dependent)) + { + continue; + } + + var remaining = pendingDeps.AddOrUpdate(dependent, 0, (_, current) => current - 1); + if (remaining == 0) + { + readyQueue.Enqueue(dependent); + } + } + } + + private static void SkipDownstream( + FlowStep failedStep, + HashSet skipped, + Dictionary> dependents + ) + { + // BFS over the dependents graph. + var queue = new Queue(dependents[failedStep]); + while (queue.TryDequeue(out var step)) + { + if (skipped.Add(step)) + { + foreach (var downstream in dependents[step]) + { + queue.Enqueue(downstream); + } + } + } + } +} diff --git a/src/core/Flowthru.Core/Services/FlowthruService.cs b/src/core/Flowthru.Core/Services/FlowthruService.cs index 7d832b52..59ae4ec3 100644 --- a/src/core/Flowthru.Core/Services/FlowthruService.cs +++ b/src/core/Flowthru.Core/Services/FlowthruService.cs @@ -136,8 +136,17 @@ public async Task ExecuteFlowAsync( validationResult.ThrowIfInvalid(); } - var layer0Steps = mergedPipeline.ExecutionLayers![0]; - validatedInputCount = layer0Steps.SelectMany(node => node.Inputs).Distinct().Count(); + var allMergedSteps = mergedPipeline.Steps.ToList(); + var mergedProducedLabels = new HashSet( + allMergedSteps.SelectMany(s => s.Outputs.Select(o => o.Label)), + StringComparer.OrdinalIgnoreCase + ); + validatedInputCount = allMergedSteps + .SelectMany(s => s.Inputs) + .Where(i => !mergedProducedLabels.Contains(i.Label)) + .Select(i => i.Label) + .Distinct(StringComparer.OrdinalIgnoreCase) + .Count(); _logger.LogInformation(" ✓ {Count} external data sources validated", validatedInputCount); } @@ -202,7 +211,7 @@ public async Task ExecuteFlowAsync( _logger.LogInformation(""); // Execute merged pipeline - var result = await mergedPipeline.RunAsync(cancellationToken); + var result = await mergedPipeline.RunAsync(options, cancellationToken); // Format results var formatter = options.GetFormatter(); @@ -222,12 +231,17 @@ public FlowMetadata GetFlowMetadata(string pipelineName) ); } - var externalInputs = - pipeline - .ExecutionLayers?[0].SelectMany(node => node.Inputs) - .Select(e => e.Label) - .Distinct() - .ToList() ?? new List(); + var allSteps = pipeline.Steps.ToList(); + var producedLabels = new HashSet( + allSteps.SelectMany(s => s.Outputs.Select(o => o.Label)), + StringComparer.OrdinalIgnoreCase + ); + var externalInputs = allSteps + .SelectMany(s => s.Inputs) + .Where(i => !producedLabels.Contains(i.Label)) + .Select(i => i.Label) + .Distinct(StringComparer.OrdinalIgnoreCase) + .ToList(); return new FlowMetadata { From 018e8f82e9bbbe5c7ef5e83838a88f17cf4a4461 Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 12:45:46 -0600 Subject: [PATCH 2/8] chore: checkpoint progress --- .github/workflows/pr-tests.yml | 75 ++++++++++++++++++++ src/core/Flowthru.Core/Cli/ArgumentParser.cs | 27 ++++++- src/core/Flowthru.Core/Cli/FlowthruCli.cs | 2 + 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/pr-tests.yml diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml new file mode 100644 index 00000000..898d2091 --- /dev/null +++ b/.github/workflows/pr-tests.yml @@ -0,0 +1,75 @@ +name: PR Tests + +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + paths-ignore: + - "**.md" + - "docs/**" + - ".github/instructions/**" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + runs-on: ubuntu-latest + if: github.event.pull_request.draft == false + + permissions: + contents: read + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Setup .NET + uses: actions/setup-dotnet@v4 + with: + dotnet-version: "10.0.x" + dotnet-quality: "preview" + env: + DOTNET_SKIP_FIRST_TIME_EXPERIENCE: "true" + DOTNET_NOLOGO: "true" + DOTNET_CLI_TELEMETRY_OPTOUT: "true" + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "22" + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + - name: Setup UV + uses: astral-sh/setup-uv@v5 + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + with: + version: 10.6.3 + + - name: Cache NuGet packages + uses: actions/cache@v4 + with: + path: ~/.nuget/packages + key: ${{ runner.os }}-nuget-${{ hashFiles('**/packages.lock.json', '**/*.csproj') }} + restore-keys: | + ${{ runner.os }}-nuget- + + - name: Install Node dependencies + run: pnpm install + + - name: Run affected tests + env: + CI: "true" + run: | + pnpm nx affected -t test \ + --base=${{ github.event.pull_request.base.sha }} --head=HEAD \ + --output-style=stream \ + --logger "console;verbosity=minimal" diff --git a/src/core/Flowthru.Core/Cli/ArgumentParser.cs b/src/core/Flowthru.Core/Cli/ArgumentParser.cs index 68fb0d4e..58690c94 100644 --- a/src/core/Flowthru.Core/Cli/ArgumentParser.cs +++ b/src/core/Flowthru.Core/Cli/ArgumentParser.cs @@ -1,6 +1,6 @@ using Flowthru.Core.Flows; -using Flowthru.Core.Services.Models; using Flowthru.Core.Graph; +using Flowthru.Core.Services.Models; namespace Flowthru.Core.Cli; @@ -108,6 +108,31 @@ public static ParsedArguments Parse(string[] args, IEnumerable available } break; + case "--parallelism": + if (i + 1 < args.Length) + { + var value = args[++i]; + if (value.Equals("auto", StringComparison.OrdinalIgnoreCase)) + { + options.MaxDegreeOfParallelism = Environment.ProcessorCount; + } + else if (int.TryParse(value, out var n) && n >= 1) + { + options.MaxDegreeOfParallelism = n; + } + else + { + throw new ArgumentException( + $"--parallelism requires a positive integer or 'auto', got: '{value}'" + ); + } + } + else + { + throw new ArgumentException("--parallelism requires a value"); + } + break; + case "--help": case "-h": return new ParsedArguments { ShowHelp = true }; diff --git a/src/core/Flowthru.Core/Cli/FlowthruCli.cs b/src/core/Flowthru.Core/Cli/FlowthruCli.cs index ebf5cfd3..a22cb5ca 100644 --- a/src/core/Flowthru.Core/Cli/FlowthruCli.cs +++ b/src/core/Flowthru.Core/Cli/FlowthruCli.cs @@ -169,6 +169,8 @@ private void ShowHelp() _output.WriteLine(); _output.WriteLine("Options:"); _output.WriteLine(" --dry-run Validate without executing steps"); + _output.WriteLine(" --parallelism N|auto Max concurrent steps (default: 1 sequential)."); + _output.WriteLine(" 'auto' uses Environment.ProcessorCount."); _output.WriteLine(" --no-metadata Disable metadata export"); _output.WriteLine(" --metadata-output DIR Specify metadata output directory"); _output.WriteLine(" -h, --help Show this help message"); From 67a1065ae64976f4092fb8f5f1dcc7ebedebb77f Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 13:27:43 -0600 Subject: [PATCH 3/8] fix: programmatic defaults for core counts --- .../Data/_01_Raw/Catalog.Raw.cs | 6 +- .../Flowthru.Core/Flows/ExecutionOptions.cs | 8 +- src/core/Flowthru.Core/Flows/Flow.cs | 7 +- .../Flowthru.Core/Graph/TaskGraphExecutor.cs | 54 ++- .../Services/FlowthruExecutionDefaults.cs | 15 + .../Flowthru.Core/Services/FlowthruService.cs | 9 + .../Services/FlowthruServiceBuilder.cs | 37 ++ .../03_Execution/ParallelExecutionTests.cs | 355 ++++++++++++++++++ .../05_Cli/ArgumentParserParallelismTests.cs | 100 +++++ .../_Fixtures/TestCatalogs/TestCatalogs.cs | 32 ++ .../_Fixtures/TestSteps/TestSteps.cs | 25 ++ 11 files changed, 631 insertions(+), 17 deletions(-) create mode 100644 src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs create mode 100644 tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs create mode 100644 tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs index 37de0f02..b6cdb4c3 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs @@ -62,7 +62,7 @@ public partial class Catalog CreateItem( () => GqlItemFactory.Enumerable.Query( - label: "Companies", + label: "GQLCompanies", queryFunc: ct => _client.GetCompanies.ExecuteAsync(ct), selectData: r => r.Companies, allowEmptyData: true @@ -76,7 +76,7 @@ public partial class Catalog CreateItem( () => GqlItemFactory.Enumerable.Query( - label: "Shuttles", + label: "GQLShuttles", queryFunc: ct => _client.GetShuttles.ExecuteAsync(ct), selectData: r => r.Shuttles, allowEmptyData: true @@ -90,7 +90,7 @@ public partial class Catalog CreateItem( () => GqlItemFactory.Enumerable.Query( - label: "Reviews", + label: "GQLReviews", queryFunc: ct => _client.GetReviews.ExecuteAsync(ct), selectData: r => r.Reviews, allowEmptyData: true diff --git a/src/core/Flowthru.Core/Flows/ExecutionOptions.cs b/src/core/Flowthru.Core/Flows/ExecutionOptions.cs index 1175d100..627e549d 100644 --- a/src/core/Flowthru.Core/Flows/ExecutionOptions.cs +++ b/src/core/Flowthru.Core/Flows/ExecutionOptions.cs @@ -43,14 +43,14 @@ public class ExecutionOptions /// /// /// - /// 1 (default) — sequential execution; one step at a time in topological order. + /// null (default) — not specified at this layer; defers to the service-level + /// default set via flowthru.ConfigureExecution(), or 1 if that is also unset. + /// 1 — sequential execution; one step at a time in topological order. /// N > 1 — up to N independent steps run concurrently. - /// -1 or — unbounded parallelism; all ready steps - /// are dispatched immediately. /// /// /// - public int MaxDegreeOfParallelism { get; set; } = 1; + public int? MaxDegreeOfParallelism { get; set; } = null; /// /// The result formatter to use for displaying execution results. diff --git a/src/core/Flowthru.Core/Flows/Flow.cs b/src/core/Flowthru.Core/Flows/Flow.cs index ce31c5ae..b8302922 100644 --- a/src/core/Flowthru.Core/Flows/Flow.cs +++ b/src/core/Flowthru.Core/Flows/Flow.cs @@ -599,15 +599,18 @@ CancellationToken cancellationToken var stepList = (IReadOnlyList)(_slicedSteps ?? _steps); + // Resolve MaxDegreeOfParallelism: null means "not specified", default to 1 (sequential). + var parallelism = options.MaxDegreeOfParallelism ?? 1; + Logger?.LogInformation( "Starting flow execution via RunAsync() ({StepCount} steps, parallelism={Parallelism})", stepList.Count, - options.MaxDegreeOfParallelism + parallelism ); var executor = new Graph.TaskGraphExecutor( stepList, - options.MaxDegreeOfParallelism, + parallelism, ExecuteStepWithTrackingAsync, Logger ); diff --git a/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs index 79a79fab..22f01b9a 100644 --- a/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs +++ b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs @@ -82,6 +82,12 @@ internal async Task> RunAsync( CancellationToken cancellationToken ) { + // Fast-path: surface external cancellation before any work begins. + // ThrowIfCancellationRequested throws OperationCanceledException (base type), which + // is what callers expect — not the TaskCanceledException that semaphore.WaitAsync + // would throw if we let it reach the dispatch loop with an already-cancelled token. + cancellationToken.ThrowIfCancellationRequested(); + // --- Build scheduling state --------------------------------------------------- // How many unfinished dependencies each step is still waiting on. @@ -113,6 +119,7 @@ CancellationToken cancellationToken var results = new ConcurrentDictionary(); var semaphore = new SemaphoreSlim(_maxDegreeOfParallelism, _maxDegreeOfParallelism); + var dispatchedCount = 0; // incremented atomically before each step starts // Linked source: either the external token or our own stop-on-error cancellation. using var internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); @@ -143,13 +150,29 @@ CancellationToken cancellationToken { try { - _logger?.LogDebug("Dispatching step: {StepName}", capturedStep.Label); + var threadId = Environment.CurrentManagedThreadId; + var startOrdinal = Interlocked.Increment(ref dispatchedCount); + _logger?.LogInformation( + " → {StepName} executing... ({StartOrdinal} of {Total} steps, thread {ThreadId})", + capturedStep.Label, + startOrdinal, + totalSteps, + threadId + ); + var result = await _executeStep(capturedStep, dispatchToken).ConfigureAwait(false); results[capturedStep.Label] = result; + var completedCount = results.Count; if (!result.Success) { - _logger?.LogWarning("Step failed: {StepName}", capturedStep.Label); + _logger?.LogWarning( + " ✗ {StepName} failed ({CompletedCount} of {Total} steps, thread {ThreadId})", + capturedStep.Label, + completedCount, + totalSteps, + threadId + ); if (stopOnFirstError) { @@ -164,6 +187,17 @@ CancellationToken cancellationToken } else { + _logger?.LogInformation( + " ✓ {StepName,-40} {Duration,6:F2}s ({InputCount,6} → {OutputCount,6} records) ({CompletedCount} of {Total} steps, thread {ThreadId})", + capturedStep.Label, + result.ExecutionTime.TotalSeconds, + result.InputCount, + result.OutputCount, + completedCount, + totalSteps, + threadId + ); + // Notify dependents — decrement their pending count. EnqueueReadyDependents(capturedStep, pendingDeps, dependents, skipped, readyQueue); } @@ -212,13 +246,17 @@ CancellationToken cancellationToken inFlight.Remove(completed); - // If the dispatch token was cancelled (stop-on-first-error), drain remaining - // in-flight tasks before surfacing. - if (dispatchToken.IsCancellationRequested && !cancellationToken.IsCancellationRequested) + // External cancellation — break immediately; ThrowIfCancellationRequested below + // will surface the OperationCanceledException to the caller. + if (cancellationToken.IsCancellationRequested) + { + break; + } + + // Internal stop-on-first-error cancellation — drain remaining in-flight tasks + // (they may still be running; let them finish before we return) then break. + if (dispatchToken.IsCancellationRequested) { - // Wait for any tasks that are already running to finish (they may still complete - // normally since they hold the semaphore slot — they only check the token at I/O - // boundaries within ExecuteStepWithTrackingAsync). try { await Task.WhenAll(inFlight).ConfigureAwait(false); diff --git a/src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs b/src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs new file mode 100644 index 00000000..7c460857 --- /dev/null +++ b/src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs @@ -0,0 +1,15 @@ +namespace Flowthru.Core.Services; + +/// +/// Carries service-level execution defaults registered by . +/// Injected into and applied when a per-call +/// does not specify a value. +/// +internal sealed class FlowthruExecutionDefaults +{ + /// + /// Service-level default for . + /// null means "not configured at this layer; fall back to 1". + /// + public int? MaxDegreeOfParallelism { get; init; } +} diff --git a/src/core/Flowthru.Core/Services/FlowthruService.cs b/src/core/Flowthru.Core/Services/FlowthruService.cs index 59ae4ec3..c7d1e1ee 100644 --- a/src/core/Flowthru.Core/Services/FlowthruService.cs +++ b/src/core/Flowthru.Core/Services/FlowthruService.cs @@ -27,6 +27,7 @@ internal sealed class FlowthruService : IFlowthruService private readonly IServiceProvider _services; private readonly ILogger _logger; private readonly FlowthruMetadataBuilder? _metadataBuilder; + private readonly FlowthruExecutionDefaults _executionDefaults; /// /// Initializes a new instance of FlowthruService. @@ -36,6 +37,7 @@ public FlowthruService( Dictionary pipelines, IServiceProvider services, ILogger logger, + FlowthruExecutionDefaults executionDefaults, FlowthruMetadataBuilder? metadataBuilder = null ) { @@ -43,6 +45,7 @@ public FlowthruService( _pipelines = pipelines ?? throw new ArgumentNullException(nameof(pipelines)); _services = services ?? throw new ArgumentNullException(nameof(services)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _executionDefaults = executionDefaults ?? new FlowthruExecutionDefaults(); _metadataBuilder = metadataBuilder; // Inject services into all registered catalogs @@ -98,6 +101,12 @@ public async Task ExecuteFlowAsync( options ??= new ExecutionOptions(); + // Resolve MaxDegreeOfParallelism: CLI/caller value wins; service default is fallback; 1 is the floor. + options.MaxDegreeOfParallelism = + options.MaxDegreeOfParallelism + ?? _executionDefaults.MaxDegreeOfParallelism + ?? 1; + // ════════════════════════════════════════ // PRE-FLIGHT CHECKS // ════════════════════════════════════════ diff --git a/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs b/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs index 462b2926..19378963 100644 --- a/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs +++ b/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs @@ -40,12 +40,42 @@ private readonly List< private Func>? _flowFactory; private FlowRegistrationEntry? _lastRegistration; private IConfiguration? _configuration; + private int? _defaultMaxDegreeOfParallelism; internal FlowthruServiceBuilder(IServiceCollection services) { _services = services ?? throw new ArgumentNullException(nameof(services)); } + /// + /// Configures default execution behaviour for all pipelines run through this service. + /// + /// + /// Settings applied here act as the service-level default. They are overridden by any + /// value explicitly supplied in the passed to + /// (e.g. from the CLI --parallelism + /// flag). Properties left null or at their default in the per-call options fall + /// back to these values. + /// + /// + /// + /// services.AddFlowthru(flowthru => + /// { + /// flowthru.ConfigureExecution(opts => + /// { + /// opts.MaxDegreeOfParallelism = Environment.ProcessorCount; + /// }); + /// }); + /// + /// + public FlowthruServiceBuilder ConfigureExecution(Action configure) + { + var defaults = new ExecutionOptions(); + configure(defaults); + _defaultMaxDegreeOfParallelism = defaults.MaxDegreeOfParallelism; + return this; + } + /// /// Internal entry type that carries a Flow factory and its associated metadata. /// Replaces the FlowRegistrar indirection for cleaner multi-catalog support. @@ -619,6 +649,13 @@ private static void ApplyFileConfiguration(TBuilder builder, MetadataO /// internal void RegisterFlowDictionary() { + // Register the service-level default parallelism so FlowthruService can consume it. + var defaultParallelism = _defaultMaxDegreeOfParallelism; + _services.AddSingleton(new FlowthruExecutionDefaults + { + MaxDegreeOfParallelism = defaultParallelism, + }); + // Always register the catalog collection so FlowthruService can inject all catalogs. // Merges both type-registered catalogs (RegisterCatalog) and dynamically constructed // catalog collections (RegisterCatalogs). diff --git a/tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs b/tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs new file mode 100644 index 00000000..0a54a4fa --- /dev/null +++ b/tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs @@ -0,0 +1,355 @@ +using System.Collections.Concurrent; +using Flowthru.Core.Flows; +using Flowthru.Core.Graph; +using Flowthru.Tests.Fixtures.TestCatalogs; +using Flowthru.Tests.Fixtures.TestSteps; + +namespace Flowthru.Tests.Execution; + +/// +/// Tests verifying the task-graph scheduler's parallel dispatch behaviour. +/// +/// +/// +/// Concurrency is verified structurally (overlapping execution windows) rather than +/// purely by elapsed time — wall-clock assertions are inherently flaky on loaded CI +/// machines. Each test that needs to prove steps ran concurrently compares the +/// start/end timestamps recorded by and asserts that at +/// least two windows overlap. +/// +/// +/// The core DAG used across most tests is a diamond: +/// +/// (external) Input ──→ StepA (BranchA) ──┐ +/// └──→ StepB (BranchB) ──┴──→ StepMerge (Merged) +/// +/// StepA and StepB have no dependency on each other, so they are eligible for +/// concurrent dispatch whenever MaxDegreeOfParallelism > 1. +/// +/// +[TestFixture] +[Category("Execution")] +[Category("Parallel")] +public class ParallelExecutionTests +{ + // ───────────────────────────────────────────────────────────────────────── + // Helpers + // ───────────────────────────────────────────────────────────────────────── + + private static readonly IEnumerable SeedData = + [ + new TestData + { + Id = 1, + Name = "A", + Value = 1.0, + }, + new TestData + { + Id = 2, + Name = "B", + Value = 2.0, + }, + ]; + + private static readonly TimeSpan BranchDelay = TimeSpan.FromMilliseconds(300); + + /// + /// Returns true when two execution windows overlap in time. + /// + private static bool Overlaps( + (string Label, DateTime Start, DateTime End) a, + (string Label, DateTime Start, DateTime End) b + ) => a.Start < b.End && b.Start < a.End; + + // ───────────────────────────────────────────────────────────────────────── + // Correctness — parallel produces the same results as sequential + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public async Task RunAsync_WithParallelism_ProducesCorrectResults() + { + var catalog = new ParallelBranchCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var flow = FlowBuilder.CreateFlow(builder => + { + builder.AddStep("StepA", PassthroughStep.Create(), catalog.Input, catalog.BranchA); + builder.AddStep("StepB", PassthroughStep.Create(), catalog.Input, catalog.BranchB); + builder.AddStep( + "StepMerge", + MergeStep.Create(), + (catalog.BranchA, catalog.BranchB), + catalog.Merged + ); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 2 }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.True); + var merged = await catalog.Merged.Load().Run(); + Assert.That( + merged.Count(), + Is.EqualTo(SeedData.Count() * 2), + "Merge should contain both branches" + ); + Assert.That(result.StepResults, Has.Count.EqualTo(3)); + Assert.That(result.StepResults.Values.All(r => r.Success), Is.True); + } + + // ───────────────────────────────────────────────────────────────────────── + // Concurrency — independent steps actually overlap when parallelism > 1 + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public async Task RunAsync_WithParallelism_IndependentStepsOverlapInTime() + { + var catalog = new ParallelBranchCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>(); + + var flow = FlowBuilder.CreateFlow(builder => + { + builder.AddStep( + "StepA", + RecordingStep.Create(log, "StepA", BranchDelay), + catalog.Input, + catalog.BranchA + ); + builder.AddStep( + "StepB", + RecordingStep.Create(log, "StepB", BranchDelay), + catalog.Input, + catalog.BranchB + ); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 2 }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.True, "Flow should succeed"); + Assert.That(log, Has.Count.EqualTo(2), "Both steps should have recorded"); + + var entries = log.ToList(); + var stepA = entries.First(e => e.Label == "StepA"); + var stepB = entries.First(e => e.Label == "StepB"); + + Assert.That( + Overlaps(stepA, stepB), + Is.True, + $"StepA [{stepA.Start:mm:ss.fff}–{stepA.End:mm:ss.fff}] and " + + $"StepB [{stepB.Start:mm:ss.fff}–{stepB.End:mm:ss.fff}] should overlap" + ); + } + + // ───────────────────────────────────────────────────────────────────────── + // Sequential constraint — MaxDegreeOfParallelism = 1 prevents overlap + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public async Task RunAsync_WithParallelismOne_IndependentStepsDoNotOverlap() + { + var catalog = new ParallelBranchCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>(); + + var flow = FlowBuilder.CreateFlow(builder => + { + builder.AddStep( + "StepA", + RecordingStep.Create(log, "StepA", BranchDelay), + catalog.Input, + catalog.BranchA + ); + builder.AddStep( + "StepB", + RecordingStep.Create(log, "StepB", BranchDelay), + catalog.Input, + catalog.BranchB + ); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 1 }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.True, "Flow should succeed"); + var entries = log.ToList(); + var stepA = entries.First(e => e.Label == "StepA"); + var stepB = entries.First(e => e.Label == "StepB"); + + Assert.That( + Overlaps(stepA, stepB), + Is.False, + $"With MaxDegreeOfParallelism=1, StepA [{stepA.Start:mm:ss.fff}–{stepA.End:mm:ss.fff}] and " + + $"StepB [{stepB.Start:mm:ss.fff}–{stepB.End:mm:ss.fff}] must not overlap" + ); + } + + // ───────────────────────────────────────────────────────────────────────── + // StopOnFirstError = true (default) — halts on the first failure + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public async Task RunAsync_StopOnFirstErrorTrue_ReturnsFailureResult() + { + var catalog = new ParallelBranchCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var flow = FlowBuilder.CreateFlow(builder => + { + builder.AddStep( + "FailingA", + FailingStep.Create("branch A failed"), + catalog.Input, + catalog.BranchA + ); + builder.AddStep("StepB", PassthroughStep.Create(), catalog.Input, catalog.BranchB); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 2, StopOnFirstError = true }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.False, "Flow should fail"); + Assert.That(result.Exception, Is.Not.Null); + Assert.That(result.StepResults["FailingA"].Success, Is.False); + } + + // ───────────────────────────────────────────────────────────────────────── + // StopOnFirstError = false — independent branches complete; dependents skipped + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public async Task RunAsync_StopOnFirstErrorFalse_IndependentBranchCompletes() + { + var catalog = new ParallelBranchCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var flow = FlowBuilder.CreateFlow(builder => + { + // StepA fails — BranchA will not be populated. + builder.AddStep( + "FailingA", + FailingStep.Create("branch A failed"), + catalog.Input, + catalog.BranchA + ); + // StepB is independent and should still run and succeed. + builder.AddStep("StepB", PassthroughStep.Create(), catalog.Input, catalog.BranchB); + // StepMerge depends on BranchA (failed) — should be skipped. + builder.AddStep( + "StepMerge", + MergeStep.Create(), + (catalog.BranchA, catalog.BranchB), + catalog.Merged + ); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 2, StopOnFirstError = false }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.False, "Flow should still report failure"); + Assert.That(result.StepResults["FailingA"].Success, Is.False, "FailingA should be failed"); + Assert.That(result.StepResults["StepB"].Success, Is.True, "Independent StepB should have run"); + // StepMerge is a downstream dependent of FailingA — it should have been skipped. + Assert.That( + result.StepResults, + Does.Not.ContainKey("StepMerge"), + "StepMerge should be skipped" + ); + + var branchB = await catalog.BranchB.Load().Run(); + Assert.That(branchB.Count(), Is.EqualTo(SeedData.Count()), "BranchB should have been written"); + } + + [Test] + public async Task RunAsync_StopOnFirstErrorFalse_DownstreamOfFailedStepIsSkipped() + { + // Linear chain: A → B → C. A fails. B and C should be skipped. + var catalog = new SimpleThreeStepCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var flow = FlowBuilder.CreateFlow(builder => + { + builder.AddStep("StepA", FailingStep.Create("A failed"), catalog.Input, catalog.StepOne); + builder.AddStep("StepB", PassthroughStep.Create(), catalog.StepOne, catalog.StepTwo); + builder.AddStep("StepC", PassthroughStep.Create(), catalog.StepTwo, catalog.Output); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 1, StopOnFirstError = false }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.False); + Assert.That(result.StepResults["StepA"].Success, Is.False); + Assert.That( + result.StepResults, + Does.Not.ContainKey("StepB"), + "StepB (downstream) should be skipped" + ); + Assert.That( + result.StepResults, + Does.Not.ContainKey("StepC"), + "StepC (downstream) should be skipped" + ); + } + + // ───────────────────────────────────────────────────────────────────────── + // Dependency ordering — downstream steps never start before their producers + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public async Task RunAsync_WithParallelism_DownstreamStepStartsAfterProducerCompletes() + { + // Linear chain: StepA (delayed) → StepB. Even with high parallelism, StepB must + // not start until StepA has written its output. + var catalog = new SimpleThreeStepCatalog(); + await catalog.Input.Save(SeedData).Run(); + + var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>(); + + var flow = FlowBuilder.CreateFlow(builder => + { + builder.AddStep( + "StepA", + RecordingStep.Create(log, "StepA", BranchDelay), + catalog.Input, + catalog.StepOne + ); + builder.AddStep( + "StepB", + RecordingStep.Create(log, "StepB", TimeSpan.Zero), + catalog.StepOne, + catalog.Output + ); + }); + + var result = await flow.RunAsync( + new ExecutionOptions { MaxDegreeOfParallelism = 4 }, + CancellationToken.None + ); + + Assert.That(result.Success, Is.True); + + var entries = log.ToDictionary(e => e.Label); + Assert.That( + entries["StepB"].Start >= entries["StepA"].End, + Is.True, + $"StepB must not start [{entries["StepB"].Start:mm:ss.fff}] " + + $"before StepA finishes [{entries["StepA"].End:mm:ss.fff}]" + ); + } +} diff --git a/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs b/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs new file mode 100644 index 00000000..e51862ec --- /dev/null +++ b/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs @@ -0,0 +1,100 @@ +using Flowthru.Core.Cli; +using Flowthru.Core.Flows; + +namespace Flowthru.Tests.Cli; + +/// +/// Tests for focusing on the --parallelism flag. +/// +[TestFixture] +[Category("Cli")] +[Category("Parallel")] +public class ArgumentParserParallelismTests +{ + // Dummy flow names — ArgumentParser validates slicing flags against these; + // none of the parallelism tests use named flows so the list can be empty. + private static readonly IEnumerable NoFlows = []; + + // ───────────────────────────────────────────────────────────────────────── + // Integer values + // ───────────────────────────────────────────────────────────────────────── + + [TestCase(1)] + [TestCase(2)] + [TestCase(4)] + [TestCase(16)] + public void Parse_ParallelismInteger_SetsMaxDegreeOfParallelism(int n) + { + var parsed = ArgumentParser.Parse(["--parallelism", n.ToString()], NoFlows); + + Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.EqualTo(n)); + } + + // ───────────────────────────────────────────────────────────────────────── + // "auto" keyword + // ───────────────────────────────────────────────────────────────────────── + + [TestCase("auto")] + [TestCase("AUTO")] + [TestCase("Auto")] + public void Parse_ParallelismAuto_SetsMaxDegreeToProcessorCount(string token) + { + var parsed = ArgumentParser.Parse(["--parallelism", token], NoFlows); + + Assert.That( + parsed.Options!.MaxDegreeOfParallelism, + Is.EqualTo(Environment.ProcessorCount), + "'auto' should map to Environment.ProcessorCount" + ); + } + + // ───────────────────────────────────────────────────────────────────────── + // Default (flag absent) + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public void Parse_NoParallelismFlag_DefaultsToOne() + { + var parsed = ArgumentParser.Parse([], NoFlows); + + Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.EqualTo(1)); + } + + // ───────────────────────────────────────────────────────────────────────── + // Invalid values + // ───────────────────────────────────────────────────────────────────────── + + [TestCase("0")] + [TestCase("-1")] + [TestCase("abc")] + [TestCase("3.5")] + public void Parse_ParallelismInvalidValue_Throws(string bad) + { + Assert.Throws( + () => ArgumentParser.Parse(["--parallelism", bad], NoFlows), + $"'--parallelism {bad}' should throw ArgumentException" + ); + } + + [Test] + public void Parse_ParallelismMissingValue_Throws() + { + Assert.Throws( + () => ArgumentParser.Parse(["--parallelism"], NoFlows), + "'--parallelism' without a value should throw ArgumentException" + ); + } + + // ───────────────────────────────────────────────────────────────────────── + // Composed with other flags + // ───────────────────────────────────────────────────────────────────────── + + [Test] + public void Parse_ParallelismWithDryRun_BothOptionsSet() + { + var parsed = ArgumentParser.Parse(["--dry-run", "--parallelism", "4"], NoFlows); + + Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.EqualTo(4)); + Assert.That(parsed.Options.DryRun.Enabled, Is.True); + } +} diff --git a/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs b/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs index c5a0a3fa..62dfb643 100644 --- a/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs +++ b/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs @@ -115,6 +115,38 @@ public DownstreamCatalog() CreateItem(() => ItemFactory.Enumerable.Memory(label: "downstream_output")); } +/// +/// Diamond-topology catalog: a shared external input feeds two independent branches +/// that are later merged into a single output. +/// +/// +/// Topology: +/// +/// (external) Input ──→ BranchA ──┐ +/// └──→ BranchB ──┴──→ Merged +/// +/// StepA and StepB are independent — ideal for verifying parallel dispatch. +/// +public class ParallelBranchCatalog : CatalogAbstract +{ + public ParallelBranchCatalog() + { + InitializeCatalogProperties(); + } + + public IItem> Input => + CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_input")); + + public IItem> BranchA => + CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_branch_a")); + + public IItem> BranchB => + CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_branch_b")); + + public IItem> Merged => + CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_merged")); +} + /// /// Third catalog for 3-arity pipeline tests. /// diff --git a/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs b/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs index 77347ba7..008e7d41 100644 --- a/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs +++ b/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs @@ -115,3 +115,28 @@ public static Func< }; } } + +/// +/// Step that records its execution window (start/end timestamp) to a shared log, +/// then applies an optional delay. Used to verify concurrent vs. sequential dispatch. +/// +public static class RecordingStep +{ + /// Shared bag that captures execution windows across steps. + /// Identifier written into the log entry. + /// How long the step blocks before completing. + public static Func, Task>> Create( + System.Collections.Concurrent.ConcurrentBag<(string Label, DateTime Start, DateTime End)> log, + string label, + TimeSpan delay + ) + { + return async (input) => + { + var start = DateTime.UtcNow; + await Task.Delay(delay); + log.Add((label, start, DateTime.UtcNow)); + return input; + }; + } +} From 107ceeae86355a718c2151bd92c005fabcc68047 Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 13:35:33 -0600 Subject: [PATCH 4/8] fix: corrected log output --- .../Flowthru.Core/Graph/TaskGraphExecutor.cs | 24 +++++++++++++------ .../Flowthru.Core/Services/FlowthruService.cs | 4 +--- .../Services/FlowthruServiceBuilder.cs | 7 +++--- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs index 22f01b9a..132848e2 100644 --- a/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs +++ b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs @@ -131,6 +131,11 @@ CancellationToken cancellationToken var inFlight = new List(); var totalSteps = _steps.Count; + // Pre-populate worker slot IDs (1..N). The semaphore guarantees at most N concurrent + // tasks hold the semaphore — so a slot is always available on a successful WaitAsync. + var slotCount = Math.Max(1, Math.Min(_maxDegreeOfParallelism, totalSteps)); + var workerSlots = new ConcurrentQueue(Enumerable.Range(1, slotCount)); + while (results.Count + skipped.Count < totalSteps) { // Drain all currently runnable steps into in-flight tasks. @@ -144,20 +149,22 @@ CancellationToken cancellationToken await semaphore.WaitAsync(dispatchToken).ConfigureAwait(false); + // Slot dequeue is safe here: semaphore ensures at most slotCount concurrent holders. + workerSlots.TryDequeue(out var workerId); var capturedStep = step; var task = Task.Run( async () => { try { - var threadId = Environment.CurrentManagedThreadId; var startOrdinal = Interlocked.Increment(ref dispatchedCount); _logger?.LogInformation( - " → {StepName} executing... ({StartOrdinal} of {Total} steps, thread {ThreadId})", + " → {StepName} executing... ({StartOrdinal} of {Total} steps, worker {WorkerId}/{TotalWorkers})", capturedStep.Label, startOrdinal, totalSteps, - threadId + workerId, + slotCount ); var result = await _executeStep(capturedStep, dispatchToken).ConfigureAwait(false); @@ -167,11 +174,12 @@ CancellationToken cancellationToken if (!result.Success) { _logger?.LogWarning( - " ✗ {StepName} failed ({CompletedCount} of {Total} steps, thread {ThreadId})", + " ✗ {StepName} failed ({CompletedCount} of {Total} steps, worker {WorkerId}/{TotalWorkers})", capturedStep.Label, completedCount, totalSteps, - threadId + workerId, + slotCount ); if (stopOnFirstError) @@ -188,14 +196,15 @@ CancellationToken cancellationToken else { _logger?.LogInformation( - " ✓ {StepName,-40} {Duration,6:F2}s ({InputCount,6} → {OutputCount,6} records) ({CompletedCount} of {Total} steps, thread {ThreadId})", + " ✓ {StepName,-40} {Duration,6:F2}s ({InputCount,6} → {OutputCount,6} records) ({CompletedCount} of {Total} steps, worker {WorkerId}/{TotalWorkers})", capturedStep.Label, result.ExecutionTime.TotalSeconds, result.InputCount, result.OutputCount, completedCount, totalSteps, - threadId + workerId, + slotCount ); // Notify dependents — decrement their pending count. @@ -204,6 +213,7 @@ CancellationToken cancellationToken } finally { + workerSlots.Enqueue(workerId); semaphore.Release(); } }, diff --git a/src/core/Flowthru.Core/Services/FlowthruService.cs b/src/core/Flowthru.Core/Services/FlowthruService.cs index c7d1e1ee..ff784ed0 100644 --- a/src/core/Flowthru.Core/Services/FlowthruService.cs +++ b/src/core/Flowthru.Core/Services/FlowthruService.cs @@ -103,9 +103,7 @@ public async Task ExecuteFlowAsync( // Resolve MaxDegreeOfParallelism: CLI/caller value wins; service default is fallback; 1 is the floor. options.MaxDegreeOfParallelism = - options.MaxDegreeOfParallelism - ?? _executionDefaults.MaxDegreeOfParallelism - ?? 1; + options.MaxDegreeOfParallelism ?? _executionDefaults.MaxDegreeOfParallelism ?? 1; // ════════════════════════════════════════ // PRE-FLIGHT CHECKS diff --git a/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs b/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs index 19378963..bb08c02c 100644 --- a/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs +++ b/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs @@ -651,10 +651,9 @@ internal void RegisterFlowDictionary() { // Register the service-level default parallelism so FlowthruService can consume it. var defaultParallelism = _defaultMaxDegreeOfParallelism; - _services.AddSingleton(new FlowthruExecutionDefaults - { - MaxDegreeOfParallelism = defaultParallelism, - }); + _services.AddSingleton( + new FlowthruExecutionDefaults { MaxDegreeOfParallelism = defaultParallelism } + ); // Always register the catalog collection so FlowthruService can inject all catalogs. // Merges both type-registered catalogs (RegisterCatalog) and dynamically constructed From 780386da4d8bf8c39e2e801a8249d5abed67bd8d Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 13:41:09 -0600 Subject: [PATCH 5/8] fix: correct test assertion on CLI incoming --- examples/advanced/KedroSpaceflights.Custom/Program.cs | 2 ++ .../Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/advanced/KedroSpaceflights.Custom/Program.cs b/examples/advanced/KedroSpaceflights.Custom/Program.cs index e38c756b..8f15e866 100644 --- a/examples/advanced/KedroSpaceflights.Custom/Program.cs +++ b/examples/advanced/KedroSpaceflights.Custom/Program.cs @@ -94,6 +94,8 @@ private static void ConfigureServices(IServiceCollection services, string basePa flowthru .RegisterFlow(label: "Reporting", flow: ReportingFlow.Create) .WithDescription("Generates reports and visualizations"); + + flowthru.ConfigureExecution(opts => opts.MaxDegreeOfParallelism = 8); }); services.AddLogging(logging => diff --git a/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs b/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs index e51862ec..d257f813 100644 --- a/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs +++ b/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs @@ -53,11 +53,13 @@ public void Parse_ParallelismAuto_SetsMaxDegreeToProcessorCount(string token) // ───────────────────────────────────────────────────────────────────────── [Test] - public void Parse_NoParallelismFlag_DefaultsToOne() + public void Parse_NoParallelismFlag_LeavesNull() { var parsed = ArgumentParser.Parse([], NoFlows); - Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.EqualTo(1)); + // Null means "unspecified" — the priority chain in FlowthruService resolves it + // to the service-level default, or 1 if none is configured. + Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.Null); } // ───────────────────────────────────────────────────────────────────────── From 4b7f4af6dade063d1e39ae2864a91fbffc003ae2 Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 14:09:05 -0600 Subject: [PATCH 6/8] fix: add multithread to split pipeline example --- examples/advanced/RetailDataSplitFlow/Program.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/advanced/RetailDataSplitFlow/Program.cs b/examples/advanced/RetailDataSplitFlow/Program.cs index 614a12ac..17860704 100644 --- a/examples/advanced/RetailDataSplitFlow/Program.cs +++ b/examples/advanced/RetailDataSplitFlow/Program.cs @@ -1,9 +1,9 @@ using Flowthru.Core.Cli; -using Flowthru.Extensions.Python; -using Flowthru.Extensions.Python.Services; using Flowthru.Core.Meta; using Flowthru.Core.Meta.Providers; using Flowthru.Core.Services; +using Flowthru.Extensions.Python; +using Flowthru.Extensions.Python.Services; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -111,6 +111,8 @@ string outputPath mermaid.WithOutputDirectory(metadataPath) ); }); + + flowthru.ConfigureExecution(opts => opts.MaxDegreeOfParallelism = 8); }); services.AddLogging(logging => From c32837b5a2079f1cd120c7c284c34f29315487ea Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 14:22:32 -0600 Subject: [PATCH 7/8] fix: refactor gql to better fit --- .../_02_Intermediate/Catalog.Intermediate.cs | 12 ++ .../Schemas/PreprocessedReviewSchema.cs | 23 ++++ .../DataProcessing/DataProcessingFlow.cs | 33 +----- .../Steps/CreateModelInputTableStep.cs | 36 ++---- .../Steps/PreprocessCompaniesStep.cs | 89 --------------- .../Steps/PreprocessShuttlesStep.cs | 105 ------------------ .../Flows/Ingest/IngestFlow.cs | 50 ++++++--- .../Ingest/Steps/PreprocessCompaniesStep.cs | 57 ++++++++++ .../Ingest/Steps/PreprocessReviewsStep.cs | 34 ++++++ .../Ingest/Steps/PreprocessShuttlesStep.cs | 65 +++++++++++ .../Flows/Ingest/Steps/SeedGqlDatabaseStep.cs | 43 ++----- .../Infra/GqlClient/schema.graphql | 38 ++++--- .../Infra/GqlServer/Types.cs | 36 +++--- 13 files changed, 291 insertions(+), 330 deletions(-) create mode 100644 examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs delete mode 100644 examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs delete mode 100644 examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs create mode 100644 examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs create mode 100644 examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs create mode 100644 examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs index 148feff3..e67059e1 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs @@ -28,4 +28,16 @@ public partial class Catalog filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_shuttles.parquet" ) ); + + /// + /// Preprocessed review data with parsed decimal rating scores. + /// + public IItem> PreprocessedReviews => + CreateItem( + () => + ItemFactory.Enumerable.Parquet( + label: "PreprocessedReviews", + filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_reviews.parquet" + ) + ); } diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs new file mode 100644 index 00000000..fd237711 --- /dev/null +++ b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs @@ -0,0 +1,23 @@ +using Flowthru.Core.Abstractions; + +namespace KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; + +/// +/// Represents a preprocessed review with a parsed numeric rating. +/// Produced by filtering out records with unparseable rating strings. +/// +[FlowthruSchema] +public partial record PreprocessedReviewSchema +{ + /// + /// Identifier of the shuttle being reviewed. + /// + [SerializedLabel("shuttle_id")] + public required string ShuttleId { get; init; } + + /// + /// Review rating score as a decimal value. + /// + [SerializedLabel("review_scores_rating")] + public required decimal ReviewScoresRating { get; init; } +} diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs index 602194c2..0f737e05 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs @@ -1,49 +1,28 @@ using Flowthru.Core.Flows; using KedroSpaceflightsGQL.Data; using KedroSpaceflightsGQL.Flows.DataProcessing.Steps; -using KedroSpaceflightsGQL.Infra.GqlClient; namespace KedroSpaceflightsGQL.Flows.DataProcessing; /// -/// Creates the data processing pipeline that preprocesses raw company and shuttle data -/// and joins it with reviews to create a model input table. +/// Data processing pipeline. Reads typed data from the GQL server and joins it into a +/// model input table. Depends on Ingest completing first via the GqlDatabaseSeeded gate. /// public static class DataProcessingFlow { - /// - /// Creates the data processing pipeline. - /// - /// The data catalog containing input and output entries. - /// A configured pipeline that produces a model input table from raw data sources. public static Flow Create(Catalog catalog) { return FlowBuilder.CreateFlow(pipeline => { - pipeline.AddStep( - label: "PreprocessCompanies", - description: "Cleans and preprocesses raw company data.", - transform: PreprocessCompaniesStep.Create(), - input: catalog.Companies, - output: catalog.PreprocessedCompanies - ); - - pipeline.AddStep( - label: "PreprocessShuttles", - description: "Cleans and preprocesses raw shuttle data.", - transform: PreprocessShuttlesStep.Create(), - input: catalog.Shuttles, - output: catalog.PreprocessedShuttles - ); - pipeline.AddStep( label: "CreateModelInputTable", description: """ - Joins preprocessed shuttle and company data with review scores to create a - unified model input table. + Joins typed shuttle, company, and review data queried from the GQL server + into a unified model input table. GqlDatabaseSeeded is consumed as an explicit + DAG gate ensuring Ingest has completed before this step executes. """, transform: CreateModelInputTableStep.Create(), - input: (catalog.PreprocessedShuttles, catalog.PreprocessedCompanies, catalog.Reviews), + input: (catalog.GqlDatabaseSeeded, catalog.Shuttles, catalog.Companies, catalog.Reviews), output: catalog.ModelInputTable ); }); diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs index 948cd4a5..0ccfa103 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs @@ -1,27 +1,23 @@ using Flowthru.Core.Steps; -using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; using KedroSpaceflightsGQL.Data._03_Primary.Schemas; using KedroSpaceflightsGQL.Infra.GqlClient; namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps; /// -/// Joins preprocessed shuttle and company data with review scores to create a unified model input table. +/// Joins typed shuttle, company, and review data from the GQL server into a unified model +/// input table. All fields are already strongly-typed — no parsing required. +/// The bool first input is the GqlDatabaseSeeded gate; it is consumed only to +/// express the DAG dependency on Ingest and is otherwise unused. /// [FlowthruStep] public static class CreateModelInputTableStep { - /// - /// Creates a join function that combines shuttle, company, and review data into a single table for modeling. - /// - /// - /// A function that performs inner joins to produce records. - /// Records are filtered to include only reviews with valid numeric scores. - /// public static Func< ( - IEnumerable, - IEnumerable, + bool, + IEnumerable, + IEnumerable, IEnumerable ), IEnumerable @@ -29,25 +25,15 @@ public static Func< { return (input) => { - var (shuttles, companies, reviews) = input; - - // Parse reviews to have decimal scores - var parsedReviews = reviews - .Select(r => new - { - r.ShuttleId, - Score = decimal.TryParse(r.ReviewScoresRating, out var score) ? score : (decimal?)null, - }) - .Where(r => r.Score.HasValue) - .ToList(); + var (_, shuttles, companies, reviews) = input; // Join reviews to shuttles - var ratedShuttles = parsedReviews + var ratedShuttles = reviews .Join( shuttles, r => r.ShuttleId, s => s.Id, - (r, s) => new { Shuttle = s, ReviewScore = r.Score!.Value } + (r, s) => new { Shuttle = s, ReviewScore = r.ReviewScoresRating } ) .ToList(); @@ -74,7 +60,7 @@ public static Func< ReviewScoresRating = rs.ReviewScore, } ) - .ToList(); // Materialize query to ensure LINQ execution completes + .ToList(); return modelInputTable; }; diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs deleted file mode 100644 index b721bf52..00000000 --- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs +++ /dev/null @@ -1,89 +0,0 @@ -using Flowthru.Core.Steps; -using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; -using KedroSpaceflightsGQL.Infra.GqlClient; - -namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps; - -/// -/// Preprocesses raw company data by parsing rating percentages and IATA approval flags. -/// -[FlowthruStep] -public static class PreprocessCompaniesStep -{ - /// - /// Creates a preprocessing function that transforms raw company records into strongly-typed records. - /// - /// - /// A function that converts records to records. - /// Records with invalid rating percentages are filtered out. - /// - public static Func< - IEnumerable, - IEnumerable - > Create() - { - return (input) => - { - var processed = input - .Select(raw => Parse(raw)) - .Where(item => item != null) - .Cast(); - - return processed; - }; - } - - /// - /// Parses a raw company record into a preprocessed record with strongly-typed fields. - /// - /// The raw company record to parse. - /// - /// A if parsing succeeds; otherwise, null. - /// - private static PreprocessedCompanySchema? Parse(IGetCompanies_Companies raw) - { - // Parse "t" or "f" to boolean - bool iataApproved = raw.IataApproved.Trim().ToLowerInvariant() == "t"; - - // Parse percentage string (e.g., "90%" -> 0.90) - if (!TryParsePercentage(raw.CompanyRating, out var rating)) - { - return null; - } - - return new PreprocessedCompanySchema - { - Id = raw.Id, - CompanyRating = rating, - IataApproved = iataApproved, - CompanyLocation = raw.CompanyLocation, - }; - } - - /// - /// Parses a percentage string (e.g., "90%") to a decimal ratio (e.g., 0.90). - /// - /// The percentage string to parse. Expected format: digits followed by optional "%". - /// - /// When this method returns, contains the decimal ratio (0.0 to 1.0) if parsing succeeded, - /// or zero if parsing failed. - /// - /// true if parsing succeeded; otherwise, false. - private static bool TryParsePercentage(string value, out decimal result) - { - result = 0; - if (string.IsNullOrWhiteSpace(value)) - { - return false; - } - - var cleaned = value.Replace("%", "").Trim(); - if (!decimal.TryParse(cleaned, out var parsed)) - { - return false; - } - - result = parsed / 100m; - return true; - } -} diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs deleted file mode 100644 index 099bf471..00000000 --- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs +++ /dev/null @@ -1,105 +0,0 @@ -using Flowthru.Core.Steps; -using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; -using KedroSpaceflightsGQL.Infra.GqlClient; - -namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps; - -/// -/// Preprocesses raw shuttle data by parsing numeric fields, boolean flags, and currency values. -/// -[FlowthruStep] -public static class PreprocessShuttlesStep -{ - /// - /// Creates a preprocessing function that transforms raw shuttle records into strongly-typed records. - /// - /// - /// A function that converts records to records. - /// Records with invalid numeric fields or currency values are filtered out. - /// - public static Func< - IEnumerable, - IEnumerable - > Create() - { - return (input) => - { - var processed = input - .Select(raw => Parse(raw)) - .Where(item => item != null) - .Cast(); - - return processed; - }; - } - - /// - /// Parses a raw shuttle record into a preprocessed record with strongly-typed fields. - /// - /// The raw shuttle record to parse. - /// - /// A if all fields parse successfully; otherwise, null. - /// - private static PreprocessedShuttleSchema? Parse(IGetShuttles_Shuttles raw) - { - // Parse boolean fields - bool dCheckComplete = raw.DCheckComplete.Trim().ToLowerInvariant() == "t"; - bool moonClearanceComplete = raw.MoonClearanceComplete.Trim().ToLowerInvariant() == "t"; - - // Parse numeric fields - if (!int.TryParse(raw.Engines, out var engines)) - { - return null; - } - - if (!int.TryParse(raw.PassengerCapacity, out var passengerCapacity)) - { - return null; - } - - if (!int.TryParse(raw.Crew, out var crew)) - { - return null; - } - - // Parse money string (e.g., "$1,234.56" -> 1234.56) - if (!TryParseMoney(raw.Price, out var price)) - { - return null; - } - - return new PreprocessedShuttleSchema - { - Id = raw.Id, - ShuttleType = raw.ShuttleType, - CompanyId = raw.CompanyId, - Engines = engines, - PassengerCapacity = passengerCapacity, - Crew = crew, - Price = price, - DCheckComplete = dCheckComplete, - MoonClearanceComplete = moonClearanceComplete, - }; - } - - /// - /// Parses a currency string (e.g., "$1,234.56") to a decimal value (e.g., 1234.56). - /// - /// The currency string to parse. Expected format: optional "$", digits with optional commas, optional decimal point. - /// - /// When this method returns, contains the decimal value if parsing succeeded, - /// or zero if parsing failed. - /// - /// true if parsing succeeded; otherwise, false. - private static bool TryParseMoney(string value, out decimal result) - { - result = 0; - if (string.IsNullOrWhiteSpace(value)) - { - return false; - } - - var cleaned = value.Replace("$", "").Replace(",", "").Trim(); - return decimal.TryParse(cleaned, out result); - } -} diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs index 2da3c41b..dd688ffa 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs @@ -6,35 +6,57 @@ namespace KedroSpaceflightsGQL.Flows.Ingest; /// -/// Ingest pipeline: reads raw CSV/Excel files and seeds the GQL server via mutations. -/// This flow must run before DataProcessing, which queries the GQL server. +/// Ingest pipeline: preprocesses raw CSV/Excel files and seeds the GQL server via mutations. +/// Preprocessing runs first so the GQL server stores typed values (int, decimal, bool) +/// rather than raw strings. DataProcessing reads back through the GQL API and gets +/// first-class C# types straight from the generated StrawberryShake interfaces. /// public static class IngestFlow { /// /// Creates the ingest pipeline. /// - /// Data catalog providing seed (CSV/Excel) and ack catalog entries. - /// - /// StrawberryShake GQL client. The same instance used by the DataProcessing catalog entries. - /// - /// - /// Both and are resolved from DI - /// by Flowthru's delegate-inspection registration — no factory lambda required. - /// public static Flow Create(Catalog catalog, ISpaceflightsClient client) { return FlowBuilder.CreateFlow(pipeline => { + pipeline.AddStep( + label: "PreprocessCompanies", + description: "Parses raw company CSV data (rating percentages, IATA flags) into typed records.", + transform: PreprocessCompaniesStep.Create(), + input: catalog.SeedCompanies, + output: catalog.PreprocessedCompanies + ); + + pipeline.AddStep( + label: "PreprocessShuttles", + description: "Parses raw shuttle Excel data (numeric fields, currency, boolean flags) into typed records.", + transform: PreprocessShuttlesStep.Create(), + input: catalog.SeedShuttles, + output: catalog.PreprocessedShuttles + ); + + pipeline.AddStep( + label: "PreprocessReviews", + description: "Parses raw review CSV data (rating strings) into typed decimal records.", + transform: PreprocessReviewsStep.Create(), + input: catalog.SeedReviews, + output: catalog.PreprocessedReviews + ); + pipeline.AddStep( label: "SeedGqlDatabase", description: """ - Reads raw companies (CSV), shuttles (Excel), and reviews (CSV) then seeds the GraphQL - server via addCompany / addShuttle / addReview mutations. Replace the in-process - HotChocolate server with your own GQL endpoint in Program.cs to point at production data. + Seeds the GraphQL server with preprocessed typed data via addCompany / addShuttle / + addReview mutations. Replace the in-process HotChocolate server with your own GQL + endpoint in Program.cs to point at production data. """, transform: SeedGqlDatabaseStep.Create(client), - input: (catalog.SeedCompanies, catalog.SeedShuttles, catalog.SeedReviews), + input: ( + catalog.PreprocessedCompanies, + catalog.PreprocessedShuttles, + catalog.PreprocessedReviews + ), output: catalog.GqlDatabaseSeeded ); }); diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs new file mode 100644 index 00000000..f55eabf6 --- /dev/null +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs @@ -0,0 +1,57 @@ +using Flowthru.Core.Steps; +using KedroSpaceflightsGQL.Data._01_Raw.Schemas; +using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; + +namespace KedroSpaceflightsGQL.Flows.Ingest.Steps; + +/// +/// Preprocesses raw company data by parsing rating percentages and IATA approval flags. +/// Runs during Ingest so the GQL server stores typed values rather than raw strings. +/// +[FlowthruStep] +public static class PreprocessCompaniesStep +{ + /// + /// Creates a preprocessing function that transforms raw company records into strongly-typed records. + /// Records with invalid rating percentages are filtered out. + /// + public static Func< + IEnumerable, + IEnumerable + > Create() => + input => + input + .Select(raw => Parse(raw)) + .Where(item => item != null) + .Cast(); + + private static PreprocessedCompanySchema? Parse(CompanySchema raw) + { + bool iataApproved = raw.IataApproved.Trim().ToLowerInvariant() == "t"; + + if (!TryParsePercentage(raw.CompanyRating, out var rating)) + return null; + + return new PreprocessedCompanySchema + { + Id = raw.Id, + CompanyRating = rating, + IataApproved = iataApproved, + CompanyLocation = raw.CompanyLocation, + }; + } + + private static bool TryParsePercentage(string value, out decimal result) + { + result = 0; + if (string.IsNullOrWhiteSpace(value)) + return false; + + var cleaned = value.Replace("%", "").Trim(); + if (!decimal.TryParse(cleaned, out var parsed)) + return false; + + result = parsed / 100m; + return true; + } +} diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs new file mode 100644 index 00000000..1ad0b204 --- /dev/null +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs @@ -0,0 +1,34 @@ +using Flowthru.Core.Steps; +using KedroSpaceflightsGQL.Data._01_Raw.Schemas; +using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; + +namespace KedroSpaceflightsGQL.Flows.Ingest.Steps; + +/// +/// Preprocesses raw review data by parsing the rating string to a decimal score. +/// Records with unparseable scores are dropped. +/// +[FlowthruStep] +public static class PreprocessReviewsStep +{ + /// + /// Creates a preprocessing function that transforms raw review records into strongly-typed records. + /// + public static Func< + IEnumerable, + IEnumerable + > Create() => + input => + input + .Select(raw => Parse(raw)) + .Where(item => item != null) + .Cast(); + + private static PreprocessedReviewSchema? Parse(ReviewSchema raw) + { + if (!decimal.TryParse(raw.ReviewScoresRating, out var score)) + return null; + + return new PreprocessedReviewSchema { ShuttleId = raw.ShuttleId, ReviewScoresRating = score }; + } +} diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs new file mode 100644 index 00000000..4d9cdbcc --- /dev/null +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs @@ -0,0 +1,65 @@ +using Flowthru.Core.Steps; +using KedroSpaceflightsGQL.Data._01_Raw.Schemas; +using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; + +namespace KedroSpaceflightsGQL.Flows.Ingest.Steps; + +/// +/// Preprocesses raw shuttle data by parsing numeric fields, boolean flags, and currency values. +/// Runs during Ingest so the GQL server stores typed values rather than raw strings. +/// Records with unparseable fields are dropped. +/// +[FlowthruStep] +public static class PreprocessShuttlesStep +{ + /// + /// Creates a preprocessing function that transforms raw shuttle records into strongly-typed records. + /// + public static Func< + IEnumerable, + IEnumerable + > Create() => + input => + input + .Select(raw => Parse(raw)) + .Where(item => item != null) + .Cast(); + + private static PreprocessedShuttleSchema? Parse(ShuttleSchema raw) + { + bool dCheckComplete = raw.DCheckComplete?.Trim().ToLowerInvariant() == "t"; + bool moonClearanceComplete = raw.MoonClearanceComplete?.Trim().ToLowerInvariant() == "t"; + + if (!int.TryParse(raw.Engines, out var engines)) + return null; + if (!int.TryParse(raw.PassengerCapacity, out var passengerCapacity)) + return null; + if (!int.TryParse(raw.Crew, out var crew)) + return null; + if (!TryParseMoney(raw.Price, out var price)) + return null; + + return new PreprocessedShuttleSchema + { + Id = raw.Id, + ShuttleType = raw.ShuttleType, + CompanyId = raw.CompanyId, + Engines = engines, + PassengerCapacity = passengerCapacity, + Crew = crew, + Price = price, + DCheckComplete = dCheckComplete, + MoonClearanceComplete = moonClearanceComplete, + }; + } + + private static bool TryParseMoney(string? value, out decimal result) + { + result = 0; + if (string.IsNullOrWhiteSpace(value)) + return false; + + var cleaned = value.Replace("$", "").Replace(",", "").Trim(); + return decimal.TryParse(cleaned, out result); + } +} diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs index 87ae90b8..ceb1a6a5 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs @@ -1,42 +1,35 @@ using Flowthru.Core.Steps; -using KedroSpaceflightsGQL.Data._01_Raw.Schemas; +using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas; using KedroSpaceflightsGQL.Infra.GqlClient; using StrawberryShake; namespace KedroSpaceflightsGQL.Flows.Ingest.Steps; /// -/// Seeds the GQL server with company, shuttle, and review data via mutations. -/// Reads from the three raw CSV/Excel catalog entries and calls one mutation per record. +/// Seeds the GQL server with preprocessed, typed company, shuttle, and review data via mutations. +/// Receives already-parsed records — no string conversion needed. /// [FlowthruStep] public static class SeedGqlDatabaseStep { /// - /// Creates the seeding transform. Receives all three raw collections and calls + /// Creates the seeding transform. Receives preprocessed typed collections and calls /// the corresponding add-mutations; returns true when seeding is complete. /// public static Func< - (IEnumerable, IEnumerable, IEnumerable), + ( + IEnumerable, + IEnumerable, + IEnumerable + ), Task > Create(ISpaceflightsClient client) => async (inputs) => { var (companies, shuttles, reviews) = inputs; - // Seed all three collections — run sequentially to avoid overwhelming - // an in-process server; switch to parallel execution for a production endpoint. foreach (var c in companies) { - // Skip records with missing required fields (sparse rows from source files) - if ( - c.Id is null - || c.CompanyRating is null - || c.IataApproved is null - || c.CompanyLocation is null - ) - continue; - var result = await client.AddCompany.ExecuteAsync( new AddCompanyInput { @@ -51,20 +44,6 @@ c.Id is null foreach (var s in shuttles) { - // Skip records with missing required fields (sparse rows from source Excel files) - if ( - s.Id is null - || s.ShuttleType is null - || s.CompanyId is null - || s.Engines is null - || s.PassengerCapacity is null - || s.Crew is null - || s.Price is null - || s.DCheckComplete is null - || s.MoonClearanceComplete is null - ) - continue; - var result = await client.AddShuttle.ExecuteAsync( new AddShuttleInput { @@ -84,10 +63,6 @@ s.Id is null foreach (var r in reviews) { - // Skip records with missing required fields (sparse rows from source files) - if (r.ShuttleId is null || r.ReviewScoresRating is null) - continue; - var result = await client.AddReview.ExecuteAsync( new AddReviewInput { ShuttleId = r.ShuttleId, ReviewScoresRating = r.ReviewScoresRating } ); diff --git a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql index 61009e90..09333f3c 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql +++ b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql @@ -1,7 +1,9 @@ +scalar Decimal + type CompanyRecord { id: String! - companyRating: String! - iataApproved: String! + companyRating: Decimal! + iataApproved: Boolean! companyLocation: String! } @@ -9,23 +11,23 @@ type ShuttleRecord { id: String! shuttleType: String! companyId: String! - engines: String! - passengerCapacity: String! - crew: String! - price: String! - dCheckComplete: String! - moonClearanceComplete: String! + engines: Int! + passengerCapacity: Int! + crew: Int! + price: Decimal! + dCheckComplete: Boolean! + moonClearanceComplete: Boolean! } type ReviewRecord { shuttleId: String! - reviewScoresRating: String! + reviewScoresRating: Decimal! } input AddCompanyInput { id: String! - companyRating: String! - iataApproved: String! + companyRating: Decimal! + iataApproved: Boolean! companyLocation: String! } @@ -33,17 +35,17 @@ input AddShuttleInput { id: String! shuttleType: String! companyId: String! - engines: String! - passengerCapacity: String! - crew: String! - price: String! - dCheckComplete: String! - moonClearanceComplete: String! + engines: Int! + passengerCapacity: Int! + crew: Int! + price: Decimal! + dCheckComplete: Boolean! + moonClearanceComplete: Boolean! } input AddReviewInput { shuttleId: String! - reviewScoresRating: String! + reviewScoresRating: Decimal! } type Query { diff --git a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs index b83cc3ab..04720359 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs @@ -5,8 +5,8 @@ namespace KedroSpaceflightsGQL.Infra.GqlServer.Types; /// Raw string fields matching the CSV source schema. public record CompanyRecord( string Id, - string CompanyRating, - string IataApproved, + decimal CompanyRating, + bool IataApproved, string CompanyLocation ); @@ -15,24 +15,24 @@ public record ShuttleRecord( string Id, string ShuttleType, string CompanyId, - string Engines, - string PassengerCapacity, - string Crew, - string Price, - string DCheckComplete, - string MoonClearanceComplete + int Engines, + int PassengerCapacity, + int Crew, + decimal Price, + bool DCheckComplete, + bool MoonClearanceComplete ); /// Raw string fields matching the CSV source schema. -public record ReviewRecord(string ShuttleId, string ReviewScoresRating); +public record ReviewRecord(string ShuttleId, decimal ReviewScoresRating); // ── GQL input types ──────────────────────────────────────────────────────── /// Input for the addCompany mutation. public record AddCompanyInput( string Id, - string CompanyRating, - string IataApproved, + decimal CompanyRating, + bool IataApproved, string CompanyLocation ); @@ -41,13 +41,13 @@ public record AddShuttleInput( string Id, string ShuttleType, string CompanyId, - string Engines, - string PassengerCapacity, - string Crew, - string Price, - string DCheckComplete, - string MoonClearanceComplete + int Engines, + int PassengerCapacity, + int Crew, + decimal Price, + bool DCheckComplete, + bool MoonClearanceComplete ); /// Input for the addReview mutation. -public record AddReviewInput(string ShuttleId, string ReviewScoresRating); +public record AddReviewInput(string ShuttleId, decimal ReviewScoresRating); From 16453d5cad90271b2b7b686ec9663ae9966441d9 Mon Sep 17 00:00:00 2001 From: Spencer Elkington Date: Mon, 13 Apr 2026 14:33:11 -0600 Subject: [PATCH 8/8] docs: comment issue on GQL --- .../KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs index b6cdb4c3..155c287f 100644 --- a/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs +++ b/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs @@ -46,10 +46,10 @@ public partial class Catalog ) ); - // /// - // /// In-memory flag written by the Ingest flow after all mutations succeed. - // /// Downstream flows depend on this via the DAG. - // /// + /// + /// In-memory flag written by the Ingest flow after all mutations succeed. + /// Downstream flows depend on this via the DAG. + /// public IItem GqlDatabaseSeeded => CreateItem(() => ItemFactory.Single.Memory("GqlDatabaseSeeded"));