Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 155 additions & 152 deletions pkg/server/store_ext_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,188 +16,191 @@ limitations under the License.
package server

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
sync "sync"
"syscall"
"time"

"github.com/linuxsuren/api-testing/pkg/util/home"

"github.com/linuxsuren/api-testing/pkg/downloader"
"github.com/linuxsuren/api-testing/pkg/logging"

fakeruntime "github.com/linuxsuren/go-fake-runtime"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
sync "sync"
"syscall"
"time"

"github.com/linuxsuren/api-testing/pkg/util/home"

"github.com/linuxsuren/api-testing/pkg/downloader"
"github.com/linuxsuren/api-testing/pkg/logging"

fakeruntime "github.com/linuxsuren/go-fake-runtime"
)

var (
serverLogger = logging.DefaultLogger(logging.LogLevelInfo).WithName("server")
serverLogger = logging.DefaultLogger(logging.LogLevelInfo).WithName("server")
)

type ExtManager interface {
Start(name, socket string) (err error)
StopAll() (err error)
WithDownloader(downloader.PlatformAwareOCIDownloader)
Start(name, socket string) (err error)
StopAll() (err error)
WithDownloader(downloader.PlatformAwareOCIDownloader)
}

type storeExtManager struct {
execer fakeruntime.Execer
ociDownloader downloader.PlatformAwareOCIDownloader
socketPrefix string
filesNeedToBeRemoved []string
extStatusMap map[string]bool
processs []fakeruntime.Process
processChan chan fakeruntime.Process
stopSingal chan struct{}
lock *sync.RWMutex
execer fakeruntime.Execer
ociDownloader downloader.PlatformAwareOCIDownloader
socketPrefix string
filesNeedToBeRemoved []string
extStatusMap map[string]bool
processs []fakeruntime.Process
processChan chan fakeruntime.Process
stopSingal chan struct{}
lock *sync.RWMutex
}

var ss *storeExtManager

func NewStoreExtManager(execer fakeruntime.Execer) ExtManager {
if ss == nil {
ss = &storeExtManager{
processChan: make(chan fakeruntime.Process),
stopSingal: make(chan struct{}, 1),
lock: &sync.RWMutex{},
}
ss.execer = execer
ss.socketPrefix = "unix://"
ss.extStatusMap = map[string]bool{}
ss.processCollect()
ss.WithDownloader(&nonDownloader{})
}
return ss
if ss == nil {
ss = &storeExtManager{
processChan: make(chan fakeruntime.Process),
stopSingal: make(chan struct{}, 1),
lock: &sync.RWMutex{},
}
ss.execer = execer
ss.socketPrefix = "unix://"
ss.extStatusMap = map[string]bool{}
ss.processCollect()
ss.WithDownloader(&nonDownloader{})
}
return ss
}

func NewStoreExtManagerInstance(execer fakeruntime.Execer) ExtManager {
ss = &storeExtManager{
processChan: make(chan fakeruntime.Process),
stopSingal: make(chan struct{}, 1),
lock: &sync.RWMutex{},
}
ss.execer = execer
ss.socketPrefix = "unix://"
ss.extStatusMap = map[string]bool{}
ss.processCollect()
ss.WithDownloader(&nonDownloader{})
return ss
ss = &storeExtManager{
processChan: make(chan fakeruntime.Process),
stopSingal: make(chan struct{}, 1),
lock: &sync.RWMutex{},
}
ss.execer = execer
ss.socketPrefix = "unix://"
ss.extStatusMap = map[string]bool{}
ss.processCollect()
ss.WithDownloader(&nonDownloader{})
return ss
}

func (s *storeExtManager) Start(name, socket string) (err error) {
if v, ok := s.extStatusMap[name]; ok && v {
return
}
if s.execer.OS() == "windows" {
name = name + ".exe"
}
targetDir := home.GetUserBinDir()
targetBinaryFile := filepath.Join(targetDir, name)

var binaryPath string
if _, err = os.Stat(targetBinaryFile); err == nil {
binaryPath = targetBinaryFile
} else {
serverLogger.Info("failed to find extension", "error", err.Error())

binaryPath, err = s.execer.LookPath(name)
if err != nil {
err = fmt.Errorf("not found extension, try to download it, error: %v", err)
go func() {
reader, dErr := s.ociDownloader.Download(name, "", "")
if dErr != nil {
serverLogger.Error(dErr, "failed to download extension", "name", name)
} else {
extFile := s.ociDownloader.GetTargetFile()

targetFile := filepath.Base(extFile)
if dErr = downloader.WriteTo(reader, targetDir, targetFile); dErr == nil {
binaryPath = filepath.Join(targetDir, targetFile)
s.startPlugin(socket, binaryPath, name)
} else {
serverLogger.Error(dErr, "failed to save extension", "targetFile", targetFile)
}
}
}()
}
}

if err == nil {
go s.startPlugin(socket, binaryPath, name)
}
return
if v, ok := s.extStatusMap[name]; ok && v {
return
}
platformBasedName := name
if s.execer.OS() == "windows" {
platformBasedName += ".exe"
}
targetDir := home.GetUserBinDir()
targetBinaryFile := filepath.Join(targetDir, platformBasedName)

var binaryPath string
if _, err = os.Stat(targetBinaryFile); err == nil {
binaryPath = targetBinaryFile
} else {
serverLogger.Info("failed to find extension", "error", err.Error())

binaryPath, err = s.execer.LookPath(platformBasedName)
if err != nil {
err = fmt.Errorf("not found extension, try to download it, error: %v", err)
go func() {
s.ociDownloader.WithKind("store")
s.ociDownloader.WithOS(s.execer.OS())
reader, dErr := s.ociDownloader.Download(name, "", "")
if dErr != nil {
serverLogger.Error(dErr, "failed to download extension", "name", name)
} else {
extFile := s.ociDownloader.GetTargetFile()

targetFile := filepath.Base(extFile)
if dErr = downloader.WriteTo(reader, targetDir, targetFile); dErr == nil {
binaryPath = filepath.Join(targetDir, targetFile)
s.startPlugin(socket, binaryPath, name)
} else {
serverLogger.Error(dErr, "failed to save extension", "targetFile", targetFile)
}
}
}()
}
}

if err == nil {
go s.startPlugin(socket, binaryPath, name)
}
return
}

func (s *storeExtManager) startPlugin(socketURL, plugin, pluginName string) (err error) {
if strings.Contains(socketURL, ":") && !strings.HasPrefix(socketURL, s.socketPrefix) {
err = s.startPluginViaHTTP(socketURL, plugin, pluginName)
return
}
socketFile := strings.TrimPrefix(socketURL, s.socketPrefix)
_ = os.RemoveAll(socketFile) // always deleting the socket file to avoid start failing
if strings.Contains(socketURL, ":") && !strings.HasPrefix(socketURL, s.socketPrefix) {
err = s.startPluginViaHTTP(socketURL, plugin, pluginName)
return
}
socketFile := strings.TrimPrefix(socketURL, s.socketPrefix)
_ = os.RemoveAll(socketFile) // always deleting the socket file to avoid start failing

s.lock.Lock()
s.filesNeedToBeRemoved = append(s.filesNeedToBeRemoved, socketFile)
s.extStatusMap[pluginName] = true
s.lock.Unlock()
s.lock.Lock()
s.filesNeedToBeRemoved = append(s.filesNeedToBeRemoved, socketFile)
s.extStatusMap[pluginName] = true
s.lock.Unlock()

if err = s.execer.RunCommandWithIO(plugin, "", os.Stdout, os.Stderr, s.processChan, "--socket", socketFile); err != nil {
serverLogger.Info("failed to start ext manager", "socket", socketURL, "error: ", err.Error())
}
return
if err = s.execer.RunCommandWithIO(plugin, "", os.Stdout, os.Stderr, s.processChan, "--socket", socketFile); err != nil {
serverLogger.Info("failed to start ext manager", "socket", socketURL, "error: ", err.Error())
}
return
}

func (s *storeExtManager) startPluginViaHTTP(httpURL, plugin, pluginName string) (err error) {
port := strings.Split(httpURL, ":")[1]
if err = s.execer.RunCommandWithIO(plugin, "", os.Stdout, os.Stderr, s.processChan, "--port", port); err != nil {
serverLogger.Info("failed to start ext manager", "port", port, "error: ", err.Error())
}
return
port := strings.Split(httpURL, ":")[1]
if err = s.execer.RunCommandWithIO(plugin, "", os.Stdout, os.Stderr, s.processChan, "--port", port); err != nil {
serverLogger.Info("failed to start ext manager", "port", port, "error: ", err.Error())
}
return
}

func (s *storeExtManager) StopAll() error {
serverLogger.Info("stop", "extensions", len(s.processs))
for _, p := range s.processs {
if p != nil {
// Use Kill on Windows, Signal on other platforms
if isWindows() {
p.Kill()
} else {
p.Signal(syscall.SIGTERM)
}
}
}
s.stopSingal <- struct{}{}
return nil
serverLogger.Info("stop", "extensions", len(s.processs))
for _, p := range s.processs {
if p != nil {
// Use Kill on Windows, Signal on other platforms
if isWindows() {
p.Kill()
} else {
p.Signal(syscall.SIGTERM)
}
}
}
s.stopSingal <- struct{}{}
return nil
}

// isWindows returns true if the program is running on Windows OS.
func isWindows() bool {
return strings.Contains(strings.ToLower(os.Getenv("OS")), "windows") ||
(strings.Contains(strings.ToLower(os.Getenv("GOOS")), "windows"))
return strings.Contains(strings.ToLower(os.Getenv("OS")), "windows") ||
(strings.Contains(strings.ToLower(os.Getenv("GOOS")), "windows"))
}

func (s *storeExtManager) WithDownloader(ociDownloader downloader.PlatformAwareOCIDownloader) {
s.ociDownloader = ociDownloader
s.ociDownloader = ociDownloader
}

func (s *storeExtManager) processCollect() {
go func() {
for {
select {
case p := <-s.processChan:
s.processs = append(s.processs, p)
case <-s.stopSingal:
return
}
}
}()
go func() {
for {
select {
case p := <-s.processChan:
s.processs = append(s.processs, p)
case <-s.stopSingal:
return
}
}
}()
}

var ErrDownloadNotSupport = errors.New("no support")
Expand All @@ -207,44 +210,44 @@ type nonDownloader struct{}
var _ downloader.PlatformAwareOCIDownloader = &nonDownloader{}

func (n *nonDownloader) WithBasicAuth(username string, password string) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (n *nonDownloader) Download(image, tag, file string) (reader io.Reader, err error) {
err = ErrDownloadNotSupport
return
err = ErrDownloadNotSupport
return
}

func (n *nonDownloader) WithOS(string) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (n *nonDownloader) WithArch(string) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (n *nonDownloader) WithRegistry(string) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (n *nonDownloader) WithKind(string) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (n *nonDownloader) WithImagePrefix(imagePrefix string) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}
func (d *nonDownloader) WithRoundTripper(rt http.RoundTripper) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (d *nonDownloader) WithInsecure(bool) {
// Do nothing because this is an empty implementation
// Do nothing because this is an empty implementation
}

func (d *nonDownloader) WithTimeout(time.Duration) {}
func (d *nonDownloader) WithContext(context.Context) {}

func (n *nonDownloader) GetTargetFile() string {
return ""
return ""
}
Loading