Skip to content

Commit 384ba23

Browse files
authored
Add support for a per-operation context (#1619)
The goal of this commit is to allow low-level code, typically at the level of file reads (say, a custom `FileChannel` implementation), to obtain context on which higher level operation they correspond to. A typical use case may for tiered storage where `FileChannel` implementations may forward reads to remote storage, and where having context on the overall operation could help, say, take prior work done to set timeout, or aggregate metrics per "user-level" operations (rather than per `FileChannel` operation). To do this, we reuse the existing `ExecutorLocals` mechanism, and simply have the top-level operation setup the proper context as an `ExecutorLocal`, allowing it to be accessed by any low-level operations operating on behalf of that operation. This is, in many way, similar to tracing, but instead of a `TraceState` that collect what happens during the operation, it is a relatively flexible notion of `OperationContext`. As of this patch, this feature is fairly barebone and mostly exists for extensions in the following sense: 1. only user reads (`ReadCommand` execution) currently sets up such a context. The code is written in such a way that adding support for other operations should be easy, but this is not done. 2. the context set by reads by default has barely any information: it merely has a `toString()` method that can roughly tell what the operation itself is, and so could have use for debugging, but not much more. Further, that context is not read by anything in this patch. However, said context are created through a "factory" and the factory class is configurable through a system property. So extension can override the factory in order to create contexts with more information/state, and they fetch/use those context where they see fit.
1 parent 4c532d2 commit 384ba23

File tree

8 files changed

+230
-14
lines changed

8 files changed

+230
-14
lines changed

src/java/org/apache/cassandra/concurrent/ExecutorLocal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
package org.apache.cassandra.concurrent;
2020

2121
import org.apache.cassandra.service.ClientWarn;
22+
import org.apache.cassandra.service.context.OperationContextTracker;
2223
import org.apache.cassandra.tracing.Tracing;
2324
import org.apache.cassandra.sensors.RequestTracker;
2425

2526
public interface ExecutorLocal<T>
2627
{
27-
ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance, RequestTracker.instance };
28+
ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance, RequestTracker.instance, OperationContextTracker.instance };
2829

2930
/**
3031
* This is called when scheduling the task, and also before calling {@link #set(Object)} when running on a

src/java/org/apache/cassandra/concurrent/ExecutorLocals.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.cassandra.sensors.RequestSensors;
2424
import org.apache.cassandra.sensors.RequestTracker;
2525
import org.apache.cassandra.service.ClientWarn;
26+
import org.apache.cassandra.service.context.OperationContext;
27+
import org.apache.cassandra.service.context.OperationContextTracker;
2628
import org.apache.cassandra.tracing.TraceState;
2729
import org.apache.cassandra.tracing.Tracing;
2830

@@ -37,21 +39,24 @@ public class ExecutorLocals
3739
private static final ExecutorLocal<TraceState> tracing = Tracing.instance;
3840
private static final ExecutorLocal<ClientWarn.State> clientWarn = ClientWarn.instance;
3941
private static final ExecutorLocal<RequestSensors> requestTracker = RequestTracker.instance;
42+
private static final ExecutorLocal<OperationContext> operationContextTracker = OperationContextTracker.instance;
4043

4144
public final TraceState traceState;
4245
public final ClientWarn.State clientWarnState;
4346
public final RequestSensors sensors;
47+
public final OperationContext operationContext;
4448

45-
private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors)
49+
private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext)
4650
{
4751
this.traceState = traceState;
4852
this.clientWarnState = clientWarnState;
4953
this.sensors = sensors;
54+
this.operationContext = operationContext;
5055
}
5156

5257
static
5358
{
54-
assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn, requestTracker })
59+
assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn, requestTracker, operationContextTracker })
5560
: "ExecutorLocals has not been updated to reflect new ExecutorLocal.all";
5661
}
5762

@@ -68,29 +73,32 @@ public static ExecutorLocals create()
6873
TraceState traceState = tracing.get();
6974
ClientWarn.State clientWarnState = clientWarn.get();
7075
RequestSensors sensors = requestTracker.get();
71-
if (traceState == null && clientWarnState == null && sensors == null)
76+
OperationContext operationContext = operationContextTracker.get();
77+
if (traceState == null && clientWarnState == null && sensors == null && operationContext == null)
7278
return null;
7379
else
74-
return new ExecutorLocals(traceState, clientWarnState, sensors);
80+
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
7581
}
7682

77-
public static ExecutorLocals create(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors)
83+
public static ExecutorLocals create(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext)
7884
{
79-
return new ExecutorLocals(traceState, clientWarnState, sensors);
85+
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
8086
}
8187

8288
public static ExecutorLocals create(TraceState traceState)
8389
{
8490
ClientWarn.State clientWarnState = clientWarn.get();
8591
RequestSensors sensors = requestTracker.get();
86-
return new ExecutorLocals(traceState, clientWarnState, sensors);
92+
OperationContext operationContext = operationContextTracker.get();
93+
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
8794
}
8895

8996
public static ExecutorLocals create(RequestSensors sensors)
9097
{
9198
TraceState traceState = tracing.get();
9299
ClientWarn.State clientWarnState = clientWarn.get();
93-
return new ExecutorLocals(traceState, clientWarnState, sensors);
100+
OperationContext operationContext = operationContextTracker.get();
101+
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
94102
}
95103

96104
public static void set(ExecutorLocals locals)

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.cassandra.metrics.TableMetrics;
2828
import org.apache.cassandra.net.MessagingService;
2929
import org.apache.cassandra.sensors.SensorsFactory;
30+
import org.apache.cassandra.service.context.OperationContext;
3031
import org.apache.cassandra.service.reads.range.EndpointGroupingRangeCommandIterator;
3132

3233
/** A class that extracts system properties for the cassandra node it runs within. */
@@ -600,7 +601,13 @@ public enum CassandraRelevantProperties
600601
* Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs specially
601602
* with vnodes.
602603
*/
603-
SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false");
604+
SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"),
605+
606+
/**
607+
* Allows custom implementation of {@link OperationContext.Factory} to optionally create and configure custom
608+
* {@link OperationContext} instances.
609+
*/
610+
OPERATION_CONTEXT_FACTORY("cassandra.operation_context_factory_class");
604611

605612
CassandraRelevantProperties(String key, String defaultVal)
606613
{

src/java/org/apache/cassandra/db/ReadExecutionController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.cassandra.index.Index;
2828
import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
2929
import org.apache.cassandra.schema.TableMetadata;
30+
import org.apache.cassandra.service.context.OperationContext;
31+
import org.apache.cassandra.service.context.OperationContextTracker;
3032
import org.apache.cassandra.tracing.Tracing;
3133
import org.apache.cassandra.utils.MonotonicClock;
3234
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -143,6 +145,8 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa
143145

144146
long createdAtNanos = baseCfs.metric.topLocalReadQueryTime.isEnabled() ? clock.now() : NO_SAMPLING;
145147

148+
OperationContextTracker.start(OperationContext.FACTORY.forRead(command, baseCfs));
149+
146150
if (indexCfs == null)
147151
return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos, trackRepairedStatus);
148152

@@ -176,6 +180,7 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa
176180
if (indexController != null)
177181
indexController.close();
178182
}
183+
OperationContextTracker.endCurrent();
179184
throw e;
180185
}
181186
}
@@ -217,6 +222,8 @@ public void close()
217222
}
218223
}
219224

225+
OperationContextTracker.endCurrent();
226+
220227
if (createdAtNanos != NO_SAMPLING)
221228
addSample();
222229

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.context;
20+
21+
import java.util.function.Supplier;
22+
import org.apache.cassandra.db.ColumnFamilyStore;
23+
import org.apache.cassandra.db.ReadCommand;
24+
25+
/**
26+
* Default implementation of {@link OperationContext}.
27+
* <p>
28+
* This default implementation is mostly only useful for debugging as the only concrete method is provices is a
29+
* {@link #toString()} method giving details on the operation the context corresponds to (though the context object
30+
* also identify the operation, so it could also theoretically be used from 2 separate place in the code to decide
31+
* if they execute as part of the same operation).
32+
*/
33+
public class DefaultOperationContext implements OperationContext
34+
{
35+
private final Supplier<String> toDebugString;
36+
37+
private DefaultOperationContext(Supplier<String> toDebugString)
38+
{
39+
this.toDebugString = toDebugString;
40+
}
41+
42+
@Override
43+
public void close()
44+
{
45+
}
46+
47+
@Override
48+
public String toString()
49+
{
50+
return String.format("[%d] %s", System.identityHashCode(this), toDebugString.get());
51+
}
52+
53+
/**
54+
* Simple default implementation of {@link OperationContext.Factory} that creates {@link DefaultOperationContext}.
55+
*/
56+
static class Factory implements OperationContext.Factory
57+
{
58+
@Override
59+
public OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs)
60+
{
61+
return new DefaultOperationContext(command::toCQLString);
62+
}
63+
}
64+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.context;
20+
21+
import org.apache.cassandra.concurrent.ExecutorLocal;
22+
import org.apache.cassandra.db.ColumnFamilyStore;
23+
import org.apache.cassandra.db.ReadCommand;
24+
import org.apache.cassandra.utils.FBUtilities;
25+
26+
import static org.apache.cassandra.config.CassandraRelevantProperties.OPERATION_CONTEXT_FACTORY;
27+
28+
/**
29+
* Represents some context about a "top-level" operation.
30+
* <p>
31+
* This interface is fairly open on purpose, as implementations for different operations could look fairly different.
32+
* But it is also open-ended as it is an extension point: the {@link #FACTORY} used to create the context instances
33+
* is configurable, and meant to allow extensions to add whatever information they need to the context.
34+
* <p>
35+
* Also note that what consistute a "top-level" operation is not strictly defined. At the time of this writing, those
36+
* context are not serialized across nodes, so "top-level" is understood as "for a node", and so correspond to
37+
* operations like "a `ReadCommand` execution on a replica".
38+
* <p>
39+
* The context of executing operation is tracked by {@link OperationContextTracker} which use the {@link ExecutorLocal}
40+
* concept to make that context available to any methods that execute as part of the operation. Basically, this is a way
41+
* to make the context available everwhere along the path of execution of the operation, without needing to pass that
42+
* context as argument of every single method that could be involved by the operation execution (which in most cases
43+
* would be <b>a lot of methods</b>).
44+
*/
45+
public interface OperationContext extends AutoCloseable
46+
{
47+
Factory FACTORY = OPERATION_CONTEXT_FACTORY.getString() == null
48+
? new DefaultOperationContext.Factory()
49+
: FBUtilities.construct(OPERATION_CONTEXT_FACTORY.getString(), "operation context factory");
50+
51+
52+
/**
53+
* Called when the operation this is a context of terminates, and thus when the context will not be used/retrieved
54+
* anymore.
55+
*/
56+
@Override
57+
void close();
58+
59+
/**
60+
* Factory used to create {@link OperationContext} instances.
61+
* <p>
62+
* The intent is that every operation that wants to set a context should have its own method in this interface, but
63+
* operations are added as needed (instead of trying to cover every possible operation upfront).
64+
* <p>
65+
* Do note however that there can only be one operation context "active" at any given time (meaning, any thread
66+
* execute can only see at most one context), so the context should be set at the higher level that make sense
67+
* (and if necessary, sub-operations can enrich the context of their parent, assuming the parent context make room
68+
* for this).
69+
*/
70+
interface Factory
71+
{
72+
OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs);
73+
}
74+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.context;
20+
21+
import io.netty.util.concurrent.FastThreadLocal;
22+
import org.apache.cassandra.concurrent.ExecutorLocal;
23+
24+
public class OperationContextTracker implements ExecutorLocal<OperationContext>
25+
{
26+
public static final OperationContextTracker instance = new OperationContextTracker();
27+
private final FastThreadLocal<OperationContext> current = new FastThreadLocal<>();
28+
29+
@Override
30+
public OperationContext get()
31+
{
32+
return current.get();
33+
}
34+
35+
@Override
36+
public void set(OperationContext value)
37+
{
38+
current.set(value);
39+
}
40+
41+
public static void start(OperationContext context)
42+
{
43+
instance.set(context);
44+
}
45+
46+
public static void endCurrent()
47+
{
48+
OperationContext ctx = instance.get();
49+
if (ctx != null)
50+
{
51+
ctx.close();
52+
instance.current.remove();
53+
}
54+
}
55+
}

src/java/org/apache/cassandra/utils/concurrent/Timer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.cassandra.concurrent.ExecutorLocals;
3232
import org.apache.cassandra.sensors.RequestSensors;
3333
import org.apache.cassandra.service.ClientWarn;
34+
import org.apache.cassandra.service.context.OperationContext;
3435
import org.apache.cassandra.tracing.TraceState;
3536

3637
/**
@@ -70,13 +71,12 @@ public Future<Void> onTimeout(Runnable task, long timeout, TimeUnit unit)
7071
*/
7172
public Future<Void> onTimeout(Runnable task, long timeout, TimeUnit unit, ExecutorLocals executorLocals)
7273
{
73-
ClientWarn.State clientWarnState = executorLocals == null ? null : executorLocals.clientWarnState;
74-
TraceState traceState = executorLocals == null ? null : executorLocals.traceState;
75-
RequestSensors sensors = executorLocals == null ? null : executorLocals.sensors;
74+
7675
CompletableFuture<Void> result = new CompletableFuture<>();
7776
Timeout handle = timer.newTimeout(ignored ->
7877
{
79-
ExecutorLocals.set(ExecutorLocals.create(traceState, clientWarnState, sensors));
78+
if (executorLocals != null)
79+
ExecutorLocals.set(executorLocals);
8080
try
8181
{
8282
task.run();

0 commit comments

Comments
 (0)