-
Notifications
You must be signed in to change notification settings - Fork 59
Expand file tree
/
Copy pathEbeanLocalDAO.java
More file actions
1802 lines (1621 loc) · 84.7 KB
/
EbeanLocalDAO.java
File metadata and controls
1802 lines (1621 loc) · 84.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package com.linkedin.metadata.dao;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.DataSchema;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.dao.exception.RetryLimitReached;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig;
import com.linkedin.metadata.dao.tracking.BaseDaoBenchmarkMetrics;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.utils.EBeanDAOUtils;
import com.linkedin.metadata.dao.utils.ETagUtils;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.QueryUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.dao.utils.SQLSchemaUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.internal.IngestionParams;
import com.linkedin.metadata.query.Condition;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.IndexCriterion;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexGroupByCriterion;
import com.linkedin.metadata.query.IndexSortCriterion;
import com.linkedin.metadata.query.IndexValue;
import com.linkedin.metadata.query.ListResultMetadata;
import io.ebean.DuplicateKeyException;
import io.ebean.EbeanServer;
import io.ebean.PagedList;
import io.ebean.Query;
import io.ebean.SqlRow;
import io.ebean.SqlUpdate;
import io.ebean.Transaction;
import io.ebean.config.ServerConfig;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.OptimisticLockException;
import javax.persistence.RollbackException;
import javax.persistence.Table;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import pegasus.com.linkedin.metadata.events.IngestionAspectETag;
import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray;
import static com.linkedin.metadata.dao.EbeanLocalAccess.*;
import static com.linkedin.metadata.dao.EbeanMetadataAspect.*;
import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*;
import static com.linkedin.metadata.dao.utils.EbeanServerUtils.*;
/**
* An Ebean implementation of {@link BaseLocalDAO}.
*/
@Slf4j
public class EbeanLocalDAO<ASPECT_UNION extends UnionTemplate, URN extends Urn>
extends BaseLocalDAO<ASPECT_UNION, URN> {
// Ebean server used by this DAO; returned by getServer().
protected final EbeanServer _server;
// Class of the entity URN, as passed to the constructors.
protected final Class<URN> _urnClass;
private final static int DEFAULT_BATCH_SIZE = 50;
// Starts at DEFAULT_BATCH_SIZE. NOTE(review): presumably the number of keys per batched query - confirm against usages.
private int _queryKeysCount = DEFAULT_BATCH_SIZE;
// Only assigned by constructors that receive a SchemaConfig other than OLD_SCHEMA_ONLY; otherwise it stays null.
private IEbeanLocalAccess<URN> _localAccess;
// Created in the constructors from the Ebean server.
private EbeanLocalRelationshipWriterDAO _localRelationshipWriterDAO;
private LocalRelationshipBuilderRegistry _localRelationshipBuilderRegistry = null;
// Defaults to the old schema unless a constructor taking a SchemaConfig overrides it.
private SchemaConfig _schemaConfig = SchemaConfig.OLD_SCHEMA_ONLY;
// Default config; its isNonDollarVirtualColumnsEnabled() value is handed to EbeanLocalAccess by constructors
// that are not given an explicit EBeanDAOConfig.
private final EBeanDAOConfig _eBeanDAOConfig = new EBeanDAOConfig();
/**
* Selects which schema table(s) the DAO reads from and writes to.
*/
public enum SchemaConfig {
OLD_SCHEMA_ONLY, // Default: read from and write to the old schema table
NEW_SCHEMA_ONLY, // Read from and write to the new schema tables
DUAL_SCHEMA // Write to both the old and new tables and perform a comparison between values when reading
}
// TODO: clean up once AIM is no longer using existing local relationships - they should make new relationship tables with the aspect column
private boolean _useAspectColumnForRelationshipRemoval = false;
// Whether noisy info logs are enabled; intended for debugging only (see setNoisyLogsEnabled).
private boolean _noisyLogsEnabled = false;
// Which approach to use for record retrieval when inserting a new record
// See GCN-38382
private FindMethodology _findMethodology = FindMethodology.UNIQUE_ID;
// true if metadata changes will be persisted into the change log table (metadata_aspect)
private boolean _changeLogEnabled = true;
// TODO: remove this logic once metadata_aspect has been completely removed from TMS
// regarding metadata_aspect table:
// false = read/bump 2nd latest version + insert latest version
// true = overwrite 2nd latest version with latest version (equivalent to keeping only version = 0 rows in metadata_aspect)
private boolean _overwriteLatestVersionEnabled = false;
/**
 * Enables or disables persisting metadata changes into the change log table (metadata_aspect).
 *
 * <p>The requested value is only honoured in {@link SchemaConfig#NEW_SCHEMA_ONLY} mode. In every other mode a
 * warning is logged and the change log is forced on, regardless of the argument.
 *
 * @param changeLogEnabled whether the change log should be enabled
 */
public void setChangeLogEnabled(boolean changeLogEnabled) {
  if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
    _changeLogEnabled = changeLogEnabled;
    return;
  }
  // For non-new schema, _changeLogEnabled will be enforced to be true.
  // The trailing space after "mode." keeps the two concatenated sentences from running together in the log.
  log.warn("You can only enable or disable the change log in new schema mode. "
      + "In old and dual schema modes, this setting is always enabled.");
  _changeLogEnabled = true;
}
/**
* Returns whether metadata changes are persisted into the change log table (metadata_aspect).
*
* @return the current value of the change log flag
*/
public boolean isChangeLogEnabled() {
return _changeLogEnabled;
}
/**
* Set a flag to indicate whether to use the aspect column for relationship removal. If set to true, only relationships from
* the same aspect class will be removed during ingestion or soft-deletion.
*
* <p>The value is stored on this DAO and also forwarded to the local relationship writer DAO.
*
* @param useAspectColumnForRelationshipRemoval whether the aspect column should be used for relationship removal
*/
public void setUseAspectColumnForRelationshipRemoval(boolean useAspectColumnForRelationshipRemoval) {
_useAspectColumnForRelationshipRemoval = useAspectColumnForRelationshipRemoval;
_localRelationshipWriterDAO.setUseAspectColumnForRelationshipRemoval(useAspectColumnForRelationshipRemoval);
}
/**
* Set a flag to indicate whether noisy info logs are enabled. Should only be used for debugging.
*
* @param noisyLogsEnabled {@code true} to enable the noisy info logs, {@code false} to disable them
*/
public void setNoisyLogsEnabled(boolean noisyLogsEnabled) {
_noisyLogsEnabled = noisyLogsEnabled;
}
/**
 * Enables or disables overwriting the latest version in the metadata_aspect table.
 *
 * <p>When the flag is {@code false}, the second-latest version is read/bumped and the latest version is inserted;
 * when {@code true}, the second-latest version is overwritten with the latest one.
 *
 * <p>The requested value is only honoured in {@link SchemaConfig#NEW_SCHEMA_ONLY} mode with the change log
 * enabled. In every other case a warning is logged and the flag is forced to {@code false}.
 *
 * @param overwriteLatestVersionEnabled whether overwriting the latest version should be enabled
 */
public void setOverwriteLatestVersionEnabled(boolean overwriteLatestVersionEnabled) {
  if (_schemaConfig != SchemaConfig.NEW_SCHEMA_ONLY) {
    // For non-new schema, _overwriteLatestVersionEnabled will be enforced to be false.
    // The trailing space after "mode." keeps the two concatenated sentences from running together in the log.
    log.warn("You can only enable or disable overwriting the latest version in new schema mode. "
        + "In old and dual schema modes, this setting is always disabled.");
    _overwriteLatestVersionEnabled = false;
    return;
  }
  if (!isChangeLogEnabled()) {
    log.warn("You can only enable or disable overwriting the latest version when the change log is enabled as well.");
    _overwriteLatestVersionEnabled = false;
    return;
  }
  _overwriteLatestVersionEnabled = overwriteLatestVersionEnabled;
}
/**
* Ebean find approach used for record retrieval when inserting a new record. See GCN-38382.
*/
public enum FindMethodology {
UNIQUE_ID, // (legacy) https://javadoc.io/static/io.ebean/ebean/11.19.2/io/ebean/EbeanServer.html#find-java.lang.Class-java.lang.Object-
DIRECT_SQL, // https://javadoc.io/static/io.ebean/ebean/11.19.2/io/ebean/EbeanServer.html#findNative-java.lang.Class-java.lang.String-
QUERY_BUILDER // https://javadoc.io/static/io.ebean/ebean/11.19.2/io/ebean/Ebean.html#find-java.lang.Class-
}
/**
* Value holder (Lombok {@code @Value}) pairing a value-type name with a value.
* NOTE(review): presumably used when building index rows or queries - usage is not visible here, confirm.
*/
@Value
static class GMAIndexPair {
public String valueType;
public Object value;
}
/**
* Constructor for EbeanLocalDAO.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates to the
* constructor that takes the server directly.
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param urnClass Class of the entity URN
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass) {
this(aspectUnionClass, producer, createServer(serverConfig), urnClass);
}
/**
* Constructor for EbeanLocalDAO.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates to the
* constructor that takes the server and tracking manager directly.
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param urnClass Class of the entity URN
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull BaseTrackingManager trackingManager) {
this(aspectUnionClass, producer, createServer(serverConfig), urnClass, trackingManager);
}
/**
* Constructor for EbeanLocalDAO with the option to use the new schema and enable dual-read.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates, passing both
* the created server and {@code serverConfig} on.
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param urnClass Class of the entity URN
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig) {
this(aspectUnionClass, producer, createServer(serverConfig), serverConfig, urnClass, schemaConfig);
}
/**
* Constructor for EbeanLocalDAO with the option to use the new schema and enable dual-read.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates, passing both
* the created server and {@code serverConfig} on.
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param urnClass Class of the entity URN
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig,
@Nonnull BaseTrackingManager trackingManager) {
this(aspectUnionClass, producer, createServer(serverConfig), serverConfig, urnClass, schemaConfig, trackingManager);
}
/**
* Constructor for EbeanLocalDAO with the option to use an alternate Ebean find methodology for record insertion.
* See GCN-38382
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates, passing both
* the created server and {@code serverConfig} on.
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param urnClass Class of the entity URN
* @param findMethodology Enum indicating which find configuration to use
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull FindMethodology findMethodology) {
this(aspectUnionClass, producer, createServer(serverConfig), serverConfig, urnClass, findMethodology);
}
/**
* Constructor for EbeanLocalDAO with the option to use an alternate Ebean find methodology for record insertion.
* See GCN-38382
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates, passing both
* the created server and {@code serverConfig} on.
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param urnClass Class of the entity URN
* @param findMethodology Enum indicating which find configuration to use
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass,
@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull Class<URN> urnClass, @Nonnull FindMethodology findMethodology,
@Nonnull BaseTrackingManager trackingManager) {
this(aspectUnionClass, producer, createServer(serverConfig), serverConfig, urnClass, findMethodology, trackingManager);
}
/**
* Constructor for EbeanLocalDAO.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates to the
* constructor that takes the server directly.
*
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param urnPathExtractor path extractor to index parts of URNs
*/
public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
@Nonnull UrnPathExtractor<URN> urnPathExtractor) {
this(producer, createServer(serverConfig), storageConfig, urnClass, urnPathExtractor);
}
/**
* Constructor for EbeanLocalDAO.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates to the
* constructor that takes the server and tracking manager directly.
*
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param urnPathExtractor path extractor to index parts of URNs
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
@Nonnull UrnPathExtractor<URN> urnPathExtractor, @Nonnull BaseTrackingManager trackingManager) {
this(producer, createServer(serverConfig), storageConfig, urnClass, urnPathExtractor, trackingManager);
}
/**
* Constructor for EbeanLocalDAO with the option to use the new schema and enable dual-read.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates, passing both
* the created server and {@code serverConfig} on.
*
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param urnPathExtractor path extractor to index parts of URNs
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
*/
public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
@Nonnull UrnPathExtractor<URN> urnPathExtractor, @Nonnull SchemaConfig schemaConfig) {
this(producer, createServer(serverConfig), serverConfig, storageConfig, urnClass, urnPathExtractor, schemaConfig);
}
/**
* Constructor for EbeanLocalDAO with the option to use the new schema and enable dual-read.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates, passing both
* the created server and {@code serverConfig} on.
*
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param urnPathExtractor path extractor to index parts of URNs
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
@Nonnull UrnPathExtractor<URN> urnPathExtractor, @Nonnull SchemaConfig schemaConfig, @Nonnull BaseTrackingManager trackingManager) {
this(producer, createServer(serverConfig), serverConfig, storageConfig, urnClass, urnPathExtractor, schemaConfig, trackingManager);
}
/**
* Constructor for EbeanLocalDAO.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates with a new
* {@link EmptyPathExtractor} as the URN path extractor.
*
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
*/
public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass) {
this(producer, createServer(serverConfig), storageConfig, urnClass, new EmptyPathExtractor<>());
}
/**
* Constructor for EbeanLocalDAO.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates with a new
* {@link EmptyPathExtractor} as the URN path extractor.
*
* @param producer {@link BaseTrackingMetadataEventProducer} for the tracking metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass, @Nonnull BaseTrackingManager trackingManager) {
this(producer, createServer(serverConfig), storageConfig, urnClass, new EmptyPathExtractor<>(), trackingManager);
}
/**
* Constructor for EbeanLocalDAO with the option to use the new schema and enable dual-read.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates with a new
* {@link EmptyPathExtractor} as the URN path extractor.
*
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
*/
public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig) {
this(producer, createServer(serverConfig), serverConfig, storageConfig, urnClass, new EmptyPathExtractor<>(), schemaConfig);
}
/**
* Constructor for EbeanLocalDAO with the option to use the new schema and enable dual-read.
*
* <p>Creates the {@link EbeanServer} from {@code serverConfig} via {@code createServer} and delegates with a new
* {@link EmptyPathExtractor} as the URN path extractor.
*
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
* @param urnClass class of the entity URN
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
* @param trackingManager {@link BaseTrackingManager} tracking manager for producing tracking requests
*/
public EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig,
@Nonnull BaseTrackingManager trackingManager) {
this(producer, createServer(serverConfig), serverConfig, storageConfig, urnClass, new EmptyPathExtractor<>(), schemaConfig, trackingManager);
}
/**
* Constructor for EbeanLocalDAO that takes an already-created {@link EbeanServer} together with a schema config.
*
* <p>Delegates with a freshly created default {@link EBeanDAOConfig}. Note that, unlike the other constructors,
* {@code schemaConfig} comes before {@code urnClass} in this parameter list.
*
* @param aspectUnionClass containing union of all supported aspects
* @param producer {@link BaseTrackingMetadataEventProducer} for the metadata event producer
* @param server {@link EbeanServer} instance to use
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param schemaConfig Enum indicating which schema(s)/table(s) to read from and write to
* @param urnClass Class of the entity URN
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
@Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull SchemaConfig schemaConfig, @Nonnull Class<URN> urnClass) {
this(aspectUnionClass, producer, server, serverConfig, urnClass, schemaConfig, new EBeanDAOConfig());
}
/**
 * Base aspect-union constructor without a tracking manager.
 *
 * <p>Invokes the parent constructor with a new {@link EmptyPathExtractor}, stores the server and URN class, and
 * creates the local relationship writer DAO on top of the same server.
 */
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
    @Nonnull EbeanServer server, @Nonnull Class<URN> urnClass) {
  super(aspectUnionClass, producer, urnClass, new EmptyPathExtractor<>());
  _urnClass = urnClass;
  _server = server;
  // Same server instance that was just stored in _server.
  _localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(server);
}
/**
 * Base aspect-union constructor with a tracking manager.
 *
 * <p>Invokes the parent constructor with the tracking manager and a new {@link EmptyPathExtractor}, stores the
 * server and URN class, and creates the local relationship writer DAO on top of the same server.
 */
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
    @Nonnull EbeanServer server, @Nonnull Class<URN> urnClass, @Nonnull BaseTrackingManager trackingManager) {
  super(aspectUnionClass, producer, trackingManager, urnClass, new EmptyPathExtractor<>());
  _urnClass = urnClass;
  _server = server;
  // Same server instance that was just stored in _server.
  _localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(server);
}
/**
 * Aspect-union constructor with an explicit schema config.
 *
 * <p>Records {@code schemaConfig} and, unless it is {@link SchemaConfig#OLD_SCHEMA_ONLY}, creates the
 * {@link EbeanLocalAccess} using the default DAO config's non-dollar virtual column setting.
 */
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
    @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig) {
  this(aspectUnionClass, producer, server, urnClass);
  _schemaConfig = schemaConfig;
  if (schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    // Old schema only: _localAccess is left unset.
    return;
  }
  _localAccess = new EbeanLocalAccess<>(server, serverConfig, urnClass, _urnPathExtractor,
      _eBeanDAOConfig.isNonDollarVirtualColumnsEnabled());
}
/**
 * Aspect-union constructor with an explicit schema config and a tracking manager.
 *
 * <p>Records {@code schemaConfig} and, unless it is {@link SchemaConfig#OLD_SCHEMA_ONLY}, creates the
 * {@link EbeanLocalAccess} using the default DAO config's non-dollar virtual column setting.
 */
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
    @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig,
    @Nonnull BaseTrackingManager trackingManager) {
  this(aspectUnionClass, producer, server, urnClass, trackingManager);
  _schemaConfig = schemaConfig;
  if (schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    // Old schema only: _localAccess is left unset.
    return;
  }
  _localAccess = new EbeanLocalAccess<>(server, serverConfig, urnClass, _urnPathExtractor,
      _eBeanDAOConfig.isNonDollarVirtualColumnsEnabled());
}
/**
 * Aspect-union constructor with an explicit schema config and an explicit {@link EBeanDAOConfig}.
 *
 * <p>Delegates through the four-argument constructor, so no tracking manager is supplied. Records
 * {@code schemaConfig} and, unless it is {@link SchemaConfig#OLD_SCHEMA_ONLY}, creates the
 * {@link EbeanLocalAccess} using the non-dollar virtual column setting of the supplied {@code ebeanDAOConfig}.
 */
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
    @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig,
    @Nonnull EBeanDAOConfig ebeanDAOConfig) {
  this(aspectUnionClass, producer, server, urnClass);
  _schemaConfig = schemaConfig;
  if (schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    // Old schema only: _localAccess is left unset.
    return;
  }
  _localAccess = new EbeanLocalAccess<>(server, serverConfig, urnClass, _urnPathExtractor,
      ebeanDAOConfig.isNonDollarVirtualColumnsEnabled());
}
/**
* Aspect-union constructor with an explicit find methodology.
*
* <p>Delegates to the base constructor and then records {@code findMethodology}. {@code serverConfig} is accepted
* but not used in this constructor body.
*/
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull FindMethodology findMethodology) {
this(aspectUnionClass, producer, server, urnClass);
_findMethodology = findMethodology;
}
/**
* Aspect-union constructor with an explicit find methodology and a tracking manager.
*
* <p>Delegates to the tracking base constructor and then records {@code findMethodology}. {@code serverConfig} is
* accepted but not used in this constructor body.
*/
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass,
@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull EbeanServer server,
@Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull FindMethodology findMethodology,
@Nonnull BaseTrackingManager trackingManager) {
this(aspectUnionClass, producer, server, urnClass, trackingManager);
_findMethodology = findMethodology;
}
// Only called in testing (test all possible combos of SchemaConfig, FindMethodology).
// The delegated constructor already builds _localAccess from the default DAO config for any non-old schema;
// it is then rebuilt here from the supplied ebeanDAOConfig.
@VisibleForTesting
EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
    @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass,
    @Nonnull SchemaConfig schemaConfig,
    @Nonnull FindMethodology findMethodology, @Nonnull EBeanDAOConfig ebeanDAOConfig) {
  this(aspectUnionClass, producer, server, serverConfig, urnClass, schemaConfig);
  _findMethodology = findMethodology;
  if (schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    // Old schema only: _localAccess is left unset.
    return;
  }
  _localAccess = new EbeanLocalAccess<>(server, serverConfig, urnClass, _urnPathExtractor,
      ebeanDAOConfig.isNonDollarVirtualColumnsEnabled());
}
/**
 * Base storage-config constructor without a tracking manager.
 *
 * <p>Invokes the parent constructor with the given path extractor, stores the server and URN class, and creates
 * the local relationship writer DAO on top of the same server.
 */
@VisibleForTesting
EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server,
    @Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
    @Nonnull UrnPathExtractor<URN> urnPathExtractor) {
  super(producer, storageConfig, urnClass, urnPathExtractor);
  _urnClass = urnClass;
  _server = server;
  // Same server instance that was just stored in _server.
  _localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(server);
}
/**
 * Base storage-config constructor with a tracking manager.
 *
 * <p>Invokes the parent constructor with the tracking manager and the given path extractor, stores the server and
 * URN class, and creates the local relationship writer DAO on top of the same server.
 */
private EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull EbeanServer server,
    @Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
    @Nonnull UrnPathExtractor<URN> urnPathExtractor, @Nonnull BaseTrackingManager trackingManager) {
  super(producer, storageConfig, trackingManager, urnClass, urnPathExtractor);
  _urnClass = urnClass;
  _server = server;
  // Same server instance that was just stored in _server.
  _localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(server);
}
/**
 * Storage-config constructor with an explicit schema config.
 *
 * <p>Records {@code schemaConfig} and, unless it is {@link SchemaConfig#OLD_SCHEMA_ONLY}, creates the
 * {@link EbeanLocalAccess} with the given path extractor and the default DAO config's non-dollar virtual column
 * setting.
 */
private EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server,
    @Nonnull ServerConfig serverConfig, @Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass,
    @Nonnull UrnPathExtractor<URN> urnPathExtractor, @Nonnull SchemaConfig schemaConfig) {
  this(producer, server, storageConfig, urnClass, urnPathExtractor);
  _schemaConfig = schemaConfig;
  if (schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    // Old schema only: _localAccess is left unset.
    return;
  }
  _localAccess = new EbeanLocalAccess<>(server, serverConfig, urnClass, urnPathExtractor,
      _eBeanDAOConfig.isNonDollarVirtualColumnsEnabled());
}
/**
 * Storage-config constructor with an explicit schema config and a tracking manager.
 *
 * <p>Records {@code schemaConfig} and, unless it is {@link SchemaConfig#OLD_SCHEMA_ONLY}, creates the
 * {@link EbeanLocalAccess} with the given path extractor and the default DAO config's non-dollar virtual column
 * setting.
 */
private EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig,
    @Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass, @Nonnull UrnPathExtractor<URN> urnPathExtractor,
    @Nonnull SchemaConfig schemaConfig, @Nonnull BaseTrackingManager trackingManager) {
  this(producer, server, storageConfig, urnClass, urnPathExtractor, trackingManager);
  _schemaConfig = schemaConfig;
  if (schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    // Old schema only: _localAccess is left unset.
    return;
  }
  _localAccess = new EbeanLocalAccess<>(server, serverConfig, urnClass, urnPathExtractor,
      _eBeanDAOConfig.isNonDollarVirtualColumnsEnabled());
}
/**
* Test-visible constructor taking an existing {@link EbeanServer}; delegates with a new {@link EmptyPathExtractor}
* as the URN path extractor.
*/
@VisibleForTesting
EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass) {
this(producer, server, storageConfig, urnClass, new EmptyPathExtractor<>());
}
/**
* Test-visible constructor taking an existing {@link EbeanServer} and a schema config; delegates with a new
* {@link EmptyPathExtractor} as the URN path extractor.
*/
@VisibleForTesting
EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig) {
this(producer, server, serverConfig, storageConfig, urnClass, new EmptyPathExtractor<>(), schemaConfig);
}
/**
* Replaces the URN path extractor used by this DAO.
*
* <p>When the schema config is not {@link SchemaConfig#OLD_SCHEMA_ONLY}, the extractor is first forwarded to the
* local access layer, and is then stored on this DAO.
*
* @param urnPathExtractor the new URN path extractor
*/
public void setUrnPathExtractor(@Nonnull UrnPathExtractor<URN> urnPathExtractor) {
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY) {
// Forwarded before the local field is updated, so a failure here leaves the DAO's own extractor unchanged.
_localAccess.setUrnPathExtractor(urnPathExtractor);
}
_urnPathExtractor = urnPathExtractor;
}
/**
 * Getter for the URN path extractor currently held by this DAO.
 *
 * @return _urnPathExtractor
 */
@Nonnull
public UrnPathExtractor<URN> getUrnPathExtractor() {
  return this._urnPathExtractor;
}
/**
 * Exposes the {@link EbeanServer} instance held by this DAO so that callers can run customized queries.
 *
 * @return _server
 */
public EbeanServer getServer() {
  return this._server;
}
/**
 * Reports which {@link SchemaConfig} this DAO is currently using.
 *
 * @return _schemaConfig
 */
public SchemaConfig getSchemaConfig() {
  return this._schemaConfig;
}
/**
 * Override the schema config, unit-test only.
 *
 * <p>Only the {@code _schemaConfig} field is replaced; nothing else (including {@code _localAccess}) is touched.
 *
 * @param schemaConfig schema config
 */
void setSchemaConfig(SchemaConfig schemaConfig) {
  this._schemaConfig = schemaConfig;
}
/**
 * Enable benchmark instrumentation of DAO operations by decorating the current {@link IEbeanLocalAccess} with an
 * {@link InstrumentedEbeanLocalAccess} that reports to the supplied metrics.
 *
 * <p>Does nothing when {@code _localAccess} is {@code null} (OLD_SCHEMA_ONLY mode). Each call wraps whatever
 * {@code _localAccess} currently refers to.
 *
 * @param metrics the benchmark metrics implementation to use
 */
public void setBenchmarkMetrics(@Nonnull BaseDaoBenchmarkMetrics metrics) {
  if (_localAccess == null) {
    return;
  }
  _localAccess = new InstrumentedEbeanLocalAccess<>(_localAccess, metrics, _urnClass);
}
/**
 * Bring the table schemas up to date with the db evolution scripts by delegating to {@code _localAccess}.
 *
 * @throws UnsupportedOperationException when the DAO is in {@code OLD_SCHEMA_ONLY} mode
 */
public void ensureSchemaUpToDate() {
  final boolean oldSchemaOnly = _schemaConfig.equals(SchemaConfig.OLD_SCHEMA_ONLY);
  if (!oldSchemaOnly) {
    _localAccess.ensureSchemaUpToDate();
    return;
  }
  throw new UnsupportedOperationException("DB evolution script is not supported in old schema mode.");
}
/**
 * Runs {@code block} inside an Ebean transaction and commits it, retrying on rollback, duplicate-key and
 * optimistic-lock failures.
 *
 * <p>The block is always attempted at least once; after a failed attempt it is retried for as long as the number
 * of failed attempts does not exceed {@code maxTransactionRetry}. Each attempt opens its own transaction, which is
 * closed when the attempt ends. Any other exception type propagates immediately without a retry.
 *
 * @param block the work to execute within the transaction
 * @param maxTransactionRetry maximum number of retries after the first attempt
 * @param <T> type of the value produced by {@code block}
 * @return whatever {@code block} returned on the attempt that committed
 * @throws RetryLimitReached when every attempt failed with one of the retryable exceptions; the last such
 *     exception is attached as the cause
 */
@Nonnull
@Override
protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTransactionRetry) {
  Exception lastFailure;
  int failedAttempts = 0;
  while (true) {
    try (Transaction tx = _server.beginTransaction()) {
      final T outcome = block.get();
      tx.commit();
      return outcome;
    } catch (RollbackException | DuplicateKeyException | OptimisticLockException retryable) {
      // Remember the failure; it becomes the cause if the retry budget runs out.
      lastFailure = retryable;
    }
    failedAttempts++;
    if (failedAttempts > maxTransactionRetry) {
      break;
    }
  }
  throw new RetryLimitReached("Failed to add after " + maxTransactionRetry + " retries", lastFailure);
}
/**
 * Looks up the optimistic lock for one aspect among the eTags carried by the ingestion parameters.
 *
 * <p>An eTag entry is considered a match when its aspect alias equals (ignoring case) the column name that
 * {@code SQLSchemaUtils.getColumnName} resolves for the urn's entity type and the aspect class. Entries for which
 * that resolution throws, that do not match, or whose eTag cannot be decrypted are skipped. The first usable
 * entry wins.
 *
 * @param ingestionParams the ingestion parameters containing the aspects and their eTags
 * @param aspectClass aspect class
 * @param urn asset urn
 * @param <ASPECT> the aspect type
 * @return an {@link AuditStamp} whose time is the decrypted eTag, or null when the parameters are null, carry no
 *     eTags, or contain no usable eTag for this aspect
 */
@Override
@Nullable
public <ASPECT extends RecordTemplate> AuditStamp extractOptimisticLockForAspectFromIngestionParamsIfPossible(
    @Nullable IngestionParams ingestionParams, @Nonnull Class<ASPECT> aspectClass, @Nonnull URN urn) {
  if (ingestionParams == null) {
    return null;
  }
  final IngestionAspectETagArray eTagEntries = ingestionParams.getIngestionETags();
  if (eTagEntries == null) {
    return null;
  }
  for (IngestionAspectETag eTagEntry : eTagEntries) {
    final String columnName;
    try {
      columnName = SQLSchemaUtils.getColumnName(urn.getEntityType(), aspectClass.getCanonicalName());
    } catch (Exception e) {
      // Column name could not be resolved for this aspect; move on to the next entry.
      continue;
    }
    if (columnName == null || !columnName.equalsIgnoreCase(eTagEntry.getAspect_alias())) {
      continue;
    }
    final Long eTagTime = getDecryptedETag(eTagEntry);
    if (eTagTime == null) {
      continue;
    }
    final AuditStamp lockStamp = new AuditStamp();
    lockStamp.setTime(eTagTime);
    return lockStamp;
  }
  return null;
}
/**
 * Decrypts the eTag of an ingestion eTag entry via {@code ETagUtils.decrypt}.
 *
 * <p>A null eTag means this is a regular ingestion request with no read-modify-write consistency guarantee.
 * Any exception raised while reading or decrypting the eTag is treated the same way and yields null.
 *
 * @param ingestionAspectETag the eTag entry to decrypt
 * @return the decrypted eTag, or null when the eTag is absent or cannot be read or decrypted
 */
@Nullable
private Long getDecryptedETag(@Nonnull IngestionAspectETag ingestionAspectETag) {
  try {
    return ingestionAspectETag.getEtag() == null ? null : ETagUtils.decrypt(ingestionAspectETag.getEtag());
  } catch (Exception e) {
    return null;
  }
}
/**
 * Persists {@code newValue} as the latest version of an aspect and then ingests its relationships.
 *
 * <p>Two paths exist:
 * <ul>
 *   <li>Versioned update: taken when a previous latest record exists (a value or a soft deletion), an optimistic
 *   lock audit stamp is supplied and the change log is enabled. The next version number is fetched; unless
 *   overwriting of the latest version is enabled, {@code oldValue} is inserted under that version; finally the
 *   latest version is updated with optimistic locking against the lock's timestamp.</li>
 *   <li>Plain insert: in every other case {@code newValue} is inserted directly as the latest version.</li>
 * </ul>
 *
 * @param urn entity urn
 * @param aspectClass aspect class
 * @param oldValue previous aspect value, if any
 * @param optimisticLockAuditStamp audit stamp whose time is used for the compare-and-set on the latest version
 * @param newValue aspect value to store as the latest version
 * @param newAuditStamp audit stamp to store with the new value
 * @param isSoftDeleted whether the previous latest record is a soft deletion
 * @param trackingContext tracking context, forwarded to the write operations
 * @param isTestMode test mode flag, forwarded to the write operations
 * @param <ASPECT> aspect type
 * @return the version number obtained from {@code getNextVersion} on the versioned-update path, otherwise 0
 */
@Override
protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
    @Nullable ASPECT oldValue, @Nullable AuditStamp optimisticLockAuditStamp, @Nullable ASPECT newValue,
    @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, @Nullable IngestionTrackingContext trackingContext,
    boolean isTestMode) {
  final boolean hasPreviousLatest = isSoftDeleted || oldValue != null;
  final boolean versionedUpdate = hasPreviousLatest && optimisticLockAuditStamp != null && _changeLogEnabled;
  long nextVersion = 0;

  if (!versionedUpdate) {
    // Fresh ingestion, or changeLog disabled: write the new value straight into the latest version.
    // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
    if (log.isDebugEnabled() && "AzkabanFlowInfo".equals(aspectClass.getSimpleName())) {
      log.debug("Insert: {} => newValue = {}", urn, newValue);
    }
    insert(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, trackingContext, isTestMode);
  } else {
    // Entity already has a latest record (possibly soft deleted) and changeLog is enabled:
    // 1. get the next version from the metadata_aspect table
    // 2. write value of latest version (version = 0) as a new version
    // 3. update the latest version (version = 0) with the new value. If the value of latest version has been
    //    changed during this process, then rollback by throwing OptimisticLockException
    nextVersion = getNextVersion(urn, aspectClass);
    // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
    if (log.isDebugEnabled() && "AzkabanFlowInfo".equals(aspectClass.getSimpleName())) {
      log.debug("Insert: {} => oldValue = {}, latest version = {}", urn, oldValue, nextVersion);
    }
    // The old latest value is archived as a historical version only when the latest version is not overwritten.
    if (!_overwriteLatestVersionEnabled) {
      insert(urn, oldValue, aspectClass, optimisticLockAuditStamp, nextVersion, trackingContext, isTestMode);
    }
    final Timestamp expectedCreatedOn = new Timestamp(optimisticLockAuditStamp.getTime());
    updateWithOptimisticLocking(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, expectedCreatedOn,
        trackingContext, isTestMode);
  }

  // Relationship ingestions and soft-deletions are handled here for both paths.
  handleRelationshipIngestion(urn, newValue, oldValue, aspectClass, isTestMode);
  return nextVersion;
}
/**
 * Creates a new asset with the given aspects through {@code _localAccess.create} and then ingests the
 * relationships of every aspect, all within one transaction (retried at most once).
 *
 * <p>{@code aspectCreateLambdas} and {@code aspectValues} are read by the same index: the aspect class for
 * {@code aspectValues.get(i)} is taken from {@code aspectCreateLambdas.get(i)}.
 *
 * @param urn entity urn
 * @param aspectCreateLambdas aspect create lambdas
 * @param aspectValues aspect values
 * @param newAuditStamp audit stamp
 * @param trackingContext tracking context
 * @param isTestMode test mode
 * @param <ASPECT_UNION> aspect union type
 * @return the row count returned by {@code _localAccess.create}
 */
@Override
protected <ASPECT_UNION extends RecordTemplate> int createNewAssetWithAspects(@Nonnull URN urn,
    @Nonnull List<AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas,
    @Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp newAuditStamp,
    @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
  return runInTransactionWithRetry(() -> {
    // Original author's note on the expected behavior of create:
    // do a get to ensure the urn does not already exist
    // if exists and deletedTs is null, then throw an exception
    // if exists and deletedTs is not null, then update the deletedTs to null and create records
    final int createdRows =
        _localAccess.create(urn, aspectValues, aspectCreateLambdas, newAuditStamp, trackingContext, isTestMode);

    // Relationships associated with these aspects are inserted as well; there is no previous value to diff against.
    for (int index = 0; index < aspectValues.size(); index++) {
      Class<RecordTemplate> lambdaAspectClass = (Class<RecordTemplate>) aspectCreateLambdas.get(index).getAspectClass();
      RecordTemplate aspectValue = aspectValues.get(index);
      handleRelationshipIngestion(urn, aspectValue, null, lambdaAspectClass, isTestMode);
    }
    return createdRows;
  }, 1);
}
/**
 * Deletes the asset identified by {@code urn} by delegating to {@code _localAccess.softDeleteAsset}.
 *
 * @param urn entity urn
 * @param isTestMode test mode flag, forwarded to {@code _localAccess}
 * @return 0 when {@code exists(urn)} is false, otherwise the value returned by {@code softDeleteAsset}
 */
@Override
protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) {
  // If the URN does not exist there is nothing to delete, so report zero.
  return exists(urn) ? _localAccess.softDeleteAsset(urn, isTestMode) : 0;
}
/**
 * Reads deletion info for a batch of urns. Pure delegation to {@link IEbeanLocalAccess#readDeletionInfoBatch}.
 *
 * @param urns urns to look up
 * @param isTestMode test mode flag, forwarded unchanged
 * @return the map returned by {@code _localAccess}
 */
public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
  final Map<URN, EntityDeletionInfo> deletionInfoByUrn = _localAccess.readDeletionInfoBatch(urns, isTestMode);
  return deletionInfoByUrn;
}
/**
 * Soft-deletes a batch of assets. Pure delegation to {@link IEbeanLocalAccess#batchSoftDeleteAssets}.
 *
 * @param urns urns of the assets to soft-delete
 * @param cutoffTimestamp cutoff timestamp, forwarded unchanged
 * @param isTestMode test mode flag, forwarded unchanged
 * @return the value returned by {@code _localAccess}
 */
public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
  final int deletedCount = _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode);
  return deletedCount;
}
/**
 * Backfills the entity table for one aspect of one urn from the latest record in the aspect table.
 *
 * <p>Inside a transaction (retried at most once) the latest aspect row is read with a row lock. When a row is
 * found, its value is written through {@code _localAccess.add} and its relationships are ingested. When no row is
 * found nothing is written.
 *
 * @param urn entity urn
 * @param aspectClass aspect class
 * @param <ASPECT> aspect type
 * @throws UnsupportedOperationException when the DAO is in {@code OLD_SCHEMA_ONLY} mode
 */
@Override
public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
  if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
    throw new UnsupportedOperationException("Entity tables cannot be used in OLD_SCHEMA_ONLY mode, so they cannot be backfilled.");
  }
  final PrimaryKey latestKey = new PrimaryKey(urn.toString(), aspectClass.getCanonicalName(), LATEST_VERSION);
  runInTransactionWithRetry(() -> {
    // forUpdate() locks the row for the duration of this transaction so that the update is consistent.
    // Ordering by createdon desc picks the latest value should there be multiple results.
    final EbeanMetadataAspect latestRow =
        _server.createQuery(EbeanMetadataAspect.class).setId(latestKey).orderBy().desc("createdon").forUpdate().findOne();
    if (latestRow != null) {
      final AuditStamp rowAuditStamp = makeAuditStamp(latestRow);
      final ASPECT aspectValue = toRecordTemplate(aspectClass, latestRow).orElse(null);
      _localAccess.add(urn, aspectValue, aspectClass, rowAuditStamp, null, false);
      // Relationships associated with this aspect are inserted as well.
      handleRelationshipIngestion(urn, aspectValue, null, aspectClass, false);
    }
    return null; // unused
  }, 1);
}
/**
 * Backfills local relationships for the latest version of one aspect of one urn.
 *
 * <p>Inside a transaction (retried at most once) the latest aspect record is fetched via {@code batchGet} and
 * converted to a RecordTemplate; on success its relationships are ingested. Progress is logged at info level
 * when {@code _noisyLogsEnabled} is set.
 *
 * @param urn entity urn
 * @param aspectClass aspect class
 * @param <ASPECT> aspect type
 * @return the result of {@code handleRelationshipIngestion}, or an empty list when no aspect data was found or
 *     the data could not be converted to a RecordTemplate
 */
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationships(
    @Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
  final AspectKey<URN, ASPECT> latestKey = new AspectKey<>(aspectClass, urn, LATEST_VERSION);
  return runInTransactionWithRetry(() -> {
    if (_noisyLogsEnabled) {
      log.info("Backfilling local relationships for urn: {}, aspectClass: {}", urn, aspectClass);
    }

    final List<EbeanMetadataAspect> rows = batchGet(Collections.singleton(latestKey), 1);
    if (rows.isEmpty()) {
      if (_noisyLogsEnabled) {
        log.info("Not backfilling any relationships because no aspect data was found for urn: {}, aspectClass: {}", urn, aspectClass);
      }
      return new ArrayList<>();
    }

    if (_noisyLogsEnabled) {
      log.info("Trying to convert aspect data from the entity table to a RecordTemplate for urn: {}, aspectClass: {}", urn, aspectClass);
    }
    final Optional<ASPECT> converted = toRecordTemplate(aspectClass, rows.get(0));
    if (!converted.isPresent()) {
      if (_noisyLogsEnabled) {
        log.info("Not backfilling any relationships because aspect data was unable to be converted to a RecordTemplate "
            + "for urn: {}, aspectClass: {}", urn, aspectClass);
      }
      return Collections.emptyList();
    }

    if (_noisyLogsEnabled) {
      log.info("Successfully converted aspect data to a RecordTemplate for urn: {}, aspectClass: {}", urn, aspectClass);
    }
    return handleRelationshipIngestion(urn, converted.get(), null, aspectClass, false);
  }, 1);
}
/**
 * Fetches the latest metadata aspect record for a urn and aspect.
 *
 * <p>In {@code OLD_SCHEMA_ONLY} and {@code DUAL_SCHEMA} modes the record is looked up by primary key at
 * {@code LATEST_VERSION}; with the {@code DIRECT_SQL} find methodology, {@code findLatestMetadataAspect} is tried
 * first and a primary-key find is used as a single retry when it returns nothing. In every other mode the record
 * is read through {@code _localAccess.batchGetUnion}.
 *
 * @param urn entity urn
 * @param aspectClass aspect class
 * @param isTestMode test mode flag, forwarded to {@code _localAccess}
 * @param <ASPECT> aspect type
 * @return metadata aspect ebean model {@link EbeanMetadataAspect}, or null when none was found
 */
private @Nullable <ASPECT extends RecordTemplate> EbeanMetadataAspect queryLatest(@Nonnull URN urn,
    @Nonnull Class<ASPECT> aspectClass, boolean isTestMode) {
  final boolean readFromAspectTable =
      _schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA;

  if (!readFromAspectTable) {
    // for new schema, get latest data from the new schema entity table. (Resolving the read de-coupling issue)
    final List<EbeanMetadataAspect> rows =
        _localAccess.batchGetUnion(Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0,
            true, isTestMode);
    return rows.isEmpty() ? null : rows.get(0);
  }

  final String aspectName = ModelUtils.getAspectName(aspectClass);
  final PrimaryKey latestKey = new PrimaryKey(urn.toString(), aspectName, LATEST_VERSION);
  if (_findMethodology != FindMethodology.DIRECT_SQL) {
    return _server.find(EbeanMetadataAspect.class, latestKey);
  }

  EbeanMetadataAspect found = findLatestMetadataAspect(_server, urn, aspectClass);
  if (found == null) {
    // Attempt 1: retry
    found = _server.find(EbeanMetadataAspect.class, latestKey);
    if (log.isDebugEnabled()) {
      log.debug("Attempt 1: Retried on {}, {}", urn, found);
    }
  }
  return found;
}
/**
 * Returns the latest entry of an aspect for a urn, as found by {@code queryLatest}.
 *
 * @param urn entity urn
 * @param aspectClass aspect class
 * @param isTestMode test mode flag, forwarded to {@code queryLatest}
 * @param <ASPECT> aspect type
 * @return an entry with null value and null extra info when no record exists; an entry with null value, the
 *     record's extra info and the soft-deleted flag set when the record is a soft deletion; otherwise an entry
 *     holding the deserialized aspect and its extra info
 */
@Override
@Nonnull
protected <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(@Nonnull URN urn,
    @Nonnull Class<ASPECT> aspectClass, boolean isTestMode) {
  final EbeanMetadataAspect latestRow = queryLatest(urn, aspectClass, isTestMode);
  if (latestRow == null) {
    return new AspectEntry<>(null, null);
  }

  final ExtraInfo extraInfo = toExtraInfo(latestRow);
  if (!isSoftDeletedAspect(latestRow, aspectClass)) {
    return new AspectEntry<>(RecordUtils.toRecordTemplate(aspectClass, latestRow.getMetadata()), extraInfo);
  }
  return new AspectEntry<>(null, extraInfo, true);
}
/**
 * Builds an {@link EbeanMetadataAspect} bean for the given urn, aspect and version.
 *
 * <p>The metadata column holds the JSON form of {@code value}, or {@code DELETED_VALUE} when {@code value} is
 * null. Creation time and actor come from {@code auditStamp}; the "created for" field is set only when the audit
 * stamp carries an impersonator.
 *
 * @param urn entity urn
 * @param value aspect value, or null
 * @param aspectClass aspect class, used to derive the aspect name of the key
 * @param auditStamp audit stamp supplying time, actor and optional impersonator
 * @param version version stored in the key
 * @param <ASPECT> aspect type
 * @return the populated bean
 */
@Nonnull
private <ASPECT extends RecordTemplate> EbeanMetadataAspect buildMetadataAspectBean(@Nonnull URN urn,
    @Nullable RecordTemplate value, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version) {
  final EbeanMetadataAspect bean = new EbeanMetadataAspect();
  bean.setKey(new PrimaryKey(urn.toString(), ModelUtils.getAspectName(aspectClass), version));
  bean.setMetadata(value == null ? DELETED_VALUE : RecordUtils.toJsonString(value));
  bean.setCreatedOn(new Timestamp(auditStamp.getTime()));
  bean.setCreatedBy(auditStamp.getActor().toString());

  final Urn impersonator = auditStamp.getImpersonator();
  if (impersonator != null) {
    bean.setCreatedFor(impersonator.toString());
  }
  return bean;
}
// Hand-written SQL UPDATE that implements optimistic locking on the createdOn column: the row is only updated
// when its current createdOn still equals :oldTimestamp.
// Ebean supports optimistic locking natively through @version, see https://ebean.io/docs/mapping/jpa/version
// However, the @version annotation cannot be used here for two reasons:
// 1. It would prevent guarding the optimistic locking feature behind a flag.
// 2. With the @version annotation, Ebean overrides every update to that column,
// disregarding any user change.
// Ideally a separate column dedicated to optimistic locking would be used, but that would require a change to the
// metadata_aspect schema; that route is not taken here in order to keep this change backward compatible.
private static final String OPTIMISTIC_LOCKING_UPDATE_SQL = "UPDATE metadata_aspect "
+ "SET urn = :urn, aspect = :aspect, version = :version, metadata = :metadata, createdOn = :createdOn, createdBy = :createdBy "
+ "WHERE urn = :urn and aspect = :aspect and version = :version and createdOn = :oldTimestamp";
/**
 * Assemble the SQL UPDATE statement for the old schema from {@code OPTIMISTIC_LOCKING_UPDATE_SQL}.
 *
 * @param aspect {@link EbeanMetadataAspect} supplying the key (urn, aspect, version), metadata, createdOn and
 *     createdBy parameters
 * @param oldTimestamp old timestamp. The generated SQL uses optimistic locking and does a compare-and-set
 *     against oldTimestamp during the update.
 * @return {@link SqlUpdate} for SQL update execution
 */
private SqlUpdate assembleOldSchemaSqlUpdate(@Nonnull EbeanMetadataAspect aspect, @Nonnull Timestamp oldTimestamp) {
  final PrimaryKey key = aspect.getKey();
  final SqlUpdate update = _server.createSqlUpdate(OPTIMISTIC_LOCKING_UPDATE_SQL);
  // Key columns, used both in the SET and in the WHERE clause.
  update.setParameter("urn", key.getUrn());
  update.setParameter("aspect", key.getAspect());
  update.setParameter("version", key.getVersion());
  // New column values.
  update.setParameter("metadata", aspect.getMetadata());
  update.setParameter("createdOn", aspect.getCreatedOn());
  update.setParameter("createdBy", aspect.getCreatedBy());
  // Compare-and-set guard.
  update.setParameter("oldTimestamp", oldTimestamp);
  return update;
}
/**
 * Updates an aspect record using optimistic locking and throws when the update did not affect exactly one row.
 *
 * <p>The behavior depends on the schema config:
 * <ul>
 *   <li>{@code NEW_SCHEMA_ONLY}: the current aspect-table row is looked up first and its createdOn is used as the
 *   compare-and-set value for the aspect-table update. Inside one transaction the aspect-table update is executed
 *   and then the entity table is updated via {@code _localAccess.addWithOptimisticLocking} with
 *   {@code oldTimestamp}; the row count checked afterwards is the one returned by the entity-table update.</li>
 *   <li>Otherwise: the aspect-table update uses {@code oldTimestamp} as the compare-and-set value. In
 *   {@code DUAL_SCHEMA} mode the entity table is written first in the same transaction, with a null timestamp;
 *   the row count checked afterwards is the one returned by the aspect-table update.</li>
 * </ul>
 *
 * @param urn entity urn
 * @param value new aspect value, or null
 * @param aspectClass aspect class
 * @param newAuditStamp audit stamp stored with the new value
 * @param version version of the record to update
 * @param oldTimestamp timestamp the existing record is expected to carry
 * @param trackingContext tracking context, forwarded to {@code _localAccess}
 * @param isTestMode test mode flag, forwarded to {@code _localAccess}
 * @param <ASPECT> aspect type
 * @throws UnsupportedOperationException when the change log is disabled
 * @throws IllegalStateException in {@code NEW_SCHEMA_ONLY} mode when no aspect-table row is found
 * @throws OptimisticLockException when the number of updated rows is not exactly one
 */
@VisibleForTesting
@Override
protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonnull URN urn,
@Nullable RecordTemplate value, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp newAuditStamp,
long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext,
boolean isTestMode) {
// The bean is built before the changeLog check because the error message below includes it.
final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, newAuditStamp, version);
if (!_changeLogEnabled) {
throw new UnsupportedOperationException(
String.format("updateWithOptimisticLocking should not be called when changeLog is disabled: %s", aspect));
}
int numOfUpdatedRows;
// ensure atomicity by running old schema update + new schema update in a transaction
final SqlUpdate oldSchemaSqlUpdate;
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
// In NEW_SCHEMA, the entity table is the SOT and getLatest (oldTimestamp) reads from the entity
// table. Therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking).
// We will also apply an optimistic locking update over (urn, aspect, version) primary key combination to avoid duplicate
// key exceptions when the primary key includes createdon.
EbeanMetadataAspect result = findLatestMetadataAspect(_server, urn, aspectClass);
if (result == null) {
throw new IllegalStateException("No entry from aspect table found even though one was expected. Urn: " + urn + ", Aspect class:" + aspectClass);
}
// The aspect-table update is conditioned on the createdOn of the row that was just read, not on oldTimestamp.
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, result.getCreatedOn());
numOfUpdatedRows = runInTransactionWithRetry(() -> {
// DUAL WRITE: 1) update aspect table, 2) update entity table.
// Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called.
_server.execute(oldSchemaSqlUpdate);
return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp,
trackingContext, isTestMode, true);
}, 1);
} else {
// In OLD_SCHEMA and DUAL_SCHEMA mode, the aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table.
// Therefore, we will apply compare-and-set with oldTimestamp on aspect table (assembleOldSchemaSqlUpdate)
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, oldTimestamp);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
// Additionally, in DUAL_SCHEMA mode: apply a regular update (no optimistic locking) to the entity table
if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
_localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, null,
trackingContext, isTestMode, false);
}
return _server.execute(oldSchemaSqlUpdate);
}, 1);
}
// If there is no single updated row, emit OptimisticLockException
if (numOfUpdatedRows != 1) {
throw new OptimisticLockException(
String.format("%s rows updated during update on update: %s.", numOfUpdatedRows, aspect));
}
}
@Override
protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullable RecordTemplate value,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version,
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version);
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
// the metadata entity tables shouldn't been updated.
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, trackingContext, isTestMode);
}
// DO append change log table (metadata_aspect) if:
// 1. explicitly enabled
// AND
// 2. if NOT in test mode
// -> which is ALWAYS a dual-write operation (meaning this insertion will already happen in the "other" write)