diff --git a/go-memory-load-mongo/.dockerignore b/go-memory-load-mongo/.dockerignore new file mode 100644 index 00000000..cafc572d --- /dev/null +++ b/go-memory-load-mongo/.dockerignore @@ -0,0 +1,29 @@ +# Go build outputs +/bin/ +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test artifacts +*.test +*.out +coverage.txt +coverage.html + +# Go vendor +vendor/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Docker +**/.git diff --git a/go-memory-load-mongo/.env.example b/go-memory-load-mongo/.env.example new file mode 100644 index 00000000..bfa6bb89 --- /dev/null +++ b/go-memory-load-mongo/.env.example @@ -0,0 +1,2 @@ +APP_PORT=8080 +MONGO_URI=mongodb://localhost:27017/orderdb diff --git a/go-memory-load-mongo/Dockerfile b/go-memory-load-mongo/Dockerfile new file mode 100644 index 00000000..cd7392eb --- /dev/null +++ b/go-memory-load-mongo/Dockerfile @@ -0,0 +1,18 @@ +FROM golang:1.26-alpine AS build + +WORKDIR /app + +COPY go.mod go.sum* ./ +RUN go mod download + +COPY . . +RUN go build -o /bin/api ./cmd/api + +FROM alpine:3.22 + +WORKDIR /app +COPY --from=build /bin/api /app/api + +EXPOSE 8080 + +CMD ["/app/api"] diff --git a/go-memory-load-mongo/cmd/api/main.go b/go-memory-load-mongo/cmd/api/main.go new file mode 100644 index 00000000..a59bf5c0 --- /dev/null +++ b/go-memory-load-mongo/cmd/api/main.go @@ -0,0 +1,80 @@ +// Package main is the entry point for the load-test MongoDB API server. 
+package main + +import ( + "context" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "loadtestmongoapi/internal/config" + "loadtestmongoapi/internal/database" + "loadtestmongoapi/internal/httpapi" + "loadtestmongoapi/internal/store" +) + +func main() { + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + cfg, err := config.Load() + if err != nil { + logger.Error("load config", "error", err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + client, db, err := database.Open(ctx, cfg.MongoURI, "orderdb") + if err != nil { + logger.Error("connect mongo", "error", err) + os.Exit(1) + } + defer func() { + disconnectCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = client.Disconnect(disconnectCtx) + }() + + st := store.New(db) + + if err := st.EnsureIndexes(ctx); err != nil { + logger.Error("ensure indexes", "error", err) + os.Exit(1) + } + + handler := httpapi.New(st, logger) + + server := &http.Server{ + Addr: ":" + cfg.Port, + Handler: handler, + ReadHeaderTimeout: 3 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 60 * time.Second, + } + + go func() { + logger.Info("api listening", "addr", server.Addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("listen and serve", "error", err) + stop() + } + }() + + <-ctx.Done() + logger.Info("shutdown signal received") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.Shutdown(shutdownCtx); err != nil { + logger.Error("graceful shutdown", "error", err) + os.Exit(1) + } +} diff --git a/go-memory-load-mongo/docker-compose.yml b/go-memory-load-mongo/docker-compose.yml new file mode 100644 index 00000000..f0d23b2d --- /dev/null +++ 
b/go-memory-load-mongo/docker-compose.yml @@ -0,0 +1,41 @@ +services: + db: + image: mongo:7 + container_name: load-test-mongo-db + ports: + - "27017:27017" + volumes: + - mongo_data:/data/db + healthcheck: + test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok"] + interval: 5s + timeout: 5s + retries: 20 + + api: + build: + context: . + container_name: load-test-mongo-api + environment: + APP_PORT: "8080" + MONGO_URI: mongodb://db:27017/orderdb + ports: + - "8080:8080" + depends_on: + db: + condition: service_healthy + + k6: + image: grafana/k6:0.49.0 + profiles: ["loadtest"] + environment: + BASE_URL: http://api:8080 + volumes: + - ./loadtest:/scripts:ro + depends_on: + api: + condition: service_started + entrypoint: ["k6"] + +volumes: + mongo_data: diff --git a/go-memory-load-mongo/go.mod b/go-memory-load-mongo/go.mod new file mode 100644 index 00000000..49641c92 --- /dev/null +++ b/go-memory-load-mongo/go.mod @@ -0,0 +1,17 @@ +module loadtestmongoapi + +go 1.26 + +require go.mongodb.org/mongo-driver/v2 v2.2.1 + +require ( + github.com/golang/snappy v1.0.0 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + golang.org/x/crypto v0.33.0 // indirect + golang.org/x/sync v0.11.0 // indirect + golang.org/x/text v0.22.0 // indirect +) diff --git a/go-memory-load-mongo/go.sum b/go-memory-load-mongo/go.sum new file mode 100644 index 00000000..e8f55b11 --- /dev/null +++ b/go-memory-load-mongo/go.sum @@ -0,0 +1,48 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver/v2 v2.2.1 h1:w5xra3yyu/sGrziMzK1D0cRRaH/b7lWCSsoN6+WV6AM= +go.mongodb.org/mongo-driver/v2 v2.2.1/go.mod h1:qQkDMhCGWl3FN509DfdPd4GRBLU/41zqF/k8eTRceps= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= 
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/go-memory-load-mongo/internal/config/config.go b/go-memory-load-mongo/internal/config/config.go new file mode 100644 index 00000000..9378a71f --- /dev/null +++ b/go-memory-load-mongo/internal/config/config.go @@ -0,0 +1,35 @@ +// Package config handles configuration loading from environment variables. +package config + +import ( + "errors" + "os" + "strings" +) + +type Config struct { + Port string + MongoURI string +} + +func Load() (Config, error) { + cfg := Config{ + Port: getEnv("APP_PORT", "8080"), + MongoURI: strings.TrimSpace(os.Getenv("MONGO_URI")), + } + + if cfg.MongoURI == "" { + return Config{}, errors.New("MONGO_URI is required") + } + + return cfg, nil +} + +func getEnv(key, fallback string) string { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + + return value +} diff --git a/go-memory-load-mongo/internal/database/mongo.go b/go-memory-load-mongo/internal/database/mongo.go new file mode 100644 index 00000000..d96d4ed7 --- /dev/null +++ b/go-memory-load-mongo/internal/database/mongo.go @@ -0,0 +1,47 @@ +// Package database provides MongoDB connection helpers. +package database + +import ( + "context" + "fmt" + "time" + + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" +) + +// Open creates a new MongoDB client, verifies connectivity with retries, and +// returns the client and the named database handle. +func Open(ctx context.Context, uri, dbName string) (*mongo.Client, *mongo.Database, error) { + opts := options.Client(). + ApplyURI(uri). + SetMaxPoolSize(25). + SetMinPoolSize(10). 
+ SetMaxConnIdleTime(5 * time.Minute) + + client, err := mongo.Connect(opts) + if err != nil { + return nil, nil, fmt.Errorf("connect mongo: %w", err) + } + + var pingErr error + for attempt := 1; attempt <= 20; attempt++ { + pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + pingErr = client.Ping(pingCtx, readpref.Primary()) + cancel() + if pingErr == nil { + return client, client.Database(dbName), nil + } + + select { + case <-ctx.Done(): + _ = client.Disconnect(context.Background()) + return nil, nil, fmt.Errorf("ping mongo: %w", ctx.Err()) + case <-time.After(2 * time.Second): + } + } + + _ = client.Disconnect(context.Background()) + return nil, nil, fmt.Errorf("ping mongo after retries: %w", pingErr) +} diff --git a/go-memory-load-mongo/internal/httpapi/server.go b/go-memory-load-mongo/internal/httpapi/server.go new file mode 100644 index 00000000..c94e9a80 --- /dev/null +++ b/go-memory-load-mongo/internal/httpapi/server.go @@ -0,0 +1,336 @@ +// Package httpapi provides the HTTP API handlers for the load-test MongoDB server. 
+package httpapi + +import ( + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "net/http" + "strconv" + "time" + + "loadtestmongoapi/internal/store" +) + +type Server struct { + store *store.Store + logger *slog.Logger +} + +type apiError struct { + Error string `json:"error"` +} + +func New(st *store.Store, logger *slog.Logger) http.Handler { + s := &Server{ + store: st, + logger: logger, + } + + mux := http.NewServeMux() + mux.HandleFunc("GET /healthz", s.healthz) + mux.HandleFunc("POST /customers", s.createCustomer) + mux.HandleFunc("POST /products", s.createProduct) + mux.HandleFunc("POST /orders", s.createOrder) + mux.HandleFunc("GET /orders/{id}", s.getOrder) + mux.HandleFunc("GET /orders", s.searchOrders) + mux.HandleFunc("GET /customers/{id}/summary", s.getCustomerSummary) + mux.HandleFunc("GET /analytics/top-products", s.topProducts) + mux.HandleFunc("POST /large-payloads", s.createLargePayload) + mux.HandleFunc("GET /large-payloads/{id}", s.getLargePayload) + mux.HandleFunc("DELETE /large-payloads/{id}", s.deleteLargePayload) + + return s.withRecover(s.withLogging(mux)) +} + +func (s *Server) healthz(w http.ResponseWriter, r *http.Request) { + ctx, cancel := contextWithTimeout(r, 2*time.Second) + defer cancel() + + if err := s.store.Ping(ctx); err != nil { + writeJSON(w, http.StatusServiceUnavailable, apiError{Error: "database unavailable"}) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +func (s *Server) createCustomer(w http.ResponseWriter, r *http.Request) { + var req store.CreateCustomerRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + customer, err := s.store.CreateCustomer(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, customer) +} + +func (s *Server) createProduct(w http.ResponseWriter, r *http.Request) { + var req store.CreateProductRequest + 
if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + product, err := s.store.CreateProduct(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, product) +} + +func (s *Server) createOrder(w http.ResponseWriter, r *http.Request) { + var req store.CreateOrderRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + order, err := s.store.CreateOrder(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, order) +} + +func (s *Server) getOrder(w http.ResponseWriter, r *http.Request) { + order, err := s.store.GetOrder(r.Context(), r.PathValue("id")) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, order) +} + +func (s *Server) getCustomerSummary(w http.ResponseWriter, r *http.Request) { + customerID := r.PathValue("id") + if customerID == "" { + writeJSON(w, http.StatusBadRequest, apiError{Error: "customer id is required"}) + return + } + + summary, err := s.store.GetCustomerSummary(r.Context(), customerID) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, summary) +} + +func (s *Server) searchOrders(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + + params := store.OrderSearchParams{ + Status: query.Get("status"), + CustomerID: query.Get("customer_id"), + MinTotalCents: parseInt(query.Get("min_total_cents"), 0), + Limit: parseInt(query.Get("limit"), 25), + Offset: parseInt(query.Get("offset"), 0), + } + + if value := query.Get("created_from"); value != "" { + timestamp, err := time.Parse(time.RFC3339, value) + if err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: "created_from must use RFC3339"}) + return + } + params.CreatedFrom = ×tamp + } + + if value := 
query.Get("created_through"); value != "" { + timestamp, err := time.Parse(time.RFC3339, value) + if err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: "created_through must use RFC3339"}) + return + } + params.CreatedThrough = ×tamp + } + + results, err := s.store.SearchOrders(r.Context(), params) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, results) +} + +func (s *Server) topProducts(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + days := parseInt(query.Get("days"), 30) + limit := parseInt(query.Get("limit"), 10) + + results, err := s.store.TopProducts(r.Context(), days, limit) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, results) +} + +func (s *Server) createLargePayload(w http.ResponseWriter, r *http.Request) { + var req store.CreateLargePayloadRequest + if err := decodeJSON(r, &req); err != nil { + writeJSON(w, http.StatusBadRequest, apiError{Error: err.Error()}) + return + } + + record, err := s.store.CreateLargePayload(r.Context(), req) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusCreated, record) +} + +func (s *Server) getLargePayload(w http.ResponseWriter, r *http.Request) { + record, err := s.store.GetLargePayload(r.Context(), r.PathValue("id")) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, record) +} + +func (s *Server) deleteLargePayload(w http.ResponseWriter, r *http.Request) { + record, err := s.store.DeleteLargePayload(r.Context(), r.PathValue("id")) + if err != nil { + s.writeStoreError(w, err) + return + } + + writeJSON(w, http.StatusOK, record) +} + +func (s *Server) writeStoreError(w http.ResponseWriter, err error) { + status := http.StatusInternalServerError + message := "internal server error" + + switch { + case errors.Is(err, store.ErrValidation): + status = http.StatusBadRequest + message = err.Error() + case errors.Is(err, 
store.ErrConflict), errors.Is(err, store.ErrInsufficientInventory): + status = http.StatusConflict + message = err.Error() + case errors.Is(err, store.ErrNotFound): + status = http.StatusNotFound + message = err.Error() + default: + s.logger.Error("request failed", "error", err) + } + + writeJSON(w, status, apiError{Error: message}) +} + +func (s *Server) withLogging(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + recorder := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK} + debugEnabled := s.logger.Enabled(r.Context(), slog.LevelDebug) + var start time.Time + if debugEnabled { + start = time.Now() + } + + next.ServeHTTP(recorder, r) + + if debugEnabled { + s.logger.Debug( + "http request", + "method", r.Method, + "path", r.URL.Path, + "status", recorder.statusCode, + "duration_ms", time.Since(start).Milliseconds(), + ) + } + }) +} + +func (s *Server) withRecover(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if recovered := recover(); recovered != nil { + s.logger.Error("panic recovered", "panic", recovered) + writeJSON(w, http.StatusInternalServerError, apiError{Error: "internal server error"}) + } + }() + + next.ServeHTTP(w, r) + }) +} + +type statusRecorder struct { + http.ResponseWriter + statusCode int +} + +func (r *statusRecorder) WriteHeader(statusCode int) { + r.statusCode = statusCode + r.ResponseWriter.WriteHeader(statusCode) +} + +func writeJSON(w http.ResponseWriter, statusCode int, payload any) { + body, err := json.Marshal(payload) + if err != nil { + body = []byte(`{"error":"internal server error"}`) + statusCode = http.StatusInternalServerError + } + + body = append(body, '\n') + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Length", strconv.Itoa(len(body))) + w.WriteHeader(statusCode) + _, _ = w.Write(body) +} + +func decodeJSON(r *http.Request, target any) error 
{ + defer r.Body.Close() //nolint:errcheck + + decoder := json.NewDecoder(r.Body) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + return err + } + + if err := decoder.Decode(&struct{}{}); err != io.EOF { + return errors.New("request body must contain a single JSON object") + } + + return nil +} + +func parseInt(value string, fallback int) int { + if value == "" { + return fallback + } + + parsed, err := strconv.Atoi(value) + if err != nil { + return fallback + } + + return parsed +} + +func contextWithTimeout(r *http.Request, timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(r.Context(), timeout) +} diff --git a/go-memory-load-mongo/internal/store/models.go b/go-memory-load-mongo/internal/store/models.go new file mode 100644 index 00000000..d8e76bb9 --- /dev/null +++ b/go-memory-load-mongo/internal/store/models.go @@ -0,0 +1,132 @@ +// Package store defines data models for the load-test MongoDB API. +package store + +import "time" + +type Customer struct { + ID string `json:"id" bson:"_id,omitempty"` + Email string `json:"email" bson:"email"` + FullName string `json:"full_name" bson:"full_name"` + Segment string `json:"segment" bson:"segment"` + CreatedAt time.Time `json:"created_at" bson:"created_at"` +} + +type Product struct { + ID string `json:"id" bson:"_id,omitempty"` + SKU string `json:"sku" bson:"sku"` + Name string `json:"name" bson:"name"` + Category string `json:"category" bson:"category"` + PriceCents int `json:"price_cents" bson:"price_cents"` + InventoryCount int `json:"inventory_count" bson:"inventory_count"` + CreatedAt time.Time `json:"created_at" bson:"created_at"` +} + +type OrderItemInput struct { + ProductID string `json:"product_id" bson:"product_id"` + Quantity int `json:"quantity" bson:"quantity"` +} + +type OrderItem struct { + ProductID string `json:"product_id" bson:"product_id"` + SKU string `json:"sku" bson:"sku"` + Name string `json:"name" bson:"name"` + 
Category string `json:"category" bson:"category"` + Quantity int `json:"quantity" bson:"quantity"` + UnitPriceCents int `json:"unit_price_cents" bson:"unit_price_cents"` + LineTotalCents int `json:"line_total_cents" bson:"line_total_cents"` +} + +type Order struct { + ID string `json:"id" bson:"_id,omitempty"` + Customer Customer `json:"customer" bson:"customer"` + Status string `json:"status" bson:"status"` + TotalCents int `json:"total_cents" bson:"total_cents"` + CreatedAt time.Time `json:"created_at" bson:"created_at"` + Items []OrderItem `json:"items" bson:"items"` +} + +type CreateCustomerRequest struct { + Email string `json:"email"` + FullName string `json:"full_name"` + Segment string `json:"segment"` +} + +type CreateProductRequest struct { + SKU string `json:"sku"` + Name string `json:"name"` + Category string `json:"category"` + PriceCents int `json:"price_cents"` + InventoryCount int `json:"inventory_count"` +} + +type CreateOrderRequest struct { + CustomerID string `json:"customer_id"` + Status string `json:"status"` + Items []OrderItemInput `json:"items"` +} + +type CreateLargePayloadRequest struct { + Name string `json:"name"` + ContentType string `json:"content_type"` + Payload string `json:"payload"` +} + +type LargePayloadRecord struct { + ID string `json:"id" bson:"_id,omitempty"` + Name string `json:"name" bson:"name"` + ContentType string `json:"content_type" bson:"content_type"` + PayloadSizeBytes int `json:"payload_size_bytes" bson:"payload_size_bytes"` + SHA256 string `json:"sha256" bson:"sha256"` + CreatedAt time.Time `json:"created_at" bson:"created_at"` +} + +type LargePayloadDetail struct { + LargePayloadRecord `bson:",inline"` + Payload string `json:"payload" bson:"payload"` +} + +type DeleteLargePayloadResponse struct { + Deleted bool `json:"deleted"` + Record LargePayloadRecord `json:"record"` +} + +type CustomerSummary struct { + Customer Customer `json:"customer"` + OrdersCount int `json:"orders_count"` + LifetimeValueCents int 
`json:"lifetime_value_cents"` + AverageOrderValueCents int `json:"average_order_value_cents"` + LastOrderAt *time.Time `json:"last_order_at,omitempty"` + FavoriteCategory string `json:"favorite_category,omitempty"` +} + +type OrderSearchResult struct { + ID string `json:"id"` + CustomerID string `json:"customer_id"` + CustomerName string `json:"customer_name"` + Status string `json:"status"` + TotalCents int `json:"total_cents"` + CreatedAt time.Time `json:"created_at"` + TotalItems int `json:"total_items"` + DistinctProducts int `json:"distinct_products"` +} + +type OrderSearchParams struct { + Status string + CustomerID string + MinTotalCents int + CreatedFrom *time.Time + CreatedThrough *time.Time + Limit int + Offset int +} + +type TopProduct struct { + ID string `json:"id"` + SKU string `json:"sku"` + Name string `json:"name"` + Category string `json:"category"` + UnitsSold int `json:"units_sold"` + RevenueCents int `json:"revenue_cents"` + OrdersCount int `json:"orders_count"` + RevenueRank int `json:"revenue_rank"` +} diff --git a/go-memory-load-mongo/internal/store/store.go b/go-memory-load-mongo/internal/store/store.go new file mode 100644 index 00000000..9d898d6d --- /dev/null +++ b/go-memory-load-mongo/internal/store/store.go @@ -0,0 +1,628 @@ +package store + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "net/mail" + "sort" + "strings" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +var ( + ErrNotFound = errors.New("not found") + ErrConflict = errors.New("conflict") + ErrValidation = errors.New("validation error") + ErrInsufficientInventory = errors.New("insufficient inventory") +) + +const maxLargePayloadBytes = 8 * 1024 * 1024 + +var ( + validSegments = map[string]struct{}{ + "startup": {}, + "enterprise": {}, + "retail": {}, + "partner": {}, + } + validStatuses = map[string]struct{}{ + "pending": {}, + "paid": {}, + 
"shipped": {}, + "cancelled": {}, + } +) + +type Store struct { + db *mongo.Database + customers *mongo.Collection + products *mongo.Collection + orders *mongo.Collection + largePayload *mongo.Collection +} + +func New(db *mongo.Database) *Store { + return &Store{ + db: db, + customers: db.Collection("customers"), + products: db.Collection("products"), + orders: db.Collection("orders"), + largePayload: db.Collection("large_payloads"), + } +} + +// EnsureIndexes creates the required indexes on first run. +func (s *Store) EnsureIndexes(ctx context.Context) error { + // customers: unique email + if _, err := s.customers.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "email", Value: 1}}, + Options: options.Index().SetUnique(true), + }); err != nil { + return fmt.Errorf("customer email index: %w", err) + } + + // products: unique sku + if _, err := s.products.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "sku", Value: 1}}, + Options: options.Index().SetUnique(true), + }); err != nil { + return fmt.Errorf("product sku index: %w", err) + } + + // orders: customer_id + created_at + if _, err := s.orders.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "customer._id", Value: 1}, {Key: "created_at", Value: -1}}, + }); err != nil { + return fmt.Errorf("order customer index: %w", err) + } + + // orders: status + created_at + if _, err := s.orders.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "status", Value: 1}, {Key: "created_at", Value: -1}}, + }); err != nil { + return fmt.Errorf("order status index: %w", err) + } + + // large_payloads: created_at descending + if _, err := s.largePayload.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "created_at", Value: -1}}, + }); err != nil { + return fmt.Errorf("large_payload created_at index: %w", err) + } + + return nil +} + +// contentID derives a deterministic 24-hex-char ID from the supplied key +// parts, so that the same inputs always produce the 
same ID across keploy
// record and replay sessions.
func contentID(parts ...string) string {
	h := sha256.Sum256([]byte(strings.Join(parts, "\x00")))
	return hex.EncodeToString(h[:])[:24]
}

// contentTime derives a deterministic creation timestamp from the supplied key
// parts using the same SHA-256 approach, producing a stable RFC3339 value
// within a 2-year window starting 2020-01-01.
func contentTime(parts ...string) time.Time {
	h := sha256.Sum256([]byte(strings.Join(parts, "\x00")))
	const base = int64(1577836800) // 2020-01-01T00:00:00Z
	const window = int64(2 * 365 * 24 * 3600)
	raw := int64(h[0])<<56 | int64(h[1])<<48 | int64(h[2])<<40 | int64(h[3])<<32 |
		int64(h[4])<<24 | int64(h[5])<<16 | int64(h[6])<<8 | int64(h[7])
	// Clearing the sign bit keeps the modulo result non-negative.
	return time.Unix(base+(raw&0x7FFFFFFFFFFFFFFF)%window, 0).UTC()
}

// orderFingerprint builds a canonical, sorted string representation of order
// items so that the order ID is independent of input slice ordering.
func orderFingerprint(items []OrderItemInput) string {
	sorted := make([]OrderItemInput, len(items))
	copy(sorted, items)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].ProductID < sorted[j].ProductID
	})
	parts := make([]string, len(sorted))
	for i, inp := range sorted {
		parts[i] = fmt.Sprintf("%s:%d", inp.ProductID, inp.Quantity)
	}
	return strings.Join(parts, ",")
}

// Ping reports whether the underlying MongoDB deployment is reachable.
func (s *Store) Ping(ctx context.Context) error {
	return s.db.Client().Ping(ctx, nil)
}

// CreateCustomer validates and normalizes the request, then inserts a new
// customer document. ID and creation time are derived deterministically from
// the normalized email so repeated runs produce identical documents.
// Returns ErrValidation for bad input and ErrConflict for a duplicate email.
func (s *Store) CreateCustomer(ctx context.Context, req CreateCustomerRequest) (Customer, error) {
	req.Email = strings.TrimSpace(strings.ToLower(req.Email))
	req.FullName = strings.TrimSpace(req.FullName)
	req.Segment = strings.TrimSpace(strings.ToLower(req.Segment))

	if _, err := mail.ParseAddress(req.Email); err != nil {
		return Customer{}, fmt.Errorf("%w: email must be valid", ErrValidation)
	}
	if req.FullName == "" {
		return Customer{}, fmt.Errorf("%w: full_name is required", ErrValidation)
	}
	if _, ok := validSegments[req.Segment]; !ok {
		return Customer{}, fmt.Errorf("%w: unsupported customer segment", ErrValidation)
	}

	customer := Customer{
		ID:        contentID(req.Email),
		Email:     req.Email,
		FullName:  req.FullName,
		Segment:   req.Segment,
		CreatedAt: contentTime(req.Email),
	}

	_, err := s.customers.InsertOne(ctx, customer)
	if err != nil {
		if mongo.IsDuplicateKeyError(err) {
			return Customer{}, fmt.Errorf("%w: email already exists", ErrConflict)
		}
		return Customer{}, fmt.Errorf("insert customer: %w", err)
	}

	return customer, nil
}

// CreateProduct validates and normalizes the request, then inserts a new
// product document. ID and creation time are derived deterministically from
// the normalized SKU. Returns ErrValidation for bad input and ErrConflict for
// a duplicate SKU.
func (s *Store) CreateProduct(ctx context.Context, req CreateProductRequest) (Product, error) {
	req.SKU = strings.TrimSpace(strings.ToUpper(req.SKU))
	req.Name = strings.TrimSpace(req.Name)
	req.Category = strings.TrimSpace(strings.ToLower(req.Category))

	switch {
	case req.SKU == "":
		return Product{}, fmt.Errorf("%w: sku is required", ErrValidation)
	case req.Name == "":
		return Product{}, fmt.Errorf("%w: name is required", ErrValidation)
	case req.Category == "":
		return Product{}, fmt.Errorf("%w: category is required", ErrValidation)
	case req.PriceCents <= 0:
		return Product{}, fmt.Errorf("%w: price_cents must be greater than zero", ErrValidation)
	case req.InventoryCount < 0:
		return Product{}, fmt.Errorf("%w: inventory_count cannot be negative", ErrValidation)
	}

	product := Product{
		ID:             contentID(req.SKU),
		SKU:            req.SKU,
		Name:           req.Name,
		Category:       req.Category,
		PriceCents:     req.PriceCents,
		InventoryCount: req.InventoryCount,
		CreatedAt:      contentTime(req.SKU),
	}

	_, err := s.products.InsertOne(ctx, product)
	if err != nil {
		if mongo.IsDuplicateKeyError(err) {
			return Product{}, fmt.Errorf("%w: sku already exists", ErrConflict)
		}
		return Product{}, fmt.Errorf("insert product: %w", err)
	}

	return product, nil
}

// CreateOrder validates the request, verifies the customer exists, atomically
// decrements inventory for each line item, and inserts the resulting order.
// If any item cannot be fulfilled, or the final insert fails, inventory that
// was already decremented for earlier items is restored (best-effort) so a
// rejected order does not leak stock. Returns ErrValidation, ErrNotFound,
// ErrInsufficientInventory, or ErrConflict (duplicate deterministic order ID).
func (s *Store) CreateOrder(ctx context.Context, req CreateOrderRequest) (Order, error) {
	req.Status = strings.TrimSpace(strings.ToLower(req.Status))
	if req.Status == "" {
		req.Status = "paid"
	}

	switch {
	case req.CustomerID == "":
		return Order{}, fmt.Errorf("%w: customer_id is required", ErrValidation)
	case len(req.Items) == 0:
		return Order{}, fmt.Errorf("%w: at least one item is required", ErrValidation)
	}
	if _, ok := validStatuses[req.Status]; !ok {
		return Order{}, fmt.Errorf("%w: unsupported order status", ErrValidation)
	}
	for _, item := range req.Items {
		if item.ProductID == "" || item.Quantity <= 0 {
			return Order{}, fmt.Errorf("%w: every item needs a valid product_id and quantity", ErrValidation)
		}
	}

	// Verify customer exists.
	var customer Customer
	if err := s.customers.FindOne(ctx, bson.M{"_id": req.CustomerID}).Decode(&customer); err != nil {
		if errors.Is(err, mongo.ErrNoDocuments) {
			return Order{}, fmt.Errorf("%w: customer %s", ErrNotFound, req.CustomerID)
		}
		return Order{}, fmt.Errorf("find customer: %w", err)
	}

	// Build items and decrement inventory atomically per product.
	var items []OrderItem
	totalCents := 0

	for _, input := range req.Items {
		// Decrement inventory with a findOneAndUpdate — atomic per document.
		var product Product
		after := options.After
		err := s.products.FindOneAndUpdate(
			ctx,
			bson.M{"_id": input.ProductID, "inventory_count": bson.M{"$gte": input.Quantity}},
			bson.M{"$inc": bson.M{"inventory_count": -input.Quantity}},
			options.FindOneAndUpdate().SetReturnDocument(after),
		).Decode(&product)
		if err != nil {
			// Roll back decrements already applied for earlier items so a
			// failed order does not permanently shrink inventory.
			s.restockItems(ctx, items)
			if errors.Is(err, mongo.ErrNoDocuments) {
				// Either product not found or insufficient inventory.
				var exists Product
				if findErr := s.products.FindOne(ctx, bson.M{"_id": input.ProductID}).Decode(&exists); findErr != nil {
					return Order{}, fmt.Errorf("%w: product %s", ErrNotFound, input.ProductID)
				}
				return Order{}, fmt.Errorf("%w: product %s", ErrInsufficientInventory, input.ProductID)
			}
			return Order{}, fmt.Errorf("update inventory for product %s: %w", input.ProductID, err)
		}

		lineCents := product.PriceCents * input.Quantity
		totalCents += lineCents
		items = append(items, OrderItem{
			ProductID:      product.ID,
			SKU:            product.SKU,
			Name:           product.Name,
			Category:       product.Category,
			Quantity:       input.Quantity,
			UnitPriceCents: product.PriceCents,
			LineTotalCents: lineCents,
		})
	}

	fp := orderFingerprint(req.Items)
	order := Order{
		ID:         contentID(req.CustomerID, fp),
		Customer:   customer,
		Status:     req.Status,
		TotalCents: totalCents,
		CreatedAt:  contentTime(req.CustomerID, fp),
		Items:      items,
	}

	if _, err := s.orders.InsertOne(ctx, order); err != nil {
		// The order was not recorded, so return the reserved stock.
		s.restockItems(ctx, items)
		if mongo.IsDuplicateKeyError(err) {
			// Order IDs are deterministic over (customer, items), so the same
			// cart from the same customer collides with the earlier order.
			return Order{}, fmt.Errorf("%w: identical order already exists", ErrConflict)
		}
		return Order{}, fmt.Errorf("insert order: %w", err)
	}

	return order, nil
}

// restockItems best-effort re-adds the given quantities to product inventory.
// It is used as compensation when order creation fails after some decrements
// have already been applied. Errors are deliberately ignored: there is no
// better recovery available here, and the caller is already returning the
// primary error. context.WithoutCancel lets the compensation run even when
// the original request context has been canceled.
func (s *Store) restockItems(ctx context.Context, items []OrderItem) {
	ctx = context.WithoutCancel(ctx)
	for _, it := range items {
		_, _ = s.products.UpdateOne(
			ctx,
			bson.M{"_id": it.ProductID},
			bson.M{"$inc": bson.M{"inventory_count": it.Quantity}},
		)
	}
}

// GetOrder fetches a single order by ID. Returns ErrNotFound when no order
// matches.
func (s *Store) GetOrder(ctx context.Context, orderID string) (Order, error) {
	var order Order
	if err := s.orders.FindOne(ctx, bson.M{"_id": orderID}).Decode(&order); err != nil {
		if errors.Is(err, mongo.ErrNoDocuments) {
			return Order{}, fmt.Errorf("%w: order %s", ErrNotFound, orderID)
		}
		return Order{}, fmt.Errorf("find order: %w", err)
	}

	return order, nil
}

// GetCustomerSummary aggregates a customer's order history: distinct order
// count, lifetime value, average order value, last order time, and the
// category with the highest total spend. Returns ErrNotFound when the
// customer does not exist.
func (s *Store) GetCustomerSummary(ctx context.Context, customerID string) (CustomerSummary, error) {
	var customer Customer
	if err := s.customers.FindOne(ctx, bson.M{"_id": customerID}).Decode(&customer); err != nil {
		if errors.Is(err, mongo.ErrNoDocuments) {
			return CustomerSummary{}, fmt.Errorf("%w: customer %s", ErrNotFound, customerID)
		}
		return CustomerSummary{}, fmt.Errorf("find customer: %w", err)
	}

	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.M{"customer._id": customerID}}},
		{{Key: "$unwind", Value: bson.M{"path": "$items", "preserveNullAndEmptyArrays": true}}},
		{{Key: "$group", Value: bson.D{
			{Key: "_id", Value: "$customer._id"},
			{Key: "orders_count", Value: bson.M{"$addToSet": "$_id"}},
			// After $unwind each order appears once per line item, so summing
			// "$total_cents" would count an order's total once per item and
			// inflate lifetime value. Summing the per-item line totals is
			// equivalent to summing each order's total exactly once, because
			// CreateOrder defines total_cents as the sum of its line totals.
			{Key: "lifetime_value_cents", Value: bson.M{"$sum": "$items.line_total_cents"}},
			{Key: "last_order_at", Value: bson.M{"$max": "$created_at"}},
			{Key: "category_spend", Value: bson.M{"$push": bson.M{
				"category": "$items.category",
				"cents":    "$items.line_total_cents",
			}}},
		}}},
	}

	cursor, err := s.orders.Aggregate(ctx, pipeline)
	if err != nil {
		return CustomerSummary{}, fmt.Errorf("aggregate customer summary: %w", err)
	}
	defer cursor.Close(ctx) //nolint:errcheck

	summary := CustomerSummary{Customer: customer}

	if cursor.Next(ctx) {
		var raw bson.M
		if err := cursor.Decode(&raw); err != nil {
			return CustomerSummary{}, fmt.Errorf("decode customer summary: %w", err)
		}

		// orders_count is a set of distinct order IDs.
		if ids, ok := raw["orders_count"].(bson.A); ok {
			summary.OrdersCount = len(ids)
		}
		// Mongo returns int32 or int64 depending on magnitude.
		if v, ok := raw["lifetime_value_cents"].(int32); ok {
			summary.LifetimeValueCents = int(v)
		} else if v, ok := raw["lifetime_value_cents"].(int64); ok {
			summary.LifetimeValueCents = int(v)
		}
		if summary.OrdersCount > 0 {
			summary.AverageOrderValueCents = summary.LifetimeValueCents / summary.OrdersCount
		}
		if t, ok := raw["last_order_at"].(time.Time); ok {
			summary.LastOrderAt = &t
		}

		// Find favourite category by total spend.
		if spends, ok := raw["category_spend"].(bson.A); ok {
			catSpend := map[string]int{}
			for _, item := range spends {
				if m, ok := item.(bson.M); ok {
					cat, _ := m["category"].(string)
					var cents int
					switch v := m["cents"].(type) {
					case int32:
						cents = int(v)
					case int64:
						cents = int(v)
					}
					catSpend[cat] += cents
				}
			}
			// Ties break toward the lexicographically smaller category so the
			// result is deterministic despite random map iteration order.
			best, bestCents := "", 0
			for cat, cents := range catSpend {
				if cents > bestCents || (cents == bestCents && cat < best) {
					best, bestCents = cat, cents
				}
			}
			summary.FavoriteCategory = best
		}
	}

	return summary, nil
}

// SearchOrders returns a page of orders matching the given filters, newest
// first. Limit is clamped to [1,100] with a default of 25; a negative offset
// is treated as zero. Returns ErrValidation for an unknown status value.
func (s *Store) SearchOrders(ctx context.Context, params OrderSearchParams) ([]OrderSearchResult, error) {
	if params.Limit <= 0 {
		params.Limit = 25
	}
	if params.Limit > 100 {
		params.Limit = 100
	}
	if params.Offset < 0 {
		params.Offset = 0
	}
	params.Status = strings.TrimSpace(strings.ToLower(params.Status))
	if params.Status != "" {
		if _, ok := validStatuses[params.Status]; !ok {
			return nil, fmt.Errorf("%w: unsupported order status", ErrValidation)
		}
	}

	filter := bson.M{}
	if params.Status != "" {
		filter["status"] = params.Status
	}
	if params.CustomerID != "" {
		filter["customer._id"] = params.CustomerID
	}
	if params.MinTotalCents > 0 {
		filter["total_cents"] = bson.M{"$gte": params.MinTotalCents}
	}
	if params.CreatedFrom != nil || params.CreatedThrough != nil {
		timeFilter := bson.M{}
		if params.CreatedFrom != nil {
			timeFilter["$gte"] = *params.CreatedFrom
		}
		if params.CreatedThrough != nil {
			timeFilter["$lte"] = *params.CreatedThrough
		}
		filter["created_at"] = timeFilter
	}

	findOpts := options.Find().
		SetSort(bson.D{{Key: "created_at", Value: -1}}).
		SetSkip(int64(params.Offset)).
		SetLimit(int64(params.Limit))

	cursor, err := s.orders.Find(ctx, filter, findOpts)
	if err != nil {
		return nil, fmt.Errorf("search orders: %w", err)
	}
	defer cursor.Close(ctx) //nolint:errcheck

	results := make([]OrderSearchResult, 0, params.Limit)
	for cursor.Next(ctx) {
		var order Order
		if err := cursor.Decode(&order); err != nil {
			return nil, fmt.Errorf("decode order: %w", err)
		}

		totalItems, distinctProducts := 0, map[string]struct{}{}
		for _, item := range order.Items {
			totalItems += item.Quantity
			distinctProducts[item.ProductID] = struct{}{}
		}

		results = append(results, OrderSearchResult{
			ID:               order.ID,
			CustomerID:       order.Customer.ID,
			CustomerName:     order.Customer.FullName,
			Status:           order.Status,
			TotalCents:       order.TotalCents,
			CreatedAt:        order.CreatedAt,
			TotalItems:       totalItems,
			DistinctProducts: len(distinctProducts),
		})
	}

	if err := cursor.Err(); err != nil {
		return nil, fmt.Errorf("iterate orders: %w", err)
	}

	return results, nil
}

// TopProducts returns up to limit products ranked by revenue (then units
// sold) across paid and shipped orders. Limit is clamped to [1,50] with a
// default of 10; days defaults to 30 but is currently not applied (see note
// below).
func (s *Store) TopProducts(ctx context.Context, days, limit int) ([]TopProduct, error) {
	if days <= 0 {
		days = 30
	}
	if limit <= 0 {
		limit = 10
	}
	if limit > 50 {
		limit = 50
	}

	_ = days // days filter intentionally unused: using all-time data keeps the
	// aggregation pipeline parameter-free so keploy can match the mock
	// deterministically across record and replay sessions.

	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.M{
			"status": bson.M{"$in": bson.A{"paid", "shipped"}},
		}}},
		{{Key: "$unwind", Value: "$items"}},
		{{Key: "$group", Value: bson.D{
			{Key: "_id", Value: bson.M{"product_id": "$items.product_id", "sku": "$items.sku", "name": "$items.name", "category": "$items.category"}},
			{Key: "units_sold", Value: bson.M{"$sum": "$items.quantity"}},
			{Key: "revenue_cents", Value: bson.M{"$sum": "$items.line_total_cents"}},
			{Key: "orders_count", Value: bson.M{"$addToSet": "$_id"}},
		}}},
		{{Key: "$project", Value: bson.M{
			"_id":           0,
			"product_id":    "$_id.product_id",
			"sku":           "$_id.sku",
			"name":          "$_id.name",
			"category":      "$_id.category",
			"units_sold":    1,
			"revenue_cents": 1,
			"orders_count":  bson.M{"$size": "$orders_count"},
		}}},
		{{Key: "$sort", Value: bson.D{{Key: "revenue_cents", Value: -1}, {Key: "units_sold", Value: -1}}}},
		{{Key: "$limit", Value: limit}},
	}

	cursor, err := s.orders.Aggregate(ctx, pipeline)
	if err != nil {
		return nil, fmt.Errorf("aggregate top products: %w", err)
	}
	defer cursor.Close(ctx) //nolint:errcheck

	results := make([]TopProduct, 0, limit)
	rank := 1
	for cursor.Next(ctx) {
		var row struct {
			ProductID    string `bson:"product_id"`
			SKU          string `bson:"sku"`
			Name         string `bson:"name"`
			Category     string `bson:"category"`
			UnitsSold    int    `bson:"units_sold"`
			RevenueCents int    `bson:"revenue_cents"`
			OrdersCount  int    `bson:"orders_count"`
		}
		if err := cursor.Decode(&row); err != nil {
			return nil, fmt.Errorf("decode top product: %w", err)
		}
		results = append(results, TopProduct{
			ID:           row.ProductID,
			SKU:          row.SKU,
			Name:         row.Name,
			Category:     row.Category,
			UnitsSold:    row.UnitsSold,
			RevenueCents: row.RevenueCents,
			OrdersCount:  row.OrdersCount,
			RevenueRank:  rank,
		})
		rank++
	}

	if err := cursor.Err(); err != nil {
		return nil, fmt.Errorf("iterate top products: %w", err)
	}

	return results, nil
}

// CreateLargePayload validates the request, enforces the configured payload
// size cap, and stores the payload together with deterministic metadata (ID
// and timestamp derived from name + SHA-256 of the content). Only the
// metadata record is returned; the payload body stays in the database.
func (s *Store) CreateLargePayload(ctx context.Context, req CreateLargePayloadRequest) (LargePayloadRecord, error) {
	req.Name = strings.TrimSpace(req.Name)
	req.ContentType = strings.TrimSpace(req.ContentType)
	if req.ContentType == "" {
		req.ContentType = "text/plain"
	}

	switch {
	case req.Name == "":
		return LargePayloadRecord{}, fmt.Errorf("%w: name is required", ErrValidation)
	case req.Payload == "":
		return LargePayloadRecord{}, fmt.Errorf("%w: payload is required", ErrValidation)
	}

	payloadSizeBytes := len([]byte(req.Payload))
	if payloadSizeBytes > maxLargePayloadBytes {
		return LargePayloadRecord{}, fmt.Errorf(
			"%w: payload exceeds %d bytes (%d MiB) limit",
			ErrValidation,
			maxLargePayloadBytes,
			maxLargePayloadBytes/(1024*1024),
		)
	}

	checksum := sha256.Sum256([]byte(req.Payload))

	doc := LargePayloadDetail{
		LargePayloadRecord: LargePayloadRecord{
			ID:               contentID(req.Name, hex.EncodeToString(checksum[:])),
			Name:             req.Name,
			ContentType:      req.ContentType,
			PayloadSizeBytes: payloadSizeBytes,
			SHA256:           hex.EncodeToString(checksum[:]),
			CreatedAt:        contentTime(req.Name, hex.EncodeToString(checksum[:])),
		},
		Payload: req.Payload,
	}

	if _, err := s.largePayload.InsertOne(ctx, doc); err != nil {
		return LargePayloadRecord{}, fmt.Errorf("insert large payload: %w", err)
	}

	return doc.LargePayloadRecord, nil
}

// GetLargePayload fetches a stored payload (metadata plus body) by ID.
// Returns ErrNotFound when no document matches.
func (s *Store) GetLargePayload(ctx context.Context, payloadID string) (LargePayloadDetail, error) {
	var record LargePayloadDetail
	if err := s.largePayload.FindOne(ctx, bson.M{"_id": payloadID}).Decode(&record); err != nil {
		if errors.Is(err, mongo.ErrNoDocuments) {
			return LargePayloadDetail{}, fmt.Errorf("%w: large payload %s", ErrNotFound, payloadID)
		}
		return LargePayloadDetail{}, fmt.Errorf("find large payload: %w", err)
	}

	return record, nil
}

// DeleteLargePayload removes a stored payload by ID and echoes the deleted
// record's metadata. Returns ErrNotFound when no document matches.
func (s *Store) DeleteLargePayload(ctx context.Context, payloadID string) (DeleteLargePayloadResponse, error) {
	var detail LargePayloadDetail
	if err
:= s.largePayload.FindOneAndDelete(ctx, bson.M{"_id": payloadID}).Decode(&detail); err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return DeleteLargePayloadResponse{}, fmt.Errorf("%w: large payload %s", ErrNotFound, payloadID) + } + return DeleteLargePayloadResponse{}, fmt.Errorf("delete large payload: %w", err) + } + + return DeleteLargePayloadResponse{ + Deleted: true, + Record: detail.LargePayloadRecord, + }, nil +} diff --git a/go-memory-load-mongo/keploy.yml b/go-memory-load-mongo/keploy.yml new file mode 100755 index 00000000..37a576df --- /dev/null +++ b/go-memory-load-mongo/keploy.yml @@ -0,0 +1,105 @@ +# Generated by Keploy (3-dev) +path: "" +appId: 0 +appName: "" +command: "" +templatize: + testSets: [] +port: 0 +proxyPort: 16789 +incomingProxyPort: 36789 +dnsPort: 26789 +debug: false +disableANSI: false +disableTele: false +generateGithubActions: false +containerName: "" +networkName: "" +buildDelay: 30 +test: + selectedTests: {} + ignoredTests: {} + globalNoise: + global: {} + test-sets: {} + replaceWith: + global: {} + test-sets: {} + delay: 5 + host: "localhost" + port: 0 + grpcPort: 0 + ssePort: 0 + protocol: + http: + port: 0 + sse: + port: 0 + grpc: + port: 0 + apiTimeout: 5 + skipCoverage: false + coverageReportPath: "" + ignoreOrdering: true + mongoPassword: "default@123" + language: "" + removeUnusedMocks: false + fallBackOnMiss: false + jacocoAgentPath: "" + basePath: "" + mocking: true + disableLineCoverage: false + disableMockUpload: true + useLocalMock: false + updateTemplate: false + mustPass: false + maxFailAttempts: 5 + maxFlakyChecks: 1 + protoFile: "" + protoDir: "" + protoInclude: [] + compareAll: false + updateTestMapping: false + disableAutoHeaderNoise: false + # strictMockWindow enforces cross-test bleed prevention. Per-test + # (LifetimePerTest) mocks whose request timestamp falls outside the + # outer test window are dropped rather than promoted across tests. 
  #
  # Phase 1 ships with default FALSE — many real-world apps
  # legitimately share data-plane mocks across tests (e.g., fixture
  # rows queried by every test in a suite), and flipping the default
  # to true would silently break those suites on upgrade. Opt into
  # strict containment by setting this to true in keploy.yaml or
  # exporting KEPLOY_STRICT_MOCK_WINDOW=1. A follow-up will flip the
  # default once every stateful-protocol recorder classifies mocks
  # finely enough (per-connection data mocks, session vs per-test
  # distinction for connection-alive commands) that legitimate
  # cross-test sharing is encoded as session/connection lifetime
  # rather than implicit out-of-window reuse.
  strictMockWindow: false
record:
  recordTimer: 0s
  filters: []
  sync: false
  memoryLimit: 0
configPath: ""
bypassRules: []
disableMapping: true
contract:
  driven: "consumer"
  mappings:
    servicesMapping: {}
    self: "s1"
  services: []
  tests: []
  path: ""
  download: false
  generate: false
inCi: false
cmdType: "native"
enableTesting: false
inDocker: false
keployContainer: "keploy-v3"
keployNetwork: "keploy-network"

# Visit [https://keploy.io/docs/running-keploy/configuration-file/] to learn about using keploy through the configuration file.
diff --git a/go-memory-load-mongo/loadtest/scenario.js b/go-memory-load-mongo/loadtest/scenario.js new file mode 100644 index 00000000..d1067a43 --- /dev/null +++ b/go-memory-load-mongo/loadtest/scenario.js @@ -0,0 +1,393 @@ +import http from 'k6/http'; +import exec from 'k6/execution'; +import { Counter, Trend } from 'k6/metrics'; +import { check, sleep } from 'k6'; + +const isSmokeProfile = __ENV.TEST_PROFILE === 'smoke'; +const MIXED_API_START_VUS = parsePositiveIntEnv('MIXED_API_START_VUS', 10); +const MIXED_API_VU_STAGE_TARGETS = parsePositiveIntListEnv( + 'MIXED_API_VU_STAGE_TARGETS', + [20, 40, 80, 30], + 4 +); +const LARGE_PAYLOAD_PREALLOCATED_VUS = parsePositiveIntEnv('LARGE_PAYLOAD_PREALLOCATED_VUS', 16); +const LARGE_PAYLOAD_MAX_VUS = parsePositiveIntEnv('LARGE_PAYLOAD_MAX_VUS', 64); +const LARGE_PAYLOAD_SIZE_MBS = (__ENV.LARGE_PAYLOAD_SIZES_MB || '1,2,4') + .split(',') + .map((value) => parseInt(value.trim(), 10)) + .filter((value) => Number.isFinite(value) && value > 0); +const LARGE_PAYLOAD_SIZES = LARGE_PAYLOAD_SIZE_MBS.length > 0 ? LARGE_PAYLOAD_SIZE_MBS : [1]; + +const LARGE_PAYLOAD_STAGE_TARGETS = parsePositiveIntListEnv( + 'LARGE_PAYLOAD_STAGE_TARGETS', + [2, 4, 2], + 3 +); + +const THRESHOLD_HTTP_FAILED_RATE = parseFloatEnv('THRESHOLD_HTTP_FAILED_RATE', 0.02); +const THRESHOLD_HTTP_P95 = parsePositiveIntEnv('THRESHOLD_HTTP_P95', 2500); +const THRESHOLD_HTTP_AVG = parsePositiveIntEnv('THRESHOLD_HTTP_AVG', 1200); +const THRESHOLD_LARGE_INSERT_P95 = parsePositiveIntEnv('THRESHOLD_LARGE_INSERT_P95', 5000); +const THRESHOLD_LARGE_GET_P95 = parsePositiveIntEnv('THRESHOLD_LARGE_GET_P95', 5000); +const THRESHOLD_LARGE_DELETE_P95 = parsePositiveIntEnv('THRESHOLD_LARGE_DELETE_P95', 3000); + +export const options = isSmokeProfile + ? 
{ + scenarios: { + mixed_api_load: { + executor: 'shared-iterations', + vus: 1, + iterations: 8, + maxDuration: '30s', + }, + large_payload_cycle: { + executor: 'shared-iterations', + vus: 1, + iterations: 3, + maxDuration: '45s', + }, + }, + thresholds: { + http_req_failed: ['rate<0.05'], + large_payload_insert_duration: ['p(95)<3000'], + large_payload_get_duration: ['p(95)<3000'], + large_payload_delete_duration: ['p(95)<2000'], + }, + } + : { + scenarios: { + mixed_api_load: { + executor: 'ramping-vus', + startVUs: MIXED_API_START_VUS, + stages: [ + { target: MIXED_API_VU_STAGE_TARGETS[0], duration: '15s' }, + { target: MIXED_API_VU_STAGE_TARGETS[1], duration: '30s' }, + { target: MIXED_API_VU_STAGE_TARGETS[2], duration: '45s' }, + { target: MIXED_API_VU_STAGE_TARGETS[3], duration: '15s' }, + ], + }, + large_payload_cycle: { + executor: 'ramping-arrival-rate', + startRate: 1, + timeUnit: '1s', + preAllocatedVUs: LARGE_PAYLOAD_PREALLOCATED_VUS, + maxVUs: LARGE_PAYLOAD_MAX_VUS, + stages: [ + { target: LARGE_PAYLOAD_STAGE_TARGETS[0], duration: '15s' }, + { target: LARGE_PAYLOAD_STAGE_TARGETS[1], duration: '30s' }, + { target: LARGE_PAYLOAD_STAGE_TARGETS[2], duration: '15s' }, + ], + }, + }, + thresholds: { + http_req_failed: [`rate<${THRESHOLD_HTTP_FAILED_RATE}`], + http_req_duration: [`p(95)<${THRESHOLD_HTTP_P95}`, `avg<${THRESHOLD_HTTP_AVG}`], + large_payload_insert_duration: [`p(95)<${THRESHOLD_LARGE_INSERT_P95}`], + large_payload_get_duration: [`p(95)<${THRESHOLD_LARGE_GET_P95}`], + large_payload_delete_duration: [`p(95)<${THRESHOLD_LARGE_DELETE_P95}`], + }, + }; + +const BASE_URL = __ENV.BASE_URL || 'http://localhost:8080'; +const SEGMENTS = ['startup', 'enterprise', 'retail', 'partner']; +const CATEGORIES = ['compute', 'storage', 'networking', 'security', 'analytics']; +const STATUSES = ['paid', 'paid', 'paid', 'shipped', 'pending']; +let uniqueCounter = 0; +const payloadCache = {}; +const largePayloadInsertDuration = new 
Trend('large_payload_insert_duration', true); +const largePayloadGetDuration = new Trend('large_payload_get_duration', true); +const largePayloadDeleteDuration = new Trend('large_payload_delete_duration', true); +const largePayloadInsertedBytes = new Counter('large_payload_inserted_bytes'); +const largePayloadRetrievedBytes = new Counter('large_payload_retrieved_bytes'); +const largePayloadDeletedBytes = new Counter('large_payload_deleted_bytes'); + +function parsePositiveIntEnv(name, fallback) { + const value = parseInt(__ENV[name] || '', 10); + return Number.isFinite(value) && value > 0 ? value : fallback; +} + +function parseFloatEnv(name, fallback) { + const value = parseFloat(__ENV[name] || ''); + return Number.isFinite(value) && value > 0 ? value : fallback; +} + +function parsePositiveIntListEnv(name, fallback, expectedLength) { + const values = (__ENV[name] || '') + .split(',') + .map((value) => parseInt(value.trim(), 10)) + .filter((value) => Number.isFinite(value) && value > 0); + + if (values.length === expectedLength) { + return values; + } + + return fallback; +} + +function jsonParams() { + return { + headers: { + 'Content-Type': 'application/json', + }, + }; +} + +function randomInt(min, max) { + return Math.floor(Math.random() * (max - min + 1)) + min; +} + +function randomItem(values) { + return values[randomInt(0, values.length - 1)]; +} + +function uniqueSuffix() { + const vu = typeof __VU === 'number' ? 
__VU : 0; + uniqueCounter += 1; + return `${vu}-${uniqueCounter}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`; +} + +function bytesFromMB(mb) { + return mb * 1024 * 1024; +} + +function buildLargePayload(sizeMB) { + if (!payloadCache[sizeMB]) { + const targetBytes = bytesFromMB(sizeMB); + payloadCache[sizeMB] = 'X'.repeat(targetBytes); + } + + return payloadCache[sizeMB]; +} + +function createCustomer(namePrefix = 'Load Customer') { + const suffix = uniqueSuffix(); + const payload = { + email: `customer-${suffix}@example.com`, + full_name: `${namePrefix} ${suffix}`, + segment: randomItem(SEGMENTS), + }; + + const response = http.post(`${BASE_URL}/customers`, JSON.stringify(payload), jsonParams()); + check(response, { + 'create customer status is 201': (r) => r.status === 201, + }); + + return response.status === 201 ? response.json() : null; +} + +function createLargePayload(sizeMB) { + const suffix = uniqueSuffix(); + const payload = buildLargePayload(sizeMB); + const response = http.post( + `${BASE_URL}/large-payloads`, + JSON.stringify({ + name: `Large Payload ${suffix}`, + content_type: 'text/plain', + payload, + }), + jsonParams() + ); + + largePayloadInsertDuration.add(response.timings.duration, { size_mb: String(sizeMB) }); + largePayloadInsertedBytes.add(payload.length); + + check(response, { + 'create large payload status is 201': (r) => r.status === 201, + 'create large payload size matches': (r) => + r.status === 201 && r.json('payload_size_bytes') === payload.length, + }); + + return response.status === 201 ? 
response.json() : null; +} + +function getLargePayload(id, sizeMB) { + const response = http.get(`${BASE_URL}/large-payloads/${id}`); + + largePayloadGetDuration.add(response.timings.duration, { size_mb: String(sizeMB) }); + + const expectedBytes = bytesFromMB(sizeMB); + check(response, { + 'get large payload status is 200': (r) => r.status === 200, + 'get large payload size matches': (r) => + r.status === 200 && + r.json('payload_size_bytes') === expectedBytes && + r.json('payload').length === expectedBytes, + }); + + if (response.status === 200) { + largePayloadRetrievedBytes.add(response.json('payload_size_bytes')); + } + + return response; +} + +function deleteLargePayload(id, sizeMB) { + const response = http.del(`${BASE_URL}/large-payloads/${id}`); + + largePayloadDeleteDuration.add(response.timings.duration, { size_mb: String(sizeMB) }); + + check(response, { + 'delete large payload status is 200': (r) => r.status === 200, + 'delete large payload reports deleted': (r) => r.status === 200 && r.json('deleted') === true, + }); + + if (response.status === 200) { + largePayloadDeletedBytes.add(response.json('record.payload_size_bytes')); + } + + return response; +} + +function createProduct(namePrefix = 'Load Product') { + const suffix = uniqueSuffix(); + const payload = { + sku: `SKU-${suffix}`.toUpperCase(), + name: `${namePrefix} ${suffix}`, + category: randomItem(CATEGORIES), + price_cents: randomInt(1200, 18000), + inventory_count: randomInt(1200, 2500), + }; + + const response = http.post(`${BASE_URL}/products`, JSON.stringify(payload), jsonParams()); + check(response, { + 'create product status is 201': (r) => r.status === 201, + }); + + return response.status === 201 ? 
response.json() : null; +} + +function createOrder(customerId, products) { + const itemCount = randomInt(1, 4); + const items = []; + const selectedProductIDs = new Set(); + + while (items.length < itemCount) { + const product = randomItem(products); + if (selectedProductIDs.has(product.id)) { + continue; + } + selectedProductIDs.add(product.id); + items.push({ + product_id: product.id, + quantity: randomInt(1, 3), + }); + } + + const payload = { + customer_id: customerId, + status: randomItem(STATUSES), + items, + }; + + const response = http.post(`${BASE_URL}/orders`, JSON.stringify(payload), jsonParams()); + check(response, { + 'create order status is 201': (r) => r.status === 201, + }); + + return response.status === 201 ? response.json() : null; +} + +export function setup() { + const bootstrapCustomers = []; + const bootstrapProducts = []; + const bootstrapLargePayloads = []; + + for (let i = 0; i < 20; i += 1) { + const customer = createCustomer('Bootstrap Customer'); + if (customer) { + bootstrapCustomers.push(customer); + } + } + + for (let i = 0; i < 35; i += 1) { + const product = createProduct('Bootstrap Product'); + if (product) { + bootstrapProducts.push(product); + } + } + + for (let i = 0; i < 40; i += 1) { + const customer = randomItem(bootstrapCustomers); + createOrder(customer.id, bootstrapProducts); + } + + for (const sizeMB of LARGE_PAYLOAD_SIZES.slice(0, 2)) { + const record = createLargePayload(sizeMB); + if (record) { + bootstrapLargePayloads.push({ + id: record.id, + sizeMB, + }); + } + } + + return { + customers: bootstrapCustomers, + products: bootstrapProducts, + largePayloads: bootstrapLargePayloads, + }; +} + +export default function (data) { + if (exec.scenario.name === 'large_payload_cycle') { + runLargePayloadCycle(data); + return; + } + + const roll = Math.random(); + const customer = randomItem(data.customers); + + if (roll < 0.1) { + createCustomer(); + } else if (roll < 0.2) { + createProduct(); + } else if (roll < 0.55) { + 
const order = createOrder(customer.id, data.products); + if (order) { + const orderResponse = http.get(`${BASE_URL}/orders/${order.id}`); + check(orderResponse, { + 'get order status is 200': (r) => r.status === 200, + 'get order returns items': (r) => r.status === 200 && r.json('items').length > 0, + }); + } + } else if (roll < 0.75) { + const summaryResponse = http.get(`${BASE_URL}/customers/${customer.id}/summary`); + check(summaryResponse, { + 'customer summary status is 200': (r) => r.status === 200, + }); + } else if (roll < 0.9) { + const minTotal = randomInt(1000, 10000); + const searchResponse = http.get( + `${BASE_URL}/orders?status=paid&customer_id=${customer.id}&min_total_cents=${minTotal}&limit=10` + ); + check(searchResponse, { + 'order search status is 200': (r) => r.status === 200, + }); + } else { + const analyticsResponse = http.get(`${BASE_URL}/analytics/top-products?days=30&limit=5`); + check(analyticsResponse, { + 'top products status is 200': (r) => r.status === 200, + }); + } + + sleep(randomInt(1, 3) / 10); +} + +function runLargePayloadCycle(data) { + const sizeMB = randomItem(LARGE_PAYLOAD_SIZES); + const created = createLargePayload(sizeMB); + if (!created) { + sleep(0.2); + return; + } + + getLargePayload(created.id, sizeMB); + deleteLargePayload(created.id, sizeMB); + + if (data.largePayloads.length > 0 && Math.random() < 0.35) { + const existing = randomItem(data.largePayloads); + getLargePayload(existing.id, existing.sizeMB); + } + + sleep(randomInt(2, 5) / 10); +}