Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions cmd/config/app_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,20 @@ func TestReadConfigLoadGen(t *testing.T) {
Block: workload.BlockProfile{Size: 500},
Transaction: workload.TransactionProfile{
ReadWriteCount: workload.NewConstantDistribution(2),
Policy: &workload.PolicyProfile{
ChannelID: "mychannel",
NamespacePolicies: map[string]*workload.Policy{
workload.DefaultGeneratedNamespaceID: {
Scheme: signature.Ecdsa, Seed: 10,
},
committerpb.MetaNamespaceID: {
Scheme: signature.Ecdsa, Seed: 11,
},
},
Policy: workload.PolicyProfile{
ChannelID: "mychannel",
NamespacePolicies: map[string]*workload.Policy{
workload.DefaultGeneratedNamespaceID: {
Scheme: signature.Ecdsa, Seed: 10,
},
OrdererEndpoints: []*commontypes.OrdererEndpoint{
newOrdererEndpoint("org", "orderer"),
committerpb.MetaNamespaceID: {
Scheme: signature.Ecdsa, Seed: 11,
},
},
OrdererEndpoints: []*commontypes.OrdererEndpoint{
newOrdererEndpoint("org", "orderer"),
},
},
Conflicts: workload.ConflictProfile{
InvalidSignatures: 0.1,
Expand Down
25 changes: 14 additions & 11 deletions cmd/config/samples/loadgen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,20 @@ load-profile:
transaction:
read-write-count:
const: 2
policy:
channel-id: mychannel
namespace-policies:
0:
scheme: ECDSA
seed: 10
_meta:
scheme: ECDSA
seed: 11
orderer-endpoints:
- id=0,msp-id=org,broadcast,deliver,orderer:7050
policy:
channel-id: mychannel
namespace-policies:
0:
# Supported schemes: MSP, ECDSA, EDDSA, and BLS
# If MSP is used, the default policy is requiring
# endorsement by all peer organizations.
scheme: ECDSA
seed: 10
_meta:
scheme: ECDSA
seed: 11
orderer-endpoints:
- id=0,msp-id=org,broadcast,deliver,orderer:7050
conflicts:
invalid-signatures: 0.1
seed: 12345
Expand Down
18 changes: 9 additions & 9 deletions cmd/config/templates/loadgen_common.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ load-profile:
read-only-count:
read-write-count:
const: 2
policy:
channel-id: {{ .Policy.ChannelID }}
namespace-policies:
{{ range $nsID, $element := .Policy.NamespacePolicies }}
{{ $nsID }}:
scheme: {{ $element.Scheme }}
seed: {{ $element.Seed }}
{{ end }}
config-block-path: {{ .ConfigBlockPath }}
policy:
channel-id: {{ .Policy.ChannelID }}
namespace-policies:
{{ range $nsID, $element := .Policy.NamespacePolicies }}
{{ $nsID }}:
scheme: {{ $element.Scheme }}
seed: {{ $element.Seed }}
{{ end }}
config-block-path: {{ .ConfigBlockPath }}
query:
query-size:
min-invalid-keys-portion:
Expand Down
2 changes: 1 addition & 1 deletion cmd/loadgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func loadGenGenesisBlock() *cobra.Command {
}
cmd.SilenceUsage = true

block, err := workload.CreateConfigBlock(conf.LoadProfile.Transaction.Policy)
block, err := workload.CreateConfigBlock(&conf.LoadProfile.Policy)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion docker/images/test_node/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ENV SC_SIDECAR_COMMITTER_ENDPOINT=localhost:9001
ENV SC_QUERY_DATABASE_ENDPOINTS=localhost:5433
ENV SC_VC_DATABASE_ENDPOINTS=localhost:5433
ENV SC_LOADGEN_ORDERER_CLIENT_ORDERER_CONNECTION_ENDPOINTS=localhost:7050
ENV SC_LOADGEN_LOAD_PROFILE_TRANSACTION_POLICY_ORDERER_ENDPOINTS="id=0,msp-id=org,broadcast,deliver,localhost:7050"
ENV SC_LOADGEN_LOAD_PROFILE_POLICY_ORDERER_ENDPOINTS="id=0,msp-id=org,broadcast,deliver,localhost:7050"
ENV SC_LOADGEN_ORDERER_CLIENT_SIDECAR_CLIENT_ENDPOINT=localhost:4001

# Disable TLS usage for db.
Expand Down
3 changes: 2 additions & 1 deletion docker/test/container_release_image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func TestCommitterReleaseImagesWithTLS(t *testing.T) {
v := config.NewViperWithLoadGenDefaults()
c, err := config.ReadLoadGenYamlAndSetupLogging(v, filepath.Join(localConfigPath, "loadgen.yaml"))
require.NoError(t, err)
configBlock, err := workload.CreateConfigBlock(c.LoadProfile.Transaction.Policy)
c.LoadProfile.Policy.CryptoMaterialPath = t.TempDir()
configBlock, err := workload.CreateConfigBlock(&c.LoadProfile.Policy)
require.NoError(t, err)
require.NoError(t, configtxgen.WriteOutputBlock(configBlock, configBlockPath))

Expand Down
4 changes: 2 additions & 2 deletions docker/test/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestStartTestNodeWithTLSModesAndRemoteConnection(t *testing.T) {
c, err := config.ReadLoadGenYamlAndSetupLogging(v, filepath.Join(localConfigPath, "loadgen.yaml"))
require.NoError(t, err)
ordererEp := mustGetEndpoint(ctx, t, containerName, mockOrdererPort)
c.LoadProfile.Transaction.Policy.OrdererEndpoints = []*commontypes.OrdererEndpoint{
c.LoadProfile.Policy.OrdererEndpoints = []*commontypes.OrdererEndpoint{
{
Host: ordererEp.Host, Port: ordererEp.Port, ID: 0, MspID: "org",
API: []string{commontypes.Broadcast, commontypes.Deliver},
Expand All @@ -98,7 +98,7 @@ func TestStartTestNodeWithTLSModesAndRemoteConnection(t *testing.T) {
Server: mustGetEndpoint(ctx, t, containerName, coordinatorServicePort),
},
},
Policy: c.LoadProfile.Transaction.Policy,
Policy: &c.LoadProfile.Policy,
},
DBEnv: vc.NewDatabaseTestEnvFromConnection(
t,
Expand Down
5 changes: 3 additions & 2 deletions integration/runner/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ func NewRuntime(t *testing.T, conf *Config) *CommitterRuntime {
LoadGenBlockLimit: conf.LoadgenBlockLimit,
LoadGenWorkers: 1,
Policy: &workload.PolicyProfile{
ChannelID: TestChannelName,
NamespacePolicies: make(map[string]*workload.Policy),
ChannelID: TestChannelName,
NamespacePolicies: make(map[string]*workload.Policy),
CryptoMaterialPath: t.TempDir(),
},
Logging: &logging.DefaultConfig,
},
Expand Down
3 changes: 2 additions & 1 deletion integration/test/config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func TestConfigUpdate(t *testing.T) {
lgMetaTx := c.TxBuilder.MakeTx(metaTx)

c.AddOrUpdateNamespaces(t, committerpb.MetaNamespaceID)
metaPolicy := c.TxBuilder.TxEndorser.Policy(committerpb.MetaNamespaceID).VerificationPolicy()
verPolicies := c.TxBuilder.TxEndorser.VerificationPolicies()
metaPolicy := verPolicies[committerpb.MetaNamespaceID]
submitConfigBlock := func(endpoints []*commontypes.OrdererEndpoint) {
ordererEnv.SubmitConfigBlock(t, &workload.ConfigBlock{
ChannelID: c.SystemConfig.Policy.ChannelID,
Expand Down
5 changes: 3 additions & 2 deletions integration/test/namespace_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ func TestCreateUpdateNamespace(t *testing.T) {
})
c.Start(t, runner.FullTxPath)

policyBytesNs1, err := proto.Marshal(c.TxBuilder.TxEndorser.Policy("1").VerificationPolicy())
verPolicies := c.TxBuilder.TxEndorser.VerificationPolicies()
policyBytesNs1, err := proto.Marshal(verPolicies["1"])
require.NoError(t, err)
policyBytesNs2, err := proto.Marshal(c.TxBuilder.TxEndorser.Policy("2").VerificationPolicy())
policyBytesNs2, err := proto.Marshal(verPolicies["2"])
require.NoError(t, err)

tests := []struct {
Expand Down
10 changes: 6 additions & 4 deletions loadgen/adapters/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/errors"
"github.com/hyperledger/fabric-protos-go-apiv2/common"

"github.com/hyperledger/fabric-x-committer/api/committerpb"
"github.com/hyperledger/fabric-x-committer/api/servicepb"
Expand All @@ -24,10 +25,11 @@ import (
type (
// ClientResources holds client's pre-generated resources to be used by the adapters.
ClientResources struct {
Metrics *metrics.PerfMetrics
Profile *workload.Profile
Stream *workload.StreamOptions
Limit *GenerateLimit
Metrics *metrics.PerfMetrics
Profile *workload.Profile
Stream *workload.StreamOptions
ConfigBlock *common.Block
Limit *GenerateLimit
}

// Phases specify the generation phases to enable.
Expand Down
21 changes: 12 additions & 9 deletions loadgen/adapters/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ type (
)

// NewSidecarAdapter instantiate SidecarAdapter.
func NewSidecarAdapter(config *SidecarClientConfig, res *ClientResources) *SidecarAdapter {
func NewSidecarAdapter(config *SidecarClientConfig, res *ClientResources) (*SidecarAdapter, error) {
// The sidecar adapter overwrite the orderer endpoints with its own.
// We first pre-allocate the ports.
for _, s := range config.OrdererServers {
_, err := s.PreAllocateListener()
if err != nil {
return nil, err
}
}
res.Profile.Policy.OrdererEndpoints = ordererconn.NewEndpoints(0, "msp", config.OrdererServers...)
return &SidecarAdapter{
commonAdapter: commonAdapter{res: res},
config: config,
}
}, nil
}

// RunWorkload applies load on the sidecar.
Expand All @@ -56,13 +65,7 @@ func (c *SidecarAdapter) RunWorkload(ctx context.Context, txStream *workload.Str
})

// The sidecar adapter submits a config block manually.
policy := *c.res.Profile.Transaction.Policy
policy.OrdererEndpoints = ordererconn.NewEndpoints(0, "msp", c.config.OrdererServers...)
configBlock, err := workload.CreateConfigBlock(&policy)
if err != nil {
return err
}
if err = orderer.SubmitBlock(ctx, configBlock); err != nil {
if err = orderer.SubmitBlock(ctx, c.res.ConfigBlock); err != nil {
return err
}
c.nextBlockNum.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion loadgen/adapters/sidecar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// runSidecarReceiver start receiving blocks from the sidecar.
func runSidecarReceiver(ctx context.Context, params *sidecarReceiverParameters) error {
ledgerReceiver, err := sidecarclient.New(&sidecarclient.Parameters{
ChannelID: params.Res.Profile.Transaction.Policy.ChannelID,
ChannelID: params.Res.Profile.Policy.ChannelID,
Client: params.ClientConfig,
})
if err != nil {
Expand Down
24 changes: 10 additions & 14 deletions loadgen/adapters/sigverifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewSVAdapter(config *connection.MultiClientConfig, res *ClientResources) *S

// RunWorkload applies load on the SV.
func (c *SvAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
updateMsg, err := createUpdate(c.res.Profile.Transaction.Policy)
updateMsg, err := createUpdate(c.res)
if err != nil {
return err
}
Expand Down Expand Up @@ -80,34 +80,30 @@ func (c *SvAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
return errors.Wrap(g.Wait(), "workload done")
}

func createUpdate(policy *workload.PolicyProfile) (*servicepb.VerifierUpdates, error) {
txEndorser := workload.NewTxEndorserVerifier(policy)

envelopeBytes, err := workload.CreateConfigEnvelope(policy)
if err != nil {
return nil, err
}
func createUpdate(res *ClientResources) (*servicepb.VerifierUpdates, error) {
txEndorser := workload.NewTxEndorser(&res.Profile.Policy)
verPolicies := txEndorser.VerificationPolicies()
updateMsg := &servicepb.VerifierUpdates{
Config: &applicationpb.ConfigTransaction{
Envelope: envelopeBytes,
Envelope: res.ConfigBlock.Data.Data[0],
},
NamespacePolicies: &applicationpb.NamespacePolicies{
Policies: make([]*applicationpb.PolicyItem, 0, len(txEndorser.AllNamespaces())),
Policies: make([]*applicationpb.PolicyItem, 0, len(verPolicies)),
},
}

for _, ns := range txEndorser.AllNamespaces() {
if ns == committerpb.MetaNamespaceID {
for nsID, nsPolicy := range verPolicies {
if nsID == committerpb.MetaNamespaceID {
continue
}
policyBytes, err := proto.Marshal(txEndorser.Policy(ns).VerificationPolicy())
policyBytes, err := proto.Marshal(nsPolicy)
if err != nil {
return nil, errors.Wrap(err, "failed to serialize policy")
}
updateMsg.NamespacePolicies.Policies = append(
updateMsg.NamespacePolicies.Policies,
&applicationpb.PolicyItem{
Namespace: ns,
Namespace: nsID,
Policy: policyBytes,
},
)
Expand Down
28 changes: 19 additions & 9 deletions loadgen/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,34 @@ var logger = logging.New("load-gen-client")

// NewLoadGenClient creates a new client instance.
func NewLoadGenClient(conf *ClientConfig) (*Client, error) {
logger.Infof("Config passed: %s", &utils.LazyJSON{O: conf})
logger.Debugf("Config passed: %s", &utils.LazyJSON{O: conf})

c := &Client{
conf: conf,
txStream: workload.NewTxStream(conf.LoadProfile, conf.Stream),
conf: conf,
resources: adapters.ClientResources{
Profile: conf.LoadProfile,
Stream: conf.Stream,
Limit: conf.Limit,
Metrics: metrics.NewLoadgenServiceMetrics(&conf.Monitoring),
},
healthcheck: connection.DefaultHealthCheckService(),
}
c.resources.Metrics = metrics.NewLoadgenServiceMetrics(&conf.Monitoring)

adapter, err := getAdapter(&conf.Adapter, &c.resources)
if err != nil {
return nil, err
}

// We generate the crypto material and block after we create the adapter since the sidecar adapter
// modifies the orderer endpoints.
c.resources.ConfigBlock, err = workload.CreateConfigBlock(&conf.LoadProfile.Policy)
if err != nil {
return nil, err
}

// After creating the material, we can create the stream.
c.txStream = workload.NewTxStream(conf.LoadProfile, conf.Stream)

c.adapter = adapter
conf.Generate = adapters.PhasesIntersect(conf.Generate, adapter.Supports())
return c, nil
Expand All @@ -88,7 +98,7 @@ func getAdapter(conf *adapters.AdapterConfig, res *adapters.ClientResources) (Se
case conf.OrdererClient != nil:
return adapters.NewOrdererAdapter(conf.OrdererClient, res), nil
case conf.SidecarClient != nil:
return adapters.NewSidecarAdapter(conf.SidecarClient, res), nil
return adapters.NewSidecarAdapter(conf.SidecarClient, res)
case conf.VerifierClient != nil:
return adapters.NewSVAdapter(conf.VerifierClient, res), nil
case conf.LoadGenClient != nil:
Expand Down Expand Up @@ -213,7 +223,7 @@ func (c *Client) runLimiterServer(ctx context.Context) error {
func (c *Client) submitWorkloadSetupTXs(ctx context.Context, txs chan *servicepb.LoadGenTx) error {
defer close(txs)

workloadSetupTXs, err := makeWorkloadSetupTXs(c.conf)
workloadSetupTXs, err := makeWorkloadSetupTXs(c.conf, &c.resources)
if err != nil {
return err
}
Expand All @@ -238,17 +248,17 @@ func (c *Client) submitWorkloadSetupTXs(ctx context.Context, txs chan *servicepb
return nil
}

func makeWorkloadSetupTXs(config *ClientConfig) ([]*servicepb.LoadGenTx, error) {
func makeWorkloadSetupTXs(config *ClientConfig, res *adapters.ClientResources) ([]*servicepb.LoadGenTx, error) {
workloadSetupTXs := make([]*servicepb.LoadGenTx, 0, 2)
if config.Generate.Config {
configTX, err := workload.CreateConfigTx(config.LoadProfile.Transaction.Policy)
configTX, err := workload.CreateConfigTxFromConfigBlock(res.ConfigBlock)
if err != nil {
return nil, err
}
workloadSetupTXs = append(workloadSetupTXs, configTX)
}
if config.Generate.Namespaces {
metaNsTX, err := workload.CreateLoadGenNamespacesTX(config.LoadProfile.Transaction.Policy)
metaNsTX, err := workload.CreateLoadGenNamespacesTX(&config.LoadProfile.Policy)
if err != nil {
return nil, err
}
Expand Down
Loading