Skip to content

Commit 3480f78

Browse files
authored
S3 support
1 parent 46ed2d5 commit 3480f78

File tree

1 file changed

+154
-29
lines changed

1 file changed

+154
-29
lines changed

go/server.go

Lines changed: 154 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,100 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"log"
78
"net/http"
89
"os"
910
"path/filepath"
1011
"strings"
12+
"sync"
1113
"time"
1214

13-
"github.com/gin-gonic/gin"
15+
"github.com/aws/aws-sdk-go-v2/aws"
16+
"github.com/aws/aws-sdk-go-v2/credentials"
17+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
18+
"github.com/aws/aws-sdk-go-v2/service/s3"
1419
"github.com/gin-contrib/cors"
20+
"github.com/gin-gonic/gin"
1521
)
1622

1723
const DBDir = ".local/tmp"
1824

25+
type StorageConfig struct {
26+
s3Client *s3.Client
27+
bucketName string
28+
useS3 bool
29+
uploadMutex sync.Mutex
30+
}
31+
1932
type HivePathInfo struct {
2033
Partitions map[string]string
2134
FileName string
2235
IsHiveStyle bool
2336
}
2437

38+
var storage StorageConfig
39+
40+
func initStorage() error {
41+
endpoint := getEnv("S3_ENDPOINT", "")
42+
bucketName := getEnv("S3_BUCKET", "")
43+
accessKey := getEnv("S3_ACCESS_KEY", "")
44+
secretKey := getEnv("S3_SECRET_KEY", "")
45+
useS3 := endpoint != "" && bucketName != "" && accessKey != "" && secretKey != ""
46+
47+
if useS3 {
48+
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
49+
return aws.Endpoint{
50+
URL: endpoint,
51+
SigningRegion: "us-east-1",
52+
HostnameImmutable: true,
53+
Source: aws.EndpointSourceCustom,
54+
}, nil
55+
})
56+
57+
cfg := aws.Config{
58+
Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
59+
EndpointResolver: customResolver,
60+
Region: "us-east-1",
61+
BaseEndpoint: aws.String(endpoint),
62+
}
63+
64+
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
65+
o.UsePathStyle = true
66+
})
67+
68+
storage = StorageConfig{
69+
s3Client: client,
70+
bucketName: bucketName,
71+
useS3: true,
72+
}
73+
74+
log.Printf("S3-compatible storage initialized with endpoint: %s, bucket: %s", endpoint, bucketName)
75+
} else {
76+
storage = StorageConfig{
77+
useS3: false,
78+
}
79+
log.Println("Running in local storage mode")
80+
}
81+
82+
return nil
83+
}
84+
2585
func main() {
86+
if err := initStorage(); err != nil {
87+
log.Fatal(err)
88+
}
89+
2690
router := gin.Default()
2791

28-
// CORS configuration
2992
config := cors.Config{
30-
AllowOrigins: []string{"*"},
31-
AllowMethods: []string{"GET", "HEAD"},
32-
AllowHeaders: []string{"*"},
33-
ExposeHeaders: []string{},
34-
MaxAge: 5000,
93+
AllowOrigins: []string{"*"},
94+
AllowMethods: []string{"GET", "HEAD"},
95+
AllowHeaders: []string{"*"},
96+
ExposeHeaders: []string{},
97+
MaxAge: 5000,
3598
}
3699

37100
router.Use(cors.New(config))
@@ -88,6 +151,68 @@ func getFilePath(requestPath string) string {
88151
return filepath.Join(DBDir, requestPath)
89152
}
90153

154+
func fetchFromS3(s3Path string, localPath string) error {
155+
if !storage.useS3 {
156+
return fmt.Errorf("S3 storage not configured")
157+
}
158+
159+
ctx := context.Background()
160+
161+
if err := os.MkdirAll(filepath.Dir(localPath), os.ModePerm); err != nil {
162+
return err
163+
}
164+
165+
file, err := os.Create(localPath)
166+
if err != nil {
167+
return err
168+
}
169+
defer file.Close()
170+
171+
downloader := manager.NewDownloader(storage.s3Client)
172+
_, err = downloader.Download(ctx, file, &s3.GetObjectInput{
173+
Bucket: aws.String(storage.bucketName),
174+
Key: aws.String(s3Path),
175+
})
176+
177+
if err != nil {
178+
os.Remove(localPath)
179+
return fmt.Errorf("failed to download from S3: %v", err)
180+
}
181+
182+
log.Printf("Successfully downloaded %s from S3", s3Path)
183+
return nil
184+
}
185+
186+
func uploadToS3(localPath string, s3Path string) {
187+
if !storage.useS3 {
188+
return
189+
}
190+
191+
storage.uploadMutex.Lock()
192+
defer storage.uploadMutex.Unlock()
193+
194+
file, err := os.Open(localPath)
195+
if err != nil {
196+
log.Printf("Failed to open file for S3 upload: %v", err)
197+
return
198+
}
199+
defer file.Close()
200+
201+
ctx := context.Background()
202+
uploader := manager.NewUploader(storage.s3Client)
203+
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
204+
Bucket: aws.String(storage.bucketName),
205+
Key: aws.String(s3Path),
206+
Body: file,
207+
})
208+
209+
if err != nil {
210+
log.Printf("Failed to upload to S3: %v", err)
211+
} else {
212+
log.Printf("Successfully uploaded %s to S3", s3Path)
213+
}
214+
}
215+
91216
func handleRequest(c *gin.Context) {
92217
requestPath := c.Param("path")
93218
if requestPath == "" {
@@ -160,9 +285,6 @@ func handleWildcardHeadRequest(c *gin.Context, matchingFiles []string) {
160285
}
161286
}
162287

163-
log.Printf("HEAD request: Total size: %d, Last modified: %s, Matched files: %d",
164-
totalSize, lastModified.UTC().Format(http.TimeFormat), len(matchingFiles))
165-
166288
c.Header("Content-Length", fmt.Sprintf("%d", totalSize))
167289
c.Header("Last-Modified", lastModified.UTC().Format(http.TimeFormat))
168290
c.Header("Accept-Ranges", "bytes")
@@ -197,17 +319,29 @@ func handleExactPathRequest(c *gin.Context, info HivePathInfo) {
197319

198320
fileInfo, err := os.Stat(filePath)
199321
if err != nil {
200-
if os.IsNotExist(err) {
322+
if os.IsNotExist(err) && storage.useS3 {
323+
// Try fetching from S3 before returning 404
324+
relPath, _ := filepath.Rel(DBDir, filePath)
325+
if err := fetchFromS3(relPath, filePath); err != nil {
326+
log.Printf("S3 fetch failed: %v", err)
327+
c.JSON(http.StatusNotFound, gin.H{"error": "Not found in local storage or S3"})
328+
return
329+
}
330+
fileInfo, err = os.Stat(filePath)
331+
if err != nil {
332+
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to stat file after S3 fetch"})
333+
return
334+
}
335+
} else if os.IsNotExist(err) {
201336
c.JSON(http.StatusNotFound, gin.H{"error": "Not found"})
337+
return
202338
} else {
203339
c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"})
340+
return
204341
}
205-
return
206342
}
207343

208344
if fileInfo.IsDir() {
209-
// handleDirectory(c, filePath)
210-
// Return an empty response for directory requests
211345
c.String(http.StatusOK, "")
212346
} else {
213347
handleSingleFile(c, filePath)
@@ -249,21 +383,6 @@ func handleMultipleFiles(c *gin.Context, matchingFiles []string) {
249383
}
250384
}
251385

252-
func handleDirectory(c *gin.Context, dirPath string) {
253-
files, err := filepath.Glob(filepath.Join(dirPath, "*"))
254-
if err != nil {
255-
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to read directory"})
256-
return
257-
}
258-
259-
relativeFiles := make([]string, len(files))
260-
for i, file := range files {
261-
relativeFiles[i], _ = filepath.Rel(DBDir, file)
262-
}
263-
264-
c.JSON(http.StatusOK, relativeFiles)
265-
}
266-
267386
func handlePostRequest(c *gin.Context) {
268387
requestPath := c.Param("path")
269388
if requestPath == "" {
@@ -290,5 +409,11 @@ func handlePostRequest(c *gin.Context) {
290409
}
291410

292411
relativePath, _ := filepath.Rel(DBDir, filePath)
412+
413+
// Asynchronously upload to S3 if configured
414+
if storage.useS3 {
415+
go uploadToS3(filePath, relativePath)
416+
}
417+
293418
c.JSON(http.StatusOK, gin.H{"success": true, "path": relativePath})
294419
}

0 commit comments

Comments
 (0)