@@ -48,21 +48,23 @@ export function fireAndForget<TData>(
4848 inputCodec : Codec < TData >
4949) : (
5050 rsocket : RSocket ,
51- metadata : Map < string | number | WellKnownMimeType , Buffer >
51+ metadata ? : Map < string | number | WellKnownMimeType , Buffer >
5252) => Observable < void > {
5353 return (
5454 rsocket : RSocket ,
55- metadata : Map < string | number | WellKnownMimeType , Buffer >
56- ) =>
57- new RSocketPublisherToObservable ( ( s ) =>
55+ metadata ?: Map < string | number | WellKnownMimeType , Buffer >
56+ ) => {
57+ const encodedMetadata = metadata ? encodeCompositeMetadata ( metadata ) : null ;
58+ return new RSocketPublisherToObservable ( ( s ) =>
5859 rsocket . fireAndForget (
5960 {
6061 data : data ? inputCodec . encode ( data ) : Buffer . allocUnsafe ( 0 ) ,
61- metadata : encodeCompositeMetadata ( metadata ) ,
62+ metadata : encodedMetadata ,
6263 } ,
6364 s
6465 )
6566 ) ;
67+ } ;
6668}
6769
6870export function requestResponse < TData , RData > (
@@ -71,23 +73,25 @@ export function requestResponse<TData, RData>(
7173 outputCodec : Codec < RData >
7274) : (
7375 rsocket : RSocket ,
74- metadata : Map < string | number | WellKnownMimeType , Buffer >
76+ metadata ? : Map < string | number | WellKnownMimeType , Buffer >
7577) => Observable < RData > {
7678 return (
7779 rsocket : RSocket ,
78- metadata : Map < string | number | WellKnownMimeType , Buffer >
79- ) =>
80- new RSocketPublisherToObservable (
80+ metadata ?: Map < string | number | WellKnownMimeType , Buffer >
81+ ) => {
82+ const encodedMetadata = metadata ? encodeCompositeMetadata ( metadata ) : null ;
83+ return new RSocketPublisherToObservable (
8184 ( s ) =>
8285 rsocket . requestResponse (
8386 {
8487 data : data ? inputCodec . encode ( data ) : Buffer . allocUnsafe ( 0 ) ,
85- metadata : encodeCompositeMetadata ( metadata ) ,
88+ metadata : encodedMetadata ,
8689 } ,
8790 s
8891 ) ,
8992 outputCodec
9093 ) ;
94+ } ;
9195}
9296
9397export function requestStream < TData , RData > (
@@ -98,18 +102,19 @@ export function requestStream<TData, RData>(
98102 scheduler : SchedulerLike = asyncScheduler
99103) : (
100104 rsocket : RSocket ,
101- metadata : Map < string | number | WellKnownMimeType , Buffer >
105+ metadata ? : Map < string | number | WellKnownMimeType , Buffer >
102106) => Observable < RData > {
103107 return (
104108 rsocket : RSocket ,
105- metadata : Map < string | number | WellKnownMimeType , Buffer >
106- ) =>
107- new RSocketPublisherToPrefetchingObservable (
109+ metadata ?: Map < string | number | WellKnownMimeType , Buffer >
110+ ) => {
111+ const encodedMetadata = metadata ? encodeCompositeMetadata ( metadata ) : null ;
112+ return new RSocketPublisherToPrefetchingObservable (
108113 ( s , n ) =>
109114 rsocket . requestStream (
110115 {
111116 data : data ? inputCodec . encode ( data ) : Buffer . allocUnsafe ( 0 ) ,
112- metadata : encodeCompositeMetadata ( metadata ) ,
117+ metadata : encodedMetadata ,
113118 } ,
114119 n ,
115120 s
@@ -118,6 +123,7 @@ export function requestStream<TData, RData>(
118123 outputCodec ,
119124 scheduler
120125 ) ;
126+ } ;
121127}
122128
123129export function requestChannel < TData , RData > (
@@ -128,9 +134,9 @@ export function requestChannel<TData, RData>(
128134 scheduler : SchedulerLike = asyncScheduler
129135) : (
130136 rsocket : RSocket ,
131- metadata : Map < string | number | WellKnownMimeType , Buffer >
137+ metadata ? : Map < string | number | WellKnownMimeType , Buffer >
132138) => Observable < RData > {
133- const [ firstValueObservable , restValuestObservable ] = partition (
139+ const [ firstValueObservable , restValuesObservable ] = partition (
134140 datas . pipe (
135141 share ( {
136142 connector : ( ) => new Subject ( ) ,
@@ -142,35 +148,37 @@ export function requestChannel<TData, RData>(
142148
143149 return (
144150 rsocket : RSocket ,
145- metadata : Map < string | number | WellKnownMimeType , Buffer >
146- ) =>
147- firstValueObservable . pipe (
151+ metadata ?: Map < string | number | WellKnownMimeType , Buffer >
152+ ) => {
153+ const encodedMetadata = metadata ? encodeCompositeMetadata ( metadata ) : null ;
154+ return firstValueObservable . pipe (
148155 take ( 1 ) ,
149- concatMap (
150- ( firstValue ) =>
151- new Observer2BufferingSubscriberToPublisher2PrefetchingObservable (
152- (
153- s : OnTerminalSubscriber &
154- OnNextSubscriber &
155- OnExtensionSubscriber &
156- Requestable &
157- Cancellable
158- ) =>
159- rsocket . requestChannel (
160- {
161- data : inputCodec . encode ( firstValue ) ,
162- metadata : encodeCompositeMetadata ( metadata ) ,
163- } ,
164- prefetch ,
165- false ,
166- s
167- ) ,
168- prefetch ,
169- restValuestObservable ,
170- inputCodec ,
171- outputCodec ,
172- scheduler
173- ) as Observable < RData >
174- )
156+ concatMap ( ( firstValue ) => {
157+ return new Observer2BufferingSubscriberToPublisher2PrefetchingObservable (
158+ (
159+ s : OnTerminalSubscriber &
160+ OnNextSubscriber &
161+ OnExtensionSubscriber &
162+ Requestable &
163+ Cancellable
164+ ) => {
165+ return rsocket . requestChannel (
166+ {
167+ data : inputCodec . encode ( firstValue ) ,
168+ metadata : encodedMetadata ,
169+ } ,
170+ prefetch ,
171+ false ,
172+ s
173+ ) ;
174+ } ,
175+ prefetch ,
176+ restValuesObservable ,
177+ inputCodec ,
178+ outputCodec ,
179+ scheduler
180+ ) as Observable < RData > ;
181+ } )
175182 ) ;
183+ } ;
176184}
0 commit comments