Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions packages/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
operationCallGetPamSessionKey = "CallGetPamSessionKey"
operationCallUploadPamSessionLog = "CallUploadPamSessionLog"
operationCallPAMSessionTermination = "CallPAMSessionTermination"
operationCallOrgRelayHeartBeat = "CallOrgRelayHeartBeat"
operationCallInstanceRelayHeartBeat = "CallInstanceRelayHeartBeat"
)

func CallGetEncryptedWorkspaceKey(httpClient *resty.Client, request GetEncryptedWorkspaceKeyRequest) (GetEncryptedWorkspaceKeyResponse, error) {
Expand Down Expand Up @@ -679,6 +681,42 @@ func CallGatewayHeartBeatV2(httpClient *resty.Client) error {
return nil
}

func CallOrgRelayHeartBeat(httpClient *resty.Client, request RelayHeartbeatRequest) error {
response, err := httpClient.
R().
SetHeader("User-Agent", USER_AGENT).
SetBody(request).
Post(fmt.Sprintf("%v/v1/relays/heartbeat-org-relay", config.INFISICAL_URL))

if err != nil {
return NewGenericRequestError(operationCallOrgRelayHeartBeat, err)
}

if response.IsError() {
return NewAPIErrorWithResponse(operationCallOrgRelayHeartBeat, response, nil)
}

return nil
}

func CallInstanceRelayHeartBeat(httpClient *resty.Client, request RelayHeartbeatRequest) error {
response, err := httpClient.
R().
SetHeader("User-Agent", USER_AGENT).
SetBody(request).
Post(fmt.Sprintf("%v/v1/relays/heartbeat-instance-relay", config.INFISICAL_URL))

if err != nil {
return NewGenericRequestError(operationCallInstanceRelayHeartBeat, err)
}

if response.IsError() {
return NewAPIErrorWithResponse(operationCallInstanceRelayHeartBeat, response, nil)
}

return nil
}

func CallBootstrapInstance(httpClient *resty.Client, request BootstrapInstanceRequest) (BootstrapInstanceResponse, error) {
var resBody BootstrapInstanceResponse
response, err := httpClient.
Expand Down
4 changes: 4 additions & 0 deletions packages/api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,3 +783,7 @@ type UploadSessionLogEntry struct {
type UploadPAMSessionLogsRequest struct {
Logs []UploadSessionLogEntry `json:"logs"`
}

type RelayHeartbeatRequest struct {
Name string `json:"name"`
}
120 changes: 101 additions & 19 deletions packages/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,87 @@ func (r *Relay) SetToken(token string) {
r.httpClient.SetAuthToken(token)
}

func (r *Relay) registerHeartBeat(ctx context.Context, errCh chan error) {
sendHeartbeat := func() error {
var err error
heartbeatBody := api.RelayHeartbeatRequest{Name: r.config.RelayName}
if r.config.Type == "instance" {
err = api.CallInstanceRelayHeartBeat(r.httpClient, heartbeatBody)
} else {
err = api.CallOrgRelayHeartBeat(r.httpClient, heartbeatBody)
}

if err != nil {
log.Warn().Msgf("Heartbeat failed: %v", err)
select {
case errCh <- err:
default:
log.Warn().Msg("Error channel full, skipping heartbeat error report")
}
return err
} else {
log.Info().Msg("Relay is reachable by Infisical")
return nil
}
}

go func() {
defer func() {
log.Debug().Msg("Heartbeat goroutine exiting")
}()

// Phase 1: Keep trying every 10 seconds until first success
func() {
retryTicker := time.NewTicker(10 * time.Second)
defer retryTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-retryTicker.C:
if err := sendHeartbeat(); err == nil {
// First success! Exit retry phase
return
}
}
}
}()

// Phase 2: Regular heartbeat every 30 minutes
regularTicker := time.NewTicker(30 * time.Minute)
defer regularTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-regularTicker.C:
sendHeartbeat()
}
}
}()
}

func (r *Relay) Start(ctx context.Context) error {
if err := r.registerRelay(); err != nil {
return fmt.Errorf("failed to register relay: %v", err)
}

errCh := make(chan error, 1)
r.registerHeartBeat(ctx, errCh)

go func() {
for {
select {
case <-ctx.Done():
return
case err := <-errCh:
log.Warn().Msgf("Heartbeat error received: %v", err)
}
}
}()

// Setup SSH server
if err := r.setupSSHServer(); err != nil {
return fmt.Errorf("failed to setup SSH server: %v", err)
Expand Down Expand Up @@ -432,34 +508,40 @@ func (r *Relay) handleTLSClient(conn net.Conn) {
}

func (r *Relay) handleClient(tlsConn *tls.Conn) {
var gatewayId string
state := tlsConn.ConnectionState()
if len(state.PeerCertificates) == 0 {
log.Warn().Msg("No peer certificates found")
return
}

cert := state.PeerCertificates[0]
gatewayId := cert.Subject.CommonName

if gatewayId == "00000000-0000-0000-0000-000000000000" {
log.Debug().Msg("Heartbeat check successful, closing connection.")
return
}

var gatewayName string
var orgDetails string
state := tlsConn.ConnectionState()

if len(state.PeerCertificates) > 0 {
cert := state.PeerCertificates[0]
gatewayId = cert.Subject.CommonName
if len(cert.Subject.Organization) > 0 {
orgDetails = cert.Subject.Organization[0]
}

for _, ext := range cert.Extensions {
if ext.Id.String() == RELAY_CONNECTING_GATEWAY_INFO_OID {
var connectingGatewayInfo ConnectingGatewayInfo
if err := json.Unmarshal(ext.Value, &connectingGatewayInfo); err != nil {
return
}

gatewayName = connectingGatewayInfo.Name
for _, ext := range cert.Extensions {
if ext.Id.String() == RELAY_CONNECTING_GATEWAY_INFO_OID {
var connectingGatewayInfo ConnectingGatewayInfo
if err := json.Unmarshal(ext.Value, &connectingGatewayInfo); err != nil {
log.Warn().Msgf("Failed to unmarshal connecting gateway info for %s: %v", gatewayId, err)
return
}
gatewayName = connectingGatewayInfo.Name
}

log.Info().Msgf("Client connected with certificate: %s (%s)", gatewayName, gatewayId)

} else {
log.Warn().Msg("No peer certificates found")
return
}

log.Info().Msgf("Client connected with certificate: %s (%s)", gatewayName, gatewayId)

// Get the SSH connection for this gateway
r.mu.RLock()
conn, exists := r.tunnels[gatewayId]
Expand Down