@@ -1126,46 +1126,55 @@ impl Relay {
1126
1126
) ) ) ;
1127
1127
}
1128
1128
1129
- time:: timeout ( Some ( opts. timeout ) , async {
1130
- self . send_msg ( ClientMessage :: new_event ( event) , None ) . await ?;
1131
- let mut notifications = self . notification_sender . subscribe ( ) ;
1132
- while let Ok ( notification) = notifications. recv ( ) . await {
1133
- match notification {
1134
- RelayPoolNotification :: Message (
1135
- url,
1136
- RelayMessage :: Ok {
1137
- event_id,
1138
- status,
1139
- message,
1140
- } ,
1141
- ) => {
1142
- if self . url == url && id == event_id {
1143
- if status {
1144
- return Ok ( event_id) ;
1145
- } else {
1146
- return Err ( Error :: EventNotPublished ( message) ) ;
1129
+ if event. is_ephemeral ( ) {
1130
+ tracing:: warn!(
1131
+ "Trying to send an ephemeral event: skip the wait for `OK` msg from relay."
1132
+ ) ;
1133
+ self . send_msg ( ClientMessage :: new_event ( event) , Some ( opts. timeout ) )
1134
+ . await ?;
1135
+ Ok ( id)
1136
+ } else {
1137
+ time:: timeout ( Some ( opts. timeout ) , async {
1138
+ self . send_msg ( ClientMessage :: new_event ( event) , None ) . await ?;
1139
+ let mut notifications = self . notification_sender . subscribe ( ) ;
1140
+ while let Ok ( notification) = notifications. recv ( ) . await {
1141
+ match notification {
1142
+ RelayPoolNotification :: Message (
1143
+ url,
1144
+ RelayMessage :: Ok {
1145
+ event_id,
1146
+ status,
1147
+ message,
1148
+ } ,
1149
+ ) => {
1150
+ if self . url == url && id == event_id {
1151
+ if status {
1152
+ return Ok ( event_id) ;
1153
+ } else {
1154
+ return Err ( Error :: EventNotPublished ( message) ) ;
1155
+ }
1147
1156
}
1148
1157
}
1149
- }
1150
- RelayPoolNotification :: RelayStatus { url , status } => {
1151
- if opts . skip_disconnected && url == self . url {
1152
- if let RelayStatus :: Disconnected
1153
- | RelayStatus :: Stopped
1154
- | RelayStatus :: Terminated = status
1155
- {
1156
- return Err ( Error :: EventNotPublished ( String :: from (
1157
- "relay not connected (status changed)" ,
1158
- ) ) ) ;
1158
+ RelayPoolNotification :: RelayStatus { url , status } => {
1159
+ if opts . skip_disconnected && url == self . url {
1160
+ if let RelayStatus :: Disconnected
1161
+ | RelayStatus :: Stopped
1162
+ | RelayStatus :: Terminated = status
1163
+ {
1164
+ return Err ( Error :: EventNotPublished ( String :: from (
1165
+ "relay not connected (status changed)" ,
1166
+ ) ) ) ;
1167
+ }
1159
1168
}
1160
1169
}
1170
+ _ => ( ) ,
1161
1171
}
1162
- _ => ( ) ,
1163
1172
}
1164
- }
1165
- Err ( Error :: LoopTerminated )
1166
- } )
1167
- . await
1168
- . ok_or ( Error :: Timeout ) ?
1173
+ Err ( Error :: LoopTerminated )
1174
+ } )
1175
+ . await
1176
+ . ok_or ( Error :: Timeout ) ?
1177
+ }
1169
1178
}
1170
1179
1171
1180
/// Send multiple [`Event`] at once
@@ -1184,15 +1193,22 @@ impl Relay {
1184
1193
) ) ) ;
1185
1194
}
1186
1195
1187
- let msgs: Vec < ClientMessage > = events
1188
- . iter ( )
1189
- . cloned ( )
1190
- . map ( ClientMessage :: new_event)
1191
- . collect ( ) ;
1196
+ let mut msgs: Vec < ClientMessage > = Vec :: with_capacity ( events. len ( ) ) ;
1197
+ let mut missing: HashSet < EventId > = HashSet :: new ( ) ;
1198
+
1199
+ for event in events. into_iter ( ) {
1200
+ if !event. is_ephemeral ( ) {
1201
+ missing. insert ( event. id ) ;
1202
+ } else {
1203
+ tracing:: warn!(
1204
+ "Trying to batch ephemeral event: skip the wait for `OK` msg from relay."
1205
+ ) ;
1206
+ }
1207
+ msgs. push ( ClientMessage :: new_event ( event) ) ;
1208
+ }
1192
1209
1193
1210
time:: timeout ( Some ( opts. timeout ) , async {
1194
1211
self . batch_msg ( msgs, None ) . await ?;
1195
- let mut missing: HashSet < EventId > = events. into_iter ( ) . map ( |e| e. id ) . collect ( ) ;
1196
1212
let mut published: HashSet < EventId > = HashSet :: new ( ) ;
1197
1213
let mut not_published: HashMap < EventId , String > = HashMap :: new ( ) ;
1198
1214
let mut notifications = self . notification_sender . subscribe ( ) ;
0 commit comments