Skip to content

Commit 4af7baa

Browse files
authored
Add idempotent flag and graph queries default to not idempotent (#93)
* Add idempotent flag and graph queries default to not idempotent If the proxy detects a graph query (using custom payload key "graph-source") it treats the queries a not idempotent, unless a idempotent flag is set to true. This not perfect, but it is not possible to easily determine if a graph query is idempotent. * Add support for DSE graph Things that changed: * If the backend cluster is DSE then `system.local` and `system.peers` queries return `dse_version` * Empty query strings are valid for graph query, data is transferred using custom payload * Add tests for `dse_version` and DSE Graph queries * Make idempotent flag only applies to graph queries * Fix formatting * Update flag name * Update README with new cli flags
1 parent 543e2cc commit 4af7baa

File tree

10 files changed

+302
-88
lines changed

10 files changed

+302
-88
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ Usage: cql-proxy
3939

4040
Flags:
4141
-h, --help Show context-sensitive help.
42-
-b, --astra-bundle=STRING Path to secure connect bundle for an Astra database. Requires '--username' and '--password'. Ignored if using the token or contact points option
43-
($ASTRA_BUNDLE).
42+
-b, --astra-bundle=STRING Path to secure connect bundle for an Astra database. Requires '--username' and '--password'. Ignored if using the token or contact points
43+
option ($ASTRA_BUNDLE).
4444
-t, --astra-token=STRING Token used to authenticate to an Astra database. Requires '--astra-database-id'. Ignored if using the bundle path or contact points option
4545
($ASTRA_TOKEN).
4646
-i, --astra-database-id=STRING Database ID of the Astra database. Requires '--astra-token' ($ASTRA_DATABASE_ID)
@@ -59,6 +59,8 @@ Flags:
5959
--heartbeat-interval=30s Interval between performing heartbeats to the cluster ($HEARTBEAT_INTERVAL)
6060
--idle-timeout=60s Duration between successful heartbeats before a connection to the cluster is considered unresponsive and closed ($IDLE_TIMEOUT)
6161
--readiness-timeout=30s Duration the proxy is unable to connect to the backend cluster before it is considered not ready ($READINESS_TIMEOUT)
62+
--idempotent-graph If true it will treat all graph queries as idempotent by default and retry them automatically. It may be dangerous to retry some graph
63+
queries -- use with caution ($IDEMPOTENT_GRAPH).
6264
--num-conns=1 Number of connection to create to each node of the backend cluster ($NUM_CONNS)
6365
--rpc-address=STRING Address to advertise in the 'system.local' table for 'rpc_address'. It must be set if configuring peer proxies ($RPC_ADDRESS)
6466
--data-center=STRING Data center to use in system tables ($DATA_CENTER)

parser/metadata.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,24 @@ var (
3535
{Keyspace: "system", Table: "local", Name: "host_id", Type: datatype.Uuid},
3636
}
3737

38+
DseSystemLocalColumns = []*message.ColumnMetadata{
39+
{Keyspace: "system", Table: "local", Name: "key", Type: datatype.Varchar},
40+
{Keyspace: "system", Table: "local", Name: "rpc_address", Type: datatype.Inet},
41+
{Keyspace: "system", Table: "local", Name: "data_center", Type: datatype.Varchar},
42+
// The column "dse_version" is important for some DSE advance workloads esp. for graph to determine the graph
43+
// language.
44+
{Keyspace: "system", Table: "local", Name: "dse_version", Type: datatype.Varchar}, // DSE only
45+
{Keyspace: "system", Table: "local", Name: "rack", Type: datatype.Varchar},
46+
{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
47+
{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar},
48+
{Keyspace: "system", Table: "local", Name: "partitioner", Type: datatype.Varchar},
49+
{Keyspace: "system", Table: "local", Name: "cluster_name", Type: datatype.Varchar},
50+
{Keyspace: "system", Table: "local", Name: "cql_version", Type: datatype.Varchar},
51+
{Keyspace: "system", Table: "local", Name: "schema_version", Type: datatype.Uuid},
52+
{Keyspace: "system", Table: "local", Name: "native_protocol_version", Type: datatype.Varchar},
53+
{Keyspace: "system", Table: "local", Name: "host_id", Type: datatype.Uuid},
54+
}
55+
3856
SystemPeersColumns = []*message.ColumnMetadata{
3957
{Keyspace: "system", Table: "peers", Name: "peer", Type: datatype.Inet},
4058
{Keyspace: "system", Table: "peers", Name: "rpc_address", Type: datatype.Inet},
@@ -46,6 +64,18 @@ var (
4664
{Keyspace: "system", Table: "peers", Name: "host_id", Type: datatype.Uuid},
4765
}
4866

67+
DseSystemPeersColumns = []*message.ColumnMetadata{
68+
{Keyspace: "system", Table: "peers", Name: "peer", Type: datatype.Inet},
69+
{Keyspace: "system", Table: "peers", Name: "rpc_address", Type: datatype.Inet},
70+
{Keyspace: "system", Table: "peers", Name: "data_center", Type: datatype.Varchar},
71+
{Keyspace: "system", Table: "peers", Name: "dse_version", Type: datatype.Varchar}, // DSE only
72+
{Keyspace: "system", Table: "peers", Name: "rack", Type: datatype.Varchar},
73+
{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)},
74+
{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar},
75+
{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid},
76+
{Keyspace: "system", Table: "peers", Name: "host_id", Type: datatype.Uuid},
77+
}
78+
4979
SystemSchemaKeyspaces = []*message.ColumnMetadata{
5080
{Keyspace: "system", Table: "schema_keyspaces", Name: "keyspace_name", Type: datatype.Varchar},
5181
{Keyspace: "system", Table: "schema_keyspaces", Name: "durable_writes", Type: datatype.Boolean},

proxy/codecs.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ func (c *partialQueryCodec) EncodedLength(_ message.Message, _ primitive.Protoco
4141
func (c *partialQueryCodec) Decode(source io.Reader, _ primitive.ProtocolVersion) (message.Message, error) {
4242
if query, err := primitive.ReadLongString(source); err != nil {
4343
return nil, err
44-
} else if query == "" {
45-
return nil, fmt.Errorf("cannot read QUERY empty query string")
4644
} else {
4745
return &partialQuery{query}, nil
4846
}

proxy/proxy.go

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type Config struct {
6767
Resolver proxycore.EndpointResolver
6868
ReconnectPolicy proxycore.ReconnectPolicy
6969
RetryPolicy RetryPolicy
70+
IdempotentGraph bool
7071
NumConns int
7172
Logger *zap.Logger
7273
HeartBeatInterval time.Duration
@@ -99,6 +100,7 @@ type Proxy struct {
99100
closed chan struct{}
100101
localNode *node
101102
nodes []*node
103+
onceUsingGraphLog sync.Once
102104
}
103105

104106
type node struct {
@@ -448,6 +450,7 @@ func (p *Proxy) buildLocalRow() {
448450
"cql_version": p.encodeTypeFatal(datatype.Varchar, p.cluster.Info.CQLVersion),
449451
"schema_version": p.encodeTypeFatal(datatype.Uuid, schemaVersion), // TODO: Make this match the downstream cluster(s)
450452
"native_protocol_version": p.encodeTypeFatal(datatype.Varchar, p.cluster.NegotiatedVersion.String()),
453+
"dse_version": p.encodeTypeFatal(datatype.Varchar, p.cluster.Info.DSEVersion),
451454
}
452455
}
453456

@@ -495,6 +498,12 @@ func (p *Proxy) maybeStorePreparedIdempotence(raw *frame.RawFrame, msg message.M
495498
}
496499
}
497500

501+
func (p *Proxy) maybeLogUsingGraph() {
502+
p.onceUsingGraphLog.Do(func() {
503+
p.logger.Warn("graph queries default to *not* being considered idempotent and will not be retried automatically. Use the idempotent graph flag to override.")
504+
})
505+
}
506+
498507
func (p *Proxy) addClient(cl *client) {
499508
p.mu.Lock()
500509
defer p.mu.Unlock()
@@ -562,9 +571,9 @@ func (c *client) Receive(reader io.Reader) error {
562571
case *message.Prepare:
563572
c.handlePrepare(raw, msg)
564573
case *partialExecute:
565-
c.handleExecute(raw, msg)
574+
c.handleExecute(raw, msg, body.CustomPayload)
566575
case *partialQuery:
567-
c.handleQuery(raw, msg)
576+
c.handleQuery(raw, msg, body.CustomPayload)
568577
case *partialBatch:
569578
c.execute(raw, notDetermined, c.keyspace, msg)
570579
default:
@@ -644,16 +653,16 @@ func (c *client) handlePrepare(raw *frame.RawFrame, msg *message.Prepare) {
644653
}
645654
}
646655

647-
func (c *client) handleExecute(raw *frame.RawFrame, msg *partialExecute) {
656+
func (c *client) handleExecute(raw *frame.RawFrame, msg *partialExecute, customPayload map[string][]byte) {
648657
id := preparedIdKey(msg.queryId)
649658
if stmt, ok := c.preparedSystemQuery[id]; ok {
650659
c.interceptSystemQuery(raw.Header, stmt)
651660
} else {
652-
c.execute(raw, notDetermined, "", msg)
661+
c.execute(raw, c.getDefaultIdempotency(customPayload), "", msg)
653662
}
654663
}
655664

656-
func (c *client) handleQuery(raw *frame.RawFrame, msg *partialQuery) {
665+
func (c *client) handleQuery(raw *frame.RawFrame, msg *partialQuery, customPayload map[string][]byte) {
657666
c.proxy.logger.Debug("handling query", zap.String("query", msg.query), zap.Int16("stream", raw.Header.StreamId))
658667

659668
handled, stmt, err := parser.IsQueryHandled(parser.IdentifierFromString(c.keyspace), msg.query)
@@ -666,12 +675,25 @@ func (c *client) handleQuery(raw *frame.RawFrame, msg *partialQuery) {
666675
c.interceptSystemQuery(raw.Header, stmt)
667676
}
668677
} else {
669-
c.execute(raw, notDetermined, c.keyspace, msg)
678+
c.execute(raw, c.getDefaultIdempotency(customPayload), c.keyspace, msg)
670679
}
671680
}
672681

673-
func (c *client) filterSystemLocalValues(stmt *parser.SelectStatement) (row []message.Column, err error) {
674-
return parser.FilterValues(stmt, parser.SystemLocalColumns, func(name string) (value message.Column, err error) {
682+
func (c *client) getDefaultIdempotency(customPayload map[string][]byte) idempotentState {
683+
state := notDetermined
684+
if _, ok := customPayload["graph-source"]; ok { // Graph queries default to non-idempotent unless overridden
685+
c.proxy.maybeLogUsingGraph()
686+
if c.proxy.config.IdempotentGraph {
687+
state = isIdempotent
688+
} else {
689+
state = notIdempotent
690+
}
691+
}
692+
return state
693+
}
694+
695+
func (c *client) filterSystemLocalValues(stmt *parser.SelectStatement, filtered []*message.ColumnMetadata) (row []message.Column, err error) {
696+
return parser.FilterValues(stmt, filtered, func(name string) (value message.Column, err error) {
675697
if name == "rpc_address" {
676698
return proxycore.EncodeType(datatype.Inet, c.proxy.cluster.NegotiatedVersion, c.localIP())
677699
} else if name == "host_id" {
@@ -701,8 +723,8 @@ func (c *client) localIP() net.IP {
701723
}
702724
}
703725

704-
func (c *client) filterSystemPeerValues(stmt *parser.SelectStatement, peer *node, peerCount int) (row []message.Column, err error) {
705-
return parser.FilterValues(stmt, parser.SystemPeersColumns, func(name string) (value message.Column, err error) {
726+
func (c *client) filterSystemPeerValues(stmt *parser.SelectStatement, filtered []*message.ColumnMetadata, peer *node, peerCount int) (row []message.Column, err error) {
727+
return parser.FilterValues(stmt, filtered, func(name string) (value message.Column, err error) {
706728
if name == "data_center" {
707729
return proxycore.EncodeType(datatype.Varchar, c.proxy.cluster.NegotiatedVersion, peer.dc)
708730
} else if name == "host_id" {
@@ -727,9 +749,13 @@ func (c *client) interceptSystemQuery(hdr *frame.Header, stmt interface{}) {
727749
switch s := stmt.(type) {
728750
case *parser.SelectStatement:
729751
if s.Table == "local" {
730-
if columns, err := parser.FilterColumns(s, parser.SystemLocalColumns); err != nil {
752+
localColumns := parser.SystemLocalColumns
753+
if len(c.proxy.cluster.Info.DSEVersion) > 0 {
754+
localColumns = parser.DseSystemLocalColumns
755+
}
756+
if columns, err := parser.FilterColumns(s, localColumns); err != nil {
731757
c.send(hdr, &message.Invalid{ErrorMessage: err.Error()})
732-
} else if row, err := c.filterSystemLocalValues(s); err != nil {
758+
} else if row, err := c.filterSystemLocalValues(s, columns); err != nil {
733759
c.send(hdr, &message.Invalid{ErrorMessage: err.Error()})
734760
} else {
735761
c.send(hdr, &message.RowsResult{
@@ -741,14 +767,18 @@ func (c *client) interceptSystemQuery(hdr *frame.Header, stmt interface{}) {
741767
})
742768
}
743769
} else if s.Table == "peers" {
744-
if columns, err := parser.FilterColumns(s, parser.SystemPeersColumns); err != nil {
770+
peersColumns := parser.SystemPeersColumns
771+
if len(c.proxy.cluster.Info.DSEVersion) > 0 {
772+
peersColumns = parser.DseSystemPeersColumns
773+
}
774+
if columns, err := parser.FilterColumns(s, peersColumns); err != nil {
745775
c.send(hdr, &message.Invalid{ErrorMessage: err.Error()})
746776
} else {
747777
var data []message.Row
748778
for _, n := range c.proxy.nodes {
749779
if n != c.proxy.localNode {
750780
var row message.Row
751-
row, err = c.filterSystemPeerValues(s, n, len(c.proxy.nodes)-1)
781+
row, err = c.filterSystemPeerValues(s, columns, n, len(c.proxy.nodes)-1)
752782
if err != nil {
753783
break
754784
}

proxy/proxy_retries_test.go

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestProxy_Retries(t *testing.T) {
6969
2, // Retried on all remaining nodes
7070
},
7171
{
72-
"error response (truncate) w/ non-idempotent query, retry until succeeds or exhausts query plan",
72+
"error response (truncate) w/ non-idempotent query, don't retry",
7373
nonIdempotentQuery,
7474
&message.TruncateError{ErrorMessage: "Truncate"},
7575
1, // Tried the queried on the first node and it failed
@@ -316,13 +316,73 @@ func TestProxy_BatchRetries(t *testing.T) {
316316
}
317317
}
318318

319-
func testProxyRetry(t *testing.T, query message.Message, response message.Error, testMessage string) (numNodesTried, retryCount int, responseError error) {
319+
func TestProxy_RetryGraphQueries(t *testing.T) {
320+
var tests = []struct {
321+
msg string
322+
query string
323+
graph bool
324+
cfg *proxyTestConfig
325+
response message.Error
326+
numNodesTried int
327+
retryCount int
328+
}{
329+
{
330+
"error response (truncate) w/ graph query, not retried",
331+
"g.V().has('person', 'name', 'mike')",
332+
true,
333+
nil,
334+
&message.TruncateError{ErrorMessage: "Truncate"},
335+
1, // Tried on the first node and fails
336+
0, // Not retried because graph queries are not considered idempotent
337+
},
338+
{
339+
"error response (truncate) w/ graph query and idempotent override; retried on all nodes",
340+
"g.V().has('person', 'name', 'mike')",
341+
true,
342+
&proxyTestConfig{idempotentGraph: true}, // Override to consider graph queries as idempotent
343+
&message.TruncateError{ErrorMessage: "Truncate"},
344+
3, // Tried on all nodes because of the idempotent override
345+
2, // Retried twice after the initial failure
346+
},
347+
{
348+
"error response (truncate) w/ non-idempotent, non-graph query, should *not* be retried",
349+
nonIdempotentQuery,
350+
false,
351+
&proxyTestConfig{idempotentGraph: true}, // Override to consider graph queries as idempotent, but not CQL
352+
&message.TruncateError{ErrorMessage: "Truncate"},
353+
1, // Tried on the first node and fails
354+
0, // Not retried because graph queries are not considered idempotent
355+
},
356+
}
357+
358+
for _, tt := range tests {
359+
frm := frame.NewFrame(primitive.ProtocolVersion4, -1, &message.Query{Query: tt.query})
360+
if tt.graph {
361+
frm.SetCustomPayload(map[string][]byte{"graph-source": []byte("g")}) // This is used by the proxy to determine if it's a graph query
362+
}
363+
364+
numNodesTried, retryCount, err := testProxyRetryWithConfig(t, frm, tt.response, tt.cfg, tt.msg)
365+
366+
assert.Error(t, err, tt.msg)
367+
assert.IsType(t, err, &proxycore.CqlError{}, tt.msg)
368+
assert.Equal(t, tt.numNodesTried, numNodesTried, tt.msg)
369+
assert.Equal(t, tt.retryCount, retryCount, tt.msg)
370+
}
371+
}
372+
373+
func testProxyRetryWithConfig(t *testing.T, query *frame.Frame, response message.Error, cfg *proxyTestConfig, testMessage string) (numNodesTried, retryCount int, responseError error) {
374+
ctx, cancel := context.WithCancel(context.Background())
375+
defer cancel()
376+
320377
var mu sync.Mutex
321378
tried := make(map[string]int)
322379
prepared := make(map[[16]byte]string)
323380

324-
ctx, cancel := context.WithCancel(context.Background())
325-
tester, proxyContactPoint, err := setupProxyTest(ctx, 3, proxycore.MockRequestHandlers{
381+
if cfg == nil {
382+
cfg = &proxyTestConfig{}
383+
}
384+
385+
cfg.handlers = proxycore.MockRequestHandlers{
326386
primitive.OpCodeQuery: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message {
327387
if msg := cl.InterceptQuery(frm.Header, frm.Body.Message.(*message.Query)); msg != nil {
328388
return msg
@@ -378,7 +438,9 @@ func testProxyRetry(t *testing.T, query message.Message, response message.Error,
378438
PreparedQueryId: id[:],
379439
}
380440
},
381-
})
441+
}
442+
443+
tester, proxyContactPoint, err := setupProxyTestWithConfig(ctx, 3, cfg)
382444
defer func() {
383445
cancel()
384446
tester.shutdown()
@@ -387,7 +449,7 @@ func testProxyRetry(t *testing.T, query message.Message, response message.Error,
387449

388450
cl := connectTestClient(t, ctx, proxyContactPoint)
389451

390-
_, err = cl.Query(ctx, primitive.ProtocolVersion4, query)
452+
_, err = cl.QueryFrame(ctx, query)
391453

392454
if cqlErr, ok := err.(*proxycore.CqlError); ok {
393455
if unprepared, ok := cqlErr.Message.(*message.Unprepared); ok {
@@ -397,7 +459,7 @@ func testProxyRetry(t *testing.T, query message.Message, response message.Error,
397459
if err != nil {
398460
return 0, 0, err
399461
}
400-
_, err = cl.Query(ctx, primitive.ProtocolVersion4, query)
462+
_, err = cl.QueryFrame(ctx, query)
401463
}
402464
}
403465

@@ -407,3 +469,7 @@ func testProxyRetry(t *testing.T, query message.Message, response message.Error,
407469
}
408470
return len(tried), retryCount - 1, err
409471
}
472+
473+
func testProxyRetry(t *testing.T, query message.Message, response message.Error, testMessage string) (numNodesTried, retryCount int, responseError error) {
474+
return testProxyRetryWithConfig(t, frame.NewFrame(primitive.ProtocolVersion4, -1, query), response, nil, testMessage)
475+
}

0 commit comments

Comments
 (0)