Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .github/workflows/pr-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
name: PR Tests

on:
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
paths-ignore:
- "**.md"
- "docs/**"
- ".github/instructions/**"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
test:
runs-on: ubuntu-latest
if: github.event.pull_request.draft == false

permissions:
contents: read

steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: "10.0.x"
dotnet-quality: "preview"
env:
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: "true"
DOTNET_NOLOGO: "true"
DOTNET_CLI_TELEMETRY_OPTOUT: "true"

- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: "22"

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.10"

- name: Setup UV
uses: astral-sh/setup-uv@v5

- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 10.6.3

- name: Cache NuGet packages
uses: actions/cache@v4
with:
path: ~/.nuget/packages
key: ${{ runner.os }}-nuget-${{ hashFiles('**/packages.lock.json', '**/*.csproj') }}
restore-keys: |
${{ runner.os }}-nuget-

- name: Install Node dependencies
run: pnpm install

- name: Run affected tests
env:
CI: "true"
run: |
pnpm nx affected -t test \
--base=${{ github.event.pull_request.base.sha }} --head=HEAD \
--output-style=stream \
--logger "console;verbosity=minimal"
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ This doesn't mean *ignoring* these concerns — it just means extending the API
The project uses NX for task orchestration. Common commands:

```bash
nx run flowthru:build # Build the solution
nx run flowthru:test # Run all tests with coverage
nx run flowthru:format:csharp # Format code with CSharpier
dotnet build # Confirm solution builds fully
nx run affected -t test # Run all test projects affected by current changes
dotnet test # Run all tests for the project.
```

To run a subset of tests by category:
Expand Down
2 changes: 2 additions & 0 deletions examples/advanced/KedroSpaceflights.Custom/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ private static void ConfigureServices(IServiceCollection services, string basePa
flowthru
.RegisterFlow(label: "Reporting", flow: ReportingFlow.Create)
.WithDescription("Generates reports and visualizations");

flowthru.ConfigureExecution(opts => opts.MaxDegreeOfParallelism = 8);
});

services.AddLogging(logging =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ public partial class Catalog
)
);

// /// <summary>
// /// In-memory flag written by the Ingest flow after all mutations succeed.
// /// Downstream flows depend on this via the DAG.
// /// </summary>
/// <summary>
/// In-memory flag written by the Ingest flow after all mutations succeed.
/// Downstream flows depend on this via the DAG.
/// </summary>
public IItem<bool> GqlDatabaseSeeded =>
CreateItem(() => ItemFactory.Single.Memory<bool>("GqlDatabaseSeeded"));

Expand All @@ -62,7 +62,7 @@ public partial class Catalog
CreateItem(
() =>
GqlItemFactory.Enumerable.Query<IGetCompaniesResult, IGetCompanies_Companies>(
label: "Companies",
label: "GQLCompanies",
queryFunc: ct => _client.GetCompanies.ExecuteAsync(ct),
selectData: r => r.Companies,
allowEmptyData: true
Expand All @@ -76,7 +76,7 @@ public partial class Catalog
CreateItem(
() =>
GqlItemFactory.Enumerable.Query<IGetShuttlesResult, IGetShuttles_Shuttles>(
label: "Shuttles",
label: "GQLShuttles",
queryFunc: ct => _client.GetShuttles.ExecuteAsync(ct),
selectData: r => r.Shuttles,
allowEmptyData: true
Expand All @@ -90,7 +90,7 @@ public partial class Catalog
CreateItem(
() =>
GqlItemFactory.Enumerable.Query<IGetReviewsResult, IGetReviews_Reviews>(
label: "Reviews",
label: "GQLReviews",
queryFunc: ct => _client.GetReviews.ExecuteAsync(ct),
selectData: r => r.Reviews,
allowEmptyData: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,16 @@ public partial class Catalog
filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_shuttles.parquet"
)
);

/// <summary>
/// Preprocessed review data with parsed decimal rating scores.
/// </summary>
public IItem<IEnumerable<PreprocessedReviewSchema>> PreprocessedReviews =>
CreateItem(
() =>
ItemFactory.Enumerable.Parquet<PreprocessedReviewSchema>(
label: "PreprocessedReviews",
filePath: $"{_basePath}/_02_Intermediate/Datasets/preprocessed_reviews.parquet"
)
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Flowthru.Core.Abstractions;

namespace KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;

/// <summary>
/// Represents a preprocessed review with a parsed numeric rating.
/// Produced by filtering out records with unparseable rating strings.
/// </summary>
[FlowthruSchema]
public partial record PreprocessedReviewSchema
{
/// <summary>
/// Identifier of the shuttle being reviewed.
/// </summary>
[SerializedLabel("shuttle_id")]
public required string ShuttleId { get; init; }

/// <summary>
/// Review rating score as a decimal value.
/// </summary>
[SerializedLabel("review_scores_rating")]
public required decimal ReviewScoresRating { get; init; }
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,28 @@
using Flowthru.Core.Flows;
using KedroSpaceflightsGQL.Data;
using KedroSpaceflightsGQL.Flows.DataProcessing.Steps;
using KedroSpaceflightsGQL.Infra.GqlClient;

namespace KedroSpaceflightsGQL.Flows.DataProcessing;

/// <summary>
/// Creates the data processing pipeline that preprocesses raw company and shuttle data
/// and joins it with reviews to create a model input table.
/// Data processing pipeline. Reads typed data from the GQL server and joins it into a
/// model input table. Depends on Ingest completing first via the GqlDatabaseSeeded gate.
/// </summary>
public static class DataProcessingFlow
{
/// <summary>
/// Creates the data processing pipeline.
/// </summary>
/// <param name="catalog">The data catalog containing input and output entries.</param>
/// <returns>A configured pipeline that produces a model input table from raw data sources.</returns>
public static Flow Create(Catalog catalog)
{
return FlowBuilder.CreateFlow(pipeline =>
{
pipeline.AddStep(
label: "PreprocessCompanies",
description: "Cleans and preprocesses raw company data.",
transform: PreprocessCompaniesStep.Create(),
input: catalog.Companies,
output: catalog.PreprocessedCompanies
);

pipeline.AddStep(
label: "PreprocessShuttles",
description: "Cleans and preprocesses raw shuttle data.",
transform: PreprocessShuttlesStep.Create(),
input: catalog.Shuttles,
output: catalog.PreprocessedShuttles
);

pipeline.AddStep(
label: "CreateModelInputTable",
description: """
Joins preprocessed shuttle and company data with review scores to create a
unified model input table.
Joins typed shuttle, company, and review data queried from the GQL server
into a unified model input table. GqlDatabaseSeeded is consumed as an explicit
DAG gate ensuring Ingest has completed before this step executes.
""",
transform: CreateModelInputTableStep.Create(),
input: (catalog.PreprocessedShuttles, catalog.PreprocessedCompanies, catalog.Reviews),
input: (catalog.GqlDatabaseSeeded, catalog.Shuttles, catalog.Companies, catalog.Reviews),
output: catalog.ModelInputTable
);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,39 @@
using Flowthru.Core.Steps;
using KedroSpaceflightsGQL.Data._02_Intermediate.Schemas;
using KedroSpaceflightsGQL.Data._03_Primary.Schemas;
using KedroSpaceflightsGQL.Infra.GqlClient;

namespace KedroSpaceflightsGQL.Flows.DataProcessing.Steps;

/// <summary>
/// Joins preprocessed shuttle and company data with review scores to create a unified model input table.
/// Joins typed shuttle, company, and review data from the GQL server into a unified model
/// input table. All fields are already strongly-typed — no parsing required.
/// The <c>bool</c> first input is the GqlDatabaseSeeded gate; it is consumed only to
/// express the DAG dependency on Ingest and is otherwise unused.
/// </summary>
[FlowthruStep]
public static class CreateModelInputTableStep
{
/// <summary>
/// Creates a join function that combines shuttle, company, and review data into a single table for modeling.
/// </summary>
/// <returns>
/// A function that performs inner joins to produce <see cref="ModelInputTableSchema"/> records.
/// Records are filtered to include only reviews with valid numeric scores.
/// </returns>
public static Func<
(
IEnumerable<PreprocessedShuttleSchema>,
IEnumerable<PreprocessedCompanySchema>,
bool,
IEnumerable<IGetShuttles_Shuttles>,
IEnumerable<IGetCompanies_Companies>,
IEnumerable<IGetReviews_Reviews>
),
IEnumerable<ModelInputTableSchema>
> Create()
{
return (input) =>
{
var (shuttles, companies, reviews) = input;

// Parse reviews to have decimal scores
var parsedReviews = reviews
.Select(r => new
{
r.ShuttleId,
Score = decimal.TryParse(r.ReviewScoresRating, out var score) ? score : (decimal?)null,
})
.Where(r => r.Score.HasValue)
.ToList();
var (_, shuttles, companies, reviews) = input;

// Join reviews to shuttles
var ratedShuttles = parsedReviews
var ratedShuttles = reviews
.Join(
shuttles,
r => r.ShuttleId,
s => s.Id,
(r, s) => new { Shuttle = s, ReviewScore = r.Score!.Value }
(r, s) => new { Shuttle = s, ReviewScore = r.ReviewScoresRating }
)
.ToList();

Expand All @@ -74,7 +60,7 @@ public static Func<
ReviewScoresRating = rs.ReviewScore,
}
)
.ToList(); // Materialize query to ensure LINQ execution completes
.ToList();

return modelInputTable;
};
Expand Down

This file was deleted.

Loading
Loading