|
1 | 1 | from contextlib import contextmanager
|
2 |
| -from datetime import datetime |
| 2 | +from datetime import datetime, timedelta |
3 | 3 |
|
4 | 4 | from django.core import management
|
5 | 5 | from django.test import TestCase
|
6 | 6 | from six.moves import cStringIO as StringIO
|
7 | 7 |
|
8 | 8 | from simple_history import models as sh_models
|
9 |
| -from simple_history.management.commands import populate_history |
10 |
| -from ..models import Book, Poll, PollWithExcludeFields, Restaurant |
| 9 | +from simple_history.management.commands import populate_history, clean_duplicate_history |
| 10 | +from ..models import Book, Poll, PollWithExcludeFields, Restaurant, Place |
11 | 11 |
|
12 | 12 |
|
13 | 13 | @contextmanager
|
@@ -118,7 +118,7 @@ def test_existing_objects(self):
|
118 | 118 |
|
119 | 119 | def test_no_historical(self):
|
120 | 120 | out = StringIO()
|
121 |
| - with replace_registry(): |
| 121 | + with replace_registry({"test_place": Place}): |
122 | 122 | management.call_command(self.command_name, auto=True, stdout=out)
|
123 | 123 | self.assertIn(populate_history.Command.NO_REGISTERED_MODELS, out.getvalue())
|
124 | 124 |
|
@@ -180,3 +180,178 @@ def test_excluded_fields(self):
|
180 | 180 | )
|
181 | 181 | initial_history_record = PollWithExcludeFields.history.all()[0]
|
182 | 182 | self.assertEqual(initial_history_record.question, poll.question)
|
| 183 | + |
| 184 | + |
| 185 | +class TestCleanDuplicateHistory(TestCase): |
| 186 | + command_name = "clean_duplicate_history" |
| 187 | + command_error = (management.CommandError, SystemExit) |
| 188 | + |
| 189 | + def test_no_args(self): |
| 190 | + out = StringIO() |
| 191 | + management.call_command(self.command_name, stdout=out, stderr=StringIO()) |
| 192 | + self.assertIn(clean_duplicate_history.Command.COMMAND_HINT, out.getvalue()) |
| 193 | + |
| 194 | + def test_bad_args(self): |
| 195 | + test_data = ( |
| 196 | + (clean_duplicate_history.Command.MODEL_NOT_HISTORICAL, ("tests.place",)), |
| 197 | + (clean_duplicate_history.Command.MODEL_NOT_FOUND, ("invalid.model",)), |
| 198 | + (clean_duplicate_history.Command.MODEL_NOT_FOUND, ("bad_key",)), |
| 199 | + ) |
| 200 | + for msg, args in test_data: |
| 201 | + out = StringIO() |
| 202 | + self.assertRaises( |
| 203 | + self.command_error, |
| 204 | + management.call_command, |
| 205 | + self.command_name, |
| 206 | + *args, |
| 207 | + stdout=StringIO(), |
| 208 | + stderr=out |
| 209 | + ) |
| 210 | + self.assertIn(msg, out.getvalue()) |
| 211 | + |
| 212 | + def test_no_historical(self): |
| 213 | + out = StringIO() |
| 214 | + with replace_registry({"test_place": Place}): |
| 215 | + management.call_command(self.command_name, auto=True, stdout=out) |
| 216 | + self.assertIn( |
| 217 | + clean_duplicate_history.Command.NO_REGISTERED_MODELS, out.getvalue() |
| 218 | + ) |
| 219 | + |
| 220 | + def test_auto_dry_run(self): |
| 221 | + p = Poll.objects.create( |
| 222 | + question="Will this be deleted?", pub_date=datetime.now() |
| 223 | + ) |
| 224 | + p.save() |
| 225 | + |
| 226 | + # not related to dry_run test, just for increasing coverage :) |
| 227 | + # create instance with single-entry history older than "minutes" |
| 228 | + # so it is skipped |
| 229 | + p = Poll.objects.create( |
| 230 | + question="Will this be deleted?", pub_date=datetime.now() |
| 231 | + ) |
| 232 | + h = p.history.first() |
| 233 | + h.history_date -= timedelta(hours=1) |
| 234 | + h.save() |
| 235 | + |
| 236 | + self.assertEqual(Poll.history.all().count(), 3) |
| 237 | + out = StringIO() |
| 238 | + management.call_command( |
| 239 | + self.command_name, |
| 240 | + auto=True, |
| 241 | + minutes=50, |
| 242 | + dry=True, |
| 243 | + stdout=out, |
| 244 | + stderr=StringIO(), |
| 245 | + ) |
| 246 | + self.assertEqual( |
| 247 | + out.getvalue(), |
| 248 | + "Removed 1 historical records for " |
| 249 | + "<class 'simple_history.tests.models.Poll'>\n", |
| 250 | + ) |
| 251 | + self.assertEqual(Poll.history.all().count(), 3) |
| 252 | + |
| 253 | + def test_auto_cleanup(self): |
| 254 | + p = Poll.objects.create( |
| 255 | + question="Will this be deleted?", pub_date=datetime.now() |
| 256 | + ) |
| 257 | + self.assertEqual(Poll.history.all().count(), 1) |
| 258 | + p.save() |
| 259 | + self.assertEqual(Poll.history.all().count(), 2) |
| 260 | + p.question = "Maybe this one won't...?" |
| 261 | + p.save() |
| 262 | + self.assertEqual(Poll.history.all().count(), 3) |
| 263 | + out = StringIO() |
| 264 | + management.call_command( |
| 265 | + self.command_name, auto=True, stdout=out, stderr=StringIO() |
| 266 | + ) |
| 267 | + self.assertEqual( |
| 268 | + out.getvalue(), |
| 269 | + "Removed 1 historical records for " |
| 270 | + "<class 'simple_history.tests.models.Poll'>\n", |
| 271 | + ) |
| 272 | + self.assertEqual(Poll.history.all().count(), 2) |
| 273 | + |
| 274 | + def test_auto_cleanup_verbose(self): |
| 275 | + p = Poll.objects.create( |
| 276 | + question="Will this be deleted?", pub_date=datetime.now() |
| 277 | + ) |
| 278 | + self.assertEqual(Poll.history.all().count(), 1) |
| 279 | + p.save() |
| 280 | + p.question = "Maybe this one won't...?" |
| 281 | + p.save() |
| 282 | + self.assertEqual(Poll.history.all().count(), 3) |
| 283 | + out = StringIO() |
| 284 | + management.call_command( |
| 285 | + self.command_name, |
| 286 | + "tests.poll", |
| 287 | + auto=True, |
| 288 | + verbosity=2, |
| 289 | + stdout=out, |
| 290 | + stderr=StringIO(), |
| 291 | + ) |
| 292 | + self.assertEqual( |
| 293 | + out.getvalue(), |
| 294 | + "<class 'simple_history.tests.models.Poll'> has 3 historical entries\n" |
| 295 | + "Removed 1 historical records for " |
| 296 | + "<class 'simple_history.tests.models.Poll'>\n", |
| 297 | + ) |
| 298 | + self.assertEqual(Poll.history.all().count(), 2) |
| 299 | + |
| 300 | + def test_auto_cleanup_dated(self): |
| 301 | + the_time_is_now = datetime.now() |
| 302 | + p = Poll.objects.create( |
| 303 | + question="Will this be deleted?", pub_date=the_time_is_now |
| 304 | + ) |
| 305 | + self.assertEqual(Poll.history.all().count(), 1) |
| 306 | + p.save() |
| 307 | + p.save() |
| 308 | + self.assertEqual(Poll.history.all().count(), 3) |
| 309 | + p.question = "Or this one...?" |
| 310 | + p.save() |
| 311 | + p.save() |
| 312 | + self.assertEqual(Poll.history.all().count(), 5) |
| 313 | + |
| 314 | + for h in Poll.history.all()[2:]: |
| 315 | + h.history_date -= timedelta(hours=1) |
| 316 | + h.save() |
| 317 | + |
| 318 | + management.call_command( |
| 319 | + self.command_name, |
| 320 | + auto=True, |
| 321 | + minutes=50, |
| 322 | + stdout=StringIO(), |
| 323 | + stderr=StringIO(), |
| 324 | + ) |
| 325 | + self.assertEqual(Poll.history.all().count(), 4) |
| 326 | + |
| 327 | + def test_auto_cleanup_dated_extra_one(self): |
| 328 | + the_time_is_now = datetime.now() |
| 329 | + p = Poll.objects.create( |
| 330 | + question="Will this be deleted?", pub_date=the_time_is_now |
| 331 | + ) |
| 332 | + self.assertEqual(Poll.history.all().count(), 1) |
| 333 | + p.save() |
| 334 | + p.save() |
| 335 | + self.assertEqual(Poll.history.all().count(), 3) |
| 336 | + p.question = "Or this one...?" |
| 337 | + p.save() |
| 338 | + p.save() |
| 339 | + p.save() |
| 340 | + p.save() |
| 341 | + self.assertEqual(Poll.history.all().count(), 7) |
| 342 | + |
| 343 | + for h in Poll.history.all()[2:]: |
| 344 | + h.history_date -= timedelta(hours=1) |
| 345 | + h.save() |
| 346 | + |
| 347 | + management.call_command( |
| 348 | + self.command_name, |
| 349 | + auto=True, |
| 350 | + minutes=50, |
| 351 | + stdout=StringIO(), |
| 352 | + stderr=StringIO(), |
| 353 | + ) |
| 354 | + # even though only the last 2 entries match the date range |
| 355 | + # the "extra_one" (the record before the oldest match) |
| 356 | + # is identical to the oldest match, so oldest match is deleted |
| 357 | + self.assertEqual(Poll.history.all().count(), 5) |
0 commit comments