-
Notifications
You must be signed in to change notification settings - Fork 22
Add initial Nexus support #816
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,12 +23,17 @@ import ( | |
| "github.com/temporalio/features/harness/go/history" | ||
| "github.com/temporalio/features/sdkbuild" | ||
| "github.com/urfave/cli/v2" | ||
| nexuspb "go.temporal.io/api/nexus/v1" | ||
| "go.temporal.io/api/operatorservice/v1" | ||
| "go.temporal.io/sdk/client" | ||
| "go.temporal.io/sdk/log" | ||
| "go.temporal.io/sdk/testsuite" | ||
| "gopkg.in/yaml.v3" | ||
| ) | ||
|
|
||
| // nexusFeatureDirPrefix marks features that require a per-test Nexus endpoint. | ||
| const nexusFeatureDirPrefix = "nexus/" | ||
|
|
||
| const ( | ||
| summaryListenAddr = "127.0.0.1:0" | ||
| FeaturePassed = "PASSED" | ||
|
|
@@ -267,6 +272,15 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { | |
| defer proxyServer.Close() | ||
| } | ||
|
|
||
| // Create a Nexus endpoint per feature under features/nexus/ targeting that feature's task | ||
| // queue. Endpoint names are passed to the lang harness through RunFeature.NexusEndpoint and | ||
| // the endpoints are deleted once the lang harness completes. | ||
| deleteEndpoints, err := r.createNexusEndpoints(ctx, run) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer deleteEndpoints() | ||
|
|
||
| // Ensure any created temp dir is cleaned on ctrl-c or normal exit | ||
| if r.config.DirName == "" && !r.config.RetainTempDir { | ||
| c := make(chan os.Signal, 1) | ||
|
|
@@ -300,16 +314,16 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { | |
| err = r.RunGoExternal(ctx, run) | ||
| } | ||
| } else { | ||
| err = cmd.NewRunner(cmd.RunConfig{ | ||
| Server: r.config.Server, | ||
| Namespace: r.config.Namespace, | ||
| ClientCertPath: r.config.ClientCertPath, | ||
| ClientKeyPath: r.config.ClientKeyPath, | ||
| CACertPath: r.config.CACertPath, | ||
| TLSServerName: r.config.TLSServerName, | ||
| SummaryURI: r.config.SummaryURI, | ||
| HTTPProxyURL: r.config.HTTPProxyURL, | ||
| }).Run(ctx, run) | ||
| err = cmd.NewRunner(cmd.RunConfig{ | ||
| Server: r.config.Server, | ||
| Namespace: r.config.Namespace, | ||
| ClientCertPath: r.config.ClientCertPath, | ||
| ClientKeyPath: r.config.ClientKeyPath, | ||
| CACertPath: r.config.CACertPath, | ||
| TLSServerName: r.config.TLSServerName, | ||
| SummaryURI: r.config.SummaryURI, | ||
| HTTPProxyURL: r.config.HTTPProxyURL, | ||
| }).Run(ctx, run) | ||
| } | ||
| case "java": | ||
| if r.config.DirName != "" { | ||
|
|
@@ -588,6 +602,83 @@ func rootDir() string { | |
| return filepath.Dir(filepath.Dir(currFile)) | ||
| } | ||
|
|
||
| // createNexusEndpoints creates a Nexus endpoint per RunFeature whose Dir is under | ||
| // features/nexus, populating RunFeature.NexusEndpoint. It returns a cleanup function that | ||
| // deletes the created endpoints. The cleanup function is always safe to call. | ||
| func (r *Runner) createNexusEndpoints(ctx context.Context, run *cmd.Run) (func(), error) { | ||
| noop := func() {} | ||
| var nexusFeatures []*cmd.RunFeature | ||
| for i := range run.Features { | ||
| if strings.HasPrefix(run.Features[i].Dir, nexusFeatureDirPrefix) { | ||
| nexusFeatures = append(nexusFeatures, &run.Features[i]) | ||
| } | ||
| } | ||
| if len(nexusFeatures) == 0 { | ||
| return noop, nil | ||
| } | ||
|
|
||
| opts := client.Options{HostPort: r.config.Server, Namespace: r.config.Namespace, Logger: r.log} | ||
| tlsCfg, err := harness.LoadTLSConfig(r.config.ClientCertPath, r.config.ClientKeyPath, r.config.CACertPath, r.config.TLSServerName) | ||
| if err != nil { | ||
| return noop, fmt.Errorf("failed to load TLS config: %w", err) | ||
| } | ||
| opts.ConnectionOptions.TLS = tlsCfg | ||
| cl, err := client.Dial(opts) | ||
| if err != nil { | ||
| return noop, fmt.Errorf("failed creating client for nexus endpoint setup: %w", err) | ||
| } | ||
|
|
||
| type createdEndpoint struct { | ||
| ID string | ||
| Version int64 | ||
| Name string | ||
| } | ||
| var created []createdEndpoint | ||
| cleanup := func() { | ||
| for _, ep := range created { | ||
| delCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
| _, err := cl.OperatorService().DeleteNexusEndpoint(delCtx, &operatorservice.DeleteNexusEndpointRequest{ | ||
| Id: ep.ID, | ||
| Version: ep.Version, | ||
| }) | ||
| cancel() | ||
| if err != nil { | ||
| r.log.Warn("Failed deleting nexus endpoint", "Endpoint", ep.Name, "Error", err) | ||
| } | ||
| } | ||
| cl.Close() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Want to also log if close fails?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Close can't fail, it doesn't return an error. If the Go SDK does detect an issue it will log though |
||
| } | ||
|
|
||
| // Nexus endpoint names have stricter, DNS-style validation than task queues, so / and _ | ||
| // in the feature dir must be normalized to -. | ||
| sanitize := strings.NewReplacer("/", "-", "_", "-") | ||
| for _, feature := range nexusFeatures { | ||
| name := "features-nexus-" + sanitize.Replace(feature.Dir) + "-" + uuid.NewString() | ||
| createCtx, cancel := context.WithTimeout(ctx, 10*time.Second) | ||
| res, err := cl.OperatorService().CreateNexusEndpoint(createCtx, &operatorservice.CreateNexusEndpointRequest{ | ||
| Spec: &nexuspb.EndpointSpec{ | ||
| Name: name, | ||
| Target: &nexuspb.EndpointTarget{ | ||
| Variant: &nexuspb.EndpointTarget_Worker_{ | ||
| Worker: &nexuspb.EndpointTarget_Worker{ | ||
| Namespace: r.config.Namespace, | ||
| TaskQueue: feature.TaskQueue, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| cancel() | ||
| if err != nil { | ||
| cleanup() | ||
| return noop, fmt.Errorf("failed creating nexus endpoint for %v: %w", feature.Dir, err) | ||
| } | ||
| feature.NexusEndpoint = name | ||
| created = append(created, createdEndpoint{ID: res.Endpoint.Id, Version: res.Endpoint.Version, Name: name}) | ||
| } | ||
| return cleanup, nil | ||
| } | ||
|
|
||
| func (r *Runner) destroyTempDir() { | ||
| if r.program != nil { | ||
| _ = os.RemoveAll(r.program.Dir()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| # Nexus sync operation succeeds | ||
|
|
||
| A workflow invokes a synchronous Nexus operation and receives its result. | ||
|
|
||
| # Detailed spec | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth checking that the operation transitions to completed state, skipping started in this case. |
||
|
|
||
| - A Nexus service with a sync operation is registered on the worker. | ||
| - The workflow executes the operation against a Nexus endpoint and awaits the result. | ||
| - The operation completes synchronously and its output is returned as the workflow result. | ||
| - A sync operation never enters the started state — it transitions directly from scheduled | ||
| to completed. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| package sync_success | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/nexus-rpc/sdk-go/nexus" | ||
| "github.com/temporalio/features/harness/go/harness" | ||
| enumspb "go.temporal.io/api/enums/v1" | ||
| historypb "go.temporal.io/api/history/v1" | ||
| "go.temporal.io/sdk/client" | ||
| "go.temporal.io/sdk/workflow" | ||
| ) | ||
|
|
||
| const ServiceName = "test-service" | ||
|
|
||
| var SayHelloOperation = nexus.NewSyncOperation( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor suggestion: Since we don't care about the actual operation's logic, do you think it would be net clearer to name this |
||
| "say-hello", | ||
| func(ctx context.Context, name string, options nexus.StartOperationOptions) (string, error) { | ||
| return "Hello, " + name + "!", nil | ||
| }, | ||
| ) | ||
|
|
||
| var Service = func() *nexus.Service { | ||
| s := nexus.NewService(ServiceName) | ||
| s.MustRegister(SayHelloOperation) | ||
| return s | ||
| }() | ||
|
|
||
| func Workflow(ctx workflow.Context, endpoint string) (string, error) { | ||
| nc := workflow.NewNexusClient(endpoint, ServiceName) | ||
| fut := nc.ExecuteOperation(ctx, SayHelloOperation, "world", workflow.NexusOperationOptions{ | ||
| ScheduleToCloseTimeout: time.Minute, | ||
| }) | ||
| var result string | ||
| if err := fut.Get(ctx, &result); err != nil { | ||
| return "", err | ||
| } | ||
| return result, nil | ||
| } | ||
|
|
||
| var Feature = harness.Feature{ | ||
| Workflows: Workflow, | ||
| NexusServices: Service, | ||
| ExpectRunResult: "Hello, world!", | ||
| Execute: func(ctx context.Context, runner *harness.Runner) (client.WorkflowRun, error) { | ||
| opts := client.StartWorkflowOptions{ | ||
| TaskQueue: runner.TaskQueue, | ||
| WorkflowExecutionTimeout: time.Minute, | ||
| } | ||
| return runner.Client.ExecuteWorkflow(ctx, opts, Workflow, runner.NexusEndpoint) | ||
| }, | ||
| CheckHistory: func(ctx context.Context, runner *harness.Runner, run client.WorkflowRun) error { | ||
| // Sync Nexus operations should transition directly from Scheduled to Completed with | ||
| // no Started event in between. | ||
| hasEvent := func(t enumspb.EventType) (bool, error) { | ||
| hist := runner.Client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) | ||
| ev, err := harness.FindEvent(hist, func(ev *historypb.HistoryEvent) bool { return ev.EventType == t }) | ||
| return ev != nil, err | ||
| } | ||
| if ok, err := hasEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); err != nil { | ||
| return err | ||
| } else if !ok { | ||
| return fmt.Errorf("did not find NexusOperationCompleted event in history") | ||
| } | ||
| if ok, err := hasEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_STARTED); err != nil { | ||
| return err | ||
| } else if ok { | ||
| return fmt.Errorf("unexpected NexusOperationStarted event for sync operation") | ||
| } | ||
| return nil | ||
| }, | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| package nexus.sync_success; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
| import io.nexusrpc.Operation; | ||
| import io.nexusrpc.Service; | ||
| import io.nexusrpc.handler.OperationHandler; | ||
| import io.nexusrpc.handler.OperationImpl; | ||
| import io.nexusrpc.handler.ServiceImpl; | ||
| import io.temporal.client.WorkflowOptions; | ||
| import io.temporal.sdkfeatures.Feature; | ||
| import io.temporal.sdkfeatures.Run; | ||
| import io.temporal.sdkfeatures.Runner; | ||
| import io.temporal.workflow.NexusOperationOptions; | ||
| import io.temporal.workflow.NexusServiceOptions; | ||
| import io.temporal.workflow.Workflow; | ||
| import io.temporal.workflow.WorkflowInterface; | ||
| import io.temporal.workflow.WorkflowMethod; | ||
| import java.time.Duration; | ||
|
|
||
| @WorkflowInterface | ||
| public interface feature extends Feature { | ||
| @WorkflowMethod | ||
| String workflow(String endpoint); | ||
|
|
||
| @Service | ||
| interface TestService { | ||
| @Operation | ||
| String sayHello(String name); | ||
| } | ||
|
|
||
| class Impl implements feature { | ||
| @Override | ||
| public String workflow(String endpoint) { | ||
| var serviceOptions = | ||
| NexusServiceOptions.newBuilder() | ||
| .setEndpoint(endpoint) | ||
| .setOperationOptions( | ||
| NexusOperationOptions.newBuilder() | ||
| .setScheduleToCloseTimeout(Duration.ofMinutes(1)) | ||
| .build()) | ||
| .build(); | ||
| TestService stub = Workflow.newNexusServiceStub(TestService.class, serviceOptions); | ||
| return stub.sayHello("world"); | ||
| } | ||
|
|
||
| @Override | ||
| public Object[] nexusServiceImplementations() { | ||
| return new Object[] {new TestServiceImpl()}; | ||
| } | ||
|
|
||
| @Override | ||
| public Run execute(Runner runner) throws Exception { | ||
| var options = | ||
| WorkflowOptions.newBuilder() | ||
| .setTaskQueue(runner.config.taskQueue) | ||
| .setWorkflowExecutionTimeout(Duration.ofMinutes(1)) | ||
| .build(); | ||
| return runner.executeSingleWorkflow(options, runner.nexusEndpoint); | ||
| } | ||
|
|
||
| @Override | ||
| public void checkResult(Runner runner, Run run) { | ||
| var result = runner.waitForRunResult(run, String.class); | ||
| assertEquals("Hello, world!", result); | ||
| } | ||
|
|
||
| @Override | ||
| public void checkHistory(Runner runner, Run run) throws Exception { | ||
| // Assert that the sync operation transitioned straight from Scheduled to | ||
| // Completed with no Started event. | ||
| var events = runner.getWorkflowHistory(run).getEventsList(); | ||
| assertTrue( | ||
| events.stream().anyMatch(e -> e.hasNexusOperationCompletedEventAttributes()), | ||
| "expected NexusOperationCompleted event in history"); | ||
| assertFalse( | ||
| events.stream().anyMatch(e -> e.hasNexusOperationStartedEventAttributes()), | ||
| "unexpected NexusOperationStarted event for sync operation"); | ||
| } | ||
| } | ||
|
|
||
| @ServiceImpl(service = TestService.class) | ||
| class TestServiceImpl { | ||
| @OperationImpl | ||
| public OperationHandler<String, String> sayHello() { | ||
| return OperationHandler.sync((ctx, details, name) -> "Hello, " + name + "!"); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,18 +50,25 @@ func (r *Run) ToArgs() []string { | |
| ret := make([]string, len(r.Features)) | ||
| for i, feature := range r.Features { | ||
| ret[i] = feature.Dir + ":" + feature.TaskQueue | ||
| if feature.NexusEndpoint != "" { | ||
| ret[i] += ":" + feature.NexusEndpoint | ||
| } | ||
| } | ||
| return ret | ||
| } | ||
|
|
||
| // FromArgs converts the given arguments to features to run. | ||
| func (r *Run) FromArgs(args []string) error { | ||
| for _, arg := range args { | ||
| colonIndex := strings.Index(arg, ":") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: For readability, what do you think about moving this parsing logic into a separate function? Something like: feature, err := parseRunFeature(arg)
if err != nil {
return fmt.Errorf("parsing argument %q: %w", arg, err)
}
r.Features = append(r.Features, feature) |
||
| if colonIndex == -1 { | ||
| pieces := strings.SplitN(arg, ":", 3) | ||
| if len(pieces) < 2 { | ||
| return fmt.Errorf("feature %v missing task queue", arg) | ||
| } | ||
| r.Features = append(r.Features, RunFeature{Dir: arg[:colonIndex], TaskQueue: arg[colonIndex+1:]}) | ||
| runFeature := RunFeature{Dir: pieces[0], TaskQueue: pieces[1]} | ||
| if len(pieces) == 3 { | ||
| runFeature.NexusEndpoint = pieces[2] | ||
| } | ||
| r.Features = append(r.Features, runFeature) | ||
| } | ||
| return nil | ||
| } | ||
|
|
@@ -70,7 +77,10 @@ func (r *Run) FromArgs(args []string) error { | |
| type RunFeature struct { | ||
| Dir string | ||
| TaskQueue string | ||
| Config RunFeatureConfig | ||
| // NexusEndpoint is the pre-created Nexus endpoint name targeting this feature's namespace | ||
| // and task queue. Set by the top-level runner for features under features/nexus. | ||
| NexusEndpoint string | ||
| Config RunFeatureConfig | ||
| } | ||
|
|
||
| // RunFeatureConfig is config from .config.json. | ||
|
|
@@ -237,6 +247,7 @@ func (r *Runner) Run(ctx context.Context, run *Run) error { | |
| ClientKeyPath: r.config.ClientKeyPath, | ||
| CACertPath: r.config.CACertPath, | ||
| TaskQueue: runFeature.TaskQueue, | ||
| NexusEndpoint: runFeature.NexusEndpoint, | ||
| Log: r.log, | ||
| HTTPProxyURL: r.config.HTTPProxyURL, | ||
| TLSServerName: r.config.TLSServerName, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this function get a context passed in so it respects any outer cancelations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The caller has not context to pass in so don't think it is worth it at this point. The caller would just pass in
context.Background()