@@ -77,6 +77,39 @@ def login(self, username: str, password: str) -> Dict[str, Any]:
7777 # Login to DataSpace backend
7878 return self ._login_with_keycloak_token (keycloak_token )
7979
80+ def login_as_service_account (self ) -> Dict [str , Any ]:
81+ """
82+ Login using client credentials (service account).
83+
84+ This method authenticates the client itself (not a user) using
85+ the client_id and client_secret. Requires the Keycloak client
86+ to have "Service Accounts Enabled".
87+
88+ Returns:
89+ Dictionary containing user info and tokens
90+
91+ Raises:
92+ DataSpaceAuthError: If authentication fails
93+ """
94+ if not all (
95+ [
96+ self .keycloak_url ,
97+ self .keycloak_realm ,
98+ self .keycloak_client_id ,
99+ self .keycloak_client_secret ,
100+ ]
101+ ):
102+ raise DataSpaceAuthError (
103+ "Service account authentication requires keycloak_url, "
104+ "keycloak_realm, keycloak_client_id, and keycloak_client_secret."
105+ )
106+
107+ # Get Keycloak token using client credentials
108+ keycloak_token = self ._get_service_account_token ()
109+
110+ # Login to DataSpace backend
111+ return self ._login_with_keycloak_token (keycloak_token )
112+
80113 def _get_keycloak_token (self , username : str , password : str ) -> str :
81114 """
82115 Get Keycloak access token using username and password.
@@ -140,6 +173,64 @@ def _get_keycloak_token(self, username: str, password: str) -> str:
140173 except requests .RequestException as e :
141174 raise DataSpaceAuthError (f"Network error during Keycloak authentication: { str (e )} " )
142175
176+ def _get_service_account_token (self ) -> str :
177+ """
178+ Get Keycloak access token using client credentials (service account).
179+
180+ Returns:
181+ Keycloak access token
182+
183+ Raises:
184+ DataSpaceAuthError: If authentication fails
185+ """
186+ token_url = (
187+ f"{ self .keycloak_url } /auth/realms/{ self .keycloak_realm } /"
188+ f"protocol/openid-connect/token"
189+ )
190+
191+ data = {
192+ "grant_type" : "client_credentials" ,
193+ "client_id" : self .keycloak_client_id ,
194+ "client_secret" : self .keycloak_client_secret ,
195+ }
196+
197+ try :
198+ response = requests .post (
199+ token_url ,
200+ data = data ,
201+ headers = {"Content-Type" : "application/x-www-form-urlencoded" },
202+ )
203+
204+ if response .status_code == 200 :
205+ token_data = response .json ()
206+ self .keycloak_access_token = token_data .get ("access_token" )
207+ self .keycloak_refresh_token = token_data .get ("refresh_token" )
208+
209+ # Calculate token expiration time
210+ expires_in = token_data .get ("expires_in" , 300 )
211+ self .token_expires_at = time .time () + expires_in
212+
213+ if not self .keycloak_access_token :
214+ raise DataSpaceAuthError ("No access token in Keycloak response" )
215+
216+ return self .keycloak_access_token
217+ else :
218+ error_data = response .json ()
219+ error_msg = error_data .get (
220+ "error_description" ,
221+ error_data .get ("error" , "Service account authentication failed" ),
222+ )
223+ raise DataSpaceAuthError (
224+ f"Service account login failed: { error_msg } . "
225+ f"Ensure 'Service Accounts Enabled' is ON in Keycloak client settings." ,
226+ status_code = response .status_code ,
227+ response = error_data ,
228+ )
229+ except requests .RequestException as e :
230+ raise DataSpaceAuthError (
231+ f"Network error during service account authentication: { str (e )} "
232+ )
233+
143234 def _refresh_keycloak_token (self ) -> str :
144235 """
145236 Refresh Keycloak access token using refresh token.
0 commit comments