@@ -316,11 +316,12 @@ impl<'a> Connection<'a> {
316
316
& self ,
317
317
key : & EntityKey ,
318
318
data : & serde_json:: Value ,
319
+ overwrite : bool ,
319
320
guard : Option < EntityFilter > ,
320
321
history_event : Option < & HistoryEvent > ,
321
322
) -> Result < usize , StoreError > {
322
323
let table = self . table ( & key. subgraph_id ) ?;
323
- table. update ( self . conn , key, data, guard, history_event)
324
+ table. update ( self . conn , key, data, overwrite , guard, history_event)
324
325
}
325
326
326
327
pub ( crate ) fn delete (
@@ -400,6 +401,7 @@ impl SplitTable {
400
401
conn : & PgConnection ,
401
402
key : & EntityKey ,
402
403
data : & serde_json:: Value ,
404
+ overwrite : bool ,
403
405
guard : Option < EntityFilter > ,
404
406
history_event : Option < & HistoryEvent > ,
405
407
) -> Result < usize , StoreError > {
@@ -430,23 +432,42 @@ impl SplitTable {
430
432
. filter ( entities:: entity. eq ( & key. entity_type ) )
431
433
. filter ( entities:: id. eq ( & key. entity_id ) ) ;
432
434
433
- Ok ( diesel:: update ( target)
434
- . set ( (
435
- entities:: data. eq ( entities:: data. merge ( & data) ) ,
436
- entities:: event_source. eq ( & event_source) ,
437
- ) )
438
- . filter ( filter)
439
- . execute ( conn) ?)
435
+ if overwrite {
436
+ Ok ( diesel:: update ( target)
437
+ . set ( (
438
+ entities:: data. eq ( & data) ,
439
+ entities:: event_source. eq ( & event_source) ,
440
+ ) )
441
+ . filter ( filter)
442
+ . execute ( conn) ?)
443
+ } else {
444
+ Ok ( diesel:: update ( target)
445
+ . set ( (
446
+ entities:: data. eq ( entities:: data. merge ( & data) ) ,
447
+ entities:: event_source. eq ( & event_source) ,
448
+ ) )
449
+ . filter ( filter)
450
+ . execute ( conn) ?)
451
+ }
440
452
} else {
441
453
// If there is no guard (which has to include all 'normal' subgraphs),
442
454
// we need to use a direct query since diesel::update does not like
443
455
// dynamic tables.
444
- let query = format ! (
445
- "update {}.entities
446
- set data = data || $3, event_source = $4
447
- where entity = $1 and id = $2" ,
448
- self . schema
449
- ) ;
456
+ let query = if overwrite {
457
+ format ! (
458
+ "update {}.entities
459
+ set data = $3, event_source = $4
460
+ where entity = $1 and id = $2" ,
461
+ self . schema
462
+ )
463
+ } else {
464
+ format ! (
465
+ "update {}.entities
466
+ set data = data || $3, event_source = $4
467
+ where entity = $1 and id = $2" ,
468
+ self . schema
469
+ )
470
+ } ;
450
471
let query = diesel:: sql_query ( query)
451
472
. bind :: < Text , _ > ( & key. entity_type )
452
473
. bind :: < Text , _ > ( & key. entity_id )
@@ -746,6 +767,7 @@ impl Table {
746
767
conn : & PgConnection ,
747
768
key : & EntityKey ,
748
769
data : & serde_json:: Value ,
770
+ overwrite : bool ,
749
771
guard : Option < EntityFilter > ,
750
772
history_event : Option < & HistoryEvent > ,
751
773
) -> Result < usize , StoreError > {
@@ -758,22 +780,42 @@ impl Table {
758
780
. filter ( public:: entities:: entity. eq ( & key. entity_type ) )
759
781
. filter ( public:: entities:: id. eq ( & key. entity_id ) ) ;
760
782
761
- let query = diesel:: update ( target) . set ( (
762
- public:: entities:: data. eq ( public:: entities:: data. merge ( data) ) ,
763
- public:: entities:: event_source. eq ( & event_source) ,
764
- ) ) ;
765
-
766
- match guard {
767
- Some ( filter) => {
768
- let filter = build_filter ( filter) . map_err ( |e| {
769
- TransactionAbortError :: Other ( format ! (
770
- "invalid filter '{}' for value '{}'" ,
771
- e. filter, e. value
772
- ) )
773
- } ) ?;
774
- Ok ( query. filter ( filter) . execute ( conn) ?)
783
+ if overwrite {
784
+ let query = diesel:: update ( target) . set ( (
785
+ public:: entities:: data. eq ( data) ,
786
+ public:: entities:: event_source. eq ( & event_source) ,
787
+ ) ) ;
788
+
789
+ match guard {
790
+ Some ( filter) => {
791
+ let filter = build_filter ( filter) . map_err ( |e| {
792
+ TransactionAbortError :: Other ( format ! (
793
+ "invalid filter '{}' for value '{}'" ,
794
+ e. filter, e. value
795
+ ) )
796
+ } ) ?;
797
+ Ok ( query. filter ( filter) . execute ( conn) ?)
798
+ }
799
+ None => Ok ( query. execute ( conn) ?) ,
800
+ }
801
+ } else {
802
+ let query = diesel:: update ( target) . set ( (
803
+ public:: entities:: data. eq ( public:: entities:: data. merge ( data) ) ,
804
+ public:: entities:: event_source. eq ( & event_source) ,
805
+ ) ) ;
806
+
807
+ match guard {
808
+ Some ( filter) => {
809
+ let filter = build_filter ( filter) . map_err ( |e| {
810
+ TransactionAbortError :: Other ( format ! (
811
+ "invalid filter '{}' for value '{}'" ,
812
+ e. filter, e. value
813
+ ) )
814
+ } ) ?;
815
+ Ok ( query. filter ( filter) . execute ( conn) ?)
816
+ }
817
+ None => Ok ( query. execute ( conn) ?) ,
775
818
}
776
- None => Ok ( query. execute ( conn) ?) ,
777
819
}
778
820
}
779
821
Table :: Split ( entities) => {
@@ -791,7 +833,7 @@ impl Table {
791
833
) ?;
792
834
}
793
835
794
- entities. update ( conn, key, data, guard, history_event)
836
+ entities. update ( conn, key, data, overwrite , guard, history_event)
795
837
}
796
838
}
797
839
}
0 commit comments