Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 37 additions & 82 deletions api/fine-tuning/internal/services/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,41 @@ func (f *Finalizer) Execute(ctx context.Context, task *db.Task, paths *utils.Tas

userAddr := common.HexToAddress(task.UserAddress)

// Check if storage upload should be skipped
// When skipped, still encrypt but save locally for TEE download via /v1/user/:address/task/:id/lora
if f.config.Service.SkipStorageUpload {
f.logger.Infof("Skipping 0G Storage upload (skipStorageUpload=true), encrypting locally for task %s", task.ID)

// Encrypt and save locally (no upload to storage)
settlementMetadata, err = f.encryptModelLocal(paths.Output, task)
if err != nil {
return err
// Step 1: Always encrypt and save locally first (this creates the TEE backup)
f.logger.Infof("Encrypting LoRA model locally for task %s", task.ID)
settlementMetadata, err = f.encryptModelLocal(paths.Output, task)
if err != nil {
return err
}
f.logger.Infof("Task %s encrypted locally. TEE backup available at /v1/user/%s/task/%s/lora", task.ID, task.UserAddress, task.ID)

// Step 2: Try to upload to 0G Storage (unless explicitly skipped)
if !f.config.Service.SkipStorageUpload {
encryptedFilePath := paths.Output + "_encrypted.data"
f.logger.Infof("Uploading encrypted LoRA to 0G Storage for task %s", task.ID)

storageRootHash, uploadErr := f.uploadModel(ctx, encryptedFilePath)
if uploadErr != nil {
// Upload failed - log warning but continue with local backup
f.logger.Warnf("Failed to upload to 0G Storage for task %s: %v. Using local backup hash.", task.ID, uploadErr)
// Keep the local hash (keccak256 of encrypted file) as the root hash
} else {
// Upload succeeded - use the storage root hash instead
f.logger.Infof("Successfully uploaded to 0G Storage for task %s, root hash: %s", task.ID, string(storageRootHash))
settlementMetadata.ModelRootHash = storageRootHash
}

f.logger.Infof("Task %s encrypted locally. LoRA available at /v1/user/%s/task/%s/lora", task.ID, task.UserAddress, task.ID)
} else {
// Normal flow: encrypt and upload to 0G Storage
settlementMetadata, err = f.encryptAndUploadModel(ctx, paths.Output, task)
if err != nil {
return err
}
f.logger.Infof("Skipping 0G Storage upload (skipStorageUpload=true) for task %s", task.ID)
}

// Note: DeliverIndex is deprecated since we now use task ID for deliverable identification
// Setting to 0 for backward compatibility
// Step 3: Update task in DB and contract
if err = f.db.UpdateTask(task.ID,
db.Task{
OutputRootHash: hexutil.Encode(settlementMetadata.ModelRootHash),
Secret: hexutil.Encode(settlementMetadata.Secret),
EncryptedSecret: hexutil.Encode(settlementMetadata.EncryptedSecret),
DeliverIndex: 0, // Deprecated: now using task ID instead of index
DeliverTime: time.Now().Unix(), // TODO: better use tx timestamp
DeliverTime: time.Now().Unix(),
}); err != nil {
f.logger.Errorf("Failed to update task: %v", err)
return err
Expand Down Expand Up @@ -190,77 +196,26 @@ func (f *Finalizer) encryptModelLocal(sourceDir string, task *db.Task) (*Settlem
}, nil
}

func (f *Finalizer) encryptAndUploadModel(ctx context.Context, sourceDir string, task *db.Task) (*SettlementMetadata, error) {
aesKey, err := util.GenerateAESKey(aesKeySize)
if err != nil {
return nil, err
}

plainFile, err := util.Zip(sourceDir)
if err != nil {
return nil, err
}
defer func() {
if err := os.Remove(plainFile); err != nil && !os.IsNotExist(err) {
f.logger.Errorf("Failed to remove temporary file %s: %v", plainFile, err)
}
}()

encryptFile, err := util.GetFileName(sourceDir, ".data")
if err != nil {
return nil, err
}

tag, err := util.AesEncryptLargeFile(aesKey, plainFile, encryptFile)
if err != nil {
return nil, err
}

tagSig, err := crypto.Sign(crypto.Keccak256(tag[:]), f.teeService.ProviderSigner)
if err != nil {
return nil, errors.Wrap(err, "sign tag failed")
}

err = util.WriteToFileHead(encryptFile, tagSig)
defer func() {
if err := os.Remove(encryptFile); err != nil && !os.IsNotExist(err) {
f.logger.Errorf("Failed to remove temporary file %s: %v", encryptFile, err)
}
}()

if err != nil {
return nil, err
}

modelRootHashes, err := f.uploadModel(ctx, encryptFile)
if err != nil {
return nil, err
}

encryptKey, err := f.encryptAESKey(aesKey, task.UserPublicKey)
if err != nil {
return nil, err
}

return &SettlementMetadata{
ModelRootHash: modelRootHashes,
Secret: aesKey,
EncryptedSecret: encryptKey,
}, nil
}
// encryptAndUploadModel is deprecated - kept for reference.
// The new flow uses encryptModelLocal() + uploadModel() separately,
// so the encrypted file is always kept as a local backup.

func (f *Finalizer) uploadModel(ctx context.Context, encryptFile string) ([]byte, error) {
modelRootHashes, err := f.uploadModelWithTimeout(ctx, encryptFile)
if err != nil {
return nil, err
}

if len(modelRootHashes) == 1 {
// Single fragment: return raw 32-byte hash
// This is consistent with localRootHash (crypto.Keccak256) which also returns raw bytes
return modelRootHashes[0].Bytes(), nil
}

// Multi-fragment: concatenate raw bytes of all hashes
var data []byte
for i, hash := range modelRootHashes {
if i > 0 {
data = append(data, ',')
}
data = append(data, []byte(hash.Hex())...)
for _, hash := range modelRootHashes {
data = append(data, hash.Bytes()...)
}
return data, nil
}
Expand Down
123 changes: 122 additions & 1 deletion api/fine-tuning/internal/services/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -313,7 +314,9 @@ func (s *Setup) useLocalDataset(localPath string, paths *utils.TaskPaths) error
return nil
}

// downloadDatasetFromStorage downloads dataset from 0G Storage
// downloadDatasetFromStorage downloads dataset from 0G Storage.
// Handles both ZIP archives (containing HF dataset) and raw JSONL files.
// If the downloaded file is a raw JSONL, it is converted to HuggingFace DatasetDict format.
func (s *Setup) downloadDatasetFromStorage(ctx context.Context, task *db.Task, paths *utils.TaskPaths) error {
datasetTopLevelDir, err := s.storage.DownloadFromStorage(ctx, task.DatasetHash, paths.Dataset, constant.IS_TURBO)
if err != nil {
Expand All @@ -331,6 +334,124 @@ func (s *Setup) downloadDatasetFromStorage(ctx context.Context, task *db.Task, p
return err
}
}

// Check if the downloaded dataset is a file (raw JSONL) rather than a directory (HF format).
// The token counter and training executor require HF DatasetDict format.
info, err := os.Stat(paths.Dataset)
if err != nil {
return errors.Wrap(err, "stat downloaded dataset")
}
if !info.IsDir() {
s.logger.Infof("Downloaded dataset is a raw file (likely JSONL), converting to HF format...")
if err := s.convertRawDatasetToHF(paths.Dataset); err != nil {
return errors.Wrap(err, "convert raw dataset from 0G Storage to HF format")
}
}

return nil
}

// convertRawDatasetToHF converts a raw JSONL dataset file to HuggingFace
// DatasetDict format. The raw file at datasetPath is replaced by a directory
// containing the saved DatasetDict.
//
// Conversion is attempted with the host's python3 first (cheaper than a
// container); if that fails — e.g. the `datasets` package is not installed —
// it falls back to running the same script inside the configured execution
// Docker image. On total failure the raw file is restored to datasetPath so
// the caller still has the downloaded data.
func (s *Setup) convertRawDatasetToHF(datasetPath string) error {
	// Move the raw file aside so datasetPath can become the output directory.
	rawPath := datasetPath + ".jsonl"
	if err := os.Rename(datasetPath, rawPath); err != nil {
		return errors.Wrap(err, "move raw dataset to temp path")
	}

	// Python script to convert JSONL to HF format. It auto-detects three
	// layouts by inspecting the first record: chat "messages", plain "text",
	// or instruction/input/output triples.
	pythonScript := `
import json
import sys
import os
from datasets import Dataset, DatasetDict

jsonl_file = sys.argv[1]
output_dir = sys.argv[2]

data = {"instruction": [], "input": [], "output": []}
messages_format = False
text_format = False

with open(jsonl_file, 'r') as f:
    lines = [line.strip() for line in f if line.strip()]

if lines:
    first_item = json.loads(lines[0])
    if "messages" in first_item:
        messages_format = True
    elif "text" in first_item and "instruction" not in first_item:
        text_format = True

if messages_format:
    for line in lines:
        item = json.loads(line)
        messages = item.get("messages", [])
        instruction = ""
        output = ""
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            if role == "user":
                instruction = content
            elif role == "assistant":
                output = content
        data["instruction"].append(instruction)
        data["input"].append("")
        data["output"].append(output)
elif text_format:
    data = {"text": []}
    for line in lines:
        item = json.loads(line)
        data["text"].append(item.get("text", ""))
else:
    for line in lines:
        item = json.loads(line)
        data["instruction"].append(item.get("instruction", ""))
        data["input"].append(item.get("input", ""))
        data["output"].append(item.get("output", ""))

ds = DatasetDict({"train": Dataset.from_dict(data)})
ds.save_to_disk(output_dir)
print(f"Converted {len(lines)} examples to {output_dir}")
`

	// Save the conversion script next to the raw file so a Docker bind mount
	// of the parent directory exposes script, input, and output together.
	scriptPath := rawPath + "_convert.py"
	if err := os.WriteFile(scriptPath, []byte(pythonScript), 0644); err != nil {
		return errors.Wrap(err, "write conversion script")
	}
	defer func() {
		// Best-effort cleanup of the temporary script.
		if err := os.Remove(scriptPath); err != nil && !os.IsNotExist(err) {
			s.logger.Warnf("Failed to remove conversion script %s: %v", scriptPath, err)
		}
	}()

	// Try running Python directly first.
	cmd := exec.Command("python3", scriptPath, rawPath, datasetPath)
	output, err := cmd.CombinedOutput()
	if err == nil {
		s.logger.Infof("Converted raw dataset to HF format using Python: %s", string(output))
		if rmErr := os.Remove(rawPath); rmErr != nil && !os.IsNotExist(rmErr) {
			s.logger.Warnf("Failed to remove raw dataset %s: %v", rawPath, rmErr)
		}
		return nil
	}
	s.logger.Warnf("Direct Python conversion failed: %v (output: %s), trying Docker...", err, string(output))

	// Fall back to Docker: mount the parent directory at /data so the
	// container sees the script, the raw file, and the output path.
	parentDir := filepath.Dir(rawPath)
	cmd = exec.Command("docker", "run", "--rm",
		"-v", parentDir+":/data",
		s.config.Images.ExecutionImageName,
		"python3", "/data/"+filepath.Base(scriptPath),
		"/data/"+filepath.Base(rawPath),
		"/data/"+filepath.Base(datasetPath))

	output, err = cmd.CombinedOutput()
	if err != nil {
		// Restore the raw file on failure so the download is not lost.
		if renameErr := os.Rename(rawPath, datasetPath); renameErr != nil {
			s.logger.Errorf("Failed to restore raw dataset %s to %s: %v", rawPath, datasetPath, renameErr)
		}
		return errors.Wrapf(err, "convert raw dataset to HF format: %s", string(output))
	}

	s.logger.Infof("Converted raw dataset to HF format using Docker: %s", string(output))
	if rmErr := os.Remove(rawPath); rmErr != nil && !os.IsNotExist(rmErr) {
		s.logger.Warnf("Failed to remove raw dataset %s: %v", rawPath, rmErr)
	}
	return nil
}

Expand Down
Loading