1+ from concurrent import futures
2+ import grpc
3+ import time
4+ import google .protobuf .any_pb2 as any_pb2
5+ from ni .pythonpanel .v1 .python_panel_service_pb2 import ConnectRequest , ConnectResponse , DisconnectRequest , DisconnectResponse , GetValueRequest , GetValueResponse , SetValueRequest , SetValueResponse
6+ from ni .pythonpanel .v1 .python_panel_service_pb2_grpc import PythonPanelServiceServicer , add_PythonPanelServiceServicer_to_server
7+
8+ class FakePythonPanelService (PythonPanelServiceServicer ):
9+ def Connect (self , request , context ):
10+ # Basic implementation for testing
11+ print (f"Connecting to panel: { request .panel_id } at { request .panel_uri } " )
12+ return ConnectResponse ()
13+
14+ def Disconnect (self , request , context ):
15+ # Basic implementation for testing
16+ print (f"Disconnecting from panel: { request .panel_id } " )
17+ return DisconnectResponse ()
18+
19+ def GetValue (self , request , context ):
20+ # Basic implementation for testing
21+ print (f"Getting value for panel: { request .panel_id } , value_id: { request .value_id } " )
22+ value = any_pb2 .Any () # Placeholder for actual value
23+ return GetValueResponse (value = value )
24+
25+ def SetValue (self , request , context ):
26+ # Basic implementation for testing
27+ print (f"Setting value for panel: { request .panel_id } , value_id: { request .value_id } , value: { request .value } " )
28+ return SetValueResponse ()
29+
30+ def serve ():
31+ server = grpc .server (futures .ThreadPoolExecutor (max_workers = 10 ))
32+ add_PythonPanelServiceServicer_to_server (FakePythonPanelService (), server )
33+ server .add_insecure_port ('[::]:50051' ) # TODO: do we need to find a free port?
34+ server .start ()
35+ print ("Server is running on port 50051..." )
36+ try :
37+ while True :
38+ time .sleep (86400 ) # Keep the server running
39+ except KeyboardInterrupt :
40+ server .stop (0 )
41+
42+ if __name__ == '__main__' :
43+ serve ()
0 commit comments