Skip to content

Commit 45c3348

Browse files
committed
Allow provided location DB
Fixes #947
1 parent 85b4629 commit 45c3348

File tree

7 files changed

+88
-55
lines changed

7 files changed

+88
-55
lines changed

README.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -446,11 +446,7 @@ service name of `dstPort` port and `protocol` protocol. Unrecognized ports are i
446446
> Note: optionally supports custom network services resolution by defining configuration parameters
447447
> `servicesFile` and `protocolsFile` with paths to custom services/protocols files respectively
448448

449-
The rule `add_location` generates new fields with the geo-location information retrieved
450-
from DB [ip2location](https://lite.ip2location.com/) based on `dstIP` IP.
451-
All the geo-location fields will be named by appending `output` value
452-
(`dstLocation` in the example above) to their names in the [ip2location](https://lite.ip2location.com/) DB
453-
(e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`)
449+
The rule `add_location` generates new fields with the geo-location information. It uses the [IP2Location LITE database](https://lite.ip2location.com/) for that purpose. All the geo-location fields will be named by prefixing the `output` value to their names in the IP2Location DB (e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName`, `Longitude` and `Latitude`).
454450

455451
The rule `add_kubernetes` generates new fields with kubernetes information by
456452
matching the `ipField` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
@@ -703,8 +699,6 @@ we can assume that it is the second step of the TCP handshake,
703699
the direction is from the server (source) to the client (destination) and we can swap them in the connection so the client will be the source and the server will be the destination.
704700
705701
706-
707-
708702
### Timebased TopK
709703
710704
It is sometimes desirable to return only a subset of records, such as those connections that use the most bandwidth.

docs/api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ Following is the supported API format for network transformations:
271271
add_location: Add location rule configuration
272272
input: entry input field
273273
output: entry output field
274+
file_path: path of the location DB file (zip archive), from ip2location.com (Lite DB9); leave unset to try downloading the file at startup
274275
add_subnet_label: Add subnet label rule configuration
275276
input: entry input field
276277
output: entry output field

pkg/api/transform_network.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type NetworkTransformRule struct {
6767
KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule configuration"`
6868
Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule configuration"`
6969
AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule configuration"`
70-
AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"`
70+
AddLocation *NetworkAddLocationRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"`
7171
AddSubnetLabel *NetworkAddSubnetLabelRule `yaml:"add_subnet_label,omitempty" json:"add_subnet_label,omitempty" doc:"Add subnet label rule configuration"`
7272
AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule configuration"`
7373
DecodeTCPFlags *NetworkGenericRule `yaml:"decode_tcp_flags,omitempty" json:"decode_tcp_flags,omitempty" doc:"Decode bitwise TCP flags into a string"`
@@ -112,6 +112,12 @@ type NetworkAddSubnetRule struct {
112112
SubnetMask string `yaml:"subnet_mask,omitempty" json:"subnet_mask,omitempty" doc:"subnet mask field"`
113113
}
114114

115+
type NetworkAddLocationRule struct {
116+
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
117+
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
118+
FilePath string `yaml:"file_path,omitempty" json:"file_path,omitempty" doc:"path of the location DB file (zip archive), from ip2location.com (Lite DB9); leave unset to try downloading the file at startup"`
119+
}
120+
115121
type NetworkAddSubnetLabelRule struct {
116122
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
117123
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`

pkg/pipeline/transform/location/location.go

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -74,45 +74,52 @@ func init() {
7474
locationDBMutex = &sync.Mutex{}
7575
}
7676

77-
func InitLocationDB() error {
77+
func InitLocationDB(filepath string) error {
7878
locationDBMutex.Lock()
7979
defer locationDBMutex.Unlock()
8080

8181
if _, statErr := _osio.Stat(dbFileLocation); errors.Is(statErr, os.ErrNotExist) {
82-
log.Infof("Downloading location DB into local file %s ", dbFileLocation)
83-
out, createErr := _osio.Create(dbZIPFileLocation)
84-
if createErr != nil {
85-
return fmt.Errorf("failed os.Create %w ", createErr)
86-
}
82+
if filepath == "" {
83+
log.Infof("Downloading location DB into local file %s ", dbFileLocation)
84+
out, createErr := _osio.Create(dbZIPFileLocation)
85+
if createErr != nil {
86+
return fmt.Errorf("failed os.Create %w ", createErr)
87+
}
8788

88-
timeout := time.Minute
89-
tr := &http.Transport{IdleConnTimeout: timeout}
90-
client := &http.Client{Transport: tr, Timeout: timeout}
91-
resp, getErr := client.Get(_dbURL)
92-
if getErr != nil {
93-
return fmt.Errorf("failed http.Get %w ", getErr)
94-
}
89+
timeout := time.Minute
90+
tr := &http.Transport{IdleConnTimeout: timeout}
91+
client := &http.Client{Transport: tr, Timeout: timeout}
92+
resp, getErr := client.Get(_dbURL)
93+
if getErr != nil {
94+
return fmt.Errorf("failed http.Get %w ", getErr)
95+
}
9596

96-
log.Infof("Got response %s", resp.Status)
97+
log.Infof("Got response %s", resp.Status)
9798

98-
written, copyErr := io.Copy(out, resp.Body)
99-
if copyErr != nil {
100-
return fmt.Errorf("failed io.Copy %w ", copyErr)
101-
}
99+
written, copyErr := io.Copy(out, resp.Body)
100+
if copyErr != nil {
101+
return fmt.Errorf("failed io.Copy %w ", copyErr)
102+
}
102103

103-
log.Infof("Wrote %d bytes to %s", written, dbZIPFileLocation)
104+
log.Infof("Wrote %d bytes to %s", written, dbZIPFileLocation)
104105

105-
bodyCloseErr := resp.Body.Close()
106-
if bodyCloseErr != nil {
107-
return fmt.Errorf("failed resp.Body.Close %w ", bodyCloseErr)
108-
}
106+
bodyCloseErr := resp.Body.Close()
107+
if bodyCloseErr != nil {
108+
return fmt.Errorf("failed resp.Body.Close %w ", bodyCloseErr)
109+
}
110+
111+
outCloseErr := out.Close()
112+
if outCloseErr != nil {
113+
return fmt.Errorf("failed out.Close %w ", outCloseErr)
114+
}
109115

110-
outCloseErr := out.Close()
111-
if outCloseErr != nil {
112-
return fmt.Errorf("failed out.Close %w ", outCloseErr)
116+
filepath = dbZIPFileLocation
117+
log.Infof("Download completed successfully")
118+
} else {
119+
log.Infof("Using provided location DB: %s", filepath)
113120
}
114121

115-
unzipErr := unzip(dbZIPFileLocation, dbFileLocation)
122+
unzipErr := unzip(filepath, dbFileLocation)
116123
if unzipErr != nil {
117124
file, openErr := os.Open(dbFileLocation + "/" + dbFilename)
118125
if openErr == nil {
@@ -136,8 +143,8 @@ func InitLocationDB() error {
136143

137144
return fmt.Errorf("failed unzip %w ", unzipErr)
138145
}
139-
140-
log.Infof("Download completed successfully")
146+
} else {
147+
log.Infof("Location DB already exists in %s, using it", dbFileLocation)
141148
}
142149

143150
log.Debugf("Loading location DB")
@@ -150,6 +157,10 @@ func InitLocationDB() error {
150157
return nil
151158
}
152159

160+
func CleanupLocationDB() error {
161+
return os.RemoveAll(dbFileLocation)
162+
}
163+
153164
func GetLocation(ip string) (*Info, error) {
154165

155166
if locationDB == nil {

pkg/pipeline/transform/location/location_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func Test_InitLocationDB(t *testing.T) {
3535
// fail in os.Create
3636
_osio.Stat = func(_ string) (os.FileInfo, error) { return nil, os.ErrNotExist }
3737
_osio.Create = func(_ string) (*os.File, error) { return nil, fmt.Errorf("test") }
38-
err := InitLocationDB()
38+
err := InitLocationDB("")
3939
require.Contains(t, err.Error(), "os.Create")
4040
_osio.Stat = os.Stat
4141
_osio.Create = os.Create
@@ -44,7 +44,7 @@ func Test_InitLocationDB(t *testing.T) {
4444
_osio.Stat = func(_ string) (os.FileInfo, error) { return nil, os.ErrNotExist }
4545
_osio.Create = func(_ string) (*os.File, error) { return nil, nil }
4646
_dbURL = "test_fake"
47-
err = InitLocationDB()
47+
err = InitLocationDB("")
4848
require.Contains(t, err.Error(), "http.Get")
4949
_dbURL = dbURL
5050
_osio.Stat = os.Stat
@@ -57,7 +57,7 @@ func Test_InitLocationDB(t *testing.T) {
5757
_, _ = res.Write([]byte("test"))
5858
}))
5959
_dbURL = testServer.URL
60-
err = InitLocationDB()
60+
err = InitLocationDB("")
6161
require.Contains(t, err.Error(), "io.Copy")
6262
testServer.Close()
6363
_dbURL = dbURL
@@ -71,7 +71,7 @@ func Test_InitLocationDB(t *testing.T) {
7171
res.WriteHeader(http.StatusOK)
7272
}))
7373
_dbURL = testServer.URL
74-
err = InitLocationDB()
74+
err = InitLocationDB("")
7575
require.Contains(t, err.Error(), "io.Copy")
7676
testServer.Close()
7777
_dbURL = dbURL
@@ -85,7 +85,7 @@ func Test_InitLocationDB(t *testing.T) {
8585
res.WriteHeader(http.StatusOK)
8686
}))
8787
_dbURL = testServer.URL
88-
err = InitLocationDB()
88+
err = InitLocationDB("")
8989
require.Contains(t, err.Error(), "failed unzip")
9090
testServer.Close()
9191
_dbURL = dbURL
@@ -102,7 +102,7 @@ func Test_InitLocationDB(t *testing.T) {
102102
res.WriteHeader(http.StatusOK)
103103
}))
104104
_dbURL = testServer.URL
105-
err = InitLocationDB()
105+
err = InitLocationDB("")
106106
require.Error(t, err)
107107
testServer.Close()
108108
_dbURL = dbURL
@@ -111,7 +111,7 @@ func Test_InitLocationDB(t *testing.T) {
111111
// NOTE:: Downloading the DB is a long operation, about 30 seconds, and this delays the tests
112112
// TODO: Consider remove this test
113113
os.RemoveAll(dbFileLocation)
114-
initLocationDBErr := InitLocationDB()
114+
initLocationDBErr := InitLocationDB("")
115115
require.Nil(t, initLocationDBErr)
116116
}
117117

pkg/pipeline/transform/transform_network.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,6 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
7070
outputEntry[rule.AddSubnet.Output] = ipv4Net.String()
7171
}
7272
case api.NetworkAddLocation:
73-
if rule.AddLocation == nil {
74-
log.Errorf("Missing add location configuration")
75-
continue
76-
}
7773
var locationInfo *location.Info
7874
locationInfo, err := location.GetLocation(util.ConvertToString(outputEntry[rule.AddLocation.Input]))
7975
if err != nil {
@@ -175,7 +171,7 @@ func (n *Network) applySubnetLabel(strIP string) string {
175171
//
176172
//nolint:cyclop
177173
func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metrics) (Transformer, error) {
178-
var needToInitLocationDB = false
174+
var locationDBConfig *api.NetworkAddLocationRule
179175
var needToInitKubeData = false
180176
var needToInitNetworkServices = false
181177

@@ -186,7 +182,10 @@ func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metric
186182
for _, rule := range jsonNetworkTransform.Rules {
187183
switch rule.Type {
188184
case api.NetworkAddLocation:
189-
needToInitLocationDB = true
185+
if rule.AddLocation == nil {
186+
return nil, fmt.Errorf("missing configuration for '%s' rule", api.NetworkAddLocation)
187+
}
188+
locationDBConfig = rule.AddLocation
190189
case api.NetworkAddKubernetes:
191190
needToInitKubeData = true
192191
case api.NetworkAddKubernetesInfra:
@@ -206,8 +205,8 @@ func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metric
206205
}
207206
}
208207

209-
if needToInitLocationDB {
210-
err := location.InitLocationDB()
208+
if locationDBConfig != nil {
209+
err := location.InitLocationDB(locationDBConfig.FilePath)
211210
if err != nil {
212211
log.Debugf("location.InitLocationDB error: %v", err)
213212
}

pkg/pipeline/transform/transform_network_test.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func getMockNetworkTransformRules() api.NetworkTransformRules {
8282
},
8383
api.NetworkTransformRule{
8484
Type: "add_location",
85-
AddLocation: &api.NetworkGenericRule{
85+
AddLocation: &api.NetworkAddLocationRule{
8686
Input: "8888IP",
8787
Output: "8888IP_location",
8888
},
@@ -142,7 +142,29 @@ func Test_Transform(t *testing.T) {
142142
svcNames: getServicesDB(t),
143143
}
144144

145-
err := location.InitLocationDB()
145+
err := location.InitLocationDB("")
146+
require.NoError(t, err)
147+
148+
output, ok := networkTransform.Transform(entry)
149+
require.True(t, ok)
150+
require.Equal(t, expectedOutput, output)
151+
}
152+
153+
func Test_TransformProvidedLocationDB(t *testing.T) {
154+
entry := test.GetIngestMockEntry(false)
155+
rules := getMockNetworkTransformRules()
156+
expectedOutput := getExpectedOutput()
157+
err := location.CleanupLocationDB()
158+
require.NoError(t, err)
159+
160+
var networkTransform = Network{
161+
TransformNetwork: api.TransformNetwork{
162+
Rules: rules,
163+
},
164+
svcNames: getServicesDB(t),
165+
}
166+
167+
err = location.InitLocationDB("../../../contrib/location/location.db")
146168
require.NoError(t, err)
147169

148170
output, ok := networkTransform.Transform(entry)
@@ -166,7 +188,7 @@ func Test_TransformAddSubnetParseCIDRFailure(t *testing.T) {
166188
svcNames: getServicesDB(t),
167189
}
168190

169-
err := location.InitLocationDB()
191+
err := location.InitLocationDB("")
170192
require.NoError(t, err)
171193

172194
output, ok := networkTransform.Transform(entry)

0 commit comments

Comments
 (0)