Skip to content

Commit 0582ce5

Browse files
authored
Enforce API errors via the type system (#3512)
Sadly there doesn't seem to be a way to completely avoid AST code but at least it is simpler and the consumer side is as pleasant as it can be
1 parent 3ad80a9 commit 0582ce5

File tree

20 files changed

+514
-505
lines changed

20 files changed

+514
-505
lines changed

.github/actions/genprotos/action.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ runs:
1010
with:
1111
path: |
1212
./flow/generated/protos
13+
./flow/cmd/typed_handler.go
1314
./nexus/pt/src/gen
1415
./ui/grpc_generated
15-
key: ${{ runner.os }}-build-genprotos-${{ hashFiles('buf.gen.yaml', './protos/peers.proto', './protos/flow.proto', './protos/route.proto') }}
16+
key: ${{ runner.os }}-build-genprotos-${{ hashFiles('buf.gen.yaml', './protos/peers.proto', './protos/flow.proto', './protos/route.proto', './flow/cmd/gen-grpc-wrapper/main.go') }}
1617

1718
- if: steps.cache.outputs.cache-hit != 'true'
1819
uses: bufbuild/buf-action@8f4a1456a0ab6a1eb80ba68e53832e6fcfacc16c # v1
@@ -21,4 +22,6 @@ runs:
2122
github_token: ${{ github.token }}
2223
- if: steps.cache.outputs.cache-hit != 'true'
2324
shell: sh
24-
run: buf generate protos
25+
run: |
26+
buf generate protos
27+
cd flow && go generate

flow/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ vendor/
1010

1111
# goenv local version. See https://github.com/syndbg/goenv/blob/master/COMMANDS.md#goenv-local for more info.
1212
.go-version
13+
14+
# generated files
15+
cmd/typed_handler.go

flow/cmd/alerts.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,44 +9,49 @@ import (
99
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1010
"github.com/PeerDB-io/peerdb/flow/internal"
1111
"github.com/PeerDB-io/peerdb/flow/shared"
12-
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
1312
)
1413

15-
func (h *FlowRequestHandler) GetAlertConfigs(ctx context.Context, req *protos.GetAlertConfigsRequest) (*protos.GetAlertConfigsResponse, error) {
14+
func (h *FlowRequestHandler) GetAlertConfigs(
15+
ctx context.Context,
16+
req *protos.GetAlertConfigsRequest,
17+
) (*protos.GetAlertConfigsResponse, APIError) {
1618
rows, err := h.pool.Query(ctx, "SELECT id,service_type,service_config,enc_key_id,alert_for_mirrors from peerdb_stats.alerting_config")
1719
if err != nil {
18-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to get alert configs: %w", err))
20+
return nil, NewInternalApiError(fmt.Errorf("failed to get alert configs: %w", err))
1921
}
2022

2123
configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.AlertConfig, error) {
2224
var serviceConfigPayload []byte
2325
var encKeyID string
2426
config := &protos.AlertConfig{}
2527
if err := row.Scan(&config.Id, &config.ServiceType, &serviceConfigPayload, &encKeyID, &config.AlertForMirrors); err != nil {
26-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to scan alert config: %w", err))
28+
return nil, NewInternalApiError(fmt.Errorf("failed to scan alert config: %w", err))
2729
}
2830
serviceConfig, err := internal.Decrypt(ctx, encKeyID, serviceConfigPayload)
2931
if err != nil {
30-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to decrypt alert config: %w", err))
32+
return nil, NewInternalApiError(fmt.Errorf("failed to decrypt alert config: %w", err))
3133
}
3234
config.ServiceConfig = string(serviceConfig)
3335
return config, nil
3436
})
3537
if err != nil {
36-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to collect alert configs: %w", err))
38+
return nil, NewInternalApiError(fmt.Errorf("failed to collect alert configs: %w", err))
3739
}
3840

3941
return &protos.GetAlertConfigsResponse{Configs: configs}, nil
4042
}
4143

42-
func (h *FlowRequestHandler) PostAlertConfig(ctx context.Context, req *protos.PostAlertConfigRequest) (*protos.PostAlertConfigResponse, error) {
44+
func (h *FlowRequestHandler) PostAlertConfig(
45+
ctx context.Context,
46+
req *protos.PostAlertConfigRequest,
47+
) (*protos.PostAlertConfigResponse, APIError) {
4348
key, err := internal.PeerDBCurrentEncKey(ctx)
4449
if err != nil {
45-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to get current enc key: %w", err))
50+
return nil, NewInternalApiError(fmt.Errorf("failed to get current enc key: %w", err))
4651
}
4752
serviceConfig, err := key.Encrypt(shared.UnsafeFastStringToReadOnlyBytes(req.Config.ServiceConfig))
4853
if err != nil {
49-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to encrypt alert config: %w", err))
54+
return nil, NewInternalApiError(fmt.Errorf("failed to encrypt alert config: %w", err))
5055
}
5156

5257
if req.Config.Id == -1 {
@@ -69,7 +74,7 @@ func (h *FlowRequestHandler) PostAlertConfig(ctx context.Context, req *protos.Po
6974
key.ID,
7075
req.Config.AlertForMirrors,
7176
).Scan(&id); err != nil {
72-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to insert alert config: %w", err))
77+
return nil, NewInternalApiError(fmt.Errorf("failed to insert alert config: %w", err))
7378
}
7479
return &protos.PostAlertConfigResponse{Id: id}, nil
7580
} else if _, err := h.pool.Exec(
@@ -81,17 +86,17 @@ func (h *FlowRequestHandler) PostAlertConfig(ctx context.Context, req *protos.Po
8186
req.Config.AlertForMirrors,
8287
req.Config.Id,
8388
); err != nil {
84-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to update alert config: %w", err))
89+
return nil, NewInternalApiError(fmt.Errorf("failed to update alert config: %w", err))
8590
}
8691
return &protos.PostAlertConfigResponse{Id: req.Config.Id}, nil
8792
}
8893

8994
func (h *FlowRequestHandler) DeleteAlertConfig(
9095
ctx context.Context,
9196
req *protos.DeleteAlertConfigRequest,
92-
) (*protos.DeleteAlertConfigResponse, error) {
97+
) (*protos.DeleteAlertConfigResponse, APIError) {
9398
if _, err := h.pool.Exec(ctx, "delete from peerdb_stats.alerting_config where id = $1", req.Id); err != nil {
94-
return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to delete alert config: %w", err))
99+
return nil, NewInternalApiError(fmt.Errorf("failed to delete alert config: %w", err))
95100
}
96101
return &protos.DeleteAlertConfigResponse{}, nil
97102
}

flow/cmd/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
259259
return fmt.Errorf("unable to start scheduler workflow: %w", err)
260260
}
261261

262-
protos.RegisterFlowServiceServer(grpcServer, flowHandler)
262+
protos.RegisterFlowServiceServer(grpcServer, NewFlowServiceAdapter(flowHandler))
263263
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())
264264
reflection.Register(grpcServer)
265265

flow/cmd/api_error.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package cmd
2+
3+
import (
4+
"errors"
5+
"log/slog"
6+
7+
"github.com/gogo/googleapis/google/rpc"
8+
"google.golang.org/grpc/codes"
9+
"google.golang.org/grpc/status"
10+
"google.golang.org/protobuf/protoadapt"
11+
)
12+
13+
// APIError is a strongly-typed error that must be a gRPC status error.
14+
// All handler methods should return this type instead of the generic error interface.
15+
type APIError interface {
16+
error
17+
GRPCStatus() *status.Status
18+
Code() codes.Code
19+
}
20+
21+
type apiError struct {
22+
status *status.Status
23+
}
24+
25+
func newAPIError(s *status.Status) *apiError {
26+
return &apiError{status: s}
27+
}
28+
29+
func NewInvalidArgumentApiError(err error, details ...*rpc.ErrorInfo) *apiError {
30+
return newAPIError(convertToStatus(codes.InvalidArgument, err, details...))
31+
}
32+
33+
func NewFailedPreconditionApiError(err error, details ...*rpc.ErrorInfo) *apiError {
34+
return newAPIError(convertToStatus(codes.FailedPrecondition, err, details...))
35+
}
36+
37+
func NewInternalApiError(err error, details ...*rpc.ErrorInfo) *apiError {
38+
return newAPIError(convertToStatus(codes.Internal, err, details...))
39+
}
40+
41+
func NewUnavailableApiError(err error, details ...*rpc.ErrorInfo) *apiError {
42+
return newAPIError(convertToStatus(codes.Unavailable, err, details...))
43+
}
44+
45+
func NewUnimplementedApiError(err error, details ...*rpc.ErrorInfo) *apiError {
46+
return newAPIError(convertToStatus(codes.Unimplemented, err, details...))
47+
}
48+
49+
func NewAlreadyExistsApiError(err error, details ...*rpc.ErrorInfo) *apiError {
50+
return newAPIError(convertToStatus(codes.AlreadyExists, err, details...))
51+
}
52+
53+
func NewNotFoundApiError(err error, details ...*rpc.ErrorInfo) *apiError {
54+
return newAPIError(convertToStatus(codes.NotFound, err, details...))
55+
}
56+
57+
func (e *apiError) Error() string {
58+
if e.status == nil {
59+
return "unknown error"
60+
}
61+
return e.status.Err().Error()
62+
}
63+
64+
func (e *apiError) GRPCStatus() *status.Status {
65+
return e.status
66+
}
67+
68+
func (e *apiError) Code() codes.Code {
69+
if e.status == nil {
70+
return codes.Unknown
71+
}
72+
return e.status.Code()
73+
}
74+
75+
func convertToStatus(code codes.Code, err error, details ...*rpc.ErrorInfo) *status.Status {
76+
errorStatus := status.New(code, err.Error())
77+
if len(details) == 0 {
78+
return errorStatus
79+
}
80+
convertedDetails := make([]protoadapt.MessageV1, len(details))
81+
for i, detail := range details {
82+
convertedDetails[i] = detail
83+
}
84+
richStatus, err := errorStatus.WithDetails(convertedDetails...)
85+
if err != nil {
86+
// This cannot happen because we control all calls to convertToStatus and only pass code != OK and allow only rpc.ErrorInfo in details
87+
slog.Error("Failed to convert to grpc proto error", "error", err) //nolint:sloglint // No context in conversion helper
88+
return errorStatus
89+
}
90+
91+
return richStatus
92+
}
93+
94+
// AsAPIError converts an error to APIError if it's a gRPC status error,
95+
// otherwise wraps it as an Internal error
96+
func AsAPIError(err error) APIError {
97+
if err == nil {
98+
return nil
99+
}
100+
101+
if apiErr, ok := err.(APIError); ok {
102+
return apiErr
103+
}
104+
105+
if s, ok := status.FromError(err); ok {
106+
return newAPIError(s)
107+
}
108+
109+
return NewInternalApiError(err)
110+
}
111+
112+
const (
113+
ErrorInfoReasonClickHousePeer = "CLICKHOUSE_PEER"
114+
ErrorInfoReasonMirror = "MIRROR"
115+
)
116+
117+
const (
118+
ErrorInfoDomain = "peerdb.io"
119+
)
120+
121+
const (
122+
ErrorMetadataDownstreamErrorCode = "downstreamErrorCode"
123+
ErrorMetadataOffendingField = "offendingField"
124+
)
125+
126+
func NewClickHousePeerErrorInfo(metadata map[string]string) *rpc.ErrorInfo {
127+
return &rpc.ErrorInfo{
128+
Reason: ErrorInfoReasonClickHousePeer,
129+
Domain: ErrorInfoDomain,
130+
Metadata: metadata,
131+
}
132+
}
133+
134+
func NewMirrorErrorInfo(metadata map[string]string) *rpc.ErrorInfo {
135+
return &rpc.ErrorInfo{
136+
Reason: ErrorInfoReasonMirror,
137+
Domain: ErrorInfoDomain,
138+
Metadata: metadata,
139+
}
140+
}
141+
142+
var ErrUnderMaintenance = errors.New("PeerDB is under maintenance. Please retry in a few minutes")

0 commit comments

Comments
 (0)