@@ -24,6 +24,7 @@ import (
2424 "io"
2525 "os"
2626 "path/filepath"
27+ "sync"
2728 "time"
2829
2930 modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
@@ -34,6 +35,7 @@ import (
3435
3536 buildconfig "github.com/CloudNativeAI/modctl/pkg/backend/build/config"
3637 "github.com/CloudNativeAI/modctl/pkg/backend/build/hooks"
38+ "github.com/CloudNativeAI/modctl/pkg/backend/build/interceptor"
3739 "github.com/CloudNativeAI/modctl/pkg/codec"
3840 "github.com/CloudNativeAI/modctl/pkg/storage"
3941)
@@ -96,10 +98,11 @@ func NewBuilder(outputType OutputType, store storage.Storage, repo, tag string,
9698 }
9799
98100 return & abstractBuilder {
99- store : store ,
100- repo : repo ,
101- tag : tag ,
102- strategy : strategy ,
101+ store : store ,
102+ repo : repo ,
103+ tag : tag ,
104+ strategy : strategy ,
105+ interceptor : cfg .interceptor ,
103106 }, nil
104107}
105108
@@ -110,6 +113,8 @@ type abstractBuilder struct {
110113 tag string
111114 // strategy is the output strategy used to output the blob.
112115 strategy OutputStrategy
116+ // interceptor is the interceptor used to intercept the build process.
117+ interceptor interceptor.Interceptor
113118}
114119
115120func (ab * abstractBuilder ) BuildLayer (ctx context.Context , mediaType , workDir , path string , hooks hooks.Hooks ) (ocispec.Descriptor , error ) {
@@ -167,7 +172,39 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p
167172 }
168173 }
169174
170- return ab .strategy .OutputLayer (ctx , mediaType , relPath , digest , size , reader , hooks )
175+ var (
176+ wg sync.WaitGroup
177+ itErr error
178+ applyDesc interceptor.ApplyDescriptorFn
179+ )
180+ // Intercept the reader if needed.
181+ if ab .interceptor != nil {
182+ var itReader io.Reader
183+ reader , itReader = splitReader (reader )
184+
185+ wg .Add (1 )
186+ go func () {
187+ defer wg .Done ()
188+ applyDesc , itErr = ab .interceptor .Intercept (ctx , mediaType , relPath , codec .Type (), itReader )
189+ }()
190+ }
191+
192+ desc , err := ab .strategy .OutputLayer (ctx , mediaType , relPath , digest , size , reader , hooks )
193+ if err != nil {
194+ return desc , err
195+ }
196+
197+ // Wait for the interceptor to finish.
198+ wg .Wait ()
199+ if itErr != nil {
200+ return desc , itErr
201+ }
202+
203+ if applyDesc != nil {
204+ applyDesc (& desc )
205+ }
206+
207+ return desc , nil
171208}
172209
173210func (ab * abstractBuilder ) BuildConfig (ctx context.Context , layers []ocispec.Descriptor , modelConfig * buildconfig.Model , hooks hooks.Hooks ) (ocispec.Descriptor , error ) {
@@ -247,3 +284,23 @@ func buildModelConfig(modelConfig *buildconfig.Model, layers []ocispec.Descripto
247284 ModelFS : fs ,
248285 }, nil
249286}
287+
288+ // splitReader splits the original reader into two readers.
289+ func splitReader (original io.Reader ) (io.Reader , io.Reader ) {
290+ r1 , w1 := io .Pipe ()
291+ r2 , w2 := io .Pipe ()
292+ multiWriter := io .MultiWriter (w1 , w2 )
293+
294+ go func () {
295+ defer w1 .Close ()
296+ defer w2 .Close ()
297+
298+ _ , err := io .Copy (multiWriter , original )
299+ if err != nil {
300+ w1 .CloseWithError (err )
301+ w2 .CloseWithError (err )
302+ }
303+ }()
304+
305+ return r1 , r2
306+ }
0 commit comments