Skip to content

Commit 51973a2

Browse files
authored
Merge pull request #2 from metrico/s3
S3 support
2 parents 46ed2d5 + 64634e0 commit 51973a2

File tree

2 files changed

+201
-56
lines changed

2 files changed

+201
-56
lines changed

go/go.mod

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,53 @@ module github.com/metrico/duckdb-urlengine
22

33
go 1.22.5
44

5-
require github.com/gin-gonic/gin v1.10.0
5+
require (
6+
github.com/aws/aws-sdk-go-v2 v1.31.0
7+
github.com/aws/aws-sdk-go-v2/credentials v1.17.37
8+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25
9+
github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3
10+
github.com/gin-contrib/cors v1.7.2
11+
github.com/gin-gonic/gin v1.10.0
12+
github.com/joho/godotenv v1.5.1
13+
github.com/mattn/go-sqlite3 v1.14.23
14+
github.com/metrico/pasticca v1.0.0
15+
)
616

717
require (
8-
github.com/bytedance/sonic v1.11.6 // indirect
9-
github.com/bytedance/sonic/loader v0.1.1 // indirect
10-
github.com/cloudwego/base64x v0.1.4 // indirect
11-
github.com/cloudwego/iasm v0.2.0 // indirect
12-
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
13-
github.com/gin-contrib/sse v0.1.0 // indirect
14-
github.com/go-playground/locales v0.14.1 // indirect
15-
github.com/go-playground/universal-translator v0.18.1 // indirect
16-
github.com/go-playground/validator/v10 v10.20.0 // indirect
17-
github.com/goccy/go-json v0.10.2 // indirect
18-
github.com/json-iterator/go v1.1.12 // indirect
19-
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
20-
github.com/leodido/go-urn v1.4.0 // indirect
21-
github.com/mattn/go-isatty v0.0.20 // indirect
22-
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
23-
github.com/modern-go/reflect2 v1.0.2 // indirect
24-
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
25-
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
26-
github.com/ugorji/go/codec v1.2.12 // indirect
27-
golang.org/x/arch v0.8.0 // indirect
28-
golang.org/x/crypto v0.23.0 // indirect
29-
golang.org/x/net v0.25.0 // indirect
30-
golang.org/x/sys v0.20.0 // indirect
31-
golang.org/x/text v0.15.0 // indirect
32-
google.golang.org/protobuf v1.34.1 // indirect
33-
gopkg.in/yaml.v3 v3.0.1 // indirect
18+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 // indirect
19+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
20+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
21+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18 // indirect
22+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect
23+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.20 // indirect
24+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect
25+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18 // indirect
26+
github.com/aws/smithy-go v1.21.0 // indirect
27+
github.com/bytedance/sonic v1.11.6 // indirect
28+
github.com/bytedance/sonic/loader v0.1.1 // indirect
29+
github.com/cloudwego/base64x v0.1.4 // indirect
30+
github.com/cloudwego/iasm v0.2.0 // indirect
31+
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
32+
github.com/gin-contrib/sse v0.1.0 // indirect
33+
github.com/go-playground/locales v0.14.1 // indirect
34+
github.com/go-playground/universal-translator v0.18.1 // indirect
35+
github.com/go-playground/validator/v10 v10.20.0 // indirect
36+
github.com/goccy/go-json v0.10.2 // indirect
37+
github.com/json-iterator/go v1.1.12 // indirect
38+
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
39+
github.com/kr/text v0.2.0 // indirect
40+
github.com/leodido/go-urn v1.4.0 // indirect
41+
github.com/mattn/go-isatty v0.0.20 // indirect
42+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
43+
github.com/modern-go/reflect2 v1.0.2 // indirect
44+
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
45+
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
46+
github.com/ugorji/go/codec v1.2.12 // indirect
47+
golang.org/x/arch v0.8.0 // indirect
48+
golang.org/x/crypto v0.23.0 // indirect
49+
golang.org/x/net v0.25.0 // indirect
50+
golang.org/x/sys v0.20.0 // indirect
51+
golang.org/x/text v0.15.0 // indirect
52+
google.golang.org/protobuf v1.34.1 // indirect
53+
gopkg.in/yaml.v3 v3.0.1 // indirect
3454
)

go/server.go

Lines changed: 154 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,100 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"log"
78
"net/http"
89
"os"
910
"path/filepath"
1011
"strings"
12+
"sync"
1113
"time"
1214

13-
"github.com/gin-gonic/gin"
15+
"github.com/aws/aws-sdk-go-v2/aws"
16+
"github.com/aws/aws-sdk-go-v2/credentials"
17+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
18+
"github.com/aws/aws-sdk-go-v2/service/s3"
1419
"github.com/gin-contrib/cors"
20+
"github.com/gin-gonic/gin"
1521
)
1622

1723
// DBDir is the local base directory: request paths are joined onto it to
// locate files on disk, and paths reported back to clients (and used as S3
// keys) are expressed relative to it.
const DBDir = ".local/tmp"
1824

25+
// StorageConfig holds the optional S3 backend state used by the server.
// It is populated once by initStorage and read by fetchFromS3 and uploadToS3.
type StorageConfig struct {
26+
// s3Client is the client handed to the S3 download/upload managers.
s3Client *s3.Client
27+
// bucketName is the bucket used for every GetObject/PutObject request.
bucketName string
28+
// useS3 is true only when S3 was configured; the S3 helpers check it first.
useS3 bool
29+
// uploadMutex is held for the whole of each uploadToS3 call, so uploads
// run one at a time.
uploadMutex sync.Mutex
30+
}
31+
1932
// HivePathInfo describes a parsed request path and is what
// handleExactPathRequest receives.
// NOTE(review): the code that fills it in is not visible here; the field
// notes below are inferred from names and should be confirmed against it.
type HivePathInfo struct {
2033
// Partitions presumably maps partition key to value (key=value path
// segments) — TODO confirm.
Partitions map[string]string
2134
// FileName presumably is the final file component of the path — TODO confirm.
FileName string
2235
// IsHiveStyle presumably reports whether the path used key=value
// partition segments — TODO confirm.
IsHiveStyle bool
2336
}
2437

38+
// storage is the process-wide storage configuration. initStorage fills it in
// at startup; the S3 helpers and request handlers read it afterwards.
var storage StorageConfig
39+
40+
func initStorage() error {
41+
endpoint := getEnv("S3_ENDPOINT", "")
42+
bucketName := getEnv("S3_BUCKET", "")
43+
accessKey := getEnv("S3_ACCESS_KEY", "")
44+
secretKey := getEnv("S3_SECRET_KEY", "")
45+
useS3 := endpoint != "" && bucketName != "" && accessKey != "" && secretKey != ""
46+
47+
if useS3 {
48+
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
49+
return aws.Endpoint{
50+
URL: endpoint,
51+
SigningRegion: "us-east-1",
52+
HostnameImmutable: true,
53+
Source: aws.EndpointSourceCustom,
54+
}, nil
55+
})
56+
57+
cfg := aws.Config{
58+
Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
59+
EndpointResolver: customResolver,
60+
Region: "us-east-1",
61+
BaseEndpoint: aws.String(endpoint),
62+
}
63+
64+
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
65+
o.UsePathStyle = true
66+
})
67+
68+
storage = StorageConfig{
69+
s3Client: client,
70+
bucketName: bucketName,
71+
useS3: true,
72+
}
73+
74+
log.Printf("S3-compatible storage initialized with endpoint: %s, bucket: %s", endpoint, bucketName)
75+
} else {
76+
storage = StorageConfig{
77+
useS3: false,
78+
}
79+
log.Println("Running in local storage mode")
80+
}
81+
82+
return nil
83+
}
84+
2585
func main() {
86+
if err := initStorage(); err != nil {
87+
log.Fatal(err)
88+
}
89+
2690
router := gin.Default()
2791

28-
// CORS configuration
2992
config := cors.Config{
30-
AllowOrigins: []string{"*"},
31-
AllowMethods: []string{"GET", "HEAD"},
32-
AllowHeaders: []string{"*"},
33-
ExposeHeaders: []string{},
34-
MaxAge: 5000,
93+
AllowOrigins: []string{"*"},
94+
AllowMethods: []string{"GET", "HEAD"},
95+
AllowHeaders: []string{"*"},
96+
ExposeHeaders: []string{},
97+
MaxAge: 5000,
3598
}
3699

37100
router.Use(cors.New(config))
@@ -88,6 +151,68 @@ func getFilePath(requestPath string) string {
88151
return filepath.Join(DBDir, requestPath)
89152
}
90153

154+
func fetchFromS3(s3Path string, localPath string) error {
155+
if !storage.useS3 {
156+
return fmt.Errorf("S3 storage not configured")
157+
}
158+
159+
ctx := context.Background()
160+
161+
if err := os.MkdirAll(filepath.Dir(localPath), os.ModePerm); err != nil {
162+
return err
163+
}
164+
165+
file, err := os.Create(localPath)
166+
if err != nil {
167+
return err
168+
}
169+
defer file.Close()
170+
171+
downloader := manager.NewDownloader(storage.s3Client)
172+
_, err = downloader.Download(ctx, file, &s3.GetObjectInput{
173+
Bucket: aws.String(storage.bucketName),
174+
Key: aws.String(s3Path),
175+
})
176+
177+
if err != nil {
178+
os.Remove(localPath)
179+
return fmt.Errorf("failed to download from S3: %v", err)
180+
}
181+
182+
log.Printf("Successfully downloaded %s from S3", s3Path)
183+
return nil
184+
}
185+
186+
func uploadToS3(localPath string, s3Path string) {
187+
if !storage.useS3 {
188+
return
189+
}
190+
191+
storage.uploadMutex.Lock()
192+
defer storage.uploadMutex.Unlock()
193+
194+
file, err := os.Open(localPath)
195+
if err != nil {
196+
log.Printf("Failed to open file for S3 upload: %v", err)
197+
return
198+
}
199+
defer file.Close()
200+
201+
ctx := context.Background()
202+
uploader := manager.NewUploader(storage.s3Client)
203+
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
204+
Bucket: aws.String(storage.bucketName),
205+
Key: aws.String(s3Path),
206+
Body: file,
207+
})
208+
209+
if err != nil {
210+
log.Printf("Failed to upload to S3: %v", err)
211+
} else {
212+
log.Printf("Successfully uploaded %s to S3", s3Path)
213+
}
214+
}
215+
91216
func handleRequest(c *gin.Context) {
92217
requestPath := c.Param("path")
93218
if requestPath == "" {
@@ -160,9 +285,6 @@ func handleWildcardHeadRequest(c *gin.Context, matchingFiles []string) {
160285
}
161286
}
162287

163-
log.Printf("HEAD request: Total size: %d, Last modified: %s, Matched files: %d",
164-
totalSize, lastModified.UTC().Format(http.TimeFormat), len(matchingFiles))
165-
166288
c.Header("Content-Length", fmt.Sprintf("%d", totalSize))
167289
c.Header("Last-Modified", lastModified.UTC().Format(http.TimeFormat))
168290
c.Header("Accept-Ranges", "bytes")
@@ -197,17 +319,29 @@ func handleExactPathRequest(c *gin.Context, info HivePathInfo) {
197319

198320
fileInfo, err := os.Stat(filePath)
199321
if err != nil {
200-
if os.IsNotExist(err) {
322+
if os.IsNotExist(err) && storage.useS3 {
323+
// Try fetching from S3 before returning 404
324+
relPath, _ := filepath.Rel(DBDir, filePath)
325+
if err := fetchFromS3(relPath, filePath); err != nil {
326+
log.Printf("S3 fetch failed: %v", err)
327+
c.JSON(http.StatusNotFound, gin.H{"error": "Not found in local storage or S3"})
328+
return
329+
}
330+
fileInfo, err = os.Stat(filePath)
331+
if err != nil {
332+
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to stat file after S3 fetch"})
333+
return
334+
}
335+
} else if os.IsNotExist(err) {
201336
c.JSON(http.StatusNotFound, gin.H{"error": "Not found"})
337+
return
202338
} else {
203339
c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"})
340+
return
204341
}
205-
return
206342
}
207343

208344
if fileInfo.IsDir() {
209-
// handleDirectory(c, filePath)
210-
// Return an empty response for directory requests
211345
c.String(http.StatusOK, "")
212346
} else {
213347
handleSingleFile(c, filePath)
@@ -249,21 +383,6 @@ func handleMultipleFiles(c *gin.Context, matchingFiles []string) {
249383
}
250384
}
251385

252-
func handleDirectory(c *gin.Context, dirPath string) {
253-
files, err := filepath.Glob(filepath.Join(dirPath, "*"))
254-
if err != nil {
255-
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to read directory"})
256-
return
257-
}
258-
259-
relativeFiles := make([]string, len(files))
260-
for i, file := range files {
261-
relativeFiles[i], _ = filepath.Rel(DBDir, file)
262-
}
263-
264-
c.JSON(http.StatusOK, relativeFiles)
265-
}
266-
267386
func handlePostRequest(c *gin.Context) {
268387
requestPath := c.Param("path")
269388
if requestPath == "" {
@@ -290,5 +409,11 @@ func handlePostRequest(c *gin.Context) {
290409
}
291410

292411
relativePath, _ := filepath.Rel(DBDir, filePath)
412+
413+
// Asynchronously upload to S3 if configured
414+
if storage.useS3 {
415+
go uploadToS3(filePath, relativePath)
416+
}
417+
293418
c.JSON(http.StatusOK, gin.H{"success": true, "path": relativePath})
294419
}

0 commit comments

Comments
 (0)