Skip to content

Commit 1bd3624

Browse files
committed
fix: finalize transition
1 parent 6681e0a commit 1bd3624

6 files changed

Lines changed: 180 additions & 73 deletions

File tree

examples/advanced/RetailDataMultipipeline/Program.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,33 @@ public class Program
2121
public static Task<int> Main(string[] args) =>
2222
FlowthruCli.RunStandaloneAsync(
2323
args,
24-
services => ConfigureServices(services, Directory.GetCurrentDirectory())
24+
services =>
25+
ConfigureServices(
26+
services,
27+
Directory.GetCurrentDirectory(),
28+
AppDomain.CurrentDomain.BaseDirectory
29+
)
2530
);
2631

27-
public static IServiceProvider ConfigureServices(string? basePath = null)
32+
public static IServiceProvider ConfigureServices(
33+
string? basePath = null,
34+
string? outputPath = null
35+
)
2836
{
2937
var services = new ServiceCollection();
30-
ConfigureServices(services, basePath ?? Directory.GetCurrentDirectory());
38+
ConfigureServices(
39+
services,
40+
basePath ?? Directory.GetCurrentDirectory(),
41+
outputPath ?? AppDomain.CurrentDomain.BaseDirectory
42+
);
3143
return services.BuildServiceProvider();
3244
}
3345

34-
private static void ConfigureServices(IServiceCollection services, string basePath)
46+
private static void ConfigureServices(
47+
IServiceCollection services,
48+
string basePath,
49+
string outputPath
50+
)
3551
{
3652
services.AddFlowthru(flowthru =>
3753
{
@@ -41,7 +57,8 @@ private static void ConfigureServices(IServiceCollection services, string basePa
4157
flowthru.UsePython(python =>
4258
{
4359
python.ModuleSearchPaths.Add(basePath);
44-
python.ModuleSearchPaths.Add(AppDomain.CurrentDomain.BaseDirectory);
60+
python.ModuleSearchPaths.Add(outputPath);
61+
python.VenvPath = outputPath;
4562
});
4663

4764
var dataPath = Path.Combine(basePath, "Data");

examples/advanced/SpaceflightsPythonEFCore/Program.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,33 @@ public class Program
4343
public static Task<int> Main(string[] args) =>
4444
FlowthruCli.RunStandaloneAsync(
4545
args,
46-
services => ConfigureServices(services, Directory.GetCurrentDirectory())
46+
services =>
47+
ConfigureServices(
48+
services,
49+
Directory.GetCurrentDirectory(),
50+
AppDomain.CurrentDomain.BaseDirectory
51+
)
4752
);
4853

49-
public static IServiceProvider ConfigureServices(string? basePath = null)
54+
public static IServiceProvider ConfigureServices(
55+
string? basePath = null,
56+
string? outputPath = null
57+
)
5058
{
5159
var services = new ServiceCollection();
52-
ConfigureServices(services, basePath ?? Directory.GetCurrentDirectory());
60+
ConfigureServices(
61+
services,
62+
basePath ?? Directory.GetCurrentDirectory(),
63+
outputPath ?? AppDomain.CurrentDomain.BaseDirectory
64+
);
5365
return services.BuildServiceProvider();
5466
}
5567

56-
private static void ConfigureServices(IServiceCollection services, string basePath)
68+
private static void ConfigureServices(
69+
IServiceCollection services,
70+
string basePath,
71+
string outputPath
72+
)
5773
{
5874
services.AddLogging(logging =>
5975
{
@@ -91,7 +107,8 @@ private static void ConfigureServices(IServiceCollection services, string basePa
91107
flowthru.UsePython(python =>
92108
{
93109
python.ModuleSearchPaths.Add(basePath);
94-
python.ModuleSearchPaths.Add(AppDomain.CurrentDomain.BaseDirectory);
110+
python.ModuleSearchPaths.Add(outputPath);
111+
python.VenvPath = outputPath;
95112
});
96113

97114
// Resolve the Python executor before pipeline registration (Phase 6 workaround).

examples/starter/KedroIrisPython/Program.cs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,40 @@ public class Program
3939
public static Task<int> Main(string[] args) =>
4040
FlowthruCli.RunStandaloneAsync(
4141
args,
42-
services => ConfigureServices(services, Directory.GetCurrentDirectory())
42+
services =>
43+
ConfigureServices(
44+
services,
45+
Directory.GetCurrentDirectory(),
46+
AppDomain.CurrentDomain.BaseDirectory
47+
)
4348
);
4449

4550
/// <summary>
4651
/// Configures services for the application. Used by test infrastructure.
4752
/// </summary>
4853
/// <param name="basePath">Optional base path for data files (defaults to current directory)</param>
49-
public static IServiceProvider ConfigureServices(string? basePath = null)
54+
public static IServiceProvider ConfigureServices(
55+
string? basePath = null,
56+
string? outputPath = null
57+
)
5058
{
5159
var services = new ServiceCollection();
52-
ConfigureServices(services, basePath ?? Directory.GetCurrentDirectory());
60+
ConfigureServices(
61+
services,
62+
basePath ?? Directory.GetCurrentDirectory(),
63+
outputPath ?? AppDomain.CurrentDomain.BaseDirectory
64+
);
5365
return services.BuildServiceProvider();
5466
}
5567

5668
/// <summary>
5769
/// Shared service configuration logic.
5870
/// </summary>
59-
private static void ConfigureServices(IServiceCollection services, string basePath)
71+
private static void ConfigureServices(
72+
IServiceCollection services,
73+
string basePath,
74+
string outputPath
75+
)
6076
{
6177
// Add logging first (required by PythonRuntime)
6278
services.AddLogging(logging =>
@@ -84,14 +100,12 @@ private static void ConfigureServices(IServiceCollection services, string basePa
84100
// Configure Python runtime
85101
flowthru.UsePython(python =>
86102
{
87-
// Add project root to sys.path (for importing from Pipelines/)
103+
// Project root: makes Pipelines/ importable as a Python module tree
88104
python.ModuleSearchPaths.Add(basePath);
89-
90-
// Add output directory for flowthru Python package (contains @step decorator)
91-
var outputDir = AppDomain.CurrentDomain.BaseDirectory;
92-
python.ModuleSearchPaths.Add(outputDir);
93-
94-
// Note: PythonRuntime auto-discovers .venv in AppContext.BaseDirectory via uv sync
105+
// Output directory: contains the flowthru package (@step decorator)
106+
python.ModuleSearchPaths.Add(outputPath);
107+
// Use this example's own output directory for venv isolation
108+
python.VenvPath = outputPath;
95109
});
96110

97111
// Phase 6 workaround: Resolve Python dependencies for pipeline registration

examples/starter/KedroSpaceflightsPython/Program.cs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,40 @@ public class Program
4040
public static Task<int> Main(string[] args) =>
4141
FlowthruCli.RunStandaloneAsync(
4242
args,
43-
services => ConfigureServices(services, Directory.GetCurrentDirectory())
43+
services =>
44+
ConfigureServices(
45+
services,
46+
Directory.GetCurrentDirectory(),
47+
AppDomain.CurrentDomain.BaseDirectory
48+
)
4449
);
4550

4651
/// <summary>
4752
/// Configures services for the application. Used by test infrastructure.
4853
/// </summary>
4954
/// <param name="basePath">Optional base path for data files (defaults to current directory)</param>
50-
public static IServiceProvider ConfigureServices(string? basePath = null)
55+
public static IServiceProvider ConfigureServices(
56+
string? basePath = null,
57+
string? outputPath = null
58+
)
5159
{
5260
var services = new ServiceCollection();
53-
ConfigureServices(services, basePath ?? Directory.GetCurrentDirectory());
61+
ConfigureServices(
62+
services,
63+
basePath ?? Directory.GetCurrentDirectory(),
64+
outputPath ?? AppDomain.CurrentDomain.BaseDirectory
65+
);
5466
return services.BuildServiceProvider();
5567
}
5668

5769
/// <summary>
5870
/// Shared service configuration logic.
5971
/// </summary>
60-
private static void ConfigureServices(IServiceCollection services, string basePath)
72+
private static void ConfigureServices(
73+
IServiceCollection services,
74+
string basePath,
75+
string outputPath
76+
)
6177
{
6278
// Add logging first (required by PythonRuntime)
6379
services.AddLogging(logging =>
@@ -86,15 +102,12 @@ private static void ConfigureServices(IServiceCollection services, string basePa
86102
// Configure Python runtime
87103
flowthru.UsePython(python =>
88104
{
89-
// Add project root to sys.path (for importing from Pipelines/)
105+
// Project root: makes Pipelines/ importable as a Python module tree
90106
python.ModuleSearchPaths.Add(basePath);
91-
92-
// Add output directory for flowthru Python package (contains @step decorator)
93-
var outputDir = AppDomain.CurrentDomain.BaseDirectory;
94-
python.ModuleSearchPaths.Add(outputDir);
95-
96-
// Note: PythonRuntime auto-discovers .venv in AppContext.BaseDirectory via uv sync
97-
// No need to manually add site-packages path here
107+
// Output directory: contains the flowthru package (@step decorator)
108+
python.ModuleSearchPaths.Add(outputPath);
109+
// Use this example's own output directory for venv isolation
110+
python.VenvPath = outputPath;
98111
});
99112

100113
// Phase 6 workaround: Resolve Python dependencies for pipeline registration

src/extensions/Flowthru.Extensions.Python/Runtime/PythonRuntimeOptions.cs

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Collections.Concurrent;
2+
13
namespace Flowthru.Extensions.Python.Runtime;
24

35
/// <summary>
@@ -267,12 +269,20 @@ public string GetResolvedPythonDll()
267269
/// </remarks>
268270
public string GetResolvedPythonExe()
269271
{
270-
// 1. Explicit VenvPath
272+
// 1. Explicit VenvPath — try as existing .venv, then as project dir for uv sync
271273
if (!string.IsNullOrWhiteSpace(VenvPath))
272274
{
273275
var exe = FindPythonExeInVenv(VenvPath);
274276
if (exe != null)
275277
return exe;
278+
279+
var uvVenvPath = EnsureVenvViaUv(VenvPath);
280+
if (uvVenvPath != null)
281+
{
282+
exe = FindPythonExeInVenv(uvVenvPath);
283+
if (exe != null)
284+
return exe;
285+
}
276286
}
277287

278288
// 2. Auto-init .venv via uv sync in output directory (same trigger as GetResolvedPythonDll)
@@ -424,6 +434,10 @@ public List<string> GetResolvedModuleSearchPaths()
424434
/// Returns the <c>.venv/</c> path if successful, or <c>null</c> if files are missing or sync fails.
425435
/// </para>
426436
/// </remarks>
437+
// One semaphore per directory prevents concurrent uv sync calls from racing on the same .venv.
438+
private static readonly ConcurrentDictionary<string, SemaphoreSlim> _uvSyncLocks =
439+
new(StringComparer.OrdinalIgnoreCase);
440+
427441
private string? EnsureVenvViaUv(string directory)
428442
{
429443
var pyprojectPath = Path.Combine(directory, "pyproject.toml");
@@ -437,59 +451,81 @@ public List<string> GetResolvedModuleSearchPaths()
437451
return null; // Not a uv-managed project
438452
}
439453

440-
// If .venv exists but lacks pyvenv.cfg, it's corrupt — delete and recreate
441-
if (Directory.Exists(venvPath))
454+
// Fast path: venv already exists and is valid — no lock needed.
455+
if (File.Exists(pyvenvCfg))
442456
{
443-
try
444-
{
445-
Directory.Delete(venvPath, recursive: true);
446-
}
447-
catch
448-
{
449-
// Deletion failed (permissions, locked files, etc.) — let uv sync handle it
450-
}
457+
return venvPath;
451458
}
452459

453-
// Run uv sync to materialize .venv
460+
// Slow path: venv is missing or corrupt. One caller creates it; others wait and reuse.
461+
var semaphore = _uvSyncLocks.GetOrAdd(directory, _ => new SemaphoreSlim(1, 1));
462+
semaphore.Wait();
454463
try
455464
{
456-
var startInfo = new System.Diagnostics.ProcessStartInfo
465+
// Re-check under the lock — another thread may have created it while we were waiting.
466+
if (File.Exists(pyvenvCfg))
457467
{
458-
FileName = UvPath,
459-
Arguments = "sync --frozen --python-preference only-managed",
460-
WorkingDirectory = directory,
461-
UseShellExecute = false,
462-
RedirectStandardOutput = true,
463-
RedirectStandardError = true,
464-
CreateNoWindow = true,
465-
};
466-
467-
using var process = System.Diagnostics.Process.Start(startInfo);
468-
if (process == null)
469-
{
470-
return null; // Failed to start process
468+
return venvPath;
471469
}
472470

473-
process.WaitForExit();
474-
475-
// Check if .venv was created successfully
476-
if (process.ExitCode == 0 && File.Exists(pyvenvCfg))
471+
// If .venv exists but lacks pyvenv.cfg, it's corrupt — delete and recreate
472+
if (Directory.Exists(venvPath))
477473
{
478-
return venvPath;
474+
try
475+
{
476+
Directory.Delete(venvPath, recursive: true);
477+
}
478+
catch
479+
{
480+
// Deletion failed (permissions, locked files, etc.) — let uv sync handle it
481+
}
479482
}
480483

481-
// Sync failed — log stderr if available for diagnostics
482-
var stderr = process.StandardError.ReadToEnd();
483-
if (!string.IsNullOrWhiteSpace(stderr))
484+
// Run uv sync to materialize .venv
485+
try
486+
{
487+
var startInfo = new System.Diagnostics.ProcessStartInfo
488+
{
489+
FileName = UvPath,
490+
Arguments = "sync --frozen --python-preference only-managed",
491+
WorkingDirectory = directory,
492+
UseShellExecute = false,
493+
RedirectStandardOutput = true,
494+
RedirectStandardError = true,
495+
CreateNoWindow = true,
496+
};
497+
498+
using var process = System.Diagnostics.Process.Start(startInfo);
499+
if (process == null)
500+
{
501+
return null; // Failed to start process
502+
}
503+
504+
process.WaitForExit();
505+
506+
// Check if .venv was created successfully
507+
if (process.ExitCode == 0 && File.Exists(pyvenvCfg))
508+
{
509+
return venvPath;
510+
}
511+
512+
// Sync failed — log stderr if available for diagnostics
513+
var stderr = process.StandardError.ReadToEnd();
514+
if (!string.IsNullOrWhiteSpace(stderr))
515+
{
516+
Console.Error.WriteLine($"uv sync failed: {stderr}");
517+
}
518+
return null;
519+
}
520+
catch
484521
{
485-
Console.Error.WriteLine($"uv sync failed: {stderr}");
522+
// uv not found, permission denied, etc. — fall through to other resolution methods
523+
return null;
486524
}
487-
return null;
488525
}
489-
catch
526+
finally
490527
{
491-
// uv not found, permission denied, etc. — fall through to other resolution methods
492-
return null;
528+
semaphore.Release();
493529
}
494530
}
495531
}

0 commit comments

Comments
 (0)