Skip to content

Commit 68068c7

Browse files
committed
Fixed bugs in PDS catalog caching.
1 parent 6f97fe5 commit 68068c7

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

experimental/src/it/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelationTest.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.locationtech.rasterframes.datasource.raster._
3232
class L8CatalogRelationTest extends TestEnvironment {
3333
import spark.implicits._
3434

35-
val catalog = spark.read.l8Catalog.load()
35+
val catalog = spark.read.l8Catalog.load().cache()
3636

3737
val scenes = catalog
3838
.where($"acquisition_date" === to_timestamp(lit("2017-04-04 15:12:55.394")))
@@ -104,5 +104,28 @@ class L8CatalogRelationTest extends TestEnvironment {
104104
stats.data_cells should be (512L * 512L)
105105
stats.mean shouldBe > (10000.0)
106106
}
107+
108+
it("should construct an RGB composite") {
109+
val aoi = "LINESTRING (31.115 29.963, 31.148 29.99)"
110+
val sceneCat = catalog
111+
.where(
112+
to_date($"acquisition_date") === to_date(lit("2019-07-03")) &&
113+
st_intersects(st_geometry($"bounds_wgs84"), st_geomFromWKT(aoi))
114+
)
115+
116+
catalog.orderBy(desc("acquisition_date")).select($"acquisition_date").show(false)
117+
catalog.where(to_date($"acquisition_date") === to_date(lit("2019-03-07"))).show(false)
118+
119+
//sceneCat.show(false)
120+
121+
122+
// val df = spark.read.raster
123+
// .fromCatalog(scenes, "B4", "B3", "B2")
124+
// .withTileDimensions(128, 128)
125+
// .load()
126+
// .where
127+
128+
129+
}
107130
}
108131
}

experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/CachedDatasetRelation.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import com.typesafe.scalalogging.LazyLogging
2525
import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
2626
import org.apache.spark.rdd.RDD
2727
import org.apache.spark.sql.sources.BaseRelation
28-
import org.apache.spark.sql.{Dataset, Row}
28+
import org.apache.spark.sql.{Dataset, Row, SaveMode}
2929
import org.locationtech.rasterframes.util._
3030

3131
/**
@@ -40,12 +40,12 @@ trait CachedDatasetRelation extends ResourceCacheSupport { self: BaseRelation wi
4040
def buildScan(): RDD[Row] = {
4141
val conf = sqlContext.sparkContext.hadoopConfiguration
4242
implicit val fs: FileSystem = FileSystem.get(conf)
43-
val catalog = cacheFile.when(fs.exists)
43+
val catalog = cacheFile.when(p => fs.exists(p) && !expired(p))
4444
.map(p ⇒ {logger.debug("Reading " + p); p})
4545
.map(p ⇒ sqlContext.read.parquet(p.toString))
4646
.getOrElse {
4747
val scenes = constructDataset
48-
scenes.write.parquet(cacheFile.toString)
48+
scenes.write.mode(SaveMode.Overwrite).parquet(cacheFile.toString)
4949
scenes
5050
}
5151

experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/ResourceCacheSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ trait ResourceCacheSupport extends DownloadSupport { self: LazyLogging ⇒
5151
else {
5252

5353
val time = fs.getFileStatus(p).getModificationTime
54-
val exp = Instant.ofEpochMilli(time).isAfter(Instant.now().plus(Duration.ofHours(maxCacheFileAgeHours)))
54+
val exp = Instant.ofEpochMilli(time).plus(Duration.ofHours(maxCacheFileAgeHours)).isBefore(Instant.now())
5555
if(exp) logger.debug(s"'$p' is expired with mod time of '$time'")
5656
exp
5757
}

experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogDataSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class L8CatalogDataSource extends DataSourceRegister with RelationProvider {
3939
def shortName = L8CatalogDataSource.SHORT_NAME
4040

4141
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
42-
require(parameters.get("path").isEmpty, "MODISCatalogDataSource doesn't support specifying a path. Please use `load()`.")
42+
require(parameters.get("path").isEmpty, "L8CatalogDataSource doesn't support specifying a path. Please use `load()`.")
4343

4444
val conf = sqlContext.sparkContext.hadoopConfiguration
4545
implicit val fs = FileSystem.get(conf)

0 commit comments

Comments
 (0)