Skip to content

Commit 40ba8a4

Browse files
Kunal Khatua (kkhatua)
authored and committed
DRILL-2362: Profile Mgmt
1 parent 05a1a3a commit 40ba8a4

File tree

12 files changed

+851
-66
lines changed

12 files changed

+851
-66
lines changed

exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,11 @@ private ExecConstants() {
236236
public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
237237
public static final String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory";
238238
public static final String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity";
239+
public static final String PROFILES_STORE_CACHE_SIZE = "drill.exec.profiles.store.cache.size";
240+
public static final String PROFILES_STORE_INDEX_ENABLED = "drill.exec.profiles.store.index.enabled";
241+
public static final String PROFILES_STORE_INDEX_FORMAT = "drill.exec.profiles.store.index.format";
242+
public static final String PROFILES_STORE_INDEX_MAX = "drill.exec.profiles.store.index.max";
243+
public static final String PROFILES_STORE_INDEX_SUPPORTED_FS = "drill.exec.profiles.store.index.supported.fs";
239244
public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
240245
public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
241246
public static final String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";

exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,20 @@
3333
import org.apache.drill.exec.exception.DrillbitStartupException;
3434
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
3535
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
36+
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
3637
import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState;
3738
import org.apache.drill.exec.server.options.OptionDefinition;
3839
import org.apache.drill.exec.server.options.OptionValue;
3940
import org.apache.drill.exec.server.options.OptionValue.OptionScope;
41+
import org.apache.drill.exec.server.profile.ProfileIndexer;
4042
import org.apache.drill.exec.server.options.SystemOptionManager;
4143
import org.apache.drill.exec.server.rest.WebServer;
4244
import org.apache.drill.exec.service.ServiceEngine;
4345
import org.apache.drill.exec.store.StoragePluginRegistry;
46+
import org.apache.drill.exec.store.sys.PersistentStore;
4447
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
4548
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
49+
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
4650
import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
4751
import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
4852
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
@@ -235,6 +239,15 @@ public void run() throws Exception {
235239
shutdownHook = new ShutdownThread(this, new StackTrace());
236240
Runtime.getRuntime().addShutdownHook(shutdownHook);
237241
gracefulShutdownThread.start();
242+
243+
// Launch an archiving job that is # files and time bound
244+
PersistentStore<QueryProfile> queryProfileStore = drillbitContext.getProfileStoreContext().getCompletedProfileStore();
245+
if (queryProfileStore instanceof LocalPersistentStore
246+
&& context.getConfig().getBoolean(ExecConstants.PROFILES_STORE_INDEX_ENABLED)) {
247+
ProfileIndexer profileIndexer = new ProfileIndexer(coord, drillbitContext);
248+
profileIndexer.indexProfiles();
249+
}
250+
238251
logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
239252
}
240253

exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.server.profile;
19+
20+
import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
21+
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.text.SimpleDateFormat;
25+
import java.util.Date;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.apache.commons.io.IOUtils;
32+
import org.apache.commons.lang3.exception.ExceptionUtils;
33+
import org.apache.drill.common.config.DrillConfig;
34+
import org.apache.drill.exec.ExecConstants;
35+
import org.apache.drill.exec.coord.ClusterCoordinator;
36+
import org.apache.drill.exec.coord.DistributedSemaphore;
37+
import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
38+
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
39+
import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore;
40+
import org.apache.drill.exec.exception.StoreException;
41+
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
42+
import org.apache.drill.exec.server.DrillbitContext;
43+
import org.apache.drill.exec.server.QueryProfileStoreContext;
44+
import org.apache.drill.exec.store.dfs.DrillFileSystem;
45+
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
46+
import org.apache.drill.exec.store.sys.store.DrillSysFilePathFilter;
47+
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
48+
import org.apache.drill.exec.store.sys.store.ProfileSet;
49+
import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
50+
import org.apache.drill.exec.util.DrillFileSystemUtil;
51+
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
52+
import org.apache.hadoop.fs.FileStatus;
53+
import org.apache.hadoop.fs.Path;
54+
import org.apache.hadoop.fs.PathFilter;
55+
import org.slf4j.Logger;
56+
import org.slf4j.LoggerFactory;
57+
58+
/**
59+
* Manage profiles by archiving
60+
*/
61+
public class ProfileIndexer {
62+
private static final Logger logger = LoggerFactory.getLogger(ProfileIndexer.class);
63+
private static final String lockPathString = "/profileIndexer";
64+
private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length();
65+
66+
private final ZKClusterCoordinator zkCoord;
67+
private final DrillFileSystem fs;
68+
private final Path basePath;
69+
private final ProfileSet profiles;
70+
private final int indexingRate;
71+
private final PathFilter sysFileSuffixFilter;
72+
private SimpleDateFormat indexedPathFormat;
73+
private final boolean useZkCoordinatedManagement;
74+
private DrillConfig drillConfig;
75+
76+
private PersistentStoreConfig<QueryProfile> pStoreConfig;
77+
private LocalPersistentStore<QueryProfile> completedProfileStore;
78+
private Stopwatch indexWatch;
79+
private int indexedCount;
80+
private int currentProfileCount;
81+
82+
83+
/**
84+
* ProfileIndexer
85+
*/
86+
public ProfileIndexer(ClusterCoordinator coord, DrillbitContext context) throws StoreException, IOException {
87+
drillConfig = context.getConfig();
88+
89+
// FileSystem
90+
try {
91+
this.fs = inferFileSystem(drillConfig);
92+
} catch (IOException ex) {
93+
throw new StoreException("Unable to get filesystem", ex);
94+
}
95+
96+
//Use Zookeeper for coordinated management
97+
final List<String> supportedFS = drillConfig.getStringList(ExecConstants.PROFILES_STORE_INDEX_SUPPORTED_FS);
98+
if (this.useZkCoordinatedManagement = supportedFS.contains(fs.getScheme())) {
99+
this.zkCoord = (ZKClusterCoordinator) coord;
100+
} else {
101+
this.zkCoord = null;
102+
}
103+
104+
// Query Profile Store
105+
QueryProfileStoreContext pStoreContext = context.getProfileStoreContext();
106+
this.completedProfileStore = (LocalPersistentStore<QueryProfile>) pStoreContext.getCompletedProfileStore();
107+
this.pStoreConfig = pStoreContext.getProfileStoreConfig();
108+
this.basePath = completedProfileStore.getBasePath();
109+
110+
this.indexingRate = drillConfig.getInt(ExecConstants.PROFILES_STORE_INDEX_MAX);
111+
this.profiles = new ProfileSet(indexingRate);
112+
this.indexWatch = Stopwatch.createUnstarted();
113+
this.sysFileSuffixFilter = new DrillSysFilePathFilter();
114+
String indexPathPattern = drillConfig.getString(ExecConstants.PROFILES_STORE_INDEX_FORMAT);
115+
this.indexedPathFormat = new SimpleDateFormat(indexPathPattern);
116+
logger.info("Organizing any existing unindexed profiles");
117+
}
118+
119+
120+
/**
121+
* Index profiles
122+
*/
123+
public void indexProfiles() {
124+
this.indexWatch.start();
125+
126+
// Acquire lock IFF required
127+
if (useZkCoordinatedManagement) {
128+
DistributedSemaphore indexerMutex = new ZkDistributedSemaphore(zkCoord.getCurator(), lockPathString, 1);
129+
try (DistributedLease lease = indexerMutex.acquire(0, TimeUnit.SECONDS)) {
130+
if (lease != null) {
131+
listAndIndex();
132+
} else {
133+
logger.debug("Couldn't get a lease acquisition");
134+
}
135+
} catch (Exception e) {
136+
//DoNothing since lease acquisition failed
137+
logger.error("Exception during lease-acquisition:: {}", e);
138+
}
139+
} else {
140+
try {
141+
listAndIndex();
142+
} catch (IOException e) {
143+
logger.error("Failed to index: {}", e);
144+
}
145+
}
146+
logger.info("Successfully indexed {} of {} profiles during startup in {} seconds", indexedCount, currentProfileCount, this.indexWatch.stop().elapsed(TimeUnit.SECONDS));
147+
}
148+
149+
150+
//Lists and Indexes the latest profiles
151+
private void listAndIndex() throws IOException {
152+
currentProfileCount = listForArchiving();
153+
indexedCount = 0;
154+
logger.info("Found {} profiles that need to be indexed. Will attempt to index {} profiles", currentProfileCount,
155+
(currentProfileCount > this.indexingRate) ? this.indexingRate : currentProfileCount);
156+
157+
// Track MRU index paths
158+
Map<String, Path> mruIndexPath = new HashMap<>();
159+
if (currentProfileCount > 0) {
160+
while (!this.profiles.isEmpty()) {
161+
String profileToIndex = profiles.removeYoungest() + DRILL_SYS_FILE_SUFFIX;
162+
Path srcPath = new Path(basePath, profileToIndex);
163+
long profileStartTime = getProfileStart(srcPath);
164+
if (profileStartTime < 0) {
165+
logger.debug("Will skip indexing {}", srcPath);
166+
continue;
167+
}
168+
String indexPath = indexedPathFormat.format(new Date(profileStartTime));
169+
//Check if dest dir exists
170+
Path indexDestPath = null;
171+
if (!mruIndexPath.containsKey(indexPath)) {
172+
indexDestPath = new Path(basePath, indexPath);
173+
if (!fs.isDirectory(indexDestPath)) {
174+
// Build dir
175+
if (fs.mkdirs(indexDestPath)) {
176+
mruIndexPath.put(indexPath, indexDestPath);
177+
} else {
178+
//Creation failed. Did someone else create?
179+
if (fs.isDirectory(indexDestPath)) {
180+
mruIndexPath.put(indexPath, indexDestPath);
181+
}
182+
}
183+
} else {
184+
mruIndexPath.put(indexPath, indexDestPath);
185+
}
186+
} else {
187+
indexDestPath = mruIndexPath.get(indexPath);
188+
}
189+
190+
//Attempt Move
191+
boolean renameStatus = false;
192+
if (indexDestPath != null) {
193+
Path destPath = new Path(indexDestPath, profileToIndex);
194+
renameStatus = DrillFileSystemUtil.rename(fs, srcPath, destPath);
195+
if (renameStatus) {
196+
indexedCount++;
197+
}
198+
}
199+
if (indexDestPath == null || !renameStatus) {
200+
// Stop attempting any more archiving since other StoreProviders might be archiving
201+
logger.error("Move failed for {} [{} | {}]", srcPath, indexDestPath == null, renameStatus);
202+
continue;
203+
}
204+
}
205+
}
206+
}
207+
208+
// Deserialized and extract the profile's start time
209+
private long getProfileStart(Path srcPath) {
210+
try (InputStream is = fs.open(srcPath)) {
211+
QueryProfile profile = pStoreConfig.getSerializer().deserialize(IOUtils.toByteArray(is));
212+
return profile.getStart();
213+
} catch (IOException e) {
214+
logger.info("Unable to deserialize {}\n---{}====", srcPath, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538]
215+
logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e));
216+
}
217+
return Long.MIN_VALUE;
218+
}
219+
220+
// List all profiles in store's root and identify potential candidates for archiving
221+
private int listForArchiving() throws IOException {
222+
// Not performing recursive search of profiles
223+
List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter );
224+
225+
int numProfilesInStore = 0;
226+
for (FileStatus stat : fileStatuses) {
227+
String profileName = stat.getPath().getName();
228+
//Strip extension and store only query ID
229+
profiles.add(profileName.substring(0, profileName.length() - DRILL_SYS_FILE_EXT_SIZE), false);
230+
numProfilesInStore++;
231+
}
232+
233+
return numProfilesInStore;
234+
}
235+
236+
// Infers File System of Local Store
237+
private DrillFileSystem inferFileSystem(DrillConfig drillConfig) throws IOException {
238+
boolean hasZkBlobRoot = drillConfig.hasPath(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT);
239+
final Path blobRoot = hasZkBlobRoot ?
240+
new org.apache.hadoop.fs.Path(drillConfig.getString(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) :
241+
LocalPersistentStore.getLogDir();
242+
243+
return LocalPersistentStore.getFileSystem(drillConfig, blobRoot);
244+
}
245+
246+
}

exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import javax.ws.rs.core.UriInfo;
3838
import javax.xml.bind.annotation.XmlRootElement;
3939

40+
4041
import org.apache.drill.common.config.DrillConfig;
4142
import org.apache.drill.common.exceptions.DrillRuntimeException;
4243
import org.apache.drill.common.exceptions.UserException;
@@ -90,7 +91,7 @@ public static class ProfileInfo implements Comparable<ProfileInfo> {
9091

9192
public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query,
9293
String state, String user, double totalCost, String queueName) {
93-
this.queryId = queryId;
94+
this.queryId = queryId.substring(queryId.lastIndexOf('/') + 1);
9495
this.startTime = startTime;
9596
this.endTime = endTime;
9697
this.time = new Date(startTime);
@@ -349,7 +350,7 @@ private QueryProfile getQueryProfile(String queryId) {
349350
return queryProfile;
350351
}
351352
} catch (final Exception e) {
352-
throw new DrillRuntimeException("error while retrieving profile", e);
353+
throw new DrillRuntimeException("Error while retrieving profile: " + e.getMessage(), e);
353354
}
354355

355356
throw UserException.validationError()
@@ -378,7 +379,7 @@ public Viewable getProfile(@PathParam("queryid") String queryId){
378379
ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId), work.getContext().getConfig());
379380
return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper);
380381
} catch (Exception | Error e) {
381-
logger.error("Exception was thrown when fetching profile {} :\n{}", queryId, e);
382+
logger.error("Exception was thrown when fetching profile {} :\n{}\n====\n", queryId, e);
382383
return ViewableWithPermissions.create(authEnabled.get(), "/rest/errorMessage.ftl", sc, e);
383384
}
384385
}
exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.store.sys.store;
19+
20+
import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
21+
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.fs.PathFilter;
24+
25+
/**
26+
* Filter for Drill System Files
27+
*/
28+
public class DrillSysFilePathFilter implements PathFilter {
29+
30+
//NOTE: The filename is a combination of query ID (which is monotonically
31+
//decreasing value derived off epoch timestamp) and a random value. This
32+
//filter helps eliminate that list
33+
String cutoffFileName = null;
34+
public DrillSysFilePathFilter() {}
35+
36+
public DrillSysFilePathFilter(String cutoffSysFileName) {
37+
if (cutoffSysFileName != null) {
38+
this.cutoffFileName = cutoffSysFileName + DRILL_SYS_FILE_SUFFIX;
39+
}
40+
}
41+
42+
/* (non-Javadoc)
43+
* @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path)
44+
*/
45+
@Override
46+
public boolean accept(Path file){
47+
if (file.getName().endsWith(DRILL_SYS_FILE_SUFFIX)) {
48+
if (cutoffFileName != null) {
49+
return (file.getName().compareTo(cutoffFileName) <= 0);
50+
} else {
51+
return true;
52+
}
53+
}
54+
return false;
55+
}
56+
}

0 commit comments

Comments
 (0)