diff --git a/cmd/syncv3/main.go b/cmd/syncv3/main.go
index 6c81ab2b..f42b6cac 100644
--- a/cmd/syncv3/main.go
+++ b/cmd/syncv3/main.go
@@ -3,7 +3,13 @@ package main
 import (
     "flag"
     "fmt"
-    "log"
+    "github.com/getsentry/sentry-go"
+    sentryhttp "github.com/getsentry/sentry-go/http"
+    "github.com/pressly/goose/v3"
+    "github.com/prometheus/client_golang/prometheus/promhttp"
+    "github.com/rs/zerolog"
+    "github.com/rs/zerolog/log"
+    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
     "net/http"
     _ "net/http/pprof"
     "os"
@@ -14,13 +20,6 @@ import (
     "syscall"
     "time"
 
-    "github.com/getsentry/sentry-go"
-    sentryhttp "github.com/getsentry/sentry-go/http"
-    "github.com/pressly/goose/v3"
-    "github.com/prometheus/client_golang/prometheus/promhttp"
-    "github.com/rs/zerolog"
-    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
-
     syncv3 "github.com/matrix-org/sliding-sync"
     "github.com/matrix-org/sliding-sync/internal"
     "github.com/matrix-org/sliding-sync/sync2"
@@ -52,6 +51,7 @@ const (
     EnvOTLPPassword = "SYNCV3_OTLP_PASSWORD"
     EnvSentryDsn = "SYNCV3_SENTRY_DSN"
     EnvLogLevel = "SYNCV3_LOG_LEVEL"
+    EnvPlainOutput = "SYNCV3_PLAIN_OUTPUT"
     EnvMaxConns = "SYNCV3_MAX_DB_CONN"
     EnvIdleTimeoutSecs = "SYNCV3_DB_IDLE_TIMEOUT_SECS"
     EnvHTTPTimeoutSecs = "SYNCV3_HTTP_TIMEOUT_SECS"
@@ -74,11 +74,12 @@ Environment var
 %s Default: unset. The Sentry DSN to report events to e.g https://sliding-sync@sentry.example.com/123 - if unset does not send sentry events.
 %s Default: info. The level of verbosity for messages logged. Available values are trace, debug, info, warn, error and fatal
 %s Default: unset. Max database connections to use when communicating with postgres. Unset or 0 means no limit.
+%s Default: unset. If set to 1, disables colorized output and logs plain text (for cleaner text logging).
 %s Default: 3600. The maximum amount of time a database connection may be idle, in seconds. 0 means no limit.
 %s Default: 300. The timeout in seconds for normal HTTP requests.
 %s Default: 1800. The timeout in seconds for initial sync requests.
 `, EnvServer, EnvDB, EnvSecret, EnvBindAddr, EnvTLSCert, EnvTLSKey, EnvPPROF, EnvPrometheus, EnvOTLP, EnvOTLPUsername, EnvOTLPPassword,
-    EnvSentryDsn, EnvLogLevel, EnvMaxConns, EnvIdleTimeoutSecs, EnvHTTPTimeoutSecs, EnvHTTPInitialTimeoutSecs)
+    EnvSentryDsn, EnvLogLevel, EnvMaxConns, EnvPlainOutput, EnvIdleTimeoutSecs, EnvHTTPTimeoutSecs, EnvHTTPInitialTimeoutSecs)
 
 func defaulting(in, dft string) string {
     if in == "" {
@@ -113,6 +114,7 @@ func main() {
         EnvSentryDsn: os.Getenv(EnvSentryDsn),
         EnvLogLevel: os.Getenv(EnvLogLevel),
         EnvMaxConns: defaulting(os.Getenv(EnvMaxConns), "0"),
+        EnvPlainOutput: defaulting(os.Getenv(EnvPlainOutput), "0"),
         EnvIdleTimeoutSecs: defaulting(os.Getenv(EnvIdleTimeoutSecs), "3600"),
         EnvHTTPTimeoutSecs: defaulting(os.Getenv(EnvHTTPTimeoutSecs), "300"),
         EnvHTTPInitialTimeoutSecs: defaulting(os.Getenv(EnvHTTPInitialTimeoutSecs), "1800"),
@@ -194,6 +196,25 @@ func main() {
         }
     }
 
+    if args[EnvPlainOutput] != "1" {
+        log.Logger = log.Output(zerolog.ConsoleWriter{
+            Out:        os.Stderr,
+            TimeFormat: "15:04:05",
+        })
+    } else {
+        output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}
+        output.FormatTimestamp = func(i interface{}) string {
+            return fmt.Sprintf("%v", i)
+        }
+        output.FormatLevel = func(i interface{}) string {
+            return strings.ToUpper(fmt.Sprintf("%s", i))
+        }
+        output.FormatFieldName = func(i interface{}) string {
+            return fmt.Sprintf("%s=", i)
+        }
+        log.Logger = zerolog.New(output).With().Timestamp().Logger()
+    }
+
     maxConnsInt, err := strconv.Atoi(args[EnvMaxConns])
     if err != nil {
         panic("invalid value for " + EnvMaxConns + ": " + args[EnvMaxConns])
@@ -285,12 +306,12 @@ func executeMigrations() {
 
     db, err := goose.OpenDBWithDriver("postgres", envArgs[EnvDB])
     if err != nil {
-        log.Fatalf("goose: failed to open DB: %v\n", err)
+        log.Fatal().Err(err).Msgf("goose: failed to open DB: %v\n", err)
     }
 
     defer func() {
         if err := db.Close(); err != nil {
-            log.Fatalf("goose: failed to close DB: %v\n", err)
+            log.Fatal().Err(err).Msgf("goose: failed to close DB: %v\n", err)
         }
     }()
 
@@ -301,7 +322,7 @@ func executeMigrations() {
     goose.SetBaseFS(syncv3.EmbedMigrations)
 
     if err := goose.Run(command, db, "state/migrations", arguments...); err != nil {
-        log.Fatalf("goose %v: %v", command, err)
+        log.Fatal().Err(err).Msgf("goose %v: %v", command, err)
     }
 }
diff --git a/internal/errors.go b/internal/errors.go
index 14225693..135881d5 100644
--- a/internal/errors.go
+++ b/internal/errors.go
@@ -8,15 +8,9 @@ import (
     "runtime"
 
     "github.com/getsentry/sentry-go"
-
-    "github.com/rs/zerolog"
+    "github.com/rs/zerolog/log"
 )
 
-var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
-    Out:        os.Stderr,
-    TimeFormat: "15:04:05",
-})
-
 type HandlerError struct {
     StatusCode int
     Err error
 }
@@ -103,7 +97,7 @@ func assert(msg string, expr bool) {
     if os.Getenv("SYNCV3_DEBUG") == "1" {
         panic(fmt.Sprintf("assert: %s", msg))
     }
-    l := logger.Error()
+    l := log.Error()
     _, file, line, ok := runtime.Caller(1)
     if ok {
         l = l.Str("assertion", fmt.Sprintf("%s:%d", file, line))
diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go
index f3132ae8..20dd3879 100644
--- a/pubsub/pubsub.go
+++ b/pubsub/pubsub.go
@@ -2,19 +2,12 @@ package pubsub
 
 import (
     "fmt"
-    "os"
     "sync"
     "time"
 
     "github.com/prometheus/client_golang/prometheus"
-    "github.com/rs/zerolog"
 )
 
-var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
-    Out:        os.Stderr,
-    TimeFormat: "15:04:05",
-})
-
 type Payload interface {
     // The type of payload; used mostly for logging
and prometheus metrics Type() string diff --git a/pubsub/v2.go b/pubsub/v2.go index 317e96cd..0cbb27b3 100644 --- a/pubsub/v2.go +++ b/pubsub/v2.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/matrix-org/sliding-sync/internal" + "github.com/rs/zerolog/log" ) // The channel which has V2* payloads @@ -197,7 +198,7 @@ func (v *V2Sub) onMessage(p Payload) { case *V2StateRedaction: v.receiver.OnStateRedaction(pl) default: - logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type") + log.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type") } } diff --git a/pubsub/v3.go b/pubsub/v3.go index 77bb5db9..61fd2987 100644 --- a/pubsub/v3.go +++ b/pubsub/v3.go @@ -1,5 +1,9 @@ package pubsub +import ( + "github.com/rs/zerolog/log" +) + // The channel which has V3* payloads const ChanV3 = "v3ch" @@ -39,7 +43,7 @@ func (v *V3Sub) onMessage(p Payload) { case *V3EnsurePolling: v.receiver.EnsurePolling(pl) default: - logger.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type") + log.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type") } } diff --git a/sqlutil/sql.go b/sqlutil/sql.go index 5c4a6406..7b5461ee 100644 --- a/sqlutil/sql.go +++ b/sqlutil/sql.go @@ -4,18 +4,12 @@ import ( "context" "fmt" "github.com/matrix-org/sliding-sync/internal" - "github.com/rs/zerolog" - "os" "runtime/debug" "github.com/jmoiron/sqlx" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // WithTransaction runs a block of code passing in an SQL transaction // If the code returns an error or panics then the transactions is rolled back // Otherwise the transaction is committed. @@ -30,7 +24,7 @@ func WithTransaction(db *sqlx.DB, fn func(txn *sqlx.Tx) error) (err error) { if err == nil && panicErr != nil { // TODO: thread a context through to here? ctx := context.Background() - logger.Error().Msg(string(debug.Stack())) + log.Error().Msg(string(debug.Stack())) internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr) err = fmt.Errorf("panic: %v", panicErr) } @@ -59,7 +53,8 @@ type Chunker interface { // Inserting events using NamedExec involves 3n params (n=number of events), meaning it's easy to hit // the limit in rooms like Matrix HQ. This function breaks up the events into chunks which can be // batch inserted in multiple statements. Without this, you'll see errors like: -// "pq: got 95331 parameters but PostgreSQL only supports 65535 parameters" +// +// "pq: got 95331 parameters but PostgreSQL only supports 65535 parameters" func Chunkify(numParamsPerStmt, maxParamsPerCall int, entries Chunker) []Chunker { // common case, most things are small if (entries.Len() * numParamsPerStmt) <= maxParamsPerCall { diff --git a/state/accumulator.go b/state/accumulator.go index 678ef4e8..9ea6404e 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -13,6 +13,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -77,7 +78,7 @@ func (a *Accumulator) calculateNewSnapshot(old StrippedEvents, new Event) (Strip // ruh roh. This should be impossible, but it can happen if the v2 response sends the same // event in both state and timeline. We need to alert the operator and whine badly as it means // we have lost an event by now. 
- logger.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg( + log.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg( "Detected different event IDs with the same NID when rolling forward state. This has resulted in data loss in this room (1 event). " + "This can happen when the v2 /sync response sends the same event in both state and timeline sections. " + "The event in this log line has been dropped!", @@ -227,7 +228,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia // we don't have a current snapshot for this room but yet no events are new, // no idea how this should be handled. const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug." - logger.Error().Str("room_id", roomID).Msg(errMsg) + log.Error().Str("room_id", roomID).Msg(errMsg) sentry.CaptureException(fmt.Errorf(errMsg)) } // Note: we otherwise ignore cases where the state has only changed to a @@ -398,7 +399,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s } else { // Bail out and complain loudly. const msg = "Accumulator: skipping processing of timeline, as no snapshot exists" - logger.Warn(). + log.Warn(). Str("event_id", newEvents[0].ID). Str("event_type", newEvents[0].Type). Str("event_state_key", newEvents[0].StateKey). @@ -484,7 +485,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s if roomVersion == "" { // Defaults to "1" if the key does not exist. roomVersion = "1" - logger.Warn().Str("room", roomID).Err(err).Msg( + log.Warn().Str("room", roomID).Err(err).Msg( "Redact: no content.room_version in create event, defaulting to v1", ) } @@ -576,13 +577,13 @@ func parseAndDeduplicateTimelineEvents(roomID string, timeline sync2.TimelineRes RoomID: roomID, } if err := e.ensureFieldsSetOnEvent(); err != nil { - logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg( + log.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg( "Accumulator.filterToNewTimelineEvents: failed to parse event, ignoring", ) continue } if _, ok := seenEvents[e.ID]; ok { - logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( + log.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( "Accumulator.filterToNewTimelineEvents: seen the same event ID twice, ignoring", ) continue @@ -671,7 +672,7 @@ func ensureStateHasCreateEvent(events []Event) error { }) sentry.CaptureMessage(errMsg) }) - logger.Warn(). + log.Warn(). Str("room_id", events[0].RoomID). Int("len_state", len(events)). Msg(errMsg) diff --git a/state/event_table.go b/state/event_table.go index 33c5c6c5..413112f9 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -14,6 +14,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/rs/zerolog/log" ) const ( @@ -393,7 +394,7 @@ func (t *EventTable) Redact(txn *sqlx.Tx, roomVer string, redacteeEventIDToRedac if err != nil { // unknown room version... 
let's just default to "1" rv = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionV1) - logger.Warn().Str("version", roomVer).Err(err).Msg( + log.Warn().Str("version", roomVer).Err(err).Msg( "Redact: GetRoomVersion: unknown room version, defaulting to v1", ) } @@ -567,7 +568,7 @@ func filterAndEnsureFieldsSet(events []Event) []Event { for i := range events { ev := &events[i] if err := ev.ensureFieldsSetOnEvent(); err != nil { - logger.Warn().Str("event_id", ev.ID).Err(err).Msg( + log.Warn().Str("event_id", ev.ID).Err(err).Msg( "filterAndEnsureFieldsSet: failed to parse event, ignoring", ) continue diff --git a/state/migrations/20230822180807_bogus_snapshot_cleanup.go b/state/migrations/20230822180807_bogus_snapshot_cleanup.go index a42e950b..39330c38 100644 --- a/state/migrations/20230822180807_bogus_snapshot_cleanup.go +++ b/state/migrations/20230822180807_bogus_snapshot_cleanup.go @@ -4,17 +4,12 @@ import ( "context" "database/sql" "fmt" + "github.com/lib/pq" "github.com/pressly/goose/v3" - "github.com/rs/zerolog" - "os" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - func init() { goose.AddMigrationContext(upBogusSnapshotCleanup, downBogusSnapshotCleanup) } @@ -29,7 +24,7 @@ func upBogusSnapshotCleanup(ctx context.Context, tx *sql.Tx) error { if len(bogusRooms) == 0 { return nil } - logger.Info().Strs("room_ids", bogusRooms). + log.Info().Strs("room_ids", bogusRooms). Msgf("Found %d bogus rooms to cleanup", len(bogusRooms)) tables := []string{"syncv3_snapshots", "syncv3_events", "syncv3_rooms"} @@ -52,9 +47,9 @@ func deleteFromTable(ctx context.Context, tx *sql.Tx, table string, roomIDs []st } ra, err := result.RowsAffected() if err != nil { - logger.Warn().Err(err).Msgf("Couldn't get number of rows deleted from %s", table) + log.Warn().Err(err).Msgf("Couldn't get number of rows deleted from %s", table) } else { - logger.Info().Msgf("Deleted %d rows from %s", ra, table) + log.Info().Msgf("Deleted %d rows from %s", ra, table) } return nil } diff --git a/state/migrations/20231108122539_clear_stuck_invites.go b/state/migrations/20231108122539_clear_stuck_invites.go index acd4afc8..9ffc27a9 100644 --- a/state/migrations/20231108122539_clear_stuck_invites.go +++ b/state/migrations/20231108122539_clear_stuck_invites.go @@ -7,6 +7,7 @@ import ( "github.com/lib/pq" "github.com/pressly/goose/v3" + "github.com/rs/zerolog/log" ) func init() { @@ -49,9 +50,9 @@ func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { } usersToInvalidate = append(usersToInvalidate, userID) } - logger.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users") + log.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users") if len(usersToInvalidate) < 50 { - logger.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users") + log.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users") } // for each user: @@ -64,7 +65,7 @@ func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { return fmt.Errorf("failed to invalidate since tokens: %w", err) } ra, _ := res.RowsAffected() - logger.Info().Int64("num_devices", ra).Msg("reset since tokens") + log.Info().Int64("num_devices", ra).Msg("reset since tokens") res, err = tx.ExecContext(ctx, ` DELETE FROM syncv3_invites WHERE user_id=ANY($1) @@ -73,7 +74,7 @@ func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { return 
fmt.Errorf("failed to remove outstanding invites: %w", err) } ra, _ = res.RowsAffected() - logger.Info().Int64("num_invites", ra).Msg("reset invites") + log.Info().Int64("num_invites", ra).Msg("reset invites") return nil } diff --git a/state/migrations/20240517104423_device_list_table.go b/state/migrations/20240517104423_device_list_table.go index 315b79c2..cbe23cdc 100644 --- a/state/migrations/20240517104423_device_list_table.go +++ b/state/migrations/20240517104423_device_list_table.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/state" "github.com/pressly/goose/v3" + "github.com/rs/zerolog/log" ) type OldDeviceData struct { @@ -63,7 +64,7 @@ func upDeviceListTable(ctx context.Context, tx *sql.Tx) error { if err = tx.QueryRow(`SELECT count(*) FROM syncv3_device_data`).Scan(&count); err != nil { return err } - logger.Info().Int("count", count).Msg("transferring device list data for devices") + log.Info().Int("count", count).Msg("transferring device list data for devices") // scan for existing CBOR (streaming as the CBOR with cursors as it can be large) _, err = tx.Exec(`DECLARE device_data_migration_cursor CURSOR FOR SELECT user_id, device_id, data FROM syncv3_device_data`) @@ -82,7 +83,7 @@ func upDeviceListTable(ctx context.Context, tx *sql.Tx) error { // logging i++ if time.Since(lastUpdate) > updateFrequency { - logger.Info().Msgf("%d/%d process device list data", i, count) + log.Info().Msgf("%d/%d process device list data", i, count) lastUpdate = time.Now() } diff --git a/state/storage.go b/state/storage.go index 0d0faa74..08ed9d8f 100644 --- a/state/storage.go +++ b/state/storage.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os" "strings" "time" @@ -18,15 +17,10 @@ import ( "github.com/jmoiron/sqlx" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sqlutil" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // Max number of parameters in a single SQL command const MaxPostgresParameters = 65535 @@ -79,7 +73,7 @@ func NewStorage(postgresURI string) *Storage { if err != nil { sentry.CaptureException(err) // TODO: if we panic(), will sentry have a chance to flush the event? - logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") + log.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") } return NewStorageWithDB(db, false) } @@ -545,7 +539,7 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str return fmt.Errorf("failed to select latest nids in rooms %v: %s", roomIDs, err) } if len(slowRooms) > 0 { - logger.Warn().Int("slow_rooms", len(slowRooms)).Msg("RoomStateAfterEventPosition: pos value provided is far behind the database copy, performance degraded") + log.Warn().Int("slow_rooms", len(slowRooms)).Msg("RoomStateAfterEventPosition: pos value provided is far behind the database copy, performance degraded") latestSlowEvents, err := s.Accumulator.eventsTable.LatestEventInRooms(txn, slowRooms, pos) if err != nil { return err @@ -653,7 +647,7 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str WITH nids AS ( SELECT `+nidcols+` AS allNids FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id = ANY(?) 
) - SELECT syncv3_events.event_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.event + SELECT syncv3_events.event_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.event FROM syncv3_events, nids WHERE (`+strings.Join(wheres, " OR ")+`) AND syncv3_events.event_nid = ANY(nids.allNids) ORDER BY syncv3_events.event_nid ASC`, @@ -801,7 +795,7 @@ func (s *Storage) RemoveInaccessibleStateSnapshots() error { } rowsAffected, err := result.RowsAffected() if err == nil { - logger.Info().Int64("rows_affected", rowsAffected).Msg("RemoveInaccessibleStateSnapshots: deleted rows") + log.Info().Int64("rows_affected", rowsAffected).Msg("RemoveInaccessibleStateSnapshots: deleted rows") } return nil } @@ -1082,16 +1076,16 @@ Loop: if n < time.Hour { boundaryTime = now.Add(-1 * time.Hour) } - logger.Info().Time("boundaryTime", boundaryTime).Msg("Cleaner running") + log.Info().Time("boundaryTime", boundaryTime).Msg("Cleaner running") err := s.TransactionsTable.Clean(boundaryTime) if err != nil { - logger.Warn().Err(err).Msg("failed to clean txn ID table") + log.Warn().Err(err).Msg("failed to clean txn ID table") sentry.CaptureException(err) } // we also want to clean up stale state snapshots which are inaccessible, to // keep the size of the syncv3_snapshots table low. if err = s.RemoveInaccessibleStateSnapshots(); err != nil { - logger.Warn().Err(err).Msg("failed to remove inaccessible state snapshots") + log.Warn().Err(err).Msg("failed to remove inaccessible state snapshots") sentry.CaptureException(err) } case <-s.shutdownCh: @@ -1123,7 +1117,7 @@ func (s *Storage) LatestEventNIDInRooms(roomIDs []string, highestNID int64) (roo if len(slowRooms) == 0 { return nil // no work to do } - logger.Warn().Int("slow_rooms", len(slowRooms)).Msg("LatestEventNIDInRooms: pos value provided is far behind the database copy, performance degraded") + log.Warn().Int("slow_rooms", len(slowRooms)).Msg("LatestEventNIDInRooms: pos value provided is far behind the database copy, performance degraded") slowRoomToLatestNIDs, err := s.EventsTable.LatestEventNIDInRooms(txn, slowRooms, highestNID) if err != nil { diff --git a/state/to_device_table.go b/state/to_device_table.go index 8035dd77..fccd4e60 100644 --- a/state/to_device_table.go +++ b/state/to_device_table.go @@ -8,6 +8,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -109,7 +110,7 @@ func (t *ToDeviceTable) Messages(userID, deviceID string, from, limit int64) (ms m := gjson.ParseBytes(msgs[i]) msgId := m.Get(`content.org\.matrix\.msgid`).Str if msgId != "" { - logger.Info().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.Messages") + log.Info().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.Messages") } } upTo = rows[len(rows)-1].Position @@ -143,7 +144,7 @@ func (t *ToDeviceTable) InsertMessages(userID, deviceID string, msgs []json.RawM } msgId := m.Get(`content.org\.matrix\.msgid`).Str if msgId != "" { - logger.Debug().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.InsertMessages") + log.Debug().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.InsertMessages") } switch rows[i].Type { case "m.room_key_request": diff --git a/sync2/handler2/handler.go b/sync2/handler2/handler.go index 7b6ecf5b..e48a14a5 100644 --- 
a/sync2/handler2/handler.go +++ b/sync2/handler2/handler.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "hash/fnv" - "os" "sync" "time" @@ -19,16 +18,11 @@ import ( "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync2" "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // Handler is responsible for starting v2 pollers at startup; // processing v2 data (as a sync2.V2DataReceiver) and publishing updates (pubsub.Payload to V2Listeners); // and receiving and processing EnsurePolling events. @@ -96,7 +90,7 @@ func (h *Handler) Listen() { defer internal.ReportPanicsToSentry() err := h.v3Sub.Listen() if err != nil { - logger.Err(err).Msg("Failed to listen for v3 messages") + log.Err(err).Msg("Failed to listen for v3 messages") sentry.CaptureException(err) } }() @@ -124,7 +118,7 @@ func (h *Handler) Teardown() { func (h *Handler) StartV2Pollers() { tokens, err := h.v2Store.TokensTable.TokenForEachDevice(nil) if err != nil { - logger.Err(err).Msg("StartV2Pollers: failed to query tokens") + log.Err(err).Msg("StartV2Pollers: failed to query tokens") sentry.CaptureException(err) return } @@ -143,7 +137,7 @@ func (h *Handler) StartV2Pollers() { ch <- t } close(ch) - logger.Info().Int("num_devices", len(tokens)).Int("num_fail_decrypt", numFails).Msg("StartV2Pollers") + log.Info().Int("num_devices", len(tokens)).Int("num_fail_decrypt", numFails).Msg("StartV2Pollers") var wg sync.WaitGroup wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { @@ -155,11 +149,9 @@ func (h *Handler) StartV2Pollers() { DeviceID: t.DeviceID, } _, err = h.pMap.EnsurePolling( - pid, t.AccessToken, t.Since, true, - logger.With().Str("user_id", t.UserID).Str("device_id", t.DeviceID).Logger(), - ) + pid, t.AccessToken, t.Since, true) if err != nil { - logger.Err(err).Str("user_id", t.UserID).Str("device_id", t.DeviceID).Msg("Failed to start poller") + log.Err(err).Str("user_id", t.UserID).Str("device_id", t.DeviceID).Msg("Failed to start poller") } else { h.updateMetrics() } @@ -172,7 +164,7 @@ func (h *Handler) StartV2Pollers() { }() } wg.Wait() - logger.Info().Msg("StartV2Pollers finished") + log.Info().Msg("StartV2Pollers finished") h.startPollerExpiryTicker() } @@ -198,7 +190,7 @@ func (h *Handler) OnTerminated(ctx context.Context, pollerID sync2.PollerID) { func (h *Handler) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) { err := h.v2Store.TokensTable.Delete(accessTokenHash) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token") + log.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } // Notify v3 side so it can remove the connection from ConnMap @@ -222,7 +214,7 @@ func (h *Handler) addPrometheusMetrics() { func (h *Handler) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) { err := h.v2Store.DevicesTable.UpdateDeviceSince(userID, deviceID, since) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token") + log.Err(err).Str("user", userID).Str("device", 
deviceID).Str("since", since).Msg("V2: failed to persist since token") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } @@ -237,7 +229,7 @@ func (h *Handler) OnE2EEData(ctx context.Context, userID, deviceID string, otkCo FallbackKeyTypes: fallbackKeyTypes, }, deviceListChanges) if err != nil { - logger.Err(err).Str("user", userID).Msg("failed to upsert device data") + log.Err(err).Str("user", userID).Msg("failed to upsert device data") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) retErr = err return @@ -287,7 +279,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin // persist the txn IDs err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user") + log.Err(err).Str("user", userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } @@ -295,7 +287,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin // Insert new events accResult, err := h.Store.Accumulate(userID, roomID, timeline) if err != nil { - logger.Err(err).Int("timeline", len(timeline.Events)).Str("room", roomID).Msg("V2: failed to accumulate room") + log.Err(err).Int("timeline", len(timeline.Events)).Str("room", roomID).Msg("V2: failed to accumulate room") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -328,7 +320,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin return err }) if err != nil { - logger.Err(err). + log.Err(err). Int("timeline", len(timeline.Events)). Int("num_transaction_ids", len(eventIDsWithTxns)). Int("num_missing_transaction_ids", len(eventIDsLackingTxns)). 
@@ -376,7 +368,7 @@ func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.Ra } res, err := h.Store.Initialise(roomID, state) if err != nil { - logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") + log.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -420,7 +412,7 @@ func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType st // else it returns nil newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent) if err != nil { - logger.Err(err).Str("room", roomID).Msg("failed to store receipts") + log.Err(err).Str("room", roomID).Msg("failed to store receipts") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -436,7 +428,7 @@ func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType st func (h *Handler) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error { _, err := h.Store.ToDeviceTable.InsertMessages(userID, deviceID, msgs) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages") + log.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -473,7 +465,7 @@ func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string, err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{ @@ -508,7 +500,7 @@ func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, even data, err := h.Store.InsertAccountData(userID, roomID, dedupedEvents) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data") sentry.CaptureException(err) return err } @@ -527,7 +519,7 @@ func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, even func (h *Handler) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) error { err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -542,7 +534,7 @@ func (h *Handler) OnLeftRoom(ctx context.Context, userID, roomID string, leaveEv // remove any invites for this user if they are rejecting an invite err := h.Store.InvitesTable.RemoveInvite(userID, roomID) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -562,7 +554,7 @@ func (h *Handler) 
OnLeftRoom(ctx context.Context, userID, roomID string, leaveEv } func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { - log := logger.With().Str("user_id", p.UserID).Str("device_id", p.DeviceID).Logger() + log := log.With().Str("user_id", p.UserID).Str("device_id", p.DeviceID).Logger() log.Info().Msg("EnsurePolling: new request") defer func() { log.Info().Msg("EnsurePolling: preprocessing done") @@ -581,7 +573,7 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { DeviceID: p.DeviceID, } _, err = h.pMap.EnsurePolling( - pid, accessToken, since, false, log, + pid, accessToken, since, false, ) if err != nil { log.Err(err).Msg("Failed to start poller") @@ -614,7 +606,7 @@ func (h *Handler) startPollerExpiryTicker() { func (h *Handler) ExpireOldPollers() { devices, err := h.v2Store.DevicesTable.FindOldDevices(30 * 24 * time.Hour) if err != nil { - logger.Err(err).Msg("Error fetching old devices") + log.Err(err).Msg("Error fetching old devices") sentry.CaptureException(err) return } @@ -625,7 +617,7 @@ func (h *Handler) ExpireOldPollers() { } numExpired := h.pMap.ExpirePollers(pids) if len(devices) > 0 { - logger.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup old devices") + log.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup old devices") } } diff --git a/sync2/handler2/handler_test.go b/sync2/handler2/handler_test.go index adeb5f0e..2b03658d 100644 --- a/sync2/handler2/handler_test.go +++ b/sync2/handler2/handler_test.go @@ -17,7 +17,6 @@ import ( "github.com/matrix-org/sliding-sync/sync2" "github.com/matrix-org/sliding-sync/sync2/handler2" "github.com/matrix-org/sliding-sync/testutils" - "github.com/rs/zerolog" ) var postgresURI string @@ -52,7 +51,7 @@ func (p *mockPollerMap) ExpirePollers([]sync2.PollerID) int { return 0 } -func (p *mockPollerMap) EnsurePolling(pid sync2.PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (bool, error) { +func (p *mockPollerMap) EnsurePolling(pid sync2.PollerID, accessToken, v2since string, isStartup bool) (bool, error) { p.calls = append(p.calls, pollInfo{ pid: pid, accessToken: accessToken, diff --git a/sync2/poller.go b/sync2/poller.go index e3e4839b..82e7405c 100644 --- a/sync2/poller.go +++ b/sync2/poller.go @@ -16,7 +16,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -70,7 +70,7 @@ type V2DataReceiver interface { } type IPollerMap interface { - EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (created bool, err error) + EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool) (created bool, err error) NumPollers() int Terminate() DeviceIDs(userID string) []string @@ -253,7 +253,7 @@ func (h *PollerMap) ExpirePollers(pids []PollerID) int { // Note that we will immediately return if there is a poller for the same user but a different device. // We do this to allow for logins on clients to be snappy fast, even though they won't yet have the // to-device msgs to decrypt E2EE rooms. 
-func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (bool, error) { +func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool) (bool, error) { h.pollerMu.Lock() if !h.executorRunning { h.executorRunning = true @@ -263,7 +263,7 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS // a poller exists and hasn't been terminated so we don't need to do anything if ok && !poller.terminated.Load() { if poller.accessToken != accessToken { - logger.Warn().Msg("PollerMap.EnsurePolling: poller already running with different access token") + log.Warn().Msg("PollerMap.EnsurePolling: poller already running with different access token") } h.pollerMu.Unlock() // this existing poller may not have completed the initial sync yet, so we need to make sure @@ -288,7 +288,7 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS // replace the poller. If we don't need to wait, then we just want to nab to-device events initially. // We don't do that on startup though as we cannot be sure that other pollers will not be using expired tokens. - poller = newPoller(pid, accessToken, h.v2Client, h, logger, !needToWait && !isStartup) + poller = newPoller(pid, accessToken, h.v2Client, h, !needToWait && !isStartup) poller.processHistogramVec = h.processHistogramVec poller.timelineSizeVec = h.timelineSizeHistogramVec poller.gappyStateSizeVec = h.gappyStateSizeVec @@ -301,7 +301,7 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS if needToWait { poller.WaitUntilInitialSync() } else { - logger.Info().Str("user", poller.userID).Msg("a poller exists for this user; not waiting for this device to do an initial sync") + log.Info().Str("user", poller.userID).Msg("a poller exists for this user; not waiting for this device to do an initial sync") } if poller.terminated.Load() { return false, fmt.Errorf("PollerMap.EnsurePolling: poller terminated after intial sync") @@ -429,7 +429,6 @@ type poller struct { accessToken string client Client receiver V2DataReceiver - logger zerolog.Logger initialToDeviceOnly bool @@ -461,7 +460,7 @@ type poller struct { totalNumPolls prometheus.Counter } -func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataReceiver, logger zerolog.Logger, initialToDeviceOnly bool) *poller { +func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataReceiver, initialToDeviceOnly bool) *poller { var wg sync.WaitGroup wg.Add(1) return &poller{ @@ -471,7 +470,6 @@ func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataR client: client, receiver: receiver, terminated: &atomic.Bool{}, - logger: logger, wg: &wg, initialToDeviceOnly: initialToDeviceOnly, } @@ -507,11 +505,11 @@ func (p *poller) Poll(since string) { }) ctx := sentry.SetHubOnContext(context.Background(), hub) - p.logger.Info().Str("since", since).Msg("Poller: v2 poll loop started") + log.Info().Str("since", since).Msg("Poller: v2 poll loop started") defer func() { panicErr := recover() if panicErr != nil { - logger.Error().Str("user", p.userID).Str("device", p.deviceID).Msgf("%s. Traceback:\n%s", panicErr, debug.Stack()) + log.Error().Str("user", p.userID).Str("device", p.deviceID).Msgf("%s. 
Traceback:\n%s", panicErr, debug.Stack()) internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr) } p.receiver.OnTerminated(ctx, PollerID{ @@ -554,7 +552,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { if s.failCount > 1000 { // 3s * 1000 = 3000s = 50 minutes errMsg := "poller: access token has failed >1000 times to /sync, terminating loop" - p.logger.Warn().Msg(errMsg) + log.Warn().Msg(errMsg) p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID) p.Terminate() return fmt.Errorf(errMsg) @@ -563,7 +561,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // period of time (on massive accounts on matrix.org) such that if you wait 2,4,8min between // requests it might force the server to do the work all over again :( waitTime := 3 * time.Second - p.logger.Warn().Str("duration", waitTime.String()).Int("fail-count", s.failCount).Msg("Poller: waiting before next poll") + log.Warn().Str("duration", waitTime.String()).Int("fail-count", s.failCount).Msg("Poller: waiting before next poll") timeSleep(waitTime) } if p.terminated.Load() { @@ -587,19 +585,19 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // check if temporary isFatal := statusCode == 401 || statusCode == 403 if !isFatal { - p.logger.Warn().Int("code", statusCode).Err(err).Msg("Poller: sync v2 poll returned temporary error") + log.Warn().Int("code", statusCode).Err(err).Msg("Poller: sync v2 poll returned temporary error") s.failCount += 1 return nil } else { errMsg := "poller: access token has been invalidated, terminating loop" - p.logger.Warn().Msg(errMsg) + log.Warn().Msg(errMsg) p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID) p.Terminate() return fmt.Errorf(errMsg) } } if s.since == "" { - p.logger.Info().Msg("Poller: valid initial sync response received") + log.Info().Msg("Poller: valid initial sync response received") } p.initialToDeviceOnly = false start = time.Now() @@ -609,19 +607,19 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // retry processing the same response after a brief period retryErr := p.parseE2EEData(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseE2EEData returned an error") + log.Err(retryErr).Msg("Poller: parseE2EEData returned an error") s.failCount += 1 return nil } retryErr = p.parseGlobalAccountData(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseGlobalAccountData returned an error") + log.Err(retryErr).Msg("Poller: parseGlobalAccountData returned an error") s.failCount += 1 return nil } retryErr = p.parseRoomsResponse(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseRoomsResponse returned an error") + log.Err(retryErr).Msg("Poller: parseRoomsResponse returned an error") s.failCount += 1 return nil } @@ -633,7 +631,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // deduplicated. 
retryErr = p.parseToDeviceMessages(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseToDeviceMessages returned an error") + log.Err(retryErr).Msg("Poller: parseToDeviceMessages returned an error") s.failCount += 1 return nil } @@ -824,7 +822,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro err = p.receiver.Initialise(ctx, roomID, roomData.State.Events) if err == nil { const warnMsg = "parseRoomsResponse: m.room.create event was found in the timeline not state, info after moving create event" - logger.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int( + log.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int( "timeline", len(roomData.Timeline.Events), ).Int("state", len(roomData.State.Events)).Msg(warnMsg) hub := internal.GetSentryHubFromContextOrDefault(ctx) @@ -960,7 +958,7 @@ func (p *poller) maybeLogStats(force bool) { return } p.lastLogged = time.Now() - p.logger.Info().Ints( + log.Info().Ints( "rooms [timeline,state,typing,receipts,invites]", []int{ p.totalTimelineCalls, p.totalStateCalls, p.totalTyping, p.totalReceipts, p.totalInvites, }, diff --git a/sync2/poller_test.go b/sync2/poller_test.go index 4b145aab..b2c77805 100644 --- a/sync2/poller_test.go +++ b/sync2/poller_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "net/http" - "os" "strconv" "sync" "testing" @@ -13,7 +12,6 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/testutils" - "github.com/rs/zerolog" ) const initialSinceToken = "0" @@ -107,7 +105,7 @@ func TestPollerMapEnsurePolling(t *testing.T) { pm.EnsurePolling(PollerID{ UserID: "alice:localhost", DeviceID: "FOOBAR", - }, "access_token", "", false, zerolog.New(os.Stderr)) + }, "access_token", "", false) close(ensurePollingUnblocked) }() ensureBlocking := func() { @@ -192,7 +190,7 @@ func TestPollerMapEnsurePollingIdempotent(t *testing.T) { for i := 0; i < n; i++ { go func() { t.Logf("EnsurePolling") - pm.EnsurePolling(PollerID{UserID: "@alice:localhost", DeviceID: "FOOBAR"}, "access_token", "", false, zerolog.New(os.Stderr)) + pm.EnsurePolling(PollerID{UserID: "@alice:localhost", DeviceID: "FOOBAR"}, "access_token", "", false) wg.Done() t.Logf("EnsurePolling unblocked") }() @@ -256,7 +254,7 @@ func TestPollerMapEnsurePollingFailsWithExpiredToken(t *testing.T) { pm := NewPollerMap(client, false) pm.SetCallbacks(accumulator) - created, err := pm.EnsurePolling(PollerID{}, "dummy_token", "", true, zerolog.New(os.Stderr)) + created, err := pm.EnsurePolling(PollerID{}, "dummy_token", "", true) if created { t.Errorf("Expected created=false, got created=true") @@ -292,7 +290,7 @@ func TestPollerMap_ExpirePollers(t *testing.T) { for i, spec := range pollerSpecs { created, err := pm.EnsurePolling( PollerID{UserID: spec.UserID, DeviceID: spec.DeviceID}, - spec.Token, "", true, logger, + spec.Token, "", true, ) if err != nil { t.Errorf("EnsurePolling error for poller #%d (%v): %s", i, spec, err) @@ -327,7 +325,7 @@ func TestPollerMap_ExpirePollers(t *testing.T) { for i, spec := range pollerSpecs { created, err := pm.EnsurePolling( PollerID{UserID: spec.UserID, DeviceID: spec.DeviceID}, - spec.Token, "", true, logger, + spec.Token, "", true, ) if err != nil { t.Errorf("EnsurePolling error for poller #%d (%v): %s", i, spec, err) @@ -370,7 +368,7 @@ func TestPollerPollFromNothing(t *testing.T) { }) var wg sync.WaitGroup wg.Add(1) - poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := 
newPoller(pid, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll("") @@ -460,7 +458,7 @@ func TestPollerPollFromExisting(t *testing.T) { }) var wg sync.WaitGroup wg.Add(1) - poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll(since) @@ -503,7 +501,7 @@ func TestPollerPollUpdateDeviceSincePeriodically(t *testing.T) { return <-syncResponses, 200, nil }) accumulator.updateSinceCalled = make(chan struct{}, 1) - poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, accumulator, false) defer poller.Terminate() go func() { poller.Poll(initialSinceToken) @@ -613,7 +611,7 @@ func TestPollerGivesUpEventually(t *testing.T) { }() var wg sync.WaitGroup wg.Add(1) - poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll("") @@ -685,7 +683,7 @@ func TestPollerBackoff(t *testing.T) { }() var wg sync.WaitGroup wg.Add(1) - poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll("some_since_value") @@ -715,7 +713,7 @@ func TestPollerUnblocksIfTerminatedInitially(t *testing.T) { pollUnblocked := make(chan struct{}) waitUntilInitialSyncUnblocked := make(chan struct{}) - poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, false) go func() { poller.Poll("") close(pollUnblocked) @@ -1010,7 +1008,7 @@ func TestPollerResendsOnCallbackError(t *testing.T) { }, } receiver := tc.generateReceiver() - poller := newPoller(pid, "Authorization: hello world", client, receiver, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, receiver, false) waitForInitialSync(t, poller) select { case <-waitForStuckPolling: @@ -1082,7 +1080,7 @@ func TestPollerDoesNotResendOnDataError(t *testing.T) { }, 200, nil }, } - poller := newPoller(pid, "Authorization: hello world", client, receiver, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, receiver, false) waitForInitialSync(t, poller) select { case <-waitForSuccess: @@ -1112,7 +1110,7 @@ func TestPollerResendsOnDataErrorWithOtherErrors(t *testing.T) { return internal.NewDataError("onLeftRoom this is a test: %v", 42) }, } - poller := newPoller(pid, "Authorization: hello world", nil, receiver, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", nil, receiver, false) testCases := []struct { name string res SyncResponse diff --git a/sync2/storage.go b/sync2/storage.go index 2cfa4c8a..47575c3c 100644 --- a/sync2/storage.go +++ 
b/sync2/storage.go @@ -1,18 +1,11 @@ package sync2 import ( - "os" - "github.com/getsentry/sentry-go" "github.com/jmoiron/sqlx" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - type Storage struct { DevicesTable *DevicesTable TokensTable *TokensTable @@ -24,7 +17,7 @@ func NewStore(postgresURI, secret string) *Storage { if err != nil { sentry.CaptureException(err) // TODO: if we panic(), will sentry have a chance to flush the event? - logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") + log.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") } return NewStoreWithDB(db, secret) } diff --git a/sync2/tokens_table.go b/sync2/tokens_table.go index 961e7bdd..7977ec2a 100644 --- a/sync2/tokens_table.go +++ b/sync2/tokens_table.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "fmt" "github.com/jmoiron/sqlx" + "github.com/rs/zerolog/log" "io" "strings" "time" @@ -254,7 +255,7 @@ func (t *TokensTable) Delete(accessTokenHash string) error { return err } if ra != 1 { - logger.Warn().Msgf("Tokens.Delete: expected to delete one token, but actually deleted %d", ra) + log.Warn().Msgf("Tokens.Delete: expected to delete one token, but actually deleted %d", ra) } return nil } diff --git a/sync3/caches/global.go b/sync3/caches/global.go index e448c834..62625a8a 100644 --- a/sync3/caches/global.go +++ b/sync3/caches/global.go @@ -3,13 +3,12 @@ package caches import ( "context" "encoding/json" - "os" "sort" "sync" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -55,11 +54,6 @@ type EventData struct { ForceInitial bool } -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // The purpose of global cache is to store global-level information about all rooms the server is aware of. // Global-level information is represented as internal.RoomMetadata and includes things like Heroes, join/invite // counts, if the room is encrypted, etc. Basically anything that is the same for all users of the system. 
This @@ -124,7 +118,7 @@ func (c *GlobalCache) LoadRoomsFromMap(ctx context.Context, joinTimingsByRoomID func (c *GlobalCache) copyRoom(roomID string) *internal.RoomMetadata { sr := c.roomIDToMetadata[roomID] if sr == nil { - logger.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, returning stub") + log.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, returning stub") return internal.NewRoomMetadata(roomID) } return sr.DeepCopy() @@ -176,7 +170,7 @@ func (c *GlobalCache) LoadStateEvent(ctx context.Context, roomID string, loadPos evType: {stateKey}, }) if err != nil { - logger.Err(err).Str("room", roomID).Int64("pos", loadPosition).Msg("failed to load room state") + log.Err(err).Str("room", roomID).Int64("pos", loadPosition).Msg("failed to load room state") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ -198,7 +192,7 @@ func (c *GlobalCache) LoadRoomState(ctx context.Context, roomIDs []string, loadP resultMap := make(map[string][]json.RawMessage, len(roomIDs)) roomIDToStateEvents, err := c.store.RoomStateAfterEventPosition(ctx, roomIDs, loadPosition, requiredStateMap.QueryStateMap()) if err != nil { - logger.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state") + log.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ -383,13 +377,13 @@ func (c *GlobalCache) OnInvalidateRoom(ctx context.Context, roomID string) { metadata, ok := c.roomIDToMetadata[roomID] if !ok { - logger.Warn().Str("room_id", roomID).Msg("OnInvalidateRoom: room not in global cache") + log.Warn().Str("room_id", roomID).Msg("OnInvalidateRoom: room not in global cache") return } err := c.store.ResetMetadataState(metadata) if err != nil { internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) - logger.Warn().Err(err).Msg("OnInvalidateRoom: failed to reset metadata") + log.Warn().Err(err).Msg("OnInvalidateRoom: failed to reset metadata") } } diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 3a7ec0c1..4fd95381 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) @@ -137,7 +138,7 @@ func NewInviteData(ctx context.Context, userID, roomID string, inviteState []jso } if id.InviteEvent == nil { const errMsg = "cannot make invite, missing invite event for user" - logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg) + log.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg) hub := internal.GetSentryHubFromContextOrDefault(ctx) hub.WithScope(func(scope *sentry.Scope) { scope.SetContext(internal.SentryCtxKey, map[string]interface{}{ @@ -331,7 +332,7 @@ func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomID result := make(map[string]state.LatestEvents) roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents) if err != nil { - logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms") + log.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ 
-401,7 +402,7 @@ func (c *UserCache) newRoomUpdate(ctx context.Context, roomID string) RoomUpdate if globalRooms == nil || globalRooms[roomID] == nil { // this can happen when we join a room we didn't know about because we process unread counts // before the timeline events. Warn and send a stub - logger.Warn().Str("room", roomID).Msg("UserCache update: room doesn't exist in global cache yet, generating stub") + log.Warn().Str("room", roomID).Msg("UserCache update: room doesn't exist in global cache yet, generating stub") r = internal.NewRoomMetadata(roomID) } else { r = globalRooms[roomID] @@ -482,7 +483,7 @@ func (c *UserCache) AnnotateWithTransactionIDs(ctx context.Context, userID strin event := events[data.i] newJSON, err := sjson.SetBytes(event, "unsigned.transaction_id", txnID) if err != nil { - logger.Err(err).Str("user", c.UserID).Msg("AnnotateWithTransactionIDs: sjson failed") + log.Err(err).Str("user", c.UserID).Msg("AnnotateWithTransactionIDs: sjson failed") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { events[data.i] = newJSON diff --git a/sync3/conn.go b/sync3/conn.go index 7fcb32e0..7c421f11 100644 --- a/sync3/conn.go +++ b/sync3/conn.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sync3/caches" + "github.com/rs/zerolog/log" ) // The amount of time to artificially wait if the server detects spamming clients. This time will @@ -97,7 +98,7 @@ func (c *Conn) tryRequest(ctx context.Context, req *Request, start time.Time) (r panicErr := recover() if panicErr != nil { err = fmt.Errorf("panic: %s", panicErr) - logger.Error().Msg(string(debug.Stack())) + log.Error().Msg(string(debug.Stack())) // Note: as we've captured the panicErr ourselves, there isn't much // difference between RecoverWithContext and CaptureException. But // there /is/ a small difference: @@ -160,7 +161,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T // are playing games if !isFirstRequest && !isRetransmit && !c.isOutstanding(req.pos) { // the client made up a position, reject them - logger.Trace().Int64("pos", req.pos).Msg("unknown pos") + log.Trace().Int64("pos", req.pos).Msg("unknown pos") return nil, internal.ExpiredSessionError() } @@ -181,7 +182,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T c.serverResponses = c.serverResponses[delIndex+1:] // slice out the first delIndex+1 elements defer func() { - l := logger.Trace().Int("num_res_acks", delIndex+1).Bool("is_retransmit", isRetransmit).Bool("is_first", isFirstRequest).Bool("is_same", isSameRequest).Int64("pos", req.pos).Str("user", c.UserID) + l := log.Trace().Int("num_res_acks", delIndex+1).Bool("is_retransmit", isRetransmit).Bool("is_first", isFirstRequest).Bool("is_same", isSameRequest).Int64("pos", req.pos).Str("user", c.UserID) if nextUnACKedResponse != nil { l.Int64("new_pos", nextUnACKedResponse.PosInt()) } @@ -196,13 +197,13 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T if isSameRequest { // this is the 2nd+ time we've seen this request, meaning the client likely retried this // request. Send the response we sent before. 
-		logger.Trace().Int64("pos", req.pos).Msg("returning cached response for pos, with delay")
+		log.Trace().Int64("pos", req.pos).Msg("returning cached response for pos, with delay")
 		// apply a small artificial wait to protect the proxy in case this is caused by a buggy
 		// client sending the same request over and over
 		time.Sleep(SpamProtectionInterval)
 		return nextUnACKedResponse, nil
 	} else {
-		logger.Info().Int64("pos", req.pos).Msg("client has resent this pos with different request data")
+		log.Info().Int64("pos", req.pos).Msg("client has resent this pos with different request data")
 		// we need to fallthrough to process this request as the client will not resend this request data,
 	}
 }
diff --git a/sync3/connmap.go b/sync3/connmap.go
index 287770fe..d03fc777 100644
--- a/sync3/connmap.go
+++ b/sync3/connmap.go
@@ -10,6 +10,7 @@ import (
 	"github.com/ReneKroon/ttlcache/v2"
 	"github.com/matrix-org/sliding-sync/internal"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/rs/zerolog/log"
 )
 // ConnMap stores a collection of Conns.
@@ -126,7 +127,7 @@ func (m *ConnMap) getConn(cid ConnID) *Conn {
 		return conn
 	}
 	// e.g buffer exceeded, close it and remove it from the cache
-	logger.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)")
+	log.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)")
 	m.closeConn(conn)
 	if m.expiryBufferFullCounter != nil {
 		m.expiryBufferFullCounter.Inc()
@@ -149,7 +150,7 @@ func (m *ConnMap) CreateConn(cid ConnID, cancel context.CancelFunc, newConnHandl
 			// /sync without a `?pos=` value.
 			time.Sleep(SpamProtectionInterval)
 		}
-		logger.Trace().Str("conn", cid.String()).Bool("spamming", isSpamming).Msg("closing connection due to CreateConn called again")
+		log.Trace().Str("conn", cid.String()).Bool("spamming", isSpamming).Msg("closing connection due to CreateConn called again")
 		m.closeConn(conn)
 	}
 	h := newConnHandler()
@@ -163,13 +164,13 @@ func (m *ConnMap) CreateConn(cid ConnID, cancel context.CancelFunc, newConnHandl
 }
 func (m *ConnMap) CloseConnsForDevice(userID, deviceID string) {
-	logger.Trace().Str("user", userID).Str("device", deviceID).Msg("closing connections due to CloseConn()")
+	log.Trace().Str("user", userID).Str("device", deviceID).Msg("closing connections due to CloseConn()")
 	// gather open connections for this user|device
 	connIDs := m.connIDsForDevice(userID, deviceID)
 	for _, cid := range connIDs {
 		err := m.cache.Remove(cid.String()) // this will fire TTL callbacks which calls closeConn
 		if err != nil {
-			logger.Err(err).Str("cid", cid.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache")
+			log.Err(err).Str("cid", cid.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache")
 			internal.GetSentryHubFromContextOrDefault(context.Background()).CaptureException(err)
 		}
 	}
@@ -195,12 +196,12 @@ func (m *ConnMap) CloseConnsForUsers(userIDs []string) (closed int) {
 	defer m.mu.Unlock()
 	for _, userID := range userIDs {
 		conns := m.userIDToConn[userID]
-		logger.Trace().Str("user", userID).Int("num_conns", len(conns)).Msg("closing all device connections due to CloseConn()")
+		log.Trace().Str("user", userID).Int("num_conns", len(conns)).Msg("closing all device connections due to CloseConn()")
 		for _, conn := range conns {
 			err := m.cache.Remove(conn.String()) // this will fire TTL callbacks which calls closeConn
 			if err != nil {
-				logger.Err(err).Str("cid", conn.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache")
+				log.Err(err).Str("cid", conn.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache")
 				internal.GetSentryHubFromContextOrDefault(context.Background()).CaptureException(err)
 			}
 		}
@@ -213,7 +214,7 @@ func (m *ConnMap) closeConnExpires(connID string, value interface{}) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	conn := value.(*Conn)
-	logger.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache")
+	log.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache")
 	if m.expiryTimedOutCounter != nil {
 		m.expiryTimedOutCounter.Inc()
 	}
@@ -227,7 +228,7 @@ func (m *ConnMap) closeConn(conn *Conn) {
 	}
 	connKey := conn.ConnID.String()
-	logger.Trace().Str("conn", connKey).Msg("closing connection")
+	log.Trace().Str("conn", connKey).Msg("closing connection")
 	// remove conn from all the maps
 	delete(m.connIDToConn, connKey)
 	h := conn.handler
diff --git a/sync3/dispatcher.go b/sync3/dispatcher.go
index a7bb24b1..e100c4e1 100644
--- a/sync3/dispatcher.go
+++ b/sync3/dispatcher.go
@@ -3,20 +3,14 @@ package sync3
 import (
 	"context"
 	"encoding/json"
-	"os"
 	"sync"
 	"github.com/matrix-org/sliding-sync/internal"
 	"github.com/matrix-org/sliding-sync/sync3/caches"
-	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 	"github.com/tidwall/gjson"
 )
-var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
-	Out: os.Stderr,
-	TimeFormat: "15:04:05",
-})
-
 const DispatcherAllUsers = "-"
 // Receiver represents the callbacks that a Dispatcher may fire.
@@ -83,7 +77,7 @@ func (d *Dispatcher) Register(ctx context.Context, userID string, r Receiver) er
 	d.userToReceiverMu.Lock()
 	defer d.userToReceiverMu.Unlock()
 	if _, ok := d.userToReceiver[userID]; ok {
-		logger.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered")
+		log.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered")
 	}
 	d.userToReceiver[userID] = r
 	return r.OnRegistered(ctx)
@@ -122,7 +116,7 @@ func (d *Dispatcher) newEventData(event json.RawMessage, roomID string, latestPo
 func (d *Dispatcher) OnNewInitialRoomState(ctx context.Context, roomID string, state []json.RawMessage) {
 	// sanity check
 	if _, jc := d.jrt.JoinedUsersForRoom(roomID, nil); jc > 0 {
-		logger.Warn().Int("join_count", jc).Str("room", roomID).Int("num_state", len(state)).Msg(
+		log.Warn().Int("join_count", jc).Str("room", roomID).Int("num_state", len(state)).Msg(
 			"OnNewInitialRoomState but have entries in JoinedRoomsTracker already, this should be impossible. Degrading to live events",
 		)
 		for _, s := range state {
diff --git a/sync3/extensions/account_data.go b/sync3/extensions/account_data.go
index a84d7f2a..b63d1d6b 100644
--- a/sync3/extensions/account_data.go
+++ b/sync3/extensions/account_data.go
@@ -7,6 +7,7 @@ import (
 	"github.com/matrix-org/sliding-sync/internal"
 	"github.com/matrix-org/sliding-sync/state"
 	"github.com/matrix-org/sliding-sync/sync3/caches"
+	"github.com/rs/zerolog/log"
 )
 // Client created request params
@@ -66,7 +67,7 @@ func (r *AccountDataRequest) AppendLive(ctx context.Context, res *Response, extC
 		}
 		roomAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, update.RoomID())
 		if err != nil {
-			logger.Err(err).Str("user", extCtx.UserID).Str("room", update.RoomID()).Msg("failed to fetch room account data")
+			log.Err(err).Str("user", extCtx.UserID).Str("room", update.RoomID()).Msg("failed to fetch room account data")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		} else {
 			if len(roomAccountData) > 0 { // else we can end up with `null` not `[]`
@@ -109,7 +110,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response,
 	if len(roomIDs) > 0 {
 		roomsAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, roomIDs...)
 		if err != nil {
-			logger.Err(err).Str("user", extCtx.UserID).Strs("rooms", roomIDs).Msg("failed to fetch room account data")
+			log.Err(err).Str("user", extCtx.UserID).Strs("rooms", roomIDs).Msg("failed to fetch room account data")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		} else {
 			extRes.Rooms = make(map[string][]json.RawMessage)
@@ -123,7 +124,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response,
 	if extCtx.IsInitial {
 		globalAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID)
 		if err != nil {
-			logger.Err(err).Str("user", extCtx.UserID).Msg("failed to fetch global account data")
+			log.Err(err).Str("user", extCtx.UserID).Msg("failed to fetch global account data")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		} else {
 			extRes.Global = accountEventsAsJSON(globalAccountData)
diff --git a/sync3/extensions/extensions.go b/sync3/extensions/extensions.go
index 4eebd4c9..bec15a3c 100644
--- a/sync3/extensions/extensions.go
+++ b/sync3/extensions/extensions.go
@@ -3,20 +3,13 @@ package extensions
 import (
 	"context"
-	"os"
 	"reflect"
 	"github.com/matrix-org/sliding-sync/internal"
 	"github.com/matrix-org/sliding-sync/state"
 	"github.com/matrix-org/sliding-sync/sync3/caches"
-	"github.com/rs/zerolog"
 )
-var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
-	Out: os.Stderr,
-	TimeFormat: "15:04:05",
-})
-
 type GenericRequest interface {
 	// Name provides a name to identify the kind of request. At present, it's only
 	// used to name opentracing spans; this isn't end-user visible.
diff --git a/sync3/extensions/receipts.go b/sync3/extensions/receipts.go
index cf5be0de..80e17781 100644
--- a/sync3/extensions/receipts.go
+++ b/sync3/extensions/receipts.go
@@ -7,6 +7,7 @@ import (
 	"github.com/matrix-org/sliding-sync/internal"
 	"github.com/matrix-org/sliding-sync/state"
 	"github.com/matrix-org/sliding-sync/sync3/caches"
+	"github.com/rs/zerolog/log"
 )
 // Client created request params
@@ -42,7 +43,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
 	if res.Receipts == nil {
 		edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt})
 		if err != nil {
-			logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu")
+			log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 			return
 		}
@@ -55,7 +56,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
 			// we have receipts already, but not for this room
 			edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt})
 			if err != nil {
-				logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
+				log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
 				internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 				return
 			}
@@ -65,7 +66,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
 			// aggregate receipts: we need to unpack then repack annoyingly.
 			pub, priv, err := state.UnpackReceiptsFromEDU(update.RoomID(), res.Receipts.Rooms[update.RoomID()])
 			if err != nil {
-				logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
+				log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
 				internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 				return
 			}
@@ -74,7 +75,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
 			receipts = append(receipts, update.Receipt)
 			edu, err := state.PackReceiptsIntoEDU(receipts)
 			if err != nil {
-				logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
+				log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
 				internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 				return
 			}
@@ -94,7 +95,7 @@ func (r *ReceiptsRequest) ProcessInitial(ctx context.Context, res *Response, ext
 		}
 		receipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForEvents(roomID, timeline)
 		if err != nil {
-			logger.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForEvents")
+			log.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForEvents")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 			continue
 		}
@@ -104,7 +105,7 @@ func (r *ReceiptsRequest) ProcessInitial(ctx context.Context, res *Response, ext
 	// single shot query to pull out our own receipts for these rooms to always include our own receipts
 	ownReceipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForUser(interestedRoomIDs, extCtx.UserID)
 	if err != nil {
-		logger.Err(err).Str("user", extCtx.UserID).Strs("rooms", interestedRoomIDs).Msg("failed to SelectReceiptsForUser")
+		log.Err(err).Str("user", extCtx.UserID).Strs("rooms", interestedRoomIDs).Msg("failed to SelectReceiptsForUser")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
diff --git a/sync3/extensions/todevice.go b/sync3/extensions/todevice.go
index 430c3da5..a4d17339 100644
--- a/sync3/extensions/todevice.go
+++ b/sync3/extensions/todevice.go
@@ -10,6 +10,7 @@ import (
 	"github.com/matrix-org/sliding-sync/internal"
 	"github.com/matrix-org/sliding-sync/sync3/caches"
+	"github.com/rs/zerolog/log"
 )
 // used to remember since positions to warn when they are not incremented. This can happen
@@ -64,7 +65,7 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
 	if r.Limit == 0 {
 		r.Limit = 100 // default to 100
 	}
-	l := logger.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger()
+	l := log.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger()
 	mapMu.Lock()
 	lastSentPos, exists := deviceIDToSinceDebugOnly[extCtx.DeviceID]
diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go
index 2eed4974..b424d524 100644
--- a/sync3/handler/connstate.go
+++ b/sync3/handler/connstate.go
@@ -10,6 +10,7 @@ import (
 	"github.com/matrix-org/sliding-sync/sync3/caches"
 	"github.com/matrix-org/sliding-sync/sync3/extensions"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/rs/zerolog/log"
 	"github.com/tidwall/gjson"
 )
@@ -179,7 +180,7 @@ func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req
 		if err != nil {
 			// in practice this means DB hit failures. If we try again later maybe it'll work, and we will because
 			// anchorLoadPosition is unset.
-			logger.Err(err).Str("conn", cid.String()).Msg("failed to load initial data")
+			log.Err(err).Str("conn", cid.String()).Msg("failed to load initial data")
 		}
 		region.End()
 	}
@@ -313,7 +314,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui
 		// the sort/filter operations have changed, invalidate everything (if there were previous syncs), re-sort and re-SYNC
 		if prevReqList != nil {
 			// there were previous syncs for this list, INVALIDATE the lot
-			logger.Trace().Interface("range", prevRange).Msg("INVALIDATEing because sort/filter ops have changed")
+			log.Trace().Interface("range", prevRange).Msg("INVALIDATEing because sort/filter ops have changed")
 			allRoomIDs := roomList.RoomIDs()
 			for _, r := range prevRange {
 				if r[0] >= roomList.Len() {
@@ -333,7 +334,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui
 		}
 		// resort as either we changed the sort order or we added/removed a bunch of rooms
 		if err := roomList.Sort(nextReqList.Sort); err != nil {
-			logger.Err(err).Str("key", listKey).Msg("cannot sort list")
+			log.Err(err).Str("key", listKey).Msg("cannot sort list")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		}
 		addedRanges = nextReqList.Ranges
@@ -342,7 +343,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui
 	// send INVALIDATE for these ranges
 	if len(removedRanges) > 0 {
-		logger.Trace().Interface("range", removedRanges).Msg("INVALIDATEing because ranges were removed")
+		log.Trace().Interface("range", removedRanges).Msg("INVALIDATEing because ranges were removed")
 	}
 	for i := range removedRanges {
 		if removedRanges[i][0] >= (roomList.Len()) {
@@ -431,7 +432,7 @@ func (s *ConnState) buildListSubscriptions(ctx context.Context, builder *RoomsBu
 	for listKey, list := range listDeltas {
 		if list.Curr == nil {
 			// they deleted this list
-			logger.Debug().Str("key", listKey).Msg("list deleted")
+			log.Debug().Str("key", listKey).Msg("list deleted")
 			s.lists.DeleteList(listKey)
 			continue
 		}
@@ -451,7 +452,7 @@ func (s *ConnState) buildRoomSubscriptions(ctx context.Context, builder *RoomsBu
 		sub, ok := s.muxedReq.RoomSubscriptions[roomID]
 		if !ok {
-			logger.Warn().Str("room_id", roomID).Msg(
+			log.Warn().Str("room_id", roomID).Msg(
 				"room listed in subscriptions but there is no subscription information in the request, ignoring room subscription.",
 			)
 			continue
@@ -724,7 +725,7 @@ func (s *ConnState) trackProcessDuration(ctx context.Context, dur time.Duration,
 // Called when the connection is torn down
 func (s *ConnState) Destroy() {
 	s.userCache.Unsubscribe(s.userCacheID)
-	logger.Debug().Str("user_id", s.userID).Str("device_id", s.deviceID).Msg("cancelling any in-flight requests")
+	log.Debug().Str("user_id", s.userID).Str("device_id", s.deviceID).Msg("cancelling any in-flight requests")
 	if s.cancelLatestReq != nil {
 		s.cancelLatestReq()
 	}
@@ -761,7 +762,7 @@ func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate) {
 		internal.AssertWithContext(ctx, "missing global room metadata", update.GlobalRoomMetadata() != nil)
 		s.OnUpdate(ctx, update)
 	default:
-		logger.Warn().Str("room_id", up.RoomID()).Msg("OnRoomUpdate unknown update type")
+		log.Warn().Str("room_id", up.RoomID()).Msg("OnRoomUpdate unknown update type")
 	}
 }
diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go
index 70da78db..ad5f294a 100644
--- a/sync3/handler/connstate_live.go
+++ b/sync3/handler/connstate_live.go
@@ -9,6 +9,7 @@ import (
 	"github.com/matrix-org/sliding-sync/sync3"
 	"github.com/matrix-org/sliding-sync/sync3/caches"
 	"github.com/matrix-org/sliding-sync/sync3/extensions"
+	"github.com/rs/zerolog/log"
 )
 // the amount of time to try to insert into a full buffer before giving up.
@@ -37,7 +38,7 @@ func (s *connStateLive) onUpdate(up caches.Update) {
 	select {
 	case s.updates <- up:
 	case <-time.After(BufferWaitTime):
-		logger.Warn().Interface("update", up).Str("user", s.userID).Str("device", s.deviceID).Msg(
+		log.Warn().Interface("update", up).Str("user", s.userID).Str("device", s.deviceID).Msg(
 			"cannot send update to connection, buffer exceeded. Destroying connection.",
 		)
 		s.bufferFull = true
@@ -50,7 +51,7 @@ func (s *connStateLive) liveUpdate(
 	ctx context.Context, req *sync3.Request, ex extensions.Request, isInitial bool, response *sync3.Response,
 ) {
-	log := logger.With().Str("user", s.userID).Str("device", s.deviceID).Logger()
+	log := log.With().Str("user", s.userID).Str("device", s.deviceID).Logger()
 	// we need to ensure that we keep consuming from the updates channel, even if they want a response
 	// immediately. If we have new list data we won't wait, but if we don't then we need to be able to
 	// catch-up to the current head position, hence giving 100ms grace period for processing.
@@ -150,7 +151,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
 	// Skip message events from ignored users.
 	if roomEventUpdate.EventData.StateKey == nil && s.userCache.ShouldIgnore(roomEventUpdate.EventData.Sender) {
-		logger.Trace().
+		log.Trace().
 			Str("user", s.userID).
 			Str("type", roomEventUpdate.EventData.EventType).
 			Str("sender", roomEventUpdate.EventData.Sender).
@@ -386,14 +387,14 @@ func (s *connStateLive) processLiveUpdateForList(
 ) (hasUpdates bool) {
 	switch update := up.(type) {
 	case *caches.RoomEventUpdate:
-		logger.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update")
+		log.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update")
 		if update.EventData.ForceInitial {
 			// add room to sub: this applies for when we track all rooms too as we want joins/etc to come through with initial data
 			subID := builder.AddSubscription(reqList.RoomSubscription)
 			builder.AddRoomsToSubscription(ctx, subID, []string{update.RoomID()})
 		}
 	case *caches.UnreadCountUpdate:
-		logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Bool("count_decreased", update.HasCountDecreased).Msg("received unread count update")
+		log.Trace().Str("user", s.userID).Str("room", update.RoomID()).Bool("count_decreased", update.HasCountDecreased).Msg("received unread count update")
 		// normally we do not signal unread count increases to the client as we want to atomically
 		// increase the count AND send the msg so there's no phantom msgs/notifications. However,
 		// we must resort the list and send delta even if this is an increase else
diff --git a/sync3/handler/ensure_polling.go b/sync3/handler/ensure_polling.go
index 24a54cae..db63d142 100644
--- a/sync3/handler/ensure_polling.go
+++ b/sync3/handler/ensure_polling.go
@@ -9,6 +9,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/matrix-org/sliding-sync/pubsub"
+	"github.com/rs/zerolog/log"
 )
 // pendingInfo tracks the status of a poller that we are (or previously were) waiting
@@ -128,7 +129,7 @@ func (p *EnsurePoller) EnsurePolling(ctx context.Context, pid sync2.PollerID, to
 }
 func (p *EnsurePoller) OnInitialSyncComplete(payload *pubsub.V2InitialSyncComplete) {
-	log := logger.With().Str("user", payload.UserID).Str("device", payload.DeviceID).Logger()
+	log := log.With().Str("user", payload.UserID).Str("device", payload.DeviceID).Logger()
 	log.Trace().Msg("OnInitialSyncComplete: got payload")
 	pid := sync2.PollerID{UserID: payload.UserID, DeviceID: payload.DeviceID}
 	p.mu.Lock()
diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go
index 9ab0db77..fd3ba615 100644
--- a/sync3/handler/handler.go
+++ b/sync3/handler/handler.go
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"net/http"
 	"net/url"
-	"os"
 	"reflect"
 	"strconv"
 	"sync"
@@ -34,11 +33,6 @@ import (
 const DefaultSessionID = "default"
-var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
-	Out: os.Stderr,
-	TimeFormat: "15:04:05",
-})
-
 // This is a net.http Handler for sync v3. It is responsible for pairing requests to Conns and to
 // ensure that the sync v2 poller is running for this client.
 type SyncLiveHandler struct {
@@ -75,7 +69,7 @@ func NewSync3Handler(
 	pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
 	maxTransactionIDDelay time.Duration,
 ) (*SyncLiveHandler, error) {
-	logger.Info().Msg("creating handler")
+	log.Info().Msg("creating handler")
 	sh := &SyncLiveHandler{
 		V2: v2Client,
 		Storage: store,
@@ -122,7 +116,7 @@ func (h *SyncLiveHandler) Listen() {
 		defer internal.ReportPanicsToSentry()
 		err := h.V2Sub.Listen()
 		if err != nil {
-			logger.Err(err).Msg("Failed to listen for v2 messages")
+			log.Err(err).Msg("Failed to listen for v2 messages")
 			sentry.CaptureException(err)
 		}
 	}()
@@ -483,7 +477,7 @@ func (h *SyncLiveHandler) identifyUnknownAccessToken(ctx context.Context, access
 		// Create a brand-new row for this token.
 		token, err = h.V2Store.TokensTable.Insert(txn, accessToken, userID, deviceID, time.Now())
 		if err != nil {
-			logger.Warn().Err(err).Str("user", userID).Str("device", deviceID).Msg("failed to insert v2 token")
+			log.Warn().Err(err).Str("user", userID).Str("device", deviceID).Msg("failed to insert v2 token")
 			return err
 		}
@@ -625,7 +619,7 @@ func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID strin
 	dd, err := h.Storage.DeviceDataTable.Select(userID, deviceID, shouldSwap)
 	if err != nil {
-		logger.Err(err).Str("user", userID).Msg("failed to SelectAndSwap device data")
+		log.Err(err).Str("user", userID).Msg("failed to SelectAndSwap device data")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return nil
 	}
@@ -637,7 +631,7 @@ func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID strin
 func (h *SyncLiveHandler) TransactionIDForEvents(userID string, deviceID string, eventIDs []string) (eventIDToTxnID map[string]string) {
 	eventIDToTxnID, err := h.Storage.TransactionsTable.Select(userID, deviceID, eventIDs)
 	if err != nil {
-		logger.Warn().Str("err", err.Error()).Str("device", deviceID).Msg("failed to select txn IDs for events")
+		log.Warn().Str("err", err.Error()).Str("device", deviceID).Msg("failed to select txn IDs for events")
 	}
 	return
 }
@@ -653,7 +647,7 @@ func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate) {
 	// note: events is sorted in ascending NID order, event if p.EventNIDs isn't.
 	events, err := h.Storage.EventNIDs(p.EventNIDs)
 	if err != nil {
-		logger.Err(err).Str("room", p.RoomID).Msg("Accumulate: failed to EventNIDs")
+		log.Err(err).Str("room", p.RoomID).Msg("Accumulate: failed to EventNIDs")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
@@ -684,7 +678,7 @@ func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise) {
 	defer task.End()
 	state, err := h.Storage.StateSnapshot(p.SnapshotNID)
 	if err != nil {
-		logger.Err(err).Int64("snap", p.SnapshotNID).Str("room", p.RoomID).Msg("Initialise: failed to get StateSnapshot")
+		log.Err(err).Int64("snap", p.SnapshotNID).Str("room", p.RoomID).Msg("Initialise: failed to get StateSnapshot")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
@@ -736,7 +730,7 @@ func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) {
 	}
 	inviteState, err := h.Storage.InvitesTable.SelectInviteState(p.UserID, p.RoomID)
 	if err != nil {
-		logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("failed to get invite state")
+		log.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("failed to get invite state")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
@@ -806,7 +800,7 @@ func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData) {
 	}
 	data, err := h.Storage.AccountData(p.UserID, p.RoomID, p.Types)
 	if err != nil {
-		logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("OnAccountData: failed to lookup")
+		log.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("OnAccountData: failed to lookup")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
@@ -848,7 +842,7 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) {
 			})
 			hub.CaptureException(err)
 		})
-		logger.Err(err).
+		log.Err(err).
 			Str("room_id", p.RoomID).
 			Msg("Failed to fetch members after cache invalidation")
 		return
@@ -871,7 +865,7 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) {
 		h.destroyedConns.Add(float64(destroyed))
 	}
 	// invalidations are rare and dangerous if we get it wrong, so log information about it.
-	logger.Info().
+	log.Info().
 		Str("room_id", p.RoomID).Int("joins", len(joins)).Int("invites", len(invites)).Int("leaves", len(leaves)).
 		Int("del_user_caches", len(unregistered)).Int("conns_destroyed", destroyed).Msg("OnInvalidateRoom")
 }
diff --git a/sync3/handler/rooms_builder.go b/sync3/handler/rooms_builder.go
index e0e4efc8..0a9a15df 100644
--- a/sync3/handler/rooms_builder.go
+++ b/sync3/handler/rooms_builder.go
@@ -13,19 +13,21 @@ import (
 // in the Response. It is not thread-safe and should only be called by the ConnState thread.
 //
 // The top-level `rooms` key is an amalgamation of:
-// - Room subscriptions
-// - Rooms within all sliding lists.
+//   - Room subscriptions
+//   - Rooms within all sliding lists.
 //
 // The purpose of this builder is to remember which rooms we will be returning data for, along with the
 // room subscription for that room. This then allows efficient database accesses. For example:
-// - List A will return !a, !b, !c with Room Subscription X
-// - List B will return !b, !c, !d with Room Subscription Y
-// - Room sub for !a with Room Subscription Z
+//   - List A will return !a, !b, !c with Room Subscription X
+//   - List B will return !b, !c, !d with Room Subscription Y
+//   - Room sub for !a with Room Subscription Z
+//
 // Rather than performing each operation in isolation and query for rooms multiple times (where the
 // response data will inevitably be dropped), we can instead amalgamate this into:
-// - Room Subscription X+Z -> !a
-// - Room Subscription X+Y -> !b, !c
-// - Room Subscription Y -> !d
+//   - Room Subscription X+Z -> !a
+//   - Room Subscription X+Y -> !b, !c
+//   - Room Subscription Y -> !d
+//
 // This data will not be wasted when it has been retrieved from the database.
 type RoomsBuilder struct {
 	subs []sync3.RoomSubscription
diff --git a/sync3/lists.go b/sync3/lists.go
index 823637b4..13f46ba8 100644
--- a/sync3/lists.go
+++ b/sync3/lists.go
@@ -5,6 +5,7 @@ import (
 	"strings"
 	"github.com/matrix-org/sliding-sync/internal"
+	"github.com/rs/zerolog/log"
 )
 type OverwriteVal bool
@@ -244,7 +245,7 @@ func (s *InternalRequestLists) AssignList(ctx context.Context, listKey string, f
 	if sort != nil {
 		err := roomList.Sort(sort)
 		if err != nil {
-			logger.Err(err).Strs("sort_by", sort).Msg("failed to sort")
+			log.Err(err).Strs("sort_by", sort).Msg("failed to sort")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		}
 	}
diff --git a/sync3/main_test.go b/sync3/main_test.go
index 9638346d..551777b6 100644
--- a/sync3/main_test.go
+++ b/sync3/main_test.go
@@ -6,12 +6,13 @@ import (
 	"github.com/matrix-org/sliding-sync/testutils"
 	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 )
 var postgresConnectionString = "user=xxxxx dbname=syncv3_test sslmode=disable"
 func TestMain(m *testing.M) {
-	logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
+	log.Logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
 		Out: os.Stderr,
 		TimeFormat: "15:04:05",
 		NoColor: true,
diff --git a/sync3/ops.go b/sync3/ops.go
index 9f051191..906c6b8f 100644
--- a/sync3/ops.go
+++ b/sync3/ops.go
@@ -3,6 +3,7 @@ package sync3
 import (
 	"context"
 	"github.com/matrix-org/sliding-sync/internal"
+	"github.com/rs/zerolog/log"
 )
 type List interface {
@@ -49,7 +50,7 @@ func CalculateListOps(ctx context.Context, reqList *RequestList, list List, room
 		list.Add(roomID)
 		// this should only move exactly 1 room at most as this is called for every single update
 		if err := list.Sort(reqList.Sort); err != nil {
-			logger.Err(err).Msg("cannot sort list")
+			log.Err(err).Msg("cannot sort list")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		}
 		// find the new position of this room
@@ -67,7 +68,7 @@ func CalculateListOps(ctx context.Context, reqList *RequestList, list List, room
 	case ListOpChange:
 		// this should only move exactly 1 room at most as this is called for every single update
 		if err := list.Sort(reqList.Sort); err != nil {
-			logger.Err(err).Msg("cannot sort list")
+			log.Err(err).Msg("cannot sort list")
 			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		}
 		// find the new position of this room
diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go
index 209fc9b3..77078d7f 100644
--- a/tests-e2e/gappy_state_test.go
+++ b/tests-e2e/gappy_state_test.go
@@ -69,11 +69,11 @@ func TestGappyState(t *testing.T) {
 	t.Log("Alice sends lots of other state events.")
 	const numOtherState = 40
 	for i := 0; i < numOtherState; i++ {
-		alice.Unsafe_SendEventUnsynced(t, roomID, b.Event{
-			Type: "com.example.dummy",
-			StateKey: ptr(fmt.Sprintf("%d", i)),
-			Content: map[string]any{},
-		})
+		alice.Unsafe_SendEventUnsynced(t, roomID, b.Event{
+			Type:     "com.example.dummy",
+			StateKey: ptr(fmt.Sprintf("%d", i)),
+			Content:  map[string]any{},
+		})
 	}
 	t.Log("Alice sends a batch of message events.")
diff --git a/v3.go b/v3.go
index 53e40637..031ff0b6 100644
--- a/v3.go
+++ b/v3.go
@@ -24,17 +24,13 @@ import (
 	"github.com/matrix-org/sliding-sync/sync2/handler2"
 	"github.com/matrix-org/sliding-sync/sync3/handler"
 	"github.com/pressly/goose/v3"
-	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/hlog"
+	"github.com/rs/zerolog/log"
 )
 //go:embed state/migrations/*
 var EmbedMigrations embed.FS
-var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
-	Out: os.Stderr,
-	TimeFormat: "15:04:05",
-})
 var Version string
 type Opts struct {
@@ -94,14 +90,14 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
 	// Sanity check that we can contact the upstream homeserver.
 	_, err := v2Client.Versions(context.Background())
 	if err != nil {
-		logger.Warn().Err(err).Str("dest", destHomeserver).Msg("Could not contact upstream homeserver. Is SYNCV3_SERVER set correctly?")
+		log.Warn().Err(err).Str("dest", destHomeserver).Msg("Could not contact upstream homeserver. Is SYNCV3_SERVER set correctly?")
 	}
 	db, err := sqlx.Open("postgres", postgresURI)
 	if err != nil {
 		sentry.CaptureException(err)
 		// TODO: if we panic(), will sentry have a chance to flush the event?
-		logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
+		log.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
 	}
 	if opts.DBMaxConns > 0 {
@@ -121,7 +117,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
 	goose.SetBaseFS(EmbedMigrations)
 	err = goose.Up(db.DB, "state/migrations", goose.WithAllowMissing())
 	if err != nil {
-		logger.Panic().Err(err).Msg("failed to execute migrations")
+		log.Panic().Err(err).Msg("failed to execute migrations")
 	}
 	bufferSize := 50
@@ -152,7 +148,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
 	if err != nil {
 		panic(err)
 	}
-	logger.Info().Msg("retrieved global snapshot from database")
+	log.Info().Msg("retrieved global snapshot from database")
 	h3.Startup(&storeSnapshot)
 	// begin consuming from these positions
@@ -188,7 +184,7 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str
 	srv := &server{
 		chain: []func(next http.Handler) http.Handler{
-			hlog.NewHandler(logger),
+			hlog.NewHandler(log.Logger),
 			func(next http.Handler) http.Handler {
 				return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 					r = r.WithContext(internal.RequestContext(r.Context()))
@@ -220,39 +216,39 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str
 	// Block forever
 	var err error
 	if internal.IsUnixSocket(bindAddr) {
-		logger.Info().Msgf("listening on unix socket %s", bindAddr)
+		log.Info().Msgf("listening on unix socket %s", bindAddr)
 		listener := unixSocketListener(bindAddr)
 		err = http.Serve(listener, srv)
 	} else {
 		if tlsCert != "" && tlsKey != "" {
-			logger.Info().Msgf("listening TLS on %s", bindAddr)
+			log.Info().Msgf("listening TLS on %s", bindAddr)
 			err = http.ListenAndServeTLS(bindAddr, tlsCert, tlsKey, srv)
 		} else {
-			logger.Info().Msgf("listening on %s", bindAddr)
+			log.Info().Msgf("listening on %s", bindAddr)
 			err = http.ListenAndServe(bindAddr, srv)
 		}
 	}
 	if err != nil {
 		sentry.CaptureException(err)
 		// TODO: Fatal() calls os.Exit. Will that give time for sentry.Flush() to run?
-		logger.Fatal().Err(err).Msg("failed to listen and serve")
+		log.Fatal().Err(err).Msg("failed to listen and serve")
 	}
 }
 func unixSocketListener(bindAddr string) net.Listener {
 	err := os.Remove(bindAddr)
 	if err != nil && !errors.Is(err, fs.ErrNotExist) {
-		logger.Fatal().Err(err).Msg("failed to remove existing unix socket")
+		log.Fatal().Err(err).Msg("failed to remove existing unix socket")
 	}
 	listener, err := net.Listen("unix", bindAddr)
 	if err != nil {
-		logger.Fatal().Err(err).Msg("failed to serve unix socket")
+		log.Fatal().Err(err).Msg("failed to serve unix socket")
 	}
 	// least permissions and work out of box (-w--w--w-); could be extracted as
 	// env variable if needed
 	err = os.Chmod(bindAddr, 0222)
 	if err != nil {
-		logger.Fatal().Err(err).Msg("failed to set unix socket permissions")
+		log.Fatal().Err(err).Msg("failed to set unix socket permissions")
 	}
 	return listener
 }