Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions api/adc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,22 @@ const (
)

type SyncResult struct {
Status string `json:"status"`
TotalResources int `json:"total_resources"`
SuccessCount int `json:"success_count"`
FailedCount int `json:"failed_count"`
Success []SyncStatus `json:"success"`
Failed []SyncStatus `json:"failed"`
Status string `json:"status"`
TotalResources int `json:"total_resources"`
SuccessCount int `json:"success_count"`
FailedCount int `json:"failed_count"`
Success []SyncStatus `json:"success"`
Failed []SyncStatus `json:"failed"`
EndpointStatus []EndpointStatus `json:"endpoint_status,omitempty"`
}

// EndpointStatus represents the synchronization status of an APISIX standalone endpoint.
// This is only used in apisix-standalone mode where endpoint-level status is reported
// instead of resource-level status.
type EndpointStatus struct {
Server string `json:"server"`
Success bool `json:"success"`
Reason string `json:"reason,omitempty"`
}

type SyncStatus struct {
Expand Down
168 changes: 30 additions & 138 deletions internal/adc/client/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"net"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -47,135 +45,6 @@ type ADCExecutor interface {
Execute(ctx context.Context, config adctypes.Config, args []string) error
}

type DefaultADCExecutor struct {
sync.Mutex
log logr.Logger
}

func (e *DefaultADCExecutor) Execute(ctx context.Context, config adctypes.Config, args []string) error {
return e.runADC(ctx, config, args)
}

func (e *DefaultADCExecutor) runADC(ctx context.Context, config adctypes.Config, args []string) error {
var execErrs = types.ADCExecutionError{
Name: config.Name,
}

for _, addr := range config.ServerAddrs {
if err := e.runForSingleServerWithTimeout(ctx, addr, config, args); err != nil {
e.log.Error(err, "failed to run adc for server", "server", addr)
var execErr types.ADCExecutionServerAddrError
if errors.As(err, &execErr) {
execErrs.FailedErrors = append(execErrs.FailedErrors, execErr)
} else {
execErrs.FailedErrors = append(execErrs.FailedErrors, types.ADCExecutionServerAddrError{
ServerAddr: addr,
Err: err.Error(),
})
}
}
}
if len(execErrs.FailedErrors) > 0 {
return execErrs
}
return nil
}

func (e *DefaultADCExecutor) runForSingleServerWithTimeout(ctx context.Context, serverAddr string, config adctypes.Config, args []string) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return e.runForSingleServer(ctx, serverAddr, config, args)
}

func (e *DefaultADCExecutor) runForSingleServer(ctx context.Context, serverAddr string, config adctypes.Config, args []string) error {
cmdArgs := append([]string{}, args...)
if !config.TlsVerify {
cmdArgs = append(cmdArgs, "--tls-skip-verify")
}

cmdArgs = append(cmdArgs, "--timeout", "15s")

env := e.prepareEnv(serverAddr, config.BackendType, config.Token)

var stdout, stderr bytes.Buffer
cmd := exec.CommandContext(ctx, "adc", cmdArgs...)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmd.Env = append(os.Environ(), env...)

e.log.V(1).Info("running adc command",
"command", strings.Join(cmd.Args, " "),
"env", filterSensitiveEnv(env),
)

if err := cmd.Run(); err != nil {
return e.buildCmdError(err, stdout.Bytes(), stderr.Bytes())
}

result, err := e.handleOutput(stdout.Bytes())
if err != nil {
e.log.Error(err, "failed to handle adc output",
"stdout", stdout.String(),
"stderr", stderr.String())
return fmt.Errorf("failed to handle adc output: %w", err)
}
if result.FailedCount > 0 && len(result.Failed) > 0 {
reason := result.Failed[0].Reason
e.log.Error(fmt.Errorf("adc sync failed: %s", reason), "adc sync failed", "result", result)
return types.ADCExecutionServerAddrError{
ServerAddr: serverAddr,
Err: reason,
FailedStatuses: result.Failed,
}
}
e.log.V(1).Info("adc sync success", "result", result)
return nil
}

func (e *DefaultADCExecutor) prepareEnv(serverAddr, mode, token string) []string {
return []string{
"ADC_EXPERIMENTAL_FEATURE_FLAGS=remote-state-file,parallel-backend-request",
"ADC_RUNNING_MODE=ingress",
"ADC_BACKEND=" + mode,
"ADC_SERVER=" + serverAddr,
"ADC_TOKEN=" + token,
}
}

// filterSensitiveEnv filters out sensitive information from environment variables for logging
func filterSensitiveEnv(env []string) []string {
filtered := make([]string, 0, len(env))
for _, envVar := range env {
if strings.Contains(envVar, "ADC_TOKEN=") {
filtered = append(filtered, "ADC_TOKEN=***")
} else {
filtered = append(filtered, envVar)
}
}
return filtered
}

func (e *DefaultADCExecutor) buildCmdError(runErr error, stdout, stderr []byte) error {
errMsg := string(stderr)
if errMsg == "" {
errMsg = string(stdout)
}
e.log.Error(runErr, "failed to run adc", "output", string(stdout), "stderr", string(stderr))
return errors.New("failed to sync resources: " + errMsg + ", exit err: " + runErr.Error())
}

func (e *DefaultADCExecutor) handleOutput(output []byte) (*adctypes.SyncResult, error) {
e.log.V(1).Info("adc command output", "output", string(output))
var result adctypes.SyncResult
if lines := bytes.Split(output, []byte{'\n'}); len(lines) > 0 {
output = lines[len(lines)-1]
}
if err := json.Unmarshal(output, &result); err != nil {
return nil, errors.New("failed to unmarshal response: " + string(output) + ", err: " + err.Error())
}
return &result, nil
}

func BuildADCExecuteArgs(filePath string, labels map[string]string, types []string) []string {
args := []string{
"sync",
Expand Down Expand Up @@ -452,13 +321,36 @@ func (e *HTTPADCExecutor) handleHTTPResponse(resp *http.Response, serverAddr str
}

// Check for sync failures
if result.FailedCount > 0 && len(result.Failed) > 0 {
reason := result.Failed[0].Reason
e.log.Error(fmt.Errorf("ADC Server sync failed: %s", reason), "ADC Server sync failed", "result", result)
return types.ADCExecutionServerAddrError{
ServerAddr: serverAddr,
Err: reason,
FailedStatuses: result.Failed,
// For apisix-standalone mode: Failed is always empty, check EndpointStatus instead
if result.FailedCount > 0 {
if len(result.Failed) > 0 {
reason := result.Failed[0].Reason
e.log.Error(fmt.Errorf("ADC Server sync failed: %s", reason), "ADC Server sync failed", "result", result)
return types.ADCExecutionServerAddrError{
ServerAddr: serverAddr,
Err: reason,
FailedStatuses: result.Failed,
}
}
if len(result.EndpointStatus) > 0 {
// apisix-standalone mode: use EndpointStatus
var failedEndpoints []string
for _, ep := range result.EndpointStatus {
if !ep.Success {
failedEndpoints = append(failedEndpoints, fmt.Sprintf("%s: %s", ep.Server, ep.Reason))
}
}
if len(failedEndpoints) > 0 {
reason := strings.Join(failedEndpoints, "; ")
e.log.Error(fmt.Errorf("ADC Server sync failed (standalone mode): %s", reason), "ADC Server sync failed", "result", result)
return types.ADCExecutionServerAddrError{
ServerAddr: serverAddr,
Err: reason,
FailedStatuses: []adctypes.SyncStatus{
{Reason: reason},
},
}
}
}
}

Expand Down
Loading