@@ -23,126 +23,98 @@ import (
23
23
"sync"
24
24
)
25
25
26
- // DataSource is an interface required from all data layer data collection
27
- // sources.
26
+ // DataSource provides raw data to registered Extractors.
28
27
type DataSource interface {
29
- // Name returns the name of this datasource.
30
28
Name () string
31
-
32
- // AddExtractor adds an extractor to the data source .
33
- // The extractor will be called whenever the Collector might
29
+ // AddExtractor adds an extractor to the data source. Multiple
30
+ // Extractors can be registered .
31
+ // The extractor will be called whenever the DataSource might
34
32
// have some new raw information regarding an endpoint.
35
33
// The Extractor's expected input type should be validated against
36
34
// the data source's output type upon registration.
37
35
AddExtractor (extractor Extractor ) error
38
-
39
36
// Collect is triggered by the data layer framework to fetch potentially new
40
- // data for an endpoint. It passes retrieved data to registered Extractors.
37
+ // data for an endpoint. Collect calls registered Extractors to convert the
38
+ // raw data into structured attributes.
41
39
Collect (ep Endpoint )
42
40
}
43
41
44
- // Extractor is used to convert raw data into relevant data layer information
45
- // for an endpoint. They are called by data sources whenever new data might be
46
- // available. Multiple Extractors can be registered with a source. Extractors
47
- // are expected to save their output with an endpoint so it becomes accessible
48
- // to consumers in other subsystem of the inference gateway (e.g., when making
49
- // scheduling decisions).
42
+ // Extractor transforms raw data into structured attributes.
50
43
type Extractor interface {
51
- // Name returns the name of the extractor.
52
44
Name () string
53
-
54
- // ExpectedType defines the type expected by the extractor. It must match
55
- // the output type of the data source where the extractor is registered.
45
+ // ExpectedType defines the type expected by the extractor.
56
46
ExpectedInputType () reflect.Type
57
-
58
- // Extract transforms the data source output into a concrete attribute that
59
- // is stored on the given endpoint.
47
+ // Extract transforms the raw data source output into a concrete structured
48
+ // attribute, stored on the given endpoint.
60
49
Extract (data any , ep Endpoint )
61
50
}
62
51
63
- var (
64
- // defaultDataSources is the system default data source registry.
65
- defaultDataSources = DataSourceRegistry {}
66
- )
52
+ var defaultDataSources = DataSourceRegistry {}
67
53
68
- // DataSourceRegistry stores named data sources and makes them
69
- // accessible to other subsystems in the inference gateway.
54
+ // DataSourceRegistry stores named data sources.
70
55
type DataSourceRegistry struct {
71
56
sources sync.Map
72
57
}
73
58
74
- // Register adds a source to the registry.
59
+ // Register adds a new DataSource to the registry.
75
60
func (dsr * DataSourceRegistry ) Register (src DataSource ) error {
76
61
if src == nil {
77
62
return errors .New ("unable to register a nil data source" )
78
63
}
79
-
80
- if _ , found := dsr .sources .Load (src .Name ()); found {
64
+ if _ , loaded := dsr .sources .LoadOrStore (src .Name (), src ); loaded {
81
65
return fmt .Errorf ("unable to register duplicate data source: %s" , src .Name ())
82
66
}
83
- dsr .sources .Store (src .Name (), src )
84
67
return nil
85
68
}
86
69
87
- // GetNamedSource returns the named data source, if found .
70
+ // GetNamedSource fetches a source by name .
88
71
func (dsr * DataSourceRegistry ) GetNamedSource (name string ) (DataSource , bool ) {
89
- if name == "" {
90
- return nil , false
91
- }
92
-
93
- if val , found := dsr .sources .Load (name ); found {
72
+ if val , ok := dsr .sources .Load (name ); ok {
94
73
if ds , ok := val .(DataSource ); ok {
95
74
return ds , true
96
- } // ignore type assertion failures and fall through
75
+ }
97
76
}
98
77
return nil , false
99
78
}
100
79
101
- // GetSources returns all sources registered.
80
+ // GetSources returns all registered sources .
102
81
func (dsr * DataSourceRegistry ) GetSources () []DataSource {
103
- sources := []DataSource {}
82
+ var result []DataSource
104
83
dsr .sources .Range (func (_ , val any ) bool {
105
84
if ds , ok := val .(DataSource ); ok {
106
- sources = append (sources , ds )
85
+ result = append (result , ds )
107
86
}
108
- return true // continue iteration
87
+ return true
109
88
})
110
- return sources
89
+ return result
111
90
}
112
91
113
- // RegisterSource adds the data source to the default registry.
92
+ // --- default registry accessors ---
93
+
114
94
func RegisterSource (src DataSource ) error {
115
95
return defaultDataSources .Register (src )
116
96
}
117
97
118
- // GetNamedSource returns the named source from the default registry,
119
- // if found.
120
98
func GetNamedSource (name string ) (DataSource , bool ) {
121
99
return defaultDataSources .GetNamedSource (name )
122
100
}
123
101
124
- // GetSources returns all sources in the default registry.
125
102
func GetSources () []DataSource {
126
103
return defaultDataSources .GetSources ()
127
104
}
128
105
129
106
// ValidateExtractorType checks if an extractor can handle
130
- // the collector's output.
131
- func ValidateExtractorType (collectorOutputType , extractorInputType reflect.Type ) error {
132
- if collectorOutputType == extractorInputType {
133
- return nil
134
- }
135
-
136
- // extractor accepts anything (i.e., interface{})
137
- if extractorInputType .Kind () == reflect .Interface && extractorInputType .NumMethod () == 0 {
138
- return nil
107
+ // the DataSource's output. It should be called by a DataSource
108
+ // when an extractor is added.
109
+ func ValidateExtractorType (collectorOutput , extractorInput reflect.Type ) error {
110
+ if collectorOutput == nil || extractorInput == nil {
111
+ return errors .New ("extractor input type or data source output type can't be nil" )
139
112
}
140
-
141
- // check if collector output implements extractor input interface
142
- if collectorOutputType . Implements (extractorInputType ) {
113
+ if collectorOutput == extractorInput ||
114
+ ( extractorInput . Kind () == reflect . Interface && extractorInput . NumMethod () == 0 ) ||
115
+ ( extractorInput . Kind () == reflect . Interface && collectorOutput . Implements (extractorInput ) ) {
143
116
return nil
144
117
}
145
-
146
- return fmt .Errorf ("extractor input type %v cannot handle collector output type %v" ,
147
- extractorInputType , collectorOutputType )
118
+ return fmt .Errorf ("extractor input type %v cannot handle data source output type %v" ,
119
+ extractorInput , collectorOutput )
148
120
}
0 commit comments