Skip to content

Commit d4261f5

Browse files
committed
Basic offer management without plugins
1 parent 939e25d commit d4261f5

22 files changed

+759
-91
lines changed

eclair-core/src/main/resources/reference.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,14 @@ eclair {
636636
// Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore.
637637
cleanup-frequency = 1 day
638638
}
639+
640+
managed-offers {
641+
message-path-min-length = 2
642+
643+
payment-path-count = 2
644+
payment-path-length = 4
645+
payment-path-expiry-delta = 500
646+
}
639647
}
640648

641649
akka {

eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,17 @@ import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
3939
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment, OutgoingPaymentStatus}
4040
import fr.acinq.eclair.io.Peer.{GetPeerInfo, OpenChannelResponse, PeerInfo}
4141
import fr.acinq.eclair.io._
42+
import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient}
4243
import fr.acinq.eclair.message.{OnionMessages, Postman}
4344
import fr.acinq.eclair.payment._
45+
import fr.acinq.eclair.payment.offer.{OfferCreator, OfferManager}
4446
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceiveStandardPayment
4547
import fr.acinq.eclair.payment.relay.Relayer.{ChannelBalance, GetOutgoingChannels, OutgoingChannels, RelayFees}
4648
import fr.acinq.eclair.payment.send.PaymentInitiator._
4749
import fr.acinq.eclair.payment.send.{ClearRecipient, OfferPayment, PaymentIdentifier}
4850
import fr.acinq.eclair.router.Router
4951
import fr.acinq.eclair.router.Router._
50-
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer
52+
import fr.acinq.eclair.wire.protocol.OfferTypes.{Offer, OfferAbsoluteExpiry, OfferIssuer, OfferQuantityMax, OfferTlv}
5153
import fr.acinq.eclair.wire.protocol._
5254
import grizzled.slf4j.Logging
5355
import scodec.bits.ByteVector
@@ -126,6 +128,12 @@ trait Eclair {
126128

127129
def receive(description: Either[String, ByteVector32], amount_opt: Option[MilliSatoshi], expire_opt: Option[Long], fallbackAddress_opt: Option[String], paymentPreimage_opt: Option[ByteVector32], privateChannelIds_opt: Option[List[ByteVector32]])(implicit timeout: Timeout): Future[Bolt11Invoice]
128130

131+
def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Offer]
132+
133+
def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit
134+
135+
def listOffers(onlyActive: Boolean = true)(implicit timeout: Timeout): Future[Seq[Offer]]
136+
129137
def newAddress(): Future[String]
130138

131139
def receivedInfo(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[IncomingPayment]]
@@ -388,6 +396,24 @@ class EclairImpl(val appKit: Kit) extends Eclair with Logging with SpendFromChan
388396
}
389397
}
390398

399+
override def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Offer] = {
400+
val offerCreator = appKit.system.spawnAnonymous(OfferCreator(appKit.nodeParams, appKit.router, appKit.offerManager, appKit.defaultOfferHandler))
401+
offerCreator.ask[Either[String, Offer]](replyTo => OfferCreator.Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, firstNodeId_opt))
402+
.flatMap {
403+
case Left(errorMessage) => Future.failed(new Exception(errorMessage))
404+
case Right(offer) => Future.successful(offer)
405+
}
406+
}
407+
408+
override def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit = {
409+
appKit.offerManager ! OfferManager.DisableOffer(offer)
410+
appKit.nodeParams.db.managedOffers.disableOffer(offer)
411+
}
412+
413+
override def listOffers(onlyActive: Boolean = true)(implicit timeout: Timeout): Future[Seq[Offer]] = Future {
414+
appKit.nodeParams.db.managedOffers.listOffers(onlyActive).map(_.offer)
415+
}
416+
391417
override def newAddress(): Future[String] = {
392418
appKit.wallet match {
393419
case w: BitcoinCoreClient => w.getReceiveAddress()

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import fr.acinq.eclair.db._
3030
import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy}
3131
import fr.acinq.eclair.io.{PeerConnection, PeerReadyNotifier}
3232
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
33+
import fr.acinq.eclair.payment.offer.OffersConfig
3334
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
3435
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
3536
import fr.acinq.eclair.router.Announcements.AddressException
@@ -92,7 +93,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
9293
liquidityAdsConfig: LiquidityAds.Config,
9394
peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig,
9495
onTheFlyFundingConfig: OnTheFlyFunding.Config,
95-
peerStorageConfig: PeerStorageConfig) {
96+
peerStorageConfig: PeerStorageConfig,
97+
offersConfig: OffersConfig) {
9698
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
9799

98100
val nodeId: PublicKey = nodeKeyManager.nodeId
@@ -705,6 +707,12 @@ object NodeParams extends Logging {
705707
writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS),
706708
removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
707709
cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
710+
),
711+
offersConfig = OffersConfig(
712+
messagePathMinLength = config.getInt("managed-offers.message-path-min-length"),
713+
paymentPathCount = config.getInt("managed-offers.payment-path-count"),
714+
paymentPathLength = config.getInt("managed-offers.payment-path-length"),
715+
paymentPathCltvExpiryDelta = CltvExpiryDelta(config.getInt("managed-offers.payment-path-expiry-delta")),
708716
)
709717
)
710718
}

eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
4040
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler, PeerStorageCleaner}
4141
import fr.acinq.eclair.io._
4242
import fr.acinq.eclair.message.Postman
43-
import fr.acinq.eclair.payment.offer.OfferManager
43+
import fr.acinq.eclair.payment.offer.{DefaultHandler, OfferManager}
4444
import fr.acinq.eclair.payment.receive.PaymentHandler
4545
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
4646
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
@@ -358,6 +358,8 @@ class Setup(val datadir: File,
358358
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
359359
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
360360
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
361+
defaultOfferHandler = system.spawn(Behaviors.supervise(DefaultHandler(nodeParams, router)).onFailure(typed.SupervisorStrategy.resume), name = "default-offer-handler")
362+
_ = for (offer <- nodeParams.db.managedOffers.listOffers(onlyActive = true)) offerManager ! OfferManager.RegisterOffer(offer.offer, None, offer.pathId_opt, defaultOfferHandler)
361363
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
362364
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
363365
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
@@ -399,6 +401,7 @@ class Setup(val datadir: File,
399401
balanceActor = balanceActor,
400402
postman = postman,
401403
offerManager = offerManager,
404+
defaultOfferHandler = defaultOfferHandler,
402405
wallet = bitcoinClient)
403406

404407
zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
@@ -468,6 +471,7 @@ case class Kit(nodeParams: NodeParams,
468471
balanceActor: typed.ActorRef[BalanceActor.Command],
469472
postman: typed.ActorRef[Postman.Command],
470473
offerManager: typed.ActorRef[OfferManager.Command],
474+
defaultOfferHandler: typed.ActorRef[OfferManager.HandlerCommand],
471475
wallet: OnChainWallet with OnchainPubkeyCache)
472476

473477
object Kit {

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ trait Databases {
4343
def channels: ChannelsDb
4444
def peers: PeersDb
4545
def payments: PaymentsDb
46+
def managedOffers: OffersDb
4647
def pendingCommands: PendingCommandsDb
4748
def liquidity: LiquidityDb
4849
//@formatter:on
@@ -66,6 +67,7 @@ object Databases extends Logging {
6667
channels: SqliteChannelsDb,
6768
peers: SqlitePeersDb,
6869
payments: SqlitePaymentsDb,
70+
managedOffers: SqliteOffersDb,
6971
pendingCommands: SqlitePendingCommandsDb,
7072
private val backupConnection: Connection) extends Databases with FileBackup {
7173
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
@@ -85,6 +87,7 @@ object Databases extends Logging {
8587
channels = new SqliteChannelsDb(eclairJdbc),
8688
peers = new SqlitePeersDb(eclairJdbc),
8789
payments = new SqlitePaymentsDb(eclairJdbc),
90+
managedOffers = new SqliteOffersDb(eclairJdbc),
8891
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
8992
backupConnection = eclairJdbc
9093
)
@@ -97,6 +100,7 @@ object Databases extends Logging {
97100
channels: PgChannelsDb,
98101
peers: PgPeersDb,
99102
payments: PgPaymentsDb,
103+
managedOffers: PgOffersDb,
100104
pendingCommands: PgPendingCommandsDb,
101105
dataSource: HikariDataSource,
102106
lock: PgLock) extends Databases with ExclusiveLock {
@@ -157,6 +161,7 @@ object Databases extends Logging {
157161
channels = new PgChannelsDb,
158162
peers = new PgPeersDb,
159163
payments = new PgPaymentsDb,
164+
managedOffers = new PgOffersDb,
160165
pendingCommands = new PgPendingCommandsDb,
161166
dataSource = ds,
162167
lock = lock)

eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ case class DualDatabases(primary: Databases, secondary: Databases) extends Datab
3636
override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels)
3737
override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers)
3838
override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments)
39+
override val managedOffers: OffersDb = DualOffersDb(primary.managedOffers, secondary.managedOffers)
3940
override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)
4041
override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity)
4142

@@ -405,6 +406,31 @@ case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends Pa
405406
}
406407
}
407408

409+
case class DualOffersDb(primary: OffersDb, secondary: OffersDb) extends OffersDb {
410+
411+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-offers").build()))
412+
413+
override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = {
414+
runAsync(secondary.addOffer(offer, pathId_opt))
415+
primary.addOffer(offer, pathId_opt)
416+
}
417+
418+
override def disableOffer(offer: OfferTypes.Offer): Unit = {
419+
runAsync(secondary.disableOffer(offer))
420+
primary.disableOffer(offer)
421+
}
422+
423+
override def enableOffer(offer: OfferTypes.Offer): Unit = {
424+
runAsync(secondary.enableOffer(offer))
425+
primary.enableOffer(offer)
426+
}
427+
428+
override def listOffers(onlyActive: Boolean): Seq[OfferData] = {
429+
runAsync(secondary.listOffers(onlyActive))
430+
primary.listOffers(onlyActive)
431+
}
432+
}
433+
408434
case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingCommandsDb) extends PendingCommandsDb {
409435

410436
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2025 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.db
18+
19+
import fr.acinq.bitcoin.scalacompat.ByteVector32
20+
import fr.acinq.eclair.TimestampMilli
21+
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer
22+
23+
/**
24+
* Database for offers fully managed by eclair, as opposed to offers managed by a plugin.
25+
*/
26+
trait OffersDb {
27+
/**
28+
* Add an offer managed by eclair.
29+
* @param pathId_opt If the offer uses a blinded path, this is the corresponding pathId.
30+
*/
31+
def addOffer(offer: Offer, pathId_opt: Option[ByteVector32]): Unit
32+
33+
/**
34+
* Disable an offer. The offer is still stored but new invoice requests and new payment attempts for already emitted
35+
* invoices will be rejected.
36+
*/
37+
def disableOffer(offer: Offer): Unit
38+
39+
/**
40+
* Activate an offer that was previously disabled.
41+
*/
42+
def enableOffer(offer: Offer): Unit
43+
44+
/**
45+
* List offers managed by eclair.
46+
* @param onlyActive Whether to return only active offers or also disabled ones.
47+
*/
48+
def listOffers(onlyActive: Boolean): Seq[OfferData]
49+
}
50+
51+
case class OfferData(offer: Offer, pathId_opt: Option[ByteVector32], createdAt: TimestampMilli, isActive: Boolean)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2025 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.db.pg
18+
19+
import fr.acinq.bitcoin.scalacompat.ByteVector32
20+
import fr.acinq.eclair.TimestampMilli
21+
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
22+
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
23+
import fr.acinq.eclair.db.{OfferData, OffersDb}
24+
import fr.acinq.eclair.db.pg.PgUtils.PgLock
25+
import fr.acinq.eclair.wire.protocol.OfferTypes
26+
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer
27+
import grizzled.slf4j.Logging
28+
29+
import java.sql.ResultSet
30+
import javax.sql.DataSource
31+
32+
object PgOffersDb {
33+
val DB_NAME = "offers"
34+
val CURRENT_VERSION = 1
35+
}
36+
37+
class PgOffersDb(implicit ds: DataSource, lock: PgLock) extends OffersDb with Logging {
38+
39+
import PgOffersDb._
40+
import PgUtils.ExtendedResultSet._
41+
import PgUtils._
42+
import lock._
43+
44+
inTransaction { pg =>
45+
using(pg.createStatement()) { statement =>
46+
getVersion(statement, DB_NAME) match {
47+
case None =>
48+
statement.executeUpdate("CREATE SCHEMA offers")
49+
statement.executeUpdate("CREATE TABLE offers.managed (offer_id TEXT NOT NULL PRIMARY KEY, offer TEXT NOT NULL, path_id TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, is_active BOOLEAN NOT NULL)")
50+
statement.executeUpdate("CREATE INDEX offer_is_active_idx ON offers.managed(is_active)")
51+
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
52+
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
53+
}
54+
setVersion(statement, DB_NAME, CURRENT_VERSION)
55+
}
56+
}
57+
58+
override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = withMetrics("offers/add", DbBackends.Postgres){
59+
withLock { pg =>
60+
using(pg.prepareStatement("INSERT INTO offers.managed (offer_id, offer, path_id, created_at, is_active) VALUES (?, ?, ?, ?, TRUE)")) { statement =>
61+
statement.setString(1, offer.offerId.toHex)
62+
statement.setString(2, offer.toString)
63+
pathId_opt match {
64+
case Some(pathId) => statement.setString(3, pathId.toHex)
65+
case None => statement.setNull(3, java.sql.Types.VARCHAR)
66+
}
67+
statement.setTimestamp(4, TimestampMilli.now().toSqlTimestamp)
68+
statement.executeUpdate()
69+
}
70+
}
71+
}
72+
73+
override def disableOffer(offer: OfferTypes.Offer): Unit = withMetrics("offers/disable", DbBackends.Postgres){
74+
withLock { pg =>
75+
using(pg.prepareStatement("UPDATE offers.managed SET is_active = FALSE WHERE offer_id = ?")) { statement =>
76+
statement.setString(1, offer.offerId.toHex)
77+
statement.executeUpdate()
78+
}
79+
}
80+
}
81+
82+
override def enableOffer(offer: OfferTypes.Offer): Unit = withMetrics("offers/enable", DbBackends.Postgres){
83+
withLock { pg =>
84+
using(pg.prepareStatement("UPDATE offers.managed SET is_active = TRUE WHERE offer_id = ?")) { statement =>
85+
statement.setString(1, offer.offerId.toHex)
86+
statement.executeUpdate()
87+
}
88+
}
89+
}
90+
91+
private def parseOfferData(rs: ResultSet): OfferData = {
92+
OfferData(
93+
Offer.decode(rs.getString("offer")).get,
94+
rs.getStringNullable("path_id").map(ByteVector32.fromValidHex),
95+
TimestampMilli.fromSqlTimestamp(rs.getTimestamp("created_at")),
96+
rs.getBoolean("is_active")
97+
)
98+
}
99+
100+
override def listOffers(onlyActive: Boolean): Seq[OfferData] = withMetrics("offers/list", DbBackends.Postgres){
101+
withLock { pg =>
102+
if (onlyActive) {
103+
using(pg.prepareStatement("SELECT * FROM offers.managed WHERE is_active = TRUE")) { statement =>
104+
statement.executeQuery().map(parseOfferData).toSeq
105+
}
106+
} else {
107+
using(pg.prepareStatement("SELECT * FROM offers.managed")) { statement =>
108+
statement.executeQuery().map(parseOfferData).toSeq
109+
}
110+
}
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)