diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go index 5e3f42a9e5a5..7fe9580e473b 100644 --- a/sdks/go/cmd/prism/prism.go +++ b/sdks/go/cmd/prism/prism.go @@ -22,14 +22,10 @@ import ( "flag" "fmt" "log" - "log/slog" - "os" - "strings" - "time" + beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" - "github.com/golang-cz/devslog" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -44,57 +40,17 @@ var ( // Logging flags var ( - logKind = flag.String("log_kind", "dev", + logKindFlag = flag.String("log_kind", "dev", "Determines the format of prism's logging to std err: valid values are `dev', 'json', or 'text'. Default is `dev`.") logLevelFlag = flag.String("log_level", "info", "Sets the minimum log level of Prism. Valid options are 'debug', 'info','warn', and 'error'. Default is 'info'. Debug adds prism source lines.") ) -var logLevel = new(slog.LevelVar) - func main() { flag.Parse() ctx, cancel := context.WithCancelCause(context.Background()) - var logHandler slog.Handler - loggerOutput := os.Stderr - handlerOpts := &slog.HandlerOptions{ - Level: logLevel, - } - switch strings.ToLower(*logLevelFlag) { - case "debug": - logLevel.Set(slog.LevelDebug) - handlerOpts.AddSource = true - case "info": - logLevel.Set(slog.LevelInfo) - case "warn": - logLevel.Set(slog.LevelWarn) - case "error": - logLevel.Set(slog.LevelError) - default: - log.Fatalf("Invalid value for log_level: %v, must be 'debug', 'info', 'warn', or 'error'", *logKind) - } - switch strings.ToLower(*logKind) { - case "dev": - logHandler = - devslog.NewHandler(loggerOutput, &devslog.Options{ - TimeFormat: "[" + time.RFC3339Nano + "]", - StringerFormatter: true, - HandlerOptions: handlerOpts, - StringIndentation: false, - NewLineAfterLog: true, - MaxErrorStackTrace: 3, - }) - case "json": - logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts) - case "text": - logHandler = slog.NewTextHandler(loggerOutput, handlerOpts) - default: - log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", *logKind) - } - - slog.SetDefault(slog.New(logHandler)) - + beamlog.SetupLogging(*logLevelFlag, *logKindFlag) cli, err := makeJobClient(ctx, prism.Options{ Port: *jobPort, diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index d75ae37c6109..969ac1b0a64e 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -101,7 +101,7 @@ func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint strin elmTimeout, err := parseTimeoutDurationFlag(ctx, beam.PipelineOptions.Get("element_processing_timeout")) if err != nil { - log.Infof(ctx, "Failed to parse element_processing_timeout: %v, there will be no timeout for processing an element in a PTransform operation", err) + log.Debugf(ctx, "Failed to parse element_processing_timeout: %v, there will be no timeout for processing an element in a PTransform operation", err) } // Connect to FnAPI control server. Receive and execute work. diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go index b2f610b703e9..7b33ae1168d9 100644 --- a/sdks/go/pkg/beam/forward.go +++ b/sdks/go/pkg/beam/forward.go @@ -24,6 +24,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/genx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) // IMPLEMENTATION NOTE: functions and types in this file are assumed to be @@ -51,6 +52,10 @@ func RegisterType(t reflect.Type) { } func init() { + runtime.RegisterInit(func() { + log.SetupLoggingWithDefault() + }) + runtime.RegisterInit(func() { if EnableSchemas { schema.Initialize() diff --git a/sdks/go/pkg/beam/log/log.go b/sdks/go/pkg/beam/log/log.go index 4c1f5dddb018..784d1824e013 100644 --- a/sdks/go/pkg/beam/log/log.go +++ b/sdks/go/pkg/beam/log/log.go @@ -21,8 +21,14 @@ package log import ( "context" "fmt" + "log" + "log/slog" "os" + "strings" "sync/atomic" + "time" + + "github.com/golang-cz/devslog" ) // Severity is the severity of the log message. @@ -37,6 +43,11 @@ const ( SevFatal ) +var ( + LogLevel = "info" // The logging level for slog. Valid values are `debug`, `info`, `warn` or `error`. Default is `info`. + LogKind = "text" // The logging format for slog. Valid values are `dev', 'json', or 'text'. Default is `text`. +) + // Logger is a context-aware logging backend. The richer context allows for // more sophisticated logging setups. Must be concurrency safe. type Logger interface { @@ -54,7 +65,7 @@ type concreteLogger struct { } func init() { - logger.Store(&concreteLogger{&Standard{}}) + logger.Store(&concreteLogger{&Structural{}}) } // SetLogger sets the global Logger. Intended to be called during initialization @@ -190,3 +201,51 @@ func Exitln(ctx context.Context, v ...any) { Output(ctx, SevFatal, 1, fmt.Sprintln(v...)) os.Exit(1) } + +func SetupLoggingWithDefault() { + var logLevel = new(slog.LevelVar) + var logHandler slog.Handler + loggerOutput := os.Stderr + handlerOpts := &slog.HandlerOptions{ + Level: logLevel, + } + switch strings.ToLower(LogLevel) { + case "debug": + logLevel.Set(slog.LevelDebug) + handlerOpts.AddSource = true + case "info": + logLevel.Set(slog.LevelInfo) + case "warn": + logLevel.Set(slog.LevelWarn) + case "error": + logLevel.Set(slog.LevelError) + default: + log.Fatalf("Invalid value for log_level: %v, must be 'debug', 'info', 'warn', or 'error'", LogLevel) + } + switch strings.ToLower(LogKind) { + case "dev": + logHandler = + devslog.NewHandler(loggerOutput, &devslog.Options{ + TimeFormat: "[" + time.RFC3339Nano + "]", + StringerFormatter: true, + HandlerOptions: handlerOpts, + StringIndentation: false, + NewLineAfterLog: true, + MaxErrorStackTrace: 3, + }) + case "json": + logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts) + case "text": + logHandler = slog.NewTextHandler(loggerOutput, handlerOpts) + default: + log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", LogKind) + } + + slog.SetDefault(slog.New(logHandler)) +} + +func SetupLogging(logLevel, logKind string) { + LogLevel = logLevel + LogKind = logKind + SetupLoggingWithDefault() +} diff --git a/sdks/go/pkg/beam/log/structural.go b/sdks/go/pkg/beam/log/structural.go new file mode 100644 index 000000000000..4ba9cd1af77f --- /dev/null +++ b/sdks/go/pkg/beam/log/structural.go @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + slogger "log/slog" +) + +// Structural is a wrapper over slog +type Structural struct{} + +var loggerMap = map[Severity]func(string, ...any){ + SevUnspecified: slogger.Info, + SevDebug: slogger.Debug, + SevInfo: slogger.Info, + SevWarn: slogger.Warn, + SevError: slogger.Error, + SevFatal: slogger.Error, +} + +// Log logs the message to the structural Go logger. For Panic, it does not +// perform the os.Exit(1) call, but defers to the log wrapper. +func (s *Structural) Log(ctx context.Context, sev Severity, _ int, msg string) { + loggerMap[sev](msg) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 5668449f6c9c..33c8c3a7de5f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -36,6 +36,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" @@ -224,7 +225,6 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { slog.String("transformID", l.GetTransformId()), // TODO: pull the unique name from the pipeline graph. slog.String("location", l.GetLogLocation()), slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()), - slog.String(slog.MessageKey, l.GetMessage()), } if fs := l.GetCustomData().GetFields(); len(fs) > 0 { var grp []any @@ -245,7 +245,11 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { attrs = append(attrs, slog.Group("customData", grp...)) } - slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "log from SDK worker", slog.Any("worker", wk), slog.Group("sdk", attrs...)) + if beamlog.LogLevel == "debug" { + slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage(), slog.Group("sdk", attrs...), slog.Any("worker", wk)) + } else { + slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage()) + } } } } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go index 7d6a3027e47e..81ff5a5eb94a 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "io" + "strings" "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam" @@ -138,7 +139,16 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID case msg.GetMessageResponse() != nil: resp := msg.GetMessageResponse() - text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), resp.GetMessageId(), resp.GetMessageText()) + var b strings.Builder + if resp.GetTime() != "" { + fmt.Fprintf(&b, "(time=%v)", resp.GetTime()) + } + if resp.GetMessageId() != "" { + fmt.Fprintf(&b, "(id=%v)", resp.GetMessageId()) + } + b.WriteString(resp.GetMessageText()) + text := b.String() + log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text) if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR { diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go index c63175c58578..25325b8fe9ce 100644 --- a/sdks/go/pkg/beam/runners/universal/universal.go +++ b/sdks/go/pkg/beam/runners/universal/universal.go @@ -23,6 +23,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx" + "google.golang.org/protobuf/encoding/prototext" // Importing to get the side effect of the remote execution hook. See init(). _ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/init" @@ -92,7 +93,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) return nil, errors.WithContextf(err, "generating model pipeline") } - log.Info(ctx, pipeline.String()) + log.Debugf(ctx, "Pipeline proto: %s", prototext.Format(pipeline)) opt := &runnerlib.JobOptions{ Name: jobopts.GetJobName(), diff --git a/sdks/go/pkg/beam/x/debug/print_test.go b/sdks/go/pkg/beam/x/debug/print_test.go index 0bbdee0b6fb9..e064cabb1f7e 100644 --- a/sdks/go/pkg/beam/x/debug/print_test.go +++ b/sdks/go/pkg/beam/x/debug/print_test.go @@ -18,6 +18,7 @@ package debug import ( "bytes" "log" + "log/slog" "os" "strings" "testing" @@ -92,10 +93,14 @@ func captureRunLogging(p *beam.Pipeline) string { // Pipe output to out var out bytes.Buffer log.SetOutput(&out) + defer log.SetOutput(os.Stderr) + + oldLogger := slog.Default() + logHandler := slog.NewTextHandler(&out, nil) + slog.SetDefault(slog.New(logHandler)) + defer slog.SetDefault((oldLogger)) ptest.Run(p) - // Return to original state - log.SetOutput(os.Stderr) return out.String() }