Skip to content
Merged

Dev #43

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ jobs:

- name: Run tests
if: matrix.go-version != '1.21'
run: go test -v -race -timeout 300s ./...
run: go test -v -race -timeout 600s ./...

- name: Run tests with coverage
if: matrix.go-version == '1.21'
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 300s ./...
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 600s ./...

- name: Upload coverage reports to Codecov
if: matrix.go-version == '1.21'
Expand Down
104 changes: 94 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func main() {
// Step 1: Create StreamSQL Instance
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
ssql := streamsql.New()
defer ssql.Stop()
// Step 2: Define Stream SQL Query Statement
// This SQL statement showcases StreamSQL's core capabilities:
// - SELECT: Choose output fields and aggregation functions
Expand Down Expand Up @@ -267,8 +267,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`

err := ssql.Execute(rsql)
if err != nil {
Expand Down Expand Up @@ -335,6 +334,11 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
- **Characteristics**: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
- **Application Scenario**: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.

- **Session Window**
- **Definition**: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
- **Characteristics**: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
- **Application Scenario**: In user behavior analysis, maintain a session when users operate continuously, and close the session and count operations within that session when users stop operating for more than 5 minutes.

### Stream

- **Definition**: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
Expand All @@ -343,17 +347,97 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi

### Time Semantics

- **Event Time**
- **Definition**: The actual time when the data occurred, usually represented by a timestamp generated by the data source.

- **Processing Time**
- **Definition**: The time when the data arrives at the processing system.
StreamSQL supports two time concepts that determine how windows are divided and triggered:

#### Event Time

- **Definition**: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as `event_time`, `timestamp`, `order_time`, etc.).
- **Characteristics**:
- Windows are divided based on timestamp field values in the data
- Even if data arrives late, it can be correctly counted into the corresponding window based on event time
- Uses Watermark mechanism to handle out-of-order and late data
- Results are accurate but may have delays (need to wait for late data)
- **Use Cases**:
- Scenarios requiring precise temporal analysis
- Scenarios where data may arrive out of order or delayed
- Historical data replay and analysis
- **Configuration**: Use `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` to specify the event time field
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```

#### Processing Time

- **Definition**: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
- **Characteristics**:
- Windows are divided based on the time data arrives at the system (`time.Now()`)
- Regardless of the time field value in the data, it is counted into the current window based on arrival time
- Uses system clock (Timer) to trigger windows
- Low latency but results may be inaccurate (cannot handle out-of-order and late data)
- **Use Cases**:
- Real-time monitoring and alerting scenarios
- Scenarios with high latency requirements and relatively low accuracy requirements
- Scenarios where data arrives in order and delay is controllable
- **Configuration**: Default when `WITH (TIMESTAMP=...)` is not specified
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- No WITH clause specified, defaults to processing time
```

#### Event Time vs Processing Time Comparison

| Feature | Event Time | Processing Time |
|---------|------------|-----------------|
| **Time Source** | Timestamp field in data | System current time |
| **Window Division** | Based on event timestamp | Based on data arrival time |
| **Late Data Handling** | Supported (Watermark mechanism) | Not supported |
| **Out-of-Order Handling** | Supported (Watermark mechanism) | Not supported |
| **Result Accuracy** | Accurate | May be inaccurate |
| **Processing Latency** | Higher (need to wait for late data) | Low (real-time trigger) |
| **Configuration** | `WITH (TIMESTAMP='field')` | Default (no WITH clause) |
| **Use Cases** | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |

#### Window Time

- **Window Start Time**
- **Definition**: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
- **Event Time Windows**: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
- **Processing Time Windows**: The starting time point of the window, based on the time data arrives at the system.
- **Example**: For an event-time-based tumbling window `TumblingWindow('5m')`, the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).

- **Window End Time**
- **Definition**: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.
- **Event Time Windows**: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when `watermark >= window_end`.
- **Processing Time Windows**: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
- **Example**: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.

#### Watermark Mechanism (Event Time Windows Only)

- **Definition**: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
- **Calculation Formula**: `Watermark = max(event_time) - MaxOutOfOrderness`
- **Window Trigger Condition**: Windows trigger when `watermark >= window_end`
- **Configuration Parameters**:
- `MAXOUTOFORDERNESS`: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
- `ALLOWEDLATENESS`: Time window can accept late data after triggering (default: 0, no late data accepted)
- `IDLETIMEOUT`: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order
ALLOWEDLATENESS='2s', -- Accept 2 seconds of late data after window triggers
IDLETIMEOUT='5s' -- Advance watermark based on processing time after 5s of no data
)
```

## Contribution Guidelines

Expand Down
105 changes: 95 additions & 10 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ import (
func main() {
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
ssql := streamsql.New()
defer ssql.Stop()
// 2. 定义流式SQL查询语句
// 核心概念解析:
// - TumblingWindow('5s'): 滚动窗口,每5秒创建一个新窗口,窗口之间不重叠
Expand Down Expand Up @@ -285,8 +285,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`

err := ssql.Execute(rsql)
if err != nil {
Expand Down Expand Up @@ -347,6 +346,11 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
- **特点**:窗口的大小与时间无关,而是根据数据量来划分,适合对数据量进行分段处理。
- **应用场景**:在工业物联网中,每处理 100 条设备状态数据后进行一次聚合计算。

- **会话窗口(Session Window)**
- **定义**:基于数据活跃度的动态窗口,当数据之间的间隔超过指定的超时时间时,当前会话结束并触发窗口。
- **特点**:窗口大小动态变化,根据数据到达的间隔自动划分会话。当数据连续到达时,会话持续;当数据间隔超过超时时间时,会话结束并触发窗口。
- **应用场景**:在用户行为分析中,当用户连续操作时保持会话,当用户停止操作超过 5 分钟后关闭会话并统计该会话内的操作次数。

### 流(Stream)

- **定义**:流是数据的连续序列,数据以无界的方式产生,通常来自于传感器、日志系统、用户行为等。
Expand All @@ -355,16 +359,97 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合

### 时间语义

- **事件时间(Event Time)**
- **定义**:数据实际发生的时间,通常由数据源生成的时间戳表示。
StreamSQL 支持两种时间概念,它们决定了窗口如何划分和触发:

#### 事件时间(Event Time)

- **定义**:事件时间是指数据实际产生的时间,通常记录在数据本身的某个字段中(如 `event_time`、`timestamp`、`order_time` 等)。
- **特点**:
- 窗口基于数据中的时间戳字段值来划分
- 即使数据延迟到达,也能根据事件时间正确统计到对应的窗口
- 使用 Watermark 机制来处理乱序和延迟数据
- 结果准确,但可能有延迟(需要等待延迟数据)
- **使用场景**:
- 需要精确时序分析的场景
- 数据可能乱序或延迟到达的场景
- 历史数据回放和分析
- **配置方法**:使用 `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` 指定事件时间字段
- **示例**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```

#### 处理时间(Processing Time)

- **定义**:处理时间是指数据到达 StreamSQL 处理系统的时间,即系统接收到数据时的当前时间。
- **特点**:
- 窗口基于数据到达系统的时间(`time.Now()`)来划分
- 不管数据中的时间字段是什么值,都按到达时间统计到当前窗口
- 使用系统时钟(Timer)来触发窗口
- 延迟低,但结果可能不准确(无法处理乱序和延迟数据)
- **使用场景**:
- 实时监控和告警场景
- 对延迟要求高,对准确性要求相对较低的场景
- 数据顺序到达且延迟可控的场景
- **配置方法**:不指定 `WITH (TIMESTAMP=...)` 时,默认使用处理时间
- **示例**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- 不指定 WITH 子句,默认使用处理时间
```

#### 事件时间 vs 处理时间对比

| 特性 | 事件时间 (Event Time) | 处理时间 (Processing Time) |
|------|---------------------|-------------------------|
| **时间来源** | 数据中的时间戳字段 | 系统当前时间 |
| **窗口划分** | 基于事件时间戳 | 基于数据到达时间 |
| **延迟处理** | 支持(Watermark机制) | 不支持 |
| **乱序处理** | 支持(Watermark机制) | 不支持 |
| **结果准确性** | 准确 | 可能不准确 |
| **处理延迟** | 较高(需等待延迟数据) | 低(实时触发) |
| **配置方式** | `WITH (TIMESTAMP='field')` | 默认(不指定WITH) |
| **适用场景** | 精确时序分析、历史回放 | 实时监控、低延迟要求 |

#### 窗口时间

- **处理时间(Processing Time)**
- **定义**:数据到达处理系统的时间。
- **窗口开始时间(Window Start Time)**
- **定义**:基于事件时间,窗口的起始时间点。例如,对于一个基于事件时间的滑动窗口,窗口开始时间是窗口内最早事件的时间戳。
- **事件时间窗口**:窗口的起始时间点,基于事件时间对齐到窗口边界(如对齐到分钟、小时的整点)。
- **处理时间窗口**:窗口的起始时间点,基于数据到达系统的时间。
- **示例**:对于一个基于事件时间的滚动窗口 `TumblingWindow('5m')`,窗口开始时间会对齐到5分钟的倍数(如 10:00、10:05、10:10)。

- **窗口结束时间(Window End Time)**
- **定义**:基于事件时间,窗口的结束时间点。通常窗口结束时间是窗口开始时间加上窗口的持续时间。
- 例如,一个滑动窗口的持续时间为 1 分钟,则窗口结束时间是窗口开始时间加上 1 分钟。
- **事件时间窗口**:窗口的结束时间点,通常是窗口开始时间加上窗口的持续时间。窗口在 `watermark >= window_end` 时触发。
- **处理时间窗口**:窗口的结束时间点,基于数据到达系统的时间加上窗口持续时间。窗口在系统时钟到达结束时间时触发。
- **示例**:一个持续时间为 1 分钟的滚动窗口,如果窗口开始时间是 10:00,则窗口结束时间是 10:01。

#### Watermark 机制(仅事件时间窗口)

- **定义**:Watermark 表示"小于该时间的事件不应该再到达",用于判断窗口是否可以触发。
- **计算公式**:`Watermark = max(event_time) - MaxOutOfOrderness`
- **窗口触发条件**:当 `watermark >= window_end` 时,窗口触发
- **配置参数**:
- `MAXOUTOFORDERNESS`:允许的最大乱序时间,用于容忍数据乱序(默认:0,不允许乱序)
- `ALLOWEDLATENESS`:窗口触发后还能接受延迟数据的时间(默认:0,不接受延迟数据)
- `IDLETIMEOUT`:数据源空闲时,基于处理时间推进 Watermark 的超时时间(默认:0,禁用)
- **示例**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- 容忍5秒的乱序
ALLOWEDLATENESS='2s', -- 窗口触发后还能接受2秒的延迟数据
IDLETIMEOUT='5s' -- 5秒无数据,基于处理时间推进watermark
)
```

## 贡献指南

Expand Down
73 changes: 73 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ integration with the RuleGo ecosystem.
• Lightweight design - Pure in-memory operations, no external dependencies
• SQL syntax support - Process stream data using familiar SQL syntax
• Multiple window types - Sliding, tumbling, counting, and session windows
• Event time and processing time - Support both time semantics for accurate stream processing
• Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
• Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
• Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components
Expand Down Expand Up @@ -107,6 +109,77 @@ StreamSQL supports multiple window types:
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')

# Event Time vs Processing Time

StreamSQL supports two time semantics for window processing:

## Processing Time (Default)

Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:

// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives

## Event Time

Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps,
allowing correct handling of out-of-order and late-arriving data:

// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')

// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')

## Watermark and Late Data Handling

Event time windows use watermark mechanism to handle out-of-order and late data:

// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s' // Watermark = max(event_time) - 5s
)

// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
ALLOWEDLATENESS='2s' // Window stays open for 2s after trigger
)

// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s', // Tolerate 5s out-of-order before trigger
ALLOWEDLATENESS='2s' // Accept 2s late data after trigger
)

// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
IDLETIMEOUT='5s' // If no data arrives within 5s, watermark advances based on processing time
)

Key concepts:
• MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
• AllowedLateness: Keeps window open after trigger to accept late data and update results
• IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
• Watermark: Indicates that no events with timestamp less than watermark are expected

# Custom Functions

StreamSQL supports plugin-based custom functions with runtime dynamic registration:
Expand Down
Loading
Loading