Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions daemon/daemon_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"net/url"
"strconv"

"github.com/bsv-blockchain/teranode/errors"
Expand Down Expand Up @@ -30,7 +31,7 @@
mainBlockAssemblyClient blockassembly.ClientI
mainP2PClient p2p.ClientI
mainSubtreeStore blob.Store
mainBlockchainStore blockchainstore.Store
mainBlockchainStoreCache map[url.URL]blockchainstore.Store

Check failure on line 34 in daemon/daemon_stores.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not properly formatted (gci)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using url.URL as a map key is problematic. URLs are structs with pointers and slices internally, which means two URLs with identical string representations may not be equal as map keys. This could lead to cache misses where the same URL creates multiple store instances.

Consider using the URL string representation as the map key instead:

mainBlockchainStoreCache map[string]blockchainstore.Store
// Then use: blockchainStoreURL.String() as the key

mainSubtreeValidationClient subtreevalidation.Interface
mainTempStore blob.Store
mainTxStore blob.Store
Expand Down Expand Up @@ -332,14 +333,17 @@
}

func (d *Stores) GetBlockchainStore(_ context.Context, logger ulogger.Logger, appSettings *settings.Settings) (blockchainstore.Store, error) {
if d.mainBlockchainStore != nil {
return d.mainBlockchainStore, nil
}

// Create the blockchain store url from the app settings
blockchainStoreURL := appSettings.BlockChain.StoreURL
if blockchainStoreURL == nil {
return nil, errors.NewStorageError("blockchain store url not found")
return nil, errors.NewStorageError("blockchain storeURL must be set")
}

if d.mainBlockchainStoreCache == nil {
d.mainBlockchainStoreCache = make(map[url.URL]blockchainstore.Store)
}

if store, found := d.mainBlockchainStoreCache[*blockchainStoreURL]; found {
return store, nil
}

// Create the blockchain store
Expand All @@ -348,7 +352,7 @@
return nil, err
}

d.mainBlockchainStore = blockchainStore
d.mainBlockchainStoreCache[*blockchainStoreURL] = blockchainStore
return blockchainStore, nil
}

Expand Down
42 changes: 21 additions & 21 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ func TestDaemon_Start_AllServices(t *testing.T) {
// Setup dynamic ports for services to avoid conflicts
var p2pPort int

p2pPort, err = getFreePort()
p2pPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for P2P")

var assetPort int

assetPort, err = getFreePort()
assetPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for Asset")

// Configure settings - this will now pick up KAFKA_PORT and persister URLs from gocore.Config
Expand All @@ -242,116 +242,116 @@ func TestDaemon_Start_AllServices(t *testing.T) {

// Also set centrifuge port even though disabled, to avoid any initialization issues
var centrifugePort int
centrifugePort, err = getFreePort()
centrifugePort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for centrifuge")
appSettings.Asset.CentrifugeListenAddress = fmt.Sprintf(":%d", centrifugePort)

// Use dynamic port for health check to avoid conflicts in parallel tests
var healthCheckPort int
healthCheckPort, err = getFreePort()
healthCheckPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for health check")
appSettings.HealthCheckHTTPListenAddress = fmt.Sprintf(":%d", healthCheckPort)

// Use dynamic ports for all service HTTP listeners to avoid conflicts
var blockchainHTTPPort int
blockchainHTTPPort, err = getFreePort()
blockchainHTTPPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for blockchain HTTP")
appSettings.BlockChain.HTTPListenAddress = fmt.Sprintf(":%d", blockchainHTTPPort)

var validatorHTTPPort int
validatorHTTPPort, err = getFreePort()
validatorHTTPPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for validator HTTP")
appSettings.Validator.HTTPListenAddress = fmt.Sprintf(":%d", validatorHTTPPort)
validatorHTTPURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", validatorHTTPPort))
require.NoError(t, err, "Failed to parse validator HTTP URL")
appSettings.Validator.HTTPAddress = validatorHTTPURL

var propagationHTTPPort int
propagationHTTPPort, err = getFreePort()
propagationHTTPPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for propagation HTTP")
appSettings.Propagation.HTTPListenAddress = fmt.Sprintf(":%d", propagationHTTPPort)
appSettings.Propagation.HTTPAddresses = []string{fmt.Sprintf("localhost:%d", propagationHTTPPort)}

var p2pHTTPPort int
p2pHTTPPort, err = getFreePort()
p2pHTTPPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for p2p HTTP")
appSettings.P2P.HTTPListenAddress = fmt.Sprintf(":%d", p2pHTTPPort)
appSettings.P2P.HTTPAddress = fmt.Sprintf("localhost:%d", p2pHTTPPort)

var faucetHTTPPort int
faucetHTTPPort, err = getFreePort()
faucetHTTPPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for faucet HTTP")
appSettings.Faucet.HTTPListenAddress = fmt.Sprintf(":%d", faucetHTTPPort)

var rpcPort int
rpcPort, err = getFreePort()
rpcPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for RPC")
rpcURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", rpcPort))
require.NoError(t, err, "Failed to parse RPC URL")
appSettings.RPC.RPCListenerURL = rpcURL

var persisterHTTPPort int
persisterHTTPPort, err = getFreePort()
persisterHTTPPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for persister HTTP")
appSettings.BlockPersister.HTTPListenAddress = fmt.Sprintf(":%d", persisterHTTPPort)

// Use dynamic ports for all service gRPC listeners to avoid conflicts
var blockchainGRPCPort int
blockchainGRPCPort, err = getFreePort()
blockchainGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for blockchain gRPC")
appSettings.BlockChain.GRPCListenAddress = fmt.Sprintf(":%d", blockchainGRPCPort)
appSettings.BlockChain.GRPCAddress = fmt.Sprintf("localhost:%d", blockchainGRPCPort)

var blockAssemblyGRPCPort int
blockAssemblyGRPCPort, err = getFreePort()
blockAssemblyGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for block assembly gRPC")
appSettings.BlockAssembly.GRPCListenAddress = fmt.Sprintf(":%d", blockAssemblyGRPCPort)
appSettings.BlockAssembly.GRPCAddress = fmt.Sprintf("localhost:%d", blockAssemblyGRPCPort)

var blockValidationGRPCPort int
blockValidationGRPCPort, err = getFreePort()
blockValidationGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for block validation gRPC")
appSettings.BlockValidation.GRPCListenAddress = fmt.Sprintf(":%d", blockValidationGRPCPort)
appSettings.BlockValidation.GRPCAddress = fmt.Sprintf("localhost:%d", blockValidationGRPCPort)

var validatorGRPCPort int
validatorGRPCPort, err = getFreePort()
validatorGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for validator gRPC")
appSettings.Validator.GRPCListenAddress = fmt.Sprintf(":%d", validatorGRPCPort)
appSettings.Validator.GRPCAddress = fmt.Sprintf("localhost:%d", validatorGRPCPort)

var propagationGRPCPort int
propagationGRPCPort, err = getFreePort()
propagationGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for propagation gRPC")
appSettings.Propagation.GRPCListenAddress = fmt.Sprintf(":%d", propagationGRPCPort)
appSettings.Propagation.GRPCAddresses = []string{fmt.Sprintf("localhost:%d", propagationGRPCPort)}

var p2pGRPCPort int
p2pGRPCPort, err = getFreePort()
p2pGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for p2p gRPC")
appSettings.P2P.GRPCListenAddress = fmt.Sprintf(":%d", p2pGRPCPort)
appSettings.P2P.GRPCAddress = fmt.Sprintf("localhost:%d", p2pGRPCPort)

var subtreeValidationGRPCPort int
subtreeValidationGRPCPort, err = getFreePort()
subtreeValidationGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for subtree validation gRPC")
appSettings.SubtreeValidation.GRPCListenAddress = fmt.Sprintf(":%d", subtreeValidationGRPCPort)
appSettings.SubtreeValidation.GRPCAddress = fmt.Sprintf("localhost:%d", subtreeValidationGRPCPort)

var legacyGRPCPort int
legacyGRPCPort, err = getFreePort()
legacyGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for legacy gRPC")
appSettings.Legacy.GRPCListenAddress = fmt.Sprintf(":%d", legacyGRPCPort)
appSettings.Legacy.GRPCAddress = fmt.Sprintf("localhost:%d", legacyGRPCPort)

var coinbaseGRPCPort int
coinbaseGRPCPort, err = getFreePort()
coinbaseGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for coinbase gRPC")
appSettings.Coinbase.GRPCListenAddress = fmt.Sprintf(":%d", coinbaseGRPCPort)
appSettings.Coinbase.GRPCAddress = fmt.Sprintf("localhost:%d", coinbaseGRPCPort)

var prunerGRPCPort int
prunerGRPCPort, err = getFreePort()
prunerGRPCPort, err = GetFreePort()
require.NoError(t, err, "Failed to get free port for pruner gRPC")
appSettings.Pruner.GRPCListenAddress = fmt.Sprintf(":%d", prunerGRPCPort)
appSettings.Pruner.GRPCAddress = fmt.Sprintf("localhost:%d", prunerGRPCPort)
Expand Down
51 changes: 30 additions & 21 deletions daemon/test_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -130,6 +129,7 @@
// Log format: [TestName:serviceName] LEVEL: message
// This provides consistent formatting and ensures all logs are captured by go test.
UseUnifiedLogger bool
SettingsContext string
}

// JSONError represents a JSON error response from the RPC server.
Expand All @@ -155,13 +155,18 @@
appSettings *settings.Settings
)

appSettings = settings.NewSettings() // This reads gocore.Config and applies sensible defaults
if opts.SettingsContext != "" {
appSettings = settings.NewSettings(opts.SettingsContext) // This reads gocore.Config and applies sensible defaults
} else {
appSettings = settings.NewSettings() // This reads gocore.Config and applies sensible defaults
}


Check failure on line 164 in daemon/test_daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not properly formatted (gci)
// Generate a unique context for this TestDaemon to ensure util.GetListener
// creates unique listeners instead of returning cached ones from another TestDaemon.
// The counter ensures uniqueness even when tests run in quick succession.
uniqueID := atomic.AddUint64(&testDaemonCounter, 1)
appSettings.Context = fmt.Sprintf("%s-testdaemon-%d", appSettings.Context, uniqueID)
// uniqueID := atomic.AddUint64(&testDaemonCounter, 1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These commented-out lines that modify appSettings.Context appear to be important for test isolation. The comment explains they ensure unique listeners for each TestDaemon instance.

Without this code, multiple TestDaemons running concurrently might share cached listeners from util.GetListener, potentially causing port conflicts or cross-test interference.

Was this intentionally removed? If so, how is test isolation now ensured?

// appSettings.Context = fmt.Sprintf("%s-testdaemon-%d", appSettings.Context, uniqueID)

var (
listenAddr string
Expand Down Expand Up @@ -246,18 +251,19 @@
appSettings.Validator.HTTPAddress, _ = url.Parse(clientAddr)

// P2P - allocate port for libp2p (doesn't support pre-created listeners)
p2pPort, err := getFreePort()
require.NoError(t, err)
appSettings.P2P.StaticPeers = nil
appSettings.P2P.ListenAddresses = []string{"0.0.0.0"}
appSettings.P2P.Port = p2pPort


// P2P gRPC
// Just set the port from the settings override function
if opts.EnableP2P {
var tmpSettings settings.Settings
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code creates a temporary settings.Settings struct just to extract the P2P port from the override function. This is fragile because:

  1. The temporary settings will not have the full configuration context
  2. The override function may expect a properly initialized settings object
  3. If the override function has side effects or checks other fields, this could fail silently

Consider either:

  • Calling the override function after all port assignments
  • Adding the P2P port to TestOptions as an explicit field
  • Documenting that SettingsOverrideFunc should set P2P.Port and reading it after the override is applied to the real appSettings

opts.SettingsOverrideFunc(&tmpSettings)
// P2P gRPC uses a random port — the specified port is for libp2p
_, listenAddr, clientAddr, err = util.GetListener(appSettings.Context, "p2p", "", ":0")
require.NoError(t, err)
appSettings.P2P.GRPCListenAddress = listenAddr
appSettings.P2P.GRPCAddress = clientAddr
appSettings.P2P.Port = tmpSettings.P2P.Port
appSettings.P2P.ListenAddresses = []string{"0.0.0.0"}
}

if opts.EnableBlockPersister {
Expand Down Expand Up @@ -309,7 +315,7 @@
// Create a unique data directory per test
// Use test name to ensure each test has its own isolated directory
testName := strings.ReplaceAll(t.Name(), "/", "_")
appSettings.ClientName = testName
// appSettings.ClientName = testName

// Determine the data directory path
var path string
Expand Down Expand Up @@ -389,6 +395,8 @@
opts.SettingsOverrideFunc(appSettings)
}



// Initialize container manager for UTXO store if UTXOStoreType is specified
var containerManager *containers.ContainerManager
if opts.ContainerManager != nil {
Expand Down Expand Up @@ -789,8 +797,8 @@
return result
}

// getFreePort asks the kernel for a free open port that is ready to use.
func getFreePort() (int, error) {
// GetFreePort asks the kernel for a free open port that is ready to use.
func GetFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
Expand Down Expand Up @@ -1115,9 +1123,9 @@
// With GlobalBlockHeightRetention=1, transactions older than 1 block should be pruned.
// Current height is 10, so we wait for the pruner to complete its cycle.
t.Logf("Waiting for pruner to complete pruning cycle...")
prunedHeight, recordsProcessed, err := td.prunerObserver.waitForPrune(timeout)
prunedHeight, recordsProcessed, clientName, err := td.prunerObserver.waitForPrune(timeout)
require.NoError(t, err, "Timeout waiting for pruner to complete")
t.Logf("✓ Pruner completed pruning up to height %d, processed %d records", prunedHeight, recordsProcessed)
t.Logf("✓ Pruner completed pruning up to height %d, processed %d records, client: %s", prunedHeight, recordsProcessed, clientName)
}

// WaitForBlobDeletion waits for the blob deletion worker to complete processing.
Expand Down Expand Up @@ -2240,6 +2248,7 @@
type pruneEvent struct {
height uint32
recordsProcessed int64
clientName string
}

type testPrunerObserver struct {
Expand All @@ -2254,21 +2263,21 @@
}
}

func (o *testPrunerObserver) OnPruneComplete(height uint32, recordsProcessed int64) {
o.t.Logf("✓ Pruner callback invoked for height %d with %d records processed", height, recordsProcessed)
func (o *testPrunerObserver) OnPruneComplete(height uint32, recordsProcessed int64, clientName string) {
o.t.Logf("✓ Pruner callback invoked for height %d with %d records processed, client: %s", height, recordsProcessed, clientName)
select {
case o.pruneCompleted <- pruneEvent{height: height, recordsProcessed: recordsProcessed}:
case o.pruneCompleted <- pruneEvent{height: height, recordsProcessed: recordsProcessed, clientName: clientName}:
default:
o.t.Logf("Warning: pruneCompleted channel is full, dropping event for height %d", height)
}
}

func (o *testPrunerObserver) waitForPrune(timeout time.Duration) (uint32, int64, error) {
func (o *testPrunerObserver) waitForPrune(timeout time.Duration) (uint32, int64, string, error) {
select {
case event := <-o.pruneCompleted:
return event.height, event.recordsProcessed, nil
return event.height, event.recordsProcessed, event.clientName, nil
case <-time.After(timeout):
return 0, 0, errors.NewProcessingError("timeout waiting for prune completion")
return 0, 0, "", errors.NewProcessingError("timeout waiting for prune completion")
}
}

Expand Down
2 changes: 0 additions & 2 deletions services/blockchain/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@
FSMEventRUN = blockchain_api.FSMEventType_RUN
FSMEventCATCHUPBLOCKS = blockchain_api.FSMEventType_CATCHUPBLOCKS
FSMEventLEGACYSYNC = blockchain_api.FSMEventType_LEGACYSYNC
)

Check failure on line 84 in services/blockchain/Client.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not properly formatted (gci)

// NewClient creates a new blockchain client with default address settings.
func NewClient(ctx context.Context, logger ulogger.Logger, tSettings *settings.Settings, source string) (ClientI, error) {
logger = logger.New("blkcC")
Expand All @@ -91,7 +90,6 @@
if blockchainGrpcAddress == "" {
return nil, errors.NewConfigurationError("no blockchain_grpcAddress setting found")
}

return NewClientWithAddress(ctx, logger, tSettings, blockchainGrpcAddress, source)
}

Expand Down
2 changes: 1 addition & 1 deletion services/blockpersister/streaming_process_subtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (u *Server) readSubtree(ctx context.Context, subtreeHash chainhash.Hash) (*
if err != nil {
subtreeReader, err = u.subtreeStore.GetIoReader(ctx, subtreeHash.CloneBytes(), fileformat.FileTypeSubtreeToCheck)
if err != nil {
return nil, errors.NewStorageError("[BlockPersister] failed to get subtree from store", err)
return nil, errors.NewStorageError("[BlockPersister] failed to get subtree %s from store", subtreeHash.String(), err)
}
}

Expand Down
Loading
Loading