diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml
new file mode 100644
index 00000000..898d2091
--- /dev/null
+++ b/.github/workflows/pr-tests.yml
@@ -0,0 +1,75 @@
+name: PR Tests
+
+on:
+ pull_request:
+ types: [opened, synchronize, reopened, ready_for_review]
+ paths-ignore:
+ - "**.md"
+ - "docs/**"
+ - ".github/instructions/**"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ test:
+ runs-on: ubuntu-latest
+ if: github.event.pull_request.draft == false
+
+ permissions:
+ contents: read
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v4
+ with:
+ dotnet-version: "10.0.x"
+ dotnet-quality: "preview"
+ env:
+ DOTNET_SKIP_FIRST_TIME_EXPERIENCE: "true"
+ DOTNET_NOLOGO: "true"
+ DOTNET_CLI_TELEMETRY_OPTOUT: "true"
+
+ - name: Setup Node.js
+ uses: actions/setup-node@v4
+ with:
+ node-version: "22"
+
+ - name: Setup Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.10"
+
+ - name: Setup UV
+ uses: astral-sh/setup-uv@v5
+
+ - name: Setup pnpm
+ uses: pnpm/action-setup@v4
+ with:
+ version: 10.6.3
+
+ - name: Cache NuGet packages
+ uses: actions/cache@v4
+ with:
+ path: ~/.nuget/packages
+ key: ${{ runner.os }}-nuget-${{ hashFiles('**/packages.lock.json', '**/*.csproj') }}
+ restore-keys: |
+ ${{ runner.os }}-nuget-
+
+ - name: Install Node dependencies
+ run: pnpm install
+
+ - name: Run affected tests
+ env:
+ CI: "true"
+ run: |
+ pnpm nx affected -t test \
+ --base=${{ github.event.pull_request.base.sha }} --head=HEAD \
+ --output-style=stream \
+ --logger "console;verbosity=minimal"
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 541958b0..df062e5a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -104,9 +104,9 @@ This doesn't mean *ignoring* these concerns — it just means extending the API
The project uses NX for task orchestration. Common commands:
```bash
-nx run flowthru:build # Build the solution
-nx run flowthru:test # Run all tests with coverage
-nx run flowthru:format:csharp # Format code with CSharpier
+dotnet build # Confirm solution builds fully
+nx affected -t test # Run all test projects affected by current changes
+dotnet test # Run all tests for the project.
```
To run a subset of tests by category:
diff --git a/examples/advanced/KedroSpaceflights.Custom/Program.cs b/examples/advanced/KedroSpaceflights.Custom/Program.cs
index e38c756b..8f15e866 100644
--- a/examples/advanced/KedroSpaceflights.Custom/Program.cs
+++ b/examples/advanced/KedroSpaceflights.Custom/Program.cs
@@ -94,6 +94,8 @@ private static void ConfigureServices(IServiceCollection services, string basePa
flowthru
.RegisterFlow(label: "Reporting", flow: ReportingFlow.Create)
.WithDescription("Generates reports and visualizations");
+
+ flowthru.ConfigureExecution(opts => opts.MaxDegreeOfParallelism = 8);
});
services.AddLogging(logging =>
diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs
index 37de0f02..155c287f 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs
@@ -46,10 +46,10 @@ public partial class Catalog
)
);
- // ///
- // /// In-memory flag written by the Ingest flow after all mutations succeed.
- // /// Downstream flows depend on this via the DAG.
- // ///
+ ///
+ /// In-memory flag written by the Ingest flow after all mutations succeed.
+ /// Downstream flows depend on this via the DAG.
+ ///
public IItem GqlDatabaseSeeded =>
CreateItem(() => ItemFactory.Single.Memory("GqlDatabaseSeeded"));
@@ -62,7 +62,7 @@ public partial class Catalog
CreateItem(
() =>
GqlItemFactory.Enumerable.Query(
- label: "Companies",
+ label: "GQLCompanies",
queryFunc: ct => _client.GetCompanies.ExecuteAsync(ct),
selectData: r => r.Companies,
allowEmptyData: true
@@ -76,7 +76,7 @@ public partial class Catalog
CreateItem(
() =>
GqlItemFactory.Enumerable.Query(
- label: "Shuttles",
+ label: "GQLShuttles",
queryFunc: ct => _client.GetShuttles.ExecuteAsync(ct),
selectData: r => r.Shuttles,
allowEmptyData: true
@@ -90,7 +90,7 @@ public partial class Catalog
CreateItem(
() =>
GqlItemFactory.Enumerable.Query(
- label: "Reviews",
+ label: "GQLReviews",
queryFunc: ct => _client.GetReviews.ExecuteAsync(ct),
selectData: r => r.Reviews,
allowEmptyData: true
diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs
index 148feff3..e67059e1 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs
@@ -28,4 +28,16 @@ public partial class Catalog
filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_shuttles.parquet"
)
);
+
+ ///
+ /// Preprocessed review data with parsed decimal rating scores.
+ ///
+ public IItem> PreprocessedReviews =>
+ CreateItem(
+ () =>
+ ItemFactory.Enumerable.Parquet(
+ label: "PreprocessedReviews",
+ filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_reviews.parquet"
+ )
+ );
}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs
new file mode 100644
index 00000000..fd237711
--- /dev/null
+++ b/examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Schemas/PreprocessedReviewSchema.cs
@@ -0,0 +1,23 @@
+using Flowthru.Core.Abstractions;
+
+namespace KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
+
+///
+/// Represents a preprocessed review with a parsed numeric rating.
+/// Produced by filtering out records with unparseable rating strings.
+///
+[FlowthruSchema]
+public partial record PreprocessedReviewSchema
+{
+ ///
+ /// Identifier of the shuttle being reviewed.
+ ///
+ [SerializedLabel("shuttle_id")]
+ public required string ShuttleId { get; init; }
+
+ ///
+ /// Review rating score as a decimal value.
+ ///
+ [SerializedLabel("review_scores_rating")]
+ public required decimal ReviewScoresRating { get; init; }
+}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs
index 602194c2..0f737e05 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs
@@ -1,49 +1,28 @@
using Flowthru.Core.Flows;
using KedroSpaceflightsGQL.Data;
using KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
-using KedroSpaceflightsGQL.Infra.GqlClient;
namespace KedroSpaceflightsGQL.Flows.DataProcessing;
///
-/// Creates the data processing pipeline that preprocesses raw company and shuttle data
-/// and joins it with reviews to create a model input table.
+/// Data processing pipeline. Reads typed data from the GQL server and joins it into a
+/// model input table. Depends on Ingest completing first via the GqlDatabaseSeeded gate.
///
public static class DataProcessingFlow
{
- ///
- /// Creates the data processing pipeline.
- ///
- /// The data catalog containing input and output entries.
- /// A configured pipeline that produces a model input table from raw data sources.
public static Flow Create(Catalog catalog)
{
return FlowBuilder.CreateFlow(pipeline =>
{
- pipeline.AddStep(
- label: "PreprocessCompanies",
- description: "Cleans and preprocesses raw company data.",
- transform: PreprocessCompaniesStep.Create(),
- input: catalog.Companies,
- output: catalog.PreprocessedCompanies
- );
-
- pipeline.AddStep(
- label: "PreprocessShuttles",
- description: "Cleans and preprocesses raw shuttle data.",
- transform: PreprocessShuttlesStep.Create(),
- input: catalog.Shuttles,
- output: catalog.PreprocessedShuttles
- );
-
pipeline.AddStep(
label: "CreateModelInputTable",
description: """
- Joins preprocessed shuttle and company data with review scores to create a
- unified model input table.
+ Joins typed shuttle, company, and review data queried from the GQL server
+ into a unified model input table. GqlDatabaseSeeded is consumed as an explicit
+ DAG gate ensuring Ingest has completed before this step executes.
""",
transform: CreateModelInputTableStep.Create(),
- input: (catalog.PreprocessedShuttles, catalog.PreprocessedCompanies, catalog.Reviews),
+ input: (catalog.GqlDatabaseSeeded, catalog.Shuttles, catalog.Companies, catalog.Reviews),
output: catalog.ModelInputTable
);
});
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs
index 948cd4a5..0ccfa103 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs
@@ -1,27 +1,23 @@
using Flowthru.Core.Steps;
-using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
using KedroSpaceflightsGQL.Data._03_Primary.Schemas;
using KedroSpaceflightsGQL.Infra.GqlClient;
namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
///
-/// Joins preprocessed shuttle and company data with review scores to create a unified model input table.
+/// Joins typed shuttle, company, and review data from the GQL server into a unified model
+/// input table. All fields are already strongly-typed — no parsing required.
+/// The bool first input is the GqlDatabaseSeeded gate; it is consumed only to
+/// express the DAG dependency on Ingest and is otherwise unused.
///
[FlowthruStep]
public static class CreateModelInputTableStep
{
- ///
- /// Creates a join function that combines shuttle, company, and review data into a single table for modeling.
- ///
- ///
- /// A function that performs inner joins to produce records.
- /// Records are filtered to include only reviews with valid numeric scores.
- ///
public static Func<
(
- IEnumerable,
- IEnumerable,
+ bool,
+ IEnumerable,
+ IEnumerable,
IEnumerable
),
IEnumerable
@@ -29,25 +25,15 @@ public static Func<
{
return (input) =>
{
- var (shuttles, companies, reviews) = input;
-
- // Parse reviews to have decimal scores
- var parsedReviews = reviews
- .Select(r => new
- {
- r.ShuttleId,
- Score = decimal.TryParse(r.ReviewScoresRating, out var score) ? score : (decimal?)null,
- })
- .Where(r => r.Score.HasValue)
- .ToList();
+ var (_, shuttles, companies, reviews) = input;
// Join reviews to shuttles
- var ratedShuttles = parsedReviews
+ var ratedShuttles = reviews
.Join(
shuttles,
r => r.ShuttleId,
s => s.Id,
- (r, s) => new { Shuttle = s, ReviewScore = r.Score!.Value }
+ (r, s) => new { Shuttle = s, ReviewScore = r.ReviewScoresRating }
)
.ToList();
@@ -74,7 +60,7 @@ public static Func<
ReviewScoresRating = rs.ReviewScore,
}
)
- .ToList(); // Materialize query to ensure LINQ execution completes
+ .ToList();
return modelInputTable;
};
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs
deleted file mode 100644
index b721bf52..00000000
--- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-using Flowthru.Core.Steps;
-using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
-using KedroSpaceflightsGQL.Infra.GqlClient;
-
-namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
-
-///
-/// Preprocesses raw company data by parsing rating percentages and IATA approval flags.
-///
-[FlowthruStep]
-public static class PreprocessCompaniesStep
-{
- ///
- /// Creates a preprocessing function that transforms raw company records into strongly-typed records.
- ///
- ///
- /// A function that converts records to records.
- /// Records with invalid rating percentages are filtered out.
- ///
- public static Func<
- IEnumerable,
- IEnumerable
- > Create()
- {
- return (input) =>
- {
- var processed = input
- .Select(raw => Parse(raw))
- .Where(item => item != null)
- .Cast();
-
- return processed;
- };
- }
-
- ///
- /// Parses a raw company record into a preprocessed record with strongly-typed fields.
- ///
- /// The raw company record to parse.
- ///
- /// A if parsing succeeds; otherwise, null.
- ///
- private static PreprocessedCompanySchema? Parse(IGetCompanies_Companies raw)
- {
- // Parse "t" or "f" to boolean
- bool iataApproved = raw.IataApproved.Trim().ToLowerInvariant() == "t";
-
- // Parse percentage string (e.g., "90%" -> 0.90)
- if (!TryParsePercentage(raw.CompanyRating, out var rating))
- {
- return null;
- }
-
- return new PreprocessedCompanySchema
- {
- Id = raw.Id,
- CompanyRating = rating,
- IataApproved = iataApproved,
- CompanyLocation = raw.CompanyLocation,
- };
- }
-
- ///
- /// Parses a percentage string (e.g., "90%") to a decimal ratio (e.g., 0.90).
- ///
- /// The percentage string to parse. Expected format: digits followed by optional "%".
- ///
- /// When this method returns, contains the decimal ratio (0.0 to 1.0) if parsing succeeded,
- /// or zero if parsing failed.
- ///
- /// true if parsing succeeded; otherwise, false.
- private static bool TryParsePercentage(string value, out decimal result)
- {
- result = 0;
- if (string.IsNullOrWhiteSpace(value))
- {
- return false;
- }
-
- var cleaned = value.Replace("%", "").Trim();
- if (!decimal.TryParse(cleaned, out var parsed))
- {
- return false;
- }
-
- result = parsed / 100m;
- return true;
- }
-}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs
deleted file mode 100644
index 099bf471..00000000
--- a/examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessShuttlesStep.cs
+++ /dev/null
@@ -1,105 +0,0 @@
-using Flowthru.Core.Steps;
-using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
-using KedroSpaceflightsGQL.Infra.GqlClient;
-
-namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
-
-///
-/// Preprocesses raw shuttle data by parsing numeric fields, boolean flags, and currency values.
-///
-[FlowthruStep]
-public static class PreprocessShuttlesStep
-{
- ///
- /// Creates a preprocessing function that transforms raw shuttle records into strongly-typed records.
- ///
- ///
- /// A function that converts records to records.
- /// Records with invalid numeric fields or currency values are filtered out.
- ///
- public static Func<
- IEnumerable,
- IEnumerable
- > Create()
- {
- return (input) =>
- {
- var processed = input
- .Select(raw => Parse(raw))
- .Where(item => item != null)
- .Cast();
-
- return processed;
- };
- }
-
- ///
- /// Parses a raw shuttle record into a preprocessed record with strongly-typed fields.
- ///
- /// The raw shuttle record to parse.
- ///
- /// A if all fields parse successfully; otherwise, null.
- ///
- private static PreprocessedShuttleSchema? Parse(IGetShuttles_Shuttles raw)
- {
- // Parse boolean fields
- bool dCheckComplete = raw.DCheckComplete.Trim().ToLowerInvariant() == "t";
- bool moonClearanceComplete = raw.MoonClearanceComplete.Trim().ToLowerInvariant() == "t";
-
- // Parse numeric fields
- if (!int.TryParse(raw.Engines, out var engines))
- {
- return null;
- }
-
- if (!int.TryParse(raw.PassengerCapacity, out var passengerCapacity))
- {
- return null;
- }
-
- if (!int.TryParse(raw.Crew, out var crew))
- {
- return null;
- }
-
- // Parse money string (e.g., "$1,234.56" -> 1234.56)
- if (!TryParseMoney(raw.Price, out var price))
- {
- return null;
- }
-
- return new PreprocessedShuttleSchema
- {
- Id = raw.Id,
- ShuttleType = raw.ShuttleType,
- CompanyId = raw.CompanyId,
- Engines = engines,
- PassengerCapacity = passengerCapacity,
- Crew = crew,
- Price = price,
- DCheckComplete = dCheckComplete,
- MoonClearanceComplete = moonClearanceComplete,
- };
- }
-
- ///
- /// Parses a currency string (e.g., "$1,234.56") to a decimal value (e.g., 1234.56).
- ///
- /// The currency string to parse. Expected format: optional "$", digits with optional commas, optional decimal point.
- ///
- /// When this method returns, contains the decimal value if parsing succeeded,
- /// or zero if parsing failed.
- ///
- /// true if parsing succeeded; otherwise, false.
- private static bool TryParseMoney(string value, out decimal result)
- {
- result = 0;
- if (string.IsNullOrWhiteSpace(value))
- {
- return false;
- }
-
- var cleaned = value.Replace("$", "").Replace(",", "").Trim();
- return decimal.TryParse(cleaned, out result);
- }
-}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs
index 2da3c41b..dd688ffa 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/IngestFlow.cs
@@ -6,35 +6,57 @@
namespace KedroSpaceflightsGQL.Flows.Ingest;
///
-/// Ingest pipeline: reads raw CSV/Excel files and seeds the GQL server via mutations.
-/// This flow must run before DataProcessing, which queries the GQL server.
+/// Ingest pipeline: preprocesses raw CSV/Excel files and seeds the GQL server via mutations.
+/// Preprocessing runs first so the GQL server stores typed values (int, decimal, bool)
+/// rather than raw strings. DataProcessing reads back through the GQL API and gets
+/// first-class C# types straight from the generated StrawberryShake interfaces.
///
public static class IngestFlow
{
///
/// Creates the ingest pipeline.
///
- /// Data catalog providing seed (CSV/Excel) and ack catalog entries.
- ///
- /// StrawberryShake GQL client. The same instance used by the DataProcessing catalog entries.
- ///
- ///
- /// Both and are resolved from DI
- /// by Flowthru's delegate-inspection registration — no factory lambda required.
- ///
public static Flow Create(Catalog catalog, ISpaceflightsClient client)
{
return FlowBuilder.CreateFlow(pipeline =>
{
+ pipeline.AddStep(
+ label: "PreprocessCompanies",
+ description: "Parses raw company CSV data (rating percentages, IATA flags) into typed records.",
+ transform: PreprocessCompaniesStep.Create(),
+ input: catalog.SeedCompanies,
+ output: catalog.PreprocessedCompanies
+ );
+
+ pipeline.AddStep(
+ label: "PreprocessShuttles",
+ description: "Parses raw shuttle Excel data (numeric fields, currency, boolean flags) into typed records.",
+ transform: PreprocessShuttlesStep.Create(),
+ input: catalog.SeedShuttles,
+ output: catalog.PreprocessedShuttles
+ );
+
+ pipeline.AddStep(
+ label: "PreprocessReviews",
+ description: "Parses raw review CSV data (rating strings) into typed decimal records.",
+ transform: PreprocessReviewsStep.Create(),
+ input: catalog.SeedReviews,
+ output: catalog.PreprocessedReviews
+ );
+
pipeline.AddStep(
label: "SeedGqlDatabase",
description: """
- Reads raw companies (CSV), shuttles (Excel), and reviews (CSV) then seeds the GraphQL
- server via addCompany / addShuttle / addReview mutations. Replace the in-process
- HotChocolate server with your own GQL endpoint in Program.cs to point at production data.
+ Seeds the GraphQL server with preprocessed typed data via addCompany / addShuttle /
+ addReview mutations. Replace the in-process HotChocolate server with your own GQL
+ endpoint in Program.cs to point at production data.
""",
transform: SeedGqlDatabaseStep.Create(client),
- input: (catalog.SeedCompanies, catalog.SeedShuttles, catalog.SeedReviews),
+ input: (
+ catalog.PreprocessedCompanies,
+ catalog.PreprocessedShuttles,
+ catalog.PreprocessedReviews
+ ),
output: catalog.GqlDatabaseSeeded
);
});
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs
new file mode 100644
index 00000000..f55eabf6
--- /dev/null
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessCompaniesStep.cs
@@ -0,0 +1,57 @@
+using Flowthru.Core.Steps;
+using KedroSpaceflightsGQL.Data._01_Raw.Schemas;
+using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
+
+namespace KedroSpaceflightsGQL.Flows.Ingest.Steps;
+
+///
+/// Preprocesses raw company data by parsing rating percentages and IATA approval flags.
+/// Runs during Ingest so the GQL server stores typed values rather than raw strings.
+///
+[FlowthruStep]
+public static class PreprocessCompaniesStep
+{
+ ///
+ /// Creates a preprocessing function that transforms raw company records into strongly-typed records.
+ /// Records with invalid rating percentages are filtered out.
+ ///
+ public static Func<
+ IEnumerable,
+ IEnumerable
+ > Create() =>
+ input =>
+ input
+ .Select(raw => Parse(raw))
+ .Where(item => item != null)
+ .Cast();
+
+ private static PreprocessedCompanySchema? Parse(CompanySchema raw)
+ {
+ bool iataApproved = raw.IataApproved?.Trim().ToLowerInvariant() == "t";
+
+ if (raw.Id is null || raw.CompanyLocation is null || !TryParsePercentage(raw.CompanyRating, out var rating))
+ return null;
+
+ return new PreprocessedCompanySchema
+ {
+ Id = raw.Id,
+ CompanyRating = rating,
+ IataApproved = iataApproved,
+ CompanyLocation = raw.CompanyLocation,
+ };
+ }
+
+ private static bool TryParsePercentage(string value, out decimal result)
+ {
+ result = 0;
+ if (string.IsNullOrWhiteSpace(value))
+ return false;
+
+ var cleaned = value.Replace("%", "").Trim();
+ if (!decimal.TryParse(cleaned, out var parsed))
+ return false;
+
+ result = parsed / 100m;
+ return true;
+ }
+}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs
new file mode 100644
index 00000000..1ad0b204
--- /dev/null
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessReviewsStep.cs
@@ -0,0 +1,34 @@
+using Flowthru.Core.Steps;
+using KedroSpaceflightsGQL.Data._01_Raw.Schemas;
+using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
+
+namespace KedroSpaceflightsGQL.Flows.Ingest.Steps;
+
+///
+/// Preprocesses raw review data by parsing the rating string to a decimal score.
+/// Records with unparseable scores are dropped.
+///
+[FlowthruStep]
+public static class PreprocessReviewsStep
+{
+ ///
+ /// Creates a preprocessing function that transforms raw review records into strongly-typed records.
+ ///
+ public static Func<
+ IEnumerable,
+ IEnumerable
+ > Create() =>
+ input =>
+ input
+ .Select(raw => Parse(raw))
+ .Where(item => item != null)
+ .Cast();
+
+ private static PreprocessedReviewSchema? Parse(ReviewSchema raw)
+ {
+ if (raw.ShuttleId is null || !decimal.TryParse(raw.ReviewScoresRating, out var score))
+ return null;
+
+ return new PreprocessedReviewSchema { ShuttleId = raw.ShuttleId, ReviewScoresRating = score };
+ }
+}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs
new file mode 100644
index 00000000..4d9cdbcc
--- /dev/null
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/PreprocessShuttlesStep.cs
@@ -0,0 +1,65 @@
+using Flowthru.Core.Steps;
+using KedroSpaceflightsGQL.Data._01_Raw.Schemas;
+using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
+
+namespace KedroSpaceflightsGQL.Flows.Ingest.Steps;
+
+///
+/// Preprocesses raw shuttle data by parsing numeric fields, boolean flags, and currency values.
+/// Runs during Ingest so the GQL server stores typed values rather than raw strings.
+/// Records with unparseable fields are dropped.
+///
+[FlowthruStep]
+public static class PreprocessShuttlesStep
+{
+ ///
+ /// Creates a preprocessing function that transforms raw shuttle records into strongly-typed records.
+ ///
+ public static Func<
+ IEnumerable,
+ IEnumerable
+ > Create() =>
+ input =>
+ input
+ .Select(raw => Parse(raw))
+ .Where(item => item != null)
+ .Cast();
+
+ private static PreprocessedShuttleSchema? Parse(ShuttleSchema raw)
+ {
+ bool dCheckComplete = raw.DCheckComplete?.Trim().ToLowerInvariant() == "t";
+ bool moonClearanceComplete = raw.MoonClearanceComplete?.Trim().ToLowerInvariant() == "t";
+
+ if (raw.Id is null || raw.ShuttleType is null || raw.CompanyId is null || !int.TryParse(raw.Engines, out var engines))
+ return null;
+ if (!int.TryParse(raw.PassengerCapacity, out var passengerCapacity))
+ return null;
+ if (!int.TryParse(raw.Crew, out var crew))
+ return null;
+ if (!TryParseMoney(raw.Price, out var price))
+ return null;
+
+ return new PreprocessedShuttleSchema
+ {
+ Id = raw.Id,
+ ShuttleType = raw.ShuttleType,
+ CompanyId = raw.CompanyId,
+ Engines = engines,
+ PassengerCapacity = passengerCapacity,
+ Crew = crew,
+ Price = price,
+ DCheckComplete = dCheckComplete,
+ MoonClearanceComplete = moonClearanceComplete,
+ };
+ }
+
+ private static bool TryParseMoney(string? value, out decimal result)
+ {
+ result = 0;
+ if (string.IsNullOrWhiteSpace(value))
+ return false;
+
+ var cleaned = value.Replace("$", "").Replace(",", "").Trim();
+ return decimal.TryParse(cleaned, out result);
+ }
+}
diff --git a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs
index 87ae90b8..ceb1a6a5 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Flows/Ingest/Steps/SeedGqlDatabaseStep.cs
@@ -1,42 +1,35 @@
using Flowthru.Core.Steps;
-using KedroSpaceflightsGQL.Data._01_Raw.Schemas;
+using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
using KedroSpaceflightsGQL.Infra.GqlClient;
using StrawberryShake;
namespace KedroSpaceflightsGQL.Flows.Ingest.Steps;
///
-/// Seeds the GQL server with company, shuttle, and review data via mutations.
-/// Reads from the three raw CSV/Excel catalog entries and calls one mutation per record.
+/// Seeds the GQL server with preprocessed, typed company, shuttle, and review data via mutations.
+/// Receives already-parsed records — no string conversion needed.
///
[FlowthruStep]
public static class SeedGqlDatabaseStep
{
///
- /// Creates the seeding transform. Receives all three raw collections and calls
+ /// Creates the seeding transform. Receives preprocessed typed collections and calls
/// the corresponding add-mutations; returns true when seeding is complete.
///
public static Func<
- (IEnumerable, IEnumerable, IEnumerable),
+ (
+ IEnumerable,
+ IEnumerable,
+ IEnumerable
+ ),
Task
> Create(ISpaceflightsClient client) =>
async (inputs) =>
{
var (companies, shuttles, reviews) = inputs;
- // Seed all three collections — run sequentially to avoid overwhelming
- // an in-process server; switch to parallel execution for a production endpoint.
foreach (var c in companies)
{
- // Skip records with missing required fields (sparse rows from source files)
- if (
- c.Id is null
- || c.CompanyRating is null
- || c.IataApproved is null
- || c.CompanyLocation is null
- )
- continue;
-
var result = await client.AddCompany.ExecuteAsync(
new AddCompanyInput
{
@@ -51,20 +44,6 @@ c.Id is null
foreach (var s in shuttles)
{
- // Skip records with missing required fields (sparse rows from source Excel files)
- if (
- s.Id is null
- || s.ShuttleType is null
- || s.CompanyId is null
- || s.Engines is null
- || s.PassengerCapacity is null
- || s.Crew is null
- || s.Price is null
- || s.DCheckComplete is null
- || s.MoonClearanceComplete is null
- )
- continue;
-
var result = await client.AddShuttle.ExecuteAsync(
new AddShuttleInput
{
@@ -84,10 +63,6 @@ s.Id is null
foreach (var r in reviews)
{
- // Skip records with missing required fields (sparse rows from source files)
- if (r.ShuttleId is null || r.ReviewScoresRating is null)
- continue;
-
var result = await client.AddReview.ExecuteAsync(
new AddReviewInput { ShuttleId = r.ShuttleId, ReviewScoresRating = r.ReviewScoresRating }
);
diff --git a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql
index 61009e90..09333f3c 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql
+++ b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql
@@ -1,7 +1,9 @@
+scalar Decimal
+
type CompanyRecord {
id: String!
- companyRating: String!
- iataApproved: String!
+ companyRating: Decimal!
+ iataApproved: Boolean!
companyLocation: String!
}
@@ -9,23 +11,23 @@ type ShuttleRecord {
id: String!
shuttleType: String!
companyId: String!
- engines: String!
- passengerCapacity: String!
- crew: String!
- price: String!
- dCheckComplete: String!
- moonClearanceComplete: String!
+ engines: Int!
+ passengerCapacity: Int!
+ crew: Int!
+ price: Decimal!
+ dCheckComplete: Boolean!
+ moonClearanceComplete: Boolean!
}
type ReviewRecord {
shuttleId: String!
- reviewScoresRating: String!
+ reviewScoresRating: Decimal!
}
input AddCompanyInput {
id: String!
- companyRating: String!
- iataApproved: String!
+ companyRating: Decimal!
+ iataApproved: Boolean!
companyLocation: String!
}
@@ -33,17 +35,17 @@ input AddShuttleInput {
id: String!
shuttleType: String!
companyId: String!
- engines: String!
- passengerCapacity: String!
- crew: String!
- price: String!
- dCheckComplete: String!
- moonClearanceComplete: String!
+ engines: Int!
+ passengerCapacity: Int!
+ crew: Int!
+ price: Decimal!
+ dCheckComplete: Boolean!
+ moonClearanceComplete: Boolean!
}
input AddReviewInput {
shuttleId: String!
- reviewScoresRating: String!
+ reviewScoresRating: Decimal!
}
type Query {
diff --git a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs
index b83cc3ab..04720359 100644
--- a/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs
+++ b/examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/Types.cs
@@ -5,8 +5,8 @@ namespace KedroSpaceflightsGQL.Infra.GqlServer.Types;
/// Raw string fields matching the CSV source schema.
public record CompanyRecord(
string Id,
- string CompanyRating,
- string IataApproved,
+ decimal CompanyRating,
+ bool IataApproved,
string CompanyLocation
);
@@ -15,24 +15,24 @@ public record ShuttleRecord(
string Id,
string ShuttleType,
string CompanyId,
- string Engines,
- string PassengerCapacity,
- string Crew,
- string Price,
- string DCheckComplete,
- string MoonClearanceComplete
+ int Engines,
+ int PassengerCapacity,
+ int Crew,
+ decimal Price,
+ bool DCheckComplete,
+ bool MoonClearanceComplete
);
/// Raw string fields matching the CSV source schema.
-public record ReviewRecord(string ShuttleId, string ReviewScoresRating);
+public record ReviewRecord(string ShuttleId, decimal ReviewScoresRating);
// ── GQL input types ────────────────────────────────────────────────────────
/// Input for the addCompany mutation.
public record AddCompanyInput(
string Id,
- string CompanyRating,
- string IataApproved,
+ decimal CompanyRating,
+ bool IataApproved,
string CompanyLocation
);
@@ -41,13 +41,13 @@ public record AddShuttleInput(
string Id,
string ShuttleType,
string CompanyId,
- string Engines,
- string PassengerCapacity,
- string Crew,
- string Price,
- string DCheckComplete,
- string MoonClearanceComplete
+ int Engines,
+ int PassengerCapacity,
+ int Crew,
+ decimal Price,
+ bool DCheckComplete,
+ bool MoonClearanceComplete
);
/// Input for the addReview mutation.
-public record AddReviewInput(string ShuttleId, string ReviewScoresRating);
+public record AddReviewInput(string ShuttleId, decimal ReviewScoresRating);
diff --git a/examples/advanced/RetailDataSplitFlow/Program.cs b/examples/advanced/RetailDataSplitFlow/Program.cs
index 614a12ac..17860704 100644
--- a/examples/advanced/RetailDataSplitFlow/Program.cs
+++ b/examples/advanced/RetailDataSplitFlow/Program.cs
@@ -1,9 +1,9 @@
using Flowthru.Core.Cli;
-using Flowthru.Extensions.Python;
-using Flowthru.Extensions.Python.Services;
using Flowthru.Core.Meta;
using Flowthru.Core.Meta.Providers;
using Flowthru.Core.Services;
+using Flowthru.Extensions.Python;
+using Flowthru.Extensions.Python.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -111,6 +111,8 @@ string outputPath
mermaid.WithOutputDirectory(metadataPath)
);
});
+
+ flowthru.ConfigureExecution(opts => opts.MaxDegreeOfParallelism = 8);
});
services.AddLogging(logging =>
diff --git a/src/core/Flowthru.Core/Cli/ArgumentParser.cs b/src/core/Flowthru.Core/Cli/ArgumentParser.cs
index 68fb0d4e..58690c94 100644
--- a/src/core/Flowthru.Core/Cli/ArgumentParser.cs
+++ b/src/core/Flowthru.Core/Cli/ArgumentParser.cs
@@ -1,6 +1,6 @@
using Flowthru.Core.Flows;
-using Flowthru.Core.Services.Models;
using Flowthru.Core.Graph;
+using Flowthru.Core.Services.Models;
namespace Flowthru.Core.Cli;
@@ -108,6 +108,31 @@ public static ParsedArguments Parse(string[] args, IEnumerable available
}
break;
+ case "--parallelism":
+ if (i + 1 < args.Length)
+ {
+ var value = args[++i];
+ if (value.Equals("auto", StringComparison.OrdinalIgnoreCase))
+ {
+ options.MaxDegreeOfParallelism = Environment.ProcessorCount;
+ }
+ else if (int.TryParse(value, out var n) && n >= 1)
+ {
+ options.MaxDegreeOfParallelism = n;
+ }
+ else
+ {
+ throw new ArgumentException(
+ $"--parallelism requires a positive integer or 'auto', got: '{value}'"
+ );
+ }
+ }
+ else
+ {
+ throw new ArgumentException("--parallelism requires a value");
+ }
+ break;
+
case "--help":
case "-h":
return new ParsedArguments { ShowHelp = true };
diff --git a/src/core/Flowthru.Core/Cli/FlowthruCli.cs b/src/core/Flowthru.Core/Cli/FlowthruCli.cs
index ebf5cfd3..a22cb5ca 100644
--- a/src/core/Flowthru.Core/Cli/FlowthruCli.cs
+++ b/src/core/Flowthru.Core/Cli/FlowthruCli.cs
@@ -169,6 +169,8 @@ private void ShowHelp()
_output.WriteLine();
_output.WriteLine("Options:");
_output.WriteLine(" --dry-run Validate without executing steps");
+ _output.WriteLine(" --parallelism N|auto Max concurrent steps (default: 1 sequential).");
+ _output.WriteLine(" 'auto' uses Environment.ProcessorCount.");
_output.WriteLine(" --no-metadata Disable metadata export");
_output.WriteLine(" --metadata-output DIR Specify metadata output directory");
_output.WriteLine(" -h, --help Show this help message");
diff --git a/src/core/Flowthru.Core/Flows/ExecutionOptions.cs b/src/core/Flowthru.Core/Flows/ExecutionOptions.cs
index ee999ed4..627e549d 100644
--- a/src/core/Flowthru.Core/Flows/ExecutionOptions.cs
+++ b/src/core/Flowthru.Core/Flows/ExecutionOptions.cs
@@ -1,5 +1,5 @@
-using Flowthru.Core.Results;
using Flowthru.Core.Graph;
+using Flowthru.Core.Results;
namespace Flowthru.Core.Flows;
@@ -34,13 +34,23 @@ public class ExecutionOptions
public bool StopOnFirstError { get; set; } = true;
///
- /// Whether to enable parallel execution of nodes within the same layer.
+ /// Maximum number of steps that may execute concurrently.
///
///
- /// Phase 2 feature - currently not implemented.
- /// When true, nodes in the same execution layer run concurrently.
+ ///
+ /// Controls the degree of parallelism in the task-graph scheduler. Steps whose
+ /// dependencies are all satisfied are dispatched immediately, up to this limit.
+ ///
+ ///
+ ///
+ /// - null (default) — not specified at this layer; defers to the service-level
+ /// default set via flowthru.ConfigureExecution(), or 1 if that is also unset.
+ /// - 1 — sequential execution; one step at a time in topological order.
+ /// - N > 1 — up to N independent steps run concurrently.
+ ///
+ ///
///
- public bool EnableParallelExecution { get; set; } = false;
+ public int? MaxDegreeOfParallelism { get; set; } = null;
///
/// The result formatter to use for displaying execution results.
diff --git a/src/core/Flowthru.Core/Flows/Flow.cs b/src/core/Flowthru.Core/Flows/Flow.cs
index 29dc490f..b8302922 100644
--- a/src/core/Flowthru.Core/Flows/Flow.cs
+++ b/src/core/Flowthru.Core/Flows/Flow.cs
@@ -553,20 +553,40 @@ public DagMetadata ExportDag()
}
///
- /// /// Builds and executes the flow, returning comprehensive execution results.
+ /// Builds and executes the flow, returning comprehensive execution results.
///
/// Cancellation token to signal graceful shutdown
- /// FlowResult containing execution status, timing, and Flow results
+ /// FlowResult containing execution status, timing, and step results
///
- ///
/// This is the primary high-level API for executing flows. It automatically
- /// calls Build() if the Flow hasn't been built yet, then executes and tracks results.
+ /// calls Build() if the Flow hasn't been built yet, then executes via the
+ /// task-graph scheduler with default options (sequential, stop on first error).
+ ///
+ public Task<FlowResult> RunAsync(CancellationToken cancellationToken) =>
+ RunAsync(new ExecutionOptions(), cancellationToken);
+
+ ///
+ /// Builds and executes the flow with the supplied execution options.
+ ///
+ /// Controls parallelism, error policy, and other execution behaviour.
+ /// Cancellation token to signal graceful shutdown
+ /// FlowResult containing execution status, timing, and step results
+ ///
+ ///
+ /// Steps are dispatched by the task-graph scheduler as soon as all their dependencies
+ /// complete, up to MaxDegreeOfParallelism concurrent steps.
+ ///
+ ///
+ /// With MaxDegreeOfParallelism = 1 (default) execution is sequential and
+ /// behaviourally equivalent to the previous layer-by-layer loop.
///
///
- public async Task RunAsync(CancellationToken cancellationToken)
+ public async Task<FlowResult> RunAsync(
+ ExecutionOptions options,
+ CancellationToken cancellationToken
+ )
{
var stopwatch = Stopwatch.StartNew();
- var stepResults = new Dictionary();
try
{
@@ -577,33 +597,37 @@ public async Task RunAsync(CancellationToken cancellationToken)
Build();
}
- Logger?.LogInformation("Starting flow execution via RunAsync()");
+ var stepList = (IReadOnlyList<FlowStep>)(_slicedSteps ?? _steps);
- // Execute all layers
- foreach (var layer in ExecutionLayers!)
- {
- Logger?.LogInformation("Executing layer with {StepCount} steps", layer.Count);
+ // Resolve MaxDegreeOfParallelism: null means "not specified", default to 1 (sequential).
+ var parallelism = options.MaxDegreeOfParallelism ?? 1;
- foreach (var flowStep in layer)
- {
- // Check for cancellation before starting each step
- cancellationToken.ThrowIfCancellationRequested();
+ Logger?.LogInformation(
+ "Starting flow execution via RunAsync() ({StepCount} steps, parallelism={Parallelism})",
+ stepList.Count,
+ parallelism
+ );
- var stepResult = await ExecuteStepWithTrackingAsync(flowStep, cancellationToken);
- stepResults[flowStep.Label] = stepResult;
+ var executor = new Graph.TaskGraphExecutor(
+ stepList,
+ parallelism,
+ ExecuteStepWithTrackingAsync,
+ Logger
+ );
- // If step failed, stop execution
- if (!stepResult.Success)
- {
- stopwatch.Stop();
- return FlowResult.CreateFailure(
- stopwatch.Elapsed,
- stepResult.Exception!,
- stepResults,
- Name
- );
- }
- }
+ var stepResults = await executor.RunAsync(options.StopOnFirstError, cancellationToken);
+
+ // Surface the first failure when StopOnFirstError is true.
+ var firstFailure = stepResults.Values.FirstOrDefault(r => !r.Success);
+ if (firstFailure != null)
+ {
+ stopwatch.Stop();
+ return FlowResult.CreateFailure(
+ stopwatch.Elapsed,
+ firstFailure.Exception!,
+ stepResults,
+ Name
+ );
}
stopwatch.Stop();
@@ -616,8 +640,7 @@ public async Task RunAsync(CancellationToken cancellationToken)
}
catch (OperationCanceledException)
{
- // Re-throw cancellation exceptions so they propagate to the caller
- // Cancellation is not a failure but a requested abort
+ // Re-throw — cancellation is not a failure but a requested abort.
stopwatch.Stop();
throw;
}
@@ -625,31 +648,23 @@ public async Task RunAsync(CancellationToken cancellationToken)
{
stopwatch.Stop();
Logger?.LogError(ex, "Flow execution failed: {ErrorMessage}", ex.Message);
- return FlowResult.CreateFailure(stopwatch.Elapsed, ex, stepResults, Name);
+ return FlowResult.CreateFailure(
+ stopwatch.Elapsed,
+ ex,
+ new Dictionary<string, StepResult>(),
+ Name
+ );
}
}
///
- /// Executes the Flow sequentially, layer by layer.
+ /// Executes the flow in topological order, throwing on the first step failure.
///
/// Cancellation token to signal graceful shutdown
- /// Task representing the Flow execution
- /// Thrown if Flow has not been built
+ /// Task representing the flow execution
+ /// Thrown if the flow has not been built
///
- ///
- /// This method executes Flow in topological order:
- /// 1. Execute all Flow in layer 0 sequentially
- /// 2. Execute all Flow in layer 1 sequentially
- /// 3. Continue until all layers are complete
- ///
- ///
- /// Note: This method throws exceptions on failure. For result-based
- /// execution with error handling, use RunAsync() instead.
- ///
- ///
- /// In Phase 2, this will be replaced with a parallel executor that can run
- /// steps within the same layer concurrently.
- ///
+ /// For structured result-based execution (including parallel), use RunAsync.
///
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
@@ -664,22 +679,17 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
try
{
- foreach (var layer in ExecutionLayers!)
- {
- Logger?.LogInformation("Executing layer with {StepCount} steps", layer.Count);
+ var result = await RunAsync(cancellationToken);
- foreach (var flowStep in layer)
- {
- // Check for cancellation before starting each step
- cancellationToken.ThrowIfCancellationRequested();
-
- await ExecuteStepAsync(flowStep, cancellationToken);
- }
+ if (!result.Success)
+ {
+ throw result.Exception
+ ?? new InvalidOperationException("Flow execution failed with no exception details.");
}
Logger?.LogInformation("Flow execution completed successfully");
}
- catch (Exception ex)
+ catch (Exception ex) when (ex is not OperationCanceledException)
{
Logger?.LogError(ex, "Flow execution failed: {ErrorMessage}", ex.Message);
throw;
diff --git a/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs
new file mode 100644
index 00000000..132848e2
--- /dev/null
+++ b/src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs
@@ -0,0 +1,332 @@
+using System.Collections.Concurrent;
+using Flowthru.Core.Flows;
+using Microsoft.Extensions.Logging;
+
+namespace Flowthru.Core.Graph;
+
+///
+/// Executes a built flow DAG using a task-graph scheduler.
+///
+///
+///
+/// Steps are dispatched as soon as all their dependencies complete, up to a configurable
+/// maximum degree of parallelism. With MaxDegreeOfParallelism = 1 the scheduler
+/// produces a valid topological order and is behaviourally equivalent to the previous
+/// sequential layer-by-layer loop, though the specific ordering within a layer may differ
+/// (depth-first rather than breadth-first).
+///
+///
+/// Correctness guarantees provided by the DAG:
+///
+/// - Single Producer Rule — no two steps write to the same catalog entry, so
+/// concurrent steps in the same "layer" cannot have write conflicts.
+/// - Dependency edges — a step is never dispatched before all its producers have
+/// written their outputs.
+///
+///
+///
+internal sealed class TaskGraphExecutor
+{
+ private readonly IReadOnlyList<FlowStep> _steps;
+ private readonly int _maxDegreeOfParallelism;
+ private readonly ILogger? _logger;
+ private readonly Func<FlowStep, CancellationToken, Task<StepResult>> _executeStep;
+
+ ///
+ /// The flat list of steps to execute (after slicing). Dependencies must have been
+ /// resolved by before calling
+ /// .
+ ///
+ ///
+ /// Maximum concurrent steps. Pass 1 for sequential execution.
+ /// Pass -1 for unbounded parallelism.
+ ///
+ /// Per-step execution delegate (matches ExecuteStepWithTrackingAsync).
+ /// Optional logger.
+ internal TaskGraphExecutor(
+ IReadOnlyList<FlowStep> steps,
+ int maxDegreeOfParallelism,
+ Func<FlowStep, CancellationToken, Task<StepResult>> executeStep,
+ ILogger? logger = null
+ )
+ {
+ if (maxDegreeOfParallelism < 1 && maxDegreeOfParallelism != -1)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(maxDegreeOfParallelism),
+ "Must be -1 (unbounded) or a positive integer."
+ );
+ }
+
+ _steps = steps;
+ _maxDegreeOfParallelism = maxDegreeOfParallelism == -1 ? int.MaxValue : maxDegreeOfParallelism;
+ _executeStep = executeStep;
+ _logger = logger;
+ }
+
+ ///
+ /// Runs all steps in dependency order, returning per-step results.
+ ///
+ ///
+ /// When true, cancels in-flight steps and stops dispatch on the first failure.
+ /// When false, only the failed step and its transitive dependents are skipped;
+ /// independent branches continue executing.
+ ///
+ /// External cancellation token.
+ ///
+ /// Dictionary of step label → StepResult. Includes results for every step
+ /// that was dispatched (not steps skipped due to a failed dependency).
+ ///
+ internal async Task<Dictionary<string, StepResult>> RunAsync(
+ bool stopOnFirstError,
+ CancellationToken cancellationToken
+ )
+ {
+ // Fast-path: surface external cancellation before any work begins.
+ // ThrowIfCancellationRequested throws OperationCanceledException (base type), which
+ // is what callers expect — not the TaskCanceledException that semaphore.WaitAsync
+ // would throw if we let it reach the dispatch loop with an already-cancelled token.
+ cancellationToken.ThrowIfCancellationRequested();
+
+ // --- Build scheduling state ---------------------------------------------------
+
+ // How many unfinished dependencies each step is still waiting on.
+ var pendingDeps = new ConcurrentDictionary<FlowStep, int>(
+ _steps.Select(s => new KeyValuePair<FlowStep, int>(s, s.Dependencies.Count))
+ );
+
+ // Reverse adjacency: for each step, which steps depend on it?
+ var dependents = _steps.ToDictionary(s => s, _ => new List<FlowStep>());
+ foreach (var step in _steps)
+ {
+ foreach (var dep in step.Dependencies)
+ {
+ // dep must be in dependents because BuildDependencyGraph only adds
+ // steps that are in _steps (sliced or full set).
+ if (dependents.TryGetValue(dep, out var list))
+ {
+ list.Add(step);
+ }
+ }
+ }
+
+ // Steps whose upstream is fully satisfied (or has no dependencies).
+ // Channel is unbounded write / bounded dispatch (controlled by the semaphore).
+ var readyQueue = new ConcurrentQueue<FlowStep>(_steps.Where(s => s.Dependencies.Count == 0));
+
+ // Tracks which steps were skipped because an upstream dependency failed.
+ var skipped = new HashSet<FlowStep>();
+
+ var results = new ConcurrentDictionary<string, StepResult>();
+ var semaphore = new SemaphoreSlim(_maxDegreeOfParallelism, _maxDegreeOfParallelism);
+ var dispatchedCount = 0; // incremented atomically before each step starts
+
+ // Linked source: either the external token or our own stop-on-error cancellation.
+ using var internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ var dispatchToken = internalCts.Token;
+
+ // --- Dispatch loop ------------------------------------------------------------
+
+ // We spin tasks until every step has either produced a result or been skipped.
+ var inFlight = new List<Task>();
+ var totalSteps = _steps.Count;
+
+ // Pre-populate worker slot IDs (1..N). The semaphore guarantees at most N concurrent
+ // tasks hold the semaphore — so a slot is always available on a successful WaitAsync.
+ var slotCount = Math.Max(1, Math.Min(_maxDegreeOfParallelism, totalSteps));
+ var workerSlots = new ConcurrentQueue<int>(Enumerable.Range(1, slotCount));
+
+ while (results.Count + skipped.Count < totalSteps)
+ {
+ // Drain all currently runnable steps into in-flight tasks.
+ while (readyQueue.TryDequeue(out var step))
+ {
+ if (skipped.Contains(step))
+ {
+ // Already marked skipped (a dependency failed after this was enqueued).
+ continue;
+ }
+
+ await semaphore.WaitAsync(dispatchToken).ConfigureAwait(false);
+
+ // Slot dequeue is safe here: semaphore ensures at most slotCount concurrent holders.
+ workerSlots.TryDequeue(out var workerId);
+ var capturedStep = step;
+ var task = Task.Run(
+ async () =>
+ {
+ try
+ {
+ var startOrdinal = Interlocked.Increment(ref dispatchedCount);
+ _logger?.LogInformation(
+ " → {StepName} executing... ({StartOrdinal} of {Total} steps, worker {WorkerId}/{TotalWorkers})",
+ capturedStep.Label,
+ startOrdinal,
+ totalSteps,
+ workerId,
+ slotCount
+ );
+
+ var result = await _executeStep(capturedStep, dispatchToken).ConfigureAwait(false);
+ results[capturedStep.Label] = result;
+ var completedCount = results.Count;
+
+ if (!result.Success)
+ {
+ _logger?.LogWarning(
+ " ✗ {StepName} failed ({CompletedCount} of {Total} steps, worker {WorkerId}/{TotalWorkers})",
+ capturedStep.Label,
+ completedCount,
+ totalSteps,
+ workerId,
+ slotCount
+ );
+
+ if (stopOnFirstError)
+ {
+ // Cancel everything else.
+ internalCts.Cancel();
+ }
+ else
+ {
+ // Only skip transitive dependents of this step.
+ SkipDownstream(capturedStep, skipped, dependents);
+ }
+ }
+ else
+ {
+ _logger?.LogInformation(
+ " ✓ {StepName,-40} {Duration,6:F2}s ({InputCount,6} → {OutputCount,6} records) ({CompletedCount} of {Total} steps, worker {WorkerId}/{TotalWorkers})",
+ capturedStep.Label,
+ result.ExecutionTime.TotalSeconds,
+ result.InputCount,
+ result.OutputCount,
+ completedCount,
+ totalSteps,
+ workerId,
+ slotCount
+ );
+
+ // Notify dependents — decrement their pending count.
+ EnqueueReadyDependents(capturedStep, pendingDeps, dependents, skipped, readyQueue);
+ }
+ }
+ finally
+ {
+ workerSlots.Enqueue(workerId);
+ semaphore.Release();
+ }
+ },
+ dispatchToken
+ );
+
+ inFlight.Add(task);
+ }
+
+ if (inFlight.Count == 0)
+ {
+ // Nothing dispatched and nothing running. If not all steps accounted for,
+ // the dependency graph has a cycle that AssignLayers should have caught.
+ if (results.Count + skipped.Count < totalSteps)
+ {
+ var unaccounted = _steps
+ .Where(s => !results.ContainsKey(s.Label) && !skipped.Contains(s))
+ .Select(s => s.Label);
+ throw new InvalidOperationException(
+ $"Task graph stalled — possible undetected cycle. Unresolved steps: "
+ + string.Join(", ", unaccounted)
+ );
+ }
+ break;
+ }
+
+ // Wait for at least one in-flight task to finish, then re-check the ready queue.
+ var completed = await Task.WhenAny(inFlight).ConfigureAwait(false);
+
+ // Propagate any unhandled exceptions from the task wrapper.
+ // (Step failures are encoded in StepResult, not thrown — but defensive.)
+ try
+ {
+ await completed.ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected when stopOnFirstError cancels the token.
+ }
+
+ inFlight.Remove(completed);
+
+ // External cancellation — break immediately; ThrowIfCancellationRequested below
+ // will surface the OperationCanceledException to the caller.
+ if (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+
+ // Internal stop-on-first-error cancellation — drain remaining in-flight tasks
+ // (they may still be running; let them finish before we return) then break.
+ if (dispatchToken.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.WhenAll(inFlight).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) { }
+
+ break;
+ }
+ }
+
+ // Propagate external cancellation to the caller.
+ cancellationToken.ThrowIfCancellationRequested();
+
+ return new Dictionary<string, StepResult>(results);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Helpers
+ // ---------------------------------------------------------------------------
+
+ private static void EnqueueReadyDependents(
+ FlowStep completedStep,
+ ConcurrentDictionary<FlowStep, int> pendingDeps,
+ Dictionary<FlowStep, List<FlowStep>> dependents,
+ HashSet<FlowStep> skipped,
+ ConcurrentQueue<FlowStep> readyQueue
+ )
+ {
+ foreach (var dependent in dependents[completedStep])
+ {
+ if (skipped.Contains(dependent))
+ {
+ continue;
+ }
+
+ var remaining = pendingDeps.AddOrUpdate(dependent, 0, (_, current) => current - 1);
+ if (remaining == 0)
+ {
+ readyQueue.Enqueue(dependent);
+ }
+ }
+ }
+
+ private static void SkipDownstream(
+ FlowStep failedStep,
+ HashSet<FlowStep> skipped,
+ Dictionary<FlowStep, List<FlowStep>> dependents
+ )
+ {
+ // BFS over the dependents graph.
+ var queue = new Queue<FlowStep>(dependents[failedStep]);
+ while (queue.TryDequeue(out var step))
+ {
+ if (skipped.Add(step))
+ {
+ foreach (var downstream in dependents[step])
+ {
+ queue.Enqueue(downstream);
+ }
+ }
+ }
+ }
+}
diff --git a/src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs b/src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs
new file mode 100644
index 00000000..7c460857
--- /dev/null
+++ b/src/core/Flowthru.Core/Services/FlowthruExecutionDefaults.cs
@@ -0,0 +1,15 @@
+namespace Flowthru.Core.Services;
+
+///
+/// Carries service-level execution defaults registered by FlowthruServiceBuilder.ConfigureExecution.
+/// Injected into FlowthruService and applied when a per-call ExecutionOptions
+/// does not specify a value.
+///
+internal sealed class FlowthruExecutionDefaults
+{
+ ///
+ /// Service-level default for ExecutionOptions.MaxDegreeOfParallelism.
+ /// null means "not configured at this layer; fall back to 1".
+ ///
+ public int? MaxDegreeOfParallelism { get; init; }
+}
diff --git a/src/core/Flowthru.Core/Services/FlowthruService.cs b/src/core/Flowthru.Core/Services/FlowthruService.cs
index 7d832b52..ff784ed0 100644
--- a/src/core/Flowthru.Core/Services/FlowthruService.cs
+++ b/src/core/Flowthru.Core/Services/FlowthruService.cs
@@ -27,6 +27,7 @@ internal sealed class FlowthruService : IFlowthruService
private readonly IServiceProvider _services;
private readonly ILogger _logger;
private readonly FlowthruMetadataBuilder? _metadataBuilder;
+ private readonly FlowthruExecutionDefaults _executionDefaults;
///
/// Initializes a new instance of FlowthruService.
@@ -36,6 +37,7 @@ public FlowthruService(
Dictionary pipelines,
IServiceProvider services,
ILogger logger,
+ FlowthruExecutionDefaults executionDefaults,
FlowthruMetadataBuilder? metadataBuilder = null
)
{
@@ -43,6 +45,7 @@ public FlowthruService(
_pipelines = pipelines ?? throw new ArgumentNullException(nameof(pipelines));
_services = services ?? throw new ArgumentNullException(nameof(services));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _executionDefaults = executionDefaults ?? new FlowthruExecutionDefaults();
_metadataBuilder = metadataBuilder;
// Inject services into all registered catalogs
@@ -98,6 +101,10 @@ public async Task ExecuteFlowAsync(
options ??= new ExecutionOptions();
+ // Resolve MaxDegreeOfParallelism: CLI/caller value wins; service default is fallback; 1 is the floor.
+ options.MaxDegreeOfParallelism =
+ options.MaxDegreeOfParallelism ?? _executionDefaults.MaxDegreeOfParallelism ?? 1;
+
// ════════════════════════════════════════
// PRE-FLIGHT CHECKS
// ════════════════════════════════════════
@@ -136,8 +143,17 @@ public async Task ExecuteFlowAsync(
validationResult.ThrowIfInvalid();
}
- var layer0Steps = mergedPipeline.ExecutionLayers![0];
- validatedInputCount = layer0Steps.SelectMany(node => node.Inputs).Distinct().Count();
+ var allMergedSteps = mergedPipeline.Steps.ToList();
+ var mergedProducedLabels = new HashSet<string>(
+ allMergedSteps.SelectMany(s => s.Outputs.Select(o => o.Label)),
+ StringComparer.OrdinalIgnoreCase
+ );
+ validatedInputCount = allMergedSteps
+ .SelectMany(s => s.Inputs)
+ .Where(i => !mergedProducedLabels.Contains(i.Label))
+ .Select(i => i.Label)
+ .Distinct(StringComparer.OrdinalIgnoreCase)
+ .Count();
_logger.LogInformation(" ✓ {Count} external data sources validated", validatedInputCount);
}
@@ -202,7 +218,7 @@ public async Task ExecuteFlowAsync(
_logger.LogInformation("");
// Execute merged pipeline
- var result = await mergedPipeline.RunAsync(cancellationToken);
+ var result = await mergedPipeline.RunAsync(options, cancellationToken);
// Format results
var formatter = options.GetFormatter();
@@ -222,12 +238,17 @@ public FlowMetadata GetFlowMetadata(string pipelineName)
);
}
- var externalInputs =
- pipeline
- .ExecutionLayers?[0].SelectMany(node => node.Inputs)
- .Select(e => e.Label)
- .Distinct()
- .ToList() ?? new List();
+ var allSteps = pipeline.Steps.ToList();
+ var producedLabels = new HashSet<string>(
+ allSteps.SelectMany(s => s.Outputs.Select(o => o.Label)),
+ StringComparer.OrdinalIgnoreCase
+ );
+ var externalInputs = allSteps
+ .SelectMany(s => s.Inputs)
+ .Where(i => !producedLabels.Contains(i.Label))
+ .Select(i => i.Label)
+ .Distinct(StringComparer.OrdinalIgnoreCase)
+ .ToList();
return new FlowMetadata
{
diff --git a/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs b/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs
index 462b2926..bb08c02c 100644
--- a/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs
+++ b/src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs
@@ -40,12 +40,42 @@ private readonly List<
private Func>? _flowFactory;
private FlowRegistrationEntry? _lastRegistration;
private IConfiguration? _configuration;
+ private int? _defaultMaxDegreeOfParallelism;
internal FlowthruServiceBuilder(IServiceCollection services)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
}
+ ///
+ /// Configures default execution behaviour for all pipelines run through this service.
+ ///
+ ///
+ /// Settings applied here act as the service-level default. They are overridden by any
+ /// value explicitly supplied in the ExecutionOptions passed to
+ /// IFlowthruService.ExecuteFlowAsync (e.g. from the CLI --parallelism
+ /// flag). Properties left null or at their default in the per-call options fall
+ /// back to these values.
+ ///
+ ///
+ ///
+ /// services.AddFlowthru(flowthru =>
+ /// {
+ /// flowthru.ConfigureExecution(opts =>
+ /// {
+ /// opts.MaxDegreeOfParallelism = Environment.ProcessorCount;
+ /// });
+ /// });
+ ///
+ ///
+ public FlowthruServiceBuilder ConfigureExecution(Action configure)
+ {
+ var defaults = new ExecutionOptions();
+ configure(defaults);
+ _defaultMaxDegreeOfParallelism = defaults.MaxDegreeOfParallelism;
+ return this;
+ }
+
///
/// Internal entry type that carries a Flow factory and its associated metadata.
/// Replaces the FlowRegistrar indirection for cleaner multi-catalog support.
@@ -619,6 +649,12 @@ private static void ApplyFileConfiguration(TBuilder builder, MetadataO
///
internal void RegisterFlowDictionary()
{
+ // Register the service-level default parallelism so FlowthruService can consume it.
+ var defaultParallelism = _defaultMaxDegreeOfParallelism;
+ _services.AddSingleton(
+ new FlowthruExecutionDefaults { MaxDegreeOfParallelism = defaultParallelism }
+ );
+
// Always register the catalog collection so FlowthruService can inject all catalogs.
// Merges both type-registered catalogs (RegisterCatalog) and dynamically constructed
// catalog collections (RegisterCatalogs).
diff --git a/tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs b/tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs
new file mode 100644
index 00000000..0a54a4fa
--- /dev/null
+++ b/tests/Flowthru.Tests/03_Execution/ParallelExecutionTests.cs
@@ -0,0 +1,355 @@
+using System.Collections.Concurrent;
+using Flowthru.Core.Flows;
+using Flowthru.Core.Graph;
+using Flowthru.Tests.Fixtures.TestCatalogs;
+using Flowthru.Tests.Fixtures.TestSteps;
+
+namespace Flowthru.Tests.Execution;
+
+///
+/// Tests verifying the task-graph scheduler's parallel dispatch behaviour.
+///
+///
+///
+/// Concurrency is verified structurally (overlapping execution windows) rather than
+/// purely by elapsed time — wall-clock assertions are inherently flaky on loaded CI
+/// machines. Each test that needs to prove steps ran concurrently compares the
+/// start/end timestamps recorded by and asserts that at
+/// least two windows overlap.
+///
+///
+/// The core DAG used across most tests is a diamond:
+///
+/// (external) Input ──→ StepA (BranchA) ──┐
+/// └──→ StepB (BranchB) ──┴──→ StepMerge (Merged)
+///
+/// StepA and StepB have no dependency on each other, so they are eligible for
+/// concurrent dispatch whenever MaxDegreeOfParallelism > 1.
+///
+///
+[TestFixture]
+[Category("Execution")]
+[Category("Parallel")]
+public class ParallelExecutionTests
+{
+ // ─────────────────────────────────────────────────────────────────────────
+ // Helpers
+ // ─────────────────────────────────────────────────────────────────────────
+
+ private static readonly IEnumerable SeedData =
+ [
+ new TestData
+ {
+ Id = 1,
+ Name = "A",
+ Value = 1.0,
+ },
+ new TestData
+ {
+ Id = 2,
+ Name = "B",
+ Value = 2.0,
+ },
+ ];
+
+ private static readonly TimeSpan BranchDelay = TimeSpan.FromMilliseconds(300);
+
+ /// <summary>
+ /// Returns true when two execution windows overlap in time.
+ /// </summary>
+ private static bool Overlaps(
+ (string Label, DateTime Start, DateTime End) a,
+ (string Label, DateTime Start, DateTime End) b
+ ) => a.Start < b.End && b.Start < a.End;
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Correctness — parallel produces the same results as sequential
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public async Task RunAsync_WithParallelism_ProducesCorrectResults()
+ {
+ var catalog = new ParallelBranchCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ builder.AddStep("StepA", PassthroughStep.Create(), catalog.Input, catalog.BranchA);
+ builder.AddStep("StepB", PassthroughStep.Create(), catalog.Input, catalog.BranchB);
+ builder.AddStep(
+ "StepMerge",
+ MergeStep.Create(),
+ (catalog.BranchA, catalog.BranchB),
+ catalog.Merged
+ );
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 2 },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.True);
+ var merged = await catalog.Merged.Load().Run();
+ Assert.That(
+ merged.Count(),
+ Is.EqualTo(SeedData.Count() * 2),
+ "Merge should contain both branches"
+ );
+ Assert.That(result.StepResults, Has.Count.EqualTo(3));
+ Assert.That(result.StepResults.Values.All(r => r.Success), Is.True);
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Concurrency — independent steps actually overlap when parallelism > 1
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public async Task RunAsync_WithParallelism_IndependentStepsOverlapInTime()
+ {
+ var catalog = new ParallelBranchCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ builder.AddStep(
+ "StepA",
+ RecordingStep.Create(log, "StepA", BranchDelay),
+ catalog.Input,
+ catalog.BranchA
+ );
+ builder.AddStep(
+ "StepB",
+ RecordingStep.Create(log, "StepB", BranchDelay),
+ catalog.Input,
+ catalog.BranchB
+ );
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 2 },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.True, "Flow should succeed");
+ Assert.That(log, Has.Count.EqualTo(2), "Both steps should have recorded");
+
+ var entries = log.ToList();
+ var stepA = entries.First(e => e.Label == "StepA");
+ var stepB = entries.First(e => e.Label == "StepB");
+
+ Assert.That(
+ Overlaps(stepA, stepB),
+ Is.True,
+ $"StepA [{stepA.Start:mm:ss.fff}–{stepA.End:mm:ss.fff}] and "
+ + $"StepB [{stepB.Start:mm:ss.fff}–{stepB.End:mm:ss.fff}] should overlap"
+ );
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Sequential constraint — MaxDegreeOfParallelism = 1 prevents overlap
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public async Task RunAsync_WithParallelismOne_IndependentStepsDoNotOverlap()
+ {
+ var catalog = new ParallelBranchCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ builder.AddStep(
+ "StepA",
+ RecordingStep.Create(log, "StepA", BranchDelay),
+ catalog.Input,
+ catalog.BranchA
+ );
+ builder.AddStep(
+ "StepB",
+ RecordingStep.Create(log, "StepB", BranchDelay),
+ catalog.Input,
+ catalog.BranchB
+ );
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 1 },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.True, "Flow should succeed");
+ var entries = log.ToList();
+ var stepA = entries.First(e => e.Label == "StepA");
+ var stepB = entries.First(e => e.Label == "StepB");
+
+ Assert.That(
+ Overlaps(stepA, stepB),
+ Is.False,
+ $"With MaxDegreeOfParallelism=1, StepA [{stepA.Start:mm:ss.fff}–{stepA.End:mm:ss.fff}] and "
+ + $"StepB [{stepB.Start:mm:ss.fff}–{stepB.End:mm:ss.fff}] must not overlap"
+ );
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // StopOnFirstError = true (default) — halts on the first failure
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public async Task RunAsync_StopOnFirstErrorTrue_ReturnsFailureResult()
+ {
+ var catalog = new ParallelBranchCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ builder.AddStep(
+ "FailingA",
+ FailingStep.Create("branch A failed"),
+ catalog.Input,
+ catalog.BranchA
+ );
+ builder.AddStep("StepB", PassthroughStep.Create(), catalog.Input, catalog.BranchB);
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 2, StopOnFirstError = true },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.False, "Flow should fail");
+ Assert.That(result.Exception, Is.Not.Null);
+ Assert.That(result.StepResults["FailingA"].Success, Is.False);
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // StopOnFirstError = false — independent branches complete; dependents skipped
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public async Task RunAsync_StopOnFirstErrorFalse_IndependentBranchCompletes()
+ {
+ var catalog = new ParallelBranchCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ // StepA fails — BranchA will not be populated.
+ builder.AddStep(
+ "FailingA",
+ FailingStep.Create("branch A failed"),
+ catalog.Input,
+ catalog.BranchA
+ );
+ // StepB is independent and should still run and succeed.
+ builder.AddStep("StepB", PassthroughStep.Create(), catalog.Input, catalog.BranchB);
+ // StepMerge depends on BranchA (failed) — should be skipped.
+ builder.AddStep(
+ "StepMerge",
+ MergeStep.Create(),
+ (catalog.BranchA, catalog.BranchB),
+ catalog.Merged
+ );
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 2, StopOnFirstError = false },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.False, "Flow should still report failure");
+ Assert.That(result.StepResults["FailingA"].Success, Is.False, "FailingA should be failed");
+ Assert.That(result.StepResults["StepB"].Success, Is.True, "Independent StepB should have run");
+ // StepMerge is a downstream dependent of FailingA — it should have been skipped.
+ Assert.That(
+ result.StepResults,
+ Does.Not.ContainKey("StepMerge"),
+ "StepMerge should be skipped"
+ );
+
+ var branchB = await catalog.BranchB.Load().Run();
+ Assert.That(branchB.Count(), Is.EqualTo(SeedData.Count()), "BranchB should have been written");
+ }
+
+ [Test]
+ public async Task RunAsync_StopOnFirstErrorFalse_DownstreamOfFailedStepIsSkipped()
+ {
+ // Linear chain: A → B → C. A fails. B and C should be skipped.
+ var catalog = new SimpleThreeStepCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ builder.AddStep("StepA", FailingStep.Create("A failed"), catalog.Input, catalog.StepOne);
+ builder.AddStep("StepB", PassthroughStep.Create(), catalog.StepOne, catalog.StepTwo);
+ builder.AddStep("StepC", PassthroughStep.Create(), catalog.StepTwo, catalog.Output);
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 1, StopOnFirstError = false },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.False);
+ Assert.That(result.StepResults["StepA"].Success, Is.False);
+ Assert.That(
+ result.StepResults,
+ Does.Not.ContainKey("StepB"),
+ "StepB (downstream) should be skipped"
+ );
+ Assert.That(
+ result.StepResults,
+ Does.Not.ContainKey("StepC"),
+ "StepC (downstream) should be skipped"
+ );
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Dependency ordering — downstream steps never start before their producers
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public async Task RunAsync_WithParallelism_DownstreamStepStartsAfterProducerCompletes()
+ {
+ // Linear chain: StepA (delayed) → StepB. Even with high parallelism, StepB must
+ // not start until StepA has written its output.
+ var catalog = new SimpleThreeStepCatalog();
+ await catalog.Input.Save(SeedData).Run();
+
+ var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>();
+
+ var flow = FlowBuilder.CreateFlow(builder =>
+ {
+ builder.AddStep(
+ "StepA",
+ RecordingStep.Create(log, "StepA", BranchDelay),
+ catalog.Input,
+ catalog.StepOne
+ );
+ builder.AddStep(
+ "StepB",
+ RecordingStep.Create(log, "StepB", TimeSpan.Zero),
+ catalog.StepOne,
+ catalog.Output
+ );
+ });
+
+ var result = await flow.RunAsync(
+ new ExecutionOptions { MaxDegreeOfParallelism = 4 },
+ CancellationToken.None
+ );
+
+ Assert.That(result.Success, Is.True);
+
+ var entries = log.ToDictionary(e => e.Label);
+ Assert.That(
+ entries["StepB"].Start >= entries["StepA"].End,
+ Is.True,
+ $"StepB must not start [{entries["StepB"].Start:mm:ss.fff}] "
+ + $"before StepA finishes [{entries["StepA"].End:mm:ss.fff}]"
+ );
+ }
+}
diff --git a/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs b/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs
new file mode 100644
index 00000000..d257f813
--- /dev/null
+++ b/tests/Flowthru.Tests/05_Cli/ArgumentParserParallelismTests.cs
@@ -0,0 +1,102 @@
+using Flowthru.Core.Cli;
+using Flowthru.Core.Flows;
+
+namespace Flowthru.Tests.Cli;
+
+///
+/// Tests for focusing on the --parallelism flag.
+///
+[TestFixture]
+[Category("Cli")]
+[Category("Parallel")]
+public class ArgumentParserParallelismTests
+{
+ // Dummy flow names — ArgumentParser validates slicing flags against these;
+ // none of the parallelism tests use named flows so the list can be empty.
+ private static readonly IEnumerable NoFlows = [];
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Integer values
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [TestCase(1)]
+ [TestCase(2)]
+ [TestCase(4)]
+ [TestCase(16)]
+ public void Parse_ParallelismInteger_SetsMaxDegreeOfParallelism(int n)
+ {
+ var parsed = ArgumentParser.Parse(["--parallelism", n.ToString()], NoFlows);
+
+ Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.EqualTo(n));
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // "auto" keyword
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [TestCase("auto")]
+ [TestCase("AUTO")]
+ [TestCase("Auto")]
+ public void Parse_ParallelismAuto_SetsMaxDegreeToProcessorCount(string token)
+ {
+ var parsed = ArgumentParser.Parse(["--parallelism", token], NoFlows);
+
+ Assert.That(
+ parsed.Options!.MaxDegreeOfParallelism,
+ Is.EqualTo(Environment.ProcessorCount),
+ "'auto' should map to Environment.ProcessorCount"
+ );
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Default (flag absent)
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public void Parse_NoParallelismFlag_LeavesNull()
+ {
+ var parsed = ArgumentParser.Parse([], NoFlows);
+
+ // Null means "unspecified" — the priority chain in FlowthruService resolves it
+ // to the service-level default, or 1 if none is configured.
+ Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.Null);
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Invalid values
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [TestCase("0")]
+ [TestCase("-1")]
+ [TestCase("abc")]
+ [TestCase("3.5")]
+ public void Parse_ParallelismInvalidValue_Throws(string bad)
+ {
+ Assert.Throws(
+ () => ArgumentParser.Parse(["--parallelism", bad], NoFlows),
+ $"'--parallelism {bad}' should throw ArgumentException"
+ );
+ }
+
+ [Test]
+ public void Parse_ParallelismMissingValue_Throws()
+ {
+ Assert.Throws(
+ () => ArgumentParser.Parse(["--parallelism"], NoFlows),
+ "'--parallelism' without a value should throw ArgumentException"
+ );
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // Composed with other flags
+ // ─────────────────────────────────────────────────────────────────────────
+
+ [Test]
+ public void Parse_ParallelismWithDryRun_BothOptionsSet()
+ {
+ var parsed = ArgumentParser.Parse(["--dry-run", "--parallelism", "4"], NoFlows);
+
+ Assert.That(parsed.Options!.MaxDegreeOfParallelism, Is.EqualTo(4));
+ Assert.That(parsed.Options.DryRun.Enabled, Is.True);
+ }
+}
diff --git a/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs b/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs
index c5a0a3fa..62dfb643 100644
--- a/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs
+++ b/tests/Flowthru.Tests/_Fixtures/TestCatalogs/TestCatalogs.cs
@@ -115,6 +115,38 @@ public DownstreamCatalog()
CreateItem(() => ItemFactory.Enumerable.Memory(label: "downstream_output"));
}
+///
+/// Diamond-topology catalog: a shared external input feeds two independent branches
+/// that are later merged into a single output.
+///
+///
+/// Topology:
+///
+/// (external) Input ──→ BranchA ──┐
+/// └──→ BranchB ──┴──→ Merged
+///
+/// StepA and StepB are independent — ideal for verifying parallel dispatch.
+///
+public class ParallelBranchCatalog : CatalogAbstract
+{
+ public ParallelBranchCatalog()
+ {
+ InitializeCatalogProperties();
+ }
+
+ public IItem> Input =>
+ CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_input"));
+
+ public IItem> BranchA =>
+ CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_branch_a"));
+
+ public IItem> BranchB =>
+ CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_branch_b"));
+
+ public IItem> Merged =>
+ CreateItem(() => ItemFactory.Enumerable.Memory(label: "pb_merged"));
+}
+
///
/// Third catalog for 3-arity pipeline tests.
///
diff --git a/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs b/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs
index 77347ba7..008e7d41 100644
--- a/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs
+++ b/tests/Flowthru.Tests/_Fixtures/TestSteps/TestSteps.cs
@@ -115,3 +115,28 @@ public static Func<
};
}
}
+
+///
+/// Step that records its execution window (start/end timestamp) to a shared log,
+/// then applies an optional delay. Used to verify concurrent vs. sequential dispatch.
+///
+public static class RecordingStep
+{
+ /// <param name="log">Shared bag that captures execution windows across steps.</param>
+ /// <param name="label">Identifier written into the log entry.</param>
+ /// <param name="delay">How long the step blocks before completing.</param>
+ public static Func, Task>> Create(
+ System.Collections.Concurrent.ConcurrentBag<(string Label, DateTime Start, DateTime End)> log,
+ string label,
+ TimeSpan delay
+ )
+ {
+ return async (input) =>
+ {
+ var start = DateTime.UtcNow;
+ await Task.Delay(delay);
+ log.Add((label, start, DateTime.UtcNow));
+ return input;
+ };
+ }
+}