|
1 | 1 | import json |
| 2 | +from itertools import permutations |
2 | 3 | from typing import cast |
3 | 4 |
|
4 | | -from appsignal.check_in.event import Event, cron, describe, heartbeat, is_redundant |
| 5 | +from appsignal.check_in.event import ( |
| 6 | + Event, |
| 7 | + cron, |
| 8 | + deduplicate_cron, |
| 9 | + describe, |
| 10 | + heartbeat, |
| 11 | + is_redundant, |
| 12 | +) |
5 | 13 |
|
6 | 14 |
|
7 | 15 | def test_describe_no_events(): |
@@ -93,3 +101,51 @@ def test_heartbeat_event_json_serialised(): |
93 | 101 | assert '"timestamp":' in serialised |
94 | 102 | assert '"kind":' not in serialised |
95 | 103 | assert '"digest":' not in serialised |
| 104 | + |
| 105 | + |
| 106 | +def test_deduplicate_cron_removes_redundant_pairs(): |
| 107 | + first_start = cron("checkin-name", "first", "start") |
| 108 | + first_finish = cron("checkin-name", "first", "finish") |
| 109 | + second_start = cron("checkin-name", "second", "start") |
| 110 | + second_finish = cron("checkin-name", "second", "finish") |
| 111 | + base_events = [first_start, first_finish, second_start, second_finish] |
| 112 | + |
| 113 | + for events_tuple in permutations(base_events): |
| 114 | + events = list(events_tuple) # Convert tuple to list for in-place modification |
| 115 | + deduplicate_cron(events) |
| 116 | + |
| 117 | + assert len(events) == 2 |
| 118 | + assert events[0]["digest"] == events[1]["digest"] |
| 119 | + assert {"start", "finish"} == {events[0]["kind"], events[1]["kind"]} |
| 120 | + |
| 121 | + |
| 122 | +def test_deduplicate_cron_keeps_unmatched_pairs(): |
| 123 | + first_start = cron("checkin-name", "first", "start") |
| 124 | + second_start = cron("checkin-name", "second", "start") |
| 125 | + second_finish = cron("checkin-name", "second", "finish") |
| 126 | + third_finish = cron("checkin-name", "third", "finish") |
| 127 | + base_events = [first_start, second_start, second_finish, third_finish] |
| 128 | + |
| 129 | + for events_tuple in permutations(base_events): |
| 130 | + events = list(events_tuple) # Convert tuple to list for in-place modification |
| 131 | + deduplicate_cron(events) |
| 132 | + |
| 133 | + assert len(events) == 4 |
| 134 | + for event in [first_start, second_start, second_finish, third_finish]: |
| 135 | + assert event in events |
| 136 | + |
| 137 | + |
| 138 | +def test_deduplicate_cron_keeps_different_identifiers(): |
| 139 | + first_start = cron("checkin-name", "first", "start") |
| 140 | + first_finish = cron("checkin-name", "first", "finish") |
| 141 | + second_start = cron("other-name", "second", "start") |
| 142 | + second_finish = cron("other-name", "second", "finish") |
| 143 | + base_events = [first_start, first_finish, second_start, second_finish] |
| 144 | + |
| 145 | + for events_tuple in permutations(base_events): |
| 146 | + events = list(events_tuple) # Convert tuple to list for in-place modification |
| 147 | + deduplicate_cron(events) |
| 148 | + |
| 149 | + assert len(events) == 4 |
| 150 | + for event in [first_start, first_finish, second_start, second_finish]: |
| 151 | + assert event in events |
0 commit comments