1
1
from django .conf import settings
2
2
from django .db import connection , models
3
- from django .db .models import OuterRef , Subquery
3
+ from django .db .models import OuterRef , QuerySet , Subquery
4
4
from django .utils import timezone
5
5
6
6
from simple_history .utils import (
9
9
)
10
10
11
11
12
class HistoricalQuerySet(QuerySet):
    """
    Enables additional functionality when working with historical records.

    A queryset of historical records can optionally be switched into
    "instance mode" (see `as_instances`), in which evaluated results are
    converted from historical records to the objects exposed by each record's
    `instance` attribute.

    For additional history on this topic, see:
        - https://github.com/jazzband/django-simple-history/pull/229
        - https://github.com/jazzband/django-simple-history/issues/354
        - https://github.com/jazzband/django-simple-history/issues/397
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # When True, `_fetch_all` replaces historical records in the result
        # cache with their `.instance` counterparts.
        self._as_instances = False
        # Attribute name of the primary key of `self.model.instance_type`;
        # used to group historical records and to translate `pk` filters.
        self._pk_attr = self.model.instance_type._meta.pk.attname

    def as_instances(self):
        """
        Return a queryset that generates instances instead of historical records.
        Queries against the resulting queryset will translate `pk` into the
        primary key field of the original type.

        Records with `history_type == "-"` are excluded the first time the
        queryset is switched into instance mode.

        Returns a queryset.
        """
        if not self._as_instances:
            result = self.exclude(history_type="-")
            result._as_instances = True
        else:
            # Already in instance mode: hand back a plain copy.
            result = self._clone()
        return result

    def filter(self, *args, **kwargs):
        """
        If a `pk` filter arrives and the queryset is returning instances
        then the caller actually wants to filter based on the original
        type's primary key, and not the history_id (historical record's
        primary key); this happens frequently with DRF.
        """
        if self._as_instances and "pk" in kwargs:
            kwargs[self._pk_attr] = kwargs.pop("pk")
        return super().filter(*args, **kwargs)

    def latest_of_each(self):
        """
        Ensures results in the queryset are the latest historical record for each
        primary key. Deletions are not removed.

        Returns a queryset.
        """
        # If using MySQL, need to get a list of IDs in memory and then use them for the
        # second query.
        # Does mean two loops through the DB to get the full set, but still a speed
        # improvement.
        backend = connection.vendor
        if backend == "mysql":
            history_ids = {}
            # Read plain (original pk, history pk) tuples rather than model
            # objects: tuples are never converted by `_instanceize`, so the
            # second element is always the historical record's own primary
            # key, even when `as_instances()` was applied before this call.
            pk_pairs = self.order_by("-history_date", "-pk").values_list(
                self._pk_attr, "pk"
            )
            for original_pk, history_pk in pk_pairs:
                # Rows arrive newest first, so the first row seen for each
                # original pk is the latest one.
                history_ids.setdefault(original_pk, history_pk)
            latest_historics = self.filter(history_id__in=list(history_ids.values()))
        elif backend == "postgresql":
            latest_pk_attr_historic_ids = (
                self.order_by(self._pk_attr, "-history_date", "-pk")
                .distinct(self._pk_attr)
                .values_list("pk", flat=True)
            )
            latest_historics = self.filter(history_id__in=latest_pk_attr_historic_ids)
        else:
            # Correlated subquery: for each outer row, the newest historical
            # record sharing the same original primary key.
            latest_pk_attr_historic_ids = (
                self.filter(**{self._pk_attr: OuterRef(self._pk_attr)})
                .order_by("-history_date", "-pk")
                .values("pk")[:1]
            )
            latest_historics = self.filter(
                history_id__in=Subquery(latest_pk_attr_historic_ids)
            )
        return latest_historics

    def _clone(self):
        """Copy the queryset, carrying over the instance-mode state."""
        c = super()._clone()
        c._as_instances = self._as_instances
        c._pk_attr = self._pk_attr
        return c

    def _fetch_all(self):
        """Populate the result cache, then convert it if in instance mode."""
        super()._fetch_all()
        self._instanceize()

    def _instanceize(self):
        """
        Convert the result cache to instances if possible and it has not already been
        done. If a query extracts `.values(...)` then the result cache will not contain
        historical objects to be converted.
        """
        if (
            self._result_cache
            and self._as_instances
            and isinstance(self._result_cache[0], self.model)
        ):
            self._result_cache = [item.instance for item in self._result_cache]
111
+
112
+
12
113
class HistoryDescriptor:
    """Descriptor that builds a history manager on every attribute access."""

    def __init__(self, model):
        # Stored as-is and handed to each manager created in `__get__`.
        self.model = model

    def __get__(self, instance, owner):
        """
        Return a new manager for `self.model`, bound to `instance`.

        `instance` is passed through unchanged, so class-level access
        (where it is `None`) yields a manager without a bound instance.
        The manager class is derived from `HistoryManager` with the
        methods of `HistoricalQuerySet` attached via `from_queryset`.
        """
        manager_class = HistoryManager.from_queryset(HistoricalQuerySet)
        return manager_class(self.model, instance)
20
119
21
120
22
121
class HistoryManager (models .Manager ):
@@ -66,16 +165,41 @@ def most_recent(self):
66
165
return self .instance .__class__ (** values )
67
166
68
167
def as_of (self , date ):
69
- """Get a snapshot as of a specific date.
168
+ """
169
+ Get a snapshot as of a specific date.
170
+
171
+ When this is used on an instance, it will return the instance based
172
+ on the specific date. If the instance did not exist yet, or had been
173
deleted, then a DoesNotExist error is raised.
174
+
175
+ When this is used on a model's history manager, the resulting queryset
176
+ will locate the most recent historical record before the specified date
177
+ for each primary key, generating instances. If the most recent historical
178
+ record is a deletion, that instance is dropped from the result.
179
+
180
+ A common usage pattern for querying is to accept an optional time
181
+ point `date` and then use:
182
+
183
+ `qs = <Model>.history.as_of(date) if date else <Model>.objects`
70
184
71
- Returns an instance, or an iterable of the instances, of the
72
- original model with all the attributes set according to what
73
- was present on the object on the date provided.
185
+ after which point one can add filters, values - anything a normal
186
+ queryset would support.
187
+
188
+ To retrieve historical records, query the model's history directly;
189
+ for example:
190
+ `qs = <Model>.history.filter(history_date__lte=date, pk=...)`
191
+
192
+ To retrieve the most recent historical record, including deletions,
193
+ you could then use:
194
+ `qs = qs.latest_of_each()`
74
195
"""
75
- if not self .instance :
76
- return self ._as_of_set (date )
77
196
queryset = self .get_queryset ().filter (history_date__lte = date )
197
+ if not self .instance :
198
+ queryset = queryset .latest_of_each ().as_instances ()
199
+ return queryset
200
+
78
201
try :
202
+ # historical records are sorted in reverse chronological order
79
203
history_obj = queryset [0 ]
80
204
except IndexError :
81
205
raise self .instance .DoesNotExist (
@@ -87,43 +211,6 @@ def as_of(self, date):
87
211
)
88
212
return history_obj .instance
89
213
90
- def _as_of_set (self , date ):
91
- model = type (self .model ().instance ) # a bit of a hack to get the model
92
- pk_attr = model ._meta .pk .name
93
- queryset = self .get_queryset ().filter (history_date__lte = date )
94
- # If using MySQL, need to get a list of IDs in memory and then use them for the
95
- # second query.
96
- # Does mean two loops through the DB to get the full set, but still a speed
97
- # improvement.
98
- backend = connection .vendor
99
- if backend == "mysql" :
100
- history_ids = {}
101
- for item in queryset .order_by ("-history_date" , "-pk" ):
102
- if getattr (item , pk_attr ) not in history_ids :
103
- history_ids [getattr (item , pk_attr )] = item .pk
104
- latest_historics = queryset .filter (history_id__in = history_ids .values ())
105
- elif backend == "postgresql" :
106
- latest_pk_attr_historic_ids = (
107
- queryset .order_by (pk_attr , "-history_date" , "-pk" )
108
- .distinct (pk_attr )
109
- .values_list ("pk" , flat = True )
110
- )
111
- latest_historics = queryset .filter (
112
- history_id__in = latest_pk_attr_historic_ids
113
- )
114
- else :
115
- latest_pk_attr_historic_ids = (
116
- queryset .filter (** {pk_attr : OuterRef (pk_attr )})
117
- .order_by ("-history_date" , "-pk" )
118
- .values ("pk" )[:1 ]
119
- )
120
- latest_historics = queryset .filter (
121
- history_id__in = Subquery (latest_pk_attr_historic_ids )
122
- )
123
- adjusted = latest_historics .exclude (history_type = "-" ).order_by (pk_attr )
124
- for historic_item in adjusted :
125
- yield historic_item .instance
126
-
127
214
def bulk_history_create (
128
215
self ,
129
216
objs ,
0 commit comments