Skip to content

Commit 461af16

Browse files
1 parent 1d0d4f7 commit 461af16

File tree

19 files changed

+1124
-66
lines changed

19 files changed

+1124
-66
lines changed

agent/ipc/messagebus/respondent.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,14 @@ func (bus *MessageBus) ProcessHealthRequest() {
7676
var msg []byte
7777

7878
defer func() {
79-
if bus.healthChannel.IsConnect() {
79+
if bus.healthChannel.IsChannelInitialized() {
8080
if err = bus.healthChannel.Close(); err != nil {
8181
bus.context.Log().Errorf("failed to close health channel: %v", err)
8282
}
8383
}
8484
}()
8585

86-
for !bus.healthChannel.IsConnect() {
86+
for !bus.healthChannel.IsDialSuccessful() {
8787
if err = bus.dialToCoreAgentChannel(message.GetWorkerHealthRequest, message.GetWorkerHealthChannel); err != nil {
8888
// This happens when worker started before core agent is
8989
// and when the amazon-ssm-agent is terminated milliseconds after the ssm-agent-worker has been started
@@ -140,14 +140,14 @@ func (bus *MessageBus) ProcessTerminationRequest() {
140140
var err error
141141
var msg []byte
142142
defer func() {
143-
if bus.terminationChannel.IsConnect() {
143+
if bus.terminationChannel.IsChannelInitialized() {
144144
if err = bus.terminationChannel.Close(); err != nil {
145145
bus.context.Log().Errorf("failed to close termination channel: %v", err)
146146
}
147147
}
148148
}()
149149

150-
for !bus.terminationChannel.IsConnect() {
150+
for !bus.terminationChannel.IsDialSuccessful() {
151151
if err = bus.dialToCoreAgentChannel(message.TerminateWorkerRequest, message.TerminationWorkerChannel); err != nil {
152152
// This happens when worker started before core agent is
153153
// and when the amazon-ssm-agent is terminated milliseconds after the ssm-agent-worker has been started

agent/ipc/messagebus/respondent_test.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ func TestMessageBusTestSuite(t *testing.T) {
7272
}
7373

7474
func (suite *MessageBusTestSuite) TestProcessTerminationRequest_Successful() {
75-
suite.mockTerminateChannel.On("IsConnect").Return(true).Twice()
75+
suite.mockTerminateChannel.On("IsChannelInitialized").Return(true).Once()
76+
suite.mockTerminateChannel.On("IsDialSuccessful").Return(true).Once()
7677
suite.mockTerminateChannel.On("Close").Return(nil).Once()
7778

7879
request := message.CreateTerminateWorkerRequest()
@@ -91,35 +92,36 @@ func (suite *MessageBusTestSuite) TestProcessTerminationRequest_Successful() {
9192

9293
func (suite *MessageBusTestSuite) TestProcessTerminationRequest_SuccessfulConnectionRetry() {
9394
// First try channel not connected but fails initialize
94-
suite.mockTerminateChannel.On("IsConnect").Return(false).Once()
95+
suite.mockTerminateChannel.On("IsDialSuccessful").Return(false).Once()
9596
suite.mockTerminateChannel.On("Initialize", mock.Anything).Return(fmt.Errorf("SomeErr")).Once()
9697
suite.mockTerminateChannel.On("Close").Return(nil).Once()
9798

9899
// Second try channel not connected but fails dial
99-
suite.mockTerminateChannel.On("IsConnect").Return(false).Once()
100-
suite.mockTerminateChannel.On("Initialize", mock.Anything).Return(nil).Once()
100+
suite.mockTerminateChannel.On("IsDialSuccessful").Return(false).Once()
101+
suite.mockTerminateChannel.On("Initialize", mock.Anything).Return(nil)
101102
suite.mockTerminateChannel.On("Dial", mock.Anything).Return(fmt.Errorf("SomeDialError")).Once()
102103
suite.mockTerminateChannel.On("Close").Return(nil).Once()
103104

104105
// Third try channel not connected but finally succeeds
105-
suite.mockTerminateChannel.On("IsConnect").Return(false).Once()
106-
suite.mockTerminateChannel.On("Initialize", mock.Anything).Return(nil).Once()
106+
suite.mockTerminateChannel.On("IsDialSuccessful").Return(false).Once()
107+
suite.mockTerminateChannel.On("Initialize", mock.Anything).Return(nil)
107108
suite.mockTerminateChannel.On("Dial", mock.Anything).Return(nil).Once()
108-
109-
// Fourth call to isConnect succeeds, fourth call is for defer where it will call close
110-
suite.mockTerminateChannel.On("IsConnect").Return(true).Twice()
111-
suite.mockTerminateChannel.On("Close").Return(nil).Once()
109+
suite.mockTerminateChannel.On("IsDialSuccessful").Return(true).Once()
112110

113111
request := message.CreateTerminateWorkerRequest()
114112
requestString, _ := jsonutil.Marshal(request)
115113
suite.mockTerminateChannel.On("Recv").Return([]byte(requestString), nil)
116114
suite.mockTerminateChannel.On("Send", mock.Anything).Return(nil)
117115

118-
suite.messageBus.ProcessTerminationRequest()
116+
// Fourth call to isConnect succeeds, fourth call is for defer where it will call close
117+
suite.mockTerminateChannel.On("IsChannelInitialized").Return(true).Once()
118+
suite.mockTerminateChannel.On("Close").Return(nil).Once()
119119

120-
suite.mockTerminateChannel.AssertExpectations(suite.T())
120+
suite.messageBus.ProcessTerminationRequest()
121121

122122
// Assert termination channel connected and that a termination message is sent
123123
suite.Assertions.Equal(true, <-suite.messageBus.GetTerminationChannelConnectedChan())
124124
suite.Assertions.Equal(true, <-suite.messageBus.GetTerminationRequestChan())
125+
126+
suite.mockTerminateChannel.AssertExpectations(suite.T())
125127
}

common/channel/channel.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package channel
1616

1717
import (
18+
"errors"
1819
"runtime"
1920
"runtime/debug"
2021
"time"
@@ -35,12 +36,17 @@ type IChannel interface {
3536
SetOption(name string, value interface{}) error
3637
Listen(addr string) error
3738
Dial(addr string) error
38-
IsConnect() bool
39+
IsChannelInitialized() bool
40+
IsDialSuccessful() bool
41+
IsListenSuccessful() bool
3942
}
4043

4144
var (
4245
newNamedPipeChannelRef = NewNamedPipeChannel
4346
isDefaultChannelPresentRef = utils.IsDefaultChannelPresent
47+
48+
ErrIPCChannelClosed = errors.New("channel is closed")
49+
ErrDialListenUnSuccessful = errors.New("IPC connection is not established")
4450
)
4551

4652
// GetChannelCreator returns function reference for channel creation based
@@ -55,11 +61,11 @@ func GetChannelCreator(log log.T, appConfig appconfig.SsmagentConfig, identity i
5561
// canUseNamedPipe checks whether named pipe can be used for IPC or not
5662
func canUseNamedPipe(log log.T, appConfig appconfig.SsmagentConfig, identity identity.IAgentIdentity) (useNamedPipe bool) {
5763
// named pipes '.Listen' halts randomly on windows 2012, disabling named pipes on windows and using file channel instead
58-
if runtime.GOOS == "windows" {
59-
log.Info("Not using named pipe on windows")
64+
// On few mac2.metal instances, socket creation is getting blocked. Hence, permanently falling back to File based IPC for Darwin.
65+
if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
66+
log.Infof("Not using named pipe on %v", runtime.GOOS)
6067
return false
6168
}
62-
6369
if appConfig.Agent.ForceFileIPC {
6470
log.Info("Not using named pipe as force file IPC is set")
6571
return false
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
4+
// use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
11+
// either express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
//
14+
//go:build darwin
15+
// +build darwin
16+
17+
// Package channel captures IPC implementation.
18+
package channel
19+
20+
import (
21+
"testing"
22+
23+
"github.com/aws/amazon-ssm-agent/agent/appconfig"
24+
"github.com/aws/amazon-ssm-agent/agent/log"
25+
logmocks "github.com/aws/amazon-ssm-agent/agent/mocks/log"
26+
"github.com/aws/amazon-ssm-agent/common/channel/mocks"
27+
"github.com/aws/amazon-ssm-agent/common/identity"
28+
identityMocks "github.com/aws/amazon-ssm-agent/common/identity/mocks"
29+
"github.com/stretchr/testify/assert"
30+
"github.com/stretchr/testify/suite"
31+
)
32+
33+
type IChannelTestSuite struct {
34+
suite.Suite
35+
log log.T
36+
identity identity.IAgentIdentity
37+
appconfig appconfig.SsmagentConfig
38+
}
39+
40+
// TestChannelSuite executes test suite
41+
func TestChannelSuite(t *testing.T) {
42+
suite.Run(t, new(IChannelTestSuite))
43+
}
44+
45+
// SetupTest initializes Setup
46+
func (suite *IChannelTestSuite) SetupTest() {
47+
suite.log = logmocks.NewMockLog()
48+
suite.identity = identityMocks.NewDefaultMockAgentIdentity()
49+
suite.appconfig = appconfig.DefaultConfig()
50+
}
51+
52+
func (suite *IChannelTestSuite) TestIPCSelection() {
53+
newNamedPipeChannelRef = func(log log.T, identity identity.IAgentIdentity) IChannel {
54+
return &mocks.IChannel{}
55+
}
56+
output := canUseNamedPipe(suite.log, suite.appconfig, suite.identity)
57+
assert.False(suite.T(), output)
58+
}

common/channel/channel_unix_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
// either express or implied. See the License for the specific language governing
1212
// permissions and limitations under the License.
1313
//
14-
//go:build darwin || freebsd || linux || netbsd || openbsd
15-
// +build darwin freebsd linux netbsd openbsd
14+
//go:build freebsd || linux || netbsd || openbsd
15+
// +build freebsd linux netbsd openbsd
1616

1717
// Package channel captures IPC implementation.
1818
package channel

common/channel/filechannel.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ import (
2424
"github.com/aws/amazon-ssm-agent/common/message"
2525
)
2626

27+
var (
28+
getSurveyInstance = commProtocol.GetSurveyInstance
29+
getRespondentInstance = commProtocol.GetRespondentInstance
30+
)
31+
2732
// NewFileChannel creates an new instance of FileChannel which internally uses file watcher based ipc channel
33+
// This channel does not have multi-threading support. Currently, the invocations happen only in one go-routine
2834
func NewFileChannel(log log.T, identity identity.IAgentIdentity) IChannel {
2935
return &fileChannel{
3036
log: log,
@@ -38,25 +44,35 @@ type fileChannel struct {
3844
log log.T
3945
identity identity.IAgentIdentity
4046
isFileChannelInitialized bool
47+
isDialSuccessful bool
48+
isListenSuccessful bool
4149
msgProtocol utils.IFileChannelCommProtocol
4250
}
4351

4452
// Initialize initializes survey properties
4553
func (fc *fileChannel) Initialize(socketType utils.SocketType) error {
4654
fc.log.Info("using file channel for IPC")
4755
if socketType == utils.Surveyor {
48-
fc.msgProtocol = commProtocol.GetSurveyInstance(fc.log, fc.identity)
56+
fc.msgProtocol = getSurveyInstance(fc.log, fc.identity)
4957
} else if socketType == utils.Respondent {
50-
fc.msgProtocol = commProtocol.GetRespondentInstance(fc.log, fc.identity)
58+
fc.msgProtocol = getRespondentInstance(fc.log, fc.identity)
5159
} else {
5260
return fmt.Errorf("unsupported socket type")
5361
}
5462
fc.isFileChannelInitialized = true
63+
fc.isDialSuccessful = false
64+
fc.isListenSuccessful = false
5565
return nil
5666
}
5767

5868
// Send puts the message on the outbound send queue.
5969
func (fc *fileChannel) Send(message *message.Message) error {
70+
if !fc.IsChannelInitialized() {
71+
return ErrIPCChannelClosed
72+
}
73+
if !(fc.IsListenSuccessful() || fc.IsDialSuccessful()) {
74+
return ErrDialListenUnSuccessful
75+
}
6076
return fc.msgProtocol.Send(message)
6177
}
6278

@@ -69,6 +85,12 @@ func (fc *fileChannel) Close() error {
6985
// Recv gets the message from the IPC file channel created
7086
// message is returned whenever the worker creates a new file
7187
func (fc *fileChannel) Recv() ([]byte, error) {
88+
if !fc.IsChannelInitialized() {
89+
return nil, ErrIPCChannelClosed
90+
}
91+
if !(fc.IsListenSuccessful() || fc.IsDialSuccessful()) {
92+
return nil, ErrDialListenUnSuccessful
93+
}
7294
return fc.msgProtocol.Recv()
7395
}
7496

@@ -79,14 +101,37 @@ func (fc *fileChannel) SetOption(name string, value interface{}) (err error) {
79101

80102
// Listen creates a new channel in the main worker side
81103
func (fc *fileChannel) Listen(addr string) error {
82-
return fc.msgProtocol.Listen(addr)
104+
err := fc.msgProtocol.Listen(addr)
105+
if err != nil {
106+
return err
107+
}
108+
fc.isListenSuccessful = true
109+
return nil
83110
}
84111

85112
// Dial creates a new channel in the worker side
86113
func (fc *fileChannel) Dial(addr string) error {
87-
return fc.msgProtocol.Dial(addr)
114+
err := fc.msgProtocol.Dial(addr)
115+
if err != nil {
116+
return err
117+
}
118+
fc.isDialSuccessful = true
119+
return nil
88120
}
89121

90-
func (fc *fileChannel) IsConnect() bool {
122+
// IsChannelInitialized returns true if channel initialization is successful.
123+
func (fc *fileChannel) IsChannelInitialized() bool {
91124
return fc.isFileChannelInitialized
92125
}
126+
127+
// IsDialSuccessful returns true if Dial() successfully connects to IPC channels.
128+
// In Dial(), we connect to the IPC with address being the parameter.
129+
func (fc *fileChannel) IsDialSuccessful() bool {
130+
return fc.isDialSuccessful
131+
}
132+
133+
// IsListenSuccessful returns true if Listen() successfully creates IPC channels.
134+
// In Listen(), we will create named pipes on Windows and sockets on Linux/Darwin for IPC.
135+
func (fc *fileChannel) IsListenSuccessful() bool {
136+
return fc.isListenSuccessful
137+
}

0 commit comments

Comments
 (0)