|
4 | 4 | "context" |
5 | 5 | "flag" |
6 | 6 | "fmt" |
| 7 | + "io" |
7 | 8 | "log" |
8 | 9 | "net" |
9 | 10 | "strconv" |
@@ -99,9 +100,62 @@ func (s *Server) Set(_ context.Context, req *gpb.SetRequest) (*gpb.SetResponse, |
99 | 100 | }, nil |
100 | 101 | } |
101 | 102 |
|
102 | | -func (s *Server) Subscribe(grpc.BidiStreamingServer[gpb.SubscribeRequest, gpb.SubscribeResponse]) error { |
103 | | - log.Printf("Received Subscribe request") |
104 | | - return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") |
| 103 | +func (s *Server) Subscribe(stream grpc.BidiStreamingServer[gpb.SubscribeRequest, gpb.SubscribeResponse]) error { |
| 104 | + req, err := stream.Recv() |
| 105 | + switch { |
| 106 | + case err == io.EOF: |
| 107 | + return nil |
| 108 | + case err != nil: |
| 109 | + return err |
| 110 | + case req.GetSubscribe() == nil: |
| 111 | + return status.Errorf(codes.InvalidArgument, "the subscribe request must contain a subscription definition") |
| 112 | + } |
| 113 | + |
| 114 | + switch req.GetRequest().(type) { |
| 115 | + case *gpb.SubscribeRequest_Poll: |
| 116 | + return status.Errorf(codes.InvalidArgument, "invalid request type: %T", req.GetRequest()) |
| 117 | + case *gpb.SubscribeRequest_Subscribe: |
| 118 | + } |
| 119 | + |
| 120 | + switch mode := req.GetSubscribe().GetMode(); mode { |
| 121 | + case gpb.SubscriptionList_ONCE: |
| 122 | + log.Printf("Received Subscribe request with ONCE mode") |
| 123 | + |
| 124 | + paths := make([]*gpb.Path, 0, len(req.GetSubscribe().GetSubscription())) |
| 125 | + for _, r := range req.GetSubscribe().GetSubscription() { |
| 126 | + paths = append(paths, r.GetPath()) |
| 127 | + } |
| 128 | + |
| 129 | + res, err := s.Get(stream.Context(), &gpb.GetRequest{ |
| 130 | + Prefix: req.GetSubscribe().GetPrefix(), |
| 131 | + Path: paths, |
| 132 | + Encoding: req.GetSubscribe().GetEncoding(), |
| 133 | + UseModels: req.GetSubscribe().GetUseModels(), |
| 134 | + Extension: req.GetExtension(), |
| 135 | + }) |
| 136 | + if err != nil { |
| 137 | + return err |
| 138 | + } |
| 139 | + |
| 140 | + for _, notification := range res.GetNotification() { |
| 141 | + if err := stream.Send(&gpb.SubscribeResponse{ |
| 142 | + Response: &gpb.SubscribeResponse_Update{ |
| 143 | + Update: notification, |
| 144 | + }, |
| 145 | + }); err != nil { |
| 146 | + return status.Errorf(codes.Internal, "failed to send response: %v", err) |
| 147 | + } |
| 148 | + } |
| 149 | + |
| 150 | + case gpb.SubscriptionList_STREAM: |
| 151 | + return status.Errorf(codes.Unimplemented, "subscribe method Stream not implemented") |
| 152 | + case gpb.SubscriptionList_POLL: |
| 153 | + return status.Errorf(codes.Unimplemented, "subscribe method Poll not implemented") |
| 154 | + default: |
| 155 | + return status.Errorf(codes.InvalidArgument, "unknown subscribe request mode: %v", mode) |
| 156 | + } |
| 157 | + |
| 158 | + return nil |
105 | 159 | } |
106 | 160 |
|
107 | 161 | // State represents a JSON body that can be manipulated using [sjson] syntax. |
|
0 commit comments