Skip to content

Commit e0331e5

Browse files
authored
feat: multithreaded flows
feat: multithreaded flow execution
2 parents 98c1cad + 16453d5 commit e0331e5

30 files changed

Lines changed: 1422 additions & 417 deletions

.github/workflows/pr-tests.yml

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
name: PR Tests
2+
3+
on:
4+
pull_request:
5+
types: [opened, synchronize, reopened, ready_for_review]
6+
paths-ignore:
7+
- "**.md"
8+
- "docs/**"
9+
- ".github/instructions/**"
10+
11+
concurrency:
12+
group: ${{ github.workflow }}-${{ github.ref }}
13+
cancel-in-progress: true
14+
15+
jobs:
16+
test:
17+
runs-on: ubuntu-latest
18+
if: github.event.pull_request.draft == false
19+
20+
permissions:
21+
contents: read
22+
23+
steps:
24+
- name: Checkout code
25+
uses: actions/checkout@v4
26+
with:
27+
fetch-depth: 0
28+
29+
- name: Setup .NET
30+
uses: actions/setup-dotnet@v4
31+
with:
32+
dotnet-version: "10.0.x"
33+
dotnet-quality: "preview"
34+
env:
35+
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: "true"
36+
DOTNET_NOLOGO: "true"
37+
DOTNET_CLI_TELEMETRY_OPTOUT: "true"
38+
39+
- name: Setup Node.js
40+
uses: actions/setup-node@v4
41+
with:
42+
node-version: "22"
43+
44+
- name: Setup Python
45+
uses: actions/setup-python@v5
46+
with:
47+
python-version: "3.10"
48+
49+
- name: Setup UV
50+
uses: astral-sh/setup-uv@v5
51+
52+
- name: Setup pnpm
53+
uses: pnpm/action-setup@v4
54+
with:
55+
version: 10.6.3
56+
57+
- name: Cache NuGet packages
58+
uses: actions/cache@v4
59+
with:
60+
path: ~/.nuget/packages
61+
key: ${{ runner.os }}-nuget-${{ hashFiles('**/packages.lock.json', '**/*.csproj') }}
62+
restore-keys: |
63+
${{ runner.os }}-nuget-
64+
65+
- name: Install Node dependencies
66+
run: pnpm install
67+
68+
- name: Run affected tests
69+
env:
70+
CI: "true"
71+
run: |
72+
pnpm nx affected -t test \
73+
--base=${{ github.event.pull_request.base.sha }} --head=HEAD \
74+
--output-style=stream \
75+
--logger "console;verbosity=minimal"

CONTRIBUTING.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ This doesn't mean *ignoring* these concerns — it just means extending the API
104104
The project uses NX for task orchestration. Common commands:
105105

106106
```bash
107-
nx run flowthru:build # Build the solution
108-
nx run flowthru:test # Run all tests with coverage
109-
nx run flowthru:format:csharp # Format code with CSharpier
107+
dotnet build # Confirm solution builds fully
108+
nx run affected -t test # Run all test projects affected by current changes
109+
dotnet test # Run all tests for the project.
110110
```
111111

112112
To run a subset of tests by category:

examples/advanced/KedroSpaceflights.Custom/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ private static void ConfigureServices(IServiceCollection services, string basePa
9494
flowthru
9595
.RegisterFlow(label: "Reporting", flow: ReportingFlow.Create)
9696
.WithDescription("Generates reports and visualizations");
97+
98+
flowthru.ConfigureExecution(opts => opts.MaxDegreeOfParallelism = 8);
9799
});
98100

99101
services.AddLogging(logging =>

examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public partial class Catalog
4646
)
4747
);
4848

49-
// /// <summary>
50-
// /// In-memory flag written by the Ingest flow after all mutations succeed.
51-
// /// Downstream flows depend on this via the DAG.
52-
// /// </summary>
49+
/// <summary>
50+
/// In-memory flag written by the Ingest flow after all mutations succeed.
51+
/// Downstream flows depend on this via the DAG.
52+
/// </summary>
5353
public IItem<bool> GqlDatabaseSeeded =>
5454
CreateItem(() => ItemFactory.Single.Memory<bool>("GqlDatabaseSeeded"));
5555

@@ -62,7 +62,7 @@ public partial class Catalog
6262
CreateItem(
6363
() =>
6464
GqlItemFactory.Enumerable.Query<IGetCompaniesResult, IGetCompanies_Companies>(
65-
label: "Companies",
65+
label: "GQLCompanies",
6666
queryFunc: ct => _client.GetCompanies.ExecuteAsync(ct),
6767
selectData: r => r.Companies,
6868
allowEmptyData: true
@@ -76,7 +76,7 @@ public partial class Catalog
7676
CreateItem(
7777
() =>
7878
GqlItemFactory.Enumerable.Query<IGetShuttlesResult, IGetShuttles_Shuttles>(
79-
label: "Shuttles",
79+
label: "GQLShuttles",
8080
queryFunc: ct => _client.GetShuttles.ExecuteAsync(ct),
8181
selectData: r => r.Shuttles,
8282
allowEmptyData: true
@@ -90,7 +90,7 @@ public partial class Catalog
9090
CreateItem(
9191
() =>
9292
GqlItemFactory.Enumerable.Query<IGetReviewsResult, IGetReviews_Reviews>(
93-
label: "Reviews",
93+
label: "GQLReviews",
9494
queryFunc: ct => _client.GetReviews.ExecuteAsync(ct),
9595
selectData: r => r.Reviews,
9696
allowEmptyData: true

examples/advanced/KedroSpaceflightsGQL/Data/_02_Intermediate/Catalog.Intermediate.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,16 @@ public partial class Catalog
2828
filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_shuttles.parquet"
2929
)
3030
);
31+
32+
/// <summary>
33+
/// Preprocessed review data with parsed decimal rating scores.
34+
/// </summary>
35+
public IItem<IEnumerable<PreprocessedReviewSchema>> PreprocessedReviews =>
36+
CreateItem(
37+
() =>
38+
ItemFactory.Enumerable.Parquet<PreprocessedReviewSchema>(
39+
label: "PreprocessedReviews",
40+
filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_reviews.parquet"
41+
)
42+
);
3143
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using Flowthru.Core.Abstractions;
2+
3+
namespace KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
4+
5+
/// <summary>
6+
/// Represents a preprocessed review with a parsed numeric rating.
7+
/// Produced by filtering out records with unparseable rating strings.
8+
/// </summary>
9+
[FlowthruSchema]
10+
public partial record PreprocessedReviewSchema
11+
{
12+
/// <summary>
13+
/// Identifier of the shuttle being reviewed.
14+
/// </summary>
15+
[SerializedLabel("shuttle_id")]
16+
public required string ShuttleId { get; init; }
17+
18+
/// <summary>
19+
/// Review rating score as a decimal value.
20+
/// </summary>
21+
[SerializedLabel("review_scores_rating")]
22+
public required decimal ReviewScoresRating { get; init; }
23+
}

examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/DataProcessingFlow.cs

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,28 @@
11
using Flowthru.Core.Flows;
22
using KedroSpaceflightsGQL.Data;
33
using KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
4-
using KedroSpaceflightsGQL.Infra.GqlClient;
54

65
namespace KedroSpaceflightsGQL.Flows.DataProcessing;
76

87
/// <summary>
9-
/// Creates the data processing pipeline that preprocesses raw company and shuttle data
10-
/// and joins it with reviews to create a model input table.
8+
/// Data processing pipeline. Reads typed data from the GQL server and joins it into a
9+
/// model input table. Depends on Ingest completing first via the GqlDatabaseSeeded gate.
1110
/// </summary>
1211
public static class DataProcessingFlow
1312
{
14-
/// <summary>
15-
/// Creates the data processing pipeline.
16-
/// </summary>
17-
/// <param name="catalog">The data catalog containing input and output entries.</param>
18-
/// <returns>A configured pipeline that produces a model input table from raw data sources.</returns>
1913
public static Flow Create(Catalog catalog)
2014
{
2115
return FlowBuilder.CreateFlow(pipeline =>
2216
{
23-
pipeline.AddStep(
24-
label: "PreprocessCompanies",
25-
description: "Cleans and preprocesses raw company data.",
26-
transform: PreprocessCompaniesStep.Create(),
27-
input: catalog.Companies,
28-
output: catalog.PreprocessedCompanies
29-
);
30-
31-
pipeline.AddStep(
32-
label: "PreprocessShuttles",
33-
description: "Cleans and preprocesses raw shuttle data.",
34-
transform: PreprocessShuttlesStep.Create(),
35-
input: catalog.Shuttles,
36-
output: catalog.PreprocessedShuttles
37-
);
38-
3917
pipeline.AddStep(
4018
label: "CreateModelInputTable",
4119
description: """
42-
Joins preprocessed shuttle and company data with review scores to create a
43-
unified model input table.
20+
Joins typed shuttle, company, and review data queried from the GQL server
21+
into a unified model input table. GqlDatabaseSeeded is consumed as an explicit
22+
DAG gate ensuring Ingest has completed before this step executes.
4423
""",
4524
transform: CreateModelInputTableStep.Create(),
46-
input: (catalog.PreprocessedShuttles, catalog.PreprocessedCompanies, catalog.Reviews),
25+
input: (catalog.GqlDatabaseSeeded, catalog.Shuttles, catalog.Companies, catalog.Reviews),
4726
output: catalog.ModelInputTable
4827
);
4928
});

examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,39 @@
11
using Flowthru.Core.Steps;
2-
using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
32
using KedroSpaceflightsGQL.Data._03_Primary.Schemas;
43
using KedroSpaceflightsGQL.Infra.GqlClient;
54

65
namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
76

87
/// <summary>
9-
/// Joins preprocessed shuttle and company data with review scores to create a unified model input table.
8+
/// Joins typed shuttle, company, and review data from the GQL server into a unified model
9+
/// input table. All fields are already strongly-typed — no parsing required.
10+
/// The <c>bool</c> first input is the GqlDatabaseSeeded gate; it is consumed only to
11+
/// express the DAG dependency on Ingest and is otherwise unused.
1012
/// </summary>
1113
[FlowthruStep]
1214
public static class CreateModelInputTableStep
1315
{
14-
/// <summary>
15-
/// Creates a join function that combines shuttle, company, and review data into a single table for modeling.
16-
/// </summary>
17-
/// <returns>
18-
/// A function that performs inner joins to produce <see cref="ModelInputTableSchema"/> records.
19-
/// Records are filtered to include only reviews with valid numeric scores.
20-
/// </returns>
2116
public static Func<
2217
(
23-
IEnumerable<PreprocessedShuttleSchema>,
24-
IEnumerable<PreprocessedCompanySchema>,
18+
bool,
19+
IEnumerable<IGetShuttles_Shuttles>,
20+
IEnumerable<IGetCompanies_Companies>,
2521
IEnumerable<IGetReviews_Reviews>
2622
),
2723
IEnumerable<ModelInputTableSchema>
2824
> Create()
2925
{
3026
return (input) =>
3127
{
32-
var (shuttles, companies, reviews) = input;
33-
34-
// Parse reviews to have decimal scores
35-
var parsedReviews = reviews
36-
.Select(r => new
37-
{
38-
r.ShuttleId,
39-
Score = decimal.TryParse(r.ReviewScoresRating, out var score) ? score : (decimal?)null,
40-
})
41-
.Where(r => r.Score.HasValue)
42-
.ToList();
28+
var (_, shuttles, companies, reviews) = input;
4329

4430
// Join reviews to shuttles
45-
var ratedShuttles = parsedReviews
31+
var ratedShuttles = reviews
4632
.Join(
4733
shuttles,
4834
r => r.ShuttleId,
4935
s => s.Id,
50-
(r, s) => new { Shuttle = s, ReviewScore = r.Score!.Value }
36+
(r, s) => new { Shuttle = s, ReviewScore = r.ReviewScoresRating }
5137
)
5238
.ToList();
5339

@@ -74,7 +60,7 @@ public static Func<
7460
ReviewScoresRating = rs.ReviewScore,
7561
}
7662
)
77-
.ToList(); // Materialize query to ensure LINQ execution completes
63+
.ToList();
7864

7965
return modelInputTable;
8066
};

examples/advanced/KedroSpaceflightsGQL/Flows/DataProcessing/Steps/PreprocessCompaniesStep.cs

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)