1414using System . Reflection ;
1515using System . Text . Json ;
1616using System . Text . Json . Serialization ;
17+ using static CommunityToolkit . Datasync . Client . Offline . Operations . PullOperationManager ;
1718
1819namespace CommunityToolkit . Datasync . Client . Offline . Operations ;
1920
@@ -53,70 +54,87 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
5354
5455 QueueHandler < PullResponse > databaseUpdateQueue = new ( 1 , async pullResponse =>
5556 {
56- DateTimeOffset lastSynchronization = await DeltaTokenStore . GetDeltaTokenAsync ( pullResponse . QueryId , cancellationToken ) . ConfigureAwait ( false ) ;
57- foreach ( object item in pullResponse . Items )
57+ if ( pullResponse . Items . Any ( ) )
5858 {
59- EntityMetadata metadata = EntityResolver . GetEntityMetadata ( item , pullResponse . EntityType ) ;
60- object ? originalEntity = await context . FindAsync ( pullResponse . EntityType , [ metadata . Id ] , cancellationToken ) . ConfigureAwait ( false ) ;
61-
62- if ( originalEntity is null && ! metadata . Deleted )
63- {
64- _ = context . Add ( item ) ;
65- result . IncrementAdditions ( ) ;
66- }
67- else if ( originalEntity is not null && metadata . Deleted )
59+ DateTimeOffset lastSynchronization = await DeltaTokenStore . GetDeltaTokenAsync ( pullResponse . QueryId , cancellationToken ) . ConfigureAwait ( false ) ;
60+ foreach ( object item in pullResponse . Items )
6861 {
69- _ = context . Remove ( originalEntity ) ;
70- result . IncrementDeletions ( ) ;
71- }
72- else if ( originalEntity is not null && ! metadata . Deleted )
73- {
74- // Gather properties marked with [JsonIgnore]
75- HashSet < string > ignoredProps = pullResponse . EntityType
76- . GetProperties ( BindingFlags . Public | BindingFlags . Instance )
77- . Where ( p => p . IsDefined ( typeof ( JsonIgnoreAttribute ) , inherit : true ) )
78- . Select ( p => p . Name )
79- . ToHashSet ( ) ;
80-
81- EntityEntry originalEntry = context . Entry ( originalEntity ) ;
82- EntityEntry newEntry = context . Entry ( item ) ;
62+ EntityMetadata metadata = EntityResolver . GetEntityMetadata ( item , pullResponse . EntityType ) ;
63+ object ? originalEntity = await context . FindAsync ( pullResponse . EntityType , [ metadata . Id ] , cancellationToken ) . ConfigureAwait ( false ) ;
8364
84- // Only copy properties that are not marked with [JsonIgnore]
85- foreach ( IProperty property in originalEntry . Metadata . GetProperties ( ) )
65+ if ( originalEntity is null && ! metadata . Deleted )
8666 {
87- if ( ! ignoredProps . Contains ( property . Name ) )
67+ _ = context . Add ( item ) ;
68+ result . IncrementAdditions ( ) ;
69+ }
70+ else if ( originalEntity is not null && metadata . Deleted )
71+ {
72+ _ = context . Remove ( originalEntity ) ;
73+ result . IncrementDeletions ( ) ;
74+ }
75+ else if ( originalEntity is not null && ! metadata . Deleted )
76+ {
77+ // Gather properties marked with [JsonIgnore]
78+ HashSet < string > ignoredProps = pullResponse . EntityType
79+ . GetProperties ( BindingFlags . Public | BindingFlags . Instance )
80+ . Where ( p => p . IsDefined ( typeof ( JsonIgnoreAttribute ) , inherit : true ) )
81+ . Select ( p => p . Name )
82+ . ToHashSet ( ) ;
83+
84+ EntityEntry originalEntry = context . Entry ( originalEntity ) ;
85+ EntityEntry newEntry = context . Entry ( item ) ;
86+
87+ // Only copy properties that are not marked with [JsonIgnore]
88+ foreach ( IProperty property in originalEntry . Metadata . GetProperties ( ) )
8889 {
89- originalEntry . Property ( property . Name ) . CurrentValue = newEntry . Property ( property . Name ) . CurrentValue ;
90+ if ( ! ignoredProps . Contains ( property . Name ) )
91+ {
92+ originalEntry . Property ( property . Name ) . CurrentValue = newEntry . Property ( property . Name ) . CurrentValue ;
93+ }
9094 }
95+
96+ result . IncrementReplacements ( ) ;
9197 }
9298
93- result . IncrementReplacements ( ) ;
99+ if ( metadata . UpdatedAt > lastSynchronization )
100+ {
101+ lastSynchronization = metadata . UpdatedAt . Value ;
102+ bool isAdded = await DeltaTokenStore . SetDeltaTokenAsync ( pullResponse . QueryId , metadata . UpdatedAt . Value , cancellationToken ) . ConfigureAwait ( false ) ;
103+ if ( isAdded )
104+ {
105+ // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
106+ _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
107+ }
108+ }
94109 }
95110
96- if ( metadata . UpdatedAt > lastSynchronization )
111+ if ( pullOptions . SaveAfterEveryServiceRequest )
97112 {
98- lastSynchronization = metadata . UpdatedAt . Value ;
99- bool isAdded = await DeltaTokenStore . SetDeltaTokenAsync ( pullResponse . QueryId , metadata . UpdatedAt . Value , cancellationToken ) . ConfigureAwait ( false ) ;
100- if ( isAdded )
101- {
102- // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
103- _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
104- }
113+ _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
105114 }
106- }
107115
108- context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
109- {
110- EventType = SynchronizationEventType . ItemsCommitted ,
111- EntityType = pullResponse . EntityType ,
112- ItemsProcessed = pullResponse . TotalItemsProcessed ,
113- TotalNrItems = pullResponse . TotalRequestItems ,
114- QueryId = pullResponse . QueryId
115- } ) ;
116+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
117+ {
118+ EventType = SynchronizationEventType . ItemsCommitted ,
119+ EntityType = pullResponse . EntityType ,
120+ ItemsProcessed = pullResponse . TotalItemsProcessed ,
121+ TotalNrItems = pullResponse . TotalRequestItems ,
122+ QueryId = pullResponse . QueryId
123+ } ) ;
124+ }
116125
117- if ( pullOptions . SaveAfterEveryServiceRequest )
126+ if ( pullResponse . Completed )
118127 {
119- _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
128+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
129+ {
130+ EventType = SynchronizationEventType . PullEnded ,
131+ EntityType = pullResponse . EntityType ,
132+ ItemsProcessed = pullResponse . TotalItemsProcessed ,
133+ TotalNrItems = pullResponse . TotalRequestItems ,
134+ QueryId = pullResponse . QueryId ,
135+ Exception = pullResponse . Exception ,
136+ ServiceResponse = pullResponse . Exception is DatasyncPullException ex ? ex . ServiceResponse : null
137+ } ) ;
120138 }
121139 } ) ;
122140
@@ -125,15 +143,24 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
125143 Uri endpoint = ExecutableOperation . MakeAbsoluteUri ( pullRequest . HttpClient . BaseAddress , pullRequest . Endpoint ) ;
126144 Uri requestUri = new UriBuilder ( endpoint ) { Query = pullRequest . QueryDescription . ToODataQueryString ( ) } . Uri ;
127145 Type pageType = typeof ( Page < > ) . MakeGenericType ( pullRequest . EntityType ) ;
146+ long itemsProcessed = 0 ;
147+ long totalCount = 0 ;
128148
129149 try
130150 {
131151 bool completed = false ;
132- long itemsProcessed = 0 ;
152+ // Signal we started the pull operation.
153+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
154+ {
155+ EventType = SynchronizationEventType . PullStarted ,
156+ EntityType = pullRequest . EntityType ,
157+ QueryId = pullRequest . QueryId
158+ } ) ;
133159 do
134160 {
135161 Page < object > page = await GetPageAsync ( pullRequest . HttpClient , requestUri , pageType , cancellationToken ) . ConfigureAwait ( false ) ;
136162 itemsProcessed += page . Items . Count ( ) ;
163+ totalCount = page . Count ?? totalCount ;
137164
138165 context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
139166 {
@@ -144,7 +171,6 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
144171 QueryId = pullRequest . QueryId
145172 } ) ;
146173
147- databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , page . Items , page . Count ?? 0 , itemsProcessed ) ) ;
148174 if ( ! string . IsNullOrEmpty ( page . NextLink ) )
149175 {
150176 requestUri = new UriBuilder ( endpoint ) { Query = page . NextLink } . Uri ;
@@ -153,12 +179,15 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
153179 {
154180 completed = true ;
155181 }
182+
183+ databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , page . Items , totalCount , itemsProcessed , completed ) ) ;
156184 }
157185 while ( ! completed ) ;
158186 }
159187 catch ( DatasyncPullException ex )
160188 {
161189 result . AddFailedRequest ( requestUri , ex . ServiceResponse ) ;
190+ databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , Enumerable . Empty < object > ( ) , totalCount , itemsProcessed , true , ex ) ) ;
162191 }
163192 } ) ;
164193
@@ -263,6 +292,8 @@ internal static QueryDescription PrepareQueryDescription(QueryDescription source
263292 /// <param name="Items">The list of items to process.</param>
264293 /// <param name="TotalRequestItems">The total number of items in the current pull request.</param>
265294 /// <param name="TotalItemsProcessed">The total number of items processed, <paramref name="Items"/> included.</param>
295+ /// <param name="Completed">If <c>true</c>, indicates that the pull request is completed.</param>
296+ /// <param name="Exception">Indicates an exception occured during fetching of data</param>
266297 [ ExcludeFromCodeCoverage ]
267- internal record PullResponse ( Type EntityType , string QueryId , IEnumerable < object > Items , long TotalRequestItems , long TotalItemsProcessed ) ;
298+ internal record PullResponse ( Type EntityType , string QueryId , IEnumerable < object > Items , long TotalRequestItems , long TotalItemsProcessed , bool Completed , Exception ? Exception = null ) ;
268299}
0 commit comments