diff --git a/chapter2/py/src/mnmcount.py b/chapter2/py/src/mnmcount.py
index 35723bf..608dfe5 100644
--- a/chapter2/py/src/mnmcount.py
+++ b/chapter2/py/src/mnmcount.py
@@ -1,6 +1,7 @@
 from __future__ import print_function
 
 import sys
+import os
 
 from pyspark.sql import SparkSession
 
@@ -9,9 +10,16 @@
         print("Usage: mnmcount <file>", file=sys.stderr)
         sys.exit(-1)
 
+    # Set Java system properties to fix Java 23 compatibility issues
+    os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.driver.extraJavaOptions='-Djava.security.manager=allow' --conf spark.executor.extraJavaOptions='-Djava.security.manager=allow' pyspark-shell"
+
     spark = (SparkSession
         .builder
         .appName("PythonMnMCount")
+        .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
+        .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
+        .config("spark.sql.adaptive.enabled", "false")
+        .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
         .getOrCreate())
     # get the M&M data set file name
     mnm_file = sys.argv[1]
@@ -42,4 +50,7 @@
 
     # show the resulting aggregation for California
     ca_count_mnm_df.show(n=10, truncate=False)
+
+    # Stop the Spark session
+    spark.stop()
 
diff --git a/chapter3/py/src/Example-3_6.py b/chapter3/py/src/Example-3_6.py
index 1fe07f5..c86f236 100644
--- a/chapter3/py/src/Example-3_6.py
+++ b/chapter3/py/src/Example-3_6.py
@@ -1,6 +1,10 @@
 from pyspark.sql.types import *
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import *
+import os
+
+# Set Java system properties to fix Java 23 compatibility issues
+os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.driver.extraJavaOptions='-Djava.security.manager=allow' --conf spark.executor.extraJavaOptions='-Djava.security.manager=allow' pyspark-shell"
 
 # define schema for our data
 schema = StructType([
@@ -26,6 +30,10 @@
 spark = (SparkSession
     .builder
     .appName("Example-3_6")
+    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
+    .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
+    .config("spark.sql.adaptive.enabled", "false")
+    .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
     .getOrCreate())
 # create a DataFrame using the schema defined above
 blogs_df = spark.createDataFrame(data, schema)
@@ -40,4 +48,7 @@
 blogs_df.select(expr("Hits * 2")).show(2)
 # show heavy hitters
 blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show()
-print(blogs_df.schema)
\ No newline at end of file
+print(blogs_df.schema)
+
+# Stop the Spark session
+spark.stop()
\ No newline at end of file