Skip to content

Commit 676047c

Browse files
authored
Do not migrate expired welcome messages (#1620)
1 parent d01b50d commit 676047c

File tree

7 files changed

+44
-58
lines changed

7 files changed

+44
-58
lines changed

pkg/config/migrator.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ type MigrationServerOptions struct {
1414
BatchSize int32 `long:"batch-size" env:"XMTPD_MIGRATION_DB_BATCH_SIZE" description:"Batch size for migration" default:"1000"`
1515
PollInterval time.Duration `long:"process-interval" env:"XMTPD_MIGRATION_DB_PROCESS_INTERVAL" description:"Interval for processing migration" default:"10s"`
1616
Namespace string `long:"namespace" env:"XMTPD_MIGRATION_DB_NAMESPACE" description:"Namespace for migration" default:""`
17-
StartDate time.Time `long:"start-date" env:"XMTPD_MIGRATION_START_DATE" description:"Start date for migration" default:"2025-10-01T00:00:00Z"`
1817
}
1918

2019
type MigrationClientOptions struct {

pkg/migrator/migrator.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,11 @@ func NewMigrationService(opts ...DBMigratorOption) (*Migrator, error) {
185185
readDB := db.NewDBHandler(reader, db.WithReadReplica(reader))
186186

187187
readers := map[string]ISourceReader{
188-
groupMessagesTableName: NewGroupMessageReader(readDB.DB(), cfg.options.StartDate.Unix()),
189-
inboxLogTableName: NewInboxLogReader(readDB.DB(), cfg.options.StartDate.UnixNano()),
190-
keyPackagesTableName: NewKeyPackageReader(readDB.DB()),
191-
welcomeMessagesTableName: NewWelcomeMessageReader(
192-
readDB.DB(),
193-
cfg.options.StartDate.Unix(),
194-
),
195-
commitMessagesTableName: NewCommitMessageReader(readDB.DB(), cfg.options.StartDate.Unix()),
188+
groupMessagesTableName: NewGroupMessageReader(readDB.DB()),
189+
inboxLogTableName: NewInboxLogReader(readDB.DB()),
190+
keyPackagesTableName: NewKeyPackageReader(readDB.DB()),
191+
welcomeMessagesTableName: NewWelcomeMessageReader(readDB.DB()),
192+
commitMessagesTableName: NewCommitMessageReader(readDB.DB()),
196193
}
197194

198195
transformer := NewTransformer(cfg.feeCalculator, payerPrivateKey, nodeSigningKey)

pkg/migrator/migrator_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ const (
3535
// Identity updates go to blockchain, not database.
3636
inboxLogAmount int64 = 0
3737
inboxLogLastID int64 = 19
38-
welcomeMessageAmount int64 = 19
39-
welcomeMessageLastID int64 = 19
38+
welcomeMessageAmount int64 = 18
39+
welcomeMessageLastID int64 = 150000017
4040
keyPackageAmount int64 = 19
4141
keyPackageLastID int64 = 19
4242
)
@@ -75,7 +75,6 @@ func newMigratorTest(t *testing.T) *migratorTest {
7575
WaitForDB: 5 * time.Second,
7676
BatchSize: 1000,
7777
PollInterval: 500 * time.Millisecond,
78-
StartDate: startDate,
7978
}),
8079
migrator.WithContractsOptions(chainConfig),
8180
migrator.WithFeeCalculator(feeCalculator),
@@ -404,7 +403,8 @@ func verifyGroupMessagesIntegrity(t *testing.T, ctx context.Context, sourceDB, d
404403
func verifyWelcomeMessagesIntegrity(t *testing.T, ctx context.Context, sourceDB, destDB *sql.DB) {
405404
sourceRows, err := sourceDB.QueryContext(ctx, `
406405
SELECT id, installation_key, data
407-
FROM welcome_messages
406+
FROM welcome_messages
407+
WHERE id > 150000000
408408
ORDER BY id
409409
`)
410410
require.NoError(t, err)

pkg/migrator/reader.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,12 @@ func NewDBReader[T ISourceRecord](
2929
query string,
3030
queryHeight string,
3131
factory func() T,
32-
startDate int64,
3332
) *DBReader[T] {
3433
return &DBReader[T]{
3534
db: db,
3635
query: query,
3736
queryHeight: queryHeight,
3837
factory: factory,
39-
startDate: startDate,
4038
heightEvery: 10 * time.Minute,
4139
}
4240
}
@@ -127,11 +125,11 @@ type GroupMessageReader struct {
127125
*DBReader[*GroupMessage]
128126
}
129127

130-
func NewGroupMessageReader(db *sql.DB, startDate int64) *GroupMessageReader {
128+
func NewGroupMessageReader(db *sql.DB) *GroupMessageReader {
131129
query := `
132130
SELECT id, created_at, group_id, data, group_id_data_hash, is_commit, sender_hmac, should_push
133131
FROM group_messages
134-
WHERE id > $1 AND is_commit = false AND created_at > to_timestamp($3)
132+
WHERE id > $1 AND is_commit = false
135133
ORDER BY id ASC
136134
LIMIT $2
137135
`
@@ -149,7 +147,6 @@ func NewGroupMessageReader(db *sql.DB, startDate int64) *GroupMessageReader {
149147
query,
150148
queryHeight,
151149
func() *GroupMessage { return &GroupMessage{} },
152-
startDate,
153150
),
154151
}
155152
}
@@ -158,11 +155,11 @@ type CommitMessageReader struct {
158155
*DBReader[*CommitMessage]
159156
}
160157

161-
func NewCommitMessageReader(db *sql.DB, startDate int64) *CommitMessageReader {
158+
func NewCommitMessageReader(db *sql.DB) *CommitMessageReader {
162159
query := `
163160
SELECT id, created_at, group_id, data, group_id_data_hash, is_commit, sender_hmac, should_push
164161
FROM group_messages
165-
WHERE id > $1 AND is_commit = true AND created_at > to_timestamp($3)
162+
WHERE id > $1 AND is_commit = true
166163
ORDER BY id ASC
167164
LIMIT $2
168165
`
@@ -181,7 +178,6 @@ func NewCommitMessageReader(db *sql.DB, startDate int64) *CommitMessageReader {
181178
query,
182179
queryHeight,
183180
func() *CommitMessage { return &CommitMessage{} },
184-
startDate,
185181
),
186182
}
187183
}
@@ -190,11 +186,11 @@ type InboxLogReader struct {
190186
*DBReader[*InboxLog]
191187
}
192188

193-
func NewInboxLogReader(db *sql.DB, startDate int64) *InboxLogReader {
189+
func NewInboxLogReader(db *sql.DB) *InboxLogReader {
194190
query := `
195191
SELECT sequence_id, inbox_id, server_timestamp_ns, identity_update_proto
196192
FROM inbox_log
197-
WHERE sequence_id > $1 AND server_timestamp_ns > $3
193+
WHERE sequence_id > $1
198194
ORDER BY sequence_id ASC
199195
LIMIT $2
200196
`
@@ -212,7 +208,6 @@ func NewInboxLogReader(db *sql.DB, startDate int64) *InboxLogReader {
212208
query,
213209
queryHeight,
214210
func() *InboxLog { return &InboxLog{} },
215-
startDate,
216211
),
217212
}
218213
}
@@ -243,7 +238,6 @@ func NewKeyPackageReader(db *sql.DB) *KeyPackageReader {
243238
query,
244239
queryHeight,
245240
func() *KeyPackage { return &KeyPackage{} },
246-
0,
247241
),
248242
}
249243
}
@@ -252,11 +246,11 @@ type WelcomeMessageReader struct {
252246
*DBReader[*WelcomeMessage]
253247
}
254248

255-
func NewWelcomeMessageReader(db *sql.DB, startDate int64) *WelcomeMessageReader {
249+
func NewWelcomeMessageReader(db *sql.DB) *WelcomeMessageReader {
256250
query := `
257251
SELECT id, created_at, installation_key, data, hpke_public_key, installation_key_data_hash, wrapper_algorithm, welcome_metadata
258252
FROM welcome_messages
259-
WHERE id > $1 AND created_at > to_timestamp($3)
253+
WHERE id > 150000000 AND id > $1
260254
ORDER BY id ASC
261255
LIMIT $2
262256
`
@@ -274,7 +268,6 @@ func NewWelcomeMessageReader(db *sql.DB, startDate int64) *WelcomeMessageReader
274268
query,
275269
queryHeight,
276270
func() *WelcomeMessage { return &WelcomeMessage{} },
277-
startDate,
278271
),
279272
}
280273
}

pkg/migrator/reader_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,19 @@ package migrator_test
22

33
import (
44
"testing"
5-
"time"
65

76
"github.com/stretchr/testify/require"
87
"github.com/xmtp/xmtpd/pkg/migrator"
98
"github.com/xmtp/xmtpd/pkg/migrator/testdata"
109
)
1110

12-
var startDate time.Time
13-
1411
func TestGroupMessageReader(t *testing.T) {
1512
ctx := t.Context()
1613

1714
db, _, cleanup := testdata.NewMigratorTestDB(t, ctx)
1815
defer cleanup()
1916

20-
reader := migrator.NewGroupMessageReader(db, startDate.Unix())
17+
reader := migrator.NewGroupMessageReader(db)
2118

2219
cases := []struct {
2320
name string
@@ -69,7 +66,7 @@ func TestInboxLogReader(t *testing.T) {
6966
db, _, cleanup := testdata.NewMigratorTestDB(t, ctx)
7067
defer cleanup()
7168

72-
reader := migrator.NewInboxLogReader(db, startDate.UnixNano())
69+
reader := migrator.NewInboxLogReader(db)
7370

7471
cases := []struct {
7572
name string
@@ -173,7 +170,7 @@ func TestCommitMessageReader(t *testing.T) {
173170
db, _, cleanup := testdata.NewMigratorTestDB(t, ctx)
174171
defer cleanup()
175172

176-
reader := migrator.NewCommitMessageReader(db, startDate.Unix())
173+
reader := migrator.NewCommitMessageReader(db)
177174

178175
cases := []struct {
179176
name string
@@ -220,7 +217,7 @@ func TestWelcomeMessageReader(t *testing.T) {
220217
db, _, cleanup := testdata.NewMigratorTestDB(t, ctx)
221218
defer cleanup()
222219

223-
reader := migrator.NewWelcomeMessageReader(db, startDate.Unix())
220+
reader := migrator.NewWelcomeMessageReader(db)
224221

225222
cases := []struct {
226223
name string

0 commit comments

Comments
 (0)