chapter2/py/src/mnmcount.py (11 changes: 11 additions & 0 deletions)
@@ -1,6 +1,7 @@
 from __future__ import print_function
 
 import sys
+import os
 
 from pyspark.sql import SparkSession
 
@@ -9,9 +10,16 @@
         print("Usage: mnmcount <file>", file=sys.stderr)
         sys.exit(-1)
 
+    # Set Java system properties to fix Java 23 compatibility issues
+    os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.driver.extraJavaOptions='-Djava.security.manager=allow' --conf spark.executor.extraJavaOptions='-Djava.security.manager=allow' pyspark-shell"
+
     spark = (SparkSession
         .builder
         .appName("PythonMnMCount")
+        .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
+        .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
+        .config("spark.sql.adaptive.enabled", "false")
+        .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
         .getOrCreate())
     # get the M&M data set file name
     mnm_file = sys.argv[1]
@@ -42,4 +50,7 @@
 
     # show the resulting aggregation for California
     ca_count_mnm_df.show(n=10, truncate=False)
+
+    # Stop the Spark session
+    spark.stop()
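A note on ordering: PYSPARK_SUBMIT_ARGS only takes effect if it is set before the first SparkSession launches the driver JVM, which this diff does correctly. A minimal sketch (not part of the PR, app name "ConfigCheck" is made up) for checking that the driver actually received the flag, assuming only that pyspark is installed:

import os

# Must be set before SparkSession creation: spark-submit reads this
# environment variable when it launches the driver JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--conf spark.driver.extraJavaOptions='-Djava.security.manager=allow' "
    "pyspark-shell"
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ConfigCheck").getOrCreate()
# Prints the JVM options the driver was actually started with.
print(spark.sparkContext.getConf().get("spark.driver.extraJavaOptions"))
spark.stop()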

chapter3/py/src/Example-3_6.py (13 changes: 12 additions & 1 deletion)
@@ -1,6 +1,10 @@
 from pyspark.sql.types import *
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import *
+import os
+
+# Set Java system properties to fix Java 23 compatibility issues
+os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.driver.extraJavaOptions='-Djava.security.manager=allow' --conf spark.executor.extraJavaOptions='-Djava.security.manager=allow' pyspark-shell"
 
 # define schema for our data
 schema = StructType([
@@ -26,6 +30,10 @@
 spark = (SparkSession
     .builder
     .appName("Example-3_6")
+    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
+    .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
+    .config("spark.sql.adaptive.enabled", "false")
+    .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
     .getOrCreate())
 # create a DataFrame using the schema defined above
 blogs_df = spark.createDataFrame(data, schema)
@@ -40,4 +48,7 @@
 blogs_df.select(expr("Hits * 2")).show(2)
 # show heavy hitters
 blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show()
-print(blogs_df.schema)
+print(blogs_df.schema)
+
+# Stop the Spark session
+spark.stop()
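Both files now repeat the same seven lines of setup, so a follow-up could hoist them into shared code. A sketch under that assumption, using a hypothetical build_session helper (not in this PR) placed somewhere importable by both scripts:

import os

from pyspark.sql import SparkSession

JAVA_COMPAT = "-Djava.security.manager=allow"

def build_session(app_name):
    # Hypothetical helper: applies the same Java 23 workaround and
    # AQE settings this PR adds to each script individually.
    os.environ.setdefault(
        "PYSPARK_SUBMIT_ARGS",
        "--conf spark.driver.extraJavaOptions='{0}' "
        "--conf spark.executor.extraJavaOptions='{0}' "
        "pyspark-shell".format(JAVA_COMPAT))
    return (SparkSession
        .builder
        .appName(app_name)
        .config("spark.driver.extraJavaOptions", JAVA_COMPAT)
        .config("spark.executor.extraJavaOptions", JAVA_COMPAT)
        .config("spark.sql.adaptive.enabled", "false")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
        .getOrCreate())

# e.g. in mnmcount.py:
# spark = build_session("PythonMnMCount")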