1313using Microsoft . Extensions . Logging ;
1414using System ;
1515using System . Collections . Generic ;
16+ using System . IO ;
1617using System . Linq ;
1718using System . Threading . Tasks ;
1819
@@ -81,6 +82,14 @@ public override async Task ExecuteJob(string jobType, Guid jobInstanceId, JobInv
8182 return ;
8283 }
8384
85+ var jobMetadata = new ItemJobMetadata
86+ {
87+ JobInstanceId = jobInstanceId ,
88+ JobType = jobType ,
89+ UseOneLake = _metadata . UseOneLake
90+ } ;
91+ await ItemMetadataStore . UpsertJob ( TenantObjectId , ItemObjectId , jobInstanceId , jobMetadata ) ;
92+
8493 var token = await AuthenticationService . GetAccessTokenOnBehalfOf ( AuthorizationContext , OneLakeConstants . OneLakeScopes ) ;
8594
8695 var op1 = _metadata . Operand1 ;
@@ -95,9 +104,21 @@ public override async Task ExecuteJob(string jobType, Guid jobInstanceId, JobInv
95104 await Task . Delay ( TimeSpan . FromSeconds ( 60 * 8 ) ) ;
96105 }
97106
107+ try
108+ {
109+ // Reload job metadata to check later if the job was cancelled
110+ jobMetadata = await ItemMetadataStore . LoadJob ( TenantObjectId , ItemObjectId , jobInstanceId ) ;
111+ }
112+ catch ( FileNotFoundException exc )
113+ {
114+ // Demonstrating a way to recover job metadata if it has been deleted.
115+ Logger . LogWarning ( exc , $ "{ nameof ( ExecuteJob ) } - Recreating missing job { jobInstanceId } metadata in tenant { TenantObjectId } item { ItemObjectId } .") ;
116+ await ItemMetadataStore . UpsertJob ( TenantObjectId , ItemObjectId , jobInstanceId , jobMetadata ) ;
117+ }
118+
98119 // Write result to Lakehouse if job is not cancelled
99- if ( ! ItemMetadataStore . JobCancelRequestExists ( TenantObjectId , ItemObjectId , jobInstanceId ) ) {
100- var filePath = GetCalculationResultFilePath ( jobType , jobInstanceId ) ;
120+ if ( ! jobMetadata . IsCanceled ) {
121+ var filePath = GetCalculationResultFilePath ( jobMetadata ) ;
101122 await OneLakeClientService . WriteToOneLakeFile ( token , filePath , result ) ;
102123
103124 _metadata . LastCalculationResultLocation = filePath ;
@@ -115,36 +136,43 @@ public override async Task<ItemJobInstanceState> GetJobState(string jobType, Gui
115136 } ;
116137 }
117138
118- var token = await AuthenticationService . GetAccessTokenOnBehalfOf ( AuthorizationContext , OneLakeConstants . OneLakeScopes ) ;
119-
120- var filePath = GetCalculationResultFilePath ( jobType , jobInstanceId ) ;
121- var fileExists = await OneLakeClientService . CheckIfFileExists ( token , filePath ) ;
139+ if ( ! ItemMetadataStore . ExistsJob ( TenantObjectId , ItemObjectId , jobInstanceId ) )
140+ {
141+ Logger . LogError ( $ "{ nameof ( GetJobState ) } - Job { jobInstanceId } metadata does not exist in tenant { TenantObjectId } item { ItemObjectId } .") ;
142+ return new ItemJobInstanceState ( ) { Status = JobInstanceStatus . Failed } ;
143+ }
122144
123- if ( ItemMetadataStore . JobCancelRequestExists ( TenantObjectId , ItemObjectId , jobInstanceId ) )
145+ var jobMetadata = await ItemMetadataStore . LoadJob ( TenantObjectId , ItemObjectId , jobInstanceId ) ;
146+ if ( jobMetadata . IsCanceled )
124147 {
125148 return new ItemJobInstanceState { Status = JobInstanceStatus . Cancelled } ;
126149 }
127150
151+ var filePath = GetCalculationResultFilePath ( jobMetadata ) ;
152+ var token = await AuthenticationService . GetAccessTokenOnBehalfOf ( AuthorizationContext , OneLakeConstants . OneLakeScopes ) ;
153+ var fileExists = await OneLakeClientService . CheckIfFileExists ( token , filePath ) ;
154+
128155 return new ItemJobInstanceState
129156 {
130157 Status = fileExists ? JobInstanceStatus . Completed : JobInstanceStatus . InProgress ,
131158 } ;
132159 }
133160
134- private string GetCalculationResultFilePath ( string jobType , Guid jobInstanceId )
161+ private string GetCalculationResultFilePath ( ItemJobMetadata jobMetadata )
135162 {
163+ var jobInstanceId = jobMetadata . JobInstanceId ;
136164 var typeToFileName = new Dictionary < string , string >
137165 {
138166 { Item1JobType . ScheduledJob , $ "CalculationResult_{ jobInstanceId } .txt" } ,
139167 { Item1JobType . CalculateAsText , $ "CalculationResult_{ jobInstanceId } .txt" } ,
140168 { Item1JobType . LongRunningCalculateAsText , $ "CalculationResult_{ jobInstanceId } .txt" } ,
141169 { Item1JobType . CalculateAsParquet , $ "CalculationResult_{ jobInstanceId } .parquet" }
142170 } ;
143- typeToFileName . TryGetValue ( jobType , out var fileName ) ;
171+ typeToFileName . TryGetValue ( jobMetadata . JobType , out var fileName ) ;
144172
145173 if ( fileName != null )
146174 {
147- return _metadata . UseOneLake
175+ return jobMetadata . UseOneLake
148176 ? OneLakeClientService . GetOneLakeFilePath ( WorkspaceObjectId , ItemObjectId , fileName )
149177 : OneLakeClientService . GetOneLakeFilePath ( _metadata . Lakehouse . WorkspaceId , _metadata . Lakehouse . Id , fileName ) ;
150178 }
0 commit comments