22// Licensed under the MIT License.
33
44#nullable enable
5- using System ;
65using System . Collections . Generic ;
76using System . Diagnostics . ContractsLight ;
87using System . IO ;
1110using BuildXL . Cache . ContentStore . Distributed . NuCache ;
1211using BuildXL . Cache . ContentStore . Distributed . Stores ;
1312using BuildXL . Cache . ContentStore . Hashing ;
14- using BuildXL . Cache . ContentStore . Interfaces . FileSystem ;
1513using BuildXL . Cache . ContentStore . Interfaces . Results ;
1614using BuildXL . Cache . ContentStore . Interfaces . Sessions ;
1715using BuildXL . Cache . ContentStore . Interfaces . Tracing ;
@@ -90,36 +88,42 @@ public override Task<IEnumerable<Task<Indexed<PinResult>>>> PinAsync(Context con
9088
9189 protected override async Task < OpenStreamResult > OpenStreamCoreAsync ( OperationContext context , ContentHash contentHash , UrgencyHint urgencyHint , Counter retryCounter )
9290 {
93- // The following logic relies on the fact that we can create a file stream pointing to a file, emit a delete,
94- // and the file will be deleted when the last remaining file handle is closed.
95- using var temporary = new DisposableFile ( context , _ephemeralHost . FileSystem , AbsolutePath . CreateRandomFileName ( _ephemeralHost . Configuration . Workspace ) ) ;
91+ var local = await _local . OpenStreamAsync ( context , contentHash , context . Token , urgencyHint ) ;
92+ if ( local . Succeeded )
93+ {
94+ _ephemeralHost . PutElisionCache . TryAdd ( contentHash , local . StreamWithLength ! . Value . Length , _ephemeralHost . Configuration . PutCacheTimeToLive ) ;
9695
97- var placeResult = await PlaceFileCoreAsync (
98- context ,
99- contentHash ,
100- temporary . Path ,
101- FileAccessMode . ReadOnly ,
102- FileReplacementMode . ReplaceExisting ,
103- FileRealizationMode . Any ,
104- urgencyHint ,
105- retryCounter ) ;
106- if ( ! placeResult . Succeeded )
96+ return local ;
97+ }
98+
99+ using var guard = await _ephemeralHost . RemoteFetchLocks . AcquireAsync ( contentHash , context . Token ) ;
100+
101+ // Some other thread may have been downloading and inserting into the local cache. In such a case, we'll have
102+ // blocked above, and we can just return the result of the local cache.
103+ if ( ! guard . WaitFree )
107104 {
108- if ( placeResult . Code == PlaceFileResult . ResultCode . NotPlacedContentNotFound )
105+ local = await _local . OpenStreamAsync ( context , contentHash , context . Token , urgencyHint ) ;
106+ if ( local . Succeeded )
109107 {
110- return new OpenStreamResult ( OpenStreamResult . ResultCode . ContentNotFound , errorMessage : $ "Content with hash { contentHash } was not found") ;
108+ _ephemeralHost . PutElisionCache . TryAdd ( contentHash , local . StreamWithLength ! . Value . Length , _ephemeralHost . Configuration . PutCacheTimeToLive ) ;
109+
110+ return local ;
111111 }
112+ }
113+
114+ var putResult = await TryPeerToPeerFetchAsync ( context , contentHash , urgencyHint ) ;
115+ if ( putResult . Succeeded )
116+ {
117+ local = await _local . OpenStreamAsync ( context , contentHash , context . Token , urgencyHint ) ;
118+ if ( local . Succeeded )
119+ {
120+ _ephemeralHost . PutElisionCache . TryAdd ( contentHash , local . StreamWithLength ! . Value . Length , _ephemeralHost . Configuration . PutCacheTimeToLive ) ;
112121
113- return new OpenStreamResult ( placeResult , message : $ "Failed to find content with hash { contentHash } ") ;
122+ return local ;
123+ }
114124 }
115125
116- // We don't dispose the stream on purpose, because the callee takes ownership of it.
117- var stream = _ephemeralHost . FileSystem . TryOpen (
118- temporary . Path ,
119- FileAccess . Read ,
120- FileMode . Open ,
121- FileShare . Delete ) ;
122- return new OpenStreamResult ( stream ) ;
126+ return await _persistent . OpenStreamAsync ( context , contentHash , context . Token , urgencyHint ) ;
123127 }
124128
125129 protected override async Task < PlaceFileResult > PlaceFileCoreAsync (
@@ -132,7 +136,6 @@ protected override async Task<PlaceFileResult> PlaceFileCoreAsync(
132136 UrgencyHint urgencyHint ,
133137 Counter retryCounter )
134138 {
135- // Step 1: try to fetch it from the local content store.
136139 var local = await _local . PlaceFileAsync ( context , contentHash , path , accessMode , replacementMode , realizationMode , context . Token , urgencyHint ) ;
137140 if ( local . Succeeded )
138141 {
@@ -156,23 +159,29 @@ protected override async Task<PlaceFileResult> PlaceFileCoreAsync(
156159 }
157160 }
158161
159- // Step 2: try to fetch it from the datacenter cache.
160- var datacenter = await TryPlaceFromDatacenterCacheAsync (
162+ var datacenter = await TryPeerToPeerFetchAsync (
161163 context ,
162164 contentHash ,
163- path ,
164- accessMode ,
165- replacementMode ,
166- realizationMode ,
167165 urgencyHint ) ;
168166 if ( datacenter . Succeeded )
169167 {
170- _ephemeralHost . PutElisionCache . TryAdd ( contentHash , datacenter . FileSize , _ephemeralHost . Configuration . PutCacheTimeToLive ) ;
168+ local = await _local . PlaceFileAsync (
169+ context ,
170+ contentHash ,
171+ path ,
172+ accessMode ,
173+ replacementMode ,
174+ realizationMode ,
175+ context . Token ,
176+ urgencyHint ) ;
177+ if ( local . Succeeded )
178+ {
179+ return local . WithMaterializationSource ( PlaceFileResult . Source . DatacenterCache ) ;
180+ }
171181
172- return datacenter ;
182+ return new PlaceFileResult ( PlaceFileResult . ResultCode . NotPlacedContentNotFound , errorMessage : $ "Content hash ` { contentHash } ` inserted into local cache, but couldn't place from local" ) ;
173183 }
174184
175- // Step 3: try to fetch it from the persistent cache.
176185 var persistent = await _persistent . PlaceFileAsync (
177186 context ,
178187 contentHash ,
@@ -195,13 +204,9 @@ protected override async Task<PlaceFileResult> PlaceFileCoreAsync(
195204 return persistent . WithMaterializationSource ( PlaceFileResult . Source . BackingStore ) ;
196205 }
197206
198- private Task < PlaceFileResult > TryPlaceFromDatacenterCacheAsync (
207+ private Task < PutResult > TryPeerToPeerFetchAsync (
199208 OperationContext context ,
200209 ContentHash contentHash ,
201- AbsolutePath path ,
202- FileAccessMode accessMode ,
203- FileReplacementMode replacementMode ,
204- FileRealizationMode realizationMode ,
205210 UrgencyHint urgencyHint )
206211 {
207212 return context . PerformOperationAsync (
@@ -264,53 +269,33 @@ private Task<PlaceFileResult> TryPlaceFromDatacenterCacheAsync(
264269 var ( copyResult , tempLocation , attemptCount ) = copyInfo ;
265270 var local = _local as ITrustedContentSession ;
266271 Contract . AssertNotNull ( local , "The local content session was expected to be a trusted session, but failed to cast." ) ;
267- return local . PutTrustedFileAsync ( context , new ContentHashWithSize ( contentHash , contentEntry . Size ) , tempLocation , FileRealizationMode . Any , context . Token , urgencyHint ) ;
272+ return local . PutTrustedFileAsync ( context , new ContentHashWithSize ( contentHash , contentEntry . Size ) , tempLocation , FileRealizationMode . Move , context . Token , urgencyHint ) ;
268273 } ,
269274 CopyCompression . None ,
270275 null ,
271276 _ephemeralHost . Configuration . Workspace ) ) ;
272277
273- if ( datacenter . Succeeded )
274- {
275- var local = await _local . PlaceFileAsync (
276- context ,
277- contentHash ,
278- path ,
279- accessMode ,
280- replacementMode ,
281- realizationMode ,
282- context . Token ,
283- urgencyHint ) ;
284- if ( local . Succeeded )
285- {
286- return local . WithMaterializationSource ( PlaceFileResult . Source . DatacenterCache ) ;
287- }
288-
289- return new PlaceFileResult ( PlaceFileResult . ResultCode . NotPlacedContentNotFound , errorMessage : $ "Content hash `{ contentHash } ` inserted into local cache, but couldn't place from local") ;
290- }
291-
292- return new PlaceFileResult ( PlaceFileResult . ResultCode . NotPlacedContentNotFound , errorMessage : $ "Content hash `{ contentHash } ` couldn't be downloaded from peers") ;
278+ return datacenter ;
293279 }
294280
295- return new PlaceFileResult ( PlaceFileResult . ResultCode . NotPlacedContentNotFound , errorMessage : $ "Content hash `{ contentHash } ` found in the content tracker, but without any active locations") ;
281+ return new PutResult ( contentHash , $ "Content hash `{ contentHash } ` found in the content tracker, but without any active locations") ;
296282 }
297283
298- return new PlaceFileResult ( PlaceFileResult . ResultCode . NotPlacedContentNotFound , errorMessage : $ "Content hash `{ contentHash } ` not found in the content tracker") ;
284+ return new PutResult ( contentHash , errorMessage : $ "Content hash `{ contentHash } ` not found in the content tracker") ;
299285
300286 } ,
301- extraStartMessage : $ "({ contentHash . ToShortString ( ) } , { path } , { accessMode } , { replacementMode } , { realizationMode } )",
287+ extraStartMessage : $ "({ contentHash . ToShortString ( ) } )",
302288 traceOperationStarted : TraceOperationStarted ,
303289 extraEndMessage : result =>
304290 {
305- var message = $ "({ contentHash . ToShortString ( ) } , { path } , { accessMode } , { replacementMode } , { realizationMode } )";
306- if ( result . Metadata == null )
291+ var message = $ "({ contentHash . ToShortString ( ) } )";
292+ if ( result . MetaData == null )
307293 {
308294 return message ;
309295 }
310296
311- return message + $ " Gate.OccupiedCount={ result . Metadata . GateOccupiedCount } Gate.Wait={ result . Metadata . GateWaitTime . TotalMilliseconds } ms";
312- } ,
313- traceErrorsOnly : TraceErrorsOnlyForPlaceFile ( path ) ) ;
297+ return message + $ " Gate.OccupiedCount={ result . MetaData . GateOccupiedCount } Gate.Wait={ result . MetaData . GateWaitTime . TotalMilliseconds } ms";
298+ } ) ;
314299 }
315300
316301 protected override async Task < PutResult > PutFileCoreAsync (
0 commit comments