diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py
index 083d89eaedc7b..4287c97c58667 100644
--- a/python/pyspark/tests/test_taskcontext.py
+++ b/python/pyspark/tests/test_taskcontext.py
@@ -301,22 +301,33 @@ def tearDown(self):
 class TaskContextTestsWithResources(unittest.TestCase):
     def setUp(self):
         class_name = self.__class__.__name__
-        self.tempFile = tempfile.NamedTemporaryFile(delete=False)
-        self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
-        self.tempFile.close()
-        # create temporary directory for Worker resources coordination
-        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
-        os.unlink(self.tempdir.name)
+        # Create discovery script for GPU
+        self.gpuTempFile = tempfile.NamedTemporaryFile(delete=False)
+        self.gpuTempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
+        self.gpuTempFile.close()
         os.chmod(
-            self.tempFile.name,
+            self.gpuTempFile.name,
+            stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH,
+        )
+        # Create discovery script for FPGA (SPARK-54929)
+        self.fpgaTempFile = tempfile.NamedTemporaryFile(delete=False)
+        self.fpgaTempFile.write(b'echo {\\"name\\": \\"fpga\\", \\"addresses\\": [\\"0\\"]}')
+        self.fpgaTempFile.close()
+        os.chmod(
+            self.fpgaTempFile.name,
             stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH,
         )
         conf = SparkConf().set("spark.test.home", SPARK_HOME)
-        conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.tempFile.name)
+        conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.gpuTempFile.name)
         conf = conf.set("spark.worker.resource.gpu.amount", 1)
         conf = conf.set("spark.task.cpus", 2)
         conf = conf.set("spark.task.resource.gpu.amount", "1")
         conf = conf.set("spark.executor.resource.gpu.amount", "1")
+        # Configure FPGA resource (SPARK-54929)
+        conf = conf.set("spark.worker.resource.fpga.discoveryScript", self.fpgaTempFile.name)
+        conf = conf.set("spark.worker.resource.fpga.amount", 1)
+        conf = conf.set("spark.task.resource.fpga.amount", "1")
+        conf = conf.set("spark.executor.resource.fpga.amount", "1")
         self.sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)

     def test_cpus(self):
@@ -326,16 +337,20 @@ def test_cpus(self):
         self.assertEqual(cpus, 2)

     def test_resources(self):
-        """Test the resources are available."""
+        """Test that multiple resources are all available (SPARK-54929)."""
         rdd = self.sc.parallelize(range(10))
         resources = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
-        self.assertEqual(len(resources), 1)
+        self.assertEqual(len(resources), 2)
         self.assertTrue("gpu" in resources)
         self.assertEqual(resources["gpu"].name, "gpu")
         self.assertEqual(resources["gpu"].addresses, ["0"])
+        self.assertTrue("fpga" in resources)
+        self.assertEqual(resources["fpga"].name, "fpga")
+        self.assertEqual(resources["fpga"].addresses, ["0"])

     def tearDown(self):
-        os.unlink(self.tempFile.name)
+        os.unlink(self.gpuTempFile.name)
+        os.unlink(self.fpgaTempFile.name)
         self.sc.stop()


diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 386efadce58e2..cbdc66fc2f8ad 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3480,7 +3480,6 @@ def main(infile, outfile):
             key = utf8_deserializer.loads(infile)
             name = utf8_deserializer.loads(infile)
             addresses = []
-            taskContext._resources = {}
             for a in range(read_int(infile)):
                 addresses.append(utf8_deserializer.loads(infile))
             taskContext._resources[key] = ResourceInformation(name, addresses)
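
Note on the worker.py hunk: the removed line reinitialized taskContext._resources inside the
per-resource loop, so each iteration discarded the entries deserialized in earlier iterations
and only the last resource sent by the JVM survived. With a single configured resource (the
old GPU-only test) the bug was invisible, which is why the test now configures both a GPU and
an FPGA. Below is a minimal, self-contained sketch of the failure mode; the data is
hypothetical and this is not PySpark's actual deserialization code, and it assumes the dict is
initialized once before the loop elsewhere in main():

    # Simulated stream of (key, name, addresses) records, as if read from `infile`.
    stream = [("gpu", "gpu", ["0"]), ("fpga", "fpga", ["0"])]

    # Buggy shape: the dict is reset on every iteration, so only the
    # last resource survives.
    resources = {}
    for key, name, addresses in stream:
        resources = {}  # <-- the removed line, clobbering earlier entries
        resources[key] = (name, addresses)
    assert resources == {"fpga": ("fpga", ["0"])}  # the gpu entry is lost

    # Fixed shape: initialize once, accumulate inside the loop.
    resources = {}
    for key, name, addresses in stream:
        resources[key] = (name, addresses)
    assert resources == {"gpu": ("gpu", ["0"]), "fpga": ("fpga", ["0"])}

This is exactly what the updated test_resources asserts: len(resources) == 2, with both
"gpu" and "fpga" present.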