Skip to content

Commit 343cd21

Browse files
authored
fix(streaming): update ClickHouse windowed meter query (#3574)
1 parent 19867c9 commit 343cd21

File tree

3 files changed

+214
-1
lines changed

3 files changed

+214
-1
lines changed

e2e/e2e_test.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,172 @@ func TestQuery(t *testing.T) {
746746
})
747747
}
748748

749+
func TestQueryDSTTransition(t *testing.T) {
750+
client := initClient(t)
751+
752+
// Test DST transitions with DAY window size in America/Los_Angeles timezone
753+
// Verify that window boundaries are continuous across DST changes
754+
// DST start: March 9, 2025 at 2am -> 3am (spring forward)
755+
// DST end: November 2, 2025 at 2am -> 1am (fall back)
756+
757+
losAngeles, err := time.LoadLocation("America/Los_Angeles")
758+
require.NoError(t, err)
759+
760+
eventType := "query"
761+
subject := "customer-dst"
762+
763+
t.Run("DST_Spring_Forward", func(t *testing.T) {
764+
// Test March 9, 2025 DST transition (spring forward: 2am -> 3am)
765+
// Test 3 days before and 3 days after the transition
766+
var events []cloudevents.Event
767+
768+
// Generate events for March 6-12, 2025 (3 days before, transition day, 3 days after)
769+
for day := 6; day <= 12; day++ {
770+
baseTime := time.Date(2025, time.March, day, 10, 0, 0, 0, losAngeles)
771+
ev := cloudevents.New()
772+
ev.SetID(ulid.Make().String())
773+
ev.SetSource("dst-test")
774+
ev.SetType(eventType)
775+
ev.SetSubject(subject)
776+
ev.SetTime(baseTime)
777+
_ = ev.SetData("application/json", map[string]interface{}{
778+
"duration_ms": "100",
779+
})
780+
events = append(events, ev)
781+
}
782+
783+
// Ingest events
784+
{
785+
resp, err := client.IngestEventBatchWithResponse(context.Background(), events)
786+
require.NoError(t, err)
787+
require.Equal(t, http.StatusNoContent, resp.StatusCode())
788+
}
789+
790+
// Query with DAY window size in Los Angeles timezone
791+
assert.EventuallyWithT(t, func(t *assert.CollectT) {
792+
from := time.Date(2025, time.March, 6, 0, 0, 0, 0, losAngeles)
793+
to := time.Date(2025, time.March, 13, 0, 0, 0, 0, losAngeles)
794+
795+
resp, err := client.QueryMeterWithResponse(context.Background(), eventType, &api.QueryMeterParams{
796+
Subject: &[]string{subject},
797+
WindowSize: lo.ToPtr(api.WindowSizeDay),
798+
WindowTimeZone: lo.ToPtr("America/Los_Angeles"),
799+
From: lo.ToPtr(from),
800+
To: lo.ToPtr(to),
801+
})
802+
require.NoError(t, err)
803+
require.Equal(t, http.StatusOK, resp.StatusCode())
804+
805+
require.NotNil(t, resp.JSON200)
806+
require.Len(t, resp.JSON200.Data, 7) // March 6-12
807+
808+
// Verify window continuity - each window's end should equal the next window's start
809+
for i := 0; i < len(resp.JSON200.Data)-1; i++ {
810+
currentWindowEnd := resp.JSON200.Data[i].WindowEnd.In(losAngeles)
811+
nextWindowStart := resp.JSON200.Data[i+1].WindowStart.In(losAngeles)
812+
assert.Equal(t, currentWindowEnd, nextWindowStart,
813+
"Gap detected: window %d ends at %v but window %d starts at %v",
814+
i, currentWindowEnd, i+1, nextWindowStart)
815+
}
816+
817+
// Verify all windows start at midnight in LA timezone
818+
for i, row := range resp.JSON200.Data {
819+
windowStart := row.WindowStart.In(losAngeles)
820+
assert.Equal(t, 0, windowStart.Hour(), "Day %d window doesn't start at midnight", i)
821+
assert.Equal(t, 0, windowStart.Minute(), "Day %d window doesn't start at midnight", i)
822+
assert.Equal(t, 0, windowStart.Second(), "Day %d window doesn't start at midnight", i)
823+
}
824+
825+
// Verify the DST transition day (March 9) windows are correct
826+
dstDay := resp.JSON200.Data[3] // March 9 is index 3 (6,7,8,9)
827+
assert.Equal(t, time.Date(2025, time.March, 9, 0, 0, 0, 0, losAngeles), dstDay.WindowStart.In(losAngeles))
828+
assert.Equal(t, time.Date(2025, time.March, 10, 0, 0, 0, 0, losAngeles), dstDay.WindowEnd.In(losAngeles))
829+
}, time.Minute, time.Second)
830+
})
831+
832+
t.Run("DST_Fall_Back", func(t *testing.T) {
833+
// Test November 2, 2025 DST transition (fall back: 2am -> 1am)
834+
// Test 3 days before and 3 days after the transition
835+
var events []cloudevents.Event
836+
837+
// Generate events for October 30 - November 5, 2025 (3 days before, transition day, 3 days after)
838+
days := []struct {
839+
month int
840+
day int
841+
}{
842+
{int(time.October), 30},
843+
{int(time.October), 31},
844+
{int(time.November), 1},
845+
{int(time.November), 2},
846+
{int(time.November), 3},
847+
{int(time.November), 4},
848+
{int(time.November), 5},
849+
}
850+
851+
for _, d := range days {
852+
baseTime := time.Date(2025, time.Month(d.month), d.day, 10, 0, 0, 0, losAngeles)
853+
ev := cloudevents.New()
854+
ev.SetID(ulid.Make().String())
855+
ev.SetSource("dst-test")
856+
ev.SetType(eventType)
857+
ev.SetSubject(subject)
858+
ev.SetTime(baseTime)
859+
_ = ev.SetData("application/json", map[string]interface{}{
860+
"duration_ms": "100",
861+
})
862+
events = append(events, ev)
863+
}
864+
865+
// Ingest events
866+
{
867+
resp, err := client.IngestEventBatchWithResponse(context.Background(), events)
868+
require.NoError(t, err)
869+
require.Equal(t, http.StatusNoContent, resp.StatusCode())
870+
}
871+
872+
// Query with DAY window size in Los Angeles timezone
873+
assert.EventuallyWithT(t, func(t *assert.CollectT) {
874+
from := time.Date(2025, time.October, 30, 0, 0, 0, 0, losAngeles)
875+
to := time.Date(2025, time.November, 6, 0, 0, 0, 0, losAngeles)
876+
877+
resp, err := client.QueryMeterWithResponse(context.Background(), eventType, &api.QueryMeterParams{
878+
Subject: &[]string{subject},
879+
WindowSize: lo.ToPtr(api.WindowSizeDay),
880+
WindowTimeZone: lo.ToPtr("America/Los_Angeles"),
881+
From: lo.ToPtr(from),
882+
To: lo.ToPtr(to),
883+
})
884+
require.NoError(t, err)
885+
require.Equal(t, http.StatusOK, resp.StatusCode())
886+
887+
require.NotNil(t, resp.JSON200)
888+
require.Len(t, resp.JSON200.Data, 7) // October 30 - November 5
889+
890+
// Verify window continuity - each window's end should equal the next window's start
891+
for i := 0; i < len(resp.JSON200.Data)-1; i++ {
892+
currentWindowEnd := resp.JSON200.Data[i].WindowEnd.In(losAngeles)
893+
nextWindowStart := resp.JSON200.Data[i+1].WindowStart.In(losAngeles)
894+
assert.Equal(t, currentWindowEnd, nextWindowStart,
895+
"Gap detected: window %d ends at %v but window %d starts at %v",
896+
i, currentWindowEnd, i+1, nextWindowStart)
897+
}
898+
899+
// Verify all windows start at midnight in LA timezone
900+
for i, row := range resp.JSON200.Data {
901+
windowStart := row.WindowStart.In(losAngeles)
902+
assert.Equal(t, 0, windowStart.Hour(), "Day %d window doesn't start at midnight", i)
903+
assert.Equal(t, 0, windowStart.Minute(), "Day %d window doesn't start at midnight", i)
904+
assert.Equal(t, 0, windowStart.Second(), "Day %d window doesn't start at midnight", i)
905+
}
906+
907+
// Verify the DST transition day (November 2) windows are correct
908+
dstDay := resp.JSON200.Data[3] // November 2 is index 3 (Oct 30, 31, Nov 1, 2)
909+
assert.Equal(t, time.Date(2025, time.November, 2, 0, 0, 0, 0, losAngeles), dstDay.WindowStart.In(losAngeles))
910+
assert.Equal(t, time.Date(2025, time.November, 3, 0, 0, 0, 0, losAngeles), dstDay.WindowEnd.In(losAngeles))
911+
}, time.Minute, time.Second)
912+
})
913+
}
914+
749915
func TestCredit(t *testing.T) {
750916
client := initClient(t)
751917
meterSlug := "credit_test_meter"

openmeter/streaming/clickhouse/meter_query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
118118
selectColumns = append(
119119
selectColumns,
120120
fmt.Sprintf("tumbleStart(%s, toIntervalDay(1), '%s') AS windowstart", timeColumn, tz),
121-
fmt.Sprintf("tumbleEnd(%s, toIntervalDay(1), '%s') AS windowend", timeColumn, tz),
121+
"windowstart + toIntervalDay(1) AS windowend",
122122
)
123123

124124
case meterpkg.WindowSizeMonth:

openmeter/streaming/clickhouse/meter_query_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,53 @@ func TestQueryMeter(t *testing.T) {
201201
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
202202
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
203203
},
204+
{
205+
name: "Aggregate data between period, groupped by DAY window size",
206+
query: queryMeter{
207+
Database: "openmeter",
208+
EventsTableName: "om_events",
209+
Namespace: "my_namespace",
210+
Meter: meter.Meter{
211+
Key: "meter1",
212+
EventType: "event1",
213+
Aggregation: meter.MeterAggregationSum,
214+
ValueProperty: lo.ToPtr("$.value"),
215+
GroupBy: map[string]string{
216+
"group1": "$.group1",
217+
"group2": "$.group2",
218+
},
219+
},
220+
From: &from,
221+
To: &to,
222+
WindowSize: lo.ToPtr(meter.WindowSizeDay),
223+
},
224+
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalDay(1), 'UTC') AS windowstart, windowstart + toIntervalDay(1) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
225+
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
226+
},
227+
{
228+
name: "Aggregate data between period in a different timezone, groupped by DAY window size",
229+
query: queryMeter{
230+
Database: "openmeter",
231+
EventsTableName: "om_events",
232+
Namespace: "my_namespace",
233+
Meter: meter.Meter{
234+
Key: "meter1",
235+
EventType: "event1",
236+
Aggregation: meter.MeterAggregationSum,
237+
ValueProperty: lo.ToPtr("$.value"),
238+
GroupBy: map[string]string{
239+
"group1": "$.group1",
240+
"group2": "$.group2",
241+
},
242+
},
243+
From: &from,
244+
To: &to,
245+
WindowSize: lo.ToPtr(meter.WindowSizeDay),
246+
WindowTimeZone: tz,
247+
},
248+
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalDay(1), 'Asia/Shanghai') AS windowstart, windowstart + toIntervalDay(1) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
249+
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
250+
},
204251
{
205252
name: "Aggregate data for a single subject",
206253
query: queryMeter{

0 commit comments

Comments
 (0)