26 changes: 26 additions & 0 deletions config/api_client.go
@@ -27,6 +27,32 @@ func (c *Config) NewApiClient() (*httpclient.ApiClient, error) {
}
retryTimeout := time.Duration(orDefault(c.RetryTimeoutSeconds, 300)) * time.Second
httpTimeout := time.Duration(orDefault(c.HTTPTimeoutSeconds, 60)) * time.Second

// Set Files API defaults if not configured
if c.FilesAPIMultipartUploadMinStreamSize == 0 {
c.FilesAPIMultipartUploadMinStreamSize = 100 * 1024 * 1024 // 100MB
}
if c.FilesAPIMultipartUploadChunkSize == 0 {
c.FilesAPIMultipartUploadChunkSize = 100 * 1024 * 1024 // 100MB
}
if c.FilesAPIMultipartUploadBatchURLCount == 0 {
c.FilesAPIMultipartUploadBatchURLCount = 10
}
if c.FilesAPIMultipartUploadMaxRetries == 0 {
c.FilesAPIMultipartUploadMaxRetries = 3
}
if c.FilesAPIMultipartUploadSingleChunkUploadTimeoutSeconds == 0 {
c.FilesAPIMultipartUploadSingleChunkUploadTimeoutSeconds = 300
}
if c.FilesAPIMultipartUploadURLExpirationDurationSeconds == 0 {
c.FilesAPIMultipartUploadURLExpirationDurationSeconds = 3600 // 1 hour
}
if c.FilesAPIClientDownloadMaxTotalRecovers == 0 {
c.FilesAPIClientDownloadMaxTotalRecovers = 10
}
if c.FilesAPIClientDownloadMaxTotalRecoversWithoutProgressing == 0 {
c.FilesAPIClientDownloadMaxTotalRecoversWithoutProgressing = 3
}
return httpclient.NewApiClient(httpclient.ClientConfig{
RetryTimeout: retryTimeout,
HTTPTimeout: httpTimeout,
18 changes: 18 additions & 0 deletions config/config.go
@@ -133,6 +133,24 @@ type Config struct {
// If negative, the client will retry on retriable errors indefinitely.
RetryTimeoutSeconds int `name:"retry_timeout_seconds" auth:"-"`

// Files API configuration for enhanced upload/download functionality
// Minimum stream size to trigger multipart upload (default: 100MB)
FilesAPIMultipartUploadMinStreamSize int64 `name:"files_api_multipart_upload_min_stream_size" env:"DATABRICKS_FILES_API_MULTIPART_UPLOAD_MIN_STREAM_SIZE" auth:"-"`
// Chunk size for multipart uploads (default: 100MB)
FilesAPIMultipartUploadChunkSize int64 `name:"files_api_multipart_upload_chunk_size" env:"DATABRICKS_FILES_API_MULTIPART_UPLOAD_CHUNK_SIZE" auth:"-"`
// Number of upload URLs to request in a batch (default: 10)
FilesAPIMultipartUploadBatchURLCount int64 `name:"files_api_multipart_upload_batch_url_count" env:"DATABRICKS_FILES_API_MULTIPART_UPLOAD_BATCH_URL_COUNT" auth:"-"`
// Maximum number of retries for multipart upload (default: 3)
FilesAPIMultipartUploadMaxRetries int64 `name:"files_api_multipart_upload_max_retries" env:"DATABRICKS_FILES_API_MULTIPART_UPLOAD_MAX_RETRIES" auth:"-"`
// Timeout for single chunk upload in seconds (default: 300)
FilesAPIMultipartUploadSingleChunkUploadTimeoutSeconds int64 `name:"files_api_multipart_upload_single_chunk_upload_timeout_seconds" env:"DATABRICKS_FILES_API_MULTIPART_UPLOAD_SINGLE_CHUNK_UPLOAD_TIMEOUT_SECONDS" auth:"-"`
// URL expiration duration in seconds (default: 3600)
FilesAPIMultipartUploadURLExpirationDurationSeconds int64 `name:"files_api_multipart_upload_url_expiration_duration_seconds" env:"DATABRICKS_FILES_API_MULTIPART_UPLOAD_URL_EXPIRATION_DURATION_SECONDS" auth:"-"`
// Maximum total recovers for downloads (default: 10)
FilesAPIClientDownloadMaxTotalRecovers int64 `name:"files_api_client_download_max_total_recovers" env:"DATABRICKS_FILES_API_CLIENT_DOWNLOAD_MAX_TOTAL_RECOVERS" auth:"-"`
// Maximum recovers without progressing for downloads (default: 3)
FilesAPIClientDownloadMaxTotalRecoversWithoutProgressing int64 `name:"files_api_client_download_max_total_recovers_without_progressing" env:"DATABRICKS_FILES_API_CLIENT_DOWNLOAD_MAX_TOTAL_RECOVERS_WITHOUT_PROGRESSING" auth:"-"`

// HTTPTransport can be overriden for unit testing and together with tooling like https://github.com/google/go-replayers
HTTPTransport http.RoundTripper

227 changes: 227 additions & 0 deletions examples/files_ext_example.go
@@ -0,0 +1,227 @@
package main

import (
"context"
"fmt"
"io"
"log"
"os"
"strings"
"time"

"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
// Example 1: Basic setup and usage
fmt.Println("=== Enhanced Files API Example ===")

// Create configuration
cfg := &config.Config{
Host: os.Getenv("DATABRICKS_HOST"),
Token: os.Getenv("DATABRICKS_TOKEN"),
}

// Create client
databricksClient, err := client.New(cfg)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}

// Create enhanced Files API
filesExt := files.NewFilesExt(databricksClient)

// Example 2: Upload a small file (one-shot upload)
fmt.Println("\n--- Uploading small file ---")
smallContent := strings.NewReader("Hello, World! This is a small file.")
err = filesExt.Upload(context.Background(), files.UploadRequest{
FilePath: "/Volumes/example-catalog/example-schema/example-volume/small-file.txt",
Contents: io.NopCloser(smallContent),
Overwrite: true,
})
if err != nil {
log.Printf("Small file upload failed: %v", err)
} else {
fmt.Println("✓ Small file uploaded successfully")
}

// Example 3: Upload a larger file. Multipart upload only kicks in once the
// stream exceeds the configured minimum size (100MB by default); this ~5.6MB
// payload keeps the example quick while exercising the same Upload call.
fmt.Println("\n--- Uploading large file ---")
largeContent := strings.NewReader(strings.Repeat("Large file content for multipart upload demonstration. ", 100000))
err = filesExt.Upload(context.Background(), files.UploadRequest{
FilePath: "/Volumes/example-catalog/example-schema/example-volume/large-file.txt",
Contents: io.NopCloser(largeContent),
Overwrite: true,
})
if err != nil {
log.Printf("Large file upload failed: %v", err)
} else {
fmt.Println("✓ Large file uploaded successfully")
}

// Example 4: Download a file with resilient download
fmt.Println("\n--- Downloading file ---")
response, err := filesExt.Download(context.Background(), files.DownloadRequest{
FilePath: "/Volumes/example-catalog/example-schema/example-volume/small-file.txt",
})
if err != nil {
log.Printf("Download failed: %v", err)
} else {
defer response.Contents.Close()

content, err := io.ReadAll(response.Contents)
if err != nil {
log.Printf("Failed to read content: %v", err)
} else {
fmt.Printf("✓ Downloaded file successfully\n")
fmt.Printf(" Content length: %d bytes\n", len(content))
fmt.Printf(" Content: %s\n", string(content))
}
}

// Example 5: Streaming download with recovery
fmt.Println("\n--- Streaming download ---")
streamResponse, err := filesExt.Download(context.Background(), files.DownloadRequest{
FilePath: "/Volumes/example-catalog/example-schema/example-volume/large-file.txt",
})
if err != nil {
log.Printf("Streaming download failed: %v", err)
} else {
defer streamResponse.Contents.Close()

buffer := make([]byte, 1024)
totalBytes := 0
chunks := 0

for {
n, err := streamResponse.Contents.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Printf("Streaming read error: %v", err)
break
}

totalBytes += n
chunks++

// Process the chunk (in this example, just count bytes)
_ = buffer[:n]
}

fmt.Printf("✓ Streaming download completed\n")
fmt.Printf(" Total bytes: %d\n", totalBytes)
fmt.Printf(" Chunks processed: %d\n", chunks)
}

// Example 6: Configuration demonstration
fmt.Println("\n--- Configuration ---")
uploadConfig := files.DefaultUploadConfig()
fmt.Printf("Default configuration (for reference):\n")
fmt.Printf(" Min stream size: %d bytes (%d MB)\n",
uploadConfig.MultipartUploadMinStreamSize,
uploadConfig.MultipartUploadMinStreamSize/(1024*1024))
fmt.Printf(" Chunk size: %d bytes (%d MB)\n",
uploadConfig.MultipartUploadChunkSize,
uploadConfig.MultipartUploadChunkSize/(1024*1024))
fmt.Printf(" Batch URL count: %d\n", uploadConfig.MultipartUploadBatchURLCount)
fmt.Printf(" Max retries: %d\n", uploadConfig.MultipartUploadMaxRetries)
fmt.Printf(" Download max recovers: %d\n", uploadConfig.FilesAPIClientDownloadMaxTotalRecovers)

// Example 6.1: Client configuration demonstration (with automatic defaults)
fmt.Println("\n--- Client Configuration (with automatic defaults) ---")
clientConfig := filesExt.GetUploadConfig()
fmt.Printf("Client configuration (defaults automatically applied):\n")
fmt.Printf(" Min stream size: %d bytes (%d MB)\n",
clientConfig.MultipartUploadMinStreamSize,
clientConfig.MultipartUploadMinStreamSize/(1024*1024))
fmt.Printf(" Chunk size: %d bytes (%d MB)\n",
clientConfig.MultipartUploadChunkSize,
clientConfig.MultipartUploadChunkSize/(1024*1024))
fmt.Printf(" Batch URL count: %d\n", clientConfig.MultipartUploadBatchURLCount)
fmt.Printf(" Max retries: %d\n", clientConfig.MultipartUploadMaxRetries)
fmt.Printf(" Download max recovers: %d\n", clientConfig.FilesAPIClientDownloadMaxTotalRecovers)

// Example 7: Custom configuration
demonstrateCustomConfig()

// Example 8: Client with custom Files API configuration
demonstrateCustomClientConfig()

fmt.Println("\n=== Example completed ===")
}

// Helper function to demonstrate custom configuration
func demonstrateCustomConfig() {
fmt.Println("\n--- Custom Configuration Example ---")

customConfig := &files.UploadConfig{
MultipartUploadMinStreamSize: 50 * 1024 * 1024, // 50MB
MultipartUploadChunkSize: 50 * 1024 * 1024, // 50MB
MultipartUploadBatchURLCount: 5,
MultipartUploadMaxRetries: 5,
MultipartUploadSingleChunkUploadTimeoutSeconds: 600,
MultipartUploadURLExpirationDuration: time.Hour * 2,
FilesAPIClientDownloadMaxTotalRecovers: 15,
FilesAPIClientDownloadMaxTotalRecoversWithoutProgressing: 5,
}

fmt.Printf("Custom configuration:\n")
fmt.Printf(" Min stream size: %d bytes (%d MB)\n",
customConfig.MultipartUploadMinStreamSize,
customConfig.MultipartUploadMinStreamSize/(1024*1024))
fmt.Printf(" Chunk size: %d bytes (%d MB)\n",
customConfig.MultipartUploadChunkSize,
customConfig.MultipartUploadChunkSize/(1024*1024))
fmt.Printf(" Batch URL count: %d\n", customConfig.MultipartUploadBatchURLCount)
fmt.Printf(" Max retries: %d\n", customConfig.MultipartUploadMaxRetries)
fmt.Printf(" Download max recovers: %d\n", customConfig.FilesAPIClientDownloadMaxTotalRecovers)
}

// Helper function to demonstrate custom client configuration
func demonstrateCustomClientConfig() {
fmt.Println("\n--- Custom Client Configuration Example ---")

// Create configuration with custom Files API settings
customCfg := &config.Config{
Host: os.Getenv("DATABRICKS_HOST"),
Token: os.Getenv("DATABRICKS_TOKEN"),

// Custom Files API configuration
FilesAPIMultipartUploadMinStreamSize: 25 * 1024 * 1024, // 25MB
FilesAPIMultipartUploadChunkSize: 25 * 1024 * 1024, // 25MB
FilesAPIMultipartUploadBatchURLCount: 3,
FilesAPIMultipartUploadMaxRetries: 7,
FilesAPIMultipartUploadSingleChunkUploadTimeoutSeconds: 900,
FilesAPIMultipartUploadURLExpirationDurationSeconds: 7200, // 2 hours
FilesAPIClientDownloadMaxTotalRecovers: 20,
FilesAPIClientDownloadMaxTotalRecoversWithoutProgressing: 7,
}

// Create client with custom configuration
databricksClient, err := client.New(customCfg)
if err != nil {
log.Printf("Failed to create client with custom config: %v", err)
return
}

// Create enhanced Files API with custom configuration
filesExt := files.NewFilesExt(databricksClient)

// Get the configuration (should reflect custom values)
clientConfig := filesExt.GetUploadConfig()

fmt.Printf("Custom client configuration:\n")
fmt.Printf(" Min stream size: %d bytes (%d MB)\n",
clientConfig.MultipartUploadMinStreamSize,
clientConfig.MultipartUploadMinStreamSize/(1024*1024))
fmt.Printf(" Chunk size: %d bytes (%d MB)\n",
clientConfig.MultipartUploadChunkSize,
clientConfig.MultipartUploadChunkSize/(1024*1024))
fmt.Printf(" Batch URL count: %d\n", clientConfig.MultipartUploadBatchURLCount)
fmt.Printf(" Max retries: %d\n", clientConfig.MultipartUploadMaxRetries)
fmt.Printf(" Download max recovers: %d\n", clientConfig.FilesAPIClientDownloadMaxTotalRecovers)
}
104 changes: 104 additions & 0 deletions examples/large-file-download/README.md
@@ -0,0 +1,104 @@
# Large File Download

This program downloads a large file from Databricks using the Files API and saves it to disk.

## Features

- Downloads files from Databricks using the Files API
- Supports Unity Catalog volumes
- Saves downloaded content to a local file
- Shows download progress and statistics
- Verifies download by checking file size
- **Content validation**: Compares downloaded file with original local file
- **Clean start**: Automatically deletes existing downloaded file before starting
- Uses efficient streaming download

## Prerequisites

1. **Databricks Configuration**: Make sure you have a Databricks profile configured (a sample profile is shown after this list)
2. **File Exists**: The file must exist in the specified remote location
3. **Unity Catalog Volume**: The source volume must be accessible
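
A profile lives in `~/.databrickscfg`. As a rough illustration (the profile name and values below are placeholders, and a personal access token is only one of several supported authentication methods):

```ini
[your-profile-name]
host  = https://your-workspace.cloud.databricks.com
token = dapiXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
```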

## Configuration

Update the configuration in `main.go`:

```go
cfg := &databricks.Config{
Profile: "your-profile-name", // Update this to your profile
}
```

And update the remote file path:

```go
remoteFilePath := "/Volumes/your-catalog/your-schema/your-volume/your-file.bin"
```

You can also customize the local file name:

```go
localFilePath := "your-local-filename.bin"
```

## Usage

```bash
cd examples/large-file-download
go run main.go
```

The program will (see the sketch after this list):
- **Delete existing downloaded file** (if present)
- Connect to your Databricks workspace
- Download the specified file from the remote location
- Save it to the local file system
- Show download statistics (time taken, speed)
- Verify the download by checking file size
- **Compare contents** with the original local file (if available)
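
The core of that flow is a streaming download written straight to disk. A minimal sketch, assuming the standard `WorkspaceClient` Files API and the placeholder profile and paths from the Configuration section (variable names and log messages here are illustrative, not the program's actual code):

```go
package main

import (
	"context"
	"io"
	"log"
	"os"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
	remoteFilePath := "/Volumes/your-catalog/your-schema/your-volume/your-file.bin"
	localFilePath := "downloaded_large_file.bin"

	// Clean start: remove any previous download so each run begins fresh.
	_ = os.Remove(localFilePath)

	w, err := databricks.NewWorkspaceClient(&databricks.Config{Profile: "your-profile-name"})
	if err != nil {
		log.Fatalf("failed to create workspace client: %v", err)
	}

	resp, err := w.Files.Download(context.Background(), files.DownloadRequest{FilePath: remoteFilePath})
	if err != nil {
		log.Fatalf("download failed: %v", err)
	}
	defer resp.Contents.Close()

	out, err := os.Create(localFilePath)
	if err != nil {
		log.Fatalf("failed to create local file: %v", err)
	}
	defer out.Close()

	// Stream the remote contents to disk without buffering the whole file in memory.
	written, err := io.Copy(out, resp.Contents)
	if err != nil {
		log.Fatalf("failed to write local file: %v", err)
	}
	log.Printf("downloaded %d bytes to %s", written, localFilePath)
}
```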

## Output

The program will show:
- Confirmation of existing file deletion (if applicable)
- Remote file path being downloaded
- Local file path where it's being saved
- Download progress
- Final download statistics
- Verification results
- Content comparison results (if original file exists)

## Error Handling

The program includes error handling for the following cases (a minimal missing-file check is sketched after this list):
- Missing remote files
- Network connectivity issues
- Authentication problems
- File system errors
- Insufficient disk space
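
A missing remote file surfaces as an API error with a 404 status. A rough sketch of distinguishing it from other failures, assuming the SDK's `apierr.APIError` type (the profile and path are placeholders):

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/apierr"
	"github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
	w, err := databricks.NewWorkspaceClient(&databricks.Config{Profile: "your-profile-name"})
	if err != nil {
		log.Fatalf("configuration or authentication problem: %v", err)
	}

	_, err = w.Files.Download(context.Background(), files.DownloadRequest{
		FilePath: "/Volumes/your-catalog/your-schema/your-volume/does-not-exist.bin",
	})

	// A 404 from the Files API means the remote file is missing; anything else
	// points to a network, authentication, or server-side problem.
	var apiErr *apierr.APIError
	if errors.As(err, &apiErr) && apiErr.StatusCode == 404 {
		log.Printf("remote file not found: %v", err)
		return
	}
	if err != nil {
		log.Printf("download failed: %v", err)
	}
}
```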

## Notes

- The download uses streaming to handle large files efficiently
- The program verifies the download by comparing file sizes
- **Content validation**: If `large_random_file.bin` exists locally, the program will perform a byte-by-byte comparison
- The comparison uses buffered reading (64KB chunks) so large files never have to fit in memory (see the sketch after these notes)
- **Clean start**: The program automatically deletes any existing `downloaded_large_file.bin` before starting
- Make sure you have sufficient disk space for the downloaded file
- The local file will be created in the current directory
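
A chunked comparison along these lines (a sketch only; the helper name and loop details are illustrative, not the program's actual code) can be written with two 64KB buffers:

```go
package main

import (
	"bytes"
	"io"
	"log"
	"os"
)

// compareFiles reports whether two local files have identical contents,
// reading both in 64KB chunks so neither file is loaded fully into memory.
func compareFiles(pathA, pathB string) (bool, error) {
	fa, err := os.Open(pathA)
	if err != nil {
		return false, err
	}
	defer fa.Close()

	fb, err := os.Open(pathB)
	if err != nil {
		return false, err
	}
	defer fb.Close()

	bufA := make([]byte, 64*1024)
	bufB := make([]byte, 64*1024)
	for {
		nA, errA := io.ReadFull(fa, bufA)
		nB, errB := io.ReadFull(fb, bufB)

		// bytes.Equal is false whenever the chunk lengths differ, so a size
		// mismatch between the files is caught here as well.
		if !bytes.Equal(bufA[:nA], bufB[:nB]) {
			return false, nil
		}

		aDone := errA == io.EOF || errA == io.ErrUnexpectedEOF
		bDone := errB == io.EOF || errB == io.ErrUnexpectedEOF
		if errA != nil && !aDone {
			return false, errA
		}
		if errB != nil && !bDone {
			return false, errB
		}
		if aDone && bDone {
			return true, nil // both files ended on the same, equal chunk
		}
	}
}

func main() {
	same, err := compareFiles("large_random_file.bin", "downloaded_large_file.bin")
	if err != nil {
		log.Fatalf("comparison failed: %v", err)
	}
	log.Printf("files identical: %v", same)
}
```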

## Example Workflow

1. **Upload a file** (using the upload program):
```bash
cd examples/large-file-upload
go run main.go
```

2. **Download the file**:
```bash
cd examples/large-file-download
go run main.go
```

This creates a complete round-trip workflow for testing large file operations with the Databricks Files API.