Skip to content

Commit e316625

Browse files
committed
[DOP-25470] Enable authentication using personal tokens
1 parent f647a5b commit e316625

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1422
-383
lines changed

data_rentgen/consumer/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ async def security_lifespan(context: ContextRepo):
7474
await settings.kafka.security.destroy()
7575
tg.cancel_scope.cancel()
7676
except ExceptionGroup as e:
77-
for exception in e.exceptions:
77+
for exception in e.exceptions: # type: ignore[attr-defined]
7878
raise exception from None
7979

8080
return FastStream(

data_rentgen/consumer/settings/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ class ConsumerApplicationSettings(BaseSettings):
3939
DATA_RENTGEN__LOGGING__PRESET=json
4040
""" # noqa: E501
4141

42-
database: DatabaseSettings = Field(description=":ref:`Database settings <configuration-database>`")
42+
database: DatabaseSettings = Field(
43+
default_factory=DatabaseSettings, # type: ignore[arg-type]
44+
description=":ref:`Database settings <configuration-database>`",
45+
)
4346
logging: LoggingSettings = Field(
4447
default_factory=LoggingSettings,
4548
description=":ref:`Logging settings <configuration-consumer-logging>`",

data_rentgen/http2kafka/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99
from faststream._compat import ExceptionGroup
1010
from faststream.kafka import KafkaBroker
1111
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
12+
from sqlalchemy.ext.asyncio import AsyncSession
1213

1314
import data_rentgen
15+
from data_rentgen.db.factory import session_generator
1416
from data_rentgen.http2kafka.router import router as openlineage_router
1517
from data_rentgen.http2kafka.settings import Http2KafkaApplicationSettings
1618
from data_rentgen.logging.setup_logging import setup_logging
1719
from data_rentgen.server.api.handlers import apply_exception_handlers
1820
from data_rentgen.server.middlewares import apply_middlewares
21+
from data_rentgen.server.providers.auth.personal_token_provider import PersonalTokenAuthProvider
1922

2023
logger = logging.getLogger(__name__)
2124

@@ -34,7 +37,7 @@ async def lifespan(app: FastAPI):
3437
await settings.kafka.security.destroy()
3538
tg.cancel_scope.cancel()
3639
except ExceptionGroup as e:
37-
for exception in e.exceptions:
40+
for exception in e.exceptions: # type: ignore[attr-defined]
3841
raise exception from None
3942

4043

@@ -67,6 +70,8 @@ def application_factory(settings: Http2KafkaApplicationSettings) -> FastAPI:
6770
application.state.settings = settings
6871
application.include_router(openlineage_router)
6972

73+
PersonalTokenAuthProvider.setup(application)
74+
7075
# Reusing Server source code
7176
apply_exception_handlers(application)
7277
apply_middlewares(application, settings.server)
@@ -86,6 +91,7 @@ async def get_publisher():
8691
{
8792
Http2KafkaApplicationSettings: get_settings,
8893
AsyncAPIDefaultPublisher: get_publisher,
94+
AsyncSession: session_generator(settings.database), # type: ignore[dict-item]
8995
},
9096
)
9197
return application

data_rentgen/http2kafka/router/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,22 @@
88
from fastapi import APIRouter, Body, Depends, Request, Response
99
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
1010

11+
from data_rentgen.db.models.user import User
1112
from data_rentgen.dependencies.stub import Stub
1213
from data_rentgen.http2kafka.router.gzip_route import SupportsGzipRoute
1314
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
1415
from data_rentgen.openlineage.run_facets import OpenLineageParentRunFacet
1516
from data_rentgen.server.errors import get_error_responses
1617
from data_rentgen.server.errors.schemas import InvalidRequestSchema
18+
from data_rentgen.server.errors.schemas.not_authorized import NotAuthorizedSchema
19+
from data_rentgen.server.services.get_user import PersonalTokenPolicy, get_user
1720

1821
logger = logging.getLogger(__name__)
1922

2023
router = APIRouter(
2124
prefix="/v1/openlineage",
2225
tags=["OpenLineage"],
23-
responses=get_error_responses(include={InvalidRequestSchema}),
26+
responses=get_error_responses(include={InvalidRequestSchema, NotAuthorizedSchema}),
2427
route_class=SupportsGzipRoute,
2528
)
2629

@@ -35,6 +38,7 @@ async def send_events_to_kafka(
3538
event: Annotated[OpenLineageRunEvent, Body()],
3639
request: Request,
3740
kafka_publisher: Annotated[AsyncAPIDefaultPublisher, Depends(Stub(AsyncAPIDefaultPublisher))],
41+
current_user: Annotated[User, Depends(get_user(personal_token_policy=PersonalTokenPolicy.REQUIRE))],
3842
):
3943
body_json_bytes = await request.body()
4044
logger.debug("Got 1 message (%dKiB)", len(body_json_bytes) / 1024)
@@ -45,6 +49,7 @@ async def send_events_to_kafka(
4549
correlation_id=correlation_id.get(),
4650
headers={
4751
"content-type": "application/json",
52+
"reported-by-user-name": current_user.name,
4853
},
4954
# add event to message accumulator. messages are send in a background task.
5055
# do not wait until the message is actually send

data_rentgen/http2kafka/settings.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
from data_rentgen.consumer.settings.kafka import KafkaSettings
88
from data_rentgen.consumer.settings.producer import ProducerSettings
9+
from data_rentgen.db.settings import DatabaseSettings
910
from data_rentgen.logging.settings import LoggingSettings
11+
from data_rentgen.server.settings.auth import AuthSettings
1012
from data_rentgen.server.settings.server import ServerSettings
1113

1214

@@ -41,6 +43,14 @@ class Http2KafkaApplicationSettings(BaseSettings):
4143
DATA_RENTGEN__PRODUCER__MAIN_TOPIC="input.runs"
4244
""" # noqa: E501
4345

46+
auth: AuthSettings = Field(
47+
default_factory=AuthSettings,
48+
description=":ref:`Authentication settings <configuration-server-authentication>`",
49+
)
50+
database: DatabaseSettings = Field(
51+
default_factory=DatabaseSettings, # type: ignore[arg-type]
52+
description=":ref:`Database settings <configuration-database>`",
53+
)
4454
logging: LoggingSettings = Field(
4555
default_factory=LoggingSettings,
4656
description=":ref:`Logging settings <configuration-server-logging>`",

data_rentgen/server/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from data_rentgen.server.api.handlers import apply_exception_handlers
1313
from data_rentgen.server.api.router import api_router
1414
from data_rentgen.server.middlewares import apply_middlewares
15+
from data_rentgen.server.providers.auth.personal_token_provider import PersonalTokenAuthProvider
1516
from data_rentgen.server.settings import ServerApplicationSettings
1617

1718
if TYPE_CHECKING:
@@ -36,11 +37,16 @@ def application_factory(settings: ServerApplicationSettings) -> FastAPI:
3637
apply_exception_handlers(application)
3738
auth_class: type[AuthProvider] = settings.auth.provider # type: ignore[assignment]
3839
auth_class.setup(application)
40+
41+
PersonalTokenAuthProvider.setup(application)
3942
apply_middlewares(application, settings.server)
4043

44+
async def get_settings():
45+
return settings
46+
4147
application.dependency_overrides.update(
4248
{
43-
ServerApplicationSettings: lambda: settings,
49+
ServerApplicationSettings: get_settings,
4450
AsyncSession: session_generator(settings.database), # type: ignore[dict-item]
4551
},
4652
)

data_rentgen/server/api/handlers.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,14 @@ def application_exception_handler(request: Request, exc: ApplicationError) -> Re
9595

9696

9797
def not_authorized_redirect_exception_handler(request: Request, exc: RedirectError) -> Response:
98-
logger.debug("Redirect user to keycloak")
9998
response = get_response_for_exception(RedirectError)
10099
if not response:
101100
return unknown_exception_handler(request, exc)
101+
102102
content = response.schema( # type: ignore[call-arg]
103103
message=exc.message,
104104
details=exc.details,
105105
)
106-
107106
return exception_json_response(
108107
status=response.status,
109108
content=content,

data_rentgen/server/api/v1/router/auth.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
KeycloakAuthProvider,
2323
)
2424
from data_rentgen.server.schemas.v1.auth import AuthTokenSchema
25-
from data_rentgen.server.services import get_user
25+
from data_rentgen.server.services import PersonalTokenPolicy, get_user
2626

2727
router = APIRouter(
2828
prefix="/auth",
@@ -35,8 +35,10 @@
3535
summary="Get auth token",
3636
responses=get_error_responses(
3737
include={
38-
InvalidRequestSchema,
38+
NotAuthorizedSchema,
3939
NotAuthorizedRedirectSchema,
40+
InvalidRequestSchema,
41+
NotImplementedErrorSchema,
4042
},
4143
),
4244
)
@@ -56,6 +58,7 @@ async def token(
5658
summary="Handle redirect callback from OAuth2 provider",
5759
responses=get_error_responses(
5860
include={
61+
NotAuthorizedSchema,
5962
InvalidRequestSchema,
6063
NotImplementedErrorSchema,
6164
},
@@ -90,7 +93,7 @@ async def auth_callback(
9093
)
9194
async def logout(
9295
request: Request,
93-
current_user: Annotated[User, Depends(get_user())],
96+
current_user: Annotated[User, Depends(get_user(personal_token_policy=PersonalTokenPolicy.DENY))],
9497
auth_provider: Annotated[KeycloakAuthProvider, Depends(Stub(AuthProvider))],
9598
):
9699
refresh_token = request.session.get("refresh_token", None)

data_rentgen/server/api/v1/router/personal_token.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from data_rentgen.db.models import User # noqa: TC001
1515
from data_rentgen.server.errors import get_error_responses
16+
from data_rentgen.server.providers.auth.personal_token_provider import PersonalTokenAuthProvider # noqa: TC001
1617
from data_rentgen.server.schemas.v1 import (
1718
PageResponseV1,
1819
PersonalTokenCreatedDetailedResponseV1,
@@ -23,7 +24,7 @@
2324
PersonalTokenResponseV1,
2425
PersonalTokenScopeV1,
2526
)
26-
from data_rentgen.server.services import PersonalTokenService, get_user
27+
from data_rentgen.server.services import PersonalTokenPolicy, PersonalTokenService, get_user
2728

2829
router = APIRouter(
2930
prefix="/personal-tokens",
@@ -50,8 +51,9 @@ async def get_personal_tokens(
5051
@router.post("")
5152
async def create_personal_token(
5253
token_params: PersonalTokenCreateRequestV1,
53-
current_user: Annotated[User, Depends(get_user())],
54+
current_user: Annotated[User, Depends(get_user(personal_token_policy=PersonalTokenPolicy.DENY))],
5455
user_token_service: Annotated[PersonalTokenService, Depends()],
56+
personal_token_auth_provider: Annotated[PersonalTokenAuthProvider, Depends()],
5557
) -> PersonalTokenCreatedDetailedResponseV1:
5658
async with user_token_service:
5759
token = await user_token_service.create(
@@ -72,16 +74,17 @@ async def create_personal_token(
7274
since=token.since,
7375
until=token.until,
7476
),
75-
content="TODO",
77+
content=personal_token_auth_provider.generate_jwt(user=current_user, token=token),
7678
)
7779

7880

7981
@router.patch("/{token_id}")
8082
async def reset_personal_token(
8183
token_id: UUID,
8284
new_token_params: PersonalTokenResetRequestV1,
83-
current_user: Annotated[User, Depends(get_user())],
85+
current_user: Annotated[User, Depends(get_user(personal_token_policy=PersonalTokenPolicy.DENY))],
8486
user_token_service: Annotated[PersonalTokenService, Depends()],
87+
personal_token_auth_provider: Annotated[PersonalTokenAuthProvider, Depends()],
8588
) -> PersonalTokenCreatedDetailedResponseV1:
8689
async with user_token_service:
8790
old_token = await user_token_service.revoke(current_user, token_id)
@@ -102,14 +105,14 @@ async def reset_personal_token(
102105
since=new_token.since,
103106
until=new_token.until,
104107
),
105-
content="TODO",
108+
content=personal_token_auth_provider.generate_jwt(user=current_user, token=new_token),
106109
)
107110

108111

109112
@router.delete("/{token_id}", status_code=HTTPStatus.NO_CONTENT)
110113
async def revoke_personal_token(
111114
token_id: UUID,
112-
current_user: Annotated[User, Depends(get_user())],
115+
current_user: Annotated[User, Depends(get_user(personal_token_policy=PersonalTokenPolicy.DENY))],
113116
user_token_service: Annotated[PersonalTokenService, Depends()],
114117
):
115118
async with user_token_service:

data_rentgen/server/errors/registration.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,18 @@ class APIErrorResponse(NamedTuple):
2020
_responses_by_status_code: dict[int, APIErrorResponse] = {}
2121

2222

23-
def register_error_response(exception: type[Exception], status: http.HTTPStatus):
23+
def register_error_response(
24+
exception: type[Exception],
25+
status: http.HTTPStatus,
26+
):
2427
"""Register mapping between exception, status code and JSON body schema."""
2528

2629
def wrapper(cls):
27-
response = APIErrorResponse(status.value, status.phrase, cls)
30+
response = APIErrorResponse(
31+
status=status.value,
32+
description=status.phrase,
33+
schema=cls,
34+
)
2835
_responses_by_exception[exception] = response
2936
_responses_by_status_code[status.value] = response
3037
return cls

0 commit comments

Comments
 (0)