Skip to content

Commit 34221fb

Browse files
authored
Subscription: avoid the file reading when the memory is not enough for tsfile slicing & implement all-out effort global timeout control & support client resume from breakpoint for tsfile consumption (apache#13992)
1 parent e1cc229 commit 34221fb

File tree

37 files changed

+788
-321
lines changed

37 files changed

+788
-321
lines changed

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionPipeTimeoutException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import java.util.Objects;
2323

24-
public class SubscriptionPipeTimeoutException extends SubscriptionRuntimeNonCriticalException {
24+
public class SubscriptionPipeTimeoutException extends SubscriptionTimeoutException {
2525

2626
public SubscriptionPipeTimeoutException(final String message) {
2727
super(message);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.rpc.subscription.exception;
21+
22+
import java.util.Objects;
23+
24+
public class SubscriptionPollTimeoutException extends SubscriptionTimeoutException {
25+
26+
public SubscriptionPollTimeoutException(final String message) {
27+
super(message);
28+
}
29+
30+
public SubscriptionPollTimeoutException(final String message, final Throwable cause) {
31+
super(message, cause);
32+
}
33+
34+
@Override
35+
public boolean equals(final Object obj) {
36+
return obj instanceof SubscriptionPollTimeoutException
37+
&& Objects.equals(getMessage(), ((SubscriptionPollTimeoutException) obj).getMessage())
38+
&& Objects.equals(getTimeStamp(), ((SubscriptionPollTimeoutException) obj).getTimeStamp());
39+
}
40+
41+
@Override
42+
public int hashCode() {
43+
return super.hashCode();
44+
}
45+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.rpc.subscription.exception;
21+
22+
import java.util.Objects;
23+
24+
public abstract class SubscriptionTimeoutException extends SubscriptionRuntimeNonCriticalException {
25+
26+
public static String KEYWORD = "TimeoutException";
27+
28+
public SubscriptionTimeoutException(final String message) {
29+
super(message);
30+
}
31+
32+
public SubscriptionTimeoutException(final String message, final Throwable cause) {
33+
super(message, cause);
34+
}
35+
36+
@Override
37+
public boolean equals(final Object obj) {
38+
return obj instanceof SubscriptionTimeoutException
39+
&& Objects.equals(getMessage(), ((SubscriptionTimeoutException) obj).getMessage())
40+
&& Objects.equals(getTimeStamp(), ((SubscriptionTimeoutException) obj).getTimeStamp());
41+
}
42+
43+
@Override
44+
public int hashCode() {
45+
return super.hashCode();
46+
}
47+
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class SubscriptionPollRequest {
3636

3737
private final transient SubscriptionPollPayload payload;
3838

39-
private final transient long timeoutMs; // unused now
39+
private final transient long timeoutMs;
4040

4141
/** The maximum size, in bytes, for the response payload. */
4242
private final transient long maxBytes;

0 commit comments

Comments
 (0)