Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.evolutiongaming.skafka
package producer

import cats.data.{NonEmptyMap => Nem}
import cats.effect.{Resource, Sync, Async, Deferred}
import cats.effect.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import cats.{Applicative, Functor, MonadError, ~>}
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{Blocking, Log, MonadThrowable, ToTry}
import com.evolutiongaming.catshelper.{Blocking, Log, MonadThrowable}
import com.evolutiongaming.skafka.Converters._
import com.evolutiongaming.skafka.producer.ProducerConverters._
import com.evolutiongaming.smetrics.MeasureDuration
Expand All @@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.{
RecordMetadata => RecordMetadataJ
}

import scala.concurrent.{ExecutionContext, ExecutionException}
import scala.concurrent.{ExecutionContext, ExecutionException, Promise}
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -91,14 +91,14 @@ object Producer {
}

@deprecated("Use of(ProducerConfig)", since = "12.0.1")
def of[F[_]: ToTry: Async](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

breaks backward compatibility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how does MiMa check pass then?

Copy link
Contributor

@t3hnar t3hnar Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how does MiMa check pass then?

good question, should not pass

def of[F[_]: Async](
config: ProducerConfig,
executorBlocking: ExecutionContext
): Resource[F, Producer[F]] = {
of(config)
}

def of[F[_]: ToTry: Async](
def of[F[_]: Async](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

breaks backward compatibility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though it might break it, but MiMa checks passing kinda convinced me otherwise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you mean source compatibility?

config: ProducerConfig
): Resource[F, Producer[F]] = {
val producer = CreateProducerJ(config)
Expand All @@ -108,11 +108,11 @@ object Producer {
private sealed abstract class Main

@deprecated("Use fromProducerJ2", since = "12.0.1")
def fromProducerJ1[F[_]: Blocking: ToTry: Async](producer: F[ProducerJ[Bytes, Bytes]]): Resource[F, Producer[F]] = {
def fromProducerJ1[F[_]: Blocking: Async](producer: F[ProducerJ[Bytes, Bytes]]): Resource[F, Producer[F]] = {
fromProducerJ2(producer)
}

def fromProducerJ2[F[_]: ToTry: Async](producer: F[ProducerJ[Bytes, Bytes]]): Resource[F, Producer[F]] = {
def fromProducerJ2[F[_]: Async](producer: F[ProducerJ[Bytes, Bytes]]): Resource[F, Producer[F]] = {

def blocking[A](f: => A) = Sync[F].blocking(f)

Expand Down Expand Up @@ -150,7 +150,7 @@ object Producer {

def block(record: ProducerRecordJ[Bytes, Bytes]) = {

def callbackOf(deferred: Deferred[F, Either[Throwable, RecordMetadataJ]]): Callback = {
def callbackOf(promise: Promise[RecordMetadataJ]): Callback = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you think that this is anyhow faster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't measured it of course, it just intuitively feels more lightweight, but I don't have any strong arguments in this regard.

Copy link
Contributor

@t3hnar t3hnar Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend keep using deferred but call toFuture instead of toTry and deprecate API requiring ToTry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how is ToFuture better than ToTry, don't they both call unsafeRunSync in implementation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how is ToFuture better than ToTry, don't they both call unsafeRunSync in implementation?

  1. less construction requirements
  2. ToFuture uses unsafeToFuture, and truly saying both are ok in this case

Copy link
Contributor Author

@RibleStrype RibleStrype Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I change ToTry to ToFuture what do I do with the resulting Future? Do I await inside the Callback or just ignore it? TBH I don't like any of these options and would rather leave ToTry then.
btw, what's the reason you are against using Promise here? To me it seems cleaner that materialising an effect in any manner

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I await inside the Callback or just ignore it?

Just ignore it, as executed code is private and you exactly know what is being executed

Copy link
Contributor

@t3hnar t3hnar Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason you are against using Promise here

Because that combination of Promise/Future adds more overhead than you might think of comparing to native, it is also not a cancellable thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this particular use case is not cancellable either way, is it? Even if you cancel the resulting effect the producer's callback will still be executed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean cancelling the "inner" effect resulting from Deferred#get

(metadata: RecordMetadataJ, exception: Exception) =>
val result = if (exception != null) {
exception.asLeft[RecordMetadataJ]
Expand All @@ -159,22 +159,16 @@ object Producer {
} else {
SkafkaError("both metadata & exception are nulls").asLeft[RecordMetadataJ]
}
deferred
.complete(result)
.toTry
.get
()
promise.complete(result.toTry)
}

val result = for {
deferred <- Async[F].deferred[Either[Throwable, RecordMetadataJ]]
callback = callbackOf(deferred)
_ <- blocking { producer.send(record, callback) }
promise <- Sync[F].delay(Promise[RecordMetadataJ]())
callback = callbackOf(promise)
_ <- blocking { producer.send(record, callback) }
res = Async[F].fromFuture(Sync[F].delay(promise.future))
} yield {
deferred
.get
.flatMap { _.liftTo[F] }
.recoverWith(executionException)
res.recoverWith(executionException)
}
result.recoverWith(executionException)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.evolutiongaming.skafka.producer

import cats.effect.{Async, MonadCancel, Resource}
import cats.~>
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.smetrics.MeasureDuration

import scala.concurrent.ExecutionContext
Expand All @@ -15,12 +14,12 @@ trait ProducerOf[F[_]] {
object ProducerOf {

@deprecated("Use apply1", since = "12.0.1")
def apply[F[_]: MeasureDuration: ToTry: Async](
def apply[F[_]: MeasureDuration: Async](
executorBlocking: ExecutionContext,
metrics: Option[ProducerMetrics[F]] = None
): ProducerOf[F] = apply1(metrics = metrics)

def apply1[F[_]: MeasureDuration: ToTry: Async](
def apply1[F[_]: MeasureDuration: Async](
metrics: Option[ProducerMetrics[F]] = None
): ProducerOf[F] = new ProducerOf[F] {

Expand Down