Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ feature runs and a namespace is dynamically generated. To not start the embedded
address of a server to use. Similarly, to not use a dynamic namespace (that may not be registered on the external
server), use `--namespace`.

Note: features under `features/nexus/` are not supported against Temporal Cloud — they
require `OperatorService.CreateNexusEndpoint`, which is only exposed on self-hosted
servers (e.g. the dev server).

### History Checking

History files are present at `features/<path/to/feature>/history/history.<lang>.<version>.json`. By default, three
Expand Down
111 changes: 101 additions & 10 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ import (
"github.com/temporalio/features/harness/go/history"
"github.com/temporalio/features/sdkbuild"
"github.com/urfave/cli/v2"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/testsuite"
"gopkg.in/yaml.v3"
)

// nexusFeatureDirPrefix marks features that require a per-test Nexus endpoint.
const nexusFeatureDirPrefix = "nexus/"

const (
summaryListenAddr = "127.0.0.1:0"
FeaturePassed = "PASSED"
Expand Down Expand Up @@ -267,6 +272,15 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error {
defer proxyServer.Close()
}

// Create a Nexus endpoint per feature under features/nexus/ targeting that feature's task
// queue. Endpoint names are passed to the lang harness through RunFeature.NexusEndpoint and
// the endpoints are deleted once the lang harness completes.
deleteEndpoints, err := r.createNexusEndpoints(ctx, run)
if err != nil {
return err
}
defer deleteEndpoints()

// Ensure any created temp dir is cleaned on ctrl-c or normal exit
if r.config.DirName == "" && !r.config.RetainTempDir {
c := make(chan os.Signal, 1)
Expand Down Expand Up @@ -300,16 +314,16 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error {
err = r.RunGoExternal(ctx, run)
}
} else {
err = cmd.NewRunner(cmd.RunConfig{
Server: r.config.Server,
Namespace: r.config.Namespace,
ClientCertPath: r.config.ClientCertPath,
ClientKeyPath: r.config.ClientKeyPath,
CACertPath: r.config.CACertPath,
TLSServerName: r.config.TLSServerName,
SummaryURI: r.config.SummaryURI,
HTTPProxyURL: r.config.HTTPProxyURL,
}).Run(ctx, run)
err = cmd.NewRunner(cmd.RunConfig{
Server: r.config.Server,
Namespace: r.config.Namespace,
ClientCertPath: r.config.ClientCertPath,
ClientKeyPath: r.config.ClientKeyPath,
CACertPath: r.config.CACertPath,
TLSServerName: r.config.TLSServerName,
SummaryURI: r.config.SummaryURI,
HTTPProxyURL: r.config.HTTPProxyURL,
}).Run(ctx, run)
}
case "java":
if r.config.DirName != "" {
Expand Down Expand Up @@ -588,6 +602,83 @@ func rootDir() string {
return filepath.Dir(filepath.Dir(currFile))
}

// createNexusEndpoints creates a Nexus endpoint per RunFeature whose Dir is under
// features/nexus, populating RunFeature.NexusEndpoint. It returns a cleanup function that
// deletes the created endpoints. The cleanup function is always safe to call.
func (r *Runner) createNexusEndpoints(ctx context.Context, run *cmd.Run) (func(), error) {
noop := func() {}
var nexusFeatures []*cmd.RunFeature
for i := range run.Features {
if strings.HasPrefix(run.Features[i].Dir, nexusFeatureDirPrefix) {
nexusFeatures = append(nexusFeatures, &run.Features[i])
}
}
if len(nexusFeatures) == 0 {
return noop, nil
}

opts := client.Options{HostPort: r.config.Server, Namespace: r.config.Namespace, Logger: r.log}
tlsCfg, err := harness.LoadTLSConfig(r.config.ClientCertPath, r.config.ClientKeyPath, r.config.CACertPath, r.config.TLSServerName)
if err != nil {
return noop, fmt.Errorf("failed to load TLS config: %w", err)
}
opts.ConnectionOptions.TLS = tlsCfg
cl, err := client.Dial(opts)
if err != nil {
return noop, fmt.Errorf("failed creating client for nexus endpoint setup: %w", err)
}

type createdEndpoint struct {
ID string
Version int64
Name string
}
var created []createdEndpoint
cleanup := func() {
for _, ep := range created {
delCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function get a context passed in so it respects any outer cancelations?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller has not context to pass in so don't think it is worth it at this point. The caller would just pass in context.Background()

_, err := cl.OperatorService().DeleteNexusEndpoint(delCtx, &operatorservice.DeleteNexusEndpointRequest{
Id: ep.ID,
Version: ep.Version,
})
cancel()
if err != nil {
r.log.Warn("Failed deleting nexus endpoint", "Endpoint", ep.Name, "Error", err)
}
}
cl.Close()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to also log if close fails?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close can't fail, it doesn't return an error. If the Go SDK does detect an issue it will log though

}

// Nexus endpoint names have stricter, DNS-style validation than task queues, so / and _
// in the feature dir must be normalized to -.
sanitize := strings.NewReplacer("/", "-", "_", "-")
for _, feature := range nexusFeatures {
name := "features-nexus-" + sanitize.Replace(feature.Dir) + "-" + uuid.NewString()
createCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
res, err := cl.OperatorService().CreateNexusEndpoint(createCtx, &operatorservice.CreateNexusEndpointRequest{
Spec: &nexuspb.EndpointSpec{
Name: name,
Target: &nexuspb.EndpointTarget{
Variant: &nexuspb.EndpointTarget_Worker_{
Worker: &nexuspb.EndpointTarget_Worker{
Namespace: r.config.Namespace,
TaskQueue: feature.TaskQueue,
},
},
},
},
})
cancel()
if err != nil {
cleanup()
return noop, fmt.Errorf("failed creating nexus endpoint for %v: %w", feature.Dir, err)
}
feature.NexusEndpoint = name
created = append(created, createdEndpoint{ID: res.Endpoint.Id, Version: res.Endpoint.Version, Name: name})
}
return cleanup, nil
}

func (r *Runner) destroyTempDir() {
if r.program != nil {
_ = os.RemoveAll(r.program.Dir())
Expand Down
2 changes: 2 additions & 0 deletions features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
deployment_versioning_routing_with_ramp "github.com/temporalio/features/features/deployment_versioning/routing_with_ramp"
eager_activity_non_remote_activities_worker "github.com/temporalio/features/features/eager_activity/non_remote_activities_worker"
eager_workflow_successful_start "github.com/temporalio/features/features/eager_workflow/successful_start"
nexus_sync_success "github.com/temporalio/features/features/nexus/sync_success"
query_successful_query "github.com/temporalio/features/features/query/successful_query"
query_timeout_due_to_no_active_workers "github.com/temporalio/features/features/query/timeout_due_to_no_active_workers"
query_unexpected_arguments "github.com/temporalio/features/features/query/unexpected_arguments"
Expand Down Expand Up @@ -93,6 +94,7 @@ func init() {
deployment_versioning_routing_with_ramp.Feature,
eager_activity_non_remote_activities_worker.Feature,
eager_workflow_successful_start.Feature,
nexus_sync_success.Feature,
query_successful_query.Feature,
query_timeout_due_to_no_active_workers.Feature,
query_unexpected_arguments.Feature,
Expand Down
11 changes: 11 additions & 0 deletions features/nexus/sync_success/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Nexus sync operation succeeds

A workflow invokes a synchronous Nexus operation and receives its result.

# Detailed spec
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth checking that the operation transitions to completed state, skipping started in this case.


- A Nexus service with a sync operation is registered on the worker.
- The workflow executes the operation against a Nexus endpoint and awaits the result.
- The operation completes synchronously and its output is returned as the workflow result.
- A sync operation never enters the started state — it transitions directly from scheduled
to completed.
74 changes: 74 additions & 0 deletions features/nexus/sync_success/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package sync_success

import (
"context"
"fmt"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
"github.com/temporalio/features/harness/go/harness"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)

const ServiceName = "test-service"

var SayHelloOperation = nexus.NewSyncOperation(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion: Since we don't care about the actual operation's logic, do you think it would be net clearer to name this NexusSyncOperation instead? i.e. describe the kind of thing it is, since "SayHello" isn't that important in this context?

"say-hello",
func(ctx context.Context, name string, options nexus.StartOperationOptions) (string, error) {
return "Hello, " + name + "!", nil
},
)

var Service = func() *nexus.Service {
s := nexus.NewService(ServiceName)
s.MustRegister(SayHelloOperation)
return s
}()

func Workflow(ctx workflow.Context, endpoint string) (string, error) {
nc := workflow.NewNexusClient(endpoint, ServiceName)
fut := nc.ExecuteOperation(ctx, SayHelloOperation, "world", workflow.NexusOperationOptions{
ScheduleToCloseTimeout: time.Minute,
})
var result string
if err := fut.Get(ctx, &result); err != nil {
return "", err
}
return result, nil
}

var Feature = harness.Feature{
Workflows: Workflow,
NexusServices: Service,
ExpectRunResult: "Hello, world!",
Execute: func(ctx context.Context, runner *harness.Runner) (client.WorkflowRun, error) {
opts := client.StartWorkflowOptions{
TaskQueue: runner.TaskQueue,
WorkflowExecutionTimeout: time.Minute,
}
return runner.Client.ExecuteWorkflow(ctx, opts, Workflow, runner.NexusEndpoint)
},
CheckHistory: func(ctx context.Context, runner *harness.Runner, run client.WorkflowRun) error {
// Sync Nexus operations should transition directly from Scheduled to Completed with
// no Started event in between.
hasEvent := func(t enumspb.EventType) (bool, error) {
hist := runner.Client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
ev, err := harness.FindEvent(hist, func(ev *historypb.HistoryEvent) bool { return ev.EventType == t })
return ev != nil, err
}
if ok, err := hasEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); err != nil {
return err
} else if !ok {
return fmt.Errorf("did not find NexusOperationCompleted event in history")
}
if ok, err := hasEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_STARTED); err != nil {
return err
} else if ok {
return fmt.Errorf("unexpected NexusOperationStarted event for sync operation")
}
return nil
},
}
91 changes: 91 additions & 0 deletions features/nexus/sync_success/feature.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package nexus.sync_success;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.nexusrpc.Operation;
import io.nexusrpc.Service;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.client.WorkflowOptions;
import io.temporal.sdkfeatures.Feature;
import io.temporal.sdkfeatures.Run;
import io.temporal.sdkfeatures.Runner;
import io.temporal.workflow.NexusOperationOptions;
import io.temporal.workflow.NexusServiceOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

@WorkflowInterface
public interface feature extends Feature {
@WorkflowMethod
String workflow(String endpoint);

@Service
interface TestService {
@Operation
String sayHello(String name);
}

class Impl implements feature {
@Override
public String workflow(String endpoint) {
var serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(endpoint)
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(1))
.build())
.build();
TestService stub = Workflow.newNexusServiceStub(TestService.class, serviceOptions);
return stub.sayHello("world");
}

@Override
public Object[] nexusServiceImplementations() {
return new Object[] {new TestServiceImpl()};
}

@Override
public Run execute(Runner runner) throws Exception {
var options =
WorkflowOptions.newBuilder()
.setTaskQueue(runner.config.taskQueue)
.setWorkflowExecutionTimeout(Duration.ofMinutes(1))
.build();
return runner.executeSingleWorkflow(options, runner.nexusEndpoint);
}

@Override
public void checkResult(Runner runner, Run run) {
var result = runner.waitForRunResult(run, String.class);
assertEquals("Hello, world!", result);
}

@Override
public void checkHistory(Runner runner, Run run) throws Exception {
// Assert that the sync operation transitioned straight from Scheduled to
// Completed with no Started event.
var events = runner.getWorkflowHistory(run).getEventsList();
assertTrue(
events.stream().anyMatch(e -> e.hasNexusOperationCompletedEventAttributes()),
"expected NexusOperationCompleted event in history");
assertFalse(
events.stream().anyMatch(e -> e.hasNexusOperationStartedEventAttributes()),
"unexpected NexusOperationStarted event for sync operation");
}
}

@ServiceImpl(service = TestService.class)
class TestServiceImpl {
@OperationImpl
public OperationHandler<String, String> sayHello() {
return OperationHandler.sync((ctx, details, name) -> "Hello, " + name + "!");
}
}
}
19 changes: 15 additions & 4 deletions harness/go/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,25 @@ func (r *Run) ToArgs() []string {
ret := make([]string, len(r.Features))
for i, feature := range r.Features {
ret[i] = feature.Dir + ":" + feature.TaskQueue
if feature.NexusEndpoint != "" {
ret[i] += ":" + feature.NexusEndpoint
}
}
return ret
}

// FromArgs converts the given arguments to features to run.
func (r *Run) FromArgs(args []string) error {
for _, arg := range args {
colonIndex := strings.Index(arg, ":")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For readability, what do you think about moving this parsing logic into a separate function? Something like:

feature, err := parseRunFeature(arg)
if err != nil {
   return fmt.Errorf("parsing argument %q: %w", arg, err)
}
r.Features = append(r.Features, feature)

if colonIndex == -1 {
pieces := strings.SplitN(arg, ":", 3)
if len(pieces) < 2 {
return fmt.Errorf("feature %v missing task queue", arg)
}
r.Features = append(r.Features, RunFeature{Dir: arg[:colonIndex], TaskQueue: arg[colonIndex+1:]})
runFeature := RunFeature{Dir: pieces[0], TaskQueue: pieces[1]}
if len(pieces) == 3 {
runFeature.NexusEndpoint = pieces[2]
}
r.Features = append(r.Features, runFeature)
}
return nil
}
Expand All @@ -70,7 +77,10 @@ func (r *Run) FromArgs(args []string) error {
type RunFeature struct {
Dir string
TaskQueue string
Config RunFeatureConfig
// NexusEndpoint is the pre-created Nexus endpoint name targeting this feature's namespace
// and task queue. Set by the top-level runner for features under features/nexus.
NexusEndpoint string
Config RunFeatureConfig
}

// RunFeatureConfig is config from .config.json.
Expand Down Expand Up @@ -237,6 +247,7 @@ func (r *Runner) Run(ctx context.Context, run *Run) error {
ClientKeyPath: r.config.ClientKeyPath,
CACertPath: r.config.CACertPath,
TaskQueue: runFeature.TaskQueue,
NexusEndpoint: runFeature.NexusEndpoint,
Log: r.log,
HTTPProxyURL: r.config.HTTPProxyURL,
TLSServerName: r.config.TLSServerName,
Expand Down
Loading
Loading