Skip to content

Commit 1a89e45

Browse files
efirs and claude committed
feat: add Bundle API support for streaming multi-object tar download
Add BundleObjects method that POSTs to /{bucket}?bundle with SigV4 signing and returns a streaming io.ReadCloser for tar consumption. Includes WithBundleCompression/WithBundleOnError header helpers, unit tests, and godoc example.

Assisted-by: Claude Opus 4.6 via Claude Code
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Yevgeniy Firsov <firsov@tigrisdata.com>
1 parent 0383375 commit 1a89e45

5 files changed

Lines changed: 399 additions & 1 deletion

File tree

bundle.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package storage
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/sha256"
7+
"encoding/hex"
8+
"encoding/json"
9+
"fmt"
10+
"io"
11+
"net"
12+
"net/http"
13+
"strings"
14+
"time"
15+
16+
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
17+
)
18+
19+
// Values sent in the X-Tigris-Bundle-* request headers by BundleObjects.
const (
	// BundleFormatTar is the tar archive format for bundle responses.
	BundleFormatTar = "tar"

	// BundleCompressionNone disables compression (default).
	BundleCompressionNone = "none"
	// BundleCompressionGzip enables gzip compression.
	BundleCompressionGzip = "gzip"
	// BundleCompressionZstd enables zstd compression.
	BundleCompressionZstd = "zstd"

	// BundleOnErrorSkip silently omits missing objects from the archive (default).
	BundleOnErrorSkip = "skip"
	// BundleOnErrorFail returns an error if any object is missing.
	BundleOnErrorFail = "fail"
)
35+
36+
// bundleHTTPClient is reused across calls so connections can be pooled.
//
// No overall Client.Timeout is set — the caller's context controls
// cancellation, which avoids cutting off long streaming reads. Only the
// connection-setup phases (dial, TLS handshake, waiting for response headers)
// are bounded. IdleConnTimeout is set so pooled keep-alive connections are
// eventually closed instead of being held open indefinitely.
var bundleHTTPClient = &http.Client{
	Transport: &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           (&net.Dialer{Timeout: 30 * time.Second}).DialContext,
		TLSHandshakeTimeout:   10 * time.Second,
		ResponseHeaderTimeout: 60 * time.Second,
		IdleConnTimeout:       90 * time.Second,
	},
}
46+
47+
// BundleObjectsInput is the input for a BundleObjects request.
type BundleObjectsInput struct {
	// Bucket is the name of the bucket containing the objects. Required.
	Bucket string

	// Keys is the list of object keys to include in the bundle. Required;
	// BundleObjects rejects an empty list.
	// Maximum 5,000 keys per request.
	Keys []string

	// Compression sets the compression algorithm for the response.
	// Valid values: "none" (default), "gzip", "zstd".
	// An empty string is sent as "none".
	Compression string

	// OnError controls behavior when objects are missing.
	// "skip" (default): omit missing objects and append __bundle_errors.json to the tar.
	// "fail": return an error before streaming if any object is missing.
	// An empty string is sent as "skip".
	OnError string
}
65+
66+
// BundleObjectsOutput is the response from a BundleObjects request.
//
// The Body contains a streaming tar archive. Callers are responsible for closing Body.
// Use archive/tar to iterate entries:
//
//	tr := tar.NewReader(output.Body)
//	for {
//		hdr, err := tr.Next()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err // truncated or corrupt stream
//		}
//		// process hdr.Name, tr
//	}
//
// If compression was requested, wrap Body with the appropriate decompressor first:
//
//	gz, err := gzip.NewReader(output.Body)
//	if err != nil {
//		return err
//	}
//	defer gz.Close()
//	tr := tar.NewReader(gz)
type BundleObjectsOutput struct {
	// Body is the streaming tar archive. Must be closed by the caller.
	Body io.ReadCloser

	// ContentType is the response Content-Type (e.g. "application/x-tar", "application/gzip").
	ContentType string

	// StatusCode is the HTTP status code of the response.
	StatusCode int
}
92+
93+
// bundleRequestBody is the JSON payload POSTed by BundleObjects; it encodes
// as {"keys": [...]}.
type bundleRequestBody struct {
	Keys []string `json:"keys"`
}
96+
97+
// BundleObjects fetches multiple objects from a bucket as a streaming tar archive
98+
// in a single HTTP request.
99+
//
100+
// This is a Tigris extension to the S3 API, designed for ML training workloads
101+
// that need to fetch thousands of objects per batch without per-object HTTP overhead.
102+
//
103+
// The caller is responsible for closing the returned Body.
104+
func (c *Client) BundleObjects(ctx context.Context, in *BundleObjectsInput) (*BundleObjectsOutput, error) {
105+
if in.Bucket == "" {
106+
return nil, fmt.Errorf("storage: BundleObjects: bucket is required")
107+
}
108+
if len(in.Keys) == 0 {
109+
return nil, fmt.Errorf("storage: BundleObjects: at least one key is required")
110+
}
111+
112+
compression := in.Compression
113+
if compression == "" {
114+
compression = BundleCompressionNone
115+
}
116+
117+
onError := in.OnError
118+
if onError == "" {
119+
onError = BundleOnErrorSkip
120+
}
121+
122+
opts := c.Client.Options()
123+
124+
endpoint := GlobalEndpoint
125+
if opts.BaseEndpoint != nil {
126+
endpoint = *opts.BaseEndpoint
127+
}
128+
endpoint = strings.TrimRight(endpoint, "/")
129+
130+
reqURL := fmt.Sprintf("%s/%s?bundle", endpoint, in.Bucket)
131+
132+
body, err := json.Marshal(bundleRequestBody{Keys: in.Keys})
133+
if err != nil {
134+
return nil, fmt.Errorf("storage: BundleObjects: failed to marshal keys: %w", err)
135+
}
136+
137+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, bytes.NewReader(body))
138+
if err != nil {
139+
return nil, fmt.Errorf("storage: BundleObjects: failed to create request: %w", err)
140+
}
141+
142+
req.Header.Set("Content-Type", "application/json")
143+
req.Header.Set("X-Tigris-Bundle-Format", BundleFormatTar)
144+
req.Header.Set("X-Tigris-Bundle-Compression", compression)
145+
req.Header.Set("X-Tigris-Bundle-On-Error", onError)
146+
147+
// Sign request with SigV4.
148+
if opts.Credentials != nil {
149+
creds, err := opts.Credentials.Retrieve(ctx)
150+
if err != nil {
151+
return nil, fmt.Errorf("storage: BundleObjects: failed to retrieve credentials: %w", err)
152+
}
153+
154+
payloadHash := sha256Hex(body)
155+
req.Header.Set("X-Amz-Content-Sha256", payloadHash)
156+
157+
signer := v4.NewSigner()
158+
region := opts.Region
159+
if region == "" {
160+
region = "auto"
161+
}
162+
163+
err = signer.SignHTTP(ctx, creds, req, payloadHash, "s3", region, time.Now())
164+
if err != nil {
165+
return nil, fmt.Errorf("storage: BundleObjects: failed to sign request: %w", err)
166+
}
167+
}
168+
169+
resp, err := bundleHTTPClient.Do(req)
170+
if err != nil {
171+
return nil, fmt.Errorf("storage: BundleObjects: request failed: %w", err)
172+
}
173+
174+
if resp.StatusCode >= 400 {
175+
defer resp.Body.Close()
176+
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
177+
return nil, fmt.Errorf("storage: BundleObjects: HTTP %d: %s", resp.StatusCode, string(errBody))
178+
}
179+
180+
return &BundleObjectsOutput{
181+
Body: resp.Body,
182+
ContentType: resp.Header.Get("Content-Type"),
183+
StatusCode: resp.StatusCode,
184+
}, nil
185+
}
186+
187+
// sha256Hex returns the lowercase hex encoding of the SHA-256 digest of data.
func sha256Hex(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

bundle_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"testing"
10+
)
11+
12+
func TestBundleObjects_Validation(t *testing.T) {
13+
cli := &Client{}
14+
15+
t.Run("missing bucket", func(t *testing.T) {
16+
_, err := cli.BundleObjects(context.Background(), &BundleObjectsInput{
17+
Keys: []string{"a"},
18+
})
19+
if err == nil {
20+
t.Fatal("expected error for missing bucket")
21+
}
22+
})
23+
24+
t.Run("missing keys", func(t *testing.T) {
25+
_, err := cli.BundleObjects(context.Background(), &BundleObjectsInput{
26+
Bucket: "test-bucket",
27+
})
28+
if err == nil {
29+
t.Fatal("expected error for missing keys")
30+
}
31+
})
32+
33+
t.Run("empty keys", func(t *testing.T) {
34+
_, err := cli.BundleObjects(context.Background(), &BundleObjectsInput{
35+
Bucket: "test-bucket",
36+
Keys: []string{},
37+
})
38+
if err == nil {
39+
t.Fatal("expected error for empty keys")
40+
}
41+
})
42+
}
43+
44+
func TestBundleObjects_RequestConstruction(t *testing.T) {
45+
var capturedReq *http.Request
46+
var capturedBody []byte
47+
48+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
49+
capturedReq = r
50+
capturedBody, _ = io.ReadAll(r.Body)
51+
w.Header().Set("Content-Type", "application/x-tar")
52+
w.WriteHeader(http.StatusOK)
53+
}))
54+
defer server.Close()
55+
56+
cli, err := New(context.Background(),
57+
WithEndpoint(server.URL),
58+
WithAccessKeypair("test-key", "test-secret"),
59+
)
60+
if err != nil {
61+
t.Fatal(err)
62+
}
63+
64+
t.Run("default options", func(t *testing.T) {
65+
output, err := cli.BundleObjects(context.Background(), &BundleObjectsInput{
66+
Bucket: "my-bucket",
67+
Keys: []string{"a.jpg", "b.jpg"},
68+
})
69+
if err != nil {
70+
t.Fatal(err)
71+
}
72+
defer output.Body.Close()
73+
74+
if capturedReq.Method != "POST" {
75+
t.Errorf("method = %q, want POST", capturedReq.Method)
76+
}
77+
if capturedReq.URL.Path != "/my-bucket" {
78+
t.Errorf("path = %q, want /my-bucket", capturedReq.URL.Path)
79+
}
80+
if capturedReq.URL.Query().Get("bundle") != "" {
81+
// query param "bundle" should be present with empty value
82+
} else if !capturedReq.URL.Query().Has("bundle") {
83+
t.Error("missing ?bundle query parameter")
84+
}
85+
86+
if capturedReq.Header.Get("Content-Type") != "application/json" {
87+
t.Errorf("content-type = %q, want application/json", capturedReq.Header.Get("Content-Type"))
88+
}
89+
if capturedReq.Header.Get("X-Tigris-Bundle-Format") != "tar" {
90+
t.Errorf("bundle-format = %q, want tar", capturedReq.Header.Get("X-Tigris-Bundle-Format"))
91+
}
92+
if capturedReq.Header.Get("X-Tigris-Bundle-Compression") != "none" {
93+
t.Errorf("compression = %q, want none", capturedReq.Header.Get("X-Tigris-Bundle-Compression"))
94+
}
95+
if capturedReq.Header.Get("X-Tigris-Bundle-On-Error") != "skip" {
96+
t.Errorf("on-error = %q, want skip", capturedReq.Header.Get("X-Tigris-Bundle-On-Error"))
97+
}
98+
99+
// Verify body contains keys.
100+
var body bundleRequestBody
101+
if err := json.Unmarshal(capturedBody, &body); err != nil {
102+
t.Fatalf("failed to unmarshal body: %v", err)
103+
}
104+
if len(body.Keys) != 2 || body.Keys[0] != "a.jpg" || body.Keys[1] != "b.jpg" {
105+
t.Errorf("body keys = %v, want [a.jpg b.jpg]", body.Keys)
106+
}
107+
108+
// Verify SigV4 authorization header is present.
109+
if capturedReq.Header.Get("Authorization") == "" {
110+
t.Error("missing Authorization header (SigV4)")
111+
}
112+
113+
if output.ContentType != "application/x-tar" {
114+
t.Errorf("content-type = %q, want application/x-tar", output.ContentType)
115+
}
116+
})
117+
118+
t.Run("custom compression and error mode", func(t *testing.T) {
119+
output, err := cli.BundleObjects(context.Background(), &BundleObjectsInput{
120+
Bucket: "my-bucket",
121+
Keys: []string{"x.txt"},
122+
Compression: BundleCompressionGzip,
123+
OnError: BundleOnErrorFail,
124+
})
125+
if err != nil {
126+
t.Fatal(err)
127+
}
128+
defer output.Body.Close()
129+
130+
if capturedReq.Header.Get("X-Tigris-Bundle-Compression") != "gzip" {
131+
t.Errorf("compression = %q, want gzip", capturedReq.Header.Get("X-Tigris-Bundle-Compression"))
132+
}
133+
if capturedReq.Header.Get("X-Tigris-Bundle-On-Error") != "fail" {
134+
t.Errorf("on-error = %q, want fail", capturedReq.Header.Get("X-Tigris-Bundle-On-Error"))
135+
}
136+
})
137+
}
138+
139+
func TestBundleObjects_HTTPError(t *testing.T) {
140+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
141+
w.WriteHeader(http.StatusBadRequest)
142+
w.Write([]byte(`<Error><Code>InvalidArgument</Code></Error>`))
143+
}))
144+
defer server.Close()
145+
146+
cli, err := New(context.Background(),
147+
WithEndpoint(server.URL),
148+
WithAccessKeypair("test-key", "test-secret"),
149+
)
150+
if err != nil {
151+
t.Fatal(err)
152+
}
153+
154+
_, err = cli.BundleObjects(context.Background(), &BundleObjectsInput{
155+
Bucket: "my-bucket",
156+
Keys: []string{"a.jpg"},
157+
})
158+
if err == nil {
159+
t.Fatal("expected error for HTTP 400")
160+
}
161+
}

0 commit comments

Comments
 (0)