Skip to content

Commit c37347e

Browse files
committed
add row stats for parsed committee sessions
1 parent 7497d9d commit c37347e

1 file changed

Lines changed: 30 additions & 2 deletions

File tree

airflow/knesset_data_pipelines/committees/parsed_document_committee_sessions.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import shutil
33
from textwrap import dedent
4+
from collections import defaultdict
45

56
import dataflows as DF
67

@@ -66,17 +67,41 @@ def parse_retry(type_, error, document_session_id, retry_report):
6667
print(f'document session {document_session_id} {type_} retry {retry}')
6768

6869

70+
def update_row_stats(row, stats):
71+
document_session_id = str(row['DocumentCommitteeSessionID'])
72+
download_filepath = os.path.join(
73+
config.KNESSET_PIPELINES_DATA_PATH,
74+
'committees', 'download_document_committee_session', row['download_filename']
75+
)
76+
if os.path.exists(download_filepath):
77+
stats['download exists'] += 1
78+
for type_ in ['text', 'parts']:
79+
filepath = os.path.join(
80+
config.KNESSET_PIPELINES_DATA_PATH,
81+
'committees', f'meeting_protocols_{type_}', 'files',
82+
document_session_id[0], document_session_id[1], f'{document_session_id}.{"txt" if type_ == "text" else "csv"}'
83+
)
84+
if os.path.exists(filepath):
85+
stats[f'{type_} exists'] += 1
86+
if os.path.exists(f'{filepath}.hash'):
87+
stats[f'{type_} hash exists'] += 1
88+
89+
6990
def process_rows(rows):
91+
stats = defaultdict(int)
7092
retry_report = []
7193
protocol_session_rows = {}
7294
for row in rows:
95+
stats['total_rows'] += 1
7396
# this if condition should match the one in /committees/filter_document_committee_sessions.py
7497
if (row['GroupTypeID'] != 23
7598
or row['ApplicationDesc'] != 'DOC'
7699
or (not row["FilePath"].lower().endswith('.doc')
77100
and not row["FilePath"].lower().endswith('.docx'))):
78101
yield row
79102
else:
103+
stats['protocol_rows'] += 1
104+
update_row_stats(row, stats)
80105
# parse_retry('text', row['text_error'], row['DocumentCommitteeSessionID'], retry_report)
81106
# parse_retry('parts', row['parts_error'], row['DocumentCommitteeSessionID'], retry_report)
82107
protocol_session_rows.setdefault(row['CommitteeSessionID'], []).append(row)
@@ -86,9 +111,12 @@ def process_rows(rows):
86111
if len(good_rows) > 0:
87112
rows = good_rows
88113
row = rows[0]
89-
legacy_fix('text', row)
90-
legacy_fix('parts', row)
114+
# legacy_fix('text', row)
115+
# legacy_fix('parts', row)
116+
stats['yielded_protocol_rows'] += 1
91117
yield row
118+
for k, v in stats.items():
119+
print(f'{k}: {v}')
92120

93121

94122
def main():

0 commit comments

Comments
 (0)