diff --git a/service/vc/state_import.go b/service/vc/state_import.go new file mode 100644 index 000000000..54733f703 --- /dev/null +++ b/service/vc/state_import.go @@ -0,0 +1,217 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package vc + +import ( + "context" + "encoding/binary" + "fmt" + + "github.com/cockroachdb/errors" + "github.com/hyperledger/fabric-x-common/api/committerpb" + "github.com/yugabyte/pgx/v5" +) + +// StateImporter provides bulk state import capabilities for initializing the +// committer's state database from an external source such as a Fabric peer snapshot. +// +// Unlike the normal transaction processing pipeline (preparer -> validator -> committer), +// imported state bypasses MVCC validation and endorsement verification since it originates +// from a trusted, pre-validated source. This makes it suitable for the bootstrap-from-snapshot +// startup mode where the committer initializes its state database from a migration data file. +// +// Usage: +// +// importer := NewStateImporter(db) +// _ = importer.InitSchema(ctx) +// _ = importer.CreateNamespace(ctx, "myns") +// count, _ := importer.ImportNamespaceState(ctx, "myns", iterator) +// _ = importer.SetBlockHeight(ctx, snapshotBlockNum) +// _ = importer.VerifyNamespaceRowCount(ctx, "myns", expectedCount) +type StateImporter struct { + db *database +} + +// NewStateImporter creates a StateImporter backed by the given database. +// This is intended for use within the vc package and tests. +func NewStateImporter(db *database) *StateImporter { + return &StateImporter{db: db} +} + +// NewStateImporterFromConfig creates a StateImporter by connecting to the database +// described by the given configuration. The caller is responsible for calling Close() +// when the importer is no longer needed. 
+func NewStateImporterFromConfig(ctx context.Context, config *DatabaseConfig) (*StateImporter, error) { + metrics := newVCServiceMetrics() + db, err := newDatabase(ctx, config, metrics) + if err != nil { + return nil, errors.Wrap(err, "failed to create database for state import") + } + return &StateImporter{db: db}, nil +} + +// Close releases the database connection pool held by the importer. +func (si *StateImporter) Close() { + si.db.close() +} + +// StateIterator provides a streaming interface for reading key-value-version +// triples during bulk state import. Implementations should return data in +// whatever order the source provides; the importer does not require sorted input. +// +// Callers must check Err() after Next() returns false to distinguish EOF from failure. +type StateIterator interface { + // Next advances the iterator to the next record. Returns false when exhausted or on error. + Next() bool + // Key returns the current record's key. Only valid after a successful Next() call. + Key() []byte + // Value returns the current record's value. Only valid after a successful Next() call. + Value() []byte + // Version returns the current record's MVCC version. Only valid after a successful Next() call. + Version() uint64 + // Err returns any error encountered during iteration. + Err() error +} + +// ImportSummary contains statistics collected during a bulk import operation. +// It can be used with VerifyImport to validate integrity after import completes. +type ImportSummary struct { + // NamespaceRowCounts maps each namespace ID to the number of rows imported. + NamespaceRowCounts map[string]int64 + // BlockNumber is the snapshot block height that was set during import. + BlockNumber uint64 + // TotalKeys is the sum of all rows across all namespaces. + TotalKeys int64 +} + +// InitSchema creates the system tables (metadata, tx_status) and system namespace +// tables (__meta, __config). This must be called before any other import operation. 
+func (si *StateImporter) InitSchema(ctx context.Context) error { + return si.db.setupSystemTablesAndNamespaces(ctx) +} + +// CreateNamespace creates the table and stored functions for a user-defined namespace. +// The namespace must not already exist. System namespaces (__meta, __config) are created +// by InitSchema and should not be passed here. +func (si *StateImporter) CreateNamespace(ctx context.Context, nsID string) error { + return createNsTables(nsID, si.db.tablePreSplitTablets, func(q string) error { + _, err := si.db.pool.Exec(ctx, q) + return err + }) +} + +// ImportNamespaceState bulk-inserts key-value-version state into a namespace table +// using PostgreSQL's COPY protocol for high throughput. +// +// The namespace table must already exist (created via CreateNamespace or InitSchema). +// Returns the number of rows imported. The iterator is consumed fully; callers should +// check iter.Err() after this method returns for any source-side errors. +func (si *StateImporter) ImportNamespaceState( + ctx context.Context, + nsID string, + iter StateIterator, +) (int64, error) { + tableName := pgx.Identifier{TableName(nsID)} + columns := []string{"key", "value", "version"} + + src := &copyFromIterator{iter: iter} + count, err := si.db.pool.CopyFrom(ctx, tableName, columns, src) + if err != nil { + return 0, errors.Wrapf(err, "COPY into namespace [%s] failed", nsID) + } + + if iterErr := iter.Err(); iterErr != nil { + return count, errors.Wrapf(iterErr, "iterator error after importing namespace [%s]", nsID) + } + + logger.Infof("Imported %d rows into namespace [%s]", count, nsID) + return count, nil +} + +// ImportPolicy inserts a namespace policy entry into the __meta system namespace. +// This is used during snapshot import to register the endorsement policies for +// each migrated namespace so that the coordinator can recover them on startup. 
+func (si *StateImporter) ImportPolicy(ctx context.Context, nsID string, policy []byte) error { + q := FmtNsID(insertNsStatesSQLTempl, committerpb.MetaNamespaceID) + ret := si.db.pool.QueryRow(ctx, q, [][]byte{[]byte(nsID)}, [][]byte{policy}) + violating, err := readArrayResult[[]byte](ret) + if err != nil { + return errors.Wrapf(err, "failed to import policy for namespace [%s]", nsID) + } + if len(violating) > 0 { + return errors.Newf("policy for namespace [%s] already exists", nsID) + } + return nil +} + +// SetBlockHeight sets the last committed block number in the metadata table. +// After import, the committer will resume processing from blockNumber+1. +func (si *StateImporter) SetBlockHeight(ctx context.Context, blockNumber uint64) error { + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, blockNumber) + _, err := si.db.pool.Exec(ctx, setMetadataPrepSQLStmt, lastCommittedBlockNumberKey, v) + return errors.Wrap(err, "failed to set block height after import") +} + +// VerifyNamespaceRowCount checks that the actual number of rows in a namespace table +// matches the expected count. Returns nil if they match, or an error describing the mismatch. +func (si *StateImporter) VerifyNamespaceRowCount(ctx context.Context, nsID string, expected int64) error { + var actual int64 + query := fmt.Sprintf("SELECT count(*) FROM %s", TableName(nsID)) + if err := si.db.pool.QueryRow(ctx, query).Scan(&actual); err != nil { + return errors.Wrapf(err, "failed to count rows in namespace [%s]", nsID) + } + if actual != expected { + return errors.Newf( + "row count mismatch for namespace [%s]: expected %d, got %d", + nsID, expected, actual, + ) + } + return nil +} + +// VerifyImport validates the integrity of a completed import by checking row counts +// for every namespace in the summary and verifying the stored block height. 
+func (si *StateImporter) VerifyImport(ctx context.Context, expected *ImportSummary) error { + for nsID, expectedCount := range expected.NamespaceRowCounts { + if err := si.VerifyNamespaceRowCount(ctx, nsID, expectedCount); err != nil { + return err + } + } + + blkRef, err := si.db.getNextBlockNumberToCommit(ctx) + if err != nil { + return errors.Wrap(err, "failed to read block height during verification") + } + + // getNextBlockNumberToCommit returns lastCommitted+1, so we compare against expected+1. + if blkRef.Number != expected.BlockNumber+1 { + return errors.Newf( + "block height mismatch: expected next block %d, got %d", + expected.BlockNumber+1, blkRef.Number, + ) + } + + return nil +} + +// copyFromIterator adapts a StateIterator to pgx.CopyFromSource. +type copyFromIterator struct { + iter StateIterator +} + +func (c *copyFromIterator) Next() bool { + return c.iter.Next() +} + +func (c *copyFromIterator) Values() ([]any, error) { + return []any{c.iter.Key(), c.iter.Value(), int64(c.iter.Version())}, nil //nolint:gosec +} + +func (c *copyFromIterator) Err() error { + return c.iter.Err() +} diff --git a/service/vc/state_import_test.go b/service/vc/state_import_test.go new file mode 100644 index 000000000..1da79ddd6 --- /dev/null +++ b/service/vc/state_import_test.go @@ -0,0 +1,372 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package vc + +import ( + "fmt" + "testing" + + "github.com/hyperledger/fabric-x-common/api/committerpb" + "github.com/stretchr/testify/require" +) + +// sliceIterator is a test helper that implements StateIterator over in-memory slices. 
+type sliceIterator struct { + keys [][]byte + values [][]byte + versions []uint64 + pos int + err error +} + +func newSliceIterator(keys, values [][]byte, versions []uint64) *sliceIterator { + return &sliceIterator{keys: keys, values: values, versions: versions, pos: -1} +} + +func (s *sliceIterator) Next() bool { + if s.err != nil { + return false + } + s.pos++ + return s.pos < len(s.keys) +} + +func (s *sliceIterator) Key() []byte { return s.keys[s.pos] } +func (s *sliceIterator) Value() []byte { return s.values[s.pos] } +func (s *sliceIterator) Version() uint64 { return s.versions[s.pos] } +func (s *sliceIterator) Err() error { return s.err } + +// emptyIterator returns no records. +type emptyIterator struct{} + +func (*emptyIterator) Next() bool { return false } +func (*emptyIterator) Key() []byte { return nil } +func (*emptyIterator) Value() []byte { return nil } +func (*emptyIterator) Version() uint64 { return 0 } +func (*emptyIterator) Err() error { return nil } + +func newImporterWithSchema(t *testing.T) (*StateImporter, *DatabaseTestEnv) { + t.Helper() + env := NewDatabaseTestEnv(t) + ctx, _ := createContext(t) + importer := NewStateImporter(env.DB) + require.NoError(t, importer.InitSchema(ctx)) + return importer, env +} + +func TestStateImporter_InitSchema(t *testing.T) { + t.Parallel() + env := NewDatabaseTestEnv(t) + ctx, _ := createContext(t) + importer := NewStateImporter(env.DB) + + require.NoError(t, importer.InitSchema(ctx)) + + // Verify system tables exist. 
+ tables := readTables(t, env.DB.pool) + require.Contains(t, tables, "metadata") + require.Contains(t, tables, "tx_status") + require.Contains(t, tables, TableName(committerpb.MetaNamespaceID)) + require.Contains(t, tables, TableName(committerpb.ConfigNamespaceID)) +} + +func TestStateImporter_CreateNamespace(t *testing.T) { + t.Parallel() + importer, env := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "myns")) + + tables := readTables(t, env.DB.pool) + require.Contains(t, tables, TableName("myns")) + + methods := readMethods(t, env.DB.pool) + require.Contains(t, methods, "insert_ns_myns") + require.Contains(t, methods, "update_ns_myns") + require.Contains(t, methods, "validate_reads_ns_myns") +} + +func TestStateImporter_ImportNamespaceState(t *testing.T) { + t.Parallel() + importer, env := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "importns")) + + keys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")} + values := [][]byte{[]byte("v1"), []byte("v2"), []byte("v3")} + versions := []uint64{5, 10, 15} + + iter := newSliceIterator(keys, values, versions) + count, err := importer.ImportNamespaceState(ctx, "importns", iter) + require.NoError(t, err) + require.Equal(t, int64(3), count) + + // Verify the data was written correctly. 
+ rows := env.FetchKeys(t, "importns", keys) + require.Len(t, rows, 3) + require.Equal(t, []byte("v1"), rows["k1"].Value) + require.Equal(t, uint64(5), rows["k1"].Version) + require.Equal(t, []byte("v2"), rows["k2"].Value) + require.Equal(t, uint64(10), rows["k2"].Version) + require.Equal(t, []byte("v3"), rows["k3"].Value) + require.Equal(t, uint64(15), rows["k3"].Version) +} + +func TestStateImporter_ImportNamespaceState_Empty(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "emptyns")) + + count, err := importer.ImportNamespaceState(ctx, "emptyns", &emptyIterator{}) + require.NoError(t, err) + require.Equal(t, int64(0), count) +} + +func TestStateImporter_ImportNamespaceState_LargeBatch(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "largens")) + + const numKeys = 10_000 + keys := make([][]byte, numKeys) + values := make([][]byte, numKeys) + versions := make([]uint64, numKeys) + for i := range numKeys { + keys[i] = []byte(fmt.Sprintf("key-%06d", i)) + values[i] = []byte(fmt.Sprintf("val-%06d", i)) + versions[i] = uint64(i) + } + + iter := newSliceIterator(keys, values, versions) + count, err := importer.ImportNamespaceState(ctx, "largens", iter) + require.NoError(t, err) + require.Equal(t, int64(numKeys), count) + + // Spot-check a few rows. + require.NoError(t, importer.VerifyNamespaceRowCount(ctx, "largens", numKeys)) +} + +func TestStateImporter_ImportNamespaceState_NonexistentTable(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + // Table was never created — import should fail. 
+ iter := newSliceIterator([][]byte{[]byte("k")}, [][]byte{[]byte("v")}, []uint64{0}) + _, err := importer.ImportNamespaceState(ctx, "no_such_ns", iter) + require.Error(t, err) + require.Contains(t, err.Error(), "no_such_ns") +} + +func TestStateImporter_ImportPolicy(t *testing.T) { + t.Parallel() + importer, env := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "policyns")) + policyBytes := []byte("serialized-policy-data") + + require.NoError(t, importer.ImportPolicy(ctx, "policyns", policyBytes)) + + // Verify policy was written to __meta namespace. + rows := env.FetchKeys(t, committerpb.MetaNamespaceID, [][]byte{[]byte("policyns")}) + require.Len(t, rows, 1) + require.Equal(t, policyBytes, rows["policyns"].Value) +} + +func TestStateImporter_ImportPolicy_Duplicate(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.ImportPolicy(ctx, "dupns", []byte("policy1"))) + + // Second import for the same namespace should fail. + err := importer.ImportPolicy(ctx, "dupns", []byte("policy2")) + require.Error(t, err) + require.Contains(t, err.Error(), "already exists") +} + +func TestStateImporter_SetBlockHeight(t *testing.T) { + t.Parallel() + importer, env := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.SetBlockHeight(ctx, 42)) + + // Verify via the existing database method. 
+ blkRef, err := env.DB.getNextBlockNumberToCommit(ctx) + require.NoError(t, err) + require.Equal(t, uint64(43), blkRef.Number) // next = last + 1 +} + +func TestStateImporter_SetBlockHeight_Zero(t *testing.T) { + t.Parallel() + importer, env := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.SetBlockHeight(ctx, 0)) + + blkRef, err := env.DB.getNextBlockNumberToCommit(ctx) + require.NoError(t, err) + require.Equal(t, uint64(1), blkRef.Number) +} + +func TestStateImporter_VerifyNamespaceRowCount_Match(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "verifyns")) + + iter := newSliceIterator( + [][]byte{[]byte("a"), []byte("b")}, + [][]byte{[]byte("1"), []byte("2")}, + []uint64{0, 0}, + ) + _, err := importer.ImportNamespaceState(ctx, "verifyns", iter) + require.NoError(t, err) + + require.NoError(t, importer.VerifyNamespaceRowCount(ctx, "verifyns", 2)) +} + +func TestStateImporter_VerifyNamespaceRowCount_Mismatch(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "mismatchns")) + + iter := newSliceIterator( + [][]byte{[]byte("a")}, + [][]byte{[]byte("1")}, + []uint64{0}, + ) + _, err := importer.ImportNamespaceState(ctx, "mismatchns", iter) + require.NoError(t, err) + + err = importer.VerifyNamespaceRowCount(ctx, "mismatchns", 5) + require.Error(t, err) + require.Contains(t, err.Error(), "row count mismatch") + require.Contains(t, err.Error(), "expected 5") + require.Contains(t, err.Error(), "got 1") +} + +func TestStateImporter_VerifyImport(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.CreateNamespace(ctx, "ns_a")) + require.NoError(t, importer.CreateNamespace(ctx, "ns_b")) + + iterA := newSliceIterator( + [][]byte{[]byte("k1"), 
[]byte("k2")}, + [][]byte{[]byte("v1"), []byte("v2")}, + []uint64{0, 1}, + ) + _, err := importer.ImportNamespaceState(ctx, "ns_a", iterA) + require.NoError(t, err) + + iterB := newSliceIterator( + [][]byte{[]byte("k3")}, + [][]byte{[]byte("v3")}, + []uint64{0}, + ) + _, err = importer.ImportNamespaceState(ctx, "ns_b", iterB) + require.NoError(t, err) + + require.NoError(t, importer.SetBlockHeight(ctx, 100)) + + summary := &ImportSummary{ + NamespaceRowCounts: map[string]int64{ + "ns_a": 2, + "ns_b": 1, + }, + BlockNumber: 100, + TotalKeys: 3, + } + + require.NoError(t, importer.VerifyImport(ctx, summary)) +} + +func TestStateImporter_VerifyImport_BlockHeightMismatch(t *testing.T) { + t.Parallel() + importer, _ := newImporterWithSchema(t) + ctx, _ := createContext(t) + + require.NoError(t, importer.SetBlockHeight(ctx, 50)) + + summary := &ImportSummary{ + NamespaceRowCounts: map[string]int64{}, + BlockNumber: 99, // does not match + } + + err := importer.VerifyImport(ctx, summary) + require.Error(t, err) + require.Contains(t, err.Error(), "block height mismatch") +} + +func TestStateImporter_EndToEnd_MultiNamespace(t *testing.T) { + t.Parallel() + importer, env := newImporterWithSchema(t) + ctx, _ := createContext(t) + + // Simulate importing a Fabric snapshot with 3 namespaces. + namespaces := []string{"chaincode1", "chaincode2", "lifecycle"} + for _, ns := range namespaces { + require.NoError(t, importer.CreateNamespace(ctx, ns)) + require.NoError(t, importer.ImportPolicy(ctx, ns, []byte("policy-for-"+ns))) + } + + // Import state for each namespace. 
+ summary := &ImportSummary{ + NamespaceRowCounts: make(map[string]int64), + } + + for i, ns := range namespaces { + numKeys := (i + 1) * 100 + keys := make([][]byte, numKeys) + values := make([][]byte, numKeys) + versions := make([]uint64, numKeys) + for j := range numKeys { + keys[j] = []byte(fmt.Sprintf("%s-key-%04d", ns, j)) + values[j] = []byte(fmt.Sprintf("%s-val-%04d", ns, j)) + versions[j] = uint64(j) + } + + count, err := importer.ImportNamespaceState(ctx, ns, newSliceIterator(keys, values, versions)) + require.NoError(t, err) + require.Equal(t, int64(numKeys), count) + summary.NamespaceRowCounts[ns] = int64(numKeys) + summary.TotalKeys += int64(numKeys) + } + + // Set block height. + summary.BlockNumber = 500 + require.NoError(t, importer.SetBlockHeight(ctx, summary.BlockNumber)) + + // Full verification. + require.NoError(t, importer.VerifyImport(ctx, summary)) + + // Verify policies are readable by the existing readNamespacePolicies method. + policies, err := env.DB.readNamespacePolicies(ctx) + require.NoError(t, err) + require.Len(t, policies.Policies, len(namespaces)) + + // Verify data survives and is usable by the normal pipeline — spot-check reads. + for _, ns := range namespaces { + rows := env.FetchKeys(t, ns, [][]byte{[]byte(ns + "-key-0000")}) + require.Len(t, rows, 1) + require.Equal(t, []byte(ns+"-val-0000"), rows[ns+"-key-0000"].Value) + } +} diff --git a/service/vc/test_exports.go b/service/vc/test_exports.go index bbeacbdb8..e730e3d14 100644 --- a/service/vc/test_exports.go +++ b/service/vc/test_exports.go @@ -356,6 +356,11 @@ func (env *DatabaseTestEnv) rowExists(t *testing.T, nsID string, exp namespaceWr } } +// NewStateImporterForTest creates a StateImporter backed by the test environment's database. 
+func (env *DatabaseTestEnv) NewStateImporterForTest() *StateImporter { + return NewStateImporter(env.DB) +} + func (env *DatabaseTestEnv) rowNotExists(t *testing.T, nsID string, keys [][]byte) { t.Helper() actualRows := env.FetchKeys(t, nsID, keys)