Skip to content

Commit db61c7a

Browse files
zeyuyuyu, Wuying Created Local Users, and cursoragent
authored
feat(fine-tuning): storage-first delivery flow with TEE fallback (#354)
* feat: storage-first flow - upload to 0G Storage + keep local backup
  - Finalizer now always encrypts locally first (TEE backup)
  - Then attempts upload to 0G Storage (unless skipStorageUpload=true)
  - If storage upload fails, falls back to local hash gracefully
  - Encrypted file is always retained for TEE download fallback
  - Removes old encryptAndUploadModel (split into encrypt + upload)
  Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: handle raw JSONL files downloaded from 0G Storage
  Previously DownloadFromStorage always assumed ZIP format, causing "zip: not a valid zip file" when CLI uploads raw JSONL to 0G Storage.
  Changes:
  - Detect file type via magic bytes before attempting unzip
  - For non-ZIP files (raw JSONL), move directly to target path
  - Convert raw JSONL to HuggingFace DatasetDict format after download (required by token counter and training executor)
  Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: handle raw JSONL from 0G Storage and auto-calculate fee
  - DownloadFromStorage: detect file type via magic bytes, handle non-ZIP
  - Clean up existing target before rename on retry
  - Convert raw JSONL to HF DatasetDict format after download
  - Auto-calculate fee when fee=0 (broker-calculated fee mode)
  - Verify signature with original fee to avoid mismatch
  Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: address code review feedback for storage client
  - [HIGH] Use passed ctx instead of context.Background() in Download call for proper cancellation/timeout propagation
  - [HIGH] Add defer cleanup for temp download file to prevent disk leaks on early error returns
  - [MEDIUM] Improve isZipFile() to return errors on I/O failures instead of silently treating them as non-ZIP files
  Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: upgrade 0g-storage-client to v1.2.2 and fix root hash encoding
  - Upgrade 0g-storage-client v1.2.1 → v1.2.2 to fix ABI incompatibility with testnet Flow contract (was causing `execution reverted; data: 0x`)
  - Remove deprecated WithRoutines() calls (now set internally via UploaderConfig in v1.2.2)
  - Fix uploadModel() to return raw 32-byte hash (hash.Bytes()) instead of hex string bytes ([]byte(hash.Hex())), which caused double-encoding via hexutil.Encode and prevented client from downloading model via 0G Storage
  Tested: Full flow verified with Qwen2.5-0.5B and Qwen3-32B (1.1GB model) — broker upload, client download from 0G Storage, decrypt, LoRA verified.
  Co-authored-by: Cursor <cursoragent@cursor.com>

---------

Co-authored-by: Wuying Created Local Users <zeyu@hoexzqc9b0fyi8p.US-EAST-1-198-1-1.WUYING.LOCAL>
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent bed782d commit db61c7a

File tree

5 files changed

+228
-97
lines changed

5 files changed

+228
-97
lines changed

api/fine-tuning/internal/services/finalizer.go

Lines changed: 37 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -83,35 +83,41 @@ func (f *Finalizer) Execute(ctx context.Context, task *db.Task, paths *utils.Tas
8383

8484
userAddr := common.HexToAddress(task.UserAddress)
8585

86-
// Check if storage upload should be skipped
87-
// When skipped, still encrypt but save locally for TEE download via /v1/user/:address/task/:id/lora
88-
if f.config.Service.SkipStorageUpload {
89-
f.logger.Infof("Skipping 0G Storage upload (skipStorageUpload=true), encrypting locally for task %s", task.ID)
90-
91-
// Encrypt and save locally (no upload to storage)
92-
settlementMetadata, err = f.encryptModelLocal(paths.Output, task)
93-
if err != nil {
94-
return err
86+
// Step 1: Always encrypt and save locally first (this creates the TEE backup)
87+
f.logger.Infof("Encrypting LoRA model locally for task %s", task.ID)
88+
settlementMetadata, err = f.encryptModelLocal(paths.Output, task)
89+
if err != nil {
90+
return err
91+
}
92+
f.logger.Infof("Task %s encrypted locally. TEE backup available at /v1/user/%s/task/%s/lora", task.ID, task.UserAddress, task.ID)
93+
94+
// Step 2: Try to upload to 0G Storage (unless explicitly skipped)
95+
if !f.config.Service.SkipStorageUpload {
96+
encryptedFilePath := paths.Output + "_encrypted.data"
97+
f.logger.Infof("Uploading encrypted LoRA to 0G Storage for task %s", task.ID)
98+
99+
storageRootHash, uploadErr := f.uploadModel(ctx, encryptedFilePath)
100+
if uploadErr != nil {
101+
// Upload failed - log warning but continue with local backup
102+
f.logger.Warnf("Failed to upload to 0G Storage for task %s: %v. Using local backup hash.", task.ID, uploadErr)
103+
// Keep the local hash (keccak256 of encrypted file) as the root hash
104+
} else {
105+
// Upload succeeded - use the storage root hash instead
106+
f.logger.Infof("Successfully uploaded to 0G Storage for task %s, root hash: %s", task.ID, string(storageRootHash))
107+
settlementMetadata.ModelRootHash = storageRootHash
95108
}
96-
97-
f.logger.Infof("Task %s encrypted locally. LoRA available at /v1/user/%s/task/%s/lora", task.ID, task.UserAddress, task.ID)
98109
} else {
99-
// Normal flow: encrypt and upload to 0G Storage
100-
settlementMetadata, err = f.encryptAndUploadModel(ctx, paths.Output, task)
101-
if err != nil {
102-
return err
103-
}
110+
f.logger.Infof("Skipping 0G Storage upload (skipStorageUpload=true) for task %s", task.ID)
104111
}
105112

106-
// Note: DeliverIndex is deprecated since we now use task ID for deliverable identification
107-
// Setting to 0 for backward compatibility
113+
// Step 3: Update task in DB and contract
108114
if err = f.db.UpdateTask(task.ID,
109115
db.Task{
110116
OutputRootHash: hexutil.Encode(settlementMetadata.ModelRootHash),
111117
Secret: hexutil.Encode(settlementMetadata.Secret),
112118
EncryptedSecret: hexutil.Encode(settlementMetadata.EncryptedSecret),
113119
DeliverIndex: 0, // Deprecated: now using task ID instead of index
114-
DeliverTime: time.Now().Unix(), // TODO: better use tx timestamp
120+
DeliverTime: time.Now().Unix(),
115121
}); err != nil {
116122
f.logger.Errorf("Failed to update task: %v", err)
117123
return err
@@ -190,77 +196,26 @@ func (f *Finalizer) encryptModelLocal(sourceDir string, task *db.Task) (*Settlem
190196
}, nil
191197
}
192198

193-
func (f *Finalizer) encryptAndUploadModel(ctx context.Context, sourceDir string, task *db.Task) (*SettlementMetadata, error) {
194-
aesKey, err := util.GenerateAESKey(aesKeySize)
195-
if err != nil {
196-
return nil, err
197-
}
198-
199-
plainFile, err := util.Zip(sourceDir)
200-
if err != nil {
201-
return nil, err
202-
}
203-
defer func() {
204-
if err := os.Remove(plainFile); err != nil && !os.IsNotExist(err) {
205-
f.logger.Errorf("Failed to remove temporary file %s: %v", plainFile, err)
206-
}
207-
}()
208-
209-
encryptFile, err := util.GetFileName(sourceDir, ".data")
210-
if err != nil {
211-
return nil, err
212-
}
213-
214-
tag, err := util.AesEncryptLargeFile(aesKey, plainFile, encryptFile)
215-
if err != nil {
216-
return nil, err
217-
}
218-
219-
tagSig, err := crypto.Sign(crypto.Keccak256(tag[:]), f.teeService.ProviderSigner)
220-
if err != nil {
221-
return nil, errors.Wrap(err, "sign tag failed")
222-
}
223-
224-
err = util.WriteToFileHead(encryptFile, tagSig)
225-
defer func() {
226-
if err := os.Remove(encryptFile); err != nil && !os.IsNotExist(err) {
227-
f.logger.Errorf("Failed to remove temporary file %s: %v", encryptFile, err)
228-
}
229-
}()
230-
231-
if err != nil {
232-
return nil, err
233-
}
234-
235-
modelRootHashes, err := f.uploadModel(ctx, encryptFile)
236-
if err != nil {
237-
return nil, err
238-
}
239-
240-
encryptKey, err := f.encryptAESKey(aesKey, task.UserPublicKey)
241-
if err != nil {
242-
return nil, err
243-
}
244-
245-
return &SettlementMetadata{
246-
ModelRootHash: modelRootHashes,
247-
Secret: aesKey,
248-
EncryptedSecret: encryptKey,
249-
}, nil
250-
}
199+
// encryptAndUploadModel has been removed (it encrypted and uploaded in a single step).
200+
// The new flow uses encryptModelLocal() + uploadModel() separately,
201+
// so the encrypted file is always kept as a local backup.
251202

252203
func (f *Finalizer) uploadModel(ctx context.Context, encryptFile string) ([]byte, error) {
253204
modelRootHashes, err := f.uploadModelWithTimeout(ctx, encryptFile)
254205
if err != nil {
255206
return nil, err
256207
}
257208

209+
if len(modelRootHashes) == 1 {
210+
// Single fragment: return raw 32-byte hash
211+
// This is consistent with localRootHash (crypto.Keccak256) which also returns raw bytes
212+
return modelRootHashes[0].Bytes(), nil
213+
}
214+
215+
// Multi-fragment: concatenate raw bytes of all hashes
258216
var data []byte
259-
for i, hash := range modelRootHashes {
260-
if i > 0 {
261-
data = append(data, ',')
262-
}
263-
data = append(data, []byte(hash.Hex())...)
217+
for _, hash := range modelRootHashes {
218+
data = append(data, hash.Bytes()...)
264219
}
265220
return data, nil
266221
}

api/fine-tuning/internal/services/setup.go

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math/big"
88
"os"
9+
"os/exec"
910
"path/filepath"
1011
"strings"
1112
"time"
@@ -313,7 +314,9 @@ func (s *Setup) useLocalDataset(localPath string, paths *utils.TaskPaths) error
313314
return nil
314315
}
315316

316-
// downloadDatasetFromStorage downloads dataset from 0G Storage
317+
// downloadDatasetFromStorage downloads dataset from 0G Storage.
318+
// Handles both ZIP archives (containing HF dataset) and raw JSONL files.
319+
// If the downloaded file is a raw JSONL, it is converted to HuggingFace DatasetDict format.
317320
func (s *Setup) downloadDatasetFromStorage(ctx context.Context, task *db.Task, paths *utils.TaskPaths) error {
318321
datasetTopLevelDir, err := s.storage.DownloadFromStorage(ctx, task.DatasetHash, paths.Dataset, constant.IS_TURBO)
319322
if err != nil {
@@ -331,6 +334,124 @@ func (s *Setup) downloadDatasetFromStorage(ctx context.Context, task *db.Task, p
331334
return err
332335
}
333336
}
337+
338+
// Check if the downloaded dataset is a file (raw JSONL) rather than a directory (HF format).
339+
// The token counter and training executor require HF DatasetDict format.
340+
info, err := os.Stat(paths.Dataset)
341+
if err != nil {
342+
return errors.Wrap(err, "stat downloaded dataset")
343+
}
344+
if !info.IsDir() {
345+
s.logger.Infof("Downloaded dataset is a raw file (likely JSONL), converting to HF format...")
346+
if err := s.convertRawDatasetToHF(paths.Dataset); err != nil {
347+
return errors.Wrap(err, "convert raw dataset from 0G Storage to HF format")
348+
}
349+
}
350+
351+
return nil
352+
}
353+
354+
// convertRawDatasetToHF converts a raw JSONL dataset file to HuggingFace DatasetDict format.
355+
// It replaces the raw file with a directory containing the HF dataset.
356+
func (s *Setup) convertRawDatasetToHF(datasetPath string) error {
357+
// Move the raw file to a temporary location
358+
rawPath := datasetPath + ".jsonl"
359+
if err := os.Rename(datasetPath, rawPath); err != nil {
360+
return errors.Wrap(err, "move raw dataset to temp path")
361+
}
362+
363+
// Python script to convert JSONL to HF format
364+
pythonScript := `
365+
import json
366+
import sys
367+
import os
368+
from datasets import Dataset, DatasetDict
369+
370+
jsonl_file = sys.argv[1]
371+
output_dir = sys.argv[2]
372+
373+
data = {"instruction": [], "input": [], "output": []}
374+
messages_format = False
375+
text_format = False
376+
377+
with open(jsonl_file, 'r') as f:
378+
lines = [line.strip() for line in f if line.strip()]
379+
380+
if lines:
381+
first_item = json.loads(lines[0])
382+
if "messages" in first_item:
383+
messages_format = True
384+
elif "text" in first_item and "instruction" not in first_item:
385+
text_format = True
386+
387+
if messages_format:
388+
for line in lines:
389+
item = json.loads(line)
390+
messages = item.get("messages", [])
391+
instruction = ""
392+
output = ""
393+
for msg in messages:
394+
role = msg.get("role", "")
395+
content = msg.get("content", "")
396+
if role == "user":
397+
instruction = content
398+
elif role == "assistant":
399+
output = content
400+
data["instruction"].append(instruction)
401+
data["input"].append("")
402+
data["output"].append(output)
403+
elif text_format:
404+
data = {"text": []}
405+
for line in lines:
406+
item = json.loads(line)
407+
data["text"].append(item.get("text", ""))
408+
else:
409+
for line in lines:
410+
item = json.loads(line)
411+
data["instruction"].append(item.get("instruction", ""))
412+
data["input"].append(item.get("input", ""))
413+
data["output"].append(item.get("output", ""))
414+
415+
ds = DatasetDict({"train": Dataset.from_dict(data)})
416+
ds.save_to_disk(output_dir)
417+
print(f"Converted {len(lines)} examples to {output_dir}")
418+
`
419+
420+
// Save Python script to temp file
421+
scriptPath := rawPath + "_convert.py"
422+
if err := os.WriteFile(scriptPath, []byte(pythonScript), 0644); err != nil {
423+
return errors.Wrap(err, "write conversion script")
424+
}
425+
defer os.Remove(scriptPath)
426+
427+
// Try running Python directly first
428+
cmd := exec.Command("python3", scriptPath, rawPath, datasetPath)
429+
output, err := cmd.CombinedOutput()
430+
if err == nil {
431+
s.logger.Infof("Converted raw dataset to HF format using Python: %s", string(output))
432+
os.Remove(rawPath)
433+
return nil
434+
}
435+
s.logger.Warnf("Direct Python conversion failed: %v (output: %s), trying Docker...", err, string(output))
436+
437+
// Fall back to Docker
438+
parentDir := filepath.Dir(rawPath)
439+
cmd = exec.Command("docker", "run", "--rm",
440+
"-v", parentDir+":/data",
441+
s.config.Images.ExecutionImageName,
442+
"python3", "/data/"+filepath.Base(scriptPath),
443+
"/data/"+filepath.Base(rawPath),
444+
"/data/"+filepath.Base(datasetPath))
445+
446+
output, err = cmd.CombinedOutput()
447+
if err != nil {
448+
// Restore the raw file on failure
449+
os.Rename(rawPath, datasetPath)
450+
return errors.Wrapf(err, "convert raw dataset to HF format: %s", string(output))
451+
}
452+
453+
s.logger.Infof("Converted raw dataset to HF format using Docker: %s", string(output))
454+
os.Remove(rawPath)
334455
return nil
335456
}
336457

0 commit comments

Comments
 (0)