Skip to content

Commit e57fd61

Browse files
authored
Merge pull request #2 from ifwe/add-red53-agent
Add red53_agent
2 parents 4b9c899 + 2d219bb commit e57fd61

File tree

1 file changed

+205
-0
lines changed

1 file changed

+205
-0
lines changed

red53_agent.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package main
2+
3+
// Originally adapted from https://github.com/awslabs/service-discovery-ecs-dns
4+
// This service is meant to run as a singleton instance in concert with redecs_agent
5+
// running on AWS ECS instances. This service will check Redis regularly for active
6+
// service updates and rewrite the A records for services to only contain recently
7+
// active ip addresses.
8+
9+
import (
10+
"errors"
11+
"fmt"
12+
log "github.com/Sirupsen/logrus"
13+
"github.com/aws/aws-sdk-go/aws"
14+
"github.com/aws/aws-sdk-go/aws/session"
15+
"github.com/aws/aws-sdk-go/service/route53"
16+
"gopkg.in/redis.v5"
17+
"os"
18+
"strings"
19+
"time"
20+
)
21+
22+
const checkInterval = 5 * time.Minute // how often to check Redis
23+
const defaultTTL = 60 // seconds
24+
const fetchLast = 300 // seconds
25+
26+
var DNSName = "servicediscovery.local"
27+
28+
func logErrorAndFail(err error) {
29+
if err != nil {
30+
// logrus calls os.exit(1)
31+
log.Fatal(err)
32+
}
33+
}
34+
35+
func logErrorNoFatal(err error) {
36+
if err != nil {
37+
log.Error(err)
38+
}
39+
}
40+
41+
type config struct {
42+
HostedZoneId string
43+
RedisHost string
44+
}
45+
46+
var configuration config
47+
48+
func getDNSHostedZoneId() (string, error) {
49+
r53 := route53.New(session.New())
50+
params := &route53.ListHostedZonesByNameInput{
51+
DNSName: aws.String(DNSName),
52+
}
53+
54+
zones, err := r53.ListHostedZonesByName(params)
55+
56+
if err == nil && len(zones.HostedZones) > 0 {
57+
return aws.StringValue(zones.HostedZones[0].Id), nil
58+
}
59+
60+
return "", err
61+
}
62+
63+
// Modify (or create) the A record for this serviceName, adding the private IP of the host.
64+
func modifyDNSRecord(serviceName string, ips []string) error {
65+
var err error
66+
r53 := route53.New(session.New())
67+
68+
aValues := make([]*route53.ResourceRecord, 0)
69+
// Put all the IPs in one A record
70+
for _, ip := range ips {
71+
aValues = append(aValues, &route53.ResourceRecord{Value: aws.String(ip)})
72+
}
73+
74+
// This API call creates a new DNS record for this service
75+
params := &route53.ChangeResourceRecordSetsInput{
76+
ChangeBatch: &route53.ChangeBatch{
77+
Changes: []*route53.Change{
78+
{
79+
Action: aws.String(route53.ChangeActionUpsert),
80+
ResourceRecordSet: &route53.ResourceRecordSet{
81+
Name: aws.String(serviceName + "." + DNSName),
82+
// It creates an A record with the name of the service
83+
Type: aws.String(route53.RRTypeA),
84+
ResourceRecords: aValues,
85+
TTL: aws.Int64(defaultTTL),
86+
},
87+
},
88+
},
89+
Comment: aws.String("Service Discovery Created Record"),
90+
},
91+
HostedZoneId: aws.String(configuration.HostedZoneId),
92+
}
93+
_, err = r53.ChangeResourceRecordSets(params)
94+
logErrorNoFatal(err)
95+
log.Debug(fmt.Sprintf("Record %s.%s updated with %d records.", serviceName, DNSName, len(ips)))
96+
return err
97+
}
98+
99+
var redisClient *redis.Client
100+
101+
func processServicePings(servicePings []string) {
102+
var serviceMap = make(map[string][]string)
103+
104+
for _, servicePing := range servicePings {
105+
pingChunks := strings.Split(servicePing, "_")
106+
serviceName := pingChunks[0]
107+
serviceIp := pingChunks[1]
108+
serviceMap[serviceName] = append(serviceMap[serviceName], serviceIp)
109+
}
110+
111+
for serviceName, serviceIps := range serviceMap {
112+
modifyDNSRecord(serviceName, serviceIps)
113+
}
114+
}
115+
116+
func fetchActiveServices() {
117+
var servicePings []string
118+
var sum int
119+
var err error
120+
log.Debug("Fetching active services ...")
121+
122+
// Fetch last "fetchLast" seconds of service pings
123+
now := time.Now().Unix()
124+
epoch := now - fetchLast // seconds
125+
126+
// Try several times with exponential backoff.
127+
sum = 1
128+
for {
129+
servicePings, err = redisClient.ZRange("redecs:service_pings", epoch, now).Result()
130+
131+
if err == nil {
132+
break
133+
}
134+
if sum > 8 {
135+
// Bail out if this is failing.
136+
logErrorAndFail(err)
137+
}
138+
time.Sleep(time.Duration(sum) * time.Second)
139+
sum += 2
140+
}
141+
142+
processServicePings(servicePings)
143+
144+
log.Debug("Done fetching active services.")
145+
}
146+
147+
func main() {
148+
var err error
149+
var sum int
150+
var zoneId string
151+
152+
if len(os.Args) != 3 {
153+
err = errors.New(fmt.Sprintf("Usage: %s [domain] [Redis host]\n", os.Args[0]))
154+
logErrorAndFail(err)
155+
}
156+
157+
DNSName = os.Args[1]
158+
configuration.RedisHost = os.Args[2]
159+
160+
sum = 1
161+
for {
162+
// We try to get the Hosted Zone Id using exponential backoff
163+
zoneId, err = getDNSHostedZoneId()
164+
if err == nil {
165+
break
166+
}
167+
if sum > 8 {
168+
logErrorAndFail(err)
169+
}
170+
time.Sleep(time.Duration(sum) * time.Second)
171+
sum += 2
172+
}
173+
configuration.HostedZoneId = zoneId
174+
175+
sum = 1
176+
for {
177+
// We try to get the Redis connection using exponential backoff
178+
redisClient = redis.NewClient(&redis.Options{
179+
Addr: configuration.RedisHost + ":6379",
180+
Password: "", // none
181+
DB: 0, // default DB
182+
MaxRetries: 3,
183+
})
184+
// Check the connection.
185+
err := redisClient.Ping().Err()
186+
187+
if err == nil {
188+
break
189+
}
190+
if sum > 8 {
191+
logErrorAndFail(err)
192+
}
193+
time.Sleep(time.Duration(sum) * time.Second)
194+
sum += 2
195+
}
196+
197+
// check regularly, specified by checkInterval
198+
ticker := time.NewTicker(checkInterval)
199+
200+
for {
201+
// TODO: Add Redis Pub/Sub
202+
fetchActiveServices()
203+
<-ticker.C
204+
}
205+
}

0 commit comments

Comments
 (0)