diff --git a/mist-core/src/main/java/edu/snu/mist/core/parameters/ReplayServerAddress.java b/mist-core/src/main/java/edu/snu/mist/core/parameters/ReplayServerAddress.java new file mode 100644 index 00000000..e3acb986 --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/parameters/ReplayServerAddress.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The address of the replay server. + */ +@NamedParameter(doc = "The address of the replay server.", default_value = "noReplay", short_name = "replay_address") +public class ReplayServerAddress implements Name { +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/parameters/ReplayServerPort.java b/mist-core/src/main/java/edu/snu/mist/core/parameters/ReplayServerPort.java new file mode 100644 index 00000000..50ee3cc5 --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/parameters/ReplayServerPort.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The port used for replaying. 
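For context, the two Tang parameters above can be bound like any other NamedParameter. A minimal sketch using the standard REEF Tang API (the address and port values are placeholders, and the parameters are assumed to be declared as Name&lt;String&gt; and Name&lt;Integer&gt;, matching how they are consumed as a String address and an int port elsewhere in this patch):

import edu.snu.mist.core.parameters.ReplayServerAddress;
import edu.snu.mist.core.parameters.ReplayServerPort;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;

public final class ReplayConfigSketch {
  public static void main(final String[] args) throws InjectionException {
    // Bind the replay server address and port (placeholder values; on the command line
    // these correspond to the short names "replay_address" and "replay_port").
    final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(ReplayServerAddress.class, "127.0.0.1")
        .bindNamedParameter(ReplayServerPort.class, "26523")
        .build();
    final Injector injector = Tang.Factory.getTang().newInjector(conf);
    // Components such as the query starters receive these values through @Parameter injection.
    final String address = injector.getNamedInstance(ReplayServerAddress.class);
    final int port = injector.getNamedInstance(ReplayServerPort.class);
    System.out.println("Replay server at " + address + ":" + port);
  }
}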
+ */ +@NamedParameter(doc = "The port of the replay server.", default_value = "26523", short_name = "replay_port") +public class ReplayServerPort implements Name { +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/replay/EventReplayUtils.java b/mist-core/src/main/java/edu/snu/mist/core/replay/EventReplayUtils.java index 122125a4..1f6abbe5 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/replay/EventReplayUtils.java +++ b/mist-core/src/main/java/edu/snu/mist/core/replay/EventReplayUtils.java @@ -15,6 +15,9 @@ */ package edu.snu.mist.core.replay; +import edu.snu.mist.core.task.ExecutionDag; +import edu.snu.mist.core.task.ExecutionVertex; +import edu.snu.mist.core.task.PhysicalSource; import org.apache.reef.io.Tuple; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.simple.JSONArray; @@ -289,4 +292,67 @@ public static boolean removeOnCheckpoint(final String replayServerAddress, final private static String getReplayServerUrl(final String replayServerAddress, final int replayServerPort) { return "http://" + replayServerAddress + ":" + replayServerPort; } + + /** + * Start the sources. + */ + public static void startSources(final Collection executionDags) { + LOG.log(Level.INFO, "Starting sources..."); + for (final ExecutionDag executionDag : executionDags) { + for (final ExecutionVertex executionVertex : executionDag.getDag().getRootVertices()) { + final PhysicalSource src = (PhysicalSource) executionVertex; + src.start(); + } + } + } + + /** + * Send the MqttMessages of the sources in chronological order. + * @param srcAndMqttMessageListMap each value(List) is always required to be ordered in chronological order. + */ + public static void sendMsgs(final Map>> srcAndMqttMessageListMap) { + final Map, PhysicalSource> timestampMqttMessageTupleAndSrcMap = new HashMap<>(); + for (final Map.Entry>> entry : srcAndMqttMessageListMap.entrySet()) { + final List> messageList = entry.getValue(); + if (!messageList.isEmpty()) { + timestampMqttMessageTupleAndSrcMap.put(entry.getValue().remove(0), entry.getKey()); + } + } + while (!srcAndMqttMessageListMap.isEmpty()) { + // Select the entry with the minimum timestamp. + final Tuple, PhysicalSource> minTimestampEntry = + selectAndRemoveMinTimestampEntry(timestampMqttMessageTupleAndSrcMap); + final Tuple minTimestampTuple = minTimestampEntry.getKey(); + final PhysicalSource minTimestampSrc = minTimestampEntry.getValue(); + + // Emit the MqttMessage corresponding to the minimum timestamp. + minTimestampSrc.getEventGenerator().emitData(minTimestampTuple.getValue()); + + if (srcAndMqttMessageListMap.get(minTimestampSrc).isEmpty()) { + // If there are no more messages to send for the source, exclude it completely. + srcAndMqttMessageListMap.remove(minTimestampSrc); + } else { + // If there are still more messages to send for the source, put the next message in the + // timestampMqttMessageAndSrcMap. 
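The chronological send above repeatedly scans the candidate map for the smallest timestamp, which costs O(n) per emitted event in the number of sources. For reference, a self-contained sketch of the same k-way merge done with a PriorityQueue (plain strings stand in for MqttMessage and PhysicalSource; this illustrates only the ordering logic and is not part of the patch):

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public final class ChronologicalMergeSketch {

  /** One buffered message: (timestamp, payload, name of the source it belongs to). */
  private static final class Msg {
    private final long timestamp;
    private final String payload;
    private final String source;
    Msg(final long timestamp, final String payload, final String source) {
      this.timestamp = timestamp;
      this.payload = payload;
      this.source = source;
    }
  }

  /**
   * Emits the messages of all sources in global timestamp order.
   * As in EventReplayUtils.sendMsgs(), each per-source list must already be sorted.
   */
  private static void sendInOrder(final Map<String, List<Msg>> perSourceMessages) {
    final PriorityQueue<Msg> heap =
        new PriorityQueue<>(Comparator.comparingLong((Msg m) -> m.timestamp));
    final Map<String, Iterator<Msg>> iterators = new HashMap<>();
    for (final Map.Entry<String, List<Msg>> entry : perSourceMessages.entrySet()) {
      final Iterator<Msg> it = entry.getValue().iterator();
      iterators.put(entry.getKey(), it);
      if (it.hasNext()) {
        heap.add(it.next()); // seed the heap with the head of each list
      }
    }
    while (!heap.isEmpty()) {
      final Msg min = heap.poll();
      System.out.println(min.timestamp + " " + min.source + " " + min.payload); // emitData() in the real code
      final Iterator<Msg> it = iterators.get(min.source);
      if (it.hasNext()) {
        heap.add(it.next()); // pull the next message of the same source
      }
    }
  }

  public static void main(final String[] args) {
    final Map<String, List<Msg>> messages = new HashMap<>();
    messages.put("srcA", Arrays.asList(new Msg(1L, "a1", "srcA"), new Msg(4L, "a2", "srcA")));
    messages.put("srcB", Arrays.asList(new Msg(2L, "b1", "srcB"), new Msg(3L, "b2", "srcB")));
    sendInOrder(messages); // prints a1, b1, b2, a2
  }
}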
+ timestampMqttMessageTupleAndSrcMap.put(srcAndMqttMessageListMap.get(minTimestampSrc).remove(0), + minTimestampSrc); + } + } + } + + private static Tuple, PhysicalSource> selectAndRemoveMinTimestampEntry( + final Map, PhysicalSource> timestampMqttMessageAndSrcMap) { + Long minimumTimestamp = Long.MAX_VALUE; + Tuple, PhysicalSource> result = null; + for (final Map.Entry, PhysicalSource> entry : timestampMqttMessageAndSrcMap.entrySet()) { + final Long entryTimestamp = entry.getKey().getKey(); + if (entryTimestamp < minimumTimestamp) { + minimumTimestamp = entryTimestamp; + result = new Tuple<>(entry.getKey(), entry.getValue()); + } + } + // Remove the entry to emit from the map. + timestampMqttMessageAndSrcMap.remove(result.getKey()); + return result; + } } diff --git a/mist-core/src/main/java/edu/snu/mist/core/sources/DataGenerator.java b/mist-core/src/main/java/edu/snu/mist/core/sources/DataGenerator.java index 47d9e8d6..732317f7 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/sources/DataGenerator.java +++ b/mist-core/src/main/java/edu/snu/mist/core/sources/DataGenerator.java @@ -24,6 +24,11 @@ */ public interface DataGenerator extends AutoCloseable { + /** + * Starts receiving data. + */ + void setup(); + /** * Starts generating data. */ diff --git a/mist-core/src/main/java/edu/snu/mist/core/sources/KafkaDataGenerator.java b/mist-core/src/main/java/edu/snu/mist/core/sources/KafkaDataGenerator.java index bab6dba0..8ebe8f03 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/sources/KafkaDataGenerator.java +++ b/mist-core/src/main/java/edu/snu/mist/core/sources/KafkaDataGenerator.java @@ -101,6 +101,11 @@ public KafkaDataGenerator( this.pollTimeout = kafkaSharedResource.getPollTimeout(); } + @Override + public void setup() { + // This method is only for MQTTDataGenerator. + } + @Override public void start() { if (started.compareAndSet(false, true)) { diff --git a/mist-core/src/main/java/edu/snu/mist/core/sources/MQTTDataGenerator.java b/mist-core/src/main/java/edu/snu/mist/core/sources/MQTTDataGenerator.java index cb229020..59d65e27 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/sources/MQTTDataGenerator.java +++ b/mist-core/src/main/java/edu/snu/mist/core/sources/MQTTDataGenerator.java @@ -17,6 +17,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -26,6 +28,11 @@ public final class MQTTDataGenerator implements DataGenerator { private static final Logger LOG = Logger.getLogger(MQTTDataGenerator.class.getName()); + /** + * A flag for setup to subscribe. + */ + private final AtomicBoolean setup; + /** * A flag for start. */ @@ -51,12 +58,19 @@ public final class MQTTDataGenerator implements DataGenerator { */ private EventGenerator eventGenerator; + /** + * A queue for events that must be emitted. 
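The sendMsgs()/selectAndRemoveMinTimestampEntry() pair above can be exercised directly with mocks. A hedged test sketch (Mockito and JUnit are already used by the existing MIST tests; static imports from org.mockito.Mockito are assumed, and the map and lists are mutable because sendMsgs() consumes them):

final PhysicalSource srcA = mock(PhysicalSource.class);
final PhysicalSource srcB = mock(PhysicalSource.class);
final EventGenerator genA = mock(EventGenerator.class);
final EventGenerator genB = mock(EventGenerator.class);
when(srcA.getEventGenerator()).thenReturn(genA);
when(srcB.getEventGenerator()).thenReturn(genB);

final MqttMessage a1 = new MqttMessage("a1".getBytes());
final MqttMessage b1 = new MqttMessage("b1".getBytes());
final Map<PhysicalSource, List<Tuple<Long, MqttMessage>>> srcAndMsgs = new HashMap<>();
srcAndMsgs.put(srcA, new ArrayList<>(Arrays.asList(new Tuple<>(2L, a1))));
srcAndMsgs.put(srcB, new ArrayList<>(Arrays.asList(new Tuple<>(1L, b1))));

EventReplayUtils.sendMsgs(srcAndMsgs);

// The event with the smaller timestamp must be emitted first, regardless of the source.
final InOrder inOrder = inOrder(genA, genB);
inOrder.verify(genB).emitData(b1);
inOrder.verify(genA).emitData(a1);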
+ */ + private Queue receivedMessages; + public MQTTDataGenerator(final MQTTSubscribeClient subClient, final String topic) { this.subClient = subClient; this.topic = topic; + this.setup = new AtomicBoolean(false); this.started = new AtomicBoolean(false); this.closed = new AtomicBoolean(false); + this.receivedMessages = new LinkedBlockingDeque<>(); } /** @@ -65,18 +79,28 @@ public MQTTDataGenerator(final MQTTSubscribeClient subClient, * @param message the message to emit */ void emitData(final MqttMessage message) { - if (!closed.get() && eventGenerator != null) { + if (setup.get() && !started.get() && !closed.get()) { + receivedMessages.add(message); + } else if (!closed.get() && eventGenerator != null) { eventGenerator.emitData(message); } } @Override - public void start() { - if (started.compareAndSet(false, true)) { + public void setup() { + if (setup.compareAndSet(false, true)) { subClient.subscribe(topic); } } + @Override + public void start() { + while (!receivedMessages.isEmpty()) { + eventGenerator.emitData(receivedMessages.poll()); + } + started.compareAndSet(false, true); + } + @Override public void close() { closed.compareAndSet(false, true); diff --git a/mist-core/src/main/java/edu/snu/mist/core/sources/NettyTextDataGenerator.java b/mist-core/src/main/java/edu/snu/mist/core/sources/NettyTextDataGenerator.java index 77139904..87dcfeaf 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/sources/NettyTextDataGenerator.java +++ b/mist-core/src/main/java/edu/snu/mist/core/sources/NettyTextDataGenerator.java @@ -77,6 +77,11 @@ public NettyTextDataGenerator( this.serverSocketAddress = new InetSocketAddress(serverAddr, port); } + @Override + public void setup() { + // This method is only for MQTTDataGenerator. + } + @Override public void start() { if (started.compareAndSet(false, true)) { diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/BaseQueryStarter.java b/mist-core/src/main/java/edu/snu/mist/core/task/BaseQueryStarter.java new file mode 100644 index 00000000..66554921 --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/task/BaseQueryStarter.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.task; + +import edu.snu.mist.core.replay.EventReplayResult; +import edu.snu.mist.core.replay.EventReplayUtils; +import org.apache.reef.io.Tuple; +import org.apache.reef.tang.exceptions.InjectionException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import java.io.IOException; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +public abstract class BaseQueryStarter { + + private static final Logger LOG = Logger.getLogger(BaseQueryStarter.class.getName()); + + /** + * The map of sources and queryIds. This is only used for replaying events. + */ + private final Map srcAndQueryIdMap; + + /** + * The address of the replay server. 
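The setup()/start() split introduced in MQTTDataGenerator above amounts to "subscribe early, buffer while the query is being prepared, flush on start". A distilled, self-contained illustration of that pattern (plain strings and a Consumer stand in for MqttMessage and EventGenerator; this is not MIST code):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class BufferingSourceSketch {
  private final AtomicBoolean setUp = new AtomicBoolean(false);
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final Queue<String> buffered = new ConcurrentLinkedQueue<>();
  private final Consumer<String> downstream; // plays the role of EventGenerator.emitData

  BufferingSourceSketch(final Consumer<String> downstream) {
    this.downstream = downstream;
  }

  /** Called by the broker client for every incoming message (like MQTTDataGenerator.emitData). */
  void onMessage(final String message) {
    if (setUp.get() && !started.get()) {
      buffered.add(message);      // query not started yet: hold the message
    } else if (started.get()) {
      downstream.accept(message); // normal operation: forward immediately
    }
  }

  /** Subscribe-and-buffer phase (setup()). */
  void setup() {
    setUp.compareAndSet(false, true);
  }

  /** Flush everything that arrived during setup, then forward live messages (start()). */
  void start() {
    String pending;
    while ((pending = buffered.poll()) != null) {
      downstream.accept(pending);
    }
    started.compareAndSet(false, true);
  }

  public static void main(final String[] args) {
    final BufferingSourceSketch src = new BufferingSourceSketch(System.out::println);
    src.setup();
    src.onMessage("arrived before start"); // buffered
    src.start();                           // prints the buffered message
    src.onMessage("arrived after start");  // forwarded immediately
  }
}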
+ */ + private final String replayServerAddress; + + /** + * The port number of the replay server. + */ + private final int replayServerPort; + + protected BaseQueryStarter(final String replayServerAddress, + final int replayServerPort) { + this.srcAndQueryIdMap = new HashMap<>(); + this.replayServerAddress = replayServerAddress; + this.replayServerPort = replayServerPort; + } + + protected void replayAndStart(final Map>> queryIdAndBrokerTopicMap, + final long minTimestamp, + final Collection executionDagCollection) + throws InjectionException, IOException, ClassNotFoundException { + if (replayServerAddress.equals("noReplay")) { + LOG.log(Level.WARNING, "Replay server is not up."); + // Start the sources. + for (final ExecutionDag executionDag : executionDagCollection) { + for (final ExecutionVertex executionVertex : executionDag.getDag().getRootVertices()) { + final PhysicalSource src = (PhysicalSource) executionVertex; + src.start(); + } + } + } else { + final Set> replayedBrokerTopicSet = new HashSet<>(); + for (final ExecutionDag dag : executionDagCollection) { + final Map>> srcAndMqttMessageListMap = new HashMap<>(); + for (final ExecutionVertex source : dag.getDag().getRootVertices()) { + final PhysicalSource physicalSource = (PhysicalSource) source; + final String queryId = srcAndQueryIdMap.remove(source); + final Set> brokerURIAndTopicSet = queryIdAndBrokerTopicMap.get(queryId); + for (final Tuple brokerURIAndTopic : brokerURIAndTopicSet) { + if (!replayedBrokerTopicSet.contains(brokerURIAndTopic)) { + final EventReplayResult eventReplayResult = + EventReplayUtils.replay(replayServerAddress, replayServerPort, + brokerURIAndTopic.getValue(), brokerURIAndTopic.getKey(), minTimestamp); + if (eventReplayResult.isSuccess()) { + final List> mqttMessageList = eventReplayResult.getMqttMessages(); + srcAndMqttMessageListMap.put(physicalSource, mqttMessageList); + } else { + LOG.log(Level.WARNING, "Replay server is not up and/or replaying events has failed."); + LOG.log(Level.WARNING, + "Sources for query " + queryId + " will be started without or partial replaying of events."); + } + replayedBrokerTopicSet.add(brokerURIAndTopic); + } + } + } + EventReplayUtils.sendMsgs(srcAndMqttMessageListMap); + } + EventReplayUtils.startSources(executionDagCollection); + } + } +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/PhysicalSourceImpl.java b/mist-core/src/main/java/edu/snu/mist/core/task/PhysicalSourceImpl.java index 05526ea3..4a5cd2e6 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/PhysicalSourceImpl.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/PhysicalSourceImpl.java @@ -43,6 +43,7 @@ public PhysicalSourceImpl(final String sourceId, super(sourceId, configuration); this.dataGenerator = dataGenerator; this.eventGenerator = eventGenerator; + dataGenerator.setup(); } @Override diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/QueryManager.java b/mist-core/src/main/java/edu/snu/mist/core/task/QueryManager.java index e7c4d743..a1103fa0 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/QueryManager.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/QueryManager.java @@ -23,11 +23,13 @@ import edu.snu.mist.formats.avro.AvroDag; import edu.snu.mist.formats.avro.QueryCheckpoint; import edu.snu.mist.formats.avro.QueryControlResult; +import org.apache.reef.io.Tuple; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.tang.exceptions.InjectionException; import java.io.IOException; import java.util.List; 
+import java.util.Set; /** * This interface manages queries that are submitted from clients. @@ -50,6 +52,16 @@ public interface QueryManager extends AutoCloseable { QueryControlResult createQueryWithCheckpoint(AvroDag avroDag, QueryCheckpoint checkpointedState); + /** + * Recover a checkpointed query. + * This method does not replay missed events and does not start the sources of the query. + * @param avroDag + * @param checkpointedState + * @return the set of tuples (mqtt topic and the brokerURI) of the submitted query + */ + Set> setupQueryWithCheckpoint(AvroDag avroDag, + QueryCheckpoint checkpointedState); + /** * Create a query (this is for checkpointing). * @param queryId query id diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/QueryStarter.java b/mist-core/src/main/java/edu/snu/mist/core/task/QueryStarter.java index 865ca214..d4e85d8a 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/QueryStarter.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/QueryStarter.java @@ -18,10 +18,14 @@ import edu.snu.mist.common.graph.DAG; import edu.snu.mist.common.graph.MISTEdge; import edu.snu.mist.core.task.merging.ImmediateQueryMergingStarter; +import org.apache.reef.io.Tuple; import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.tang.exceptions.InjectionException; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Set; /** * This interface represents a component that is responsible for starting and executing queries. @@ -30,11 +34,19 @@ public interface QueryStarter { /** - * Start to execute the submitted query. + * Start or setup the submitted query. + * @param isStart whether to start the query or just set it up * @param queryId query id * @param configDag dag that has configuration of vertices */ - void start(String queryId, - Query query, DAG configDag, List jarFilePaths) + Set> startOrSetupForReplay(boolean isStart, String queryId, Query query, + DAG configDag, List jarFilePaths) throws IOException, ClassNotFoundException; + + /** + * Setup the submitted query but do not start the sources. 
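Taken together, setupQueryWithCheckpoint() and replayAndStart() are meant to be called in sequence during group recovery. A simplified sketch of that flow, mirroring DefaultCheckpointManagerImpl.recoverGroup() further down in this patch (dagList, queryCheckpointMap, queryManager, queryStarter and groupCheckpoint are the surrounding recovery-time variables; error handling is elided):

// 1) Rebuild every query of the group from its checkpoint, but leave its sources stopped,
//    collecting the MQTT (topic, broker URI) pairs needed for replay.
final Map<String, Set<Tuple<String, String>>> queryIdAndTopicBrokerURIMap = new HashMap<>();
for (final AvroDag avroDag : dagList) {
  final String queryId = avroDag.getQueryId();
  queryIdAndTopicBrokerURIMap.put(queryId,
      queryManager.setupQueryWithCheckpoint(avroDag, queryCheckpointMap.get(queryId)));
}

// 2) Replay every event newer than the group checkpoint timestamp, then start the sources.
queryStarter.replayAndStart(queryIdAndTopicBrokerURIMap, groupCheckpoint.getCheckpointTimestamp());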
+ * @param queryIdAndBrokerTopicMap the map that contains the set of broker and topics per queryId + */ + void replayAndStart(Map>> queryIdAndBrokerTopicMap, long minTimestamp) + throws InjectionException, IOException, ClassNotFoundException; } diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/checkpointing/DefaultCheckpointManagerImpl.java b/mist-core/src/main/java/edu/snu/mist/core/task/checkpointing/DefaultCheckpointManagerImpl.java index 66ce397e..c74b372b 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/checkpointing/DefaultCheckpointManagerImpl.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/checkpointing/DefaultCheckpointManagerImpl.java @@ -19,10 +19,12 @@ import edu.snu.mist.core.task.Query; import edu.snu.mist.core.task.QueryManager; import edu.snu.mist.core.task.QueryRemover; +import edu.snu.mist.core.task.QueryStarter; import edu.snu.mist.core.task.groupaware.*; import edu.snu.mist.core.task.stores.GroupCheckpointStore; import edu.snu.mist.formats.avro.AvroDag; import edu.snu.mist.formats.avro.CheckpointResult; +import edu.snu.mist.formats.avro.GroupCheckpoint; import edu.snu.mist.formats.avro.QueryCheckpoint; import org.apache.reef.io.Tuple; import org.apache.reef.tang.InjectionFuture; @@ -31,9 +33,7 @@ import javax.inject.Inject; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -107,9 +107,14 @@ public void recoverGroup(final String groupId) throws IOException { final Map queryCheckpointMap; final List dagList; final QueryManager queryManager = queryManagerFuture.get(); + final GroupCheckpoint groupCheckpoint; + // The map used for replaying events. + final Map>> queryIdAndTopicBrokerURIMap = new HashMap<>(); + try { // Load the checkpointed states and the query lists. - queryCheckpointMap = checkpointStore.loadSavedGroupState(groupId).getQueryCheckpointMap(); + groupCheckpoint = checkpointStore.loadSavedGroupState(groupId); + queryCheckpointMap = groupCheckpoint.getQueryCheckpointMap(); final List queryIdListInGroup = new ArrayList<>(); for (final String queryId : queryCheckpointMap.keySet()) { queryIdListInGroup.add(queryId); @@ -123,9 +128,27 @@ public void recoverGroup(final String groupId) throws IOException { } for (final AvroDag avroDag : dagList) { - final QueryCheckpoint queryCheckpoint = queryCheckpointMap.get(avroDag.getQueryId()); - // Recover each query in the group. - queryManager.createQueryWithCheckpoint(avroDag, queryCheckpoint); + final String queryId = avroDag.getQueryId(); + final QueryCheckpoint queryCheckpoint = queryCheckpointMap.get(queryId); + // Setup each query in the group, but do not start them. + queryIdAndTopicBrokerURIMap.put(queryId, queryManager.setupQueryWithCheckpoint(avroDag, queryCheckpoint)); + } + + final Group recoveredGroup = groupMap.get(groupId); + final QueryStarter queryStarter; + if (recoveredGroup != null) { + queryStarter = recoveredGroup.getApplicationInfo().getQueryStarter(); + } else { + LOG.log(Level.WARNING, "No Group with id ({0}) exists in groupMap", groupId); + return; + } + + // Replay missed events and start the queries within the group. 
+ try { + LOG.log(Level.INFO, "checkpoint min timestamp : " + groupCheckpoint.getCheckpointTimestamp()); + queryStarter.replayAndStart(queryIdAndTopicBrokerURIMap, groupCheckpoint.getCheckpointTimestamp()); + } catch (final Exception e) { + e.printStackTrace(); } } diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/GroupAwareQueryManagerImpl.java b/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/GroupAwareQueryManagerImpl.java index 90abdf67..88ad67b3 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/GroupAwareQueryManagerImpl.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/groupaware/GroupAwareQueryManagerImpl.java @@ -42,6 +42,7 @@ import javax.inject.Inject; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -175,45 +176,8 @@ public QueryControlResult createQueryWithCheckpoint(final AvroDag avroDag, final String queryId = avroDag.getQueryId(); queryControlResult.setQueryId(queryId); try { - // Create the submitted query - // 1) Saves the avr dag to the PlanStore and - // converts the avro dag to the logical and execution dag - checkpointManager.storeQuery(avroDag); - - // Update app information - final String appId = avroDag.getAppId(); - - if (LOG.isLoggable(Level.FINE)) { - LOG.log(Level.FINE, "Create Query [aid: {0}, qid: {2}]", - new Object[]{appId, queryId}); - } - - if (!applicationMap.containsKey(appId)) { - createApplication(appId, avroDag.getJarPaths()); - } - - final ApplicationInfo applicationInfo = applicationMap.get(appId); - if (applicationInfo.getGroups().size() == 0) { - synchronized (applicationInfo) { - if (applicationInfo.getGroups().size() == 0) { - createGroup(applicationInfo); - // Waiting for group information being added - while (applicationInfo.getGroups().isEmpty()) { - Thread.sleep(100); - } - } - } - } - - final DAG configDag; - if (checkpointedState == null) { - configDag = configDagGenerator.generate(avroDag); - } else { - configDag = configDagGenerator.generateWithCheckpointedStates(avroDag, checkpointedState); - } - - final Query query = createAndStartQuery(queryId, applicationInfo, configDag); - + final Tuple> tuple = startSetupCommonOp(avroDag, checkpointedState); + final Query query = createAndStartQuery(queryId, tuple.getKey(), tuple.getValue()); queryControlResult.setIsSuccess(true); queryControlResult.setMsg(ResultMessage.submitSuccess(queryId)); return queryControlResult; @@ -229,6 +193,29 @@ public QueryControlResult createQueryWithCheckpoint(final AvroDag avroDag, } } + @Override + public Set> setupQueryWithCheckpoint(final AvroDag avroDag, + final QueryCheckpoint checkpointedState) { + final String queryId = avroDag.getQueryId(); + try { + final Tuple> tuple = startSetupCommonOp(avroDag, checkpointedState); + final ApplicationInfo applicationInfo = tuple.getKey(); + final DAG configDag = tuple.getValue(); + final Query query = new DefaultQueryImpl(queryId); + groupAllocationTableModifier.addEvent(new WritingEvent(WritingEvent.EventType.QUERY_ADD, + new Tuple<>(applicationInfo, query))); + // Setup the submitted dag + return applicationInfo.getQueryStarter().startOrSetupForReplay(false, queryId, query, configDag, + applicationInfo.getJarFilePath()); + } catch (final Exception e) { + e.printStackTrace(); + // [MIST-345] We need to release all of the information that is required for the query when it fails. 
+ LOG.log(Level.SEVERE, "An exception occurred while starting {0} query: {1}", + new Object[] {queryId, e.toString()}); + return null; + } + } + @Override public Query createAndStartQuery(final String queryId, final ApplicationInfo applicationInfo, @@ -238,7 +225,8 @@ public Query createAndStartQuery(final String queryId, groupAllocationTableModifier.addEvent(new WritingEvent(WritingEvent.EventType.QUERY_ADD, new Tuple<>(applicationInfo, query))); // Start the submitted dag - applicationInfo.getQueryStarter().start(queryId, query, configDag, applicationInfo.getJarFilePath()); + applicationInfo.getQueryStarter().startOrSetupForReplay(true, queryId, query, configDag, + applicationInfo.getJarFilePath()); return query; } @@ -302,4 +290,47 @@ public QueryControlResult delete(final String groupId, final String queryId) { queryControlResult.setMsg(ResultMessage.deleteSuccess(queryId)); return queryControlResult; } + + private Tuple> startSetupCommonOp(final AvroDag avroDag, + final QueryCheckpoint state) + throws InjectionException, InterruptedException { + final String queryId = avroDag.getQueryId(); + // Create the submitted query + // 1) Saves the avro dag to the PlanStore and + // converts the avro dag to the logical and execution dag + checkpointManager.storeQuery(avroDag); + + // Update app information + final String appId = avroDag.getAppId(); + + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "Create Query [aid: {0}, qid: {2}]", + new Object[]{appId, queryId}); + } + + if (!applicationMap.containsKey(appId)) { + createApplication(appId, avroDag.getJarPaths()); + } + + final ApplicationInfo applicationInfo = applicationMap.get(appId); + if (applicationInfo.getGroups().size() == 0) { + synchronized (applicationInfo) { + if (applicationInfo.getGroups().size() == 0) { + createGroup(applicationInfo); + // Waiting for group information being added + while (applicationInfo.getGroups().isEmpty()) { + Thread.sleep(100); + } + } + } + } + + final DAG configDag; + if (state == null) { + configDag = configDagGenerator.generate(avroDag); + } else { + configDag = configDagGenerator.generateWithCheckpointedStates(avroDag, state); + } + return new Tuple<>(applicationInfo, configDag); + } } diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarter.java b/mist-core/src/main/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarter.java index 2696a080..1bcdbfdc 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarter.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarter.java @@ -16,24 +16,34 @@ package edu.snu.mist.core.task.merging; import edu.snu.mist.common.SerializeUtils; +import edu.snu.mist.common.configurations.ConfKeys; import edu.snu.mist.common.graph.DAG; import edu.snu.mist.common.graph.GraphUtils; import edu.snu.mist.common.graph.MISTEdge; +import edu.snu.mist.core.parameters.ReplayServerAddress; +import edu.snu.mist.core.parameters.ReplayServerPort; import edu.snu.mist.core.task.*; import edu.snu.mist.core.task.codeshare.ClassLoaderProvider; +import org.apache.reef.io.Tuple; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; import javax.inject.Inject; import java.io.IOException; import java.net.URL; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Level; +import java.util.logging.Logger; /** * This starter tries to merges the submitted dag with the 
currently running dag. * When a query is submitted, this starter first finds mergeable execution dags. * After that, it merges them with the submitted query. */ -public final class ImmediateQueryMergingStarter implements QueryStarter { +public final class ImmediateQueryMergingStarter extends BaseQueryStarter implements QueryStarter { + + private static final Logger LOG = Logger.getLogger(ImmediateQueryMergingStarter.class.getName()); /** * An algorithm for finding the sub-dag between the execution and submitted dag. @@ -86,6 +96,22 @@ public final class ImmediateQueryMergingStarter implements QueryStarter { */ private final List groupJarFilePaths; + /** + * The map of sources and queryIds. + * This is only used for replaying events and is empty at other times. + */ + private final Map srcAndQueryIdMap; + + /** + * The address of the replay server. + */ + private final String replayServerAddress; + + /** + * The port number of the replay server. + */ + private final int replayServerPort; + @Inject private ImmediateQueryMergingStarter(final CommonSubDagFinder commonSubDagFinder, final SrcAndDagMap> srcAndDagMap, @@ -95,7 +121,10 @@ private ImmediateQueryMergingStarter(final CommonSubDagFinder commonSubDagFinder final ExecutionVertexCountMap executionVertexCountMap, final ClassLoaderProvider classLoaderProvider, final ExecutionVertexGenerator executionVertexGenerator, - final ExecutionVertexDagMap executionVertexDagMap) { + final ExecutionVertexDagMap executionVertexDagMap, + @Parameter(ReplayServerAddress.class) final String replayServerAddress, + @Parameter(ReplayServerPort.class) final int replayServerPort) { + super(replayServerAddress, replayServerPort); this.commonSubDagFinder = commonSubDagFinder; this.srcAndDagMap = srcAndDagMap; this.queryIdConfigDagMap = queryIdConfigDagMap; @@ -106,13 +135,20 @@ private ImmediateQueryMergingStarter(final CommonSubDagFinder commonSubDagFinder this.executionVertexCountMap = executionVertexCountMap; this.executionVertexDagMap = executionVertexDagMap; this.groupJarFilePaths = new CopyOnWriteArrayList<>(); + this.srcAndQueryIdMap = new HashMap<>(); + this.replayServerAddress = replayServerAddress; + this.replayServerPort = replayServerPort; } @Override - public synchronized void start(final String queryId, - final Query query, - final DAG submittedDag, - final List jarFilePaths) throws IOException, ClassNotFoundException { + public synchronized Set> startOrSetupForReplay(final boolean isStart, + final String queryId, + final Query query, + final DAG submittedDag, + final List jarFilePaths) + throws IOException, ClassNotFoundException { + // The set to contain the tuples (topic and broker uri) of the submitted query. 
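One convention worth spelling out: throughout the replay path the tuple key is the MQTT topic and the value is the broker URI, even where parameter names read the other way around. A small reference snippet with placeholder values:

// Orientation used when collecting sources for replay:
//   key   = MQTT topic       (ConfKeys.MQTTSourceConf.MQTT_SRC_TOPIC)
//   value = MQTT broker URI  (ConfKeys.MQTTSourceConf.MQTT_SRC_BROKER_URI)
final Tuple<String, String> topicAndBrokerURI = new Tuple<>("room0/temperature", "tcp://127.0.0.1:1883");
final String topic = topicAndBrokerURI.getKey();
final String brokerURI = topicAndBrokerURI.getValue();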
+ final Set> topicAndBrokerURISet = new HashSet<>(); queryIdConfigDagMap.put(queryId, submittedDag); @@ -129,6 +165,21 @@ public synchronized void start(final String queryId, // Synchronize the execution dags to evade concurrent modifications // TODO:[MIST-590] We need to improve this code for concurrent modification synchronized (srcAndDagMap) { + if (!isStart) { + for (final ConfigVertex source : submittedDag.getRootVertices()) { + final Map srcConfMap = source.getConfiguration(); + + final String mqttTopic = srcConfMap.get(ConfKeys.MQTTSourceConf.MQTT_SRC_TOPIC.name()); + final String mqttBrokerAddressAndPort = srcConfMap.get(ConfKeys.MQTTSourceConf.MQTT_SRC_BROKER_URI.name()); + + if (mqttTopic != null && mqttBrokerAddressAndPort != null) { + final Tuple brokerTopic = new Tuple<>(mqttTopic, mqttBrokerAddressAndPort); + topicAndBrokerURISet.add(brokerTopic); + } else { + LOG.log(Level.WARNING, "No mqttTopic or mqttBrokerAddressAndPort were found"); + } + } + } // Find mergeable DAGs from the execution dags final Map, ExecutionDag> mergeableDags = findMergeableDags(submittedDag); @@ -139,10 +190,14 @@ public synchronized void start(final String queryId, QueryStarterUtils.setUpOutputEmitters(executionDag, query); for (final ExecutionVertex source : executionDag.getDag().getRootVertices()) { - // Start the source + // Set up the source final PhysicalSource src = (PhysicalSource) source; srcAndDagMap.put(src.getConfiguration(), executionDag); - src.start(); + if (!isStart) { + srcAndQueryIdMap.put(src, queryId); + } else { + src.start(); + } } // Update the execution dag of the execution vertex @@ -151,7 +206,7 @@ public synchronized void start(final String queryId, } executionDags.add(executionDag); - return; + return topicAndBrokerURISet; } // If there exist mergeable execution dags, @@ -192,6 +247,9 @@ public synchronized void start(final String queryId, sharableExecutionDag.getDag().addVertex(executionVertex); executionVertexCountMap.put(executionVertex, 1); executionVertexDagMap.put(executionVertex, sharableExecutionDag); + if (!isStart) { + srcAndQueryIdMap.put((PhysicalSource) executionVertex, queryId); + } } else { executionVertex = subDagMap.get(source); executionVertexCountMap.put(executionVertex, executionVertexCountMap.get(executionVertex) + 1); @@ -204,16 +262,27 @@ public synchronized void start(final String queryId, } } - // If there are sources that are not shared, start them + // If there are sources that are not shared, put them in the srcAndDagMap. for (final ConfigVertex source : submittedDag.getRootVertices()) { if (!subDagMap.containsKey(source)) { srcAndDagMap.put(source.getConfiguration(), sharableExecutionDag); - ((PhysicalSource)configExecutionVertexMap.get(source)).start(); + if (isStart) { + ((PhysicalSource)configExecutionVertexMap.get(source)).start(); + } } } + + return topicAndBrokerURISet; } } + @Override + public void replayAndStart(final Map>> queryIdAndBrokerTopicMap, + final long minTimestamp) + throws InjectionException, IOException, ClassNotFoundException { + super.replayAndStart(queryIdAndBrokerTopicMap, minTimestamp, executionDags.values()); + } + /** * Create the execution dag in dfs order. 
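The updated ImmediateQueryMergingStarterTest further down only exercises the isStart = true path. A hedged sketch of a complementary assertion for the setup-only path, reusing the fixtures the test already creates (queryStarter, query1, dagTuple, paths); with the test's generated, non-MQTT source configuration the returned set is empty:

// Setting a query up for replay must not start its sources; it only reports
// the MQTT (topic, broker URI) pairs of its root sources.
final Set<Tuple<String, String>> topicAndBrokerURIs =
    queryStarter.startOrSetupForReplay(false, "q1", query1, dagTuple.getKey(), paths);
Assert.assertTrue(topicAndBrokerURIs.isEmpty());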
*/ diff --git a/mist-core/src/main/java/edu/snu/mist/core/task/merging/NoMergingQueryStarter.java b/mist-core/src/main/java/edu/snu/mist/core/task/merging/NoMergingQueryStarter.java index 9682fdab..f95e7c3f 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/task/merging/NoMergingQueryStarter.java +++ b/mist-core/src/main/java/edu/snu/mist/core/task/merging/NoMergingQueryStarter.java @@ -15,19 +15,29 @@ */ package edu.snu.mist.core.task.merging; +import edu.snu.mist.common.configurations.ConfKeys; import edu.snu.mist.common.graph.DAG; import edu.snu.mist.common.graph.MISTEdge; +import edu.snu.mist.core.parameters.ReplayServerAddress; +import edu.snu.mist.core.parameters.ReplayServerPort; import edu.snu.mist.core.task.*; +import org.apache.reef.io.Tuple; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; import javax.inject.Inject; import java.io.IOException; -import java.util.List; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; /** * This query starter does not merge queries. * Instead, it executes them separately. */ -public final class NoMergingQueryStarter implements QueryStarter { +public final class NoMergingQueryStarter extends BaseQueryStarter implements QueryStarter { + + private static final Logger LOG = Logger.getLogger(NoMergingQueryStarter.class.getName()); /** * The map that has a query id as a key and an execution dag as a value. @@ -39,11 +49,32 @@ public final class NoMergingQueryStarter implements QueryStarter { */ private final DagGenerator dagGenerator; + /** + * The map of sources and queryIds. This is only used for replaying events. + */ + private final Map srcAndQueryIdMap; + + /** + * The address of the replay server. + */ + private final String replayServerAddress; + + /** + * The port number of the replay server. + */ + private final int replayServerPort; + @Inject private NoMergingQueryStarter(final ExecutionPlanDagMap executionPlanDagMap, - final DagGenerator dagGenerator) { + final DagGenerator dagGenerator, + @Parameter(ReplayServerAddress.class) final String replayServerAddress, + @Parameter(ReplayServerPort.class) final int replayServerPort) { + super(replayServerAddress, replayServerPort); this.executionPlanDagMap = executionPlanDagMap; this.dagGenerator = dagGenerator; + this.srcAndQueryIdMap = new HashMap<>(); + this.replayServerAddress = replayServerAddress; + this.replayServerPort = replayServerPort; } /** @@ -51,20 +82,51 @@ private NoMergingQueryStarter(final ExecutionPlanDagMap executionPlanDagMap, * and starts to receive input data stream from the sources. */ @Override - public void start(final String queryId, - final Query query, - final DAG configDag, - final List jarFilePaths) + public Set> startOrSetupForReplay(final boolean isStart, + final String queryId, + final Query query, + final DAG configDag, + final List jarFilePaths) throws IOException, ClassNotFoundException { + // The set to contain the tuples (topic and broker uri) of the submitted query. 
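NoMergingQueryStarter repeats, just below, the same MQTT topic/broker extraction that ImmediateQueryMergingStarter performs. A hedged sketch of a helper that both starters could share through BaseQueryStarter (illustrative only; no such method exists in this patch):

/**
 * Collects the (MQTT topic, broker URI) pairs of the given root config vertices.
 * Hypothetical shared helper; it would need the ConfKeys and Tuple imports shown above.
 */
protected Set<Tuple<String, String>> collectMqttTopicAndBrokerURIs(
    final Collection<ConfigVertex> rootVertices) {
  final Set<Tuple<String, String>> result = new HashSet<>();
  for (final ConfigVertex source : rootVertices) {
    final Map<String, String> conf = source.getConfiguration();
    final String topic = conf.get(ConfKeys.MQTTSourceConf.MQTT_SRC_TOPIC.name());
    final String brokerURI = conf.get(ConfKeys.MQTTSourceConf.MQTT_SRC_BROKER_URI.name());
    if (topic != null && brokerURI != null) {
      result.add(new Tuple<>(topic, brokerURI));
    }
  }
  return result;
}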
+ final Set> topicAndBrokerURISet = new HashSet<>(); final ExecutionDag submittedExecutionDag = dagGenerator.generate(configDag, jarFilePaths); + if (!isStart) { + for (final ExecutionVertex source : submittedExecutionDag.getDag().getRootVertices()) { + srcAndQueryIdMap.put((PhysicalSource) source, queryId); + } + for (final ConfigVertex source : configDag.getRootVertices()) { + final Map srcConfMap = source.getConfiguration(); + + final String mqttTopic = srcConfMap.get(ConfKeys.MQTTSourceConf.MQTT_SRC_TOPIC.name()); + final String mqttBrokerAddressAndPort = srcConfMap.get(ConfKeys.MQTTSourceConf.MQTT_SRC_BROKER_URI.name()); + + if (mqttTopic != null && mqttBrokerAddressAndPort != null) { + final Tuple brokerTopic = new Tuple<>(mqttTopic, mqttBrokerAddressAndPort); + topicAndBrokerURISet.add(brokerTopic); + } else { + LOG.log(Level.WARNING, "No mqttTopic or mqttBrokerAddressAndPort were found"); + } + } + } executionPlanDagMap.put(queryId, submittedExecutionDag); QueryStarterUtils.setUpOutputEmitters(submittedExecutionDag, query); - // starts to receive input data stream from the sources - final DAG dag = submittedExecutionDag.getDag(); - for (final ExecutionVertex source : dag.getRootVertices()) { - final PhysicalSource ps = (PhysicalSource)source; - ps.start(); + if (isStart) { + // starts to receive input data stream from the sources + final DAG dag = submittedExecutionDag.getDag(); + for (final ExecutionVertex source : dag.getRootVertices()) { + final PhysicalSource ps = (PhysicalSource)source; + ps.start(); + } } + return topicAndBrokerURISet; + } + + @Override + public void replayAndStart(final Map>> queryIdAndBrokerTopicMap, + final long minTimestamp) + throws InjectionException, IOException, ClassNotFoundException { + super.replayAndStart(queryIdAndBrokerTopicMap, minTimestamp, executionPlanDagMap.getExecutionDags()); } } diff --git a/mist-core/src/test/java/edu/snu/mist/core/sources/MQTTSourceTest.java b/mist-core/src/test/java/edu/snu/mist/core/sources/MQTTSourceTest.java index 975e6d49..0963d2e3 100644 --- a/mist-core/src/test/java/edu/snu/mist/core/sources/MQTTSourceTest.java +++ b/mist-core/src/test/java/edu/snu/mist/core/sources/MQTTSourceTest.java @@ -86,8 +86,12 @@ public void testMQTTDataGenerator() throws Exception { dataGenerator1.setEventGenerator(eventGenerator1); dataGenerator2.setEventGenerator(eventGenerator2); + dataGenerator1.setup(); + dataGenerator2.setup(); dataGenerator1.start(); dataGenerator2.start(); + // Wait for the MQTT data generator to subscribe. 
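(The fixed 500 ms sleep that follows keeps the test simple. If it ever proves flaky, a bounded poll is a common alternative; the sketch below assumes a hypothetical isSubscribed() accessor that MQTTDataGenerator does not currently expose.)

// Hypothetical alternative to a fixed sleep: poll until the subscription is visible or a deadline passes.
final long deadline = System.currentTimeMillis() + 5000;
while (!dataGenerator1.isSubscribed() && System.currentTimeMillis() < deadline) {
  Thread.sleep(50);
}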
+ Thread.sleep(500); // create test client and publish data final PublishTestClient publishClient = new PublishTestClient(MqttUtils.BROKER_URI); diff --git a/mist-core/src/test/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarterTest.java b/mist-core/src/test/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarterTest.java index 99533556..0c569249 100644 --- a/mist-core/src/test/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarterTest.java +++ b/mist-core/src/test/java/edu/snu/mist/core/task/merging/ImmediateQueryMergingStarterTest.java @@ -177,7 +177,7 @@ private Tuple, ExecutionDag> generateSimpleDag( public void singleQueryMergingTest() throws InjectionException, IOException, ClassNotFoundException { final List result = new LinkedList<>(); - // Physical vertices + // Physical vertices final Map sourceConf = idAndConfGenerator.generateConf(); final Map ocConf = idAndConfGenerator.generateConf(); final Map sinkConf = idAndConfGenerator.generateConf(); @@ -201,7 +201,7 @@ public void singleQueryMergingTest() throws InjectionException, IOException, Cla // Execute the query 1 final Query query1 = mock(Query.class); final List paths = mock(List.class); - queryStarter.start("q1", query1, dagTuple.getKey(), paths); + queryStarter.startOrSetupForReplay(true, "q1", query1, dagTuple.getKey(), paths); // Generate events for the query and check if the dag is executed correctly final String data1 = "Hello"; @@ -307,8 +307,8 @@ public void mergingDifferentSourceQueriesOneGroupTest() final String query1Id = "q1"; final String query2Id = "q2"; - queryStarter.start(query1Id, query1, dagTuple1.getKey(), paths1); - queryStarter.start(query2Id, query2, dagTuple2.getKey(), paths2); + queryStarter.startOrSetupForReplay(true, query1Id, query1, dagTuple1.getKey(), paths1); + queryStarter.startOrSetupForReplay(true, query2Id, query2, dagTuple2.getKey(), paths2); // The query 1 and 2 have different sources, so they should be executed separately final String data1 = "Hello"; @@ -434,8 +434,8 @@ public void mergingSameSourceAndSameOperatorQueriesOneGroupTest() final Query query2 = mock(Query.class); final String query1Id = "q1"; final String query2Id = "q2"; - queryStarter.start(query1Id, query1, dagTuple1.getKey(), paths1); - queryStarter.start(query2Id, query2, dagTuple2.getKey(), paths2); + queryStarter.startOrSetupForReplay(true, query1Id, query1, dagTuple1.getKey(), paths1); + queryStarter.startOrSetupForReplay(true, query2Id, query2, dagTuple2.getKey(), paths2); // Generate events for the merged query and check if the dag is executed correctly final String data = "Hello"; @@ -565,8 +565,8 @@ public void mergingSameSourceButDifferentOperatorQueriesOneGroupTest() final String query2Id = "q2"; final Query query1 = mock(Query.class); final Query query2 = mock(Query.class); - queryStarter.start(query1Id, query1, dagTuple1.getKey(), paths1); - queryStarter.start(query2Id, query2, dagTuple2.getKey(), paths2); + queryStarter.startOrSetupForReplay(true, query1Id, query1, dagTuple1.getKey(), paths1); + queryStarter.startOrSetupForReplay(true, query2Id, query2, dagTuple2.getKey(), paths2); // Generate events for the merged query and check if the dag is executed correctly final String data1 = "Hello"; diff --git a/mist-core/src/test/java/edu/snu/mist/core/task/utils/TestDataGenerator.java b/mist-core/src/test/java/edu/snu/mist/core/task/utils/TestDataGenerator.java index 34ef7a6a..9cea96b1 100644 --- a/mist-core/src/test/java/edu/snu/mist/core/task/utils/TestDataGenerator.java +++ 
b/mist-core/src/test/java/edu/snu/mist/core/task/utils/TestDataGenerator.java @@ -49,6 +49,11 @@ public TestDataGenerator(final List inputs) { this.inputs = inputs.iterator(); } + @Override + public void setup() { + // This method is only for MQTTDataGenerator. + } + @Override public void start() { if (started.compareAndSet(false, true)) {
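A final note on the interface change: three of the four DataGenerator implementations touched by this patch (Kafka, NettyText, and the test generator) leave setup() empty. If the project targets Java 8 or later, a default no-op on the interface would remove that boilerplate. A hedged sketch, with the type parameter and the remaining members of the interface omitted:

public interface DataGenerator extends AutoCloseable {

  /**
   * Prepares the generator to receive data before start() is called.
   * Only MQTTDataGenerator needs this hook today, so a no-op default
   * spares the other implementations an empty override.
   */
  default void setup() {
    // no-op by default
  }

  /**
   * Starts generating data.
   */
  void start();

  // setEventGenerator(...), close(), etc. unchanged and omitted here.
}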