Skip to content

Commit 256c6db

Browse files
authored
Feature/1 (#21)
* users schema update: added email column, added indexes * tests update * user repo GetByEmail test added * user email prop added * new google oauth login will result in new user * reset password handler * username vo update * test fix * switched from bcrypt to argon2id * fixed a bug * added Delete method to cache interface * UserPasswordChanged event added * logging fixed * removed auth context * event processor implemented * removed domain event handling from user repository * moved event processing logic out of AuthService * minor changes * replaced plan controller with event handler * new values in env * new docker compose service * implemented fetching plan data via ws * refactored rest api * plan handler refactoring * error message update * minor fix * minor improvements in authenticator.go * using dtos in plan_handler.go * moved some files to other location * minor change * reduced user restriction service cache ttl to 15 seconds * minor change in handler.go * introduced a dialer service, changed project structure a bit * introduced a dialer service, changed project structure a bit * traffic reporter optimizations * changed buf size in Proxy * removed validateArgon2idHash * buf optimizations * fixed copyTrafficAndReport * minor change in dialer_pool.go * minor performance oriented improvements * AuthService.AuthorizeBasic optimized with singleflight * test fix * gitignore update * dependencies update
1 parent 6b9e1ee commit 256c6db

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2085
-1028
lines changed

.env

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,14 @@ PROXY_KAFKA_GROUP_ID=PROXY
3030
PROXY_KAFKA_AUTO_OFFSET_RESET=earliest
3131

3232
#PlansKafka
33-
PLAN_KAFKA_TOPIC=PLAN
33+
PLAN_KAFKA_TOPIC=PLAN
34+
35+
#Google OAuth Client
36+
GOOGLE_AUTH_CLIENT_ID=secret
37+
GOOGLE_AUTH_CLIENT_SECRET=secret
38+
GOOGLE_AUTH_HOST=http://localhost
39+
GOOGLE_AUTH_PORT=3030
40+
41+
#REST API KAFKA
42+
USERS_REST_API_KAFKA_GROUP_ID=rest-api
43+
USERS_REST_API_KAFKA_TOPIC=PROXY

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ go.work.sum
2828
/.idea/goproxy.iml
2929
/.idea/dataSources.xml
3030
/.idea/sqldialects.xml
31+
/.idea/forwardedPorts.xml

docker-compose.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,39 @@ services:
9999
depends_on:
100100
lt-billing-postgres:
101101
condition: service_healthy
102+
103+
google-auth-api:
104+
build:
105+
context: ./src
106+
container_name: google-auth-api
107+
ports:
108+
- "3030:3030"
109+
environment:
110+
- MODE=google-auth
111+
- SERVE_PORT=3030
112+
- ALLOWED_ORIGINS=http://localhost:5173
113+
- DB_DATABASE=${DB_DATABASE}
114+
- DB_USER=${DB_USER}
115+
- DB_PASS=${DB_PASS}
116+
- DB_HOST=${DB_HOST}
117+
- DB_PORT=${DB_PORT}
118+
- ENVIRONMENT=development
119+
- GOOGLE_AUTH_CLIENT_ID=${GOOGLE_AUTH_CLIENT_ID}
120+
- GOOGLE_AUTH_CLIENT_SECRET=${GOOGLE_AUTH_CLIENT_SECRET}
121+
- GOOGLE_AUTH_HOST=${GOOGLE_AUTH_HOST}
122+
- GOOGLE_AUTH_PORT=${GOOGLE_AUTH_PORT}
123+
- PROXY_KAFKA_AUTO_OFFSET_RESET=${PROXY_KAFKA_AUTO_OFFSET_RESET}
124+
- PROXY_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
125+
- PROXY_KAFKA_GROUP_ID=GOOGLE-AUTH
126+
- PROXY_KAFKA_TOPIC=PROXY
127+
- TC_CACHE_HOST=${TC_CACHE_HOST}
128+
- TC_CACHE_PORT=${TC_CACHE_PORT}
129+
- TC_CACHE_USER=${TC_CACHE_USER}
130+
depends_on:
131+
postgres:
132+
condition: service_healthy
133+
plan-controller-cache:
134+
condition: service_healthy
102135

103136
proxy:
104137
build:
@@ -143,6 +176,11 @@ services:
143176
- DB_HOST=${DB_HOST}
144177
- DB_PORT=${DB_PORT}
145178
- HTTP_REST_API_PORT=80
179+
- PROXY_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
180+
- PROXY_KAFKA_GROUP_ID=${USERS_REST_API_KAFKA_GROUP_ID}
181+
- PROXY_KAFKA_AUTO_OFFSET_RESET=${PROXY_KAFKA_AUTO_OFFSET_RESET}
182+
- PROXY_KAFKA_TOPIC=${USERS_REST_API_KAFKA_TOPIC}
183+
- ALLOWED_ORIGINS=*
146184
depends_on:
147185
- auth-ctx-migrator
148186

@@ -229,8 +267,11 @@ services:
229267
build:
230268
context: ./src
231269
container_name: plan-controller
270+
ports:
271+
- "3031:3031"
232272
environment:
233273
- MODE=plan-controller
274+
- USERS_API_HOST=http://rest-api:80
234275
- DB_USER=${PLANS_DB_USER}
235276
- DB_PASS=${PLANS_DB_PASS}
236277
- DB_HOST=${PLANS_DB_HOST}
@@ -249,6 +290,7 @@ services:
249290
- TC_CACHE_PORT=${TC_CACHE_PORT}
250291
- TC_CACHE_USER=${TC_CACHE_USER}
251292
- TC_CACHE_PASSWORD=${TC_CACHE_PASSWORD}
293+
- ALLOWED_ORIGINS=http://localhost:5173
252294
depends_on:
253295
init-kafka-topics:
254296
condition: service_completed_successfully
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package aplication_errors
2+
3+
type ErrIpPoolEmpty struct {
4+
}
5+
6+
func (ip ErrIpPoolEmpty) Error() string {
7+
return "IP pool is empty"
8+
}

src/application/dialer_pool.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package application
2+
3+
import (
4+
"net"
5+
"time"
6+
)
7+
8+
type DialerPool interface {
9+
// GetDialer retrieves a dialer for the given network and userId. If userId is bound, the corresponding IP is used.
10+
GetDialer(network string, userId int) (*net.Dialer, error)
11+
12+
// BindDialerToUser binds an IP to a user for the specified TTL without creating a dialer.
13+
BindDialerToUser(userId int, ttl time.Duration) error
14+
15+
// SetPool sets ip addresses available to be used with dialer
16+
SetPool(ips []net.IP)
17+
}

src/application/event_processor.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package application
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
)
9+
10+
type EventHandler interface {
11+
Handle(payload string) error
12+
}
13+
14+
type EventProcessor struct {
15+
messageBus MessageBusService
16+
handlerMap map[string]EventHandler
17+
topics []string
18+
}
19+
20+
func NewEventProcessor(messageBus MessageBusService) *EventProcessor {
21+
return &EventProcessor{
22+
messageBus: messageBus,
23+
handlerMap: make(map[string]EventHandler),
24+
}
25+
}
26+
27+
func (e *EventProcessor) RegisterHandler(eventType string, handler EventHandler) *EventProcessor {
28+
e.handlerMap[eventType] = handler
29+
return e
30+
}
31+
32+
func (e *EventProcessor) RegisterTopic(topic string) *EventProcessor {
33+
e.topics = append(e.topics, topic)
34+
return e
35+
}
36+
37+
func (e *EventProcessor) Build() error {
38+
if e.topics == nil || len(e.topics) == 0 {
39+
return errors.New("topics is empty")
40+
}
41+
42+
if len(e.handlerMap) == 0 {
43+
return errors.New("no handlers registered")
44+
}
45+
46+
return nil
47+
}
48+
49+
func (e *EventProcessor) Start(ctx context.Context) error {
50+
if err := e.messageBus.Subscribe(e.topics); err != nil {
51+
log.Fatalf("Failed to subscribe to topics: %v", err)
52+
}
53+
54+
go func() {
55+
defer func(messageBus MessageBusService) {
56+
_ = messageBus.Close()
57+
}(e.messageBus)
58+
59+
for {
60+
select {
61+
case <-ctx.Done():
62+
return
63+
default:
64+
eventProcessingErr := e.ProcessNextEvent()
65+
if eventProcessingErr != nil {
66+
log.Printf("failed to process event: %v", eventProcessingErr)
67+
}
68+
}
69+
}
70+
}()
71+
72+
return nil
73+
}
74+
75+
func (e *EventProcessor) ProcessNextEvent() error {
76+
event, err := e.messageBus.Consume()
77+
if err != nil {
78+
return fmt.Errorf("failed to consume event: %v", err)
79+
}
80+
81+
eventHandler, ok := e.handlerMap[event.EventType.Value()]
82+
if !ok {
83+
return fmt.Errorf("no handler found for event type: %s", event.EventType.Value())
84+
}
85+
86+
return eventHandler.Handle(event.Payload)
87+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package application
2+
3+
import (
4+
"context"
5+
"errors"
6+
"goproxy/application/mocks"
7+
"goproxy/domain/events"
8+
"goproxy/domain/valueobjects"
9+
"testing"
10+
"time"
11+
)
12+
13+
type MockEventHandler struct {
14+
handledEvents []string
15+
returnError bool
16+
}
17+
18+
func (m *MockEventHandler) Handle(payload string) error {
19+
m.handledEvents = append(m.handledEvents, payload)
20+
if m.returnError {
21+
return errors.New("mock handler error")
22+
}
23+
return nil
24+
}
25+
26+
func TestEventProcessor_Start(t *testing.T) {
27+
// Arrange
28+
mockBus := mocks.NewMockMessageBusService()
29+
mockHandler := &MockEventHandler{}
30+
processor := NewEventProcessor(mockBus).
31+
RegisterHandler("TestEvent", mockHandler).
32+
RegisterTopic("TestTopic")
33+
34+
if err := processor.Build(); err != nil {
35+
t.Fatalf("Failed to build EventProcessor: %v", err)
36+
}
37+
38+
ctx, cancel := context.WithCancel(context.Background())
39+
defer cancel()
40+
41+
eventType, eventTypeErr := valueobjects.ParseEventTypeFromString("TestEvent")
42+
if eventTypeErr != nil {
43+
t.Fatalf("Failed to parse event type from payload: %v", eventTypeErr)
44+
}
45+
46+
_ = mockBus.Produce("TestTopic", events.OutboxEvent{
47+
EventType: eventType,
48+
Payload: "test_payload",
49+
})
50+
51+
// Act
52+
go func() {
53+
if err := processor.Start(ctx); err != nil {
54+
t.Errorf("Failed to start EventProcessor: %v", err)
55+
}
56+
}()
57+
58+
// Wait for the event to be processed
59+
time.Sleep(100 * time.Millisecond)
60+
61+
// Assert
62+
if len(mockHandler.handledEvents) != 1 {
63+
t.Fatalf("Expected 1 handled event, got %d", len(mockHandler.handledEvents))
64+
}
65+
if mockHandler.handledEvents[0] != "test_payload" {
66+
t.Errorf("Unexpected payload handled: %s", mockHandler.handledEvents[0])
67+
}
68+
}
69+
70+
func TestEventProcessor_MissingHandler(t *testing.T) {
71+
// Arrange
72+
mockBus := mocks.NewMockMessageBusService()
73+
processor := NewEventProcessor(mockBus).
74+
RegisterHandler("SomeEvent", &MockEventHandler{}).
75+
RegisterTopic("TestTopic")
76+
77+
if err := processor.Build(); err != nil {
78+
t.Fatalf("Failed to build EventProcessor: %v", err)
79+
}
80+
81+
ctx, cancel := context.WithCancel(context.Background())
82+
defer cancel()
83+
84+
eventType, eventTypeErr := valueobjects.ParseEventTypeFromString("UnregisteredEvent")
85+
if eventTypeErr != nil {
86+
t.Fatalf("Failed to parse event type from payload: %v", eventTypeErr)
87+
}
88+
89+
_ = mockBus.Produce("TestTopic", events.OutboxEvent{
90+
EventType: eventType,
91+
Payload: "test_payload",
92+
})
93+
94+
// Act
95+
go func() {
96+
if err := processor.Start(ctx); err != nil {
97+
t.Errorf("Failed to start EventProcessor: %v", err)
98+
}
99+
}()
100+
101+
// Wait for the event to be processed
102+
time.Sleep(100 * time.Millisecond)
103+
104+
// Assert
105+
// No handlers registered for "UnregisteredEvent", so nothing should happen
106+
}
107+
108+
func TestEventProcessor_Shutdown(t *testing.T) {
109+
// Arrange
110+
mockBus := mocks.NewMockMessageBusService()
111+
mockHandler := &MockEventHandler{}
112+
processor := NewEventProcessor(mockBus).
113+
RegisterHandler("TestEvent", mockHandler).
114+
RegisterTopic("TestTopic")
115+
116+
if err := processor.Build(); err != nil {
117+
t.Fatalf("Failed to build EventProcessor: %v", err)
118+
}
119+
120+
ctx, cancel := context.WithCancel(context.Background())
121+
122+
// Act
123+
go func() {
124+
if err := processor.Start(ctx); err != nil {
125+
t.Errorf("Failed to start EventProcessor: %v", err)
126+
}
127+
}()
128+
129+
// Simulate shutdown
130+
time.Sleep(100 * time.Millisecond)
131+
cancel()
132+
133+
// Assert
134+
time.Sleep(100 * time.Millisecond) // Give time for graceful shutdown
135+
if !mockBus.WasCloseCalled() {
136+
t.Errorf("MessageBus was not closed during shutdown")
137+
}
138+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package application
2+
3+
import (
4+
"net"
5+
)
6+
7+
type HttpProxyListenerService interface {
8+
Listen(port int) (net.Listener, error)
9+
}

src/application/ip_resolver.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package application
2+
3+
import (
4+
"net"
5+
)
6+
7+
type IPResolver interface {
8+
GetHostPublicIPs() ([]net.IP, error)
9+
}

0 commit comments

Comments
 (0)