diff --git a/.github/workflows/alembic.yml b/.github/workflows/alembic.yml index 23c14cc..dfa8ba1 100644 --- a/.github/workflows/alembic.yml +++ b/.github/workflows/alembic.yml @@ -32,7 +32,9 @@ jobs: - name: Install dependencies run: | - sudo apt-get install python3.11 python3.11-venv + # for python3.11 (dear internet gods: we'll update to 3.13 or something in a year, i promise) + sudo add-apt-repository ppa:deadsnakes/ppa + sudo apt install python3.11 python3.11-venv python3.11 -m pip install --upgrade pip python3.11 -m venv venv source ./venv/bin/activate diff --git a/requirements.txt b/requirements.txt index c6e07d7..d29d625 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ # major -fastapi==0.109.1 +fastapi==0.115.6 gunicorn==21.2.0 uvicorn[standard]==0.27.1 sqlalchemy[asyncio]==2.0.27 @@ -19,3 +19,4 @@ ruff==0.6.9 # test pytest pytest-asyncio +httpx \ No newline at end of file diff --git a/src/alembic/versions/166f3772fce7_auth_officer_init.py b/src/alembic/versions/166f3772fce7_auth_officer_init.py index fa436c7..2f3a431 100644 --- a/src/alembic/versions/166f3772fce7_auth_officer_init.py +++ b/src/alembic/versions/166f3772fce7_auth_officer_init.py @@ -42,8 +42,8 @@ def upgrade() -> None: sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), sa.Column("computing_id", sa.String(length=32), sa.ForeignKey("site_user.computing_id"), nullable=False), sa.Column("position", sa.String(length=128), nullable=False), - sa.Column("start_date", sa.DateTime(), nullable=False), - sa.Column("end_date", sa.DateTime(), nullable=True), + sa.Column("start_date", sa.Date(), nullable=False), + sa.Column("end_date", sa.Date(), nullable=True), sa.Column("nickname", sa.String(length=128), nullable=True), sa.Column("favourite_course_0", sa.String(length=32), nullable=True), sa.Column("favourite_course_1", sa.String(length=32), nullable=True), diff --git a/src/cron/daily.py b/src/cron/daily.py index a0a9755..00615d1 100644 --- a/src/cron/daily.py +++ b/src/cron/daily.py @@ -7,7 +7,7 @@ import google_api import utils from database import _db_session -from officers.crud import all_officer_data, get_user_by_username +from officers.crud import all_officers, get_user_by_username _logger = logging.getLogger(__name__) @@ -18,7 +18,7 @@ async def update_google_permissions(db_session): # TODO: for performance, only include officers with recent end-date (1 yr) # but measure performance first - for term in await all_officer_data(db_session): + for term in await all_officers(db_session): if utils.is_active(term): # TODO: if google drive permission is not active, update them pass @@ -31,7 +31,7 @@ async def update_google_permissions(db_session): async def update_github_permissions(db_session): github_permissions, team_id_map = github.all_permissions() - for term in await all_officer_data(db_session): + for term in await all_officers(db_session): new_teams = ( # move all active officers to their respective teams github.officer_teams(term.position) diff --git a/src/load_test_db.py b/src/load_test_db.py index ce72adf..b9bc596 100644 --- a/src/load_test_db.py +++ b/src/load_test_db.py @@ -212,10 +212,9 @@ async def load_test_officers_data(db_session: AsyncSession): )) await db_session.commit() +SYSADMIN_COMPUTING_ID = "gsa92" async def load_sysadmin(db_session: AsyncSession): # put your computing id here for testing purposes - SYSADMIN_COMPUTING_ID = "gsa92" - print(f"loading new sysadmin '{SYSADMIN_COMPUTING_ID}'") await create_user_session(db_session, f"temp_id_{SYSADMIN_COMPUTING_ID}", SYSADMIN_COMPUTING_ID) await create_new_officer_info(db_session, OfficerInfo( @@ -232,28 +231,11 @@ async def load_sysadmin(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id=SYSADMIN_COMPUTING_ID, - position=OfficerPosition.SYSTEM_ADMINISTRATOR, - start_date=date.today() - timedelta(days=365), - end_date=None, - - nickname="Gabe", - favourite_course_0="CMPT 379", - favourite_course_1="CMPT 295", - - favourite_pl_0="Rust", - favourite_pl_1="C", - - biography="The systems are good o7", - photo_url=None, - )) - await create_new_officer_term(db_session, OfficerTerm( - computing_id=SYSADMIN_COMPUTING_ID, - position=OfficerPosition.FIRST_YEAR_REPRESENTATIVE, start_date=date.today() - timedelta(days=(365*3)), end_date=date.today() - timedelta(days=(365*2)), - nickname="Gabe", + nickname="G1", favourite_course_0="MACM 101", favourite_course_1="CMPT 125", @@ -270,7 +252,7 @@ async def load_sysadmin(db_session: AsyncSession): start_date=date.today() - timedelta(days=365), end_date=None, - nickname="Gabe", + nickname="G2", favourite_course_0="CMPT 379", favourite_course_1="CMPT 295", @@ -280,6 +262,24 @@ async def load_sysadmin(db_session: AsyncSession): biography="The systems are good o7", photo_url=None, )) + # a future term + await create_new_officer_term(db_session, OfficerTerm( + computing_id=SYSADMIN_COMPUTING_ID, + + position=OfficerPosition.DIRECTOR_OF_ARCHIVES, + start_date=date.today() + timedelta(days=365*1), + end_date=date.today() + timedelta(days=365*2), + + nickname="G3", + favourite_course_0="MACM 102", + favourite_course_1="CMPT 127", + + favourite_pl_0="C%", + favourite_pl_1="C$$", + + biography="o hey fellow kids \n\n\n I will can newline .... !!", + photo_url=None, + )) await db_session.commit() async def async_main(sessionmanager): diff --git a/src/main.py b/src/main.py index d2a01fa..f78060a 100755 --- a/src/main.py +++ b/src/main.py @@ -6,7 +6,6 @@ import database import officers.urls import permission.urls -import tests.urls logging.basicConfig(level=logging.DEBUG) database.setup_database() @@ -17,8 +16,6 @@ app.include_router(officers.urls.router) app.include_router(permission.urls.router) -app.include_router(tests.urls.router) - @app.get("/") async def read_root(): return {"message": "Hello! You might be lost, this is actually the sfucsss.org's backend api."} diff --git a/src/officers/constants.py b/src/officers/constants.py index 3f824ac..f226100 100644 --- a/src/officers/constants.py +++ b/src/officers/constants.py @@ -28,8 +28,7 @@ def position_list() -> list[str]: @staticmethod def length_in_semesters(position: str) -> int | None: - # TODO: ask the committee to maintain a json file with all the important details from the constitution - # (I can create the version version of the file) + # TODO (#101): ask the committee to maintain a json file with all the important details from the constitution """How many semester position is active for, according to the CSSS Constitution""" return _LENGTH_MAP[position] @@ -71,6 +70,7 @@ def is_signer(position: str) -> bool: @staticmethod def expected_positions() -> list[str]: + # TODO (#93): use this function in the daily cronjobs return [ OfficerPosition.PRESIDENT, OfficerPosition.VICE_PRESIDENT, @@ -81,11 +81,11 @@ def expected_positions() -> list[str]: OfficerPosition.DIRECTOR_OF_EDUCATIONAL_EVENTS, OfficerPosition.ASSISTANT_DIRECTOR_OF_EVENTS, OfficerPosition.DIRECTOR_OF_COMMUNICATIONS, - #OfficerPosition.DIRECTOR_OF_OUTREACH, # TODO: when https://github.com/CSSS/documents/pull/9/files merged + #OfficerPosition.DIRECTOR_OF_OUTREACH, # TODO (#101): when https://github.com/CSSS/documents/pull/9/files merged OfficerPosition.DIRECTOR_OF_MULTIMEDIA, OfficerPosition.DIRECTOR_OF_ARCHIVES, OfficerPosition.EXECUTIVE_AT_LARGE, - # TODO: expect these only during fall & spring semesters. Also, TODO: this todo is correct... + # TODO (#101): expect these only during fall & spring semesters. #OfficerPosition.FIRST_YEAR_REPRESENTATIVE, #ElectionsOfficer, @@ -121,8 +121,6 @@ def expected_positions() -> list[str]: OfficerPosition.SOCIAL_MEDIA_MANAGER: "N/A", } -# TODO: when an officer's start date is modified, update the end date as well if it's defined in this list -# a number of semesters (a semester begins on the 1st of each four month period, starting january) # None, means that the length of the position does not have a set length in semesters _LENGTH_MAP = { OfficerPosition.PRESIDENT: 3, diff --git a/src/officers/crud.py b/src/officers/crud.py index 48fd7f5..df92b56 100644 --- a/src/officers/crud.py +++ b/src/officers/crud.py @@ -14,195 +14,155 @@ _logger = logging.getLogger(__name__) +# NOTE: this module should not do any data validation; that should be done in the urls.py or higher layer -async def most_recent_officer_term(db_session: database.DBSession, computing_id: str) -> OfficerTerm | None: +async def current_officers( + db_session: database.DBSession, + include_private: bool +) -> dict[str, list[OfficerData]]: """ - Returns the most recent OfficerTerm an exec has held + Get info about officers that are active. Go through all active & complete officer terms. + + Returns a mapping between officer position and officer terms """ query = ( sqlalchemy .select(OfficerTerm) - .where(OfficerTerm.computing_id == computing_id) .order_by(OfficerTerm.start_date.desc()) - .limit(1) ) - return await db_session.scalar(query) + query = utils.is_active_officer(query) -async def current_officer_positions(db_session: database.DBSession, computing_id: str) -> list[str]: - """ - Returns the list of officer positions a user currently has. Returns [] if the user is not currently an officer. + officer_terms = (await db_session.scalars(query)).all() + officer_data = {} + for term in officer_terms: + officer_info_query = ( + sqlalchemy + .select(OfficerInfo) + .where(OfficerInfo.computing_id == term.computing_id) + ) + officer_info = (await db_session.scalars(officer_info_query)).first() + if officer_info is None: + # TODO (#93): make sure there are daily checks that this data actually exists + continue + elif term.position not in officer_data: + officer_data[term.position] = [] - An officer can have multiple positions at once, such as Webmaster, Frosh chair, and DoEE. + officer_data[term.position] += [ + OfficerData.from_data(term, officer_info, include_private, is_active=True) + ] + + return officer_data + +async def all_officers( + db_session: database.DBSession, + include_private_data: bool, + include_future_terms: bool +) -> list[OfficerData]: """ + This could be a lot of data, so be careful + """ + # NOTE: paginate data if needed query = ( sqlalchemy .select(OfficerTerm) - .where(OfficerTerm.computing_id == computing_id) - # In order of most recent start date first + # Ordered recent first .order_by(OfficerTerm.start_date.desc()) ) - query = utils.is_active_officer(query) + if not include_future_terms: + query = utils.has_started_term(query) - officer_term_list = (await db_session.scalars(query)).all() - return [term.position for term in officer_term_list] + officer_data_list = [] + officer_terms = (await db_session.scalars(query)).all() + for term in officer_terms: + officer_info = await db_session.scalar( + sqlalchemy + .select(OfficerInfo) + .where(OfficerInfo.computing_id == term.computing_id) + ) + officer_data_list += [OfficerData.from_data( + term, + officer_info, + include_private_data, + utils.is_active_term(term) + )] -async def officer_info(db_session: database.DBSession, computing_id: str) -> OfficerInfo: - query = ( + return officer_data_list + +async def get_officer_info_or_raise(db_session: database.DBSession, computing_id: str) -> OfficerInfo: + officer_term = await db_session.scalar( sqlalchemy .select(OfficerInfo) .where(OfficerInfo.computing_id == computing_id) ) - officer_term = await db_session.scalar(query) - if officer_term is None: - raise HTTPException(status_code=400, detail=f"officer_info for computing_id={computing_id} does not exist yet") - return officer_term - -async def officer_term(db_session: database.DBSession, term_id: int) -> OfficerTerm: - query = ( - sqlalchemy - .select(OfficerTerm) - .where(OfficerTerm.id == term_id) - ) - officer_term = await db_session.scalar(query) if officer_term is None: - raise HTTPException(status_code=400, detail=f"Could not find officer_term with id={term_id}") + raise HTTPException(status_code=404, detail=f"officer_info for computing_id={computing_id} does not exist yet") return officer_term async def get_officer_terms( db_session: database.DBSession, computing_id: str, - max_terms: None | int, - # will include term info for officers that are not active - # or have not yet been filled out - include_inactive: bool, + include_future_terms: bool, ) -> list[OfficerTerm]: query = ( sqlalchemy .select(OfficerTerm) .where(OfficerTerm.computing_id == computing_id) + # In order of most recent start date first .order_by(OfficerTerm.start_date.desc()) ) - if not include_inactive: - query = utils.is_active_officer(query) - - if max_terms is not None: - query.limit(max_terms) + if not include_future_terms: + query = utils.has_started_term(query) return (await db_session.scalars(query)).all() -async def current_executive_team(db_session: database.DBSession, include_private: bool) -> dict[str, list[OfficerData]]: +async def get_active_officer_terms( + db_session: database.DBSession, + computing_id: str +) -> list[OfficerTerm]: """ - Get info about officers that are active. Go through all active & complete officer terms. - - Returns a mapping between officer position and officer terms + Returns the list of active officer terms for a user. Returns [] if the user is not currently an officer. + An officer can have multiple positions at once, such as Webmaster, Frosh chair, and DoEE. """ - - query = sqlalchemy.select(OfficerTerm) + query = ( + sqlalchemy + .select(OfficerTerm) + .where(OfficerTerm.computing_id == computing_id) + # In order of most recent start date first + .order_by(OfficerTerm.start_date.desc()) + ) query = utils.is_active_officer(query) - query = query.order_by(OfficerTerm.start_date.desc()) - - officer_terms = (await db_session.scalars(query)).all() - num_officers = {} - officer_data = {} - - for term in officer_terms: - if term.position not in OfficerPosition.position_list(): - _logger.warning( - f"Unknown OfficerTerm.position={term.position} in database. Ignoring in request." - ) - continue - - officer_info_query = sqlalchemy.select(OfficerInfo) - officer_info_query = officer_info_query.where( - OfficerInfo.computing_id == term.computing_id - ) - officer_info = (await db_session.scalars(officer_info_query)).first() - if officer_info is None: - # TODO: make sure there are daily checks that this data actually exists - continue - - if term.position not in officer_data: - num_officers[term.position] = 0 - officer_data[term.position] = [] - num_officers[term.position] += 1 - # TODO: move this to a ~~daily cronjob~~ SQL model checking - if num_officers[term.position] > OfficerPosition.num_active(term.position): - # If there are more active positions than expected, log it to a file - _logger.warning( - f"There are more active {term.position} positions in the OfficerTerm than expected " - f"({num_officers[term.position]} > {OfficerPosition.num_active(term.position)})" - ) - - officer_data[term.position] += [OfficerData.from_data(term, officer_info, include_private, is_active=True)] - - # validate & warn if there are any data issues - # TODO: decide whether we should enforce empty instances or force the frontend to deal with it - for position in OfficerPosition.expected_positions(): - if position not in officer_data: - _logger.warning( - f"Expected position={position} in response current_executive_team." - ) - elif ( - OfficerPosition.num_active(position) is not None - and len(officer_data[position]) != OfficerPosition.num_active(position) - ): - _logger.warning( - f"Unexpected number of {position} entries " - f"({len(officer_data[position])} entries) in current_executive_team response." - ) - - return officer_data + officer_term_list = (await db_session.scalars(query)).all() + return officer_term_list -async def all_officer_data( - db_session: database.DBSession, - include_private: bool, - view_only_filled_in: bool, -) -> list[OfficerData]: +async def current_officer_positions(db_session: database.DBSession, computing_id: str) -> list[str]: """ - This could be a lot of data, so be careful. - - TODO: optionally paginate data, so it's not so bad. + Returns the list of officer positions a user currently has. [] if not currently an officer. """ - query = sqlalchemy.select(OfficerTerm) - if view_only_filled_in: - query = OfficerTerm.sql_is_filled_in(query) - # Ordered recent first - query = query.order_by(OfficerTerm.start_date.desc()) - officer_terms = (await db_session.scalars(query)).all() - - officer_data_list = [] - for term in officer_terms: - officer_info_query = ( - sqlalchemy - .select(OfficerInfo) - .where(OfficerInfo.computing_id == term.computing_id) - ) - officer_info = await db_session.scalar(officer_info_query) - - officer_data_list += [OfficerData.from_data( - term, - officer_info, - include_private, - utils.is_active_term(term) - )] + officer_term_list = await get_active_officer_terms(db_session, computing_id) + return [term.position for term in officer_term_list] - return officer_data_list +async def get_officer_term_by_id(db_session: database.DBSession, term_id: int) -> OfficerTerm: + officer_term = await db_session.scalar( + sqlalchemy + .select(OfficerTerm) + .where(OfficerTerm.id == term_id) + ) + if officer_term is None: + raise HTTPException(status_code=400, detail=f"Could not find officer_term with id={term_id}") + return officer_term async def create_new_officer_info( db_session: database.DBSession, new_officer_info: OfficerInfo ) -> bool: - """ - Return False if the officer already exists - """ - query = ( + """Return False if the officer already exists & don't do anything.""" + existing_officer_info = await db_session.scalar( sqlalchemy .select(OfficerInfo) .where(OfficerInfo.computing_id == new_officer_info.computing_id) ) - stored_officer_info = await db_session.scalar(query) - if stored_officer_info is not None: + if existing_officer_info is not None: return False db_session.add(new_officer_info) @@ -212,70 +172,67 @@ async def create_new_officer_term( db_session: database.DBSession, new_officer_term: OfficerTerm ): - # TODO: does this check need to be here? - # if new_officer_term.position not in OfficerPosition.position_list(): - # raise HTTPException(status_code=500) - - # when creating a new position, assign a default end date if one exists position_length = OfficerPosition.length_in_semesters(new_officer_term.position) if position_length is not None: + # when creating a new position, assign a default end date if one exists new_officer_term.end_date = semesters.step_semesters( semesters.current_semester_start(new_officer_term.start_date), position_length, ) db_session.add(new_officer_term) -async def update_officer_info(db_session: database.DBSession, new_officer_info: OfficerInfo) -> bool: +async def update_officer_info( + db_session: database.DBSession, + new_officer_info: OfficerInfo +) -> bool: """ Return False if the officer doesn't exist yet """ - query = ( + officer_info = await db_session.scalar( sqlalchemy .select(OfficerInfo) .where(OfficerInfo.computing_id == new_officer_info.computing_id) ) - officer_info = await db_session.scalar(query) if officer_info is None: return False - # TODO: how to detect an entry insert error? For example, what happens if - # we try to set our discord id to be the same as another executive's? - query = ( + # NOTE: if there's ever an insert entry error, it will raise SQLAlchemyError + # see: https://stackoverflow.com/questions/2136739/how-to-check-and-handle-errors-in-sqlalchemy + await db_session.execute( sqlalchemy .update(OfficerInfo) .where(OfficerInfo.computing_id == officer_info.computing_id) .values(new_officer_info.to_update_dict()) ) - await db_session.execute(query) - return True async def update_officer_term( db_session: database.DBSession, new_officer_term: OfficerTerm, -): +) -> bool: """ - Update based on the term id. - + Update all officer term data in `new_officer_term` based on the term id. Returns false if the above entry does not exist. """ - query = ( + officer_term = await db_session.scalar( sqlalchemy .select(OfficerTerm) .where(OfficerTerm.id == new_officer_term.id) ) - officer_term = await db_session.scalar(query) if officer_term is None: return False - query = ( + await db_session.execute( sqlalchemy .update(OfficerTerm) .where(OfficerTerm.id == new_officer_term.id) .values(new_officer_term.to_update_dict()) ) - await db_session.execute(query) return True -def remove_officer_term(): - pass +async def delete_officer_term_by_id(db_session: database.DBSession, term_id: int): + await db_session.execute( + sqlalchemy + .delete(OfficerTerm) + .where(OfficerTerm.id == term_id) + ) diff --git a/src/officers/tables.py b/src/officers/tables.py index a7f3eaf..dcf36c1 100644 --- a/src/officers/tables.py +++ b/src/officers/tables.py @@ -1,15 +1,12 @@ from __future__ import annotations from sqlalchemy import ( - # Boolean, Column, - DateTime, + Date, ForeignKey, Integer, - Select, String, Text, - and_, ) # from sqlalchemy.orm import relationship @@ -28,7 +25,9 @@ class OfficerTerm(Base): __tablename__ = "officer_term" + # TODO (#98): create a unique constraint for (computing_id, position, start_date). id = Column(Integer, primary_key=True, autoincrement=True) + computing_id = Column( String(COMPUTING_ID_LEN), ForeignKey("site_user.computing_id"), @@ -36,10 +35,9 @@ class OfficerTerm(Base): ) position = Column(String(128), nullable=False) - # TODO: replace these with Date, not Datetime - start_date = Column(DateTime, nullable=False) + start_date = Column(Date, nullable=False) # end_date is only not-specified for positions that don't have a length (ie. webmaster) - end_date = Column(DateTime, nullable=True) + end_date = Column(Date, nullable=True) nickname = Column(String(128), nullable=True) favourite_course_0 = Column(String(32), nullable=True) @@ -48,7 +46,7 @@ class OfficerTerm(Base): favourite_pl_0 = Column(String(32), nullable=True) favourite_pl_1 = Column(String(32), nullable=True) biography = Column(Text, nullable=True) - photo_url = Column(Text, nullable=True) # some urls get big, best to let it be a string + photo_url = Column(Text, nullable=True) # some urls get big, best to let it be a string def serializable_dict(self) -> dict: return { @@ -68,7 +66,6 @@ def serializable_dict(self) -> dict: "photo_url": self.photo_url, } - # a record will only be publically visible if sufficient data has been given def is_filled_in(self): return ( # photo & end_date don't have to be uploaded for the term to be "filled" @@ -83,26 +80,9 @@ def is_filled_in(self): and self.biography is not None ) - @staticmethod - def sql_is_filled_in(query: Select) -> Select: - """Should be identical to self.is_filled_in()""" - return query.where( - and_( - OfficerTerm.computing_id is not None, - OfficerTerm.start_date is not None, - OfficerTerm.nickname is not None, - OfficerTerm.favourite_course_0 is not None, - OfficerTerm.favourite_course_1 is not None, - OfficerTerm.favourite_pl_0 is not None, - OfficerTerm.favourite_pl_1 is not None, - OfficerTerm.biography is not None, - ) - ) - def to_update_dict(self) -> dict: return { - # TODO: do we want computing_id to be changeable? - # "computing_id": self.computing_id, + "computing_id": self.computing_id, "position": self.position, "start_date": self.start_date, @@ -128,12 +108,11 @@ class OfficerInfo(Base): primary_key=True, ) - # TODO: we'll need to use SFU's API to get the legal name for users + # TODO (#71): we'll need to use SFU's API to get the legal name for users legal_name = Column(String(128), nullable=False) # some people have long names, you never know phone_number = Column(String(24), nullable=True) - # a null discord id would mean you don't have discord - # TODO: add unique constraints to these (stops users from stealing the username of someone else) + # TODO (#99): add unique constraints to discord_id (stops users from stealing the username of someone else) discord_id = Column(String(DISCORD_ID_LEN), nullable=True) discord_name = Column(String(DISCORD_NAME_LEN), nullable=True) # this is their nickname in the csss server @@ -141,15 +120,13 @@ class OfficerInfo(Base): # Technically 320 is the most common max-size for emails, but we'll use 256 instead, # since it's reasonably large (input validate this too) - # TODO: add unique constraint to this (stops users from stealing the username of someone else) + # TODO (#99): add unique constraint to this (stops users from stealing the username of someone else) google_drive_email = Column(String(256), nullable=True) - # TODO: add unique constraint to this (stops users from stealing the username of someone else) + # TODO (#99): add unique constraint to this (stops users from stealing the username of someone else) github_username = Column(String(GITHUB_USERNAME_LEN), nullable=True) - # NOTE: not sure if we'll need this, depending on implementation - # TODO: get this data on the fly when requested, but rate limit users - # to something like 1/s 100/hour + # TODO (#22): add support for giving executives bitwarden access automagically # has_signed_into_bitwarden = Column(Boolean) def serializable_dict(self) -> dict: @@ -182,7 +159,7 @@ def is_filled_in(self): def to_update_dict(self) -> dict: return { - # TODO: if the API call to SFU's api to get legal name fails, we want to fail & not insert the entry. + # TODO (#71): if the API call to SFU's api to get legal name fails, we want to fail & not insert the entry. # for now, we should insert a default value "legal_name": "default name" if self.legal_name is None else self.legal_name, diff --git a/src/officers/types.py b/src/officers/types.py index 58a1397..87cf19d 100644 --- a/src/officers/types.py +++ b/src/officers/types.py @@ -1,30 +1,45 @@ from __future__ import annotations from dataclasses import asdict, dataclass -from datetime import date, datetime +from datetime import date from fastapi import HTTPException +import github +import utils from constants import COMPUTING_ID_MAX +from discord import discord from officers.constants import OfficerPosition from officers.tables import OfficerInfo, OfficerTerm +@dataclass +class InitialOfficerInfo: + computing_id: str + position: str + start_date: date + + def valid_or_raise(self): + if len(self.computing_id) > COMPUTING_ID_MAX: + raise HTTPException(status_code=400, detail=f"computing_id={self.computing_id} is too large") + elif self.computing_id == "": + raise HTTPException(status_code=400, detail="computing_id cannot be empty") + elif self.position not in OfficerPosition.position_list(): + raise HTTPException(status_code=400, detail=f"invalid position={self.position}") + @dataclass class OfficerInfoUpload: - # TODO: compute this using SFU's API; if unable, use a default value + # TODO (#71): compute this using SFU's API & remove from being uploaded legal_name: str phone_number: None | str = None discord_name: None | str = None github_username: None | str = None google_drive_email: None | str = None - def validate(self) -> None | HTTPException: - if self.legal_name is not None and self.legal_name == "": - return HTTPException(status_code=400, detail="legal name must not be empty") - # TODO: more checks - else: - return None + # TODO (#71): remove this once legal name is computed using SFU's API. + def valid_or_raise(self): + if self.legal_name is None or self.legal_name == "": + raise HTTPException(status_code=400, detail="legal name must not be empty") def to_officer_info(self, computing_id: str, discord_id: str | None, discord_nickname: str | None) -> OfficerInfo: return OfficerInfo( @@ -40,14 +55,67 @@ def to_officer_info(self, computing_id: str, discord_id: str | None, discord_nic google_drive_email = self.google_drive_email, ) + async def validate(self, computing_id: str, old_officer_info: OfficerInfo) -> tuple[list[str], OfficerInfo]: + """ + Validate that the uploaded officer info is correct; if it's not, revert it to old_officer_info. + """ + validation_failures = [] + corrected_officer_info = self.to_officer_info( + computing_id=computing_id, + discord_id=None, + discord_nickname=None, + ) + + if self.phone_number is None or not utils.is_valid_phone_number(self.phone_number): + validation_failures += [f"invalid phone number {self.phone_number}"] + corrected_officer_info.phone_number = old_officer_info.phone_number + + if self.discord_name is None or self.discord_name == "": + corrected_officer_info.discord_name = None + corrected_officer_info.discord_id = None + corrected_officer_info.discord_nickname = None + else: + discord_user_list = await discord.search_username(self.discord_name) + if len(discord_user_list) != 1: + validation_failures += [ + f"unable to find discord user with the name {self.discord_name}" + if len(discord_user_list) == 0 + else f"too many discord users start with {self.discord_name}" + ] + corrected_officer_info.discord_name = old_officer_info.discord_name + corrected_officer_info.discord_id = old_officer_info.discord_id + corrected_officer_info.discord_nickname = old_officer_info.discord_nickname + else: + discord_user = discord_user_list[0] + corrected_officer_info.discord_name = discord_user.username + corrected_officer_info.discord_id = discord_user.id + corrected_officer_info.discord_nickname = discord_user.global_name + + # TODO (#82): validate google-email using google module, by trying to assign the user to a permission or something + if not utils.is_valid_email(self.google_drive_email): + validation_failures += [f"invalid email format {self.google_drive_email}"] + corrected_officer_info.google_drive_email = old_officer_info.google_drive_email + + # validate that github user is real + if await github.internals.get_user_by_username(self.github_username) is None: + validation_failures += [f"invalid github username {self.github_username}"] + corrected_officer_info.github_username = old_officer_info.github_username + + # TODO (#93): add the following to the daily cronjob + # TODO (#97): if github user exists, invite the github user to the org (or can we simply add them directly?) + # -> do so outside this function. Also, detect if the github username is being changed & uninvite the old user + + return validation_failures, corrected_officer_info + @dataclass class OfficerTermUpload: # only admins can change: + computing_id: str position: str start_date: date end_date: None | date = None - # officer should change + # officer should change: nickname: None | str = None favourite_course_0: None | str = None favourite_course_1: None | str = None @@ -55,25 +123,20 @@ class OfficerTermUpload: favourite_pl_1: None | str = None biography: None | str = None - # TODO: we're going to need an API call to upload images - # NOTE: changing the name of this variable without changing all instances is breaking + # TODO (#39): we're going to need an endpoint for uploading images photo_url: None | str = None - def validate(self): - """input validation""" - # NOTE: An officer can change their own data for terms that are ongoing. + def valid_or_raise(self): if self.position not in OfficerPosition.position_list(): raise HTTPException(status_code=400, detail=f"invalid new position={self.position}") elif self.end_date is not None and self.start_date > self.end_date: raise HTTPException(status_code=400, detail="end_date must be after start_date") - def to_officer_term(self, term_id: str, computing_id:str) -> OfficerTerm: - # TODO: many positions have a length; if the length is defined, fill it in right here - # (end date is 1st of month, 12 months after start date's month). + def to_officer_term(self, term_id: str) -> OfficerTerm: return OfficerTerm( id = term_id, - computing_id = computing_id, + computing_id = self.computing_id, position = self.position, start_date = self.start_date, end_date = self.end_date, @@ -102,8 +165,8 @@ class OfficerData: # an officer may have multiple positions, such as FroshWeekChair & DirectorOfEvents position: str - start_date: datetime - end_date: datetime | None + start_date: date + end_date: date | None legal_name: str # some people have long names, you never know nickname: str | None @@ -117,7 +180,7 @@ class OfficerData: csss_email: str | None biography: str | None - photo_url: str | None # some urls get big... + photo_url: str | None private_data: OfficerPrivateData | None @@ -133,7 +196,7 @@ def serializable_dict(self): def from_data( term: OfficerTerm, officer_info: OfficerInfo, - include_private: bool, + include_private_data: bool, is_active: bool, ) -> OfficerData: return OfficerData( @@ -162,5 +225,5 @@ def from_data( phone_number = officer_info.phone_number, github_username = officer_info.github_username, google_drive_email = officer_info.google_drive_email, - ) if include_private else None, + ) if include_private_data else None, ) diff --git a/src/officers/urls.py b/src/officers/urls.py index 085a86a..4be10bb 100755 --- a/src/officers/urls.py +++ b/src/officers/urls.py @@ -1,21 +1,14 @@ import logging -from dataclasses import dataclass -from datetime import date, datetime -import sqlalchemy from fastapi import APIRouter, Body, HTTPException, Request from fastapi.responses import JSONResponse, PlainTextResponse import auth.crud import database -import github import officers.crud import utils -from constants import COMPUTING_ID_MAX -from discord import discord -from officers.constants import OfficerPosition from officers.tables import OfficerInfo, OfficerTerm -from officers.types import OfficerInfoUpload, OfficerTermUpload +from officers.types import InitialOfficerInfo, OfficerInfoUpload, OfficerTermUpload from permission.types import OfficerPrivateInfo, WebsiteAdmin _logger = logging.getLogger(__name__) @@ -25,7 +18,43 @@ tags=["officers"], ) -# TODO: combine the following two endpoints +# ---------------------------------------- # +# checks + +async def has_officer_private_info_access( + request: Request, + db_session: database.DBSession +) -> tuple[None | str, None | str, bool]: + """determine if the user has access to private officer info""" + session_id = request.cookies.get("session_id", None) + if session_id is None: + return None, None, False + + computing_id = await auth.crud.get_computing_id(db_session, session_id) + if computing_id is None: + return session_id, None, False + + has_private_access = await OfficerPrivateInfo.has_permission(db_session, computing_id) + return session_id, computing_id, has_private_access + +async def logged_in_or_raise( + request: Request, + db_session: database.DBSession +) -> tuple[str, str]: + """gets the user's computing_id, or raises an exception if the current request is not logged in""" + session_id = request.cookies.get("session_id", None) + if session_id is None: + raise HTTPException(status_code=401) + + session_computing_id = await auth.crud.get_computing_id(db_session, session_id) + if session_computing_id is None: + raise HTTPException(status_code=401) + + return session_id, session_computing_id + +# ---------------------------------------- # +# endpoints + @router.get( "/current", description="Get information about all the officers. More information is given if you're authenticated & have access to private executive data.", @@ -35,270 +64,173 @@ async def current_officers( request: Request, db_session: database.DBSession, ): - # determine if the user has access to this private data - session_id = request.cookies.get("session_id", None) - if session_id is None: - has_private_access = False - else: - computing_id = await auth.crud.get_computing_id(db_session, session_id) - has_private_access = await OfficerPrivateInfo.has_permission(db_session, computing_id) - - current_executives = await officers.crud.current_executive_team(db_session, has_private_access) - json_current_executives = { + _, _, has_private_access = await has_officer_private_info_access(request, db_session) + current_officers = await officers.crud.current_officers(db_session, has_private_access) + return JSONResponse({ position: [ - officer_data.serializable_dict() for officer_data in officer_data_list - ] for position, officer_data_list in current_executives.items() - } - - return JSONResponse(json_current_executives) + officer_data.serializable_dict() + for officer_data in officer_data_list + ] + for position, officer_data_list in current_officers.items() + }) @router.get( "/all", - description="Information from all exec terms. If year is not included, all years will be returned. If semester is not included, all semesters that year will be returned. If semester is given, but year is not, return all years and all semesters.", + description="Information for all execs from all exec terms" ) async def all_officers( request: Request, db_session: database.DBSession, - view_only_filled_in: bool = True, + # Officer terms for officers which have not yet started their term yet are considered private, + # and may only be accessed by that officer and executives. All other officer terms are public. + include_future_terms: bool = False, ): - async def has_access(db_session: database.DBSession, request: Request) -> bool: - # determine if user has access to this private data - session_id = request.cookies.get("session_id", None) - if session_id is None: - return False - - computing_id = await auth.crud.get_computing_id(db_session, session_id) - if computing_id is None: - return False - else: - has_private_access = await OfficerPrivateInfo.has_permission(db_session, computing_id) - is_website_admin = await WebsiteAdmin.has_permission(db_session, computing_id) - - if not view_only_filled_in and (session_id is None or not is_website_admin): - raise HTTPException(status_code=401, detail="must have private access to view not filled in terms") - - return has_private_access - - has_private_access = await has_access(db_session, request) - - all_officer_data = await officers.crud.all_officer_data(db_session, has_private_access, view_only_filled_in) - all_officer_data = [officer_data.serializable_dict() for officer_data in all_officer_data] - return JSONResponse(all_officer_data) + _, computing_id, has_private_access = await has_officer_private_info_access(request, db_session) + if include_future_terms: + is_website_admin = (computing_id is not None) and (await WebsiteAdmin.has_permission(db_session, computing_id)) + if not is_website_admin: + raise HTTPException(status_code=401, detail="only website admins can view all executive terms that have not started yet") + + all_officers = await officers.crud.all_officers(db_session, has_private_access, include_future_terms) + return JSONResponse([ + officer_data.serializable_dict() + for officer_data in all_officers + ]) @router.get( "/terms/{computing_id}", - description="Get term info for an executive. Private info will be provided if you have permissions.", + description=""" + Get term info for an executive. All term info is public for all past or active terms. + Future terms can only be accessed by website admins. + """, ) async def get_officer_terms( request: Request, db_session: database.DBSession, computing_id: str, - # the maximum number of terms to return, in chronological order - max_terms: int | None = None, - include_inactive: bool = False, + include_future_terms: bool = False ): - # TODO: put these into a function - session_id = request.cookies.get("session_id", None) - if session_id is None: - raise HTTPException(status_code=401) - - # TODO: put these into a function - session_computing_id = await auth.crud.get_computing_id(db_session, session_id) - if session_computing_id is None: - raise HTTPException(status_code=401) - - if ( - computing_id != session_computing_id - and include_inactive - and not await WebsiteAdmin.has_permission(db_session, session_computing_id) - ): - raise HTTPException(status_code=401) + if include_future_terms: + _, session_computing_id = await logged_in_or_raise(request, db_session) + if computing_id != session_computing_id: + await WebsiteAdmin.has_permission_or_raise(db_session, session_computing_id) # all term info is public, so anyone can get any of it officer_terms = await officers.crud.get_officer_terms( db_session, computing_id, - max_terms, - include_inactive=include_inactive + include_future_terms ) - return JSONResponse([term.serializable_dict() for term in officer_terms]) + return JSONResponse([ + term.serializable_dict() for term in officer_terms + ]) @router.get( "/info/{computing_id}", - description="Get officer info for the current user, if they've ever been an exec.", + description="Get officer info for the current user, if they've ever been an exec. Only admins can get info about another user.", ) async def get_officer_info( request: Request, db_session: database.DBSession, computing_id: str, ): - session_id = request.cookies.get("session_id", None) - if session_id is None: - raise HTTPException(status_code=401) - - session_computing_id = await auth.crud.get_computing_id(db_session, session_id) - if session_computing_id is None: - raise HTTPException(status_code=401) - - session_computing_id = await auth.crud.get_computing_id(db_session, session_id) - if ( - computing_id != session_computing_id - and not await WebsiteAdmin.has_permission(db_session, session_computing_id) - ): - # the current user can only input the info for another user if they have permissions - raise HTTPException(status_code=401, detail="must have website admin permissions to get officer info about another user") - - officer_info = await officers.crud.officer_info(db_session, computing_id) - if officer_info is None: - raise HTTPException(status_code=404, detail="user has no officer info") + _, session_computing_id = await logged_in_or_raise(request, db_session) + if computing_id != session_computing_id: + await WebsiteAdmin.has_permission_or_raise( + db_session, session_computing_id, + errmsg="must have website admin permissions to get officer info about another user" + ) + officer_info = await officers.crud.get_officer_info_or_raise(db_session, computing_id) return JSONResponse(officer_info.serializable_dict()) -@dataclass -class InitialOfficerInfo: - computing_id: str - position: str - start_date: date - @router.post( "/term", - description="Only the sysadmin, president, or DoA can submit this request. It will usually be the DoA. Updates the system with a new officer, and enables the user to login to the system to input their information.", + description=""" + Only the sysadmin, president, or DoA can submit this request. It will usually be the DoA. + Updates the system with a new officer, and enables the user to login to the system to input their information. + """, ) async def new_officer_term( request: Request, db_session: database.DBSession, - officer_info_list: list[InitialOfficerInfo] = Body(), # noqa: B008 + officer_info_list: list[InitialOfficerInfo] = Body(), # noqa: B008 ): """ If the current computing_id is not already an officer, officer_info will be created for them. """ for officer_info in officer_info_list: - if len(officer_info.computing_id) > COMPUTING_ID_MAX: - raise HTTPException(status_code=400, detail=f"computing_id={officer_info.computing_id} is too large") - elif officer_info.position not in OfficerPosition.position_list(): - raise HTTPException(status_code=400, detail=f"invalid position={officer_info.position}") + officer_info.valid_or_raise() - WebsiteAdmin.validate_request(db_session, request) + _, session_computing_id = await logged_in_or_raise(request, db_session) + await WebsiteAdmin.has_permission_or_raise(db_session, session_computing_id) for officer_info in officer_info_list: - await officers.crud.create_new_officer_info( - db_session, - # TODO: do I need this object atm? - OfficerInfoUpload( - # TODO: use sfu api to get legal name - legal_name = "default name", - ).to_officer_info(officer_info.computing_id, None, None), - ) - # TODO: update create_new_officer_term to be the same as create_new_officer_info + await officers.crud.create_new_officer_info(db_session, OfficerInfo( + computing_id = officer_info.computing_id, + # TODO (#71): use sfu api to get legal name from officer_info.computing_id + legal_name = "default name", + phone_number = None, + + discord_id = None, + discord_name = None, + discord_nickname = None, + + google_drive_email = None, + github_username = None, + )) await officers.crud.create_new_officer_term(db_session, OfficerTerm( computing_id = officer_info.computing_id, position = officer_info.position, - # TODO: remove the hours & seconds (etc.) from start_date start_date = officer_info.start_date, )) await db_session.commit() - return PlainTextResponse("ok") + return PlainTextResponse("success") @router.patch( "/info/{computing_id}", - description=( - "After elections, officer computing ids are input into our system. " - "If you have been elected as a new officer, you may authenticate with SFU CAS, " - "then input your information & the valid token for us. Admins may update this info." - ), + description=""" + After elections, officer computing ids are input into our system. + If you have been elected as a new officer, you may authenticate with SFU CAS, + then input your information & the valid token for us. Admins may update this info. + """ ) async def update_info( request: Request, db_session: database.DBSession, computing_id: str, - officer_info_upload: OfficerInfoUpload = Body(), # noqa: B008 + officer_info_upload: OfficerInfoUpload = Body() # noqa: B008 ): - http_exception = officer_info_upload.validate() - if http_exception is not None: - raise http_exception - - session_id = request.cookies.get("session_id", None) - if session_id is None: - raise HTTPException(status_code=401, detail="must be logged in") + officer_info_upload.valid_or_raise() + _, session_computing_id = await logged_in_or_raise(request, db_session) - session_computing_id = await auth.crud.get_computing_id(db_session, session_id) - if ( - computing_id != session_computing_id - and not await WebsiteAdmin.has_permission(db_session, session_computing_id) - ): - # the current user can only input the info for another user if they have permissions - raise HTTPException(status_code=401, detail="must have website admin permissions to update another user") + if computing_id != session_computing_id: + await WebsiteAdmin.has_permission_or_raise( + db_session, session_computing_id, + errmsg="must have website admin permissions to update another user" + ) - # TODO: log all important changes just to a .log file + old_officer_info = await officers.crud.get_officer_info_or_raise(db_session, computing_id) + validation_failures, corrected_officer_info = await officer_info_upload.validate(computing_id, old_officer_info) - old_officer_info = await officers.crud.officer_info(db_session, computing_id) - new_officer_info = officer_info_upload.to_officer_info( - computing_id=computing_id, - discord_id=None, - discord_nickname=None, - ) + # TODO (#27): log all important changes just to a .log file & persist them for a few years - # TODO: turn this into a function - validation_failures = [] - - if not utils.is_valid_phone_number(officer_info_upload.phone_number): - validation_failures += [f"invalid phone number {officer_info_upload.phone_number}"] - new_officer_info.phone_number = old_officer_info.phone_number - - if officer_info_upload.discord_name is None or officer_info_upload.discord_name == "": - new_officer_info.discord_name = None - new_officer_info.discord_id = None - new_officer_info.discord_nickname = None - else: - discord_user_list = await discord.search_username(officer_info_upload.discord_name) - if discord_user_list == []: - validation_failures += [f"unable to find discord user with the name {officer_info_upload.discord_name}"] - new_officer_info.discord_name = old_officer_info.discord_name - new_officer_info.discord_id = old_officer_info.discord_id - new_officer_info.discord_nickname = old_officer_info.discord_nickname - elif len(discord_user_list) > 1: - validation_failures += [f"too many discord users start with {officer_info_upload.discord_name}"] - new_officer_info.discord_name = old_officer_info.discord_name - new_officer_info.discord_id = old_officer_info.discord_id - new_officer_info.discord_nickname = old_officer_info.discord_nickname - else: - discord_user = discord_user_list[0] - new_officer_info.discord_name = discord_user.username - new_officer_info.discord_id = discord_user.id - new_officer_info.discord_nickname = ( - discord_user.global_name - if discord_user.global_name is not None - else discord_user.username - ) - - # TODO: validate google-email using google module, by trying to assign the user to a permission or something - if not utils.is_valid_email(officer_info_upload.google_drive_email): - validation_failures += [f"invalid email format {officer_info_upload.google_drive_email}"] - new_officer_info.google_drive_email = old_officer_info.google_drive_email - - # validate github user is real - if await github.internals.get_user_by_username(officer_info_upload.github_username) is None: - validation_failures += [f"invalid github username {officer_info_upload.github_username}"] - new_officer_info.github_username = old_officer_info.github_username - - # TODO: invite github user - # TODO: detect if changing github username & uninvite old user - - success = await officers.crud.update_officer_info(db_session, new_officer_info) + success = await officers.crud.update_officer_info(db_session, corrected_officer_info) if not success: raise HTTPException(status_code=400, detail="officer_info does not exist yet, please create the officer info entry first") await db_session.commit() - updated_officer_info = await officers.crud.officer_info(db_session, computing_id) + updated_officer_info = await officers.crud.get_officer_info_or_raise(db_session, computing_id) return JSONResponse({ - "updated_officer_info": updated_officer_info.serializable_dict(), + "officer_info": updated_officer_info.serializable_dict(), "validation_failures": validation_failures, }) @router.patch( "/term/{term_id}", + description="" ) async def update_term( request: Request, @@ -306,61 +238,75 @@ async def update_term( term_id: int, officer_term_upload: OfficerTermUpload = Body(), # noqa: B008 ): - officer_term_upload.validate() - - # Refactor all of these gets & raises into small functions - session_id = request.cookies.get("session_id", None) - if session_id is None: - raise HTTPException(status_code=401, detail="must be logged in") - - session_computing_id = await auth.crud.get_computing_id(db_session, session_id) - if session_computing_id is None: - raise HTTPException(status_code=401) - - old_officer_term = await officers.crud.officer_term(db_session, term_id) - - if ( - old_officer_term.computing_id != session_computing_id - and not await WebsiteAdmin.has_permission(db_session, session_computing_id) - ): - # the current user can only input the info for another user if they have permissions - raise HTTPException(status_code=401, detail="must have website admin permissions to update another user") + """ + A website admin may change the position & term length however they wish. + """ + officer_term_upload.valid_or_raise() + _, session_computing_id = await logged_in_or_raise(request, db_session) + + old_officer_term = await officers.crud.get_officer_term_by_id(db_session, term_id) + if old_officer_term.computing_id != session_computing_id: + await WebsiteAdmin.has_permission_or_raise( + db_session, session_computing_id, + errmsg="must have website admin permissions to update another user" + ) + elif utils.is_past_term(old_officer_term): + await WebsiteAdmin.has_permission_or_raise( + db_session, session_computing_id, + errmsg="only website admins can update past terms" + ) if ( - utils.is_past_term(old_officer_term) - and not await WebsiteAdmin.has_permission(db_session, session_computing_id) + officer_term_upload.computing_id != old_officer_term.computing_id + or officer_term_upload.position != old_officer_term.position + or officer_term_upload.start_date != old_officer_term.start_date + or officer_term_upload.end_date != old_officer_term.end_date ): - raise HTTPException(status_code=401, detail="only website admins can update past terms") - - # NOTE: Only admins can write new versions of position, start_date, and end_date. - if ( - officer_term_upload.position != old_officer_term.position - or officer_term_upload.start_date != old_officer_term.start_date.date() - or officer_term_upload.end_date != old_officer_term.end_date.date() - ) and not await WebsiteAdmin.has_permission(db_session, session_computing_id): - raise HTTPException(status_code=401, detail="Non-admins cannot modify position, start_date, or end_date.") + await WebsiteAdmin.has_permission_or_raise( + db_session, session_computing_id, + errmsg="only admins can write new versions of position, start_date, and end_date" + ) - # TODO: log all important changes to a .log file + # TODO (#27): log all important changes to a .log file success = await officers.crud.update_officer_term( db_session, - officer_term_upload.to_officer_term(term_id, old_officer_term.computing_id) + officer_term_upload.to_officer_term(term_id) ) if not success: raise HTTPException(status_code=400, detail="the associated officer_term does not exist yet, please create it first") await db_session.commit() - new_officer_term = await officers.crud.officer_term(db_session, term_id) + new_officer_term = await officers.crud.get_officer_term_by_id(db_session, term_id) return JSONResponse({ - "updated_officer_term": new_officer_term.serializable_dict(), - "validation_failures": [], # none for now, but may be important later + "officer_term": new_officer_term.serializable_dict(), + # none for now, but may be added if frontend requests + "validation_failures": [], }) -""" -@router.post( - "/remove", - description="Only the sysadmin, president, or DoA can submit this request. It will usually be the DoA. Removes the officer from the system entirely. BE CAREFUL WITH THIS OPTION aaaaaaaaaaaaaaaaaa.", +@router.delete( + "/term/{term_id}", + description="Remove the specified officer term. Only website admins can run this endpoint. BE CAREFUL WITH THIS!", ) -async def remove_officer(): - return {} -""" +async def remove_officer( + request: Request, + db_session: database.DBSession, + term_id: int, +): + _, session_computing_id = await logged_in_or_raise(request, db_session) + await WebsiteAdmin.has_permission_or_raise( + db_session, session_computing_id, + errmsg="must have website admin permissions to remove a term" + ) + + deleted_officer_term = await officers.crud.get_officer_term_by_id(db_session, term_id) + + # TODO (#27): log all important changes to a .log file + + # TODO (#100): return whether the deletion succeeded or not + await officers.crud.delete_officer_term_by_id(db_session, term_id) + await db_session.commit() + + return JSONResponse({ + "officer_term": deleted_officer_term.serializable_dict(), + }) diff --git a/src/permission/types.py b/src/permission/types.py index 859cd06..659ed32 100644 --- a/src/permission/types.py +++ b/src/permission/types.py @@ -1,12 +1,12 @@ -from datetime import UTC, datetime, timezone +from datetime import date from typing import ClassVar -from fastapi import HTTPException, Request +from fastapi import HTTPException -import auth.crud import database import officers.crud -from data.semesters import current_semester_start, step_semesters +import utils +from data.semesters import step_semesters from officers.constants import OfficerPosition @@ -18,19 +18,16 @@ async def has_permission(db_session: database.DBSession, computing_id: str) -> b A semester is defined in semester_start """ - term = await officers.crud.most_recent_officer_term(db_session, computing_id) - if term is None: - return False - elif term.end_date is None: - # considered an active exec if no end_date - return True + term_list = await officers.crud.get_officer_terms(db_session, computing_id, include_future_terms=False) + for term in term_list: + if utils.is_active_term(term): + return True - current_date = datetime.now(UTC) - semester_start = current_semester_start(current_date) - NUM_SEMESTERS = 5 - cutoff_date = step_semesters(semester_start, -NUM_SEMESTERS) + NUM_SEMESTERS = 5 + if date.today() <= step_semesters(term.end_date, NUM_SEMESTERS): + return True - return term.end_date > cutoff_date + return False class WebsiteAdmin: WEBSITE_ADMIN_POSITIONS: ClassVar[list[OfficerPosition]] = [ @@ -52,15 +49,10 @@ async def has_permission(db_session: database.DBSession, computing_id: str) -> b return False @staticmethod - async def validate_request(db_session: database.DBSession, request: Request) -> bool: - """ - Checks if the provided request satisfies these permissions, and raises the neccessary - exceptions if not - """ - session_id = request.cookies.get("session_id", None) - if session_id is None: - raise HTTPException(status_code=401, detail="must be logged in") - else: - computing_id = await auth.crud.get_computing_id(db_session, session_id) - if not await WebsiteAdmin.has_permission(db_session, computing_id): - raise HTTPException(status_code=401, detail="must have website admin permissions") + async def has_permission_or_raise( + db_session: database.DBSession, + computing_id: str, + errmsg:str = "must have website admin permissions" + ) -> bool: + if not await WebsiteAdmin.has_permission(db_session, computing_id): + raise HTTPException(status_code=401, detail=errmsg) diff --git a/src/tests/urls.py b/src/tests/urls.py deleted file mode 100644 index b4ea3d8..0000000 --- a/src/tests/urls.py +++ /dev/null @@ -1,20 +0,0 @@ -from pathlib import Path - -from fastapi import APIRouter -from fastapi.responses import HTMLResponse - -# ----------------------- # - -router = APIRouter(prefix="/tests") - - -@router.get("/{test_name}", description="For use in testing the backend api.") -async def get_test(test_name: str): - test_path = Path(f"../tests/{test_name}") - test_dir_path = Path("../tests/") - - if (test_dir_path in test_path.parents) and test_path.is_file(): - with Path.open(test_path) as file: - return HTMLResponse(file.read()) - else: - return HTMLResponse(f"invalid test_name {test_name}") diff --git a/src/utils.py b/src/utils.py index abfe4a5..acf5ad0 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,12 +1,12 @@ import re -from datetime import datetime +from datetime import date, datetime from sqlalchemy import Select # we can't use and/or in sql expressions, so we must use these functions from sqlalchemy.sql.expression import and_, or_ -from officers.tables import OfficerInfo, OfficerTerm +from officers.tables import OfficerTerm def is_iso_format(date_str: str) -> bool: @@ -17,29 +17,37 @@ def is_iso_format(date_str: str) -> bool: return False def is_active_officer(query: Select) -> Select: - query = query.where( + """ + An active officer is one who is currently part of the CSSS officer team. + That is, they are not upcoming, or in the past. + """ + return query.where( and_( # cannot be an officer who has not started yet - OfficerTerm.start_date <= datetime.today(), + OfficerTerm.start_date <= date.today(), or_( # executives without a specified end_date are considered active OfficerTerm.end_date.is_(None), # check that today's timestamp is before (smaller than) the term's end date - datetime.today() <= OfficerTerm.end_date, + date.today() <= OfficerTerm.end_date, ) ) ) - return OfficerTerm.sql_is_filled_in(query) + +def has_started_term(query: Select) -> bool: + return query.where( + OfficerTerm.start_date <= date.today() + ) def is_active_term(term: OfficerTerm) -> bool: return ( # cannot be an officer who has not started yet - term.start_date <= datetime.today() + term.start_date <= date.today() and ( # executives without a specified end_date are considered active term.end_date is None # check that today's timestamp is before (smaller than) the term's end date - or datetime.today() <= term.end_date + or date.today() <= term.end_date ) ) @@ -49,7 +57,7 @@ def is_past_term(term: OfficerTerm) -> bool: # an officer with no end date is current term.end_date is not None # if today is past the end date, it's a past term - and datetime.today() > term.end_date + and date.today() > term.end_date ) def is_valid_phone_number(phone_number: str) -> bool: diff --git a/tests/integration/test_officers.py b/tests/integration/test_officers.py index daa7878..fffb365 100644 --- a/tests/integration/test_officers.py +++ b/tests/integration/test_officers.py @@ -1,15 +1,29 @@ -import asyncio +import asyncio # NOTE: don't comment this out; it's required import pytest +from httpx import ASGITransport, AsyncClient import load_test_db +from auth.crud import create_user_session from database import SQLALCHEMY_TEST_DATABASE_URL, DatabaseSessionManager +from main import app from officers.constants import OfficerPosition -from officers.crud import all_officer_data, current_executive_team, most_recent_officer_term +from officers.crud import all_officers, current_officers, get_active_officer_terms # TODO: setup a database on the CI machine & run this as a unit test then (since # this isn't really an integration test) +@pytest.fixture(scope="session") +def anyio_backend(): + return "asyncio" + +@pytest.fixture(scope="session") +async def client(): + # base_url is just a random placeholder url + # ASGITransport is just telling the async client to pass all requests to app + async with AsyncClient(transport=ASGITransport(app), base_url="http://test") as client: + yield client + # run this again for every function @pytest.fixture(scope="function") async def database_setup(): @@ -17,49 +31,139 @@ async def database_setup(): print("Resetting DB...") sessionmanager = DatabaseSessionManager(SQLALCHEMY_TEST_DATABASE_URL, {"echo": False}, check_db=False) await DatabaseSessionManager.test_connection(SQLALCHEMY_TEST_DATABASE_URL) + # this resets the contents of the database to be whatever is from `load_test_db.py` await load_test_db.async_main(sessionmanager) print("Done setting up!") return sessionmanager +# TODO: switch to mark.anyio @pytest.mark.asyncio async def test__read_execs(database_setup): sessionmanager = await database_setup async with sessionmanager.session() as db_session: # test that reads from the database succeeded as expected - assert (await most_recent_officer_term(db_session, "blarg")) is None - assert await most_recent_officer_term(db_session, "abc22") is None - abc11_officer_term = await most_recent_officer_term(db_session, "abc11") - - assert abc11_officer_term.computing_id == "abc11" - assert abc11_officer_term.position == OfficerPosition.EXECUTIVE_AT_LARGE - assert abc11_officer_term.start_date is not None - assert abc11_officer_term.end_date is None - assert abc11_officer_term.nickname == "the holy A" - assert abc11_officer_term.favourite_course_0 == "CMPT 361" - assert abc11_officer_term.biography == "Hi! I'm person A and I want school to be over ; _ ;" - - current_exec_team = await current_executive_team(db_session, include_private=False) + assert (await get_active_officer_terms(db_session, "blarg")) == [] + assert (await get_active_officer_terms(db_session, "abc22")) != [] + + abc11_officer_terms = await get_active_officer_terms(db_session, "abc11") + assert len(abc11_officer_terms) == 1 + assert abc11_officer_terms[0].computing_id == "abc11" + assert abc11_officer_terms[0].position == OfficerPosition.EXECUTIVE_AT_LARGE + assert abc11_officer_terms[0].start_date is not None + assert abc11_officer_terms[0].nickname == "the holy A" + assert abc11_officer_terms[0].favourite_course_0 == "CMPT 361" + assert abc11_officer_terms[0].biography == "Hi! I'm person A and I want school to be over ; _ ;" + + current_exec_team = await current_officers(db_session, include_private=False) assert current_exec_team is not None - assert len(current_exec_team.keys()) == 1 - assert next(iter(current_exec_team.keys())) == OfficerPosition.PRESIDENT - assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 999" - assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.President.to_email() + assert len(current_exec_team.keys()) == 4 + assert next(iter(current_exec_team.keys())) == OfficerPosition.EXECUTIVE_AT_LARGE + assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 361" + assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.to_email(OfficerPosition.EXECUTIVE_AT_LARGE) assert next(iter(current_exec_team.values()))[0].private_data is None - current_exec_team = await current_executive_team(db_session, include_private=True) + current_exec_team = await current_officers(db_session, include_private=True) assert current_exec_team is not None - assert len(current_exec_team) == 1 - assert next(iter(current_exec_team.keys())) == OfficerPosition.PRESIDENT - assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 999" - assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.President.to_email() + assert len(current_exec_team) == 4 + assert next(iter(current_exec_team.keys())) == OfficerPosition.EXECUTIVE_AT_LARGE + assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 361" + assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.to_email(OfficerPosition.EXECUTIVE_AT_LARGE) assert next(iter(current_exec_team.values()))[0].private_data is not None - assert next(iter(current_exec_team.values()))[0].private_data.computing_id == "abc33" + assert next(iter(current_exec_team.values()))[0].private_data.computing_id == "abc11" - all_terms = await all_officer_data(db_session, include_private=True) - assert len(all_terms) == 3 + all_terms = await all_officers(db_session, include_private_data=True, include_future_terms=False) + assert len(all_terms) == 6 + all_terms = await all_officers(db_session, include_private_data=True, include_future_terms=True) + assert len(all_terms) == 7 #async def test__update_execs(database_setup): # # TODO: the second time an update_officer_info call occurs, the user should be updated with info # pass + +@pytest.mark.anyio +async def test__endpoints(client, database_setup): + # `database_setup` resets & loads the test database + + response = await client.get("/officers/current") + assert response.status_code == 200 + assert response.json() != {} + assert len(response.json().values()) == 4 + assert not response.json()["executive at large"][0]["private_data"] + + response = await client.get("/officers/all?include_future_terms=false") + assert response.status_code == 200 + assert response.json() != [] + assert len(response.json()) == 6 + assert response.json()[0]["private_data"] is None + + response = await client.get("/officers/all?include_future_terms=true") + assert response.status_code == 401 + + response = await client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=false") + assert response.status_code == 200 + assert response.json() != [] + assert len(response.json()) == 2 + assert response.json()[0]["nickname"] == "G2" + assert response.json()[1]["nickname"] == "G1" + + response = await client.get("/officers/terms/balargho?include_future_terms=false") + assert response.status_code == 200 + assert response.json() == [] + + response = await client.get("/officers/terms/abc11?include_future_terms=true") + assert response.status_code == 401 + + response = await client.get("/officers/info/abc11") + assert response.status_code == 401 + response = await client.get(f"/officers/info/{load_test_db.SYSADMIN_COMPUTING_ID}") + assert response.status_code == 401 + + # TODO: add tests for the POST & PATCH commands + # TODO: ensure that the database is being reset every time! + +@pytest.mark.anyio +async def test__endpoints_admin(client, database_setup): + # login as website admin + session_id = "temp_id_" + load_test_db.SYSADMIN_COMPUTING_ID + async with database_setup.session() as db_session: + await create_user_session(db_session, session_id, load_test_db.SYSADMIN_COMPUTING_ID) + + client.cookies = { "session_id": session_id } + + # test that more info is given if logged in & with access to it + response = await client.get("/officers/current") + assert response.status_code == 200 + assert response.json() != {} + assert len(response.json().values()) == 4 + assert response.json()["executive at large"][0]["private_data"] + + response = await client.get("/officers/all?include_future_terms=true") + assert response.status_code == 200 + assert response.json() != [] + print(len(response.json())) + assert len(response.json()) == 7 + assert response.json()[1]["private_data"]["phone_number"] == "1234567890" + + response = await client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=false") + assert response.status_code == 200 + assert response.json() != [] + assert len(response.json()) == 2 + + response = await client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=true") + assert response.status_code == 200 + assert response.json() != [] + assert len(response.json()) == 3 + + response = await client.get("/officers/info/abc11") + assert response.status_code == 200 + assert response.json() != {} + assert response.json()["legal_name"] == "Person A" + response = await client.get(f"/officers/info/{load_test_db.SYSADMIN_COMPUTING_ID}") + assert response.status_code == 200 + assert response.json() != {} + response = await client.get("/officers/info/balargho") + assert response.status_code == 404 + + # TODO: ensure that all endpoints are tested at least once