Skip to content

Commit e29108a

Browse files
committed
[feature][dingo-common] Add Executor memory manager
1 parent 955396c commit e29108a

File tree

69 files changed

+3014
-213
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+3014
-213
lines changed

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import io.dingodb.calcite.visitor.function.DingoVectorVisitFun;
111111
import io.dingodb.calcite.visitor.function.DingoWindowVisitFun;
112112
import io.dingodb.common.ExecuteVariables;
113+
import io.dingodb.common.ExecutionContext;
113114
import io.dingodb.common.Location;
114115
import io.dingodb.common.log.LogUtils;
115116
import io.dingodb.exec.base.IdGenerator;
@@ -144,9 +145,6 @@ public class DingoJobVisitor implements DingoRelVisitor<Collection<Vertex>> {
144145
@Setter
145146
private boolean isScan;
146147

147-
@Getter
148-
private ExecuteVariables executeVariables;
149-
150148
@Getter
151149
private long pointTs;
152150

@@ -162,54 +160,43 @@ public class DingoJobVisitor implements DingoRelVisitor<Collection<Vertex>> {
162160
@Getter
163161
private long updateLimit;
164162

165-
@Getter
166-
private String user;
167-
168-
@Getter
169-
private String host;
170-
171163
private DingoJobVisitor(Job job, IdGenerator idGenerator, Location currentLocation, ITransaction transaction,
172-
SqlKind kind, ExecuteVariables executeVariables, long pointTs, boolean forUpdate,
173-
boolean replaceInto, boolean isIgnore, long updateLimit, String user, String host) {
164+
SqlKind kind, long pointTs, boolean forUpdate,
165+
boolean replaceInto, boolean isIgnore, long updateLimit) {
174166
this.job = job;
175167
this.idGenerator = idGenerator;
176168
this.currentLocation = currentLocation;
177169
this.transaction = transaction;
178170
this.kind = kind;
179-
this.executeVariables = executeVariables;
180171
this.pointTs = pointTs;
181172
this.forUpdate = forUpdate;
182173
this.replaceInto = replaceInto;
183174
this.isIgnore = isIgnore;
184175
this.updateLimit = updateLimit;
185-
this.user = user;
186-
this.host = host;
187176
}
188177

189178
public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation) {
190179
String user = "root";
191180
String host = "%";
192-
renderJob(jobManager, job, input, currentLocation, false, null, null,
193-
new ExecuteVariables(), user, host);
181+
renderJob(jobManager, job, input, currentLocation, false, null, null);
194182
}
195183

196184
public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation,
197-
boolean checkRoot, ITransaction transaction, SqlKind kind,
198-
ExecuteVariables executeVariables, String user, String host) {
185+
boolean checkRoot, ITransaction transaction, SqlKind kind) {
199186
renderJob(jobManager, job, input, currentLocation, checkRoot, transaction, kind,
200-
executeVariables, 0, false, false, false, -1, user, host);
187+
0, false, false, false, -1);
201188
}
202189

203190
public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation,
204191
boolean checkRoot, ITransaction transaction, SqlKind kind,
205-
ExecuteVariables executeVariables, long pointTs, boolean forUpdate,
206-
boolean replaceInto, boolean isIgnore, long updateLimit, String user, String host) {
192+
long pointTs, boolean forUpdate,
193+
boolean replaceInto, boolean isIgnore, long updateLimit) {
207194
try {
208195
IdGenerator idGenerator = new IdGeneratorImpl(job.getJobId().seq);
209196
DingoJobVisitor visitor = new DingoJobVisitor(
210197
job, idGenerator, currentLocation, transaction, kind,
211-
executeVariables, pointTs, forUpdate, replaceInto, isIgnore,
212-
updateLimit, user, host
198+
pointTs, forUpdate, replaceInto, isIgnore,
199+
updateLimit
213200
);
214201
Collection<Vertex> outputs = dingo(input).accept(visitor);
215202
if (checkRoot && !outputs.isEmpty()) {
@@ -444,7 +431,7 @@ public Collection<Vertex> visit(@NonNull DingoWindow dingoWindow) {
444431

445432
public Collection<Vertex> visit(@NonNull DingoRepeatUnion dingoRepeatUnion) {
446433
return DingoRepeatUnionVisitFun.visit(
447-
job, idGenerator, currentLocation, this, transaction, dingoRepeatUnion, executeVariables
434+
job, idGenerator, currentLocation, this, transaction, dingoRepeatUnion
448435
);
449436
}
450437

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoFunctionScanVisitFun.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static Collection<Vertex> visit(
7272
false,
7373
false,
7474
null,
75-
visitor.getExecuteVariables().getConcurrencyLevel()
75+
job.getExecutionContext().getConcurrencyLevel()
7676
);
7777
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION_1, distributionParam);
7878
Task task = job.getOrCreate(currentLocation, idGenerator);

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexMergeVisitFun.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public static Collection<Vertex> visit(
114114
false,
115115
false,
116116
keyTuple,
117-
visitor.getExecuteVariables().getConcurrencyLevel()
117+
job.getExecutionContext().getConcurrencyLevel()
118118
));
119119

120120
Task task;

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoGetByIndexVisitFun.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public static LinkedList<Vertex> visit(
112112
false,
113113
false,
114114
keyTuple,
115-
visitor.getExecuteVariables().getConcurrencyLevel()
115+
job.getExecutionContext().getConcurrencyLevel()
116116
));
117117

118118
Task task;

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoHashJoinVisitFun.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public static List<Vertex> visit(
116116
TupleMapping.of(rightKeys), rel.getLeft().getRowType().getFieldCount(),
117117
rel.getRight().getRowType().getFieldCount(),
118118
rel.getJoinType() == JoinRelType.LEFT || rel.getJoinType() == JoinRelType.FULL,
119-
rel.getJoinType() == JoinRelType.RIGHT || rel.getJoinType() == JoinRelType.FULL
119+
rel.getJoinType() == JoinRelType.RIGHT || rel.getJoinType() == JoinRelType.FULL,
120+
job.getExecutionContext()
120121
);
121122
param.setJoinType(rel.getJoinType().lowerName);
122123
param.setOtherExpr(otherCondition);

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexFullScanVisitFun.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private DingoIndexFullScanVisitFun() {
9595
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
9696
false,
9797
null,
98-
visitor.getExecuteVariables().getConcurrencyLevel()
98+
job.getExecutionContext().getConcurrencyLevel()
9999
);
100100
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
101101
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION_1, distributionParam);

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexRangeScanVisitFun.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private DingoIndexRangeScanVisitFun() {
149149
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
150150
false,
151151
null,
152-
visitor.getExecuteVariables().getConcurrencyLevel()
152+
job.getExecutionContext().getConcurrencyLevel()
153153
);
154154
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
155155
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION_1, distributionParam);

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexScanWithRelOpVisitFun.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private DingoIndexScanWithRelOpVisitFun() {
105105
outputs.add(createVerticesForRange(
106106
task,
107107
idGenerator,
108-
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false, visitor),
108+
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false, visitor, job),
109109
null,
110110
null,
111111
scanVertexCreator
@@ -116,7 +116,7 @@ private DingoIndexScanWithRelOpVisitFun() {
116116
outputs.add(createVerticesForRange(
117117
task,
118118
idGenerator,
119-
(start, end) -> createCalcDistributionVertex(rel, start, end, false, visitor),
119+
(start, end) -> createCalcDistributionVertex(rel, start, end, false, visitor, job),
120120
null,
121121
null,
122122
scanVertexCreator
@@ -126,7 +126,7 @@ private DingoIndexScanWithRelOpVisitFun() {
126126
outputs.add(createVerticesForRange(
127127
task,
128128
idGenerator,
129-
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false, visitor),
129+
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false, visitor, job),
130130
null,
131131
null,
132132
scanVertexCreator
@@ -140,7 +140,7 @@ private DingoIndexScanWithRelOpVisitFun() {
140140
outputs.add(createVerticesForRange(
141141
task,
142142
idGenerator,
143-
(start, end) -> createCalcDistributionVertex(rel, start, end, false, visitor),
143+
(start, end) -> createCalcDistributionVertex(rel, start, end, false, visitor, job),
144144
partition.getStart(),
145145
i < partitionNum - 1 ? partitions.get(i + 1).getStart() : null,
146146
scanVertexCreator
@@ -274,7 +274,8 @@ private DingoIndexScanWithRelOpVisitFun() {
274274
byte[] startKey,
275275
byte[] endKey,
276276
boolean withEnd,
277-
DingoJobVisitor visitor
277+
DingoJobVisitor visitor,
278+
Job job
278279
) {
279280
MetaService metaService = MetaService.root(visitor.getPointTs());
280281
final IndexTable td = rel.getIndexTable();
@@ -298,7 +299,7 @@ private DingoIndexScanWithRelOpVisitFun() {
298299
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
299300
false,
300301
null,
301-
visitor.getExecuteVariables().getConcurrencyLevel()
302+
job.getExecutionContext().getConcurrencyLevel()
302303
);
303304
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
304305
distributionParam.setFilterRange(rel.isRangeScan());
@@ -310,7 +311,8 @@ private DingoIndexScanWithRelOpVisitFun() {
310311
byte[] startKey,
311312
byte[] endKey,
312313
boolean withEnd,
313-
DingoJobVisitor visitor
314+
DingoJobVisitor visitor,
315+
Job job
314316
) {
315317
MetaService metaService = MetaService.root(visitor.getPointTs());
316318
final IndexTable td = rel.getIndexTable();
@@ -341,7 +343,7 @@ private DingoIndexScanWithRelOpVisitFun() {
341343
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
342344
false,
343345
null,
344-
visitor.getExecuteVariables().getConcurrencyLevel()
346+
job.getExecutionContext().getConcurrencyLevel()
345347
);
346348
boolean filterRange = false;
347349
distributionParam.setKeepOrder(rel.getKeepSerialOrder());

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoInfoSchemaScanVisitFun.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public static List<Vertex> visit(
5959
} else {
6060
tableName = td.getName();
6161
}
62-
String user = visitor.getUser();
63-
String host = visitor.getHost();
62+
String user = job.getExecutionContext().getUser();
63+
String host = job.getExecutionContext().getHost();
6464
InfoSchemaScanParam param = new InfoSchemaScanParam(
6565
td.tupleType(),
6666
td.version,

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoInsertIgnoreVisitFun.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public static Collection<Vertex> visit(Job job, IdGenerator idGenerator, Locatio
154154
transaction.getStartTs(),
155155
transaction.getForUpdateTs(),
156156
transaction.getLockTimeOut(),
157-
visitor.getExecuteVariables().isInsertCheckInplace(),
157+
job.getExecutionContext().isInsertCheckInplace(),
158158
td,
159159
rel.isHasAutoIncrement(),
160160
rel.getAutoIncrementColIndex()
@@ -181,7 +181,7 @@ public static Collection<Vertex> visit(Job job, IdGenerator idGenerator, Locatio
181181
transaction.getStartTs(),
182182
0L,
183183
transaction.getLockTimeOut(),
184-
visitor.getExecuteVariables().isInsertCheckInplace(),
184+
job.getExecutionContext().isInsertCheckInplace(),
185185
td,
186186
rel.isHasAutoIncrement(),
187187
rel.getAutoIncrementColIndex()

0 commit comments

Comments
 (0)