Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.store.kafka;

import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.kafka.common.serialization.StringSerializer;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -32,10 +32,8 @@
@Category({KafkaStorageTest.class, SlowTest.class})
public class KafkaFilterPushdownTest extends KafkaTestBase {
private static final int NUM_PARTITIONS = 5;
private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" +
" \"topicName\" : \"drill-pushdown-topic\"\n" +
" },\n" +
" \"cost\"";
private static final String expectedPattern = "kafkaScanSpec.*\\n.*\"topicName\" : \"drill-pushdown-topic\"\\n(" +
".*\\n)?(.*\\n)?(.*\\n)?.*cost\"(.*\\n)(.*\\n).*outputRowCount\" : (%s.0)";

@BeforeClass
public static void setup() throws Exception {
Expand Down Expand Up @@ -63,7 +61,9 @@ public void testPushdownOnOffset() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString, expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -79,7 +79,9 @@ public void testPushdownOnPartition() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString, expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -95,7 +97,9 @@ public void testPushdownOnTimestamp() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -112,7 +116,9 @@ public void testPushdownUnorderedTimestamp() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowInPlan)},
new String[]{});
}

/**
Expand All @@ -128,7 +134,9 @@ public void testPushdownWhenTimestampDoesNotExist() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -144,7 +152,9 @@ public void testPushdownWhenPartitionDoesNotExist() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -161,7 +171,9 @@ public void testPushdownForEmptyScanSpec() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -178,42 +190,54 @@ public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws E
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"equal" such that value < startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than" such that value = endOffset-1
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than_or_equal" such that value = endOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"less_than" such that value = startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"less_than_or_equal" such that value < startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -230,21 +254,27 @@ public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws E
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");

runKafkaSQLVerifyCount(queryString, expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than" such that value = endOffset-2
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than_or_equal" such that value = endOffset-1
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -262,7 +292,9 @@ public void testPushdownWithOr() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -280,7 +312,9 @@ public void testPushdownWithOr1() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowInPlan)},
new String[]{});
}

/**
Expand All @@ -299,7 +333,9 @@ public void testPushdownWithAndOrCombo() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -319,7 +355,9 @@ public void testPushdownWithAndOrCombo2() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
new String[]{});
}

/**
Expand All @@ -338,7 +376,9 @@ public void testPushdownTimestampWithNonMetaField() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
new String[]{});
}

/**
Expand All @@ -358,7 +398,9 @@ public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
new String[]{});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ private ExecConstants() {
new OptionDescription("Indicates how long a query can wait in queue before the query fails. Range: 0-9223372036854775807"));

// New Smart RM boot time configs
public static final String RM_WAIT_THREAD_INTERVAL = "exec.rm.wait_thread_interval";
public static final String RM_QUERY_TAGS_KEY = "exec.rm.queryTags";
public static final StringValidator RM_QUERY_TAGS_VALIDATOR = new StringValidator(RM_QUERY_TAGS_KEY,
new OptionDescription("Allows user to set coma separated list of tags for all the queries submitted over a session"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
*/
package org.apache.drill.exec.coord;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
* as well as understand other node's existence and capabilities.
Expand Down Expand Up @@ -60,6 +61,10 @@ public abstract class ClusterCoordinator implements AutoCloseable {
*/
public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();

public Map<String, DrillbitEndpoint> getAvailableEndpointsUUID() {
throw new UnsupportedOperationException("Only supported by Zookeeper Cluster Coordinator outside YARN");
}

/**
* Get a collection of ONLINE drillbit endpoints by excluding the drillbits
* that are in QUIESCENT state (drillbits that are shutting down). Primarily used by the planner
Expand All @@ -70,6 +75,10 @@ public abstract class ClusterCoordinator implements AutoCloseable {

public abstract Collection<DrillbitEndpoint> getOnlineEndPoints();

public Map<DrillbitEndpoint, String> getOnlineEndpointsUUID() {
throw new UnsupportedOperationException("Only supported by Zookeeper Cluster Coordinator outside YARN");
}

public abstract RegistrationHandle update(RegistrationHandle handle, State state);

public interface RegistrationHandle {
Expand All @@ -79,6 +88,8 @@ public interface RegistrationHandle {
*/
public abstract DrillbitEndpoint getEndPoint();

public abstract String getId();

public abstract void setEndPoint(DrillbitEndpoint endpoint);
}

Expand Down
Loading