Skip to content

Commit 8b3d745

Browse files
authored
refactor: cleanup (#55)
Co-authored-by: Dimy Jeannot <>
1 parent 91e9ed8 commit 8b3d745

File tree

70 files changed

+1352
-547
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1352
-547
lines changed

apps/connectors/poc/network-account/v1alpha/listeners/create.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
typev2pb "github.com/openecosystems/ecosystem/libs/protobuf/go/protobuf/gen/platform/type/v2"
1717
ecosystemv2alphapbmodel "github.com/openecosystems/ecosystem/libs/public/go/model/gen/platform/ecosystem/v2alpha"
1818
ecosystemv2alphapb "github.com/openecosystems/ecosystem/libs/public/go/sdk/gen/platform/ecosystem/v2alpha"
19+
"github.com/openecosystems/ecosystem/libs/public/go/sdk/gen/platform/ecosystem/v2alpha/ecosystemv2alphapbconnect"
1920
sdkv2alphalib "github.com/openecosystems/ecosystem/libs/public/go/sdk/v2alpha"
2021
)
2122

@@ -24,16 +25,10 @@ type CreateEcosystemListener struct{}
2425

2526
// GetConfiguration returns the listener configuration for the CreateEcosystemListener, including entity, subject, and queue details.
2627
func (l *CreateEcosystemListener) GetConfiguration() *natsnodev1.ListenerConfiguration {
27-
entity := &ecosystemv2alphapbmodel.EcosystemSpecEntity{}
28-
streamType := natsnodev1.InboundStream{}
29-
subject := natsnodev1.GetMultiplexedRequestSubjectName(streamType.StreamPrefix(), entity.CommandTopic())
30-
queue := natsnodev1.GetQueueGroupName(streamType.StreamPrefix(), entity.TypeName())
31-
3228
return &natsnodev1.ListenerConfiguration{
3329
Entity: &ecosystemv2alphapbmodel.EcosystemSpecEntity{},
34-
Subject: subject,
35-
Queue: queue,
3630
StreamType: &natsnodev1.InboundStream{},
31+
Procedure: ecosystemv2alphapbconnect.EcosystemServiceCreateEcosystemProcedure,
3732
JetstreamConfiguration: &jetstream.ConsumerConfig{
3833
Durable: "ecosystem-createEcosystem",
3934
AckPolicy: jetstream.AckExplicitPolicy,
@@ -46,7 +41,7 @@ func (l *CreateEcosystemListener) GetConfiguration() *natsnodev1.ListenerConfigu
4641

4742
// Listen starts the listener to process multiplexed spec events synchronously based on the provided context and configuration.
4843
func (l *CreateEcosystemListener) Listen(ctx context.Context, _ chan sdkv2alphalib.SpecListenableErr) {
49-
natsnodev1.ListenForMultiplexedSpecEventsSync(ctx, l)
44+
natsnodev1.ListenForMultiplexedRequests(ctx, l)
5045
}
5146

5247
// Process handles incoming listener messages to create and store a configuration, ensuring required fields are validated.
@@ -98,5 +93,5 @@ func (l *CreateEcosystemListener) Process(ctx context.Context, request *natsnode
9893
}
9994
log.Info("Create Ecosystem Response", zap.Any("id", response.Ecosystem.Id))
10095

101-
natsnodev1.RespondToSyncCommand(ctx, request, &response)
96+
natsnodev1.RespondToMultiplexedRequest(ctx, request, &response)
10297
}

apps/workloads/public/ecosystem/v2alpha/ecosystem/certificate/sign.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package certificate
33
import (
44
"context"
55

6-
"github.com/nats-io/nats.go/jetstream"
7-
86
natsnodev1 "github.com/openecosystems/ecosystem/libs/partner/go/nats"
97
zaploggerv1 "github.com/openecosystems/ecosystem/libs/partner/go/zap"
108
specv2pb "github.com/openecosystems/ecosystem/libs/protobuf/go/protobuf/gen/platform/spec/v2"
@@ -18,29 +16,13 @@ type SignCertificateListener struct{}
1816

1917
// GetConfiguration provides the listener configuration for SignCertificateListener, including subject, queue, and jetstream settings.
2018
func (l *SignCertificateListener) GetConfiguration() *natsnodev1.ListenerConfiguration {
21-
entity := &cryptographyv2alphapb.CertificateSpecEntity{}
22-
streamType := natsnodev1.InboundStream{}
23-
subject := natsnodev1.GetMultiplexedRequestSubjectName(streamType.StreamPrefix(), entity.CommandTopic())
24-
queue := natsnodev1.GetQueueGroupName(streamType.StreamPrefix(), entity.TypeName())
25-
26-
return &natsnodev1.ListenerConfiguration{
27-
Entity: &cryptographyv2alphapb.CertificateSpecEntity{},
28-
Subject: subject,
29-
Queue: queue,
30-
StreamType: &natsnodev1.InboundStream{},
31-
JetstreamConfiguration: &jetstream.ConsumerConfig{
32-
Durable: "cryptography-signCertificate",
33-
AckPolicy: jetstream.AckExplicitPolicy,
34-
MemoryStorage: false,
35-
FilterSubject: "inbound-certificate.data.command",
36-
Metadata: nil,
37-
},
38-
}
19+
handler := cryptographyv2alphapb.CertificateServiceHandler{}
20+
return handler.GetSignCertificateConfiguration()
3921
}
4022

4123
// Listen synchronously listens for multiplexed spec events and routes them to the associated handler.
4224
func (l *SignCertificateListener) Listen(ctx context.Context, _ chan sdkv2alphalib.SpecListenableErr) {
43-
natsnodev1.ListenForMultiplexedSpecEventsSync(ctx, l)
25+
natsnodev1.ListenForMultiplexedRequests(ctx, l)
4426
}
4527

4628
// Process handles the incoming ListenerMessage, processes the request, and sends an appropriate response back to the client.
@@ -81,5 +63,5 @@ func (l *SignCertificateListener) Process(ctx context.Context, request *natsnode
8163

8264
// log.Info("Signed certificate successfully: " + response.Certificate.Id)
8365

84-
natsnodev1.RespondToSyncCommand(ctx, request, &response)
66+
natsnodev1.RespondToMultiplexedRequest(ctx, request, &response)
8567
}

apps/workloads/public/ecosystem/v2alpha/ecosystem/configuration/create.go

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/nats-io/nats.go/jetstream"
87
"go.uber.org/zap"
98
"google.golang.org/protobuf/proto"
109
"google.golang.org/protobuf/types/known/timestamppb"
@@ -26,29 +25,13 @@ type CreateConfigurationListener struct{}
2625

2726
// GetConfiguration returns the ListenerConfiguration for CreateConfigurationListener, defining subject, queue, entity, and stream settings.
2827
func (l *CreateConfigurationListener) GetConfiguration() *natsnodev1.ListenerConfiguration {
29-
entity := &configurationv2alphapb.ConfigurationSpecEntity{}
30-
streamType := natsnodev1.InboundStream{}
31-
subject := natsnodev1.GetMultiplexedRequestSubjectName(streamType.StreamPrefix(), entity.CommandTopic())
32-
queue := natsnodev1.GetQueueGroupName(streamType.StreamPrefix(), entity.TypeName())
33-
34-
return &natsnodev1.ListenerConfiguration{
35-
Entity: &configurationv2alphapb.ConfigurationSpecEntity{},
36-
Subject: subject,
37-
Queue: queue,
38-
StreamType: &natsnodev1.InboundStream{},
39-
JetstreamConfiguration: &jetstream.ConsumerConfig{
40-
Durable: "configuration-createConfiguration",
41-
AckPolicy: jetstream.AckExplicitPolicy,
42-
MemoryStorage: false,
43-
FilterSubject: "inbound-configuration.data.command",
44-
Metadata: nil,
45-
},
46-
}
28+
handler := configurationv2alphapb.ConfigurationServiceHandler{}
29+
return handler.GetCreateConfigurationConfiguration()
4730
}
4831

4932
// Listen starts the listener to process multiplexed spec events synchronously based on the provided context and configuration.
5033
func (l *CreateConfigurationListener) Listen(ctx context.Context, _ chan sdkv2alphalib.SpecListenableErr) {
51-
natsnodev1.ListenForMultiplexedSpecEventsSync(ctx, l)
34+
natsnodev1.ListenForMultiplexedRequests(ctx, l)
5235
}
5336

5437
// Process handles incoming listener messages to create and store a configuration, ensuring required fields are validated.
@@ -103,5 +86,5 @@ func (l *CreateConfigurationListener) Process(ctx context.Context, request *nats
10386
}
10487
log.Info("Create Configuration Response", zap.Any("id", response.Configuration.Id))
10588

106-
natsnodev1.RespondToSyncCommand(ctx, request, &response)
89+
natsnodev1.RespondToMultiplexedRequest(ctx, request, &response)
10790
}

apps/workloads/public/ecosystem/v2alpha/ecosystem/configuration/get.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package configuration
33
import (
44
"context"
55

6-
"github.com/nats-io/nats.go/jetstream"
76
"go.uber.org/zap"
87

98
configurationv2alphalib "github.com/openecosystems/ecosystem/libs/partner/go/configuration/v2alpha"
@@ -20,33 +19,13 @@ type GetConfigurationListener struct{}
2019

2120
// GetConfiguration creates and returns a ListenerConfiguration for the GetConfigurationListener.
2221
func (l *GetConfigurationListener) GetConfiguration() *natsnodev1.ListenerConfiguration {
23-
entity := &configurationv2alphapb.ConfigurationSpecEntity{}
24-
streamType := natsnodev1.InboundStream{}
25-
subject := natsnodev1.GetMultiplexedRequestSubjectName(streamType.StreamPrefix(), entity.EventTopic())
26-
queue := natsnodev1.GetQueueGroupName(streamType.StreamPrefix(), entity.TypeName())
27-
28-
return &natsnodev1.ListenerConfiguration{
29-
Entity: &configurationv2alphapb.ConfigurationSpecEntity{},
30-
Subject: subject,
31-
Queue: queue,
32-
StreamType: &natsnodev1.InboundStream{},
33-
JetstreamConfiguration: &jetstream.ConsumerConfig{
34-
Durable: "configuration-getConfiguration",
35-
//Durable: natsnodev1.GetListenerGroup(
36-
// &configurationv2alphapb.ConfigurationSpecEntity{},
37-
// &configurationv2alphapb.ConfigurationSpecEntity{},
38-
//),
39-
AckPolicy: jetstream.AckExplicitPolicy,
40-
MemoryStorage: false,
41-
FilterSubject: "inbound-configuration.data.event",
42-
Metadata: nil,
43-
},
44-
}
22+
handler := configurationv2alphapb.ConfigurationServiceHandler{}
23+
return handler.GetGetConfigurationConfiguration()
4524
}
4625

4726
// Listen subscribes the listener to a NATS subject to process multiplexed specification events synchronously.
4827
func (l *GetConfigurationListener) Listen(ctx context.Context, _ chan sdkv2alphalib.SpecListenableErr) {
49-
natsnodev1.ListenForMultiplexedSpecEventsSync(ctx, l)
28+
natsnodev1.ListenForMultiplexedRequests(ctx, l)
5029
}
5130

5231
// Process handles incoming listener messages, validates the request, retrieves platform configurations, and sends a response.
@@ -82,5 +61,5 @@ func (l *GetConfigurationListener) Process(ctx context.Context, request *natsnod
8261

8362
log.Info("Get Configuration Response", zap.Any("response", response))
8463

85-
natsnodev1.RespondToSyncCommand(ctx, request, response)
64+
natsnodev1.RespondToMultiplexedRequest(ctx, request, response)
8665
}

apps/workloads/public/ecosystem/v2alpha/ecosystem/ecosystem/create.go

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/nats-io/nats.go/jetstream"
87
"go.uber.org/zap"
98
"google.golang.org/protobuf/proto"
109
"google.golang.org/protobuf/types/known/timestamppb"
@@ -23,29 +22,13 @@ type CreateEcosystemListener struct{}
2322

2423
// GetConfiguration returns the listener configuration for the CreateEcosystemListener, including entity, subject, and queue details.
2524
func (l *CreateEcosystemListener) GetConfiguration() *natsnodev1.ListenerConfiguration {
26-
entity := &ecosystemv2alphapb.EcosystemSpecEntity{}
27-
streamType := natsnodev1.InboundStream{}
28-
subject := natsnodev1.GetMultiplexedRequestSubjectName(streamType.StreamPrefix(), entity.CommandTopic())
29-
queue := natsnodev1.GetQueueGroupName(streamType.StreamPrefix(), entity.TypeName())
30-
31-
return &natsnodev1.ListenerConfiguration{
32-
Entity: &ecosystemv2alphapb.EcosystemSpecEntity{},
33-
Subject: subject,
34-
Queue: queue,
35-
StreamType: &natsnodev1.InboundStream{},
36-
JetstreamConfiguration: &jetstream.ConsumerConfig{
37-
Durable: "ecosystem-createEcosystem",
38-
AckPolicy: jetstream.AckExplicitPolicy,
39-
MemoryStorage: false,
40-
FilterSubject: "inbound-ecosystem.data.command",
41-
Metadata: nil,
42-
},
43-
}
25+
handler := ecosystemv2alphapb.EcosystemServiceHandler{}
26+
return handler.GetCreateEcosystemConfiguration()
4427
}
4528

4629
// Listen starts the listener to process multiplexed spec events synchronously based on the provided context and configuration.
4730
func (l *CreateEcosystemListener) Listen(ctx context.Context, _ chan sdkv2alphalib.SpecListenableErr) {
48-
natsnodev1.ListenForMultiplexedSpecEventsSync(ctx, l)
31+
natsnodev1.ListenForMultiplexedRequests(ctx, l)
4932
}
5033

5134
// Process handles incoming listener messages to create and store a configuration, ensuring required fields are validated.
@@ -97,5 +80,5 @@ func (l *CreateEcosystemListener) Process(ctx context.Context, request *natsnode
9780
}
9881
log.Info("Create Ecosystem Response", zap.Any("id", response.Ecosystem.Id))
9982

100-
natsnodev1.RespondToSyncCommand(ctx, request, &response)
83+
natsnodev1.RespondToMultiplexedRequest(ctx, request, &response)
10184
}

apps/workloads/public/ecosystem/v2alpha/ecosystem/iam/create.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/nats-io/nats.go/jetstream"
87
"go.uber.org/zap"
98
"google.golang.org/protobuf/types/known/timestamppb"
109

@@ -22,29 +21,13 @@ type CreateAccountListener struct{}
2221

2322
// GetConfiguration returns the listener configuration for the CreateAccountListener, including entity, subject, and queue details.
2423
func (l *CreateAccountListener) GetConfiguration() *natsnodev1.ListenerConfiguration {
25-
entity := &iamv2alphapb.AccountSpecEntity{}
26-
streamType := natsnodev1.InboundStream{}
27-
subject := natsnodev1.GetMultiplexedRequestSubjectName(streamType.StreamPrefix(), entity.CommandTopic())
28-
queue := natsnodev1.GetQueueGroupName(streamType.StreamPrefix(), entity.TypeName())
29-
30-
return &natsnodev1.ListenerConfiguration{
31-
Entity: &iamv2alphapb.AccountSpecEntity{},
32-
Subject: subject,
33-
Queue: queue,
34-
StreamType: &natsnodev1.InboundStream{},
35-
JetstreamConfiguration: &jetstream.ConsumerConfig{
36-
Durable: "iam-createAccount",
37-
AckPolicy: jetstream.AckExplicitPolicy,
38-
MemoryStorage: false,
39-
FilterSubject: "inbound-iam.data.command",
40-
Metadata: nil,
41-
},
42-
}
24+
handler := iamv2alphapb.AccountAuthorityServiceHandler{}
25+
return handler.GetCreateAccountAuthorityConfiguration()
4326
}
4427

4528
// Listen starts the listener to process multiplexed spec events synchronously based on the provided context and configuration.
4629
func (l *CreateAccountListener) Listen(ctx context.Context, _ chan sdkv2alphalib.SpecListenableErr) {
47-
natsnodev1.ListenForMultiplexedSpecEventsSync(ctx, l)
30+
natsnodev1.ListenForMultiplexedRequests(ctx, l)
4831
}
4932

5033
// Process handles incoming listener messages to create and store a configuration, ensuring required fields are validated.
@@ -61,7 +44,6 @@ func (l *CreateAccountListener) Process(ctx context.Context, request *natsnodev1
6144
return
6245
}
6346

64-
fmt.Println("REEEEEUUUUUU", req) //nolint:copylocks
6547
if req.Name == "" {
6648
fmt.Println("Name is required")
6749
return
@@ -100,5 +82,5 @@ func (l *CreateAccountListener) Process(ctx context.Context, request *natsnodev1
10082
}
10183
log.Info("Create Account Response", zap.Any("id", response.Account.Id))
10284

103-
natsnodev1.RespondToSyncCommand(ctx, request, &response)
85+
natsnodev1.RespondToMultiplexedRequest(ctx, request, &response)
10486
}
File renamed without changes.

0 commit comments

Comments
 (0)