Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pw/patchwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,21 @@ def get_patches_all(self, delegate=None, project=None, since=None, action_requir
query['archived'] = 'false'
return self.get_all('patches', query)

def get_series_all(self, project=None, since=None):
def get_new_series(self, project=None, since=None):
if project is None:
project = self._project
return self.get_all('series', {'project': project, 'since': since})
event_params = {
'project': project,
'since': since,
'order': 'date',
'category': 'series-created',
}
events = self.get_all('events', event_params)
if not events:
return [], since
since = events[-1]['date']
series = [self.get('series', e['payload']['series']['id']) for e in events]
return series, since

def post_check(self, patch, name, state, url, desc):
headers = {}
Expand Down
60 changes: 9 additions & 51 deletions pw_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ def __init__(self, config) -> None:
self._pw = Patchwork(config)

self._state = {
'last_poll': (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp(),
'done_series': [],
'last_event_ts': (datetime.datetime.now() -
datetime.timedelta(hours=2)).strftime('%Y-%m-%dT%H:%M:%S'),
}
self.init_state_from_disk()
self.seen_series = set(self._state['done_series'])
self.done_series = self.seen_series.copy()

self._recheck_period = config.getint('poller', 'recheck_period', fallback=3)
self._recheck_lookback = config.getint('poller', 'recheck_lookback', fallback=9)
Expand Down Expand Up @@ -152,10 +150,6 @@ def series_determine_tree(self, s: PwSeries) -> str:
return ret

def _process_series(self, pw_series) -> None:
if pw_series['id'] in self.seen_series:
log(f"Already seen {pw_series['id']}", "")
return

s = PwSeries(self._pw, pw_series)

log("Series info",
Expand Down Expand Up @@ -184,8 +178,6 @@ def _process_series(self, pw_series) -> None:
core.write_tree_selection_result(self.result_dir, s, comment)
core.mark_done(self.result_dir, s)

self.seen_series.add(s['id'])

def process_series(self, pw_series) -> None:
log_open_sec(f"Checking series {pw_series['id']} with {pw_series['total']} patches")
try:
Expand All @@ -194,59 +186,26 @@ def process_series(self, pw_series) -> None:
log_end_sec()

def run(self, life) -> None:
partial_series = {}
since = self._state['last_event_ts']

prev_big_scan = datetime.datetime.fromtimestamp(self._state['last_poll'])
prev_req_time = datetime.datetime.now()

# We poll every 2 minutes, for series from last 10 minutes
# Every 3 hours we do a larger check of series of last 12 hours to make sure we didn't miss anything
# apparently patchwork uses the time from the email headers and people back date their emails, a lot
# We keep a history of the series we've seen in and since the last big poll to not process twice
try:
# We poll every 2 minutes after this
secs = 0
while life.next_poll(secs):
this_poll_seen = set()
req_time = datetime.datetime.now()

# Decide if this is a normal 4 minute history poll or big scan of last 12 hours
if prev_big_scan + datetime.timedelta(hours=self._recheck_period) < req_time:
big_scan = True
since = prev_big_scan - datetime.timedelta(hours=self._recheck_lookback)
log_open_sec(f"Big scan of last 12 hours at {req_time} since {since}")
else:
big_scan = False
since = prev_req_time - datetime.timedelta(minutes=10)
log_open_sec(f"Checking at {req_time} since {since}")

json_resp = self._pw.get_series_all(since=since)
json_resp, since = self._pw.get_new_series(since=since)
log(f"Loaded {len(json_resp)} series", "")

had_partial_series = False
for pw_series in json_resp:
try:
self.process_series(pw_series)
this_poll_seen.add(pw_series['id'])
except IncompleteSeries:
partial_series.setdefault(pw_series['id'], 0)
if partial_series[pw_series['id']] < 5:
had_partial_series = True
partial_series[pw_series['id']] += 1

if big_scan:
prev_req_time = req_time
prev_big_scan = req_time
# Shorten the history of series we've seen to just the last 12 hours
self.seen_series = this_poll_seen
self.done_series &= self.seen_series
elif had_partial_series:
log("Partial series, not moving time forward", "")
else:
prev_req_time = req_time
# didn't make it to the list fully, patchwork
# shouldn't have had this event at all though
pass

while not self._done_queue.empty():
s = self._done_queue.get()
self.done_series.add(s['id'])
log(f"Testing complete for series {s['id']}", "")

secs = 120 - (datetime.datetime.now() - req_time).total_seconds()
Expand All @@ -257,8 +216,7 @@ def run(self, life) -> None:
pass # finally will still run, but don't splat
finally:
# Dump state before trying to stop workers, in case they hang
self._state['last_poll'] = prev_big_scan.timestamp()
self._state['done_series'] = list(self.seen_series)
self._state['last_event_ts'] = since
with open('poller.state', 'w') as f:
json.dump(self._state, f)

Expand Down