Skip to content

Commit af3fa83

Browse files
authored
feat: PostgreSQL support for DomainAudit (#7665)
<!-- 1-2 line summary of WHAT changed technically: - Always link the relevant projects GitHub issue, unless it is a minor bugfix - Good: "Modified FailoverDomain mapper to allow null ActiveClusterName #320" - Bad: "added nil check" --> **What changed?** Implement SQL plugin interface and add PostgreSQL support for DomainAudit. - Created `domain_audit_log` table in Postgresql schema - Created data structures for `DomainAuditLogRow` and `DomainAuditLogFilter` in sqlplugin - Implemented sqlplugin interfaces for `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions - Implemented `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions in Postgresql - Updated SQL factory to create `newSQLDomainAuditStore` Fixes: #7602 <!-- Your goal is to provide all the required context for a future maintainer to understand the reasons for making this change (see https://cbea.ms/git-commit/#why-not-how). How did this work previously (and what was wrong with it)? What has changed, and why did you solve it this way? - Good: "Active-active domains have independent cluster attributes per region. Previously, modifying cluster attributes required spedifying the default ActiveClusterName which updates the global domain default. This prevents operators from updating regional configurations without affecting the primary cluster designation. This change allows attribute updates to be independent of active cluster selection." - Bad: "Improves domain handling" --> **Why?** DomainAudit allows all modifications made to a domain (e.g failovers) to be stored and retrieved. This has previously only been supported by NoSQL databases (Cassandra, MongoDB, DynamoDB). The SQL plugin is now configured to handle DomainAudit requests, so other SQL databases can also be easily configured to support DomainAudit. PostgreSQL support for DomainAudit is now added. <!-- Include specific test commands and setup. Please include the exact commands such that another maintainer or contributor can reproduce the test steps taken. - e.g Unit test commands with exact invocation `go test -v ./common/types/mapper/proto -run TestFailoverDomainRequest` - For integration tests include setup steps and test commands Example: "Started local server with `./cadence start`, then ran `make test_e2e`" - For local simulation testing include setup steps for the server and how you ran the tests - Good: Full commands that reviewers can copy-paste to verify - Bad: "Tested locally" or "Added tests" --> **How did you test it?** - Unit tests: `go test -v ./common/persistence/sql -run TestCreateDomainAuditLog TestGetDomainAuditLogs TestFactoryNewDomainAuditStore TestInsertIntoDomainAuditLog TestSelectFromDomainAuditLogs TestGetDataBlobEncoding TestGetDataBlobBytes` - Integration tests: Ran `TestPostgresSQLDomainAuditPersistence` on github actions <!-- If there are risks that the release engineer should know about document them here. For example: - Has an API/IDL been modified? Is it backwards/forwards compatible? If not, what are the repecussions? - Has a schema change been introduced? Is it possible to roll back? - Has a feature flag been re-used for a new purpose? - Is there a potential performance concern? Is the change modifying core task processing logic? - If truly N/A, you can mark it as such --> **Potential risks** - Postgresql schema is updated with a new table; it can be rolled back with Postgres database release version <!-- If this PR completes a user facing feature or changes functionality add release notes here. Your release notes should allow a user and the release engineer to understand the changes with little context. Always ensure that the description contains a link to the relevant GitHub issue. --> **Release notes** Added PostgreSQL support for DomainAudit <!-- Consider whether this change requires documentation updates in the Cadence-Docs repo - If yes: mention what needs updating (or link to docs PR in cadence-docs repo) - If in doubt, add a note about potential doc needs - Only mark N/A if you're certain no docs are affected --> **Documentation Changes** N/A --- ## Potential Breaking change **Detailed Description** [In-depth description of the changes made to the schema or interfaces, specifying new fields, removed fields, or modified data structures] - Created `domain_audit_log` table in Postgresql schema - Created data structures for `DomainAuditLogRow` and `DomainAuditLogFilter` in sqlplugin - Implemented sqlplugin interfaces for `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions - Implemented `InsertIntoDomainAuditLog` and `SelectFromDomainAuditLogs` functions in Postgresql - Updated SQL factory to create `newSQLDomainAuditStore` **Impact Analysis** - **Backward Compatibility**: Schema changes can be rolled back with Postgres database release version - **Forward Compatibility**: Doesn't affect existing schema **Testing Plan** - **Unit Tests**: `go test -v ./common/persistence/sql -run TestCreateDomainAuditLog TestGetDomainAuditLogs TestFactoryNewDomainAuditStore TestInsertIntoDomainAuditLog TestSelectFromDomainAuditLogs TestGetDataBlobEncoding TestGetDataBlobBytes` - **Persistence Tests**: Ran `TestPostgresSQLDomainAuditPersistence` on github actions - **Integration Tests**: [Do we have integration test covering the change?] - **Compatibility Tests**: [Have we done tests to test the backward and forward compatibility?] **Rollout Plan** - What is the rollout plan? - Does the order of deployment matter? - Is it safe to rollback? Does the order of rollback matter? Schema changes can be rolled back with Postgres database release version - Is there a kill switch to mitigate the impact immediately? Changing [NewDomainAuditStore()](https://github.com/cadence-workflow/cadence/blob/57b7157840e96ab7746fece26c8da291a582bf7b/common/persistence/sql/factory.go#L114) to return `nil, nil` should revert effects of all changes --- ## Reviewer Validation **PR Description Quality** (check these before reviewing code): - [ ] **"What changed"** provides a clear 1-2 line summary - [ ] Project Issue is linked - [ ] **"Why"** explains the full motivation with sufficient context - [ ] **Testing is documented:** - [ ] Unit test commands are included (with exact `go test` invocation) - [ ] Integration test setup/commands included (if integration tests were run) - [ ] Canary testing details included (if canary was mentioned) - [ ] **Potential risks** section is thoughtfully filled out (or legitimately N/A) - [ ] **Release notes** included if this completes a user-facing feature - [ ] **Documentation** needs are addressed (or noted if uncertain) --------- Signed-off-by: Joanna Lau <118241363+joannalauu@users.noreply.github.com>
1 parent f551af0 commit af3fa83

File tree

15 files changed

+1409
-3
lines changed

15 files changed

+1409
-3
lines changed

common/persistence/sql/factory.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ func (f *Factory) NewDomainStore() (p.DomainStore, error) {
112112

113113
// NewDomainAuditStore returns a domain audit store
114114
func (f *Factory) NewDomainAuditStore() (p.DomainAuditStore, error) {
115-
return nil, nil
115+
conn, err := f.dbConn.get()
116+
if err != nil {
117+
return nil, err
118+
}
119+
return newSQLDomainAuditStore(conn, f.logger, f.parser)
116120
}
117121

118122
// NewExecutionStore returns an ExecutionStore for a given shardID

common/persistence/sql/factory_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,3 +221,25 @@ func TestFactoryNewConfigStore(t *testing.T) {
221221
assert.NoError(t, err)
222222
factory.Close()
223223
}
224+
225+
func TestFactoryNewDomainAuditStore(t *testing.T) {
226+
ctrl := gomock.NewController(t)
227+
defer ctrl.Finish()
228+
cfg := config.SQL{}
229+
clusterName := "test"
230+
logger := testlogger.New(t)
231+
mockParser := serialization.NewMockParser(ctrl)
232+
dc := &persistence.DynamicConfiguration{}
233+
factory := NewFactory(cfg, clusterName, logger, mockParser, dc)
234+
domainAuditStore, err := factory.NewDomainAuditStore()
235+
assert.Nil(t, domainAuditStore)
236+
assert.Error(t, err)
237+
factory.Close()
238+
239+
cfg.PluginName = "shared"
240+
factory = NewFactory(cfg, clusterName, logger, mockParser, dc)
241+
domainAuditStore, err = factory.NewDomainAuditStore()
242+
assert.NotNil(t, domainAuditStore)
243+
assert.NoError(t, err)
244+
factory.Close()
245+
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package sql
22+
23+
import (
24+
"context"
25+
"fmt"
26+
"time"
27+
28+
"github.com/uber/cadence/common/constants"
29+
"github.com/uber/cadence/common/log"
30+
"github.com/uber/cadence/common/persistence"
31+
"github.com/uber/cadence/common/persistence/serialization"
32+
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
33+
"github.com/uber/cadence/common/types"
34+
)
35+
36+
type sqlDomainAuditStore struct {
37+
sqlStore
38+
}
39+
40+
// domainAuditLogPageToken is used for pagination
41+
type domainAuditLogPageToken struct {
42+
CreatedTime time.Time `json:"created_time"`
43+
EventID string `json:"event_id"`
44+
}
45+
46+
// newSQLDomainAuditStore creates an instance of sqlDomainAuditStore
47+
func newSQLDomainAuditStore(
48+
db sqlplugin.DB,
49+
logger log.Logger,
50+
parser serialization.Parser,
51+
) (persistence.DomainAuditStore, error) {
52+
return &sqlDomainAuditStore{
53+
sqlStore: sqlStore{
54+
db: db,
55+
logger: logger,
56+
parser: parser,
57+
},
58+
}, nil
59+
}
60+
61+
// CreateDomainAuditLog creates a new domain audit log entry
62+
func (m *sqlDomainAuditStore) CreateDomainAuditLog(
63+
ctx context.Context,
64+
request *persistence.InternalCreateDomainAuditLogRequest,
65+
) (*persistence.CreateDomainAuditLogResponse, error) {
66+
row := &sqlplugin.DomainAuditLogRow{
67+
DomainID: request.DomainID,
68+
EventID: request.EventID,
69+
StateBefore: getDataBlobBytes(request.StateBefore),
70+
StateBeforeEncoding: getDataBlobEncoding(request.StateBefore),
71+
StateAfter: getDataBlobBytes(request.StateAfter),
72+
StateAfterEncoding: getDataBlobEncoding(request.StateAfter),
73+
OperationType: request.OperationType,
74+
CreatedTime: request.CreatedTime,
75+
LastUpdatedTime: request.LastUpdatedTime,
76+
Identity: request.Identity,
77+
IdentityType: request.IdentityType,
78+
Comment: request.Comment,
79+
}
80+
81+
_, err := m.db.InsertIntoDomainAuditLog(ctx, row)
82+
if err != nil {
83+
return nil, convertCommonErrors(m.db, "CreateDomainAuditLog", "", err)
84+
}
85+
86+
return &persistence.CreateDomainAuditLogResponse{
87+
EventID: request.EventID,
88+
}, nil
89+
}
90+
91+
// GetDomainAuditLogs retrieves domain audit logs
92+
func (m *sqlDomainAuditStore) GetDomainAuditLogs(
93+
ctx context.Context,
94+
request *persistence.GetDomainAuditLogsRequest,
95+
) (*persistence.InternalGetDomainAuditLogsResponse, error) {
96+
minCreatedTime := time.Unix(0, 0)
97+
maxCreatedTime := time.Now().UTC()
98+
if request.MinCreatedTime != nil {
99+
minCreatedTime = *request.MinCreatedTime
100+
}
101+
if request.MaxCreatedTime != nil {
102+
maxCreatedTime = *request.MaxCreatedTime
103+
}
104+
105+
pageMaxCreatedTime := maxCreatedTime
106+
// if next page token is not present, set pageMinEventID to largest possible uuid
107+
// to prevent the query from returning rows where created_time is equal to pageMaxCreatedTime
108+
pageMinEventID := "ffffffff-ffff-ffff-ffff-ffffffffffff"
109+
if request.NextPageToken != nil {
110+
page := domainAuditLogPageToken{}
111+
if err := gobDeserialize(request.NextPageToken, &page); err != nil {
112+
return nil, fmt.Errorf("unable to decode next page token")
113+
}
114+
pageMaxCreatedTime = page.CreatedTime
115+
pageMinEventID = page.EventID
116+
}
117+
118+
filter := &sqlplugin.DomainAuditLogFilter{
119+
DomainID: request.DomainID,
120+
OperationType: request.OperationType,
121+
MinCreatedTime: &minCreatedTime,
122+
MaxCreatedTime: &maxCreatedTime,
123+
PageSize: request.PageSize,
124+
PageMaxCreatedTime: &pageMaxCreatedTime,
125+
PageMinEventID: &pageMinEventID,
126+
}
127+
128+
rows, err := m.db.SelectFromDomainAuditLogs(ctx, filter)
129+
if err != nil {
130+
return nil, convertCommonErrors(m.db, "GetDomainAuditLogs", "", err)
131+
}
132+
133+
var nextPageToken []byte
134+
if request.PageSize > 0 && len(rows) >= request.PageSize {
135+
// there could be more results
136+
lastRow := rows[request.PageSize-1]
137+
token := domainAuditLogPageToken{
138+
CreatedTime: lastRow.CreatedTime,
139+
EventID: lastRow.EventID,
140+
}
141+
nextPageToken, err = gobSerialize(token)
142+
if err != nil {
143+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("error serializing nextPageToken:%v", err)}
144+
}
145+
}
146+
147+
var auditLogs []*persistence.InternalDomainAuditLog
148+
for _, row := range rows {
149+
auditLogs = append(auditLogs, deseralizeDomainAuditLogRow(row))
150+
}
151+
152+
return &persistence.InternalGetDomainAuditLogsResponse{
153+
AuditLogs: auditLogs,
154+
NextPageToken: nextPageToken,
155+
}, nil
156+
}
157+
158+
func getDataBlobBytes(blob *persistence.DataBlob) []byte {
159+
if blob == nil {
160+
return []byte{}
161+
}
162+
return blob.Data
163+
}
164+
165+
func getDataBlobEncoding(blob *persistence.DataBlob) constants.EncodingType {
166+
if blob == nil {
167+
return constants.EncodingTypeEmpty
168+
}
169+
return blob.Encoding
170+
}
171+
172+
func deseralizeDomainAuditLogRow(row *sqlplugin.DomainAuditLogRow) *persistence.InternalDomainAuditLog {
173+
auditLog := &persistence.InternalDomainAuditLog{
174+
EventID: row.EventID,
175+
DomainID: row.DomainID,
176+
OperationType: row.OperationType,
177+
CreatedTime: row.CreatedTime,
178+
LastUpdatedTime: row.LastUpdatedTime,
179+
Identity: row.Identity,
180+
IdentityType: row.IdentityType,
181+
Comment: row.Comment,
182+
}
183+
184+
if len(row.StateBefore) > 0 {
185+
auditLog.StateBefore = &persistence.DataBlob{
186+
Encoding: row.StateBeforeEncoding,
187+
Data: row.StateBefore,
188+
}
189+
}
190+
191+
if len(row.StateAfter) > 0 {
192+
auditLog.StateAfter = &persistence.DataBlob{
193+
Encoding: row.StateAfterEncoding,
194+
Data: row.StateAfter,
195+
}
196+
}
197+
198+
return auditLog
199+
}

0 commit comments

Comments
 (0)