Add Modbus mapper for KubeEdge 1.21#136
Conversation
Signed-off-by: Shelley-BaoYue <baoyue2@huawei.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
Summary of ChangesHello @Shelley-BaoYue, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances KubeEdge's capabilities by introducing a new Modbus mapper tailored for version 1.21. The mapper provides a robust solution for integrating Modbus-enabled industrial devices, offering extensive options for data handling. It allows users to persist device data in various popular databases and publish it through multiple communication protocols, thereby expanding KubeEdge's reach into industrial IoT scenarios. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request adds a comprehensive Modbus mapper for KubeEdge, including support for various data persistence and publishing methods like InfluxDB, MySQL, Redis, TDengine, HTTP, MQTT, and OpenTelemetry. While the scope of the changes is impressive, the implementation has several critical issues that need to be addressed. These include security vulnerabilities (SQL injection), concurrency problems (thread-unsafe client maps), and bugs that will lead to application panics. There are also significant inefficiencies, such as creating new clients for every request and redundant data fetching. The Dockerfiles and Makefile contain errors and inconsistencies that will prevent successful builds and deployments. Much of the code appears to be copy-pasted from templates without proper adaptation, leading to numerous errors and inconsistencies. I've provided detailed comments and suggestions to fix these issues.
| if token := client.Connect(); token.Wait() && token.Error() != nil { | ||
| fmt.Println(token.Error()) | ||
| os.Exit(1) | ||
| } |
There was a problem hiding this comment.
Calling os.Exit(1) on a connection error is a critical issue. A library or handler function should never terminate the entire application. It should return an error to the caller to be handled gracefully.
| if token := client.Connect(); token.Wait() && token.Error() != nil { | |
| fmt.Println(token.Error()) | |
| os.Exit(1) | |
| } | |
| if token := client.Connect(); token.Wait() && token.Error() != nil { | |
| klog.Errorf("Failed to connect to MQTT broker: %v", token.Error()) | |
| return | |
| } |
| } | ||
|
|
||
| func (cfg *Config) InitProvider(reportCycle time.Duration, dataModel *common.DataModel) (*metric.MeterProvider, error) { | ||
| exp, err := otlpmetrichttp.New(context.Background(), WithEndpointURL(cfg.EndpointURL)...) |
There was a problem hiding this comment.
The WithEndpointURL function can return nil if url.Parse fails. The caller here does not check for nil and uses the result with the spread operator (...), which will cause a panic. You should handle the potential error from WithEndpointURL.
| exp, err := otlpmetrichttp.New(context.Background(), WithEndpointURL(cfg.EndpointURL)...) | |
| opts := WithEndpointURL(cfg.EndpointURL) | |
| if opts == nil { | |
| return nil, fmt.Errorf("invalid endpoint URL: %s", cfg.EndpointURL) | |
| } | |
| exp, err := otlpmetrichttp.New(context.Background(), opts...) |
| var value uint16 | ||
| switch visitor.Register { | ||
| case CoilRegister, InputRegister, DiscreteInputRegister: | ||
| //The returned Coil register data occupies 1 byte | ||
| value = uint16(data[0]) | ||
|
|
||
| case HoldingRegister: | ||
| // The returned Holding register data occupies 2 bytes | ||
| value = binary.BigEndian.Uint16(data) | ||
| } |
There was a problem hiding this comment.
The logic for reading register data only handles a single 16-bit value. It doesn't account for data types that may span multiple registers, such as 32-bit integers, floats, or strings. The Limit from the visitor config should be used to read the correct number of registers, and the resulting byte slice should be processed according to the data type and byte/word order settings (e.g., IsSwap, IsRegisterSwap).
| func (d *DataBaseConfig) GetDataByDeviceID(deviceID string) ([]*common.DataModel, error) { | ||
| ctx := context.Background() | ||
|
|
||
| dataJSON, err := RedisCli.ZRevRange(ctx, deviceID, 0, -1).Result() | ||
| if err != nil { | ||
| klog.V(4).Infof("fail query data for deviceName,err:%v", err) | ||
| } | ||
|
|
||
| var dataModels []*common.DataModel | ||
|
|
||
| for _, jsonStr := range dataJSON { | ||
| var data common.DataModel | ||
| if err := json.Unmarshal([]byte(jsonStr), &data); err != nil { | ||
| klog.V(4).Infof("Error unMarshaling data: %v\n", err) | ||
| continue | ||
| } | ||
|
|
||
| dataModels = append(dataModels, &data) | ||
| } | ||
| return dataModels, nil | ||
| } |
There was a problem hiding this comment.
| default: | ||
| klog.Errorf("Invalid CommunicateMode: %s", c.ConfigData.CommunicateMode) | ||
| } |
There was a problem hiding this comment.
If an invalid CommunicateMode is provided, an error is logged, but the function continues. This will lead to config being nil, and the call to NewModBusClient will eventually cause a panic. The function should return an error immediately.
| default: | |
| klog.Errorf("Invalid CommunicateMode: %s", c.ConfigData.CommunicateMode) | |
| } | |
| default: | |
| return fmt.Errorf("invalid CommunicateMode: %s", c.ConfigData.CommunicateMode) |
| return fmt.Errorf("get device data fail: %v", err) | ||
| } | ||
|
|
||
| o.ObserveFloat64(gauge, data.(float64)) |
There was a problem hiding this comment.
This is an unsafe type assertion. GetDeviceData returns an interface{}, and asserting it directly to float64 will cause a panic if the underlying type is different. You should use the two-value form of type assertion to check the type safely.
value, ok := data.(float64)
if !ok {
return fmt.Errorf("expected float64 from device, but got %T", data)
}
o.ObserveFloat64(gauge, value)| @@ -0,0 +1,157 @@ | |||
| package tdengine | |||
| klog.Errorf("init redis database client err: %v", err) | ||
| return | ||
| } | ||
| reportCycle := time.Duration(twin.Property.ReportCycle) |
There was a problem hiding this comment.
The ReportCycle from the twin property is in milliseconds. It should be multiplied by time.Millisecond to get the correct time.Duration. The current implementation treats it as nanoseconds, which will result in a much shorter (and likely incorrect) reporting interval.
| reportCycle := time.Duration(twin.Property.ReportCycle) | |
| reportCycle := time.Millisecond * time.Duration(twin.Property.ReportCycle) |
| p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement, | ||
| d.Influxdb2DataConfig.Tag, | ||
| map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value}, | ||
| time.Now()) |
There was a problem hiding this comment.
The timestamp for the data point is being set to time.Now(), which is the time on the mapper. It should use the timestamp from the data model (data.TimeStamp) to accurately reflect when the data was generated or collected. data.TimeStamp is in milliseconds.
| time.Now()) | |
| time.UnixMilli(data.TimeStamp)) |
|
|
||
| func (d *DataBaseConfig) InitDbClient() influxdb2.Client { | ||
| var usrtoken string | ||
| usrtoken = os.Getenv("TOKEN") |
There was a problem hiding this comment.
Add Modbus Mapper based on KubeEdge v1.21 device api