Skip to content
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pom.xml
*/target/*
*.orig
*.log
build-aux/fetchdep.sh
opentsdb.spec

#for Intellij
\.idea
Expand Down Expand Up @@ -50,3 +52,4 @@ tools/docker/opentsdb.conf
fat-jar-pom.xml
src-resources/
test-resources/
third_party/**/*.jar
5 changes: 5 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ tsdb_SRC := \
src/core/RequestBuilder.java \
src/core/RowKey.java \
src/core/RowSeq.java \
src/core/RpcResponder.java \
src/core/iRowSeq.java \
src/core/SaltScanner.java \
src/core/SeekableView.java \
Expand Down Expand Up @@ -120,6 +121,7 @@ tsdb_SRC := \
src/query/expression/ExpressionReader.java \
src/query/expression/Expressions.java \
src/query/expression/ExpressionTree.java \
src/query/expression/FirstDifference.java \
src/query/expression/HighestCurrent.java \
src/query/expression/HighestMax.java \
src/query/expression/IntersectionIterator.java \
Expand Down Expand Up @@ -178,7 +180,9 @@ tsdb_SRC := \
src/tools/TSDMain.java \
src/tools/TextImporter.java \
src/tools/TreeSync.java \
src/tools/UidGarbageCollector.java \
src/tools/UidManager.java \
src/tools/Uids.java \
src/tools/ArgValueValidator.java \
src/tools/ConfigArgP.java \
src/tools/ConfigMetaType.java \
Expand Down Expand Up @@ -316,6 +320,7 @@ test_SRC := \
test/core/TestRateSpan.java \
test/core/TestRowKey.java \
test/core/TestRowSeq.java \
test/core/TestRpcResponsder.java \
test/core/TestSaltScanner.java \
test/core/TestSpan.java \
test/core/TestSpanGroup.java \
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# along with this library. If not, see <http://www.gnu.org/licenses/>.

# Semantic Versioning (see http://semver.org/).
AC_INIT([opentsdb], [2.4.0RC2], [opentsdb@googlegroups.com])
AC_INIT([opentsdb], [2.5.0-SNAPSHOT], [opentsdb@googlegroups.com])
AC_CONFIG_AUX_DIR([build-aux])
AM_INIT_AUTOMAKE([foreign])

Expand Down
110 changes: 110 additions & 0 deletions src/core/RpcResponder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2017 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import net.opentsdb.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* This class is responsible for building result of requests and
* respond to clients asynchronously.
*
* It can reduce requests that stacking in AsyncHBase, especially put requests.
* When a HBase's RPC has completed, the "AsyncHBase I/O worker" just decodes
* the response, and then do callback by this class asynchronously. We should
* take up workers as short as possible time so that workers can remove RPCs
* from in-flight state more quickly.
*
*/
public class RpcResponder {

private static final Logger LOG = LoggerFactory.getLogger(RpcResponder.class);

public static final String TSD_RESPONSE_ASYNC_KEY = "tsd.core.response.async";
public static final boolean TSD_RESPONSE_ASYNC_DEFAULT = true;

public static final String TSD_RESPONSE_WORKER_NUM_KEY =
"tsd.core.response.worker.num";
public static final int TSD_RESPONSE_WORKER_NUM_DEFAULT = 10;

private final boolean async;
private ExecutorService responders;
private volatile boolean running = true;

RpcResponder(final Config config) {
async = config.getBoolean(TSD_RESPONSE_ASYNC_KEY,
TSD_RESPONSE_ASYNC_DEFAULT);

if (async) {
int threads = config.getInt(TSD_RESPONSE_WORKER_NUM_KEY,
TSD_RESPONSE_WORKER_NUM_DEFAULT);
responders = Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder()
.setNameFormat("OpenTSDB Responder #%d")
.setDaemon(true)
.setUncaughtExceptionHandler(new ExceptionHandler())
.build());
}

LOG.info("RpcResponder mode: {}", async ? "async" : "sync");
}

public void response(Runnable run) {
if (async) {
if (running) {
responders.execute(run);
} else {
throw new IllegalStateException("RpcResponder is closing or closed.");
}
} else {
run.run();
}
}

public void close() {
if (running) {
running = false;
responders.shutdown();
}

boolean completed;
try {
completed = responders.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
completed = false;
}

if (!completed) {
LOG.warn(
"There are still some results that are not returned to the clients.");
}
}

public boolean isAsync() {
return async;
}

private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Run into an uncaught exception in thread: " + t.getName(), e);
}
}
}
51 changes: 43 additions & 8 deletions src/core/TSDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ public enum OperationMode {
/** Timer used for various tasks such as idle timeouts or query timeouts */
private final HashedWheelTimer timer;

/** RpcResponder for doing response asynchronously*/
private final RpcResponder rpcResponder;

/**
* Row keys that need to be compacted.
* Whenever we write a new data point to a row, we add the row key to this
Expand Down Expand Up @@ -343,7 +346,10 @@ public TSDB(final HBaseClient client, final Config config) {

// set any extra tags from the config for stats
StatsCollector.setGlobalTags(config);



rpcResponder = new RpcResponder(config);

LOG.debug(config.dumpConfiguration());
}

Expand Down Expand Up @@ -1657,20 +1663,43 @@ public String toString() {
}
}

final class RpcResponsderShutdown implements Callback<Object, Object> {
@Override
public Object call(Object arg) throws Exception {
try {
TSDB.this.rpcResponder.close();
} catch (Exception e) {
LOG.error(
"Run into unknown exception while closing RpcResponder.", e);
} finally {
return arg;
}
}
}

final class HClientShutdown implements Callback<Deferred<Object>, ArrayList<Object>> {
public Deferred<Object> call(final ArrayList<Object> args) {
public Deferred<Object> call(final ArrayList<Object> args) {
Callback<Object, Object> nextCallback;
if (storage_exception_handler != null) {
return client.shutdown().addBoth(new SEHShutdown());
nextCallback = new SEHShutdown();
} else {
nextCallback = new FinalShutdown();
}
return client.shutdown().addBoth(new FinalShutdown());

if (TSDB.this.rpcResponder.isAsync()) {
client.shutdown().addBoth(new RpcResponsderShutdown());
}

return client.shutdown().addBoth(nextCallback);
}
public String toString() {

public String toString() {
return "shutdown HBase client";
}
}

final class ShutdownErrback implements Callback<Object, Exception> {
public Object call(final Exception e) {
public Object call(final Exception e) {
final Logger LOG = LoggerFactory.getLogger(ShutdownErrback.class);
if (e instanceof DeferredGroupException) {
final DeferredGroupException ge = (DeferredGroupException) e;
Expand All @@ -1684,13 +1713,14 @@ public Object call(final Exception e) {
}
return new HClientShutdown().call(null);
}
public String toString() {

public String toString() {
return "shutdown HBase client after error";
}
}

final class CompactCB implements Callback<Object, ArrayList<Object>> {
public Object call(ArrayList<Object> compactions) throws Exception {
public Object call(ArrayList<Object> compactions) throws Exception {
return null;
}
}
Expand Down Expand Up @@ -2189,4 +2219,9 @@ final Deferred<Object> delete(final byte[] key, final byte[][] qualifiers) {
return client.delete(new DeleteRequest(table, key, FAMILY, qualifiers));
}

/** Do response by RpcResponder */
public void response(Runnable run) {
rpcResponder.response(run);
}

}
5 changes: 3 additions & 2 deletions src/core/TsdbQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -1544,14 +1544,15 @@ private long getScanStartTimeSeconds() {
}

/** Returns the UNIX timestamp at which we must stop scanning. */
private long getScanEndTimeSeconds() {
@VisibleForTesting
protected long getScanEndTimeSeconds() {
// Begin with the raw query end time.
long end = getEndTime();

// Convert to seconds if we have a query in ms.
if ((end & Const.SECOND_MASK) != 0L) {
end /= 1000L;
if (end - (end * 1000) < 1) {
if (end == 0) {
// handle an edge case where a user may request a ms time between
// 0 and 1 seconds. Just bump it a second.
end++;
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/ExpressionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class ExpressionFactory {
available_functions.put("highestMax", new HighestMax());
available_functions.put("shift", new TimeShift());
available_functions.put("timeShift", new TimeShift());
available_functions.put("firstDiff", new FirstDifference());
}

/** Don't instantiate me! */
Expand Down
101 changes: 101 additions & 0 deletions src/query/expression/FirstDifference.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// This file is part of OpenTSDB.
// Copyright (C) 2015 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.query.expression;

import java.util.ArrayList;

import java.util.List;

import net.opentsdb.core.DataPoint;
import net.opentsdb.core.DataPoints;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.MutableDataPoint;
import net.opentsdb.core.SeekableView;
import net.opentsdb.core.TSQuery;
import net.opentsdb.core.Aggregators.Interpolation;

/**
* Implements a difference function, calculates the first difference of a given series
*
* @since 2.3
*/
public class FirstDifference implements net.opentsdb.query.expression.Expression {

@Override
public DataPoints[] evaluate(final TSQuery data_query,
final List<DataPoints[]> query_results, final List<String> params) {
if (data_query == null) {
throw new IllegalArgumentException("Missing time series query");
}
if (query_results == null || query_results.isEmpty()) {
return new DataPoints[]{};
}


int num_results = 0;
for (final DataPoints[] results : query_results) {
num_results += results.length;
}
final DataPoints[] results = new DataPoints[num_results];

int ix = 0;
// one or more sub queries (m=...&m=...&m=...)
for (final DataPoints[] sub_query_result : query_results) {
// group bys (m=sum:foo{host=*})
for (final DataPoints dps : sub_query_result) {
results[ix++] = firstDiff(dps);
}
}

return results;

}

/**
* return the first difference of datapoints
*
* @param points The data points to do difference
* @return The resulting data points
*/
private DataPoints firstDiff(final DataPoints points) {
final List<DataPoint> dps = new ArrayList<DataPoint>();
final SeekableView view = points.iterator();
List<Double> nums = new ArrayList<Double>();
List<Long> times = new ArrayList<Long>();
while (view.hasNext()) {
DataPoint pt = view.next();
nums.add(pt.toDouble());
times.add(pt.timestamp());
}
List<Double> diff = new ArrayList<Double>();
diff.add(0.0);
for (int j =0;j<nums.size()-1;j++){
diff.add(nums.get(j+1) - nums.get(j));
}
for (int j =0;j<nums.size();j++){
dps.add(MutableDataPoint.ofDoubleValue(times.get(j), diff.get(j)));
}
final DataPoint[] results = new DataPoint[dps.size()];
dps.toArray(results);
return new net.opentsdb.query.expression.PostAggregatedDataPoints(points, results);
}



@Override
public String writeStringField(final List<String> query_params,
final String inner_expression) {
return "firstDiff(" + inner_expression + ")";
}

}
Loading