diff --git a/_mocks/opencsg.com/csghub-server/component/mock_LfsComponent.go b/_mocks/opencsg.com/csghub-server/component/mock_LfsComponent.go new file mode 100644 index 00000000..3a8468e7 --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/component/mock_LfsComponent.go @@ -0,0 +1,175 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. + +package component + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + database "opencsg.com/csghub-server/builder/store/database" +) + +// MockLfsComponent is an autogenerated mock type for the LfsComponent type +type MockLfsComponent struct { + mock.Mock +} + +type MockLfsComponent_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLfsComponent) EXPECT() *MockLfsComponent_Expecter { + return &MockLfsComponent_Expecter{mock: &_m.Mock} +} + +// DispatchLfsXnetProgress provides a mock function with no fields +func (_m *MockLfsComponent) DispatchLfsXnetProgress() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DispatchLfsXnetProgress") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLfsComponent_DispatchLfsXnetProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DispatchLfsXnetProgress' +type MockLfsComponent_DispatchLfsXnetProgress_Call struct { + *mock.Call +} + +// DispatchLfsXnetProgress is a helper method to define mock.On call +func (_e *MockLfsComponent_Expecter) DispatchLfsXnetProgress() *MockLfsComponent_DispatchLfsXnetProgress_Call { + return &MockLfsComponent_DispatchLfsXnetProgress_Call{Call: _e.mock.On("DispatchLfsXnetProgress")} +} + +func (_c *MockLfsComponent_DispatchLfsXnetProgress_Call) Run(run func()) *MockLfsComponent_DispatchLfsXnetProgress_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetProgress_Call) Return(_a0 error) *MockLfsComponent_DispatchLfsXnetProgress_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetProgress_Call) RunAndReturn(run func() error) *MockLfsComponent_DispatchLfsXnetProgress_Call { + _c.Call.Return(run) + return _c +} + +// DispatchLfsXnetResult provides a mock function with no fields +func (_m *MockLfsComponent) DispatchLfsXnetResult() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DispatchLfsXnetResult") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLfsComponent_DispatchLfsXnetResult_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DispatchLfsXnetResult' +type MockLfsComponent_DispatchLfsXnetResult_Call struct { + *mock.Call +} + +// DispatchLfsXnetResult is a helper method to define mock.On call +func (_e *MockLfsComponent_Expecter) DispatchLfsXnetResult() *MockLfsComponent_DispatchLfsXnetResult_Call { + return &MockLfsComponent_DispatchLfsXnetResult_Call{Call: _e.mock.On("DispatchLfsXnetResult")} +} + +func (_c *MockLfsComponent_DispatchLfsXnetResult_Call) Run(run func()) *MockLfsComponent_DispatchLfsXnetResult_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetResult_Call) Return(_a0 error) *MockLfsComponent_DispatchLfsXnetResult_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLfsComponent_DispatchLfsXnetResult_Call) RunAndReturn(run func() error) *MockLfsComponent_DispatchLfsXnetResult_Call { + _c.Call.Return(run) + return _c +} + +// PublishLfsMigrationMessage provides a mock function with given fields: ctx, repo, oid +func (_m *MockLfsComponent) PublishLfsMigrationMessage(ctx context.Context, repo *database.Repository, oid string) error { + ret := _m.Called(ctx, repo, oid) + + if len(ret) == 0 { + panic("no return value specified for PublishLfsMigrationMessage") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *database.Repository, string) error); ok { + r0 = rf(ctx, repo, oid) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLfsComponent_PublishLfsMigrationMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PublishLfsMigrationMessage' +type MockLfsComponent_PublishLfsMigrationMessage_Call struct { + *mock.Call +} + +// PublishLfsMigrationMessage is a helper method to define mock.On call +// - ctx context.Context +// - repo *database.Repository +// - oid string +func (_e *MockLfsComponent_Expecter) PublishLfsMigrationMessage(ctx interface{}, repo interface{}, oid interface{}) *MockLfsComponent_PublishLfsMigrationMessage_Call { + return &MockLfsComponent_PublishLfsMigrationMessage_Call{Call: _e.mock.On("PublishLfsMigrationMessage", ctx, repo, oid)} +} + +func (_c *MockLfsComponent_PublishLfsMigrationMessage_Call) Run(run func(ctx context.Context, repo *database.Repository, oid string)) *MockLfsComponent_PublishLfsMigrationMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*database.Repository), args[2].(string)) + }) + return _c +} + +func (_c *MockLfsComponent_PublishLfsMigrationMessage_Call) Return(_a0 error) *MockLfsComponent_PublishLfsMigrationMessage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLfsComponent_PublishLfsMigrationMessage_Call) RunAndReturn(run func(context.Context, *database.Repository, string) error) *MockLfsComponent_PublishLfsMigrationMessage_Call { + _c.Call.Return(run) + return _c +} + +// NewMockLfsComponent creates a new instance of MockLfsComponent. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockLfsComponent(t interface { + mock.TestingT + Cleanup(func()) +}) *MockLfsComponent { + mock := &MockLfsComponent{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/builder/store/database/lfs_meta_object_test.go b/builder/store/database/lfs_meta_object_test.go index 82291395..a5954d76 100644 --- a/builder/store/database/lfs_meta_object_test.go +++ b/builder/store/database/lfs_meta_object_test.go @@ -84,3 +84,101 @@ func TestLfsMetaStore_CRUD(t *testing.T) { require.NotNil(t, err) } + +func TestLfsMetaStore_UpdateXnetUsed(t *testing.T) { + db := tests.InitTestDB() + defer db.Close() + ctx := context.TODO() + + store := database.NewLfsMetaObjectStoreWithDB(db) + + // Create a test LFS object + _, err := store.Create(ctx, database.LfsMetaObject{ + RepositoryID: 123, + Oid: "test-oid-123", + Size: 1024, + XnetUsed: false, + }) + require.Nil(t, err) + + // Verify initial state + obj, err := store.FindByOID(ctx, 123, "test-oid-123") + require.Nil(t, err) + require.Equal(t, "test-oid-123", obj.Oid) + require.Equal(t, false, obj.XnetUsed) + + // Update XnetUsed to true + err = store.UpdateXnetUsed(ctx, 123, "test-oid-123", true) + require.Nil(t, err) + + // Verify update + obj, err = store.FindByOID(ctx, 123, "test-oid-123") + require.Nil(t, err) + require.Equal(t, true, obj.XnetUsed) + + // Update XnetUsed back to false + err = store.UpdateXnetUsed(ctx, 123, "test-oid-123", false) + require.Nil(t, err) + + // Verify update + obj, err = store.FindByOID(ctx, 123, "test-oid-123") + require.Nil(t, err) + require.Equal(t, false, obj.XnetUsed) + + // Test updating non-existent object (should not error but affect 0 rows) + err = store.UpdateXnetUsed(ctx, 999, "non-existent-oid", true) + require.Nil(t, err) +} + +func TestLfsMetaStore_CheckIfAllMigratedToXnet(t *testing.T) { + db := tests.InitTestDB() + defer db.Close() + ctx := context.TODO() + + store := database.NewLfsMetaObjectStoreWithDB(db) + + // Test with no objects + allMigrated, err := store.CheckIfAllMigratedToXnet(ctx, 123) + require.Nil(t, err) + require.True(t, allMigrated, "No objects should return true") + + // Create test objects + testCases := []database.LfsMetaObject{ + {RepositoryID: 123, Oid: "oid-1", Size: 1024, XnetUsed: false}, + {RepositoryID: 123, Oid: "oid-2", Size: 2048, XnetUsed: false}, + {RepositoryID: 456, Oid: "oid-3", Size: 3072, XnetUsed: true}, // Different repo + } + + for _, tc := range testCases { + _, err := store.Create(ctx, tc) + require.Nil(t, err) + } + + // Test with some objects not migrated + allMigrated, err = store.CheckIfAllMigratedToXnet(ctx, 123) + require.Nil(t, err) + require.False(t, allMigrated, "Should return false when some objects are not migrated") + + // Test with all objects migrated in another repo + allMigrated, err = store.CheckIfAllMigratedToXnet(ctx, 456) + require.Nil(t, err) + require.True(t, allMigrated, "Should return true when all objects are migrated") + + // Update one object to migrated + err = store.UpdateXnetUsed(ctx, 123, "oid-1", true) + require.Nil(t, err) + + // Test with one migrated, one not migrated + allMigrated, err = store.CheckIfAllMigratedToXnet(ctx, 123) + require.Nil(t, err) + require.False(t, allMigrated, "Should return false when not all objects are migrated") + + // Update all objects to migrated + err = store.UpdateXnetUsed(ctx, 123, "oid-2", true) + require.Nil(t, err) + + // Test with all objects migrated + allMigrated, err = store.CheckIfAllMigratedToXnet(ctx, 123) + require.Nil(t, err) + require.True(t, allMigrated, "Should return true when all objects are migrated") +} diff --git a/builder/store/database/xnet_migration_task_test.go b/builder/store/database/xnet_migration_task_test.go new file mode 100644 index 00000000..60e00080 --- /dev/null +++ b/builder/store/database/xnet_migration_task_test.go @@ -0,0 +1,82 @@ +package database_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "opencsg.com/csghub-server/builder/store/database" + "opencsg.com/csghub-server/common/tests" + "opencsg.com/csghub-server/common/types" +) + +func TestXnetMigrationTaskStore_CRUD(t *testing.T) { + db := tests.InitTestDB() + defer db.Close() + ctx := context.TODO() + + store := database.NewXnetMigrationTaskStoreWithDB(db) + + // Test CreateXnetMigrationTask + err := store.CreateXnetMigrationTask(ctx, 123, "Initial task") + require.Nil(t, err) + + // Test ListXnetMigrationTasksByRepoID + tasks, err := store.ListXnetMigrationTasksByRepoID(ctx, 123) + require.Nil(t, err) + require.Len(t, tasks, 1) + require.Equal(t, int64(123), tasks[0].RepositoryID) + require.Equal(t, "Initial task", tasks[0].LastMessage) + require.Equal(t, types.XnetMigrationTaskStatusPending, tasks[0].Status) + + taskID := tasks[0].ID + + // Test GetXnetMigrationTaskByID + task, err := store.GetXnetMigrationTaskByID(ctx, taskID) + require.Nil(t, err) + require.Equal(t, taskID, task.ID) + require.Equal(t, int64(123), task.RepositoryID) + + // Test UpdateXnetMigrationTask + err = store.UpdateXnetMigrationTask(ctx, taskID, "Updated message", types.XnetMigrationTaskStatusRunning) + require.Nil(t, err) + + // Verify update + task, err = store.GetXnetMigrationTaskByID(ctx, taskID) + require.Nil(t, err) + require.Equal(t, "Updated message", task.LastMessage) + require.Equal(t, types.XnetMigrationTaskStatusRunning, task.Status) + + // Create more tasks with different statuses + err = store.CreateXnetMigrationTask(ctx, 456, "Task 2") + require.Nil(t, err) + tasks, err = store.ListXnetMigrationTasksByRepoID(ctx, 456) + require.Nil(t, err) + require.Len(t, tasks, 1) + err = store.UpdateXnetMigrationTask(ctx, tasks[0].ID, "Task 2 completed", types.XnetMigrationTaskStatusCompleted) + require.Nil(t, err) + + // Test ListXnetMigrationTasksByStatus + runningTasks, err := store.ListXnetMigrationTasksByStatus(ctx, types.XnetMigrationTaskStatusRunning) + require.Nil(t, err) + require.Len(t, runningTasks, 1) + require.Equal(t, "Updated message", runningTasks[0].LastMessage) + + completedTasks, err := store.ListXnetMigrationTasksByStatus(ctx, types.XnetMigrationTaskStatusCompleted) + require.Nil(t, err) + require.Len(t, completedTasks, 1) + require.Equal(t, "Task 2 completed", completedTasks[0].LastMessage) + + pendingTasks, err := store.ListXnetMigrationTasksByStatus(ctx, types.XnetMigrationTaskStatusPending) + require.Nil(t, err) + require.Len(t, pendingTasks, 0) + + // Test with non-existent repo ID + noTasks, err := store.ListXnetMigrationTasksByRepoID(ctx, 999) + require.Nil(t, err) + require.Len(t, noTasks, 0) + + // Test with non-existent task ID + _, err = store.GetXnetMigrationTaskByID(ctx, 999) + require.NotNil(t, err) +} diff --git a/component/lfs.go b/component/lfs.go new file mode 100644 index 00000000..6215450e --- /dev/null +++ b/component/lfs.go @@ -0,0 +1,13 @@ +package component + +import ( + "context" + + "opencsg.com/csghub-server/builder/store/database" +) + +type LfsComponent interface { + DispatchLfsXnetProgress() error + DispatchLfsXnetResult() error + PublishLfsMigrationMessage(ctx context.Context, repo *database.Repository, oid string) error +}