Skip to content

Commit 0ea86d4

Browse files
authored
Merge pull request #55 from SentriusLLC/proxy_Chat
proxy Chat
2 parents e5080e6 + 4d32ac1 commit 0ea86d4

File tree

23 files changed

+377
-37
lines changed

23 files changed

+377
-37
lines changed

.local.env

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
SENTRIUS_VERSION=1.1.81
2-
SENTRIUS_SSH_VERSION=1.1.17
3-
SENTRIUS_KEYCLOAK_VERSION=1.1.23
4-
SENTRIUS_AGENT_VERSION=1.1.17
5-
SENTRIUS_AI_AGENT_VERSION=1.1.32
6-
LLMPROXY_VERSION=1.0.17
7-
LAUNCHER_VERSION=1.0.22
1+
SENTRIUS_VERSION=1.1.95
2+
SENTRIUS_SSH_VERSION=1.1.18
3+
SENTRIUS_KEYCLOAK_VERSION=1.1.25
4+
SENTRIUS_AGENT_VERSION=1.1.18
5+
SENTRIUS_AI_AGENT_VERSION=1.1.33
6+
LLMPROXY_VERSION=1.0.18
7+
LAUNCHER_VERSION=1.0.29

.local.env.bak

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
SENTRIUS_VERSION=1.1.81
2-
SENTRIUS_SSH_VERSION=1.1.17
3-
SENTRIUS_KEYCLOAK_VERSION=1.1.22
4-
SENTRIUS_AGENT_VERSION=1.1.17
5-
SENTRIUS_AI_AGENT_VERSION=1.1.32
6-
LLMPROXY_VERSION=1.0.17
7-
LAUNCHER_VERSION=1.0.22
1+
SENTRIUS_VERSION=1.1.95
2+
SENTRIUS_SSH_VERSION=1.1.18
3+
SENTRIUS_KEYCLOAK_VERSION=1.1.25
4+
SENTRIUS_AGENT_VERSION=1.1.18
5+
SENTRIUS_AI_AGENT_VERSION=1.1.33
6+
LLMPROXY_VERSION=1.0.18
7+
LAUNCHER_VERSION=1.0.28

agent-launcher/src/main/java/io/sentrius/agent/launcher/service/PodLauncherService.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,19 @@ public class PodLauncherService {
2828
@Value("${sentrius.agent.registry.version}")
2929
private String agentVersion;
3030

31+
@Value("${sentrius.agent.callback.format.url:http://sentrius-agent-%s.%s.svc.cluster.local:8090}")
32+
private String callbackFormatUrl;
33+
3134
public PodLauncherService() throws IOException {
3235
ApiClient client = Config.defaultClient(); // in-cluster or kubeconfig
3336
this.coreV1Api = new CoreV1Api(client);
3437
}
3538

39+
private String buildAgentCallbackUrl(String agentId) {
40+
return String.format(callbackFormatUrl, agentId, agentNamespace);
41+
}
42+
43+
3644
public V1Pod launchAgentPod(String agentId, String callbackUrl) throws Exception {
3745
if (agentRegistry != null ) {
3846
if ("local".equalsIgnoreCase(agentRegistry)) {
@@ -42,6 +50,8 @@ public V1Pod launchAgentPod(String agentId, String callbackUrl) throws Exception
4250
}
4351
}
4452

53+
var constructedCallbackUrl = buildAgentCallbackUrl(agentId);
54+
4555
String image = String.format("%ssentrius-launchable-agent:%s", agentRegistry, agentVersion);
4656

4757
log.info("Launching agent pod with ID: {}, Image: {}, Callback URL: {}", agentId, image, callbackUrl);
@@ -56,7 +66,9 @@ public V1Pod launchAgentPod(String agentId, String callbackUrl) throws Exception
5666
.imagePullPolicy("IfNotPresent")
5767

5868
.args(List.of("--spring.config.location=file:/config/agent.properties",
59-
"--agent.namePrefix=" + agentId, "--agent.ai.config=/config/chat-helper.yaml", "--agent.listen.websocket=true"))
69+
"--agent.namePrefix=" + agentId, "--agent.ai.config=/config/chat-helper.yaml", "--agent.listen.websocket=true",
70+
"--agent.callback.url=" + constructedCallbackUrl
71+
))
6072
.resources(new V1ResourceRequirements()
6173
.limits(Map.of(
6274
"cpu", Quantity.fromString("500m"),

api/src/main/java/io/sentrius/sso/controllers/api/AgentApiController.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import java.net.URLDecoder;
44
import java.nio.charset.StandardCharsets;
55
import java.security.GeneralSecurityException;
6+
import java.security.NoSuchAlgorithmException;
67
import java.sql.SQLException;
78
import java.time.LocalDateTime;
89
import java.util.HashSet;
910
import java.util.List;
1011
import java.util.Map;
12+
import java.util.Optional;
1113
import java.util.Set;
1214
import java.util.UUID;
1315
import java.util.concurrent.ExecutionException;
@@ -40,6 +42,7 @@
4042
import io.sentrius.sso.provenance.kafka.ProvenanceKafkaProducer;
4143
import jakarta.servlet.http.HttpServletRequest;
4244
import jakarta.servlet.http.HttpServletResponse;
45+
import jakarta.transaction.Transactional;
4346
import lombok.extern.slf4j.Slf4j;
4447
import org.apache.http.HttpStatus;
4548
import org.jetbrains.annotations.NotNull;
@@ -172,7 +175,7 @@ public ResponseEntity<?> requestRegistration(
172175

173176
// Approve the request if the agent has an active policy ( and it is known and allowed ).
174177
if (atplPolicyService.getPolicy(operatingUser).isPresent()) {
175-
var admin = userService.getUser(UserType.createSystemAdmin().getId());
178+
var admin = createOrGetSystemAdmin();
176179
var approval = ztatService.approveOpsAccessToken(ztatRequest, admin);
177180

178181
return ResponseEntity.ok(Map.of("ztat_token", approval.getToken().toString(), "communication_id",communicationId ));
@@ -186,6 +189,21 @@ public ResponseEntity<?> requestRegistration(
186189

187190

188191

192+
}
193+
194+
@Transactional
195+
protected synchronized User createOrGetSystemAdmin() throws NoSuchAlgorithmException {
196+
var admin = userService.getUserByUsername("SYSTEM");
197+
if (null == admin){
198+
var systemAdmin = User.builder()
199+
.username("SYSTEM")
200+
.name("System Admin")
201+
.userId("SYSTEM")
202+
.emailAddress("email").password( userService.encodePassword(UUID.randomUUID().toString())).authorizationType(UserType.createSystemAdmin()).identityType(IdentityType.NON_PERSON_ENTITY);
203+
return userService.save(systemAdmin.build());
204+
}
205+
return admin;
206+
189207
}
190208

191209
@PostMapping("/provenance/submit")

api/src/main/java/io/sentrius/sso/controllers/api/EnclaveApiController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public ResponseEntity<HostGroupDTO> setAssignments(HttpServletRequest request, H
7575
List<User> newUserList = new ArrayList<>();
7676
for(var userId : (List<String>) payload.get("userIds")) {
7777
var u = userService.getUser(Long.valueOf(userId));
78-
if (null != u) {
79-
newUserList.add(u);
78+
if (u.isPresent()) {
79+
newUserList.add(u.get());
8080
}
8181
}
8282
hg.setUsers(newUserList);

api/src/main/java/io/sentrius/sso/controllers/api/UserApiController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,19 @@ public String deleteUser(@RequestParam("userId") String userId, @RequestParam(re
142142
log.info("Deleting non-person entity user with id: {}", userId);
143143
String userIdStr = cryptoService.decrypt(userId);
144144
var usr = userService.getUserByUserid(userIdStr);
145+
if (usr.getId() < 0) {
146+
log.info("User with id {} is a system user and cannot be deleted", usr.getId());
147+
return "redirect:/sso/v1/users/list?message=" + MessagingUtil.getMessageId(MessagingUtil.UNEXPECTED_ERROR);
148+
149+
}
145150
userService.deleteUser(usr.getId());
146151
} else {
147152
Long id = Long.parseLong(cryptoService.decrypt(userId));
153+
if (id < 0) {
154+
log.info("User with id {} is a system user and cannot be deleted", id);
155+
return "redirect:/sso/v1/users/list?message=" +
156+
MessagingUtil.getMessageId(MessagingUtil.UNEXPECTED_ERROR);
157+
}
148158
userService.deleteUser(id);
149159
}
150160
return "redirect:/sso/v1/users/list?message=" + MessagingUtil.getMessageId(MessagingUtil.USER_DELETE_SUCCESS);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.sentrius.sso.locator;
2+
3+
4+
import org.springframework.beans.factory.annotation.Value;
5+
import org.springframework.stereotype.Component;
6+
7+
import java.net.URI;
8+
9+
@Component
10+
public class KubernetesAgentLocator {
11+
12+
13+
public URI resolveWebSocketUri(String host, String sessionId, String chatGroupId, String ztat) {
14+
// DNS: sentrius-agent-[ID].[namespace].svc.cluster.local
15+
///api/v1/chat/attach/subscribe?sessionId=${encodeURIComponent(this.sessionId)}&chatGroupId=${this.chatGroupId}&ztat=${encodeURIComponent(jwt)
16+
String fqdn = String.format("%s/api/v1/chat/attach/subscribe?sessionId=%s&chatGroupId=%s&ztat=%s",
17+
host, sessionId, chatGroupId, ztat);
18+
return URI.create(fqdn);
19+
}
20+
}

api/src/main/java/io/sentrius/sso/startup/ConfigurationApplicationTask.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ public List<SideEffect> initialize(InstallConfiguration installConfiguration, bo
189189

190190
// first we create the admin user, then the user types followed by all users
191191
sideEffects.addAll(createAdminUser(installConfiguration, action));
192+
sideEffects.addAll(createSystemAdmin(installConfiguration, action));
193+
192194

193195
createSystemUser(installConfiguration);
194196

@@ -681,16 +683,16 @@ protected List<User> createUsers(
681683
}
682684
}
683685
if (action){
684-
user = userService.getUser(user.getId());
686+
var newUser = userService.getUser(user.getId());
685687
var definition = userDTO.getAtlpDefinition();
686688
if (null != definition && !definition.isEmpty()) {
687689
Optional<ATPLPolicyEntity> policy = policyList.stream()
688690
.filter(p -> p.getPolicyId().equals(definition))
689691
.findFirst();
690-
if (policy.isPresent()) {
691-
atplPolicyService.assignPolicyToUser(user, policy.get());
692+
if (policy.isPresent() & newUser.isPresent()) {
693+
atplPolicyService.assignPolicyToUser(newUser.get(), policy.get());
692694
} else {
693-
log.warn("No ATPL policy found for user {} with policy id {}", user.getUsername(),
695+
log.warn("No ATPL policy found for user {} with policy id {}", newUser.get().getUsername(),
694696
definition);
695697
}
696698
}
@@ -809,16 +811,16 @@ protected List<User> createNPEs(
809811
}
810812
}
811813
if (action){
812-
user = userService.getUser(user.getId());
814+
var newUser = userService.getUser(user.getId());
813815
var definition = userDTO.getAtlpDefinition();
814816
if (null != definition && !definition.isEmpty()) {
815817
Optional<ATPLPolicyEntity> policy = policyList.stream()
816818
.filter(p -> p.getPolicyId().equals(definition))
817819
.findFirst();
818820
if (policy.isPresent()) {
819-
atplPolicyService.assignPolicyToUser(user, policy.get());
821+
atplPolicyService.assignPolicyToUser(newUser.get(), policy.get());
820822
} else {
821-
log.warn("No ATPL policy found for user {} with policy id {}", user.getUsername(),
823+
log.warn("No ATPL policy found for user {} with policy id {}", newUser.get().getUsername(),
822824
definition);
823825
}
824826
}
@@ -880,6 +882,50 @@ protected List<SideEffect> createAdminUser(InstallConfiguration installConfigura
880882
return sideEffects;
881883
}
882884

885+
@Transactional
886+
public List<SideEffect> createSystemAdmin(InstallConfiguration installConfiguration, boolean action) throws NoSuchAlgorithmException {
887+
888+
var user = installConfiguration.getSystemUser();
889+
890+
if (null == user) {
891+
throw new IllegalStateException("Admin user not found in configuration");
892+
}
893+
List<SideEffect> sideEffects = new ArrayList<>();
894+
userService.findByUsername("SYSTEM").ifPresentOrElse(
895+
user1 -> {
896+
// ignore
897+
},
898+
() -> {
899+
sideEffects.add(SideEffect.builder().sideEffectDescription("Creating admin user " + user.getUsername()).type(
900+
SideEffectType.UPDATE_DATABASE).asset("Users").build());
901+
if (action) {
902+
try {
903+
user.setUserId("SYSTEM");
904+
user.setPassword(userService.encodePassword(UUID.randomUUID().toString()));
905+
user.setAuthorizationType(UserType.createSystemAdmin().toDTO());
906+
user.setIdentityType(IdentityType.NON_PERSON_ENTITY.toString());
907+
908+
var type =
909+
userService.getUserType(UserType.createSystemAdmin());
910+
if (type.isEmpty()){
911+
type = Optional.of( userService.saveUserType(UserType.createSystemAdmin()) );
912+
}
913+
914+
userService.addUscer(User.from(user, type.get()));
915+
} catch (NoSuchAlgorithmException e) {
916+
throw new RuntimeException(e);
917+
}
918+
919+
// insert default admin user
920+
921+
}
922+
}
923+
);
924+
925+
926+
return sideEffects;
927+
}
928+
883929
@Transactional
884930
protected void createSystemUser(InstallConfiguration connection) throws NoSuchAlgorithmException {
885931

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.sentrius.sso.websocket;
2+
3+
import org.springframework.http.server.ServerHttpRequest;
4+
import org.springframework.http.server.ServerHttpResponse;
5+
import org.springframework.web.socket.WebSocketHandler;
6+
import org.springframework.web.socket.server.HandshakeInterceptor;
7+
8+
import java.util.Map;
9+
10+
import org.springframework.web.util.UriComponentsBuilder;
11+
12+
public class AgentHandshakeInterceptor implements HandshakeInterceptor {
13+
14+
@Override
15+
public boolean beforeHandshake(ServerHttpRequest request,
16+
ServerHttpResponse response,
17+
WebSocketHandler wsHandler,
18+
Map<String, Object> attributes) {
19+
20+
String query = request.getURI().getQuery();
21+
Map<String, String> queryParams = UriComponentsBuilder.fromUri(request.getURI()).build().getQueryParams().toSingleValueMap();
22+
23+
attributes.put("host", queryParams.get("phost"));
24+
attributes.put("sessionId", queryParams.get("sessionId"));
25+
attributes.put("chatGroupId", queryParams.get("chatGroupId"));
26+
attributes.put("ztat", queryParams.get("ztat"));
27+
28+
return true;
29+
30+
}
31+
32+
@Override
33+
public void afterHandshake(ServerHttpRequest request,
34+
ServerHttpResponse response,
35+
WebSocketHandler wsHandler,
36+
Exception exception) {
37+
// no-op
38+
}
39+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.sentrius.sso.websocket;
2+
3+
import java.net.URI;
4+
import java.security.GeneralSecurityException;
5+
6+
import io.sentrius.sso.locator.KubernetesAgentLocator;
7+
import lombok.RequiredArgsConstructor;
8+
9+
import org.springframework.stereotype.Component;
10+
import org.springframework.web.reactive.socket.WebSocketHandler;
11+
import org.springframework.web.reactive.socket.WebSocketMessage;
12+
import org.springframework.web.reactive.socket.WebSocketSession;
13+
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
14+
15+
import io.sentrius.sso.core.services.security.CryptoService;
16+
import lombok.extern.slf4j.Slf4j;
17+
import reactor.core.publisher.Mono;
18+
19+
@Component
20+
@Slf4j
21+
@RequiredArgsConstructor
22+
public class AgentWebSocketProxyHandler implements WebSocketHandler {
23+
24+
private final KubernetesAgentLocator agentLocator;
25+
private final CryptoService cryptoService;
26+
27+
@Override
28+
public Mono<Void> handle(WebSocketSession clientSession) {
29+
try {
30+
String host = (String) clientSession.getAttributes().get("host");
31+
var decryptedHost = cryptoService.decrypt(host); // Ensure host is decrypted if necessary
32+
String sessionId = (String) clientSession.getAttributes().get("sessionId");
33+
String chatGroupId = (String) clientSession.getAttributes().get("chatGroupId");
34+
String ztat = (String) clientSession.getAttributes().get("ztat");
35+
log.info("Handling WebSocket connection for host: {}, sessionId: {}, chatGroupId: {}, ztat: {}",
36+
decryptedHost, sessionId, chatGroupId, ztat);
37+
URI agentUri = agentLocator.resolveWebSocketUri(decryptedHost, sessionId, chatGroupId, ztat);
38+
39+
ReactorNettyWebSocketClient proxyClient = new ReactorNettyWebSocketClient();
40+
41+
return proxyClient.execute(agentUri, agentSession -> {
42+
// Forward messages from client to agent
43+
Mono<Void> clientToAgent = clientSession.receive()
44+
.map(WebSocketMessage::getPayload)
45+
.map(dataBuffer -> agentSession.binaryMessage(factory -> dataBuffer))
46+
.as(agentSession::send);
47+
48+
// Forward messages from agent to client
49+
Mono<Void> agentToClient = agentSession.receive()
50+
.map(WebSocketMessage::getPayload)
51+
.map(dataBuffer -> clientSession.binaryMessage(factory -> dataBuffer))
52+
.as(clientSession::send);
53+
54+
// Run both directions in parallel, complete when both are done
55+
return Mono.zip(clientToAgent, agentToClient).then();
56+
});
57+
} catch (GeneralSecurityException ex) {
58+
throw new RuntimeException("Failed to decrypt host", ex);
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)