Skip to content

Commit 07537c6

Browse files
committed
Attempt to make accumulators work again.
1 parent a8d488d commit 07537c6

File tree

2 files changed

+4
-19
lines changed

2 files changed

+4
-19
lines changed

core/src/main/scala/astraea/spark/rasterframes/util/ReadAccumulator.scala

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import com.typesafe.scalalogging.LazyLogging
2727
import org.apache.spark.SparkContext
2828
import org.apache.spark.util.LongAccumulator
2929

30-
import scala.collection.mutable
31-
3230
/**
3331
* Support for keeping counts of read operations from RasterSource-s
3432
*
@@ -44,22 +42,9 @@ case class ReadAccumulator(reads: () ⇒ LongAccumulator, bytes: () ⇒ LongAccu
4442
}
4543

4644
object ReadAccumulator extends LazyLogging {
47-
private val reads: mutable.Map[String, LongAccumulator] = mutable.Map.empty
48-
private val bytes: mutable.Map[String, LongAccumulator] = mutable.Map.empty
49-
5045
def apply(sc: SparkContext, prefix: String): ReadAccumulator = this.synchronized {
51-
// TODO: Not sure how inititalize these in the proper scope... this is not it.
52-
reads.getOrElseUpdate(prefix, sc.longAccumulator(prefix + ".reads"))//.reset()
53-
bytes.getOrElseUpdate(prefix, sc.longAccumulator(prefix + ".bytes"))//.reset()
54-
new ReadAccumulator(() ⇒ reads(prefix), () ⇒ bytes(prefix))
55-
}
56-
57-
def log(): Unit = this.synchronized {
58-
val keys = reads.keySet.intersect(bytes.keySet)
59-
keys.foreach { key ⇒
60-
val r = reads(key).value
61-
val b = bytes(key).value
62-
logger.info(s"readCount=$r, totalBytes=$b")
63-
}
46+
val reads = sc.longAccumulator(prefix + ".reads")
47+
val bytes = sc.longAccumulator(prefix + ".bytes")
48+
new ReadAccumulator(() ⇒ reads, () ⇒ bytes)
6449
}
6550
}

experimental/src/it/scala/astraea/spark/rasterframes/experimental/datasource/awspds/L8RelationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class L8RelationTest extends TestEnvironment with BeforeAndAfterAll with BeforeA
5151
}
5252

5353
after {
54-
ReadAccumulator.log()
54+
spark.sparkContext.register()
5555
}
5656

5757
describe("Read L8 on PDS as a DataSource") {

0 commit comments

Comments
 (0)