
Commit 416bf5d

stanch and claude authored
Add 7-day timeline view alongside existing 30-minute timeline (#181)
* Add 7-day timeline view alongside existing 30-minute timeline

  - Implement side-by-side timeline charts showing 7 days and 30 minutes
  - Replace minute-based system with flexible bucket-based approach
  - Update backend to handle TimelineRequest with custom time ranges
  - Use efficient SQL with VALUES clause for bulk bucket queries
  - Support click-to-filter functionality across both timelines

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* Split into subqueries to use a covering index

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 5cd03b4 commit 416bf5d

10 files changed: +283 -135 lines changed

src/main/scala/com.snowplowanalytics.snowplow.micro/EventStorage.scala

Lines changed: 5 additions & 14 deletions
```diff
@@ -15,16 +15,14 @@ import com.snowplowanalytics.snowplow.micro.Configuration.StorageConfig
 import com.snowplowanalytics.snowplow.micro.model.ColumnStatsResponse
 import io.circe.Json

-import java.time.Instant
-import java.time.temporal.ChronoUnit
 import scala.collection.mutable

 trait EventStorage {
   def addToGood(events: List[GoodEvent]): IO[Unit]
   def addToBad(events: List[BadEvent]): IO[Unit]
   def reset(): IO[Unit]
   def getColumns: IO[List[String]]
-  def getTimeline: IO[TimelineData]
+  def getTimeline(request: TimelineRequest): IO[TimelineData]
   def getColumnStats(columns: List[String]): IO[ColumnStatsResponse]
   def getFilteredEvents(request: EventsRequest): IO[EventsResponse]
 }
@@ -34,7 +32,10 @@ object NoStorage extends EventStorage {
   def addToBad(events: List[BadEvent]): IO[Unit] = IO.unit
   def reset(): IO[Unit] = IO.unit
   def getColumns: IO[List[String]] = IO.pure(List.empty)
-  def getTimeline: IO[TimelineData] = IO.pure(TimelineData(List.empty))
+  def getTimeline(request: TimelineRequest): IO[TimelineData] = {
+    val emptyPoints = request.buckets.map(bucket => TimelinePoint(0, 0, bucket))
+    IO.pure(TimelineData(emptyPoints))
+  }
   def getColumnStats(columns: List[String]): IO[ColumnStatsResponse] = IO.pure(Map.empty)
   def getFilteredEvents(request: EventsRequest): IO[EventsResponse] = IO.pure(EventsResponse(List.empty, 0, 0))
 }
@@ -54,16 +55,6 @@ object EventStorage {
     }
   }

-  def fillMissingMinutes(points: List[TimelinePoint]): List[TimelinePoint] = {
-    // the first point is the latest one
-    val latestTime = points.headOption.fold(Instant.now().truncatedTo(ChronoUnit.MINUTES))(_.timestamp)
-    val pointMap = points.map(p => p.timestamp -> p).toMap
-
-    (0L to 30L).toList.map { delta =>
-      val minute = latestTime.minus(delta, ChronoUnit.MINUTES)
-      pointMap.getOrElse(minute, TimelinePoint(0, 0, minute))
-    }
-  }

   def isTimestampColumn(column: String): Boolean = column.endsWith("_tstamp")
   def isContextsColumn(column: String): Boolean = column.startsWith("contexts_")
```
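With `fillMissingMinutes` gone, the bucket layout becomes the caller's responsibility: the backend returns exactly one point per requested bucket, empty buckets included. A minimal sketch of how a client of the new API might rebuild the old 30-minute view, using the `TimelineBucket`/`TimelineRequest` case classes from model.scala (the helper name is hypothetical, not part of this commit):

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical client-side helper: 31 one-minute buckets ending at the
// current minute, most recent first, mirroring the window that the removed
// fillMissingMinutes used to fill in on the server.
def lastThirtyMinutes(now: Instant = Instant.now()): TimelineRequest = {
  val latest = now.truncatedTo(ChronoUnit.MINUTES)
  val buckets = (0L to 30L).toList.map { delta =>
    val start = latest.minus(delta, ChronoUnit.MINUTES)
    TimelineBucket(start, start.plus(1, ChronoUnit.MINUTES))
  }
  TimelineRequest(buckets)
}
```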

src/main/scala/com.snowplowanalytics.snowplow.micro/InMemoryStorage.scala

Lines changed: 13 additions & 11 deletions
```diff
@@ -14,7 +14,6 @@ import cats.effect.IO
 import com.snowplowanalytics.snowplow.micro.model.ColumnStatsResponse
 import io.circe.Json

-import java.time.temporal.ChronoUnit

 /** In-memory cache containing the results of the validation (or not) of the tracking events.
   * Good events are stored with their type, their schema and their contexts, if any,
@@ -99,17 +98,20 @@ private[micro] class InMemoryStorage extends EventStorage {
     }
   }

-  override def getTimeline: IO[TimelineData] = IO.delay {
-    val groupedByMinute = LockGood.synchronized {
-      good.groupBy(event => event.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES))
-        .map {
-          case (minute, events) =>
-            val (failed, valid) = events.partition(_.incomplete)
-            TimelinePoint(valid.size, failed.size, minute)
-        }.toList
+  override def getTimeline(request: TimelineRequest): IO[TimelineData] = IO.delay {
+    val allEvents = getGoodAndIncomplete
+
+    val points = request.buckets.map { bucket =>
+      val eventsInBucket = allEvents.filter { event =>
+        val timestamp = event.event.collector_tstamp
+        !timestamp.isBefore(bucket.start) && timestamp.isBefore(bucket.end)
+      }
+
+      val (failed, valid) = eventsInBucket.partition(_.incomplete)
+      TimelinePoint(valid.size, failed.size, bucket)
     }
-    val filledPoints = EventStorage.fillMissingMinutes(groupedByMinute)
-    TimelineData(filledPoints)
+
+    TimelineData(points)
   }

   override def getColumnStats(columns: List[String]): IO[ColumnStatsResponse] = {
```
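Note that the membership predicate treats each bucket as a half-open interval [start, end): an event stamped exactly on a shared boundary lands in the later bucket only, so adjacent buckets never double-count it. The SqliteStorage query below uses the same `>= start AND < end` convention, keeping the two backends consistent. Isolated, the check is just:

```scala
import java.time.Instant

// True iff bucket.start <= ts < bucket.end (half-open interval), as in
// getTimeline above; TimelineBucket is the case class from model.scala.
def inBucket(ts: Instant, bucket: TimelineBucket): Boolean =
  !ts.isBefore(bucket.start) && ts.isBefore(bucket.end)
```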

src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -55,8 +55,10 @@ sealed trait MicroRoutes[S <: EventStorage] extends Http4sDsl[IO] {
       request.as[ColumnStatsRequest].flatMap { req =>
         storage.getColumnStats(req.columns).flatMap(stats => Ok(stats))
       }
-    case GET -> Root / "micro" / "timeline" =>
-      storage.getTimeline.flatMap(timeline => Ok(timeline))
+    case request @ POST -> Root / "micro" / "timeline" =>
+      request.as[TimelineRequest].flatMap { req =>
+        storage.getTimeline(req).flatMap(timeline => Ok(timeline))
+      }
     case GET -> Root / "micro" / "iglu" / vendor / name / "jsonschema" / versionVar =>
       lookupSchema(vendor, name, versionVar)
   }
@@ -205,6 +207,7 @@ object Routing {
   implicit val e: Encoder[Event] = deriveEncoder
   implicit val be: Encoder[BadEvent] = deriveEncoder
   implicit val re: Encoder[ResolutionError] = deriveEncoder
+  implicit val tb: Encoder[TimelineBucket] = deriveEncoder
   implicit val tp: Encoder[TimelinePoint] = deriveEncoder
   implicit val td: Encoder[TimelineData] = deriveEncoder
   implicit val cs: Encoder[ColumnStats] = deriveEncoder
@@ -217,6 +220,8 @@ object Routing {
   implicit val fg: Decoder[FiltersGood] = deriveDecoder
   implicit val fb: Decoder[FiltersBad] = deriveDecoder
   implicit val csr: Decoder[ColumnStatsRequest] = deriveDecoder
+  implicit val tbd: Decoder[TimelineBucket] = deriveDecoder
+  implicit val trd1: Decoder[TimelineRequest] = deriveDecoder
   implicit val efd: Decoder[EventsFilter] = deriveDecoder
   implicit val trd: Decoder[TimeRange] = deriveDecoder
   implicit val esd: Decoder[EventsSorting] = deriveDecoder
```
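The endpoint changes from GET to POST so the bucket list can travel in the request body. A sketch of what the wire format works out to, assuming circe's default ISO-8601 `Instant` codec is what the derived codecs above pick up (the case classes are redeclared here only to keep the snippet self-contained; in the repo they live in model.scala):

```scala
import java.time.Instant
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._

final case class TimelineBucket(start: Instant, end: Instant)
final case class TimelineRequest(buckets: List[TimelineBucket])

implicit val tb: Encoder[TimelineBucket]  = deriveEncoder
implicit val tr: Encoder[TimelineRequest] = deriveEncoder

// One one-minute bucket, serialized the way the derived codecs would emit it.
val body = TimelineRequest(List(
  TimelineBucket(Instant.parse("2024-01-01T12:00:00Z"), Instant.parse("2024-01-01T12:01:00Z"))
)).asJson.noSpaces
// {"buckets":[{"start":"2024-01-01T12:00:00Z","end":"2024-01-01T12:01:00Z"}]}
```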

src/main/scala/com.snowplowanalytics.snowplow.micro/SqliteStorage.scala

Lines changed: 50 additions & 24 deletions
```diff
@@ -83,36 +83,67 @@ private[micro] class SqliteStorage(readXa: Transactor[IO], writeXa: Transactor[I
       .transact(readXa)
   }

-  override def getTimeline: IO[TimelineData] = {
+  override def getTimeline(request: TimelineRequest): IO[TimelineData] = {
+    if (request.buckets.isEmpty) {
+      return IO.pure(TimelineData(List.empty))
+    }
+
+    // Create a VALUES clause with all bucket ranges
+    val bucketValues = request.buckets.zipWithIndex.map { case (bucket, idx) =>
+      s"(${bucket.start.toEpochMilli}, ${bucket.end.toEpochMilli}, $idx)"
+    }.mkString(", ")
+
+    // Split into two queries so that the timestamp + failed index can be used for each
     val query = sql"""
-      WITH latest_event AS (
-        SELECT COALESCE(MAX(timestamp), ${System.currentTimeMillis()}) as max_timestamp FROM events
+      WITH buckets(bucket_start, bucket_end, bucket_order) AS (
+        VALUES """ ++ Fragment.const(bucketValues) ++ sql"""
       ),
-      time_range AS (
+      valid_counts AS (
         SELECT
-          (max_timestamp / 60000) * 60000 as latest_minute,
-          (max_timestamp / 60000) * 60000 - 30 * 60000 as start_minute
-        FROM latest_event
+          bucket_start,
+          bucket_end,
+          bucket_order,
+          COUNT(events.rowid) as valid_count
+        FROM buckets
+        LEFT JOIN events ON events.timestamp >= buckets.bucket_start
+          AND events.timestamp < buckets.bucket_end
+          AND events.failed = false
+        GROUP BY bucket_start, bucket_end, bucket_order
       ),
-      sparse_data AS (
+      failed_counts AS (
         SELECT
-          (timestamp / 60000) * 60000 as minute,
-          COUNT(CASE WHEN NOT failed THEN 1 END) as valid_count,
-          COUNT(CASE WHEN failed THEN 1 END) as failed_count
-        FROM events, time_range
-        WHERE timestamp >= time_range.start_minute AND timestamp < (time_range.latest_minute + 60000)
-        GROUP BY minute
+          bucket_start,
+          bucket_end,
+          bucket_order,
+          COUNT(events.rowid) as failed_count
+        FROM buckets
+        INNER JOIN events ON events.timestamp >= buckets.bucket_start
+          AND events.timestamp < buckets.bucket_end
+          AND events.failed = true
+        GROUP BY bucket_start, bucket_end, bucket_order
       )
-      SELECT minute, valid_count, failed_count FROM sparse_data ORDER BY minute DESC
+      SELECT
+        v.bucket_start,
+        v.bucket_end,
+        v.bucket_order,
+        COALESCE(v.valid_count, 0) as valid_count,
+        COALESCE(f.failed_count, 0) as failed_count
+      FROM valid_counts v
+      LEFT JOIN failed_counts f ON v.bucket_start = f.bucket_start
+        AND v.bucket_end = f.bucket_end
+      ORDER BY v.bucket_order
     """

     query
-      .query[TimelinePoint]
+      .query[(Long, Long, Int, Int, Int)]
       .to[List]
       .transact(readXa)
-      .map { sparsePoints =>
-        val filledPoints = EventStorage.fillMissingMinutes(sparsePoints)
-        TimelineData(filledPoints)
+      .map { results =>
+        val points = results.map { case (start, end, _, validCount, failedCount) =>
+          val bucket = TimelineBucket(Instant.ofEpochMilli(start), Instant.ofEpochMilli(end))
+          TimelinePoint(validCount, failedCount, bucket)
+        }
+        TimelineData(points)
       }
   }

@@ -340,9 +371,4 @@ private[micro] object SqliteStorage {

   implicit val instantEpochMeta: Meta[Instant] =
     Meta[Timestamp].timap(_.toInstant)(Timestamp.from)
-
-  implicit val timelinePointRead: Read[TimelinePoint] =
-    Read[(Instant, Int, Int)].map { case (timestamp, validEvents, failedEvents) =>
-      TimelinePoint(validEvents, failedEvents, timestamp)
-    }
 }
```
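The bucket bounds are interpolated into the statement as numeric literals via `Fragment.const` rather than bound as parameters; since they are epoch millis derived from already-parsed `Instant`s, there is no injection surface, and a single statement serves any number of buckets. For example, the `bucketValues` construction above yields (using `TimelineBucket` from model.scala):

```scala
import java.time.Instant

val buckets = List(
  TimelineBucket(Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(120000L)),
  TimelineBucket(Instant.ofEpochMilli(120000L), Instant.ofEpochMilli(180000L))
)

// Same construction as bucketValues in getTimeline above.
val bucketValues = buckets.zipWithIndex.map { case (bucket, idx) =>
  s"(${bucket.start.toEpochMilli}, ${bucket.end.toEpochMilli}, $idx)"
}.mkString(", ")
// => "(60000, 120000, 0), (120000, 180000, 1)"
```

Per the second commit in this squash ("Split into subqueries to use a covering index"), separating `valid_counts` and `failed_counts` lets each join be served by the index the code comment refers to (timestamp + failed), instead of evaluating both CASE counts in one pass that cannot use it.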

src/main/scala/com.snowplowanalytics.snowplow.micro/model.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -53,7 +53,9 @@ private [micro] final case class FiltersBad(
 private [micro] final case class ValidationSummary(total: Int, good: Int, bad: Int)

 /** Timeline data structures for /micro/timeline endpoint. */
-final case class TimelinePoint(validEvents: Int, failedEvents: Int, timestamp: Instant)
+final case class TimelineBucket(start: Instant, end: Instant)
+final case class TimelineRequest(buckets: List[TimelineBucket])
+final case class TimelinePoint(validEvents: Int, failedEvents: Int, bucket: TimelineBucket)
 final case class TimelineData(points: List[TimelinePoint])

 /** Column statistics data structures for /micro/columnStats endpoint. */
```

src/test/scala/com.snowplowanalytics.snowplow.micro/EventStorageSpec.scala

Lines changed: 58 additions & 26 deletions
```diff
@@ -188,41 +188,58 @@ trait EventStorageTimelineSpec {

   import InMemoryStorageSpec._

+  // Helper function to create timeline requests for testing
+  def createTestTimelineRequest(bucketCount: Int): TimelineRequest = {
+    val now = java.time.Instant.now()
+    val buckets = (0 until bucketCount).map { i =>
+      val start = now.minusSeconds((i + 1) * 60L) // 1 minute buckets going backwards
+      val end = start.plusSeconds(60L)
+      TimelineBucket(start, end)
+    }.reverse.toList // Most recent first
+    TimelineRequest(buckets)
+  }
+
   def timelineTests(storageResource: Resource[IO, EventStorage], storageName: String): Fragment = {
     s"$storageName getTimeline" >> {
       "should return empty timeline for empty storage" >> {
         storageResource.use { storage =>
-          storage.getTimeline.map { timeline =>
-            timeline.points must have size(31)
+          val request = createTestTimelineRequest(3) // Create 3 test buckets
+          storage.getTimeline(request).map { timeline =>
+            timeline.points must have size(3)
             timeline.points.forall(p => p.validEvents == 0 && p.failedEvents == 0) must beTrue
           }
         }.unsafeRunSync()
       }

-      "should return timeline with events grouped by minute" >> {
+      "should return timeline with events grouped by buckets" >> {
         storageResource.use { storage =>
           for {
             _ <- storage.addToGood(List(GoodEvent3, GoodEvent2, GoodEvent1))
-            timeline <- storage.getTimeline
+            // Create buckets that span the event timestamps
+            buckets = List(
+              TimelineBucket(GoodEvent1.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES),
+                GoodEvent1.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES).plusSeconds(60)),
+              TimelineBucket(GoodEvent3.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES),
+                GoodEvent3.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES).plusSeconds(60))
+            )
+            request = TimelineRequest(buckets)
+            timeline <- storage.getTimeline(request)
           } yield {
-            timeline.points must have size(31)
+            timeline.points must have size(2)

             val pointsWithEvents = timeline.points.filter(p => p.validEvents > 0 || p.failedEvents > 0)
-            pointsWithEvents must have size(2) // GoodEvent1 & GoodEvent2 in same minute, GoodEvent3 in different minute
+            pointsWithEvents must have size(2) // GoodEvent1 & GoodEvent2 in same bucket, GoodEvent3 in different bucket

-            val eventMinute12 = GoodEvent1.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES)
-            val eventMinute3 = GoodEvent3.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES)
-
-            val minute12Point = pointsWithEvents.find(_.timestamp == eventMinute12)
-            val minute3Point = pointsWithEvents.find(_.timestamp == eventMinute3)
+            val minute12Point = pointsWithEvents.find(_.bucket == buckets(0))
+            val minute3Point = pointsWithEvents.find(_.bucket == buckets(1))

             minute12Point must beSome
             minute3Point must beSome

-            minute12Point.get.validEvents must_== 2
+            minute12Point.get.validEvents must_== 2 // GoodEvent1 and GoodEvent2 are in the same minute
             minute12Point.get.failedEvents must_== 0

-            minute3Point.get.validEvents must_== 1
+            minute3Point.get.validEvents must_== 1 // GoodEvent3
             minute3Point.get.failedEvents must_== 0
           }
         }.unsafeRunSync()
@@ -233,39 +250,54 @@ trait EventStorageTimelineSpec {
         storageResource.use { storage =>
           for {
             _ <- storage.addToGood(List(failedEvent, GoodEvent2, GoodEvent1))
-            timeline <- storage.getTimeline
+            // Create buckets that span the event timestamps
+            buckets = List(
+              TimelineBucket(GoodEvent1.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES),
+                GoodEvent1.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES).plusSeconds(60)),
+              TimelineBucket(failedEvent.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES),
+                failedEvent.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES).plusSeconds(60))
+            )
+            request = TimelineRequest(buckets)
+            timeline <- storage.getTimeline(request)
           } yield {
-            timeline.points must have size(31)
+            timeline.points must have size(2)

             val pointsWithEvents = timeline.points.filter(p => p.validEvents > 0 || p.failedEvents > 0)
             pointsWithEvents must have size(2)

-            val eventMinute12 = GoodEvent1.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES)
-            val eventMinute3 = failedEvent.event.collector_tstamp.truncatedTo(ChronoUnit.MINUTES)
-
-            val minute12Point = pointsWithEvents.find(_.timestamp == eventMinute12)
-            val minute3Point = pointsWithEvents.find(_.timestamp == eventMinute3)
+            val minute12Point = pointsWithEvents.find(_.bucket == buckets(0))
+            val minute3Point = pointsWithEvents.find(_.bucket == buckets(1))

             minute12Point must beSome
             minute3Point must beSome

-            minute12Point.get.validEvents must_== 2
+            minute12Point.get.validEvents must_== 2 // GoodEvent1 and GoodEvent2
             minute12Point.get.failedEvents must_== 0

             minute3Point.get.validEvents must_== 0
-            minute3Point.get.failedEvents must_== 1
+            minute3Point.get.failedEvents must_== 1 // failedEvent
           }
         }.unsafeRunSync()
       }

-      "should return timeline ordered by timestamp descending" >> {
+      "should return timeline points in bucket order" >> {
         storageResource.use { storage =>
           for {
             _ <- storage.addToGood(List(GoodEvent1, GoodEvent2, GoodEvent3))
-            timeline <- storage.getTimeline
+            // Create 3 specific buckets in a known order
+            buckets = List(
+              TimelineBucket(java.time.Instant.ofEpochSecond(1761686340), java.time.Instant.ofEpochSecond(1761686400)), // Before Event1
+              TimelineBucket(java.time.Instant.ofEpochSecond(1761686400), java.time.Instant.ofEpochSecond(1761686460)), // Covers Event1 & Event2
+              TimelineBucket(java.time.Instant.ofEpochSecond(1761686520), java.time.Instant.ofEpochSecond(1761686580)) // Covers Event3
+            )
+            request = TimelineRequest(buckets)
+            timeline <- storage.getTimeline(request)
           } yield {
-            timeline.points must have size(31)
-            timeline.points.map(_.timestamp.toEpochMilli) must beSorted(Ordering.Long.reverse)
+            timeline.points must have size(3)
+            // Timeline points should match the bucket order provided in the request
+            timeline.points.zip(request.buckets).forall { case (point, bucket) =>
+              point.bucket == bucket
+            } must beTrue
           }
         }.unsafeRunSync()
       }
```
