2222"""Module containing the admin service implementation for v1."""
2323
2424import grpc
25+ import warnings
26+
27+ import semver
2528
2629from ansys .geometry .core .errors import protect_grpc
2730
2831from ..base .admin import GRPCAdminService
2932
33+ # Define BackendType if not already imported
34+ class BackendType :
35+ DISCOVERY = "DISCOVERY"
36+
3037
3138class GRPCAdminServiceV1 (GRPCAdminService ): # pragma: no cover
3239 """Admin service for gRPC communication with the Geometry server.
@@ -43,18 +50,82 @@ class GRPCAdminServiceV1(GRPCAdminService): # pragma: no cover
4350
4451 @protect_grpc
4552 def __init__ (self , channel : grpc .Channel ): # noqa: D102
46- from ansys .api .dbu .v1 .admin_pb2_grpc import AdminStub
53+ from ansys .api .discovery .v1 .commands . application_pb2_grpc import ApplicationStub
4754
48- self .stub = AdminStub (channel )
55+ self .stub = ApplicationStub (channel )
4956
5057 @protect_grpc
5158 def get_backend (self , ** kwargs ) -> dict : # noqa: D102
52- raise NotImplementedError
59+ # TODO: Remove this context and filter once the protobuf UserWarning is downgraded to INFO
60+ # https://github.com/grpc/grpc/issues/37609
61+ with warnings .catch_warnings ():
62+ warnings .filterwarnings (
63+ "ignore" , "Protobuf gencode version" , UserWarning , "google.protobuf.runtime_version"
64+ )
65+ from google .protobuf .empty_pb2 import Empty
66+
67+ # Create the request - assumes all inputs are valid and of the proper type
68+ request = Empty ()
69+
70+ # Call the gRPC service
71+ response = self .stub .GetBackend (request = request )
72+
73+ # COMPATIBILITY HACK: retrieve the backend version -- for versions after 24R1
74+ if hasattr (response , "version" ):
75+ ver = response .version
76+ backend_version = semver .Version (ver .major_release , ver .minor_release , ver .service_pack )
77+ api_server_build_info = f"{ ver .build_number } " if ver .build_number != 0 else "N/A"
78+ product_build_info = (
79+ response .backend_version_info .strip () if response .backend_version_info else "N/A"
80+ )
81+ else : # pragma: no cover
82+ # If the version is not available, set a default version
83+ backend_version = semver .Version (24 , 1 , 0 )
84+ api_server_build_info = "N/A"
85+ product_build_info = "N/A"
86+
87+ # Convert the response to a dictionary
88+ return {
89+ "backend" : BackendType .DISCOVERY ,
90+ "version" : backend_version ,
91+ "api_server_build_info" : api_server_build_info ,
92+ "product_build_info" : product_build_info ,
93+ "additional_info" : {k : v for k , v in response .additional_build_info .items ()},
94+ }
5395
5496 @protect_grpc
5597 def get_logs (self , ** kwargs ) -> dict : # noqa: D102
56- raise NotImplementedError
98+ from ansys .api .dbu .v1 .admin_pb2 import LogsRequest , LogsTarget , PeriodType
99+
100+ # Create the request - assumes all inputs are valid and of the proper type
101+ request = LogsRequest (
102+ target = LogsTarget .CLIENT ,
103+ period_type = PeriodType .CURRENT if not kwargs ["all_logs" ] else PeriodType .ALL ,
104+ null_path = None ,
105+ null_period = None ,
106+ )
107+
108+ # Call the gRPC service
109+ logs_generator = self .stub .GetLogs (request )
110+ logs : dict [str , str ] = {}
111+
112+ # Convert the response to a dictionary
113+ for chunk in logs_generator :
114+ if chunk .log_name not in logs :
115+ logs [chunk .log_name ] = ""
116+ logs [chunk .log_name ] += chunk .log_chunk .decode ()
117+
118+ return {"logs" : logs }
57119
58120 @protect_grpc
59121 def get_service_status (self , ** kwargs ) -> dict : # noqa: D102
60- raise NotImplementedError
122+ from google .protobuf .empty_pb2 import Empty
123+
124+ # Create the request - assumes all inputs are valid and of the proper type
125+ request = Empty ()
126+
127+ # Call the gRPC service
128+ response = self .stub .Health (request = request )
129+
130+ # Convert the response to a dictionary
131+ return {"healthy" : True if response .message == "I am healthy!" else False }
0 commit comments