Skip to content

Commit 2cadd42

Browse files
authored
Fixes oppia#11736: Add audit job and beam cleanup for feedback threads marked as having suggestions but without a corresponding GeneralSuggestionModel. (oppia#24720)
* added files * added files * added files * added files * added files * added files * added files * resolved backend tests * resolved backend tests * resolved backend tests * Refactored code * Refactored code * Refactored code * Refactored code * Refactored code * Refactored code * Refactored code * updated implementation approach * fixed lint issue
1 parent 251a7ae commit 2cadd42

File tree

5 files changed

+431
-0
lines changed

5 files changed

+431
-0
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
# coding: utf-8
2+
#
3+
# Copyright 2026 The Oppia Authors. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS-IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""Audit job that finds feedback threads marked as having suggestions
18+
but without a corresponding GeneralSuggestionModel.
19+
"""
20+
21+
from __future__ import annotations
22+
23+
from core.jobs import base_jobs
24+
from core.jobs.io import ndb_io
25+
from core.jobs.types import job_run_result
26+
from core.platform import models
27+
28+
import apache_beam as beam
29+
30+
MYPY = False
31+
if MYPY: # pragma: no cover
32+
from mypy_imports import feedback_models, suggestion_models
33+
34+
(feedback_models, suggestion_models) = models.Registry.import_models(
35+
[
36+
models.Names.FEEDBACK,
37+
models.Names.SUGGESTION,
38+
]
39+
)
40+
41+
42+
class FixThreadsWithMissingSuggestionsJob(base_jobs.JobBase):
43+
"""Fixes feedback threads where has_suggestion=True but no suggestion exists.
44+
45+
When DATASTORE_UPDATES_ALLOWED is False, this job behaves as an audit job
46+
and only reports invalid models without mutating the datastore.
47+
"""
48+
49+
DATASTORE_UPDATES_ALLOWED = True
50+
51+
def run(self) -> beam.PCollection[job_run_result.JobRunResult]:
52+
"""Runs the job.
53+
54+
Returns:
55+
PCollection[JobRunResult]. Audit results when datastore updates
56+
are disabled, otherwise fix results.
57+
"""
58+
59+
suggestion_thread_ids = (
60+
self.pipeline
61+
| 'Get GeneralSuggestionModels'
62+
>> ndb_io.GetModels(
63+
suggestion_models.GeneralSuggestionModel.get_all(
64+
include_deleted=False
65+
)
66+
)
67+
| 'Extract suggestion thread ids'
68+
>> beam.Map(lambda model: model.id)
69+
)
70+
71+
feedback_threads = (
72+
self.pipeline
73+
| 'Get GeneralFeedbackThreadModels'
74+
>> ndb_io.GetModels(
75+
feedback_models.GeneralFeedbackThreadModel.get_all(
76+
include_deleted=False
77+
)
78+
)
79+
| 'Keep threads with has_suggestion=True'
80+
>> beam.Filter(lambda thread: thread.has_suggestion)
81+
)
82+
83+
feedback_id_to_model = (
84+
feedback_threads
85+
| 'Map feedback threads to (id, model)'
86+
>> beam.Map(lambda thread: (thread.id, thread))
87+
)
88+
89+
suggestion_id_to_none = (
90+
suggestion_thread_ids
91+
| 'Map suggestion ids to None'
92+
>> beam.Map(lambda thread_id: (thread_id, None))
93+
)
94+
95+
invalid_threads = (
96+
{
97+
'feedback': feedback_id_to_model,
98+
'suggestions': suggestion_id_to_none,
99+
}
100+
| 'CoGroup feedback threads with suggestion ids'
101+
>> beam.CoGroupByKey()
102+
| 'Select threads with missing suggestions'
103+
>> beam.FlatMap(
104+
lambda group: (
105+
group[1]['feedback'] if not group[1]['suggestions'] else []
106+
)
107+
)
108+
)
109+
110+
invalid_thread_logs = invalid_threads | 'Log invalid threads' >> beam.Map(
111+
lambda model: job_run_result.JobRunResult.as_stdout(
112+
(
113+
'GeneralFeedbackThreadModel marked as has_suggestion=True '
114+
'but no GeneralSuggestionModel exists: '
115+
f'id={model.id}'
116+
)
117+
)
118+
)
119+
120+
invalid_thread_count = (
121+
invalid_threads
122+
| 'Count invalid threads'
123+
>> beam.combiners.Count.Globally().with_defaults(0)
124+
| 'Report invalid thread count'
125+
>> beam.Map(
126+
lambda count: job_run_result.JobRunResult.as_stdout(
127+
f'invalid_feedback_thread_models_count: {count}'
128+
)
129+
)
130+
)
131+
132+
outputs = []
133+
134+
if self.DATASTORE_UPDATES_ALLOWED:
135+
unused_fixed_threads_put_results = (
136+
invalid_threads
137+
| 'Unset has_suggestion flag'
138+
>> beam.Map(self._unset_has_suggestion)
139+
| 'Put updated threads' >> ndb_io.PutModels()
140+
)
141+
142+
updated_thread_logs = (
143+
invalid_threads
144+
| 'Log fixed threads'
145+
>> beam.Map(
146+
lambda model: job_run_result.JobRunResult.as_stdout(
147+
(
148+
'Fixed GeneralFeedbackThreadModel by setting '
149+
f'has_suggestion=False: id={model.id}'
150+
)
151+
)
152+
)
153+
)
154+
155+
updated_thread_count = (
156+
invalid_threads
157+
| 'Count fixed threads'
158+
>> beam.combiners.Count.Globally().with_defaults(0)
159+
| 'Report fixed thread count'
160+
>> beam.Map(
161+
lambda count: job_run_result.JobRunResult.as_stdout(
162+
f'fixed_feedback_thread_models_count: {count}'
163+
)
164+
)
165+
)
166+
167+
outputs.extend(
168+
[
169+
updated_thread_logs,
170+
updated_thread_count,
171+
]
172+
)
173+
174+
else:
175+
outputs.extend(
176+
[
177+
invalid_thread_logs,
178+
invalid_thread_count,
179+
]
180+
)
181+
182+
return outputs | 'Flatten outputs' >> beam.Flatten()
183+
184+
@staticmethod
185+
def _unset_has_suggestion(
186+
thread: feedback_models.GeneralFeedbackThreadModel,
187+
) -> feedback_models.GeneralFeedbackThreadModel:
188+
"""Unsets has_suggestion flag."""
189+
thread.has_suggestion = False
190+
return thread
191+
192+
193+
class AuditThreadsWithMissingSuggestionsJob(
194+
FixThreadsWithMissingSuggestionsJob
195+
):
196+
"""Audit job reporting feedback threads with missing suggestions."""
197+
198+
DATASTORE_UPDATES_ALLOWED = False

0 commit comments

Comments
 (0)