7 changes: 7 additions & 0 deletions .config/dotnet-tools.json
@@ -16,6 +16,13 @@
"dotnet-script"
],
"rollForward": false
},
"docfx": {
"version": "2.78.5",
"commands": [
"docfx"
],
"rollForward": false
}
}
}
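
With this entry in the tool manifest, the documentation toolchain is pinned per clone: `dotnet tool restore` installs DocFX 2.78.5, after which `dotnet docfx <path-to-docfx.json>` builds the docs site. (The DocFX config path is not part of this diff, so the placeholder here is an assumption.)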
8 changes: 4 additions & 4 deletions .vscode/flowthru.code-snippets
@@ -10,16 +10,16 @@
"scope": "csharp",
"prefix": "_pipeline",
"body": [
"using Flowthru.Pipelines;",
"using Flowthru.Flows;",
"using $1",
"",
"public static class ${RELATIVE_FILEPATH/.*?(\\w+).cs$/$1/}",
"{",
" public static Pipeline Create(Catalog catalog)",
" {",
" return PipelineBuilder.CreatePipeline(pipeline =>",
" return FlowBuilder.CreateFlow(pipeline =>",
" {",
" pipeline.AddNode(",
" pipeline.AddStep(",
" label: \"$2\",",
" description: \"\"\"",
" $3",
@@ -34,7 +34,7 @@
],
"description": "Creates a basic Flowthru pipeline structure."
},
"FlowthruNode": {
"FlowthruStep": {
"scope": "csharp",
"prefix": "_node",
"body": [
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -74,8 +74,8 @@ Flowthru handles these through an **effect type** called `FlowIO<T>`. If you're
The key runtime guarantees:

- **All I/O is lazy and explicit.** Side effects cannot be accidentally dropped or silently ignored.
- **Errors are captured, never swallowed.** Node failures propagate to structured pipeline results. Silent `catch {}` blocks are a bug.
- **Nodes are isolated.** A failing node halts execution and reports which node failed and why — partial silent failures are not possible.
- **Errors are captured, never swallowed.** Step failures propagate to structured pipeline results. Silent `catch {}` blocks are a bug.
- **Steps are isolated.** A failing step halts execution and reports which step failed and why — partial silent failures are not possible.
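
The shape these guarantees imply can be pictured with a minimal sketch. This is hypothetical — the real `FlowIO<T>` API is not shown in this diff; it only illustrates the two properties above (lazy effects, captured errors):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch of an effect type with the guarantees described above.
public sealed class FlowIO<T>
{
  private readonly Func<Task<T>> _effect;

  private FlowIO(Func<Task<T>> effect) => _effect = effect;

  // Lazy and explicit: constructing a FlowIO performs no I/O.
  public static FlowIO<T> Defer(Func<Task<T>> effect) => new(effect);

  // Captured, never swallowed: exceptions become values in a structured result.
  public async Task<(T? Value, Exception? Error)> ExecuteAsync()
  {
    try { return (await _effect(), null); }
    catch (Exception ex) { return (default, ex); }
  }
}
```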

## Decision Rules for Contributors

5 changes: 5 additions & 0 deletions Directory.Build.props
@@ -16,6 +16,11 @@
<RestorePackagesWithLockFile>false</RestorePackagesWithLockFile>
</PropertyGroup>

<!-- Generate XML documentation for src/ projects (consumed by DocFX) -->
<PropertyGroup Condition="$(MSBuildProjectDirectory.Contains('src'))">
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
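
For context, `GenerateDocumentationFile` makes the compiler write an XML documentation file for each `src/` project from triple-slash comments, which DocFX then folds into the API reference. A minimal illustration (the method shown is hypothetical):

```csharp
/// <summary>
/// Triple-slash comments like this are emitted into the project's XML
/// documentation file and rendered by DocFX in the API reference.
/// </summary>
/// <param name="label">Unique label of the catalog entry to read.</param>
public void Read(string label) { /* implementation elided */ }
```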

<!-- Centralized NuGet package configuration for all src/ projects -->
<PropertyGroup Condition="$(MSBuildProjectDirectory.Contains('src'))">
<Version>0.1.33</Version>
4 changes: 2 additions & 2 deletions docs/CONTRIBUTING.md
@@ -27,7 +27,7 @@ Consider these examples:
- **"How do I start writing my first pipeline?"** → This is someone learning. They need a tutorial.
- **"How can I read data from a database instead of local files?"** → This is someone working with a specific need. They need a guide.
- **"Why does Flowthru use so many types?"** → This is someone studying the framework's design. They need an explanation.
- **"What parameters does `CatalogEntry` accept?"** → This is someone looking up technical details. They need reference documentation.
- **"What parameters does `Item` accept?"** → This is someone looking up technical details. They need reference documentation.

The same topic can yield different documentation depending on the question:

@@ -156,7 +156,7 @@ Let's say you want to document "catalog entries." Start by listing questions:
- "How do I create my first catalog entry?" → Tutorial
- "How do I configure a catalog entry for Parquet files?" → Guide
- "Why are catalog entries properties rather than string keys?" → Explanation
- "What methods are available on `ICatalogEntry<T>`?" → Reference
- "What methods are available on `IItem<T>`?" → Reference

Each question becomes a separate piece of documentation in its appropriate category. Together, they serve users at every stage of their journey with catalog entries.

12 changes: 6 additions & 6 deletions docs/explanation/advanced/storage-composition.md
@@ -140,11 +140,11 @@ public class EFCoreStorageAdapter<T> : IStorageAdapter<IEnumerable<T>>

## Catalog-Level Constraint Narrowing

After adapter creation, pipeline authors can further constrain catalog entries using `CatalogEntry<T>.Constrain()`:
After adapter creation, pipeline authors can further constrain catalog entries using `Item<T>.Constrain()`:

```csharp
public ICatalogEntry<IEnumerable<Company>> Companies => GetOrCreateEntry(() =>
CatalogEntries.Enumerable.Csv<Company>("companies", "data/companies.csv")
public IItem<IEnumerable<Company>> Companies => GetOrCreateEntry(() =>
ItemFactory.Enumerable.Csv<Company>("companies", "data/companies.csv")
.Constrain(traits => traits with { CanWrite = false })
);
```
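
`Constrain()` acts as a ratchet: a transform may switch capabilities off, but never back on. A hedged illustration, reusing the entry above (the exact failure behavior on widening is an assumption; this diff only states that validation happens at initialization):

```csharp
// Narrowing is allowed: a writable CSV adapter becomes read-only.
var readOnly = ItemFactory.Enumerable.Csv<Company>("companies", "data/companies.csv")
  .Constrain(traits => traits with { CanWrite = false });

// Widening is rejected fail-fast at initialization. If the composed adapter
// reports CanWrite = false, a constraint that tries to flip it back on is
// assumed to throw rather than yield an entry that cannot honor its traits.
// var invalid = readOnly.Constrain(traits => traits with { CanWrite = true });
```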
@@ -201,7 +201,7 @@ The trait system solves all three:

- **Composable**: Eight independent boolean flags
- **Queryable**: `adapter.Traits.CanWrite` checked before operation
- **Fail-fast**: `CatalogEntry.Constrain()` validates at initialization
- **Fail-fast**: `Item.Constrain()` validates at initialization

The old interfaces remain as `[Obsolete]` for backward compatibility but have no effect on pipeline behavior.

@@ -211,9 +211,9 @@ The storage architecture is implemented in:

- `src/core/Flowthru/Data/Capabilities/StorageTraits.cs` — the trait record
- `src/core/Flowthru/Data/Storage/ComposedStorageAdapter.cs` — trait merging logic
- `src/core/Flowthru/Data/CatalogEntry.cs` — `Constrain()` with ratchet validation
- `src/core/Flowthru/Data/Item.cs` — `Constrain()` with ratchet validation
- `src/core/Flowthru/Data/Storage/Medium/` — medium implementations with traits
- `src/core/Flowthru/Data/Storage/Format/` — format serializers with traits

Catalog entry factories (`CatalogEntries.Enumerable.Csv<T>(...)`) construct composed adapters with sensible defaults. Extension authors creating custom adapters should declare traits that accurately reflect their capabilities.
Catalog entry factories (`ItemFactory.Enumerable.Csv<T>(...)`) construct composed adapters with sensible defaults. Extension authors creating custom adapters should declare traits that accurately reflect their capabilities.
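
As a rough sketch of declaring traits honestly (flag names other than `CanWrite` are assumptions about the `StorageTraits` record):

```csharp
// An HTTP GET source can be read but not written, so its adapter should say so.
var httpSourceTraits = new StorageTraits
{
  CanRead = true,   // assumed flag name; this diff confirms only CanWrite
  CanWrite = false, // queryable later via adapter.Traits.CanWrite
};
```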

32 changes: 16 additions & 16 deletions docs/explanation/anatomy-of-a-pipeline.md
@@ -75,9 +75,9 @@ In this example:
// Data/_01_Raw/Catalog.Raw.cs
public partial class Catalog
{
public ICatalogEntry<IEnumerable<IrisRawSchema>> IrisRaw =>
public IItem<IEnumerable<IrisRawSchema>> IrisRaw =>
GetOrCreateEntry(() =>
CatalogEntries.Enumerable.Csv<IrisRawSchema>(
ItemFactory.Enumerable.Csv<IrisRawSchema>(
label: "IrisRaw",
filePath: $"{_basePath}/_01_Raw/Datasets/iris.csv"
)
@@ -95,9 +95,9 @@ In this example:
// Data/_04_Feature/Catalog.Feature.cs
public partial class Catalog
{
public ICatalogEntry<IEnumerable<IrisFeatureSchema>> IrisFeatures =>
public IItem<IEnumerable<IrisFeatureSchema>> IrisFeatures =>
GetOrCreateEntry(() =>
CatalogEntries.Enumerable.Parquet<IrisFeatureSchema>(
ItemFactory.Enumerable.Parquet<IrisFeatureSchema>(
label: "IrisFeatures",
filePath: $"{_basePath}/_04_Feature/Datasets/iris_features.parquet"
)
@@ -169,13 +169,13 @@ Pipelines are made up of **nodes** — transformation functions that are built t

1. When writing a node, you don't need to worry about how it connects to other nodes — you just need to make sure it accepts the input schema and produces the output schema

### Nodes
### Steps

Nodes are simply functions. Easy! The **only** purpose of a node is to take in data that has one schema, and convert it to data that has another schema.
Steps are simply functions. Easy! The **only** purpose of a step is to take in data that has one schema, and convert it to data that has another schema.

```csharp
// Pipelines/DataEngineering/Nodes/SplitAndEncodeNode.cs
public static class SplitAndEncodeNode
// Pipelines/DataEngineering/Steps/SplitAndEncodeStep.cs
public static class SplitAndEncodeStep
{
public static Func<
IEnumerable<IrisRawSchema>, // Input Schema
@@ -194,8 +194,8 @@ public static class SplitAndEncodeNode

Key points:

- Nodes are a *contract*: that data for the node will **always** come in as the input schemas, and **always** come out as the output schemas.
- Nodes can have any number of inputs, and any number of outputs — as long as they're defined in the input and output schemas, you're not limited to just one-in, one-out.
- Steps are a *contract*: data for the step will **always** come in as the input schemas, and **always** come out as the output schemas.
- Steps can have any number of inputs, and any number of outputs — as long as they're defined in the input and output schemas, you're not limited to just one-in, one-out (see the sketch below).
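
A hedged sketch of the multi-input point, following the `SplitAndEncodeStep` shape above (the tuple-based signature and `SpeciesLookupSchema` are assumptions for illustration, not the framework's confirmed multi-input mechanism):

```csharp
public static class JoinSpeciesStep
{
  public static Func<
    (IEnumerable<IrisRawSchema> Measurements,  // Input Schema 1
     IEnumerable<SpeciesLookupSchema> Lookup), // Input Schema 2 (hypothetical)
    IEnumerable<IrisFeatureSchema>>            // Output Schema
    Create() =>
      inputs => Transform(inputs.Measurements, inputs.Lookup);

  private static IEnumerable<IrisFeatureSchema> Transform(
    IEnumerable<IrisRawSchema> measurements,
    IEnumerable<SpeciesLookupSchema> lookup) =>
      throw new NotImplementedException(); // transformation body elided
}
```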


### Pipelines
@@ -214,11 +214,11 @@ public static class DataEngineeringPipeline
{
public static Pipeline Create(Catalog catalog, Params parameters)
{
return PipelineBuilder.CreatePipeline(pipeline =>
return FlowBuilder.CreateFlow(pipeline =>
{
pipeline.AddNode(
pipeline.AddStep(
label: "SplitAndEncode", // Unique label for this node in the pipeline
transform: SplitAndEncodeNode.Create(),
transform: SplitAndEncodeStep.Create(),
input: catalog.IrisRaw,
output: catalog.IrisFeatures
);
@@ -229,7 +229,7 @@

Key points:

- **Nodes are never directly connected to each other**: Nodes always take in Catalog entries, and output Catalog entries — *never* directly to each other.
- **Steps are never directly connected to each other**: Steps always take in Catalog entries, and output Catalog entries — *never* directly to each other.
- **Order doesn't matter.** A node in the pipeline is **only** ever concerned about its input data and output data. Flowthru handles the order when you run the pipeline: as long as the data is available, or generated by another node, your pipeline will run.
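
To make the ordering point concrete, here is a sketch in the same style as the pipeline above (`TrainModelStep` and the `IrisModel` entry are hypothetical): the downstream step is registered first, yet it still runs second, because its input is produced by the step declared after it.

```csharp
return FlowBuilder.CreateFlow(pipeline =>
{
  // Declared first, runs second: consumes IrisFeatures.
  pipeline.AddStep(
    label: "TrainModel",
    transform: TrainModelStep.Create(),
    input: catalog.IrisFeatures,
    output: catalog.IrisModel
  );

  // Declared second, runs first: produces IrisFeatures from IrisRaw.
  pipeline.AddStep(
    label: "SplitAndEncode",
    transform: SplitAndEncodeStep.Create(),
    input: catalog.IrisRaw,
    output: catalog.IrisFeatures
  );
});
```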

## Entry Point
@@ -241,7 +241,7 @@ private static void ConfigureServices(IServiceCollection services, string basePa
{
services.AddFlowthru(flowthru =>
{
flowthru.UseCatalog(_ => new Catalog(basePath: "Data")));
    flowthru.RegisterCatalog(_ => new Catalog(basePath: "Data"));

flowthru.RegisterPipeline<Catalog>(
label: "DataEngineering",
@@ -256,4 +256,4 @@ private static void ConfigureServices(IServiceCollection services, string basePa
}
```

And that's it! This finishes the pipeline anatomy. At this point, you have Catalog entries, connected with Nodes in your Pipelines — everything you need to find new and creative ways to organize, analyze, and report on your data!
And that's it! This finishes the pipeline anatomy. At this point, you have Catalog entries, connected with Steps in your Pipelines — everything you need to find new and creative ways to organize, analyze, and report on your data!
22 changes: 11 additions & 11 deletions docs/guides/advanced/container-deployment.md
@@ -13,7 +13,7 @@ Deploy a Flowthru pipeline as a standalone container image for execution in envi
A containerized pipeline replaces `FlowthruCli` with a minimal `Program.cs` that owns its own DI container. The key difference: no CLI argument parsing, no filesystem-based configuration.

```csharp
using Flowthru.Pipelines;
using Flowthru.Flows;
using Flowthru.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -26,7 +26,7 @@ services.AddFlowthru(flowthru =>
services.AddFlowthru(flowthru =>
{
flowthru
.UseCatalog<MyCatalog>()
.RegisterCatalog<MyCatalog>()
.RegisterPipeline<MyCatalog>("Ingest", catalog => IngestPipeline.Create(catalog))
.RegisterPipeline<MyCatalog>("Transform", catalog => TransformPipeline.Create(catalog));

@@ -50,19 +50,19 @@ return result.Success ? 0 : 1;

// --- helpers ---

static PipelineSliceStrategy BuildSliceStrategy()
static FlowSliceStrategy BuildSliceStrategy()
{
static HashSet<string>? ParseCsv(string? value) =>
string.IsNullOrWhiteSpace(value) ? null : new(value.Split(',', StringSplitOptions.RemoveEmptyEntries));

return new PipelineSliceStrategy
return new FlowSliceStrategy
{
Pipelines = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_PIPELINES")),
FromNodes = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_FROM_NODES")),
ToNodes = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_TO_NODES")),
FromSteps = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_FROM_NODES")),
ToSteps = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_TO_NODES")),
FromData = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_FROM_DATA")),
ToData = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_TO_DATA")),
OnlyNodes = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_ONLY_NODES")),
OnlySteps = ParseCsv(Environment.GetEnvironmentVariable("FLOWTHRU_ONLY_NODES")),
};
}
```
@@ -87,7 +87,7 @@ services.AddFlowthru(flowthru =>
{
// No UseConfiguration() call — the IConfiguration above is already registered
flowthru
.UseCatalog<MyCatalog>()
.RegisterCatalog<MyCatalog>()
.RegisterPipeline<MyCatalog>("Ingest", catalog => IngestPipeline.Create(catalog));
});
```
@@ -152,7 +152,7 @@ docker run \

## Inspecting Results

`ExecutePipelineAsync` returns a `PipelineResult` — a structured value object with `Success`, `ExecutionTime`, per-node `NodeResults` (including individual timing, I/O counts, and exceptions), and an optional top-level `Exception`. Serialize it to structured output for your runtime's observability:
`ExecutePipelineAsync` returns a `FlowResult` — a structured value object with `Success`, `ExecutionTime`, per-step `StepResults` (including individual timing, I/O counts, and exceptions), and an optional top-level `Exception`. Serialize it to structured output for your runtime's observability:

```csharp
var result = await flowthru.ExecutePipelineAsync(options, cancellationToken: cts.Token);
@@ -163,9 +163,9 @@ if (!result.Success)
result.ExecutionTime.TotalSeconds,
result.Exception?.Message);

foreach (var (name, node) in result.NodeResults.Where(n => !n.Value.Success))
foreach (var (name, node) in result.StepResults.Where(n => !n.Value.Success))
{
logger.LogError(" Node {Node} failed: {Error}", name, node.Exception?.Message);
logger.LogError(" Step {Step} failed: {Error}", name, node.Exception?.Message);
}
}
```
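
To emit the whole result as one structured record instead of individual log lines, serialization is straightforward (assuming `StepResults` enumerates key/value pairs as the loop above implies, and that the summarized members serialize as plain values):

```csharp
using System.Linq;
using System.Text.Json;

var summary = new
{
  result.Success,
  DurationSeconds = result.ExecutionTime.TotalSeconds,
  Error = result.Exception?.Message,
  Steps = result.StepResults.ToDictionary(
    kvp => kvp.Key,
    kvp => new { kvp.Value.Success, Error = kvp.Value.Exception?.Message }),
};

Console.WriteLine(JsonSerializer.Serialize(summary)); // one JSON document per run
```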
18 changes: 9 additions & 9 deletions docs/guides/advanced/metadata-providers.md
@@ -40,8 +40,8 @@ public class DashboardMetadataProvider : IMetadataProvider
{
var payload = new
{
PipelineName = dag.PipelineName,
NodeCount = dag.Nodes.Count,
FlowName = dag.FlowName,
StepCount = dag.Steps.Count,
EdgeCount = dag.Edges.Count,
Timestamp = DateTime.UtcNow
};
@@ -85,14 +85,14 @@ using Flowthru.Meta.Providers;

services.AddFlowthru(flowthru =>
{
flowthru.UseCatalog(_ => new MyCatalog());
flowthru.UsePipelines(_ => myPipelines);
flowthru.RegisterCatalog(_ => new MyCatalog());
flowthru.RegisterPipelines(_ => myPipelines);

flowthru.ConfigureMetadata(meta =>
{
meta.AddProvider<JsonMetadataProvider, JsonMetadataProviderBuilder>(json => json
.WithOutputDirectory("metadata")
.WithFilenameTemplate("dag-{PipelineName}-{Timestamp}")
.WithFilenameTemplate("dag-{FlowName}-{Timestamp}")
.WithTimestamp("yyyyMMdd-HHmmss")
.UseCompactFormat());

@@ -143,9 +143,9 @@ var singlePipelineDag = service.GetDagMetadata(pipelineName: "DataEngineering");

// Get DAG with slicing applied
var slicedDag = service.GetDagMetadata(
sliceStrategy: new PipelineSliceStrategy
sliceStrategy: new FlowSliceStrategy
{
ToNodes = new HashSet<string> { "TransformNode" }
ToSteps = new HashSet<string> { "TransformStep" }
}
);
```
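
Once the metadata is in hand, it can drive smoke tests or tooling. A hedged sketch, reusing the property names from the dashboard provider above (`FlowName`, `Steps`, `Edges`):

```csharp
var dag = service.GetDagMetadata(pipelineName: "DataEngineering");

Console.WriteLine($"{dag.FlowName}: {dag.Steps.Count} steps, {dag.Edges.Count} edges");

// Fail fast in CI if the DAG shape drifts unexpectedly.
if (dag.Steps.Count == 0)
{
  throw new InvalidOperationException("DAG has no steps; check pipeline registration.");
}
```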
@@ -159,7 +159,7 @@ Useful for tooling, tests, or debugging pipeline structure before execution.
```csharp
meta.AddProvider<JsonMetadataProvider, JsonMetadataProviderBuilder>(json => json
.WithOutputDirectory("metadata")
.WithFilenameTemplate("dag-{PipelineName}-{Timestamp}")
.WithFilenameTemplate("dag-{FlowName}-{Timestamp}")
.WithTimestamp("yyyyMMdd-HHmmss")
.UseCompactFormat());
```
@@ -170,6 +170,6 @@
meta.AddProvider<MermaidMetadataProvider, MermaidMetadataProviderBuilder>(mermaid => mermaid
.WithOutputDirectory("metadata")
.WithDirection(MermaidFlowchartDirection.LeftToRight)
.WithActiveNodeColor("#90EE90")
.WithActiveStepColor("#90EE90")
.WithActiveDataColor("#ADD8E6"));
```