1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .airlift .units .DataSize ;
1819import com .facebook .airlift .units .Duration ;
1920import com .facebook .presto .Session ;
2021import com .facebook .presto .client .ServerInfo ;
2324import com .facebook .presto .spark .execution .http .server .RequestErrorTracker ;
2425import com .facebook .presto .spark .execution .http .server .smile .BaseResponse ;
2526import com .facebook .presto .spark .execution .property .NativeExecutionSystemConfig ;
27+ import com .facebook .presto .spark .execution .property .PrestoSparkWorkerProperty ;
2628import com .facebook .presto .spark .execution .property .WorkerProperty ;
2729import com .facebook .presto .spi .PrestoException ;
2830import com .google .common .annotations .VisibleForTesting ;
3234import com .google .common .util .concurrent .SettableFuture ;
3335import okhttp3 .HttpUrl ;
3436import okhttp3 .OkHttpClient ;
37+ import org .apache .spark .SparkConf ;
3538import org .apache .spark .SparkEnv$ ;
3639import org .apache .spark .SparkFiles ;
3740
6164import java .util .concurrent .TimeUnit ;
6265import java .util .concurrent .atomic .AtomicBoolean ;
6366
67+ import static com .facebook .airlift .units .DataSize .Unit .BYTE ;
68+ import static com .facebook .airlift .units .DataSize .Unit .GIGABYTE ;
6469import static com .facebook .presto .spi .StandardErrorCode .GENERIC_INTERNAL_ERROR ;
6570import static com .facebook .presto .spi .StandardErrorCode .NATIVE_EXECUTION_BINARY_NOT_EXIST ;
6671import static com .facebook .presto .spi .StandardErrorCode .NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR ;
6772import static com .facebook .presto .spi .StandardErrorCode .NATIVE_EXECUTION_TASK_ERROR ;
68- import static com .google .common .base .Preconditions .checkArgument ;
6973import static com .google .common .util .concurrent .Futures .addCallback ;
7074import static com .google .common .util .concurrent .MoreExecutors .directExecutor ;
7175import static java .lang .String .format ;
@@ -82,6 +86,7 @@ public class NativeExecutionProcess
8286 private static final String WORKER_CONFIG_FILE = "/config.properties" ;
8387 private static final String WORKER_NODE_CONFIG_FILE = "/node.properties" ;
8488 private static final String WORKER_CONNECTOR_CONFIG_FILE = "/catalog/" ;
89+ private static final String NATIVE_PROCESS_MEMORY_SPARK_CONF_NAME = "spark.memory.offHeap.size" ;
8590 private static final int SIGSYS = 31 ;
8691
8792 private final String executablePath ;
@@ -131,6 +136,8 @@ public NativeExecutionProcess(
131136 scheduledExecutorService ,
132137 "getting native process status" );
133138 this .workerProperty = requireNonNull (workerProperty , "workerProperty is null" );
139+ // Update any runtime configs to be used by presto native worker
140+ updateWorkerProperties ();
134141 }
135142
136143 /**
@@ -325,25 +332,111 @@ private static int getAvailableTcpPort(String nodeInternalAddress)
325332 }
326333 }
327334
328- private String getNativeExecutionCatalogName (Session session )
335+ private void populateConfigurationFiles (String configBasePath )
336+ throws IOException
329337 {
330- checkArgument (session .getCatalog ().isPresent (), "Catalog isn't set in the session." );
331- return session .getCatalog ().get ();
338+ workerProperty .populateAllProperties (
339+ Paths .get (configBasePath , WORKER_CONFIG_FILE ),
340+ Paths .get (configBasePath , WORKER_NODE_CONFIG_FILE ),
341+ Paths .get (configBasePath , WORKER_CONNECTOR_CONFIG_FILE )); // Directory path for catalogs
332342 }
333343
334- private void populateConfigurationFiles (String configBasePath )
335- throws IOException
344+ private void updateWorkerProperties ()
336345 {
346+ // Update memory properties
347+ updateWorkerMemoryProperties ();
348+
337349 // The reason we have to pick and assign the port per worker is in our prod environment,
338350 // there is no port isolation among all the containers running on the same host, so we have
339351 // to pick unique port per worker to avoid port collision. This config will be passed down to
340352 // the native execution process eventually for process initialization.
341353 workerProperty .getSystemConfig ()
342354 .update (NativeExecutionSystemConfig .HTTP_SERVER_HTTP_PORT , String .valueOf (port ));
343- workerProperty .populateAllProperties (
344- Paths .get (configBasePath , WORKER_CONFIG_FILE ),
345- Paths .get (configBasePath , WORKER_NODE_CONFIG_FILE ),
346- Paths .get (configBasePath , WORKER_CONNECTOR_CONFIG_FILE )); // Directory path for catalogs
355+ }
356+
357+ protected SparkConf getSparkConf ()
358+ {
359+ return SparkEnv$ .MODULE$ .get () == null ? null : SparkEnv$ .MODULE$ .get ().conf ();
360+ }
361+
362+ protected PrestoSparkWorkerProperty getWorkerProperty ()
363+ {
364+ return (PrestoSparkWorkerProperty ) workerProperty ;
365+ }
366+
367+ /**
368+ * Computes values for system-memory-gb and query-memory-gb to start the native worker
369+ * with.
370+ * This logic is mainly useful when spark has provisioned larger containers to run
371+ * previously OOMing tasks. Spark will provision larger container but without below
372+ * logic the cpp process will not be able to use it.
373+ *
374+ * Also, we write the logic in a way that same logic applies during first attempt v/s
375+ * subsequent OOMed larger container retry attempts
376+ *
377+ * The logic is simple and is as below
378+ * - New system-memory-gb = spark.memory.offHeap.size
379+ * - Then to calculate the new value of query-memory-gb we assume that
380+ * the new query-memory to system-memory ratio should be same as old values.
381+ * So we set newQueryMemory = newSystemMemory = (oldQueryMemory/oldSystemMemory)
382+ *
383+ * TODO: In future make this algorithm more configurable. i.e. we might want a min/max
384+ * cap on the systemMemoryGb-queryMemoryGb buffer. Currently we just assume ratio
385+ * is good enough
386+ */
387+ protected void updateWorkerMemoryProperties ()
388+ {
389+ // If sparkConf.NATIVE_PROCESS_MEMORY_SPARK_CONF_NAME is not set
390+ // skip making any updates
391+ SparkConf conf = getSparkConf ();
392+ if (conf == null ) {
393+ log .info ("Not adjusting native process memory as conf is null" );
394+ return ;
395+ }
396+ if (!conf .contains (NATIVE_PROCESS_MEMORY_SPARK_CONF_NAME )) {
397+ log .info ("Not adjusting native process memory as %s is not set" , NATIVE_PROCESS_MEMORY_SPARK_CONF_NAME );
398+ return ;
399+ }
400+ DataSize offHeapMemoryBytes = DataSize .succinctDataSize (
401+ conf .getSizeAsBytes (NATIVE_PROCESS_MEMORY_SPARK_CONF_NAME ), BYTE );
402+ DataSize currentSystemMemory = DataSize .valueOf (workerProperty .getSystemConfig ().getAllProperties ()
403+ .get (NativeExecutionSystemConfig .SYSTEM_MEMORY_GB ) + GIGABYTE .getUnitString ());
404+ DataSize currentQueryMemory = DataSize .valueOf (workerProperty .getSystemConfig ().getAllProperties ()
405+ .get (NativeExecutionSystemConfig .QUERY_MEMORY_GB ) + GIGABYTE .getUnitString ());
406+ if (offHeapMemoryBytes .toBytes () == 0
407+ || currentSystemMemory .toBytes () == 0
408+ || offHeapMemoryBytes .toBytes () < currentSystemMemory .toBytes ()) {
409+ log .info ("Not adjusting native process memory as" +
410+ " offHeapMemoryBytes=%s,currentSystemMemory=%s are invalid" , offHeapMemoryBytes , currentSystemMemory .toBytes ());
411+ return ;
412+ }
413+
414+ log .info ("Setting Native Worker system-memory-gb to offHeap: %s" , offHeapMemoryBytes );
415+ DataSize newSystemMemory = offHeapMemoryBytes .convertTo (GIGABYTE );
416+
417+ double queryMemoryFraction = currentQueryMemory .toBytes () * 1.0 / currentSystemMemory .toBytes ();
418+ DataSize newQueryMemoryBytes = DataSize .succinctDataSize (
419+ queryMemoryFraction * newSystemMemory .toBytes (), BYTE );
420+ log .info ("Dynamically Tuning Presto Native Memory Configs. " +
421+ "Configured SparkOffHeap: %s; " +
422+ "[oldSystemMemory: %s, newSystemMemory: %s], queryMemoryFraction: %s, " +
423+ "[oldQueryMemory: %s, newQueryMemory: %s]" ,
424+ offHeapMemoryBytes ,
425+ currentSystemMemory ,
426+ newSystemMemory ,
427+ queryMemoryFraction ,
428+ currentQueryMemory ,
429+ newQueryMemoryBytes );
430+
431+ workerProperty .getSystemConfig ()
432+ .update (NativeExecutionSystemConfig .SYSTEM_MEMORY_GB ,
433+ String .valueOf ((int ) newSystemMemory .getValue (GIGABYTE )));
434+ workerProperty .getSystemConfig ()
435+ .update (NativeExecutionSystemConfig .QUERY_MEMORY_GB ,
436+ String .valueOf ((int ) newQueryMemoryBytes .getValue (GIGABYTE )));
437+ workerProperty .getSystemConfig ()
438+ .update (NativeExecutionSystemConfig .QUERY_MAX_MEMORY_PER_NODE ,
439+ newQueryMemoryBytes .convertTo (GIGABYTE ).toString ());
347440 }
348441
349442 private void doGetServerInfo (SettableFuture <ServerInfo > future )
0 commit comments