Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 3 additions & 47 deletions sdks/go/cmd/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ import (
"flag"
"fmt"
"log"
"log/slog"
"os"
"strings"
"time"

beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/golang-cz/devslog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -44,57 +40,17 @@ var (

// Logging flags
var (
logKind = flag.String("log_kind", "dev",
logKindFlag = flag.String("log_kind", "dev",
"Determines the format of prism's logging to std err: valid values are `dev', 'json', or 'text'. Default is `dev`.")
logLevelFlag = flag.String("log_level", "info",
"Sets the minimum log level of Prism. Valid options are 'debug', 'info','warn', and 'error'. Default is 'info'. Debug adds prism source lines.")
)

var logLevel = new(slog.LevelVar)

func main() {
flag.Parse()
ctx, cancel := context.WithCancelCause(context.Background())

var logHandler slog.Handler
loggerOutput := os.Stderr
handlerOpts := &slog.HandlerOptions{
Level: logLevel,
}
switch strings.ToLower(*logLevelFlag) {
case "debug":
logLevel.Set(slog.LevelDebug)
handlerOpts.AddSource = true
case "info":
logLevel.Set(slog.LevelInfo)
case "warn":
logLevel.Set(slog.LevelWarn)
case "error":
logLevel.Set(slog.LevelError)
default:
log.Fatalf("Invalid value for log_level: %v, must be 'debug', 'info', 'warn', or 'error'", *logKind)
}
switch strings.ToLower(*logKind) {
case "dev":
logHandler =
devslog.NewHandler(loggerOutput, &devslog.Options{
TimeFormat: "[" + time.RFC3339Nano + "]",
StringerFormatter: true,
HandlerOptions: handlerOpts,
StringIndentation: false,
NewLineAfterLog: true,
MaxErrorStackTrace: 3,
})
case "json":
logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
case "text":
logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
default:
log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", *logKind)
}

slog.SetDefault(slog.New(logHandler))

beamlog.SetupLogging(*logLevelFlag, *logKindFlag)
cli, err := makeJobClient(ctx,
prism.Options{
Port: *jobPort,
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint strin

elmTimeout, err := parseTimeoutDurationFlag(ctx, beam.PipelineOptions.Get("element_processing_timeout"))
if err != nil {
log.Infof(ctx, "Failed to parse element_processing_timeout: %v, there will be no timeout for processing an element in a PTransform operation", err)
log.Debugf(ctx, "Failed to parse element_processing_timeout: %v, there will be no timeout for processing an element in a PTransform operation", err)
}

// Connect to FnAPI control server. Receive and execute work.
Expand Down
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/genx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// IMPLEMENTATION NOTE: functions and types in this file are assumed to be
Expand Down Expand Up @@ -51,6 +52,10 @@ func RegisterType(t reflect.Type) {
}

func init() {
runtime.RegisterInit(func() {
log.SetupLoggingWithDefault()
})

runtime.RegisterInit(func() {
if EnableSchemas {
schema.Initialize()
Expand Down
61 changes: 60 additions & 1 deletion sdks/go/pkg/beam/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ package log
import (
"context"
"fmt"
"log"
"log/slog"
"os"
"strings"
"sync/atomic"
"time"

"github.com/golang-cz/devslog"
)

// Severity is the severity of the log message.
Expand All @@ -37,6 +43,11 @@ const (
SevFatal
)

// Package-level logging configuration. SetupLoggingWithDefault reads both
// values to build the default slog logger, and SetupLogging overwrites them
// before doing so. An unrecognized value makes SetupLoggingWithDefault
// terminate the process via log.Fatalf.
var (
LogLevel = "info" // The minimum logging level for slog. Valid values are `debug`, `info`, `warn`, or `error`, matched case-insensitively by SetupLoggingWithDefault. `debug` also enables source locations. Default is `info`.
LogKind = "text" // The logging format for slog. Valid values are `dev`, `json`, or `text`, matched case-insensitively by SetupLoggingWithDefault. Default is `text`.
)

// Logger is a context-aware logging backend. The richer context allows for
// more sophisticated logging setups. Must be concurrency safe.
type Logger interface {
Expand All @@ -54,7 +65,7 @@ type concreteLogger struct {
}

func init() {
logger.Store(&concreteLogger{&Standard{}})
logger.Store(&concreteLogger{&Structural{}})
}

// SetLogger sets the global Logger. Intended to be called during initialization
Expand Down Expand Up @@ -190,3 +201,51 @@ func Exitln(ctx context.Context, v ...any) {
Output(ctx, SevFatal, 1, fmt.Sprintln(v...))
os.Exit(1)
}

func SetupLoggingWithDefault() {
var logLevel = new(slog.LevelVar)
var logHandler slog.Handler
loggerOutput := os.Stderr
handlerOpts := &slog.HandlerOptions{
Level: logLevel,
}
switch strings.ToLower(LogLevel) {
case "debug":
logLevel.Set(slog.LevelDebug)
handlerOpts.AddSource = true
case "info":
logLevel.Set(slog.LevelInfo)
case "warn":
logLevel.Set(slog.LevelWarn)
case "error":
logLevel.Set(slog.LevelError)
default:
log.Fatalf("Invalid value for log_level: %v, must be 'debug', 'info', 'warn', or 'error'", LogLevel)
}
switch strings.ToLower(LogKind) {
case "dev":
logHandler =
devslog.NewHandler(loggerOutput, &devslog.Options{
TimeFormat: "[" + time.RFC3339Nano + "]",
StringerFormatter: true,
HandlerOptions: handlerOpts,
StringIndentation: false,
NewLineAfterLog: true,
MaxErrorStackTrace: 3,
})
Comment on lines +226 to +235
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The configuration for the dev log kind uses devslog, which produces a human-readable format (e.g., [<timestamp>] INFO ...). However, the 'After' log example in the pull request description shows a key=value format, which corresponds to the text log kind (slog.TextHandler). Since the default log_kind for Prism is dev, this discrepancy could be confusing. To align the default behavior with the example, you might consider changing the default log_kind flag in sdks/go/cmd/prism/prism.go to text, or updating the PR description to show an example of the devslog output.

case "json":
logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
case "text":
logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
default:
log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", LogKind)
}

slog.SetDefault(slog.New(logHandler))
}

// SetupLogging records the requested level and kind in LogLevel and LogKind,
// then installs the matching default slog logger by calling
// SetupLoggingWithDefault.
//
// Valid levels are 'debug', 'info', 'warn', and 'error'; valid kinds are
// 'dev', 'json', and 'text'. Any other value causes SetupLoggingWithDefault to
// terminate the process through log.Fatalf.
//
// Both values are lower-cased before being stored. SetupLoggingWithDefault
// already matches them case-insensitively, so the set of accepted inputs is
// unchanged; normalizing here ensures that code comparing LogLevel or LogKind
// against the documented lower-case names (for example LogLevel == "debug")
// behaves the same for "DEBUG" as for "debug".
func SetupLogging(logLevel, logKind string) {
	LogLevel = strings.ToLower(logLevel)
	LogKind = strings.ToLower(logKind)
	SetupLoggingWithDefault()
}
39 changes: 39 additions & 0 deletions sdks/go/pkg/beam/log/structural.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package log

import (
"context"
slogger "log/slog"
)

// Structural is a Logger that forwards messages to the default log/slog
// logger. It carries no state.
type Structural struct{}

// severityLevels maps each Beam Severity to the slog level it is emitted at.
// slog has no level above Error, so SevFatal is reported as Error;
// SevUnspecified is reported as Info.
var severityLevels = map[Severity]slogger.Level{
	SevUnspecified: slogger.LevelInfo,
	SevDebug:       slogger.LevelDebug,
	SevInfo:        slogger.LevelInfo,
	SevWarn:        slogger.LevelWarn,
	SevError:       slogger.LevelError,
	SevFatal:       slogger.LevelError,
}

// Log writes msg to the default slog logger at the level corresponding to sev,
// passing ctx through so that context-aware handlers can use it. The calldepth
// argument is ignored.
//
// A severity with no entry in severityLevels is logged at Info rather than
// causing a panic. For SevFatal, Log does not perform the os.Exit(1) call, but
// defers to the log wrapper.
func (s *Structural) Log(ctx context.Context, sev Severity, _ int, msg string) {
	level, ok := severityLevels[sev]
	if !ok {
		// Unknown severities must still produce output instead of crashing the
		// caller that was only trying to log.
		level = slogger.LevelInfo
	}
	slogger.Log(ctx, level, msg)
}
8 changes: 6 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
Expand Down Expand Up @@ -224,7 +225,6 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
slog.String("transformID", l.GetTransformId()), // TODO: pull the unique name from the pipeline graph.
slog.String("location", l.GetLogLocation()),
slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
slog.String(slog.MessageKey, l.GetMessage()),
}
if fs := l.GetCustomData().GetFields(); len(fs) > 0 {
var grp []any
Expand All @@ -245,7 +245,11 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
attrs = append(attrs, slog.Group("customData", grp...))
}

slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "log from SDK worker", slog.Any("worker", wk), slog.Group("sdk", attrs...))
if beamlog.LogLevel == "debug" {
slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage(), slog.Group("sdk", attrs...), slog.Any("worker", wk))
} else {
slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage())
}
}
}
}
Expand Down
12 changes: 11 additions & 1 deletion sdks/go/pkg/beam/runners/universal/runnerlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"io"
"strings"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
Expand Down Expand Up @@ -138,7 +139,16 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
case msg.GetMessageResponse() != nil:
resp := msg.GetMessageResponse()

text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), resp.GetMessageId(), resp.GetMessageText())
var b strings.Builder
if resp.GetTime() != "" {
fmt.Fprintf(&b, "(time=%v)", resp.GetTime())
}
if resp.GetMessageId() != "" {
fmt.Fprintf(&b, "(id=%v)", resp.GetMessageId())
}
b.WriteString(resp.GetMessageText())
text := b.String()

Comment on lines +142 to +151
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using strings.Builder is efficient, but the resulting log format (time=...)(id=...)message can be a bit hard to read, especially with no space between the parts. For improved readability and a more standard log appearance, consider building the message from parts with spaces in between. A slice-based approach can be more readable here, and the performance difference is likely negligible in this context.

Suggested change
var b strings.Builder
if resp.GetTime() != "" {
fmt.Fprintf(&b, "(time=%v)", resp.GetTime())
}
if resp.GetMessageId() != "" {
fmt.Fprintf(&b, "(id=%v)", resp.GetMessageId())
}
b.WriteString(resp.GetMessageText())
text := b.String()
var parts []string
if resp.GetTime() != "" {
parts = append(parts, fmt.Sprintf("time=%v", resp.GetTime()))
}
if resp.GetMessageId() != "" {
parts = append(parts, fmt.Sprintf("id=%v", resp.GetMessageId()))
}
if resp.GetMessageText() != "" {
parts = append(parts, resp.GetMessageText())
}
text := strings.Join(parts, " ")

log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text)

if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR {
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/runners/universal/universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
"google.golang.org/protobuf/encoding/prototext"

// Importing to get the side effect of the remote execution hook. See init().
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/init"
Expand Down Expand Up @@ -92,7 +93,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
return nil, errors.WithContextf(err, "generating model pipeline")
}

log.Info(ctx, pipeline.String())
log.Debugf(ctx, "Pipeline proto: %s", prototext.Format(pipeline))

opt := &runnerlib.JobOptions{
Name: jobopts.GetJobName(),
Expand Down
9 changes: 7 additions & 2 deletions sdks/go/pkg/beam/x/debug/print_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package debug
import (
"bytes"
"log"
"log/slog"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -92,10 +93,14 @@ func captureRunLogging(p *beam.Pipeline) string {
// Pipe output to out
var out bytes.Buffer
log.SetOutput(&out)
defer log.SetOutput(os.Stderr)

oldLogger := slog.Default()
logHandler := slog.NewTextHandler(&out, nil)
slog.SetDefault(slog.New(logHandler))
defer slog.SetDefault((oldLogger))

ptest.Run(p)

// Return to original state
log.SetOutput(os.Stderr)
return out.String()
}
Loading