Skip to content

feat: implement OPC UA mapper using v1beta1 mapper-framework#138

Open
Sarvagya035 wants to merge 1 commit intokubeedge:mainfrom
Sarvagya035:feat/opcua-mapper-v1beta1
Open

feat: implement OPC UA mapper using v1beta1 mapper-framework#138
Sarvagya035 wants to merge 1 commit intokubeedge:mainfrom
Sarvagya035:feat/opcua-mapper-v1beta1

Conversation

@Sarvagya035
Copy link

What type of PR is this?

/kind feature


What this PR does / why we need it:

This PR adds a built-in OPC UA mapper based on the v1beta1 version of the device management API and the new mapper-framework.
After the upgrade of the device management API to v1beta1, the existing built-in mappers based on v1alpha2 can no longer run in a KubeEdge cluster.
This change helps by extending built-in mapper support and improving the user experience for users migrating to v1beta1.


Which issue(s) this PR fixes:

Fixes kubeedge/kubeedge#6423


Signed-off-by: Sarvagya <sarvagya035@gmail.com>
@kubeedge-bot kubeedge-bot added the kind/feature Categorizes issue or PR as related to a new feature. label Feb 9, 2026
@kubeedge-bot
Copy link
Collaborator

Welcome @Sarvagya035! It looks like this is your first PR to kubeedge/mappers-go 🎉

@kubeedge-bot kubeedge-bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Feb 9, 2026
@kubeedge-bot
Copy link
Collaborator

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
To complete the pull request process, please assign fisherxu after the PR has been reviewed.
You can assign the PR to them by writing /assign @fisherxu in a comment when ready.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@gemini-code-assist
Copy link

Summary of Changes

Hello @Sarvagya035, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers a critical update to KubeEdge's device management capabilities by introducing a new OPC UA mapper. This mapper is engineered to be fully compatible with the v1beta1 device management API and the modern mapper-framework, resolving issues faced by users with older v1alpha2-based mappers. The goal is to provide robust and flexible integration for OPC UA devices, enhancing the overall user experience and extending built-in support for industrial automation scenarios within KubeEdge.

Highlights

  • New OPC UA Mapper Implementation: This pull request introduces a new built-in OPC UA mapper, designed to integrate OPC UA devices with KubeEdge.
  • v1beta1 API and Mapper Framework Adoption: The new mapper is built using the v1beta1 version of the device management API and leverages the new mapper-framework, ensuring compatibility with the latest KubeEdge standards.
  • Enhanced Device Management Compatibility: This implementation addresses the incompatibility of older v1alpha2-based mappers with the v1beta1 API, providing extended built-in mapper support for users migrating to v1beta1.
  • Comprehensive Data Handling: The mapper includes support for various data persistence methods (InfluxDB2, MySQL, Redis, TDengine) and publishing methods (HTTP, MQTT, OpenTelemetry), along with a placeholder for stream data processing.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • mappers/Kubeedge-v1.22.0/opcua-mapper/Dockerfile_nostream
    • Added Dockerfile for building the OPC UA mapper image.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/Makefile
    • Introduced a Makefile to streamline development tasks such as dependency management, code linting, building, and Docker image packaging for the mapper.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/cmd/main.go
    • Added the main entry point for the OPC UA mapper application, setting up gRPC client/server communication and an HTTP server.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/config.yaml
    • Added the default configuration file for the OPC UA mapper, defining gRPC server settings, common mapper properties, API version (v1beta1), and protocol details.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/influxdb2/client.go
    • Implemented the InfluxDB2 client for storing device data.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/influxdb2/handler.go
    • Added the data handler logic for InfluxDB2, enabling periodic data collection and storage.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/mysql/client.go
    • Implemented the MySQL client for storing device data.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/mysql/handler.go
    • Added the data handler logic for MySQL, enabling periodic data collection and storage.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/redis/client.go
    • Implemented the Redis client for storing device data.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/redis/handler.go
    • Added the data handler logic for Redis, enabling periodic data collection and storage.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/tdengine/client.go
    • Implemented the TDengine client for storing device data.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/dbmethod/tdengine/handler.go
    • Added the data handler logic for TDengine, enabling periodic data collection and storage.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/publish/http/client.go
    • Implemented the HTTP client for publishing device data to external HTTP endpoints.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/publish/mqtt/client.go
    • Implemented the MQTT client for publishing device data to MQTT brokers.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/publish/otel/client.go
    • Implemented the OpenTelemetry (OTel) client for exporting device metrics.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/publish/otel/handler.go
    • Added the data handler logic for OpenTelemetry (OTel), enabling periodic metric collection and export.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/data/stream/handler.go
    • Added a placeholder for stream data processing, indicating future extensibility.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/device/device.go
    • Implemented the core DevPanel logic for managing OPC UA devices, including initialization, starting/stopping devices, updating twins, and integrating various data handling mechanisms.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/device/devicetwin.go
    • Defined the TwinData structure and associated methods for handling device twin updates and reporting to EdgeCore.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/driver/devicetype.go
    • Defined data structures for CustomizedDev, CustomizedClient, ProtocolConfig, and VisitorConfig to represent OPC UA device configurations and client interactions.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/driver/driver.go
    • Implemented the OPC UA client driver, including connection management, reading and writing device data, and handling security configurations.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/go.mod
    • Updated Go module dependencies to include necessary libraries for OPC UA communication, various database clients, MQTT, and OpenTelemetry.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/go.sum
    • Updated Go module checksums to reflect new dependencies.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/hack/make-rules/mapper.sh
    • Added a shell script for automating mapper build processes, including dependency management, linting, building, and packaging.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/resource/configmap.yaml
    • Added a Kubernetes ConfigMap definition for deploying the OPC UA mapper's configuration.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/resource/deployment.yaml
    • Added an example Kubernetes Deployment manifest for deploying the OPC UA mapper on an edge node.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/resource/device_instance_opcua.yaml
    • Added an example Kubernetes Device instance definition for an OPC UA lamp, demonstrating how to configure an OPC UA device.
  • mappers/Kubeedge-v1.22.0/opcua-mapper/resource/device_model_opcua.yaml
    • Added an example Kubernetes DeviceModel definition for an OPC UA device, specifying properties like temperature and a switcher.
Activity
  • This pull request introduces a new feature, the OPC UA mapper, with all files being newly added.
  • The initial commit includes the complete structure and implementation for the mapper, covering core logic, data handling, and deployment resources.
  • No prior comments or reviews are present, indicating this is the first iteration of this feature.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new OPC UA mapper built on the v1beta1 mapper-framework. The changes are extensive, adding a complete mapper implementation with drivers, data handlers for various databases and push methods, and associated configuration files. My review focuses on improving security, efficiency, and correctness across the new codebase. I've identified several critical SQL injection vulnerabilities, inefficient resource handling in MQTT and database clients, and potential race conditions or panics. I've provided suggestions to address these issues, such as using parameterized queries, reusing client connections, and ensuring thread safety.

Comment on lines +83 to +92
createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (id INT AUTO_INCREMENT PRIMARY KEY, ts DATETIME NOT NULL,field TEXT)", tableName)
_, err := DB.Exec(createTable)
if err != nil {
return fmt.Errorf("create tabe into mysql failed with err:%v", err)
}

stmt, err := DB.Prepare(fmt.Sprintf("INSERT INTO `%s` (ts,field) VALUES (?,?)", tableName))
if err != nil {
return fmt.Errorf("prepare parament failed with err:%v", err)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Constructing SQL queries using fmt.Sprintf with inputs that can be controlled by users (like device metadata) creates a serious SQL injection vulnerability. An attacker could manipulate device names or properties to execute arbitrary SQL commands. Always use parameterized queries for all parts of the SQL statement, including table names if they must be dynamic (though it's better to have a fixed schema). Also, creating the table on every data insertion is inefficient and should be done once during initialization.

    // Sanitize table name to prevent SQL injection. Replace invalid characters.
	safeTableName := strings.ReplaceAll(tableName, "/", "_")
	safeTableName = strings.ReplaceAll(safeTableName, "`", "")

	createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (id INT AUTO_INCREMENT PRIMARY KEY, ts DATETIME NOT NULL, field TEXT)", safeTableName)
	_, err := DB.Exec(createTable)
	if err != nil {
		return fmt.Errorf("create table into mysql failed with err:%v", err)
	}

	stmt, err := DB.Prepare(fmt.Sprintf("INSERT INTO `%s` (ts,field) VALUES (?,?)", safeTableName))
	if err != nil {
		return fmt.Errorf("prepare statement failed with err:%v", err)
	}

Comment on lines +70 to +71
insertSQL := fmt.Sprintf("INSERT INTO %s USING %s TAGS ('%s') VALUES('%v','%s', '%s', '%s', '%s');",
legalTag, legalTable, legalTag, datatime, tableName, data.PropertyName, data.Value, data.Type)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Constructing SQL queries with fmt.Sprintf using potentially user-controlled data (like device and property names) leads to a critical SQL injection vulnerability. An attacker could craft malicious names to execute arbitrary SQL commands. You should always use parameterized queries or strictly sanitize any dynamic parts of the query.

Suggested change
insertSQL := fmt.Sprintf("INSERT INTO %s USING %s TAGS ('%s') VALUES('%v','%s', '%s', '%s', '%s');",
legalTag, legalTable, legalTag, datatime, tableName, data.PropertyName, data.Value, data.Type)
// It's safer to use parameterized queries if the driver supports them for table/tag names.
// If not, ensure stringent sanitization on legalTag and legalTable.
insertSQL := fmt.Sprintf("INSERT INTO ? USING ? TAGS (?) VALUES(?,?,?,?,?)")
// Then use DB.Exec(insertSQL, legalTag, legalTable, ...)

Comment on lines +56 to +63
resp, err := http.Post(targetUrl,
"application/x-www-form-urlencoded",
strings.NewReader(payload))

if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

If http.Post returns an error, resp will be nil. The subsequent defer resp.Body.Close() will cause a panic. The defer statement should be placed after a successful error check. Additionally, the Timeout field in HTTPConfig is not being used. A default http.Client with a configured timeout should be used to prevent the application from hanging on unresponsive requests.

    client := http.Client{
        Timeout: time.Duration(pm.HTTP.Timeout) * time.Second,
    }
	resp, err := client.Post(targetUrl,
		"application/x-www-form-urlencoded",
		strings.NewReader(payload))

	if err != nil {
		klog.Errorf("HTTP Post error: %v", err)
		return
	}
	defer resp.Body.Close()

RUN CGO_ENABLED=0 GOOS=linux go build -o main cmd/main.go


FROM ubuntu:18.04

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The base image ubuntu:18.04 reached its end of standard support in April 2023. Using an outdated base image can expose the application to unpatched security vulnerabilities. It is highly recommended to upgrade to a more recent and supported LTS version, such as ubuntu:22.04, or consider a smaller, more secure base image like alpine.

FROM ubuntu:22.04

p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement,
d.Influxdb2DataConfig.Tag,
map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value},
time.Now())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The AddData function uses time.Now() to timestamp the data point. However, the common.DataModel already contains a TimeStamp field (in milliseconds), which is set when the data is collected. Using time.Now() introduces a delay and uses a different timestamp from the actual data collection event. It's better to use the timestamp from the DataModel for data consistency.

Suggested change
time.Now())
time.UnixMilli(data.TimeStamp))

Comment on lines +15 to +17
var (
RedisCli *redis.Client
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using a global variable RedisCli for the Redis client is not thread-safe and is considered bad practice. It can lead to race conditions and makes the code difficult to test. The Redis client should be a field within the DataBaseConfig struct, ensuring that each instance manages its own client.

var (
)

insertSQL := fmt.Sprintf("INSERT INTO %s USING %s TAGS ('%s') VALUES('%v','%s', '%s', '%s', '%s');",
legalTag, legalTable, legalTag, datatime, tableName, data.PropertyName, data.Value, data.Type)

rows, _ := DB.Query(stableName)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The error returned by DB.Query is being ignored. This is a bad practice as it can hide important issues, such as problems with the database connection or the query itself. The error should always be checked and handled appropriately.

    rows, err := DB.Query(stableName)
    if err != nil {
        klog.Errorf("failed to query stable: %v", err)
        return err
    }

Comment on lines +49 to +52
payload := data.PropertyName + "=" + data.Value
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
currentTime := "&time" + "=" + formatTimeStr
payload += currentTime

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The payload for an application/x-www-form-urlencoded request is constructed by simple string concatenation. This is unsafe, as special characters in data.PropertyName or data.Value (e.g., &, =) will break the payload format. Use the net/url package to properly encode the values.

    form := url.Values{}
    form.Add(data.PropertyName, data.Value)
    form.Add("time", time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05"))
    payload := form.Encode()

Comment on lines +7 to +8
protocol: opcua # TODO add your protocol name
address: 127.0.0.1:1234 # TODO add your protocol address

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The configuration file contains TODO comments, which suggests that the configuration might be incomplete or using placeholder values. These should be removed and replaced with appropriate default values before merging to avoid confusion and ensure the mapper is configured correctly out-of-the-box.

  protocol:  opcua
  address: opc.tcp://127.0.0.1:4840 # Default OPC UA server address

@@ -0,0 +1,157 @@
package tdengine

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This file starts with a UTF-8 Byte Order Mark (BOM), which is unnecessary in Go source files and can sometimes cause issues with tools. It should be removed.

Suggested change
package tdengine
package tdengine

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/feature Categorizes issue or PR as related to a new feature. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement more built-in mappers for typical protocols

2 participants