Skip to content

Commit f7a894d

Browse files
Fix part of oppia#18422: Beam job to count and display skills that are hanging prerequisites (oppia#22203)
* Wrote beam job * Added tests * fixed failing tests * Fixed test * Fixed linting issues * Fixed linting issues * Improved beam job * Fixed type issue * Fixed linting issues * Added skill description to help server admins in skill deletion * Fixed type and lint checks * Fixed tests --------- Co-authored-by: Jay Vivarekar <jayviva@gmail.com>
1 parent 8a0ea4e commit f7a894d

File tree

5 files changed

+404
-0
lines changed

5 files changed

+404
-0
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# coding: utf-8
2+
#
3+
# Copyright 2025 The Oppia Authors. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS-IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""Audit jobs that check if any skills in the datastore have hanging
18+
prerequisites. If a skill A is added as a prerequisite skill to
19+
some topic/skill and is then deleted or merged into another skill B, A is
20+
called a hanging prerequisite.
21+
"""
22+
23+
from __future__ import annotations
24+
25+
from core.jobs import base_jobs
26+
from core.jobs.io import ndb_io
27+
from core.jobs.types import job_run_result
28+
from core.platform import models
29+
30+
import apache_beam as beam
31+
from typing import Dict, Iterable, List, Optional, Tuple
32+
33+
MYPY = False
34+
if MYPY: # pragma: no cover
35+
from mypy_imports import skill_models
36+
37+
(skill_models,) = models.Registry.import_models([
38+
models.Names.SKILL])
39+
40+
41+
def _format_hanging_prerequisite(
    result: Tuple[str, bool, Optional[str], str]
) -> str:
    """Builds the report line for a single hanging prerequisite.

    Args:
        result: tuple(str, bool, str|None, str). A tuple of the form
            (prerequisite_id, exists, superseding_id, description), as
            yielded by CheckPrerequisiteExists.

    Returns:
        str. A human-readable message stating whether the prerequisite
        skill is missing or has been superseded by another skill.
    """
    prerequisite_id, exists, superseding_id, description = result
    # The description is only known for skills that still exist, so it is
    # omitted from the message when it is empty.
    description_text = (
        f' (Description: {description})' if description != '' else '')
    reason = (
        'does not exist' if not exists
        else f'is superseded by skill with ID: {superseding_id}')
    return (
        f'Skill with ID: {prerequisite_id}{description_text} is referenced '
        f'as a prerequisite but {reason}')


class CountHangingPrerequisiteSkillsJob(base_jobs.JobBase):
    """Job that counts skills having hanging prerequisites."""

    def run(self) -> beam.PCollection[job_run_result.JobRunResult]:
        """Reports every prerequisite skill ID that is hanging, i.e. one
        that is referenced by some SkillModel but either has no
        corresponding non-deleted SkillModel or has a superseding skill.

        Returns:
            PCollection. A PCollection of JobRunResults, one stdout
            message per hanging prerequisite skill discovered.
        """
        skill_models_pcoll = (
            self.pipeline
            | 'Get all SkillModels' >> ndb_io.GetModels(
                skill_models.SkillModel.get_all(
                    include_deleted=False))
        )

        all_skill_ids = (
            skill_models_pcoll
            | 'Extract skill model ids' >> beam.Map(
                lambda skill_model: skill_model.id
            )
        )

        skills_description_map = (
            skill_models_pcoll
            | 'Create skill ID to description mapping' >> beam.Map(
                lambda skill_model: (
                    skill_model.id,
                    skill_model.description
                )
            )
        )

        skills_with_superseding_skills = (
            skill_models_pcoll
            | 'Extract skill models with superseding IDs' >> beam.Map(
                lambda skill_model: (
                    skill_model.id,
                    skill_model.superseding_skill_id if hasattr(
                        skill_model,
                        'superseding_skill_id'
                    ) else None
                )
            )
        )

        prerequisite_skill_ids = (
            skill_models_pcoll
            | 'Extract prerequisite skill ids' >> beam.FlatMap(
                lambda skill_model: skill_model.prerequisite_skill_ids
                if skill_model.prerequisite_skill_ids else []
            )
            # Deduplicate with a real pipeline-level transform. A Filter
            # that remembers seen IDs in a mutable default argument only
            # deduplicates within a single worker's copy of the function,
            # so duplicates spread across bundles would be reported twice.
            | 'Remove duplicate prerequisites' >> beam.Distinct()
        )

        hanging_prerequisites = (
            prerequisite_skill_ids
            | 'Check if prerequisite exists' >> beam.ParDo(
                CheckPrerequisiteExists(),
                beam.pvalue.AsIter(all_skill_ids),
                beam.pvalue.AsDict(skills_with_superseding_skills),
                beam.pvalue.AsDict(skills_description_map)
            )
            | 'Filter hanging prerequisites' >> beam.Filter(
                # Keep only skills that don't exist (False) or have a
                # superseding skill.
                lambda result: not result[1] or result[2] is not None
            )
            | 'Create collection of hanging prerequisites' >> beam.Map(
                lambda result: job_run_result.JobRunResult.as_stdout(
                    _format_hanging_prerequisite(result)
                )
            )
        )

        return hanging_prerequisites
121+
122+
123+
# TODO(#15613): Here we use MyPy ignore because the incomplete typing of
124+
# apache_beam library and the absence of stubs in Typeshed force MyPy to
125+
# assume that DoFn class is of type Any. Thus to avoid MyPy's error (Class
126+
# cannot subclass 'DoFn' (has type 'Any')), we added an ignore here.
127+
class CheckPrerequisiteExists(beam.DoFn): # type: ignore[misc]
    """DoFn that looks up a prerequisite skill ID among the known skills
    and reports whether it exists, which skill (if any) supersedes it,
    and its description.
    """

    def process(
        self,
        prerequisite_id: str,
        all_skill_ids: List[str],
        skill_superseding_map: Dict[str, Optional[str]],
        skills_description_map: Dict[str, str],
    ) -> Iterable[Tuple[str, bool, Optional[str], str]]:
        """Looks up a single prerequisite skill ID in the given side inputs.

        Args:
            prerequisite_id: str. The ID of the prerequisite skill to check.
            all_skill_ids: list(str). The IDs of all known skills.
            skill_superseding_map: dict(str, str|None). Maps each skill ID
                to the ID of its superseding skill, or None if it has none.
            skills_description_map: dict(str, str). Maps each skill ID to
                its description.

        Yields:
            tuple(str, bool, str|None, str). A single tuple of the form
            (prerequisite_id, exists, superseding_id, description), where:
                exists is True iff prerequisite_id is in all_skill_ids.
                superseding_id is the entry for prerequisite_id in
                    skill_superseding_map, or None if there is no entry.
                description is the entry for prerequisite_id in
                    skills_description_map, or '' if there is no entry.
        """
        yield (
            prerequisite_id,
            prerequisite_id in all_skill_ids,
            skill_superseding_map.get(prerequisite_id),
            skills_description_map.get(prerequisite_id, ''),
        )

0 commit comments

Comments
 (0)