diff --git a/cmd/run.go b/cmd/run.go index 376d45629..c4637b617 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -34,6 +34,7 @@ import ( "github.com/agglayer/aggkit/prometheus" "github.com/agglayer/aggkit/reorgdetector" "github.com/agglayer/aggkit/rpc" + "github.com/agglayer/aggkit/sqliteapi" "github.com/ethereum/go-ethereum/ethclient" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/cli/v2" @@ -152,6 +153,11 @@ func start(cliCtx *cli.Context) error { go pprof.StartProfilingHTTPServer(cliCtx.Context, cfg.Profiling) } + // Start SQLite API service if enabled + if cfg.SqliteAPI.Enabled { + go startSQLiteAPIService(cliCtx.Context, cfg.SqliteAPI, cfg) + } + waitSignal(nil) return nil @@ -713,3 +719,23 @@ func startPrometheusHTTPServer(c prometheus.Config) { return } } + +func startSQLiteAPIService(ctx context.Context, cfg sqliteapi.Config, mainCfg *config.Config) { + logger := log.WithFields("module", "sqliteapi") + + // Build database paths map from configuration + dbPaths := make(map[string]string) + + // Add all configured databases + dbPaths["L1InfoTreeSync"] = mainCfg.L1InfoTreeSync.DBPath + dbPaths["BridgeL2Sync"] = mainCfg.BridgeL2Sync.DBPath + dbPaths["AggSender"] = mainCfg.AggSender.StoragePath + dbPaths["ReorgDetectorL1"] = mainCfg.ReorgDetectorL1.DBPath + dbPaths["ReorgDetectorL2"] = mainCfg.ReorgDetectorL2.DBPath + + service := sqliteapi.NewService(cfg, dbPaths, logger) + + if err := service.Start(); err != nil { + log.Fatal("Failed to start SQLite API service: ", err) + } +} diff --git a/config/config.go b/config/config.go index 5fc17ac3c..37c64fb0c 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ import ( "github.com/agglayer/aggkit/pprof" "github.com/agglayer/aggkit/prometheus" "github.com/agglayer/aggkit/reorgdetector" + "github.com/agglayer/aggkit/sqliteapi" "github.com/mitchellh/mapstructure" "github.com/pelletier/go-toml/v2" "github.com/spf13/viper" @@ -157,6 +158,9 @@ type Config struct { // Profiling is the configuration of the profiling service Profiling pprof.Config + + // SqliteAPI is the configuration for the SQLite API service + SqliteAPI sqliteapi.Config } // Load loads the configuration diff --git a/go.mod b/go.mod index 7ecf11a82..03c266e4a 100644 --- a/go.mod +++ b/go.mod @@ -115,6 +115,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go v1.0.3 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect + github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/go-bexpr v0.1.11 // indirect github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 // indirect diff --git a/go.sum b/go.sum index 889b70c5d..c23780511 100644 --- a/go.sum +++ b/go.sum @@ -222,6 +222,8 @@ github.com/googleapis/gax-go v1.0.3/go.mod h1:QyXYajJFdARxGzjwUfbDFIse7Spkw81SJ4 github.com/googleapis/gax-go/v2 v2.0.2/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrkurSS/Q= github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-bexpr v0.1.11 h1:6DqdA/KBjurGby9yTY0bmkathya0lfwF2SeuubCI7dY= diff --git a/sqliteapi/README.md b/sqliteapi/README.md new file mode 100644 index 000000000..0c3745f62 --- /dev/null +++ b/sqliteapi/README.md @@ -0,0 +1,183 @@ +# SQLite API Service + +This module provides a REST API service for interacting with SQLite databases used by various components of the aggkit project. + +## Features + +- **Database List**: Get list of available databases with their tables +- **Select Support**: Execute SELECT queries on SQLite databases +- **Rate Limiting**: Configurable rate limiting per IP address +- **CORS Support**: Cross-origin resource sharing enabled +- **JSON-RPC 2.0**: Standard JSON-RPC 2.0 protocol support + +## Configuration + +The SQLite API service can be configured in the main configuration file: + +```toml +[SqliteAPI] +Enabled = true +Host = "0.0.0.0" +Port = 8080 +MaxRequestsPerIPAndSecond = 10 +``` + +### Configuration Options + +- `Enabled`: Enable or disable the SQLite API service +- `Host`: Host address to bind the service to +- `Port`: Port number for the service +- `MaxRequestsPerIPAndSecond`: Maximum number of requests allowed per IP address per second + +## SQLite JSON-RPC Methods + +The following JSON-RPC methods can be used to perform database operations: + +### sqlite_getDbs + +This method retrieves the list of databases managed by the SQLite transaction manager. + +#### Request: +```json +{ + "id": 1, + "jsonrpc": "2.0", + "method": "sqlite_getDbs", + "params": [] +} +``` + +--- + +### sqlite_select + +This method executes a SQL `SELECT` query on the `monitored_txs` table to retrieve transaction data for a specific transaction ID. + +#### Request: +```json +{ + "id": 1, + "jsonrpc": "2.0", + "method": "sqlite_select", + "params": ["AggSender", "SELECT * FROM certificate_info LIMIT 2;"] +} +``` + + + +## Database Types + +The following database types are supported: + +- `L1InfoTreeSync`: L1 info tree sync database (L1InfoTreeSync.sqlite) +- `BridgeL2Sync`: Bridge L2 sync database (bridgel2sync.sqlite) +- `AggSender`: Aggregator sender database (aggsender.sqlite) +- `ReorgDetectorL1`: L1 reorg detector database (reorgdetectorl1.sqlite) +- `ReorgDetectorL2`: L2 reorg detector database (reorgdetectorl2.sqlite) + +## Error Handling + +The API returns standard JSON-RPC 2.0 error responses: + +```json +{ + "jsonrpc": "2.0", + "id": 1, + "result": null, + "error": { + "code": 400, + "message": "Invalid request body" + } +} +``` + + + +## Rate Limiting + +The service implements rate limiting based on client IP addresses. Each IP is limited to the number of requests specified in `MaxRequestsPerIPAndSecond` configuration. + +When rate limit is exceeded, the service returns HTTP 429 (Too Many Requests) status code. + +## Security Considerations + +- The service binds to all interfaces by default (`0.0.0.0`) +- Rate limiting helps prevent abuse +- CORS is enabled for cross-origin requests +- Only SELECT statements are allowed - all data modification operations are blocked +- Input validation is performed on all requests + +## Usage Example + +```bash +# Get available databases +curl -X POST http://localhost:8080/ \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 1, + "method": "sqlite_getDbs", + "params": [] + }' + +# Response includes: +# - AggSender: ["gorp_migrations", "certificate_info", "certificate_info_history", "key_value"] +# - BridgeL2Sync: ["gorp_migrations", "key_value", "block", "bridge", "claim", "root", "rht"] +# - L1InfoTreeSync: ["gorp_migrations", "key_value", "l1_info_root", "l1_info_rht", "block", "l1info_leaf", "verify_batches", "l1info_initial", "rollup_exit_root", "rollup_exit_rht"] +# - ReorgDetectorL1: ["gorp_migrations", "key_value", "tracked_block", "reorg_event"] +# - ReorgDetectorL2: ["gorp_migrations", "key_value", "tracked_block", "reorg_event"] + + + +# Select example - Query certificate_info table +curl -X POST http://localhost:8080/ \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 1, + "method": "sqlite_select", + "params": ["AggSender", "SELECT * FROM certificate_info LIMIT 2;"] + }' + +# Select example - Query ReorgDetectorL1 tracked_block table +curl -X POST http://localhost:8080/ \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 2, + "method": "sqlite_select", + "params": ["ReorgDetectorL1", "SELECT * FROM tracked_block LIMIT 2;"] + }' + + + +# Error example - Attempting DELETE operation (blocked) +curl -X POST http://localhost:8080/ \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 2, + "method": "sqlite_select", + "params": ["AggSender", "DELETE FROM certificate_info WHERE id = 1;"] + }' + + +``` + +## Integration with aggkit + +The SQLite API service is automatically started when the aggkit application starts and the `SqliteAPI.Enabled` configuration is set to `true`. The service runs in a separate goroutine and provides HTTP endpoints for database operations. + +The service maps database types to the actual database file paths used by various aggkit components, allowing external applications to interact with the underlying SQLite databases. + +## Database File Locations + +The service uses the actual database paths configured in the main configuration file: + +- `L1InfoTreeSync`: Uses `L1InfoTreeSync.DBPath` from configuration (default: `/tmp/aggkit/L1InfoTreeSync.sqlite`) +- `BridgeL2Sync`: Uses `BridgeL2Sync.DBPath` from configuration (default: `/tmp/aggkit/bridgel2sync.sqlite`) +- `AggSender`: Uses `AggSender.StoragePath` from configuration (default: `/tmp/aggkit/aggsender.sqlite`) +- `ReorgDetectorL1`: Uses `ReorgDetectorL1.DBPath` from configuration (default: `/tmp/aggkit/reorgdetectorl1.sqlite`) +- `ReorgDetectorL2`: Uses `ReorgDetectorL2.DBPath` from configuration (default: `/tmp/aggkit/reorgdetectorl2.sqlite`) + +The SQLite API service will attempt to access all configured databases. If a database file doesn't exist or is not accessible, the service will log a warning but continue to operate with the available databases. \ No newline at end of file diff --git a/sqliteapi/config.go b/sqliteapi/config.go new file mode 100644 index 000000000..731722c54 --- /dev/null +++ b/sqliteapi/config.go @@ -0,0 +1,13 @@ +package sqliteapi + +// Config is the configuration for the SQLite API service +type Config struct { + // Enabled is a flag to enable the SQLite API service + Enabled bool `mapstructure:"Enabled"` + // Host is the host address for the SQLite API service + Host string `mapstructure:"Host"` + // Port is the port for the SQLite API service + Port int `mapstructure:"Port"` + // MaxRequestsPerIPAndSecond is the maximum number of requests per IP per second + MaxRequestsPerIPAndSecond int `mapstructure:"MaxRequestsPerIPAndSecond"` +} diff --git a/sqliteapi/service.go b/sqliteapi/service.go new file mode 100644 index 000000000..265bf5349 --- /dev/null +++ b/sqliteapi/service.go @@ -0,0 +1,373 @@ +package sqliteapi + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/agglayer/aggkit/common" + "github.com/agglayer/aggkit/db" + "github.com/gorilla/mux" + "golang.org/x/time/rate" +) + +// Service represents the SQLite API service +type Service struct { + cfg Config + logger common.Logger + server *http.Server + dbPaths map[string]string + limiters map[string]*rate.Limiter +} + +// NewService creates a new SQLite API service +func NewService(cfg Config, dbPaths map[string]string, logger common.Logger) *Service { + return &Service{ + cfg: cfg, + logger: logger, + dbPaths: dbPaths, + limiters: make(map[string]*rate.Limiter), + } +} + +// Start starts the SQLite API service +func (s *Service) Start() error { + router := mux.NewRouter() + + // Add rate limiting middleware + router.Use(s.rateLimitMiddleware) + + // Add CORS middleware + router.Use(s.corsMiddleware) + + // Register routes - use root path and handle by method + router.HandleFunc("/", s.handleRequest).Methods("POST") + + s.server = &http.Server{ + Addr: fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port), + Handler: router, + } + + s.logger.Infof("Starting SQLite API service on %s:%d", + s.cfg.Host, s.cfg.Port) + return s.server.ListenAndServe() +} + +// Stop stops the SQLite API service +func (s *Service) Stop(ctx context.Context) error { + return s.server.Shutdown(ctx) +} + +// rateLimitMiddleware applies rate limiting based on IP address +func (s *Service) rateLimitMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ip := getClientIP(r) + limiter, exists := s.limiters[ip] + if !exists { + limiter = rate.NewLimiter(rate.Limit(s.cfg.MaxRequestsPerIPAndSecond), s.cfg.MaxRequestsPerIPAndSecond) + s.limiters[ip] = limiter + } + + if !limiter.Allow() { + http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests) + return + } + + next.ServeHTTP(w, r) + }) +} + +// corsMiddleware adds CORS headers +func (s *Service) corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + next.ServeHTTP(w, r) + }) +} + +// handleRequest handles all requests and routes them based on the method +func (s *Service) handleRequest(w http.ResponseWriter, r *http.Request) { + // Read the request body to determine the method + var req map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.sendErrorResponse(w, "Invalid request body", http.StatusBadRequest) + return + } + + // Get the method from the request + method, ok := req["method"].(string) + if !ok { + s.sendErrorResponse(w, "Missing or invalid method", http.StatusBadRequest) + return + } + + // Route based on method + switch method { + case "sqlite_getDbs": + s.handleSQLiteGetDbs(w, r, req) + case "sqlite_select": + s.handleSQLiteSelect(w, r, req) + default: + s.sendErrorResponse(w, fmt.Sprintf("Unknown method: %s", method), http.StatusBadRequest) + } +} + +// handleSQLiteGetDbs handles sqlite_getDbs requests +func (s *Service) handleSQLiteGetDbs(w http.ResponseWriter, r *http.Request, req map[string]interface{}) { + // Validate JSON-RPC version + jsonrpc, ok := req["jsonrpc"].(string) + if !ok || jsonrpc != "2.0" { + s.sendErrorResponse(w, "Invalid JSON-RPC version", http.StatusBadRequest) + return + } + + // Get request ID + id, ok := req["id"].(float64) + if !ok { + s.sendErrorResponse(w, "Missing or invalid id", http.StatusBadRequest) + return + } + + // Return the list of available databases with their tables + result := make(map[string][]string) + for dbName, dbPath := range s.dbPaths { + db, err := db.NewSQLiteDB(dbPath) + if err != nil { + s.logger.Warnf("Failed to open database %s: %v", dbName, err) + continue + } + defer db.Close() + + // Query to get table names + rows, err := db.Query("SELECT name FROM sqlite_master WHERE type='table';") + if err != nil { + s.logger.Warnf("Failed to get tables for database %s: %v", dbName, err) + continue + } + defer rows.Close() + + var tables []string + for rows.Next() { + var tableName string + if err := rows.Scan(&tableName); err != nil { + s.logger.Warnf("Failed to scan table name for database %s: %v", dbName, err) + continue + } + tables = append(tables, tableName) + } + + result[dbName] = tables + } + + response := SQLiteResponse{ + JSONRPC: "2.0", + ID: int(id), + Result: result, + Error: nil, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// handleSQLiteSelect handles sqlite_select requests +func (s *Service) handleSQLiteSelect(w http.ResponseWriter, r *http.Request, req map[string]interface{}) { + // Validate JSON-RPC version + jsonrpc, ok := req["jsonrpc"].(string) + if !ok || jsonrpc != "2.0" { + s.sendErrorResponse(w, "Invalid JSON-RPC version", http.StatusBadRequest) + return + } + + // Get request ID + id, ok := req["id"].(float64) + if !ok { + s.sendErrorResponse(w, "Missing or invalid id", http.StatusBadRequest) + return + } + + // Get params + params, ok := req["params"].([]interface{}) + if !ok || len(params) < 2 { + s.sendErrorResponse(w, "Missing or invalid params", http.StatusBadRequest) + return + } + + dbName, ok := params[0].(string) + if !ok { + s.sendErrorResponse(w, "Invalid database name", http.StatusBadRequest) + return + } + + sqlQuery, ok := params[1].(string) + if !ok { + s.sendErrorResponse(w, "Invalid SQL query", http.StatusBadRequest) + return + } + + // Validate SQL query - only allow SELECT statements + if !s.isValidSelectQuery(sqlQuery) { + s.sendErrorResponse(w, "Only SELECT statements are allowed", http.StatusBadRequest) + return + } + + dbPath, exists := s.dbPaths[dbName] + if !exists { + s.sendErrorResponse(w, "Database not found", http.StatusNotFound) + return + } + + db, err := db.NewSQLiteDB(dbPath) + if err != nil { + s.sendErrorResponse(w, fmt.Sprintf("Failed to open database: %v", err), http.StatusInternalServerError) + return + } + defer db.Close() + + rows, err := db.Query(sqlQuery) + if err != nil { + s.sendErrorResponse(w, fmt.Sprintf("Query failed: %v", err), http.StatusInternalServerError) + return + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + s.sendErrorResponse(w, fmt.Sprintf("Failed to get columns: %v", err), http.StatusInternalServerError) + return + } + + var results []map[string]interface{} + for rows.Next() { + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range values { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + s.sendErrorResponse(w, fmt.Sprintf("Failed to scan row: %v", err), http.StatusInternalServerError) + return + } + + // Create a map with "Fields" key containing the row data + fields := make(map[string]interface{}) + for i, col := range columns { + val := values[i] + if val == nil { + fields[col] = nil + } else { + switch v := val.(type) { + case []byte: + fields[col] = string(v) + default: + fields[col] = v + } + } + } + + row := map[string]interface{}{ + "Fields": fields, + } + results = append(results, row) + } + + response := SQLiteResponse{ + JSONRPC: "2.0", + ID: int(id), + Result: results, + Error: nil, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// sendErrorResponse sends an error response +func (s *Service) sendErrorResponse(w http.ResponseWriter, message string, statusCode int) { + response := SQLiteResponse{ + JSONRPC: "2.0", + ID: 1, + Result: nil, + Error: &SQLiteError{ + Code: statusCode, + Message: message, + }, + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + json.NewEncoder(w).Encode(response) +} + +// getClientIP extracts the client IP address from the request +func getClientIP(r *http.Request) string { + // Check for X-Forwarded-For header first + if forwarded := r.Header.Get("X-Forwarded-For"); forwarded != "" { + ips := strings.Split(forwarded, ",") + if len(ips) > 0 { + return strings.TrimSpace(ips[0]) + } + } + + // Check for X-Real-IP header + if realIP := r.Header.Get("X-Real-IP"); realIP != "" { + return realIP + } + + // Fall back to RemoteAddr + ip := r.RemoteAddr + if colonIndex := strings.LastIndex(ip, ":"); colonIndex != -1 { + ip = ip[:colonIndex] + } + return ip +} + +// isValidSelectQuery checks if the SQL query is a valid SELECT statement +func (s *Service) isValidSelectQuery(query string) bool { + // Convert to lowercase and trim whitespace for easier parsing + query = strings.ToLower(strings.TrimSpace(query)) + + // Check if it starts with "select" + if !strings.HasPrefix(query, "select") { + return false + } + + // Check for forbidden keywords that could modify data + forbiddenKeywords := []string{ + "insert", "update", "delete", "drop", "create", "alter", + "truncate", "replace", "merge", "upsert", + } + + for _, keyword := range forbiddenKeywords { + if strings.Contains(query, keyword) { + return false + } + } + + return true +} + +// SQLiteResponse represents a SQLite API response +type SQLiteResponse struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Result interface{} `json:"result"` + Error *SQLiteError `json:"error"` +} + +// SQLiteError represents a SQLite API error +type SQLiteError struct { + Code int `json:"code"` + Message string `json:"message"` +}