Skip to content

Commit b4269e0

Browse files
committed
fix(acarshub): improve connectivity
1 parent 4d91dc4 commit b4269e0

File tree

2 files changed

+98
-81
lines changed

2 files changed

+98
-81
lines changed

acarshub.go

Lines changed: 95 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net"
88
"strings"
9+
"time"
910

1011
log "github.com/sirupsen/logrus"
1112
)
@@ -15,18 +16,21 @@ func ReadACARSHubACARSMessages() {
1516
return
1617
}
1718
address := fmt.Sprintf("%s:%d", config.ACARSHubHost, config.ACARSHubPort)
18-
log.Debugf("connecting to %s acars json port", address)
19-
s, err := net.Dial("tcp", address)
20-
if err != nil {
21-
log.Fatalf("error connecting to acars json: %v", err)
22-
}
23-
defer s.Close()
24-
log.Info("connected to acarshub acars json port successfully")
25-
r := io.Reader(s)
26-
log.Debug("handling acars json messages")
27-
// Input errors will return from the function, so restart it as necessary
2819
for {
20+
log.Debugf("connecting to %s acars json port", address)
21+
s, err := net.Dial("tcp", address)
22+
if err != nil {
23+
log.Errorf("error connecting to acars json: %v", err)
24+
time.Sleep(time.Second * 1)
25+
continue
26+
}
27+
log.Info("connected to acarshub acars json port successfully")
28+
r := io.Reader(s)
29+
log.Debug("handling acars json messages")
2930
HandleACARSJSONMessages(&r)
31+
log.Warn("acars handler exited, reconnecting")
32+
s.Close()
33+
time.Sleep(time.Second * 1)
3034
}
3135
}
3236

@@ -35,18 +39,21 @@ func ReadACARSHubVDLM2Messages() {
3539
return
3640
}
3741
address := fmt.Sprintf("%s:%d", config.ACARSHubVDLM2Host, config.ACARSHubVDLM2Port)
38-
log.Debugf("connecting to %s vdlm2 json port", address)
39-
s, err := net.Dial("tcp", address)
40-
if err != nil {
41-
log.Fatalf("error connecting to vdlm2 json: %v", err)
42-
}
43-
defer s.Close()
44-
log.Info("connected to acarshub vdlm2 json port successfully")
45-
r := io.Reader(s)
46-
log.Debug("handling vdlm2 json messages")
47-
// Input errors will return from the function, so restart it as necessary
4842
for {
43+
log.Debugf("connecting to %s vdlm2 json port", address)
44+
s, err := net.Dial("tcp", address)
45+
if err != nil {
46+
log.Errorf("error connecting to vdlm2 json: %v", err)
47+
time.Sleep(time.Second * 1)
48+
continue
49+
}
50+
log.Info("connected to acarshub vdlm2 json port successfully")
51+
r := io.Reader(s)
52+
log.Debug("handling vdlm2 json messages")
4953
HandleVDLM2JSONMessages(&r)
54+
log.Warn("vdlm2 handler exited, reconnecting")
55+
s.Close()
56+
time.Sleep(time.Second * 1)
5057
}
5158
}
5259

@@ -56,77 +63,86 @@ func SubscribeToACARSHub() {
5663
go ReadACARSHubVDLM2Messages()
5764
}
5865

59-
// Reads messages from the ACARSHub connection and annotates, then sends
66+
// Reads messages from the ACARSHub connection and annotates, then sends to
67+
// receivers. Called again if returned from.
6068
func HandleACARSJSONMessages(r *io.Reader) {
61-
readJson := json.NewDecoder(*r)
62-
annotations := map[string]any{}
63-
var next ACARSMessage
64-
if err := readJson.Decode(&next); err != nil {
65-
log.Fatalf("error decoding acars message: %s", err)
66-
}
67-
log.Info("new acars message received")
68-
if (next == ACARSMessage{}) {
69-
log.Errorf("json message did not match expected structure, we got: %+v", next)
70-
return
71-
} else {
72-
log.Debugf("new acars message content: %+v", next)
73-
ok, filters := ACARSCriteriaFilter{}.Filter(next)
74-
if !ok {
75-
log.Infof("message was filtered out by %s", strings.Join(filters, ","))
69+
for {
70+
readJson := json.NewDecoder(*r)
71+
annotations := map[string]any{}
72+
var next ACARSMessage
73+
if err := readJson.Decode(&next); err != nil {
74+
// Might have connection issues, exit to reconnect
75+
log.Errorf("error decoding acars message: %s", err)
7676
return
7777
}
78-
// Annotate the message via all enabled annotators
79-
for _, h := range enabledACARSAnnotators {
80-
log.Debugf("sending event to annotator %s: %+v", h.Name(), next)
81-
result := h.AnnotateACARSMessage(next)
82-
if result != nil {
83-
result = h.SelectFields(result)
84-
annotations = MergeMaps(result, annotations)
78+
log.Info("new acars message received")
79+
if (next == ACARSMessage{}) {
80+
log.Errorf("json message did not match expected structure, we got: %+v", next)
81+
continue
82+
} else {
83+
log.Debugf("new acars message content: %+v", next)
84+
ok, filters := ACARSCriteriaFilter{}.Filter(next)
85+
if !ok {
86+
log.Infof("message was filtered out by %s", strings.Join(filters, ","))
87+
continue
88+
}
89+
// Annotate the message via all enabled annotators
90+
for _, h := range enabledACARSAnnotators {
91+
log.Debugf("sending event to annotator %s: %+v", h.Name(), next)
92+
result := h.AnnotateACARSMessage(next)
93+
if result != nil {
94+
result = h.SelectFields(result)
95+
annotations = MergeMaps(result, annotations)
96+
}
8597
}
8698
}
87-
}
88-
for _, r := range enabledReceivers {
89-
log.Debugf("sending acars event to reciever %s: %+v", r.Name(), annotations)
90-
err := r.SubmitACARSAnnotations(annotations)
91-
if err != nil {
92-
log.Errorf("error submitting to %s, err: %v", r.Name(), err)
99+
for _, r := range enabledReceivers {
100+
log.Debugf("sending acars event to reciever %s: %+v", r.Name(), annotations)
101+
err := r.SubmitACARSAnnotations(annotations)
102+
if err != nil {
103+
log.Errorf("error submitting to %s, err: %v", r.Name(), err)
104+
}
93105
}
94106
}
95107
}
96108

97109
// Reads messages from the ACARSHub connection and annotates, then sends
98110
func HandleVDLM2JSONMessages(r *io.Reader) {
99-
readJson := json.NewDecoder(*r)
100-
annotations := map[string]any{}
101-
var next VDLM2Message
102-
// Decode consumes the buffer, so we use a second decoder
103-
if err := readJson.Decode(&next); err != nil {
104-
log.Fatalf("error decoding vdlm2 message: %s", err)
105-
}
106-
log.Info("new vdlm2 message received")
107-
if (next == VDLM2Message{}) {
108-
log.Errorf("json message did not match expected structure, we got: %+v", next)
109-
return
110-
}
111-
log.Debugf("new vdlm2 message content: %+v", next)
112-
ok, filters := VDLM2CriteriaFilter{}.Filter(next)
113-
if !ok {
114-
log.Infof("message was filtered out by %s", strings.Join(filters, ","))
115-
return
116-
} // Annotate the message via all enabled VDLM2 annotators
117-
for _, h := range enabledVDLM2Annotators {
118-
log.Debugf("sending event to annotator %s: %+v", h.Name(), next)
119-
result := h.AnnotateVDLM2Message(next)
120-
if result != nil {
121-
result = h.SelectFields(result)
122-
annotations = MergeMaps(result, annotations)
111+
for {
112+
readJson := json.NewDecoder(*r)
113+
annotations := map[string]any{}
114+
var next VDLM2Message
115+
// Decode consumes the buffer, so we use a second decoder
116+
if err := readJson.Decode(&next); err != nil {
117+
// Might have connection issues, exit to reconnect
118+
log.Errorf("error decoding vdlm2 message: %s", err)
119+
return
123120
}
124-
}
125-
for _, r := range enabledReceivers {
126-
log.Debugf("sending vdlm2 event to reciever %s: %+v", r.Name(), annotations)
127-
err := r.SubmitACARSAnnotations(annotations)
128-
if err != nil {
129-
log.Errorf("error submitting to %s, err: %v", r.Name(), err)
121+
log.Info("new vdlm2 message received")
122+
if (next == VDLM2Message{}) {
123+
log.Errorf("json message did not match expected structure, we got: %+v", next)
124+
continue
125+
}
126+
log.Debugf("new vdlm2 message content: %+v", next)
127+
ok, filters := VDLM2CriteriaFilter{}.Filter(next)
128+
if !ok {
129+
log.Infof("message was filtered out by %s", strings.Join(filters, ","))
130+
continue
131+
} // Annotate the message via all enabled VDLM2 annotators
132+
for _, h := range enabledVDLM2Annotators {
133+
log.Debugf("sending event to annotator %s: %+v", h.Name(), next)
134+
result := h.AnnotateVDLM2Message(next)
135+
if result != nil {
136+
result = h.SelectFields(result)
137+
annotations = MergeMaps(result, annotations)
138+
}
139+
}
140+
for _, r := range enabledReceivers {
141+
log.Debugf("sending vdlm2 event to reciever %s: %+v", r.Name(), annotations)
142+
err := r.SubmitACARSAnnotations(annotations)
143+
if err != nil {
144+
log.Errorf("error submitting to %s, err: %v", r.Name(), err)
145+
}
130146
}
131147
}
132148
}

filter_ollama.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,13 @@ func OllamaFilter(m string) bool {
5252
}
5353
url, err := url.Parse(config.OllamaURL)
5454
if err != nil {
55-
log.Fatalf("Ollama url could not be parsed: %s", err)
55+
log.Errorf("Ollama url could not be parsed: %s", err)
5656
return true
5757
}
5858
client := api.NewClient(url, &http.Client{})
5959
if err != nil {
60-
log.Fatalf("error initializing Ollama: %s", err)
60+
log.Errorf("error initializing Ollama: %s", err)
61+
return true
6162
}
6263

6364
if config.OllamaSystemPrompt != "" {

0 commit comments

Comments
 (0)