Skip to content

Commit 609aa6b

Browse files
authored
feat: add implemention of nydus intercept (#155)
Signed-off-by: Yang Kaiyong <[email protected]> Co-authored-by: Yang Kaiyong <[email protected]>
1 parent fbf92fc commit 609aa6b

File tree

4 files changed

+167
-3
lines changed

4 files changed

+167
-3
lines changed

pkg/backend/build/builder_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@ package build
1919
import (
2020
"context"
2121
"errors"
22+
"io"
2223
"os"
2324
"path/filepath"
2425
"strings"
26+
"sync"
2527
"testing"
2628
"time"
2729

2830
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
2931
godigest "github.com/opencontainers/go-digest"
3032
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
33+
"github.com/stretchr/testify/assert"
3134
"github.com/stretchr/testify/mock"
3235
"github.com/stretchr/testify/suite"
3336

@@ -268,3 +271,18 @@ func (s *BuilderTestSuite) TestBuildModelConfig() {
268271
func TestBuilderSuite(t *testing.T) {
269272
suite.Run(t, new(BuilderTestSuite))
270273
}
274+
275+
func TestPipeReader(t *testing.T) {
276+
r := strings.NewReader("some io.Reader stream to be read\n")
277+
r1, r2 := splitReader(r)
278+
var wg sync.WaitGroup
279+
wg.Add(1)
280+
go func() {
281+
defer wg.Done()
282+
_, err := io.Copy(os.Stdout, r2)
283+
assert.NoError(t, err)
284+
}()
285+
_, err := io.Copy(os.Stdout, r1)
286+
assert.NoError(t, err)
287+
wg.Wait()
288+
}

pkg/backend/build/interceptor/nydus.go

Lines changed: 144 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,159 @@
1717
package interceptor
1818

1919
import (
20+
"archive/tar"
2021
"context"
22+
"encoding/json"
23+
"fmt"
24+
"hash/crc32"
2125
"io"
26+
"strings"
27+
28+
"github.com/CloudNativeAI/modctl/pkg/codec"
29+
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
30+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
31+
)
32+
33+
const (
34+
CrcsKey = "org.cnai.nydus.crcs"
35+
DefaultFileChunkSize = 4 * 1024 * 1024
2236
)
2337

38+
var mediaTypeChunkSizeMap = map[string]int{
39+
modelspec.MediaTypeModelWeight: 64 * 1024 * 1024,
40+
modelspec.MediaTypeModelDataset: 64 * 1024 * 1024,
41+
}
42+
43+
var table = crc32.MakeTable(crc32.Castagnoli)
44+
2445
type nydus struct{}
2546

47+
type FileCrcList struct {
48+
Files []FileCrcInfo `json:"files"`
49+
}
50+
51+
type FileCrcInfo struct {
52+
FilePath string `json:"file_path"`
53+
ChunkCrcs string `json:"chunk_crcs"`
54+
}
55+
2656
func NewNydus() *nydus {
2757
return &nydus{}
2858
}
2959

3060
func (n *nydus) Intercept(ctx context.Context, mediaType string, filepath string, readerType string, reader io.Reader) (ApplyDescriptorFn, error) {
31-
// TODO: Implement nydus interceptor
32-
return nil, nil
61+
crcsStr := ""
62+
chunkSize := int64(DefaultFileChunkSize)
63+
if c, ok := mediaTypeChunkSizeMap[mediaType]; ok {
64+
chunkSize = int64(c)
65+
}
66+
67+
switch readerType {
68+
case codec.Tar:
69+
fileCrcs, err := calcCrc32inTar(ctx, reader, chunkSize)
70+
if err != nil {
71+
return nil, fmt.Errorf("failed to calculate crc32 in tar: %w", err)
72+
}
73+
b, err := json.Marshal(fileCrcs)
74+
if err != nil {
75+
return nil, fmt.Errorf("failed to marshal crcs: %w", err)
76+
}
77+
return func(desc *ocispec.Descriptor) {
78+
if desc.Annotations == nil {
79+
desc.Annotations = make(map[string]string)
80+
}
81+
desc.Annotations[CrcsKey] = string(b)
82+
}, nil
83+
case codec.Raw:
84+
crc32Results, err := calcCrc32(ctx, reader, chunkSize)
85+
if err != nil {
86+
return nil, fmt.Errorf("failed to calculate crc32: %w", err)
87+
}
88+
crcsStr = buildCrc32Str(crc32Results)
89+
crcInfo := FileCrcInfo{
90+
FilePath: filepath,
91+
ChunkCrcs: crcsStr,
92+
}
93+
crcs := FileCrcList{
94+
Files: []FileCrcInfo{crcInfo},
95+
}
96+
b, err := json.Marshal(crcs)
97+
if err != nil {
98+
return nil, fmt.Errorf("failed to marshal crcs: %w", err)
99+
}
100+
return func(desc *ocispec.Descriptor) {
101+
if desc.Annotations == nil {
102+
desc.Annotations = make(map[string]string)
103+
}
104+
desc.Annotations[CrcsKey] = string(b)
105+
}, nil
106+
default:
107+
return nil, fmt.Errorf("unsupported reader type: %s", readerType)
108+
}
109+
}
110+
111+
func calcCrc32inTar(ctx context.Context, r io.Reader, chunkSize int64) (*FileCrcList, error) {
112+
fileCrcList := FileCrcList{
113+
Files: make([]FileCrcInfo, 0),
114+
}
115+
tarReader := tar.NewReader(r)
116+
for {
117+
select {
118+
case <-ctx.Done():
119+
return nil, ctx.Err()
120+
default:
121+
header, err := tarReader.Next()
122+
if err == io.EOF {
123+
return &fileCrcList, nil
124+
}
125+
if err != nil {
126+
return nil, fmt.Errorf("error reading tar: %w", err)
127+
}
128+
if header.Typeflag == tar.TypeReg {
129+
crc32Results, err := calcCrc32(ctx, tarReader, chunkSize)
130+
if err != nil {
131+
return nil, fmt.Errorf("failed to calculate crc32: %w", err)
132+
}
133+
crcsStr := buildCrc32Str(crc32Results)
134+
crcInfo := FileCrcInfo{
135+
FilePath: header.Name,
136+
ChunkCrcs: crcsStr,
137+
}
138+
fileCrcList.Files = append(fileCrcList.Files, crcInfo)
139+
}
140+
}
141+
}
142+
}
143+
144+
func calcCrc32(ctx context.Context, r io.Reader, chunkSize int64) ([]uint32, error) {
145+
var crc32Results []uint32
146+
for {
147+
select {
148+
case <-ctx.Done():
149+
return nil, ctx.Err()
150+
default:
151+
limitedReader := io.LimitReader(r, chunkSize)
152+
hash := crc32.New(table)
153+
n, err := io.Copy(hash, limitedReader)
154+
if n == 0 || err == io.EOF {
155+
return crc32Results, nil
156+
}
157+
158+
if err != nil {
159+
return nil, fmt.Errorf("failed to read data: %w", err)
160+
}
161+
162+
if n > 0 {
163+
crc32Results = append(crc32Results, hash.Sum32())
164+
}
165+
}
166+
}
167+
}
168+
169+
func buildCrc32Str(crc32Results []uint32) string {
170+
hexCrcs := make([]string, len(crc32Results))
171+
for i, crc := range crc32Results {
172+
hexCrcs[i] = fmt.Sprintf("0x%x", crc)
173+
}
174+
return strings.Join(hexCrcs, ",")
33175
}

pkg/backend/build/remote.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, relPath, dig
6969
}
7070

7171
if exist {
72+
// In case the reader is from PipeReader, we need to read the whole reader to avoid the pipe being blocked.
73+
io.Copy(io.Discard, reader)
7274
hooks.OnComplete(relPath, desc)
7375
return desc, nil
7476
}

pkg/backend/nydusify.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package backend
1818

1919
import (
2020
"context"
21+
"os"
2122
"os/exec"
2223
)
2324

@@ -44,7 +45,8 @@ func (b *backend) Nydusify(ctx context.Context, source string) (string, error) {
4445
"--target",
4546
target,
4647
)
47-
48+
cmd.Stdout = os.Stdout
49+
cmd.Stderr = os.Stderr
4850
if err := cmd.Run(); err != nil {
4951
return "", err
5052
}

0 commit comments

Comments
 (0)