Skip to content

Commit 7354f0c

Browse files
committed
s3
1 parent 51973a2 commit 7354f0c

File tree

1 file changed

+87
-69
lines changed

1 file changed

+87
-69
lines changed

go/server.go

Lines changed: 87 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -38,48 +38,57 @@ type HivePathInfo struct {
3838
var storage StorageConfig
3939

4040
func initStorage() error {
41-
endpoint := getEnv("S3_ENDPOINT", "")
42-
bucketName := getEnv("S3_BUCKET", "")
43-
accessKey := getEnv("S3_ACCESS_KEY", "")
44-
secretKey := getEnv("S3_SECRET_KEY", "")
45-
useS3 := endpoint != "" && bucketName != "" && accessKey != "" && secretKey != ""
46-
47-
if useS3 {
48-
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
49-
return aws.Endpoint{
50-
URL: endpoint,
51-
SigningRegion: "us-east-1",
52-
HostnameImmutable: true,
53-
Source: aws.EndpointSourceCustom,
54-
}, nil
55-
})
56-
57-
cfg := aws.Config{
58-
Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
59-
EndpointResolver: customResolver,
60-
Region: "us-east-1",
61-
BaseEndpoint: aws.String(endpoint),
62-
}
63-
64-
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
65-
o.UsePathStyle = true
66-
})
67-
68-
storage = StorageConfig{
69-
s3Client: client,
70-
bucketName: bucketName,
71-
useS3: true,
72-
}
73-
74-
log.Printf("S3-compatible storage initialized with endpoint: %s, bucket: %s", endpoint, bucketName)
75-
} else {
76-
storage = StorageConfig{
77-
useS3: false,
78-
}
79-
log.Println("Running in local storage mode")
80-
}
81-
82-
return nil
41+
endpoint := getEnv("S3_ENDPOINT", "")
42+
bucketName := getEnv("S3_BUCKET", "")
43+
accessKey := getEnv("S3_ACCESS_KEY", "")
44+
secretKey := getEnv("S3_SECRET_KEY", "")
45+
useS3 := endpoint != "" && bucketName != "" && accessKey != "" && secretKey != ""
46+
47+
if useS3 {
48+
log.Printf("Initializing MinIO storage with endpoint: %s, bucket: %s", endpoint, bucketName)
49+
50+
cfg := aws.Config{
51+
Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
52+
EndpointResolver: aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
53+
return aws.Endpoint{
54+
URL: endpoint,
55+
Source: aws.EndpointSourceCustom,
56+
}, nil
57+
}),
58+
}
59+
60+
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
61+
o.UsePathStyle = true
62+
})
63+
64+
// Verify MinIO connection
65+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
66+
defer cancel()
67+
68+
_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
69+
Bucket: aws.String(bucketName),
70+
})
71+
72+
if err != nil {
73+
log.Printf("Failed to connect to MinIO: %v", err)
74+
return fmt.Errorf("MinIO connection test failed: %v", err)
75+
}
76+
77+
storage = StorageConfig{
78+
s3Client: client,
79+
bucketName: bucketName,
80+
useS3: true,
81+
}
82+
83+
log.Printf("Successfully connected to MinIO storage")
84+
} else {
85+
storage = StorageConfig{
86+
useS3: false,
87+
}
88+
log.Println("Running in local storage mode")
89+
}
90+
91+
return nil
8392
}
8493

8594
func main() {
@@ -184,35 +193,44 @@ func fetchFromS3(s3Path string, localPath string) error {
184193
}
185194

186195
func uploadToS3(localPath string, s3Path string) {
187-
if !storage.useS3 {
188-
return
189-
}
190-
191-
storage.uploadMutex.Lock()
192-
defer storage.uploadMutex.Unlock()
193-
194-
file, err := os.Open(localPath)
195-
if err != nil {
196-
log.Printf("Failed to open file for S3 upload: %v", err)
197-
return
198-
}
199-
defer file.Close()
200-
201-
ctx := context.Background()
202-
uploader := manager.NewUploader(storage.s3Client)
203-
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
204-
Bucket: aws.String(storage.bucketName),
205-
Key: aws.String(s3Path),
206-
Body: file,
207-
})
208-
209-
if err != nil {
210-
log.Printf("Failed to upload to S3: %v", err)
211-
} else {
212-
log.Printf("Successfully uploaded %s to S3", s3Path)
213-
}
196+
if !storage.useS3 {
197+
log.Printf("MinIO not configured, skipping upload")
198+
return
199+
}
200+
201+
// Clean the path
202+
s3Path = strings.TrimPrefix(s3Path, "/")
203+
log.Printf("Starting MinIO upload - local: %s, remote path: %s", localPath, s3Path)
204+
205+
storage.uploadMutex.Lock()
206+
defer storage.uploadMutex.Unlock()
207+
208+
file, err := os.Open(localPath)
209+
if err != nil {
210+
log.Printf("Failed to open file for MinIO upload: %v", err)
211+
return
212+
}
213+
defer file.Close()
214+
215+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
216+
defer cancel()
217+
218+
uploader := manager.NewUploader(storage.s3Client)
219+
result, err := uploader.Upload(ctx, &s3.PutObjectInput{
220+
Bucket: aws.String(storage.bucketName),
221+
Key: aws.String(s3Path),
222+
Body: file,
223+
})
224+
225+
if err != nil {
226+
log.Printf("Failed to upload to MinIO: %v", err)
227+
return
228+
}
229+
230+
log.Printf("Successfully uploaded file to MinIO: %s, Location: %s", s3Path, result.Location)
214231
}
215232

233+
216234
func handleRequest(c *gin.Context) {
217235
requestPath := c.Param("path")
218236
if requestPath == "" {

0 commit comments

Comments
 (0)