-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Labels
enhancementNew feature or requestNew feature or requesthelp wantedExtra attention is neededExtra attention is needed
Description
Issue Description
Overview
We would like to request the implementation of stream-dimension table JOIN functionality in StreamSQL, enabling real-time enrichment of streaming data with dimension table lookups. This feature would allow users to perform temporal joins between streaming data and slowly changing dimension tables.
Proposed SQL Syntax
SELECT
s.device_id,
s.temperature,
s.humidity,
s.timestamp,
d.device_name,
d.location,
d.device_type,
d.manufacturer
FROM stream s
JOIN device_info d
ON s.device_id = d.device_id
WHERE s.temperature > 80Usage Example
package main
import (
"fmt"
"log"
"time"
)
func main() {
// 1. Create StreamSQL instance
streamsql := NewStreamSQL()
// 2. Prepare device dimension table data using array format
deviceInfoArray := []map[string]interface{}{
{
"device_id": "DEV001",
"device_name": "Temperature Sensor 01",
"location": "Workshop A - Zone 1",
"device_type": "temperature_sensor",
"manufacturer": "Huawei",
"install_date": "2023-06-15",
"maintenance_cycle": 90,
},
{
"device_id": "DEV002",
"device_name": "Pressure Sensor 02",
"location": "Workshop B - Zone 2",
"device_type": "pressure_sensor",
"manufacturer": "Siemens",
"install_date": "2023-08-20",
"maintenance_cycle": 120,
},
{
"device_id": "DEV003",
"device_name": "Vibration Sensor 03",
"location": "Workshop C - Zone 1",
"device_type": "vibration_sensor",
"manufacturer": "Schneider",
"install_date": "2023-09-10",
"maintenance_cycle": 60,
},
{
"device_id": "DEV004",
"device_name": "Flow Meter 04",
"location": "Pipeline A - Section 3",
"device_type": "flow_meter",
"manufacturer": "Emerson",
"install_date": "2023-07-25",
"maintenance_cycle": 180,
},
{
"device_id": "DEV005",
"device_name": "Level Sensor 05",
"location": "Tank B - Top",
"device_type": "level_sensor",
"manufacturer": "Endress+Hauser",
"install_date": "2023-05-10",
"maintenance_cycle": 150,
},
}
// 3. Create device dimension source from array
deviceDimSource := NewArrayDimensionSource(deviceInfoArray, "device_id")
// 4. Register device dimension table
err := streamsql.RegisterDimensionTable("device_info", deviceDimSource)
if err != nil {
log.Fatal(err)
}
// 5. Execute device data JOIN query
sql := `
SELECT
s.device_id,
s.temperature,
s.humidity,
s.pressure,
s.timestamp,
s.status,
d.device_name,
d.location,
d.device_type,
d.manufacturer,
d.maintenance_cycle
FROM stream s
JOIN device_info d
ON s.device_id = d.device_id
WHERE s.temperature > 80
`
stream, err := streamsql.Execute(sql)
if err != nil {
log.Fatal(err)
}
Implementation Benefits
- Simplified Architecture: Focus on in-memory processing for fast lookups
- Clean Interface: Simple
DimensionSourceinterface for extensibility - Default Implementation: Ready-to-use in-memory dimension source
- Easy Integration: Minimal configuration required
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or requesthelp wantedExtra attention is neededExtra attention is needed