diff --git a/_mocks/opencsg.com/csghub-server/builder/deploy/cluster/mock_Pool.go b/_mocks/opencsg.com/csghub-server/builder/deploy/cluster/mock_Pool.go index f9bb33378..1ae4955f4 100644 --- a/_mocks/opencsg.com/csghub-server/builder/deploy/cluster/mock_Pool.go +++ b/_mocks/opencsg.com/csghub-server/builder/deploy/cluster/mock_Pool.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.0. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package cluster diff --git a/_mocks/opencsg.com/csghub-server/builder/store/database/mock_LfsMetaObjectStore.go b/_mocks/opencsg.com/csghub-server/builder/store/database/mock_LfsMetaObjectStore.go index 3ef09d771..261b77ded 100644 --- a/_mocks/opencsg.com/csghub-server/builder/store/database/mock_LfsMetaObjectStore.go +++ b/_mocks/opencsg.com/csghub-server/builder/store/database/mock_LfsMetaObjectStore.go @@ -70,6 +70,63 @@ func (_c *MockLfsMetaObjectStore_BulkUpdateOrCreate_Call) RunAndReturn(run func( return _c } +// CheckIfAllMigratedToXnet provides a mock function with given fields: ctx, repoID +func (_m *MockLfsMetaObjectStore) CheckIfAllMigratedToXnet(ctx context.Context, repoID int64) (bool, error) { + ret := _m.Called(ctx, repoID) + + if len(ret) == 0 { + panic("no return value specified for CheckIfAllMigratedToXnet") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (bool, error)); ok { + return rf(ctx, repoID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) bool); ok { + r0 = rf(ctx, repoID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, repoID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAllMigratedToXnet' +type MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call struct { + *mock.Call +} + +// CheckIfAllMigratedToXnet is a helper method to define mock.On call +// - ctx context.Context +// - repoID int64 +func (_e *MockLfsMetaObjectStore_Expecter) CheckIfAllMigratedToXnet(ctx interface{}, repoID interface{}) *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call { + return &MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call{Call: _e.mock.On("CheckIfAllMigratedToXnet", ctx, repoID)} +} + +func (_c *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call) Run(run func(ctx context.Context, repoID int64)) *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call) Return(_a0 bool, _a1 error) *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call) RunAndReturn(run func(context.Context, int64) (bool, error)) *MockLfsMetaObjectStore_CheckIfAllMigratedToXnet_Call { + _c.Call.Return(run) + return _c +} + // Create provides a mock function with given fields: ctx, lfsObj func (_m *MockLfsMetaObjectStore) Create(ctx context.Context, lfsObj database.LfsMetaObject) (*database.LfsMetaObject, error) { ret := _m.Called(ctx, lfsObj) diff --git a/_mocks/opencsg.com/csghub-server/builder/store/database/mock_XnetMigrationTaskStore.go b/_mocks/opencsg.com/csghub-server/builder/store/database/mock_XnetMigrationTaskStore.go new file mode 100644 index 000000000..43c2564fa --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/builder/store/database/mock_XnetMigrationTaskStore.go @@ -0,0 +1,313 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. + +package database + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + database "opencsg.com/csghub-server/builder/store/database" + + types "opencsg.com/csghub-server/common/types" +) + +// MockXnetMigrationTaskStore is an autogenerated mock type for the XnetMigrationTaskStore type +type MockXnetMigrationTaskStore struct { + mock.Mock +} + +type MockXnetMigrationTaskStore_Expecter struct { + mock *mock.Mock +} + +func (_m *MockXnetMigrationTaskStore) EXPECT() *MockXnetMigrationTaskStore_Expecter { + return &MockXnetMigrationTaskStore_Expecter{mock: &_m.Mock} +} + +// CreateXnetMigrationTask provides a mock function with given fields: ctx, repoID, lastMessage +func (_m *MockXnetMigrationTaskStore) CreateXnetMigrationTask(ctx context.Context, repoID int64, lastMessage string) error { + ret := _m.Called(ctx, repoID, lastMessage) + + if len(ret) == 0 { + panic("no return value specified for CreateXnetMigrationTask") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string) error); ok { + r0 = rf(ctx, repoID, lastMessage) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateXnetMigrationTask' +type MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call struct { + *mock.Call +} + +// CreateXnetMigrationTask is a helper method to define mock.On call +// - ctx context.Context +// - repoID int64 +// - lastMessage string +func (_e *MockXnetMigrationTaskStore_Expecter) CreateXnetMigrationTask(ctx interface{}, repoID interface{}, lastMessage interface{}) *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call { + return &MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call{Call: _e.mock.On("CreateXnetMigrationTask", ctx, repoID, lastMessage)} +} + +func (_c *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call) Run(run func(ctx context.Context, repoID int64, lastMessage string)) *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(string)) + }) + return _c +} + +func (_c *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call) Return(_a0 error) *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call) RunAndReturn(run func(context.Context, int64, string) error) *MockXnetMigrationTaskStore_CreateXnetMigrationTask_Call { + _c.Call.Return(run) + return _c +} + +// GetXnetMigrationTaskByID provides a mock function with given fields: ctx, id +func (_m *MockXnetMigrationTaskStore) GetXnetMigrationTaskByID(ctx context.Context, id int64) (*database.XnetMigrationTask, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for GetXnetMigrationTaskByID") + } + + var r0 *database.XnetMigrationTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*database.XnetMigrationTask, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *database.XnetMigrationTask); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*database.XnetMigrationTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetXnetMigrationTaskByID' +type MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call struct { + *mock.Call +} + +// GetXnetMigrationTaskByID is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *MockXnetMigrationTaskStore_Expecter) GetXnetMigrationTaskByID(ctx interface{}, id interface{}) *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call { + return &MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call{Call: _e.mock.On("GetXnetMigrationTaskByID", ctx, id)} +} + +func (_c *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call) Run(run func(ctx context.Context, id int64)) *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call) Return(_a0 *database.XnetMigrationTask, _a1 error) *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call) RunAndReturn(run func(context.Context, int64) (*database.XnetMigrationTask, error)) *MockXnetMigrationTaskStore_GetXnetMigrationTaskByID_Call { + _c.Call.Return(run) + return _c +} + +// ListXnetMigrationTasksByRepoID provides a mock function with given fields: ctx, repoID +func (_m *MockXnetMigrationTaskStore) ListXnetMigrationTasksByRepoID(ctx context.Context, repoID int64) ([]*database.XnetMigrationTask, error) { + ret := _m.Called(ctx, repoID) + + if len(ret) == 0 { + panic("no return value specified for ListXnetMigrationTasksByRepoID") + } + + var r0 []*database.XnetMigrationTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*database.XnetMigrationTask, error)); ok { + return rf(ctx, repoID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) []*database.XnetMigrationTask); ok { + r0 = rf(ctx, repoID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*database.XnetMigrationTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, repoID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListXnetMigrationTasksByRepoID' +type MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call struct { + *mock.Call +} + +// ListXnetMigrationTasksByRepoID is a helper method to define mock.On call +// - ctx context.Context +// - repoID int64 +func (_e *MockXnetMigrationTaskStore_Expecter) ListXnetMigrationTasksByRepoID(ctx interface{}, repoID interface{}) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call { + return &MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call{Call: _e.mock.On("ListXnetMigrationTasksByRepoID", ctx, repoID)} +} + +func (_c *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call) Run(run func(ctx context.Context, repoID int64)) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call) Return(_a0 []*database.XnetMigrationTask, _a1 error) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call) RunAndReturn(run func(context.Context, int64) ([]*database.XnetMigrationTask, error)) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByRepoID_Call { + _c.Call.Return(run) + return _c +} + +// ListXnetMigrationTasksByStatus provides a mock function with given fields: ctx, status +func (_m *MockXnetMigrationTaskStore) ListXnetMigrationTasksByStatus(ctx context.Context, status types.XnetMigrationTaskStatus) ([]*database.XnetMigrationTask, error) { + ret := _m.Called(ctx, status) + + if len(ret) == 0 { + panic("no return value specified for ListXnetMigrationTasksByStatus") + } + + var r0 []*database.XnetMigrationTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.XnetMigrationTaskStatus) ([]*database.XnetMigrationTask, error)); ok { + return rf(ctx, status) + } + if rf, ok := ret.Get(0).(func(context.Context, types.XnetMigrationTaskStatus) []*database.XnetMigrationTask); ok { + r0 = rf(ctx, status) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*database.XnetMigrationTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, types.XnetMigrationTaskStatus) error); ok { + r1 = rf(ctx, status) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListXnetMigrationTasksByStatus' +type MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call struct { + *mock.Call +} + +// ListXnetMigrationTasksByStatus is a helper method to define mock.On call +// - ctx context.Context +// - status types.XnetMigrationTaskStatus +func (_e *MockXnetMigrationTaskStore_Expecter) ListXnetMigrationTasksByStatus(ctx interface{}, status interface{}) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call { + return &MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call{Call: _e.mock.On("ListXnetMigrationTasksByStatus", ctx, status)} +} + +func (_c *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call) Run(run func(ctx context.Context, status types.XnetMigrationTaskStatus)) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(types.XnetMigrationTaskStatus)) + }) + return _c +} + +func (_c *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call) Return(_a0 []*database.XnetMigrationTask, _a1 error) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call) RunAndReturn(run func(context.Context, types.XnetMigrationTaskStatus) ([]*database.XnetMigrationTask, error)) *MockXnetMigrationTaskStore_ListXnetMigrationTasksByStatus_Call { + _c.Call.Return(run) + return _c +} + +// UpdateXnetMigrationTask provides a mock function with given fields: ctx, id, lastMessage, status +func (_m *MockXnetMigrationTaskStore) UpdateXnetMigrationTask(ctx context.Context, id int64, lastMessage string, status types.XnetMigrationTaskStatus) error { + ret := _m.Called(ctx, id, lastMessage, status) + + if len(ret) == 0 { + panic("no return value specified for UpdateXnetMigrationTask") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string, types.XnetMigrationTaskStatus) error); ok { + r0 = rf(ctx, id, lastMessage, status) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateXnetMigrationTask' +type MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call struct { + *mock.Call +} + +// UpdateXnetMigrationTask is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +// - lastMessage string +// - status types.XnetMigrationTaskStatus +func (_e *MockXnetMigrationTaskStore_Expecter) UpdateXnetMigrationTask(ctx interface{}, id interface{}, lastMessage interface{}, status interface{}) *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call { + return &MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call{Call: _e.mock.On("UpdateXnetMigrationTask", ctx, id, lastMessage, status)} +} + +func (_c *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call) Run(run func(ctx context.Context, id int64, lastMessage string, status types.XnetMigrationTaskStatus)) *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(string), args[3].(types.XnetMigrationTaskStatus)) + }) + return _c +} + +func (_c *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call) Return(_a0 error) *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call) RunAndReturn(run func(context.Context, int64, string, types.XnetMigrationTaskStatus) error) *MockXnetMigrationTaskStore_UpdateXnetMigrationTask_Call { + _c.Call.Return(run) + return _c +} + +// NewMockXnetMigrationTaskStore creates a new instance of MockXnetMigrationTaskStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockXnetMigrationTaskStore(t interface { + mock.TestingT + Cleanup(func()) +}) *MockXnetMigrationTaskStore { + mock := &MockXnetMigrationTaskStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/_mocks/opencsg.com/csghub-server/component/mock_LfsComponent.go b/_mocks/opencsg.com/csghub-server/component/mock_LfsComponent.go new file mode 100644 index 000000000..5c5b04e7b --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/component/mock_LfsComponent.go @@ -0,0 +1,174 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. + +package component + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockLfsComponent is an autogenerated mock type for the LfsComponent type +type MockLfsComponent struct { + mock.Mock +} + +type MockLfsComponent_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLfsComponent) EXPECT() *MockLfsComponent_Expecter { + return &MockLfsComponent_Expecter{mock: &_m.Mock} +} + +// DispatchLfsXnetProgress provides a mock function with no fields +func (_m *MockLfsComponent) DispatchLfsXnetProgress() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DispatchLfsXnetProgress") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLfsComponent_DispatchLfsXnetProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DispatchLfsXnetProgress' +type MockLfsComponent_DispatchLfsXnetProgress_Call struct { + *mock.Call +} + +// DispatchLfsXnetProgress is a helper method to define mock.On call +func (_e *MockLfsComponent_Expecter) DispatchLfsXnetProgress() *MockLfsComponent_DispatchLfsXnetProgress_Call { + return &MockLfsComponent_DispatchLfsXnetProgress_Call{Call: _e.mock.On("DispatchLfsXnetProgress")} +} + +func (_c *MockLfsComponent_DispatchLfsXnetProgress_Call) Run(run func()) *MockLfsComponent_DispatchLfsXnetProgress_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetProgress_Call) Return(_a0 error) *MockLfsComponent_DispatchLfsXnetProgress_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetProgress_Call) RunAndReturn(run func() error) *MockLfsComponent_DispatchLfsXnetProgress_Call { + _c.Call.Return(run) + return _c +} + +// DispatchLfsXnetResult provides a mock function with no fields +func (_m *MockLfsComponent) DispatchLfsXnetResult() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DispatchLfsXnetResult") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLfsComponent_DispatchLfsXnetResult_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DispatchLfsXnetResult' +type MockLfsComponent_DispatchLfsXnetResult_Call struct { + *mock.Call +} + +// DispatchLfsXnetResult is a helper method to define mock.On call +func (_e *MockLfsComponent_Expecter) DispatchLfsXnetResult() *MockLfsComponent_DispatchLfsXnetResult_Call { + return &MockLfsComponent_DispatchLfsXnetResult_Call{Call: _e.mock.On("DispatchLfsXnetResult")} +} + +func (_c *MockLfsComponent_DispatchLfsXnetResult_Call) Run(run func()) *MockLfsComponent_DispatchLfsXnetResult_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetResult_Call) Return(_a0 error) *MockLfsComponent_DispatchLfsXnetResult_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetResult_Call) RunAndReturn(run func() error) *MockLfsComponent_DispatchLfsXnetResult_Call { + _c.Call.Return(run) + return _c +} + +// PublishLfsMigrationMessage provides a mock function with given fields: ctx, repoID, oid +func (_m *MockLfsComponent) PublishLfsMigrationMessage(ctx context.Context, repoID int64, oid string) error { + ret := _m.Called(ctx, repoID, oid) + + if len(ret) == 0 { + panic("no return value specified for PublishLfsMigrationMessage") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string) error); ok { + r0 = rf(ctx, repoID, oid) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLfsComponent_PublishLfsMigrationMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PublishLfsMigrationMessage' +type MockLfsComponent_PublishLfsMigrationMessage_Call struct { + *mock.Call +} + +// PublishLfsMigrationMessage is a helper method to define mock.On call +// - ctx context.Context +// - repoID int64 +// - oid string +func (_e *MockLfsComponent_Expecter) PublishLfsMigrationMessage(ctx interface{}, repoID interface{}, oid interface{}) *MockLfsComponent_PublishLfsMigrationMessage_Call { + return &MockLfsComponent_PublishLfsMigrationMessage_Call{Call: _e.mock.On("PublishLfsMigrationMessage", ctx, repoID, oid)} +} + +func (_c *MockLfsComponent_PublishLfsMigrationMessage_Call) Run(run func(ctx context.Context, repoID int64, oid string)) *MockLfsComponent_PublishLfsMigrationMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(string)) + }) + return _c +} + +func (_c *MockLfsComponent_PublishLfsMigrationMessage_Call) Return(_a0 error) *MockLfsComponent_PublishLfsMigrationMessage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLfsComponent_PublishLfsMigrationMessage_Call) RunAndReturn(run func(context.Context, int64, string) error) *MockLfsComponent_PublishLfsMigrationMessage_Call { + _c.Call.Return(run) + return _c +} + +// NewMockLfsComponent creates a new instance of MockLfsComponent. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockLfsComponent(t interface { + mock.TestingT + Cleanup(func()) +}) *MockLfsComponent { + mock := &MockLfsComponent{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/_mocks/opencsg.com/csghub-server/component/mock_RepoComponent.go b/_mocks/opencsg.com/csghub-server/component/mock_RepoComponent.go index 8eac084ff..66dd6e99c 100644 --- a/_mocks/opencsg.com/csghub-server/component/mock_RepoComponent.go +++ b/_mocks/opencsg.com/csghub-server/component/mock_RepoComponent.go @@ -3465,6 +3465,114 @@ func (_c *MockRepoComponent_LogsTree_Call) RunAndReturn(run func(context.Context return _c } +// MigrateToXnet provides a mock function with given fields: ctx, repoType, namespace, name +func (_m *MockRepoComponent) MigrateToXnet(ctx context.Context, repoType types.RepositoryType, namespace string, name string) error { + ret := _m.Called(ctx, repoType, namespace, name) + + if len(ret) == 0 { + panic("no return value specified for MigrateToXnet") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, types.RepositoryType, string, string) error); ok { + r0 = rf(ctx, repoType, namespace, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRepoComponent_MigrateToXnet_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MigrateToXnet' +type MockRepoComponent_MigrateToXnet_Call struct { + *mock.Call +} + +// MigrateToXnet is a helper method to define mock.On call +// - ctx context.Context +// - repoType types.RepositoryType +// - namespace string +// - name string +func (_e *MockRepoComponent_Expecter) MigrateToXnet(ctx interface{}, repoType interface{}, namespace interface{}, name interface{}) *MockRepoComponent_MigrateToXnet_Call { + return &MockRepoComponent_MigrateToXnet_Call{Call: _e.mock.On("MigrateToXnet", ctx, repoType, namespace, name)} +} + +func (_c *MockRepoComponent_MigrateToXnet_Call) Run(run func(ctx context.Context, repoType types.RepositoryType, namespace string, name string)) *MockRepoComponent_MigrateToXnet_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(types.RepositoryType), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *MockRepoComponent_MigrateToXnet_Call) Return(_a0 error) *MockRepoComponent_MigrateToXnet_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRepoComponent_MigrateToXnet_Call) RunAndReturn(run func(context.Context, types.RepositoryType, string, string) error) *MockRepoComponent_MigrateToXnet_Call { + _c.Call.Return(run) + return _c +} + +// MigrateToXnetProgress provides a mock function with given fields: ctx, repoType, namespace, name +func (_m *MockRepoComponent) MigrateToXnetProgress(ctx context.Context, repoType types.RepositoryType, namespace string, name string) (types.XnetMigrationTaskProgress, error) { + ret := _m.Called(ctx, repoType, namespace, name) + + if len(ret) == 0 { + panic("no return value specified for MigrateToXnetProgress") + } + + var r0 types.XnetMigrationTaskProgress + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.RepositoryType, string, string) (types.XnetMigrationTaskProgress, error)); ok { + return rf(ctx, repoType, namespace, name) + } + if rf, ok := ret.Get(0).(func(context.Context, types.RepositoryType, string, string) types.XnetMigrationTaskProgress); ok { + r0 = rf(ctx, repoType, namespace, name) + } else { + r0 = ret.Get(0).(types.XnetMigrationTaskProgress) + } + + if rf, ok := ret.Get(1).(func(context.Context, types.RepositoryType, string, string) error); ok { + r1 = rf(ctx, repoType, namespace, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRepoComponent_MigrateToXnetProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MigrateToXnetProgress' +type MockRepoComponent_MigrateToXnetProgress_Call struct { + *mock.Call +} + +// MigrateToXnetProgress is a helper method to define mock.On call +// - ctx context.Context +// - repoType types.RepositoryType +// - namespace string +// - name string +func (_e *MockRepoComponent_Expecter) MigrateToXnetProgress(ctx interface{}, repoType interface{}, namespace interface{}, name interface{}) *MockRepoComponent_MigrateToXnetProgress_Call { + return &MockRepoComponent_MigrateToXnetProgress_Call{Call: _e.mock.On("MigrateToXnetProgress", ctx, repoType, namespace, name)} +} + +func (_c *MockRepoComponent_MigrateToXnetProgress_Call) Run(run func(ctx context.Context, repoType types.RepositoryType, namespace string, name string)) *MockRepoComponent_MigrateToXnetProgress_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(types.RepositoryType), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *MockRepoComponent_MigrateToXnetProgress_Call) Return(_a0 types.XnetMigrationTaskProgress, _a1 error) *MockRepoComponent_MigrateToXnetProgress_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRepoComponent_MigrateToXnetProgress_Call) RunAndReturn(run func(context.Context, types.RepositoryType, string, string) (types.XnetMigrationTaskProgress, error)) *MockRepoComponent_MigrateToXnetProgress_Call { + _c.Call.Return(run) + return _c +} + // MirrorFromSaas provides a mock function with given fields: ctx, namespace, name, currentUser, repoType func (_m *MockRepoComponent) MirrorFromSaas(ctx context.Context, namespace string, name string, currentUser string, repoType types.RepositoryType) error { ret := _m.Called(ctx, namespace, name, currentUser, repoType) diff --git a/builder/mq/types.go b/builder/mq/types.go index 8e71094f2..bae4ca4b5 100644 --- a/builder/mq/types.go +++ b/builder/mq/types.go @@ -24,6 +24,14 @@ const ( HighPriorityMsgSubject string = "notification.message.high" NormalPriorityMsgSubject string = "notification.message.normal" + + AgentSessionHistoryMsgSubject string = "agent.session.history.message" + + LfsXnetProcessedSubject string = "xnet.lfs.processed" + + LfsMigrationSubject string = "xnet.lfs.migrate" + LfsProgressSubject string = "xnet.lfs.progress" + LfsResultSubject string = "xnet.lfs.result" ) type MQGroup struct { @@ -76,6 +84,22 @@ var ( StreamName: "webhookEventStream", // webhook event stream name ConsumerName: "webhookEventConsumer", } + AgentSessionHistoryMsgGroup = MQGroup{ + StreamName: "agentSessionHistoryMsgStream", + ConsumerName: "agentSessionHistoryMsgConsumer", + } + LfsXnetResultGroup = MQGroup{ + StreamName: "lfsResultStreamClient", + ConsumerName: "lfsResultConsumerClient", + } + LfsXnetProgressGroup = MQGroup{ + StreamName: "lfsProgressStreamClient", + ConsumerName: "lfsProgressConsumerClient", + } + LfsXnetMigrateGroup = MQGroup{ + StreamName: "lfsMigrationStream", + ConsumerName: "lfsMigrationConsumer", + } ) type MessageMeta struct { diff --git a/builder/store/cache/lfs.go b/builder/store/cache/lfs.go new file mode 100644 index 000000000..724e19553 --- /dev/null +++ b/builder/store/cache/lfs.go @@ -0,0 +1,60 @@ +package cache + +import ( + "context" + "fmt" + "strconv" + + "opencsg.com/csghub-server/common/config" +) + +type LfsCacheImpl struct { + cache RedisClient +} + +type LfsCache interface { + CacheLfsProgress(ctx context.Context, repoID int64, oid string, progress int) error + GetLfsProgress(ctx context.Context, repoID int64, oid string) (int, error) + DeleteLfsProgress(ctx context.Context, repoID int64, oid string) error +} + +func NewLfsCache(config *config.Config) (LfsCache, error) { + cacheClient, err := NewCache(context.Background(), RedisConfig{ + Addr: config.Redis.Endpoint, + Username: config.Redis.User, + Password: config.Redis.Password, + }) + if err != nil { + return nil, err + } + return &LfsCacheImpl{ + cache: cacheClient, + }, nil +} + +func (l *LfsCacheImpl) CacheLfsProgress(ctx context.Context, repoID int64, oid string, progress int) error { + key := lfsProgressCacheKey(repoID, oid) + return l.cache.Set(ctx, key, strconv.Itoa(progress)) +} +func (l *LfsCacheImpl) GetLfsProgress(ctx context.Context, repoID int64, oid string) (int, error) { + key := lfsProgressCacheKey(repoID, oid) + val, err := l.cache.Get(ctx, key) + if err != nil { + return 0, err + } + var progress int + _, err = fmt.Sscanf(val, "%d", &progress) + if err != nil { + return 0, err + } + return progress, nil +} + +func (l *LfsCacheImpl) DeleteLfsProgress(ctx context.Context, repoID int64, oid string) error { + key := lfsProgressCacheKey(repoID, oid) + return l.cache.Del(ctx, key) +} + +func lfsProgressCacheKey(repoID int64, oid string) string { + return fmt.Sprintf("xnet:migration:lfs:progress:%d:%s", repoID, oid) +} diff --git a/builder/store/database/lfs_meta_object.go b/builder/store/database/lfs_meta_object.go index 767cdf3dd..b281db188 100644 --- a/builder/store/database/lfs_meta_object.go +++ b/builder/store/database/lfs_meta_object.go @@ -19,6 +19,8 @@ type LfsMetaObjectStore interface { RemoveByOid(ctx context.Context, oid string, repoID int64) error UpdateOrCreate(ctx context.Context, input LfsMetaObject) (*LfsMetaObject, error) BulkUpdateOrCreate(ctx context.Context, repoID int64, input []LfsMetaObject) error + UpdateXnetUsed(ctx context.Context, repoID int64, oid string, xnetUsed bool) error + CheckIfAllMigratedToXnet(ctx context.Context, repoID int64) (bool, error) } func NewLfsMetaObjectStore() LfsMetaObjectStore { @@ -127,3 +129,24 @@ func (s *lfsMetaObjectStoreImpl) BulkUpdateOrCreate(ctx context.Context, repoID return err }) } + +func (s *lfsMetaObjectStoreImpl) UpdateXnetUsed(ctx context.Context, repoID int64, oid string, xnetUsed bool) error { + _, err := s.db.Operator.Core.NewUpdate(). + Model(&LfsMetaObject{}). + Set("xnet_used = ?", xnetUsed). + Set("updated_at = ?", time.Now()). + Where("repository_id = ? AND oid = ?", repoID, oid). + Exec(ctx) + return err +} + +func (s *lfsMetaObjectStoreImpl) CheckIfAllMigratedToXnet(ctx context.Context, repoID int64) (bool, error) { + count, err := s.db.Operator.Core.NewSelect(). + Model((*LfsMetaObject)(nil)). + Where("repository_id = ? AND xnet_used = false", repoID). + Count(ctx) + if err != nil { + return false, err + } + return count == 0, nil +} diff --git a/builder/store/database/migrations/20260105122809_create_xnet_migration_task.go b/builder/store/database/migrations/20260105122809_create_xnet_migration_task.go new file mode 100644 index 000000000..17e5ec1a7 --- /dev/null +++ b/builder/store/database/migrations/20260105122809_create_xnet_migration_task.go @@ -0,0 +1,24 @@ +package migrations + +import ( + "context" + + "github.com/uptrace/bun" +) + +type XnetMigrationTask struct { + ID int64 `bun:"id,pk,autoincrement"` + RepositoryID int64 `bun:"repository_id,notnull"` + LastMessage string `bun:"last_message"` + Status string `bun:"status,notnull"` + + times +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createTables(ctx, db, XnetMigrationTask{}) + }, func(ctx context.Context, db *bun.DB) error { + return dropTables(ctx, db, XnetMigrationTask{}) + }) +} diff --git a/builder/store/database/xnet_migration_task.go b/builder/store/database/xnet_migration_task.go new file mode 100644 index 000000000..b5804baec --- /dev/null +++ b/builder/store/database/xnet_migration_task.go @@ -0,0 +1,73 @@ +package database + +import ( + "context" + + "opencsg.com/csghub-server/common/types" +) + +type XnetMigrationTaskStore interface { + CreateXnetMigrationTask(ctx context.Context, repoID int64, lastMessage string) error + UpdateXnetMigrationTask(ctx context.Context, id int64, lastMessage string, status types.XnetMigrationTaskStatus) error + GetXnetMigrationTaskByID(ctx context.Context, id int64) (*XnetMigrationTask, error) + ListXnetMigrationTasksByStatus(ctx context.Context, status types.XnetMigrationTaskStatus) ([]*XnetMigrationTask, error) + ListXnetMigrationTasksByRepoID(ctx context.Context, repoID int64) ([]*XnetMigrationTask, error) +} + +type XnetMigrationTaskStoreImpl struct { + db *DB +} + +func NewXnetMigrationTaskStore() XnetMigrationTaskStore { + return &XnetMigrationTaskStoreImpl{ + db: defaultDB, + } +} + +func NewXnetMigrationTaskStoreWithDB(db *DB) XnetMigrationTaskStore { + return &XnetMigrationTaskStoreImpl{ + db: db, + } +} + +type XnetMigrationTask struct { + ID int64 `bun:"id,pk,autoincrement"` + RepositoryID int64 `bun:"repository_id,notnull"` + LastMessage string `bun:"last_message"` + Status types.XnetMigrationTaskStatus `bun:"status,notnull"` + + times +} + +func (s *XnetMigrationTaskStoreImpl) CreateXnetMigrationTask(ctx context.Context, repoID int64, lastMessage string) error { + _, err := s.db.Operator.Core.NewInsert().Model(&XnetMigrationTask{ + RepositoryID: repoID, + LastMessage: lastMessage, + Status: types.XnetMigrationTaskStatusPending, + }).Exec(ctx) + return err +} +func (s *XnetMigrationTaskStoreImpl) UpdateXnetMigrationTask(ctx context.Context, id int64, lastMessage string, status types.XnetMigrationTaskStatus) error { + _, err := s.db.Operator.Core.NewUpdate().Model(&XnetMigrationTask{ + ID: id, + LastMessage: lastMessage, + Status: status, + }).WherePK().Exec(ctx) + return err +} +func (s *XnetMigrationTaskStoreImpl) GetXnetMigrationTaskByID(ctx context.Context, id int64) (*XnetMigrationTask, error) { + var task XnetMigrationTask + err := s.db.Operator.Core.NewSelect().Model(&task).Where("id = ?", id).Scan(ctx) + return &task, err +} +func (s *XnetMigrationTaskStoreImpl) ListXnetMigrationTasksByStatus(ctx context.Context, status types.XnetMigrationTaskStatus) ([]*XnetMigrationTask, error) { + var tasks []*XnetMigrationTask + err := s.db.Operator.Core.NewSelect().Model(&tasks).Where("status = ?", status).Scan(ctx) + return tasks, err +} + +func (s *XnetMigrationTaskStoreImpl) ListXnetMigrationTasksByRepoID(ctx context.Context, repoID int64) ([]*XnetMigrationTask, error) { + var tasks []*XnetMigrationTask + err := s.db.Operator.Core.NewSelect().Model(&tasks).Where("repository_id = ?", repoID).Scan(ctx) + return tasks, err +} diff --git a/common/types/lfs.go b/common/types/lfs.go index d538c5e81..2455c00fe 100644 --- a/common/types/lfs.go +++ b/common/types/lfs.go @@ -123,3 +123,16 @@ type XnetReq struct { ObjectKey string `json:"object_key"` Size int64 `json:"size"` } + +type XnetMigrationTaskProgress struct { + LfsProgress []LfsProgress `json:"lfs_progress"` + Status XnetMigrationTaskStatus `json:"status"` + LastMessage string `json:"last_message"` + TotalObjects int `json:"total_objects"` + MigratedObjects int `json:"migrated_objects"` +} + +type LfsProgress struct { + Oid string `json:"oid"` + Progress int `json:"progress"` +} diff --git a/common/types/xnet.go b/common/types/xnet.go index adff12031..ba214b0e8 100644 --- a/common/types/xnet.go +++ b/common/types/xnet.go @@ -30,3 +30,12 @@ type XnetDownloadURLResp struct { type XetFileExistsResp struct { Exists bool `json:"exists"` } + +type XnetMigrationTaskStatus string + +const ( + XnetMigrationTaskStatusPending XnetMigrationTaskStatus = "pending" + XnetMigrationTaskStatusRunning XnetMigrationTaskStatus = "running" + XnetMigrationTaskStatusCompleted XnetMigrationTaskStatus = "completed" + XnetMigrationTaskStatusFailed XnetMigrationTaskStatus = "failed" +) diff --git a/component/lfs.go b/component/lfs.go new file mode 100644 index 000000000..e4129257a --- /dev/null +++ b/component/lfs.go @@ -0,0 +1,9 @@ +package component + +import "context" + +type LfsComponent interface { + DispatchLfsXnetProgress() error + DispatchLfsXnetResult() error + PublishLfsMigrationMessage(ctx context.Context, repoID int64, oid string) error +} diff --git a/component/repo.go b/component/repo.go index 67dc2d70e..8e63673e6 100644 --- a/component/repo.go +++ b/component/repo.go @@ -37,6 +37,7 @@ import ( "opencsg.com/csghub-server/builder/git/mirrorserver" "opencsg.com/csghub-server/builder/multisync" "opencsg.com/csghub-server/builder/rpc" + storeCache "opencsg.com/csghub-server/builder/store/cache" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/builder/store/s3" "opencsg.com/csghub-server/common/config" @@ -98,6 +99,9 @@ type repoComponentImpl struct { notificationSvcClient rpc.NotificationSvcClient mirrorSvcClient rpc.MirrorSvcClient xnetClient rpc.XnetSvcClient + lfsComponent LfsComponent + lfsCache storeCache.LfsCache + xnetMigrationTaskStore database.XnetMigrationTaskStore } type RepoComponent interface { @@ -192,6 +196,8 @@ type RepoComponent interface { CheckDeployPermissionForUser(ctx context.Context, deployReq types.DeployActReq) (*database.User, *database.Deploy, error) GetRepos(ctx context.Context, search, currentUser string, repoType types.RepositoryType) ([]string, error) IsXnetEnabled(ctx context.Context, repoType types.RepositoryType, namespace, name, username string) (*types.XetEnabled, error) + MigrateToXnet(ctx context.Context, repoType types.RepositoryType, namespace, name string) error + MigrateToXnetProgress(ctx context.Context, repoType types.RepositoryType, namespace, name string) (types.XnetMigrationTaskProgress, error) } func NewRepoComponentImpl(config *config.Config) (*repoComponentImpl, error) { diff --git a/component/repo_ce.go b/component/repo_ce.go index 1f51077fe..673f674a7 100644 --- a/component/repo_ce.go +++ b/component/repo_ce.go @@ -245,3 +245,12 @@ func (c *repoComponentImpl) mirrorFromSaasSync(ctx context.Context, mirror *data return nil } + +func (c *repoComponentImpl) MigrateToXnet(ctx context.Context, repoType types.RepositoryType, namespace, name string) error { + return nil +} + +func (c *repoComponentImpl) MigrateToXnetProgress(ctx context.Context, repoType types.RepositoryType, namespace, name string) (types.XnetMigrationTaskProgress, error) { + var p types.XnetMigrationTaskProgress + return p, nil +}