Skip to content

Commit 5efc62d

Browse files
committed
修改选择host的逻辑,先保证单主机单实例的发布流程
1 parent ad27f4d commit 5efc62d

File tree

3 files changed

+117
-13
lines changed

3 files changed

+117
-13
lines changed

internal/deploy/database/instance_repo.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,3 +351,42 @@ func (r *InstanceRepo) GetExistingInstancePorts(serviceName, instanceIP string)
351351

352352
return ports, nil
353353
}
354+
355+
// GetHostsRunningServiceVersion 查询已运行指定服务指定版本的主机ID列表
356+
func (r *InstanceRepo) GetHostsRunningServiceVersion(serviceName, serviceVersion string) ([]string, error) {
357+
// 参数验证
358+
if serviceName == "" {
359+
return nil, fmt.Errorf("serviceName cannot be empty")
360+
}
361+
if serviceVersion == "" {
362+
return nil, fmt.Errorf("serviceVersion cannot be empty")
363+
}
364+
365+
query := `
366+
SELECT DISTINCT host_id
367+
FROM instances
368+
WHERE service_name = $1 AND service_version = $2 AND status = 'active'
369+
ORDER BY host_id
370+
`
371+
372+
rows, err := r.db.Query(query, serviceName, serviceVersion)
373+
if err != nil {
374+
return nil, fmt.Errorf("failed to query hosts running service %s version %s: %w", serviceName, serviceVersion, err)
375+
}
376+
defer rows.Close()
377+
378+
var hostIDs []string
379+
for rows.Next() {
380+
var hostID string
381+
if err := rows.Scan(&hostID); err != nil {
382+
return nil, fmt.Errorf("failed to scan host_id: %w", err)
383+
}
384+
hostIDs = append(hostIDs, hostID)
385+
}
386+
387+
if err := rows.Err(); err != nil {
388+
return nil, fmt.Errorf("error iterating host IDs: %w", err)
389+
}
390+
391+
return hostIDs, nil
392+
}

internal/deploy/service/deploy_service.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ func (f *floyDeployService) deployToSingleInstance(instanceIP, service, version,
635635

636636
// 3. 推送配置文件
637637
if wantConfig {
638-
if err := f.pushConfig(instanceIP, service, fversion); err != nil {
638+
if err := f.pushConfig(instanceIP, service, fversion, version, packageFilePath); err != nil {
639639
return fmt.Errorf("推送配置文件失败: %v", err)
640640
}
641641
}
@@ -667,14 +667,14 @@ func (f *floyDeployService) rollbackToSingleInstance(instanceIP, service, target
667667
return fmt.Errorf("推送回滚包文件失败: %v", err)
668668
}
669669
}
670-
// // 2. 推送回滚包文件
670+
// 2. 推送回滚包文件
671671
// if err := f.pushPackage(instanceIP, service, fversion, targetVersion, packageFilePath, md5sum); err != nil {
672672
// return fmt.Errorf("推送回滚包文件失败: %v", err)
673673
// }
674674

675675
// 3. 推送配置文件
676676
if wantConfig {
677-
if err := f.pushConfig(instanceIP, service, fversion); err != nil {
677+
if err := f.pushConfig(instanceIP, service, fversion, targetVersion, packageFilePath); err != nil {
678678
return fmt.Errorf("推送配置文件失败: %v", err)
679679
}
680680
}
@@ -859,13 +859,15 @@ func (f *floyDeployService) pushPackage(instanceIP, service, fversion, version s
859859
}
860860

861861
// pushConfig 推送配置文件
862-
func (f *floyDeployService) pushConfig(instanceIP, service, fversion string) error {
862+
func (f *floyDeployService) pushConfig(instanceIP, service, fversion, version, packageFilePath string) error {
863863
baseURL := fmt.Sprintf("http://%s:%s", instanceIP, f.port)
864864

865-
// 简单的配置文件示例
866-
configContent := fmt.Sprintf("# Configuration for %s\nservice.name=%s\nservice.version=%s\n",
867-
service, service, fversion)
868-
configMD5 := md5.Sum([]byte(configContent))
865+
// 从修改后的包文件中提取配置文件内容
866+
configContent, err := f.extractConfigFromPackage(packageFilePath)
867+
if err != nil {
868+
return fmt.Errorf("failed to extract config from package: %v", err)
869+
}
870+
configMD5 := md5.Sum(configContent)
869871

870872
// 使用 multipart.Writer 构造请求体
871873
var buf bytes.Buffer
@@ -879,7 +881,7 @@ func (f *floyDeployService) pushConfig(instanceIP, service, fversion string) err
879881

880882
// 创建文件字段,设置 Content-Md5 头
881883
header := make(map[string][]string)
882-
header["Content-Disposition"] = []string{`form-data; name="file"; filename="app.conf"`}
884+
header["Content-Disposition"] = []string{fmt.Sprintf(`form-data; name="file"; filename="%s/config.yaml"`, version)}
883885
header["Content-Type"] = []string{"application/octet-stream"}
884886
header["Content-Md5"] = []string{base64.URLEncoding.EncodeToString(configMD5[:])}
885887
header["File-Mode"] = []string{"644"}
@@ -890,7 +892,7 @@ func (f *floyDeployService) pushConfig(instanceIP, service, fversion string) err
890892
}
891893

892894
// 写入配置文件内容
893-
_, err = fileWriter.Write([]byte(configContent))
895+
_, err = fileWriter.Write(configContent)
894896
if err != nil {
895897
return fmt.Errorf("failed to write config data: %v", err)
896898
}
@@ -1545,3 +1547,28 @@ func (f *floyDeployService) createTarGz(src, dest string) error {
15451547
return nil
15461548
})
15471549
}
1550+
1551+
// extractConfigFromPackage 从包文件中提取配置文件内容
1552+
func (f *floyDeployService) extractConfigFromPackage(packageFilePath string) ([]byte, error) {
1553+
// 创建临时目录
1554+
tempDir, err := os.MkdirTemp("", "extract-config-*")
1555+
if err != nil {
1556+
return nil, fmt.Errorf("创建临时目录失败: %v", err)
1557+
}
1558+
defer os.RemoveAll(tempDir)
1559+
1560+
// 解压包文件到临时目录
1561+
err = f.extractTarGz(packageFilePath, tempDir)
1562+
if err != nil {
1563+
return nil, fmt.Errorf("解压包文件失败: %v", err)
1564+
}
1565+
1566+
// 读取配置文件
1567+
configPath := filepath.Join(tempDir, "config.yaml")
1568+
configContent, err := os.ReadFile(configPath)
1569+
if err != nil {
1570+
return nil, fmt.Errorf("读取配置文件失败: %v", err)
1571+
}
1572+
1573+
return configContent, nil
1574+
}

internal/deploy/service/internal_utils.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,53 @@ func CheckHostHealth(hostIpAddress string) (bool, error) {
156156
}
157157

158158
// SelectHostForNewInstance 为新实例选择合适的主机
159+
// 查询数据库,选择未运行当前服务当前版本的主机
159160
func SelectHostForNewInstance(availableHosts []*model.HostInfo, service string, version string) (*model.HostInfo, error) {
160161
// 参数验证
161162
if len(availableHosts) == 0 {
162163
return nil, fmt.Errorf("no available hosts")
163164
}
165+
if service == "" {
166+
return nil, fmt.Errorf("service name cannot be empty")
167+
}
168+
if version == "" {
169+
return nil, fmt.Errorf("service version cannot be empty")
170+
}
171+
172+
// 获取数据库连接
173+
_, err := initDatabase()
174+
if err != nil {
175+
return nil, fmt.Errorf("failed to initialize database: %w", err)
176+
}
177+
178+
// 查询已运行当前服务当前版本的主机ID列表
179+
runningHostIDs, err := instanceRepo.GetHostsRunningServiceVersion(service, version)
180+
if err != nil {
181+
return nil, fmt.Errorf("failed to query hosts running service %s version %s: %w", service, version, err)
182+
}
183+
184+
// 创建已运行主机ID的映射,便于快速查找
185+
runningHostMap := make(map[string]bool)
186+
for _, hostID := range runningHostIDs {
187+
runningHostMap[hostID] = true
188+
}
189+
190+
// 筛选出未运行当前服务当前版本的主机
191+
availableHostsForService := []*model.HostInfo{}
192+
for _, host := range availableHosts {
193+
if !runningHostMap[host.HostID] {
194+
availableHostsForService = append(availableHostsForService, host)
195+
}
196+
}
197+
198+
// 如果没有可用的主机,返回错误
199+
if len(availableHostsForService) == 0 {
200+
return nil, fmt.Errorf("all hosts are already running service %s version %s", service, version)
201+
}
164202

165-
// 随机选择一个主机
166-
randomIndex := rand.Intn(len(availableHosts))
167-
selectedHost := availableHosts[randomIndex]
203+
// 从可用主机中随机选择一个
204+
randomIndex := rand.Intn(len(availableHostsForService))
205+
selectedHost := availableHostsForService[randomIndex]
168206

169207
return selectedHost, nil
170208
}

0 commit comments

Comments
 (0)