Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@

# Scala Steward: Reformat with scalafmt 3.9.7
33d51e84ca26e3fb58f7de9680d3722e61a89a0f

# Scala Steward: Reformat with scalafmt 3.10.3
17cfba21e1b117525d179d55580fe4320091b7ea
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# - better interop with default IntelliJ IDEA setup (matching import and modifiers sorting logic)
# - better developer experience on laptop screens (like 16' MBPs) with IntelliJ IDEA (line wraps)

version = 3.10.1
version = 3.10.3

runner.dialect = scala213source3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ object JournalAdapter {
for {
key <- toKey(persistenceId)
pointer <- journals(key).pointer
} yield for {
pointer <- pointer
if pointer >= from
} yield pointer
} yield
for {
pointer <- pointer
if pointer >= from
} yield pointer
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ object JournalSuite {
): F[List[EventRecord[A]]] = {
for {
records <- journal.read().toList
} yield for {
record <- records
} yield {
record.copy(timestamp = timestamp)
}
} yield
for {
record <- records
} yield {
record.copy(timestamp = timestamp)
}
}

def pointer: F[Option[SeqNr]] = journal.pointer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,32 @@ class SettingsIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers
}
val all = for {
settings <- settings.all.toList
} yield for {
setting <- settings
if setting.key =!= "schema-version"
} yield {
fix(setting)
}
} yield
for {
setting <- settings
if setting.key =!= "schema-version"
} yield {
fix(setting)
}

def get(key: Setting.Key) = for {
setting <- settings.get(key)
} yield for {
setting <- setting
} yield {
fix(setting)
}

def remove(key: Setting.Key) = {
} yield
for {
setting <- settings.remove(key)
} yield for {
setting <- setting
} yield {
fix(setting)
}

def remove(key: Setting.Key) = {
for {
setting <- settings.remove(key)
} yield
for {
setting <- setting
} yield {
fix(setting)
}
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ class ReplicatorIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matche
val recordMetadata1 = recordMetadata.withExpireAfter(expireAfter)
for {
partitionOffset <- journal.append(events, recordMetadata1, headers)
} yield for {
event <- events
} yield {
EventRecord(event, timestamp, partitionOffset, origin.some, version.some, recordMetadata1, headers)
}
} yield
for {
event <- events
} yield {
EventRecord(event, timestamp, partitionOffset, origin.some, version.some, recordMetadata1, headers)
}
}

"replicate events and expire" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,17 @@ private[journal] object SettingStatements {
.setConsistencyLevel(consistencyConfig.value)
for {
row <- bound.first
} yield for {
row <- row
} yield {
Setting(
key = key,
value = row.decode[Value]("value"),
timestamp = row.decode[Instant]("timestamp"),
origin = row.decode[Option[Origin]],
)
}
} yield
for {
row <- row
} yield {
Setting(
key = key,
value = row.decode[Value]("value"),
timestamp = row.decode[Instant]("timestamp"),
origin = row.decode[Option[Origin]],
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ private[journal] object SettingsCassandra {
def get(key: K): F[Option[Setting]] = {
for {
setting <- statements.select(key)
} yield for {
setting <- setting
} yield setting
} yield
for {
setting <- setting
} yield setting
}

def set(key: K, value: V): F[Option[Setting]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ object CassandraMetadata {
def keyspace(name: String): F[Option[KeyspaceMetadata[F]]] = {
for {
keyspace <- metadata.keyspace(name)
} yield for {
keyspace <- keyspace
} yield {
KeyspaceMetadata[F](keyspace)
}
} yield
for {
keyspace <- keyspace
} yield {
KeyspaceMetadata[F](keyspace)
}
}
}
}
Expand All @@ -37,11 +38,12 @@ object KeyspaceMetadata {
def table(name: String): F[Option[TableMetadata]] = {
for {
table <- metadata.table(name)
} yield for {
table <- table
} yield {
TableMetadata(table.name)
}
} yield
for {
table <- table
} yield {
TableMetadata(table.name)
}
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/com/evolution/kafka/journal/Origin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ object Origin {
def hostName[F[_]: Sync]: F[Option[Origin]] = {
for {
hostName <- HostName.of[F]()
} yield for {
hostName <- hostName
} yield {
fromHostName(hostName)
}
} yield
for {
hostName <- hostName
} yield {
fromHostName(hostName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ private[journal] object MetaJournalStatements {
.first
for {
row <- row
} yield for {
row <- row
} yield {
row.decode[JournalHead]
}
} yield
for {
row <- row
} yield {
row.decode[JournalHead]
}
}
}
}
Expand Down Expand Up @@ -198,11 +199,12 @@ private[journal] object MetaJournalStatements {
.first
for {
row <- row
} yield for {
row <- row
} yield {
JournalPointer(partitionOffset = row.decode[PartitionOffset], seqNr = row.decode[SeqNr])
}
} yield
for {
row <- row
} yield {
JournalPointer(partitionOffset = row.decode[PartitionOffset], seqNr = row.decode[SeqNr])
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,21 +465,23 @@ object JournalSpec {
def delete(to: DeleteTo): F[Option[Offset]] = {
for {
partitionOffset <- journal.delete(to)
} yield for {
partitionOffset <- partitionOffset
} yield {
partitionOffset.offset
}
} yield
for {
partitionOffset <- partitionOffset
} yield {
partitionOffset.offset
}
}

def purge: F[Option[Offset]] = {
for {
partitionOffset <- journal.purge
} yield for {
partitionOffset <- partitionOffset
} yield {
partitionOffset.offset
}
} yield
for {
partitionOffset <- partitionOffset
} yield {
partitionOffset.offset
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ object JournalAdapter {
for {
key <- toKey(persistenceId)
pointer <- journals(key).pointer
} yield for {
pointer <- pointer
if pointer >= from
} yield pointer
} yield
for {
pointer <- pointer
if pointer >= from
} yield pointer
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ object JournalSuite {
): F[List[EventRecord[A]]] = {
for {
records <- journal.read().toList
} yield for {
record <- records
} yield {
record.copy(timestamp = timestamp)
}
} yield
for {
record <- records
} yield {
record.copy(timestamp = timestamp)
}
}

def pointer: F[Option[SeqNr]] = journal.pointer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,32 @@ class SettingsIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers
}
val all = for {
settings <- settings.all.toList
} yield for {
setting <- settings
if setting.key =!= "schema-version"
} yield {
fix(setting)
}
} yield
for {
setting <- settings
if setting.key =!= "schema-version"
} yield {
fix(setting)
}

def get(key: Setting.Key) = for {
setting <- settings.get(key)
} yield for {
setting <- setting
} yield {
fix(setting)
}

def remove(key: Setting.Key) = {
} yield
for {
setting <- settings.remove(key)
} yield for {
setting <- setting
} yield {
fix(setting)
}

def remove(key: Setting.Key) = {
for {
setting <- settings.remove(key)
} yield
for {
setting <- setting
} yield {
fix(setting)
}
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ class ReplicatorIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matche
val recordMetadata1 = recordMetadata.withExpireAfter(expireAfter)
for {
partitionOffset <- journal.append(events, recordMetadata1, headers)
} yield for {
event <- events
} yield {
EventRecord(event, timestamp, partitionOffset, origin.some, version.some, recordMetadata1, headers)
}
} yield
for {
event <- events
} yield {
EventRecord(event, timestamp, partitionOffset, origin.some, version.some, recordMetadata1, headers)
}
}

"replicate events and expire" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ private[journal] object ReplicateRecords {
JournalError(s"ReplicateRecords failed for id: $id, partition: $partition, offset: $offset: $e", e)
}
eventualEvents <- events.events.traverse { _.traverse { a => eventualWrite(a) } }
} yield for {
event <- eventualEvents
} yield {
EventRecord(record, event, events.metadata)
}
} yield
for {
event <- eventualEvents
} yield {
EventRecord(record, event, events.metadata)
}
}
expireAfter = events.last.metadata.payload.expireAfter
result <- journal.append(offset, timestamp, expireAfter, events)
Expand Down
Loading