From a593dcf900aa0dde0360d0370629dfddccc62c2d Mon Sep 17 00:00:00 2001 From: remorses Date: Sat, 20 Jul 2024 13:46:23 +0200 Subject: [PATCH 01/10] use streamSSEResponse to stream sse --- src/treaty2/index.ts | 85 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 74eaa90..ad3f2f2 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -112,28 +112,75 @@ const processHeaders = ( } } -export async function* streamResponse(response: Response) { - const body = response.body - - if (!body) return - - const reader = body.getReader() - const decoder = new TextDecoder() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break +interface SSEEvent { + event: string; + data: string; + id?: string; +} - const data = decoder.decode(value) +export async function* streamSSEResponse(response: Response): AsyncGenerator { + const body = response.body; + if (!body) return; + + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + let eventEnd: number; + while ((eventEnd = buffer.indexOf('\n\n')) >= 0) { + const eventData = buffer.slice(0, eventEnd); + buffer = buffer.slice(eventEnd + 2); + + const event = parseEvent(eventData); + if (event) { + yield event; + } + } + } + + if (buffer.trim()) { + const event = parseEvent(buffer); + if (event) { + yield event; + } + } + } finally { + reader.releaseLock(); + } +} - yield parseStringifiedValue(data) - } - } finally { - reader.releaseLock() - } +function parseEvent(eventData: string): SSEEvent | null { + let event: string = 'message'; + let data: string[] = []; + let id: string | undefined; + + const lines = eventData.split('\n'); + for (const line of lines) { + if (line.startsWith('event:')) { + event = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + data.push(line.slice(5)); + } else if (line.startsWith('id:')) { + id = line.slice(3).trim(); + } + } + + if (data.length > 0) { + const dataString = data.join('\n'); + return { event, data: dataString, id }; + } + + return null; } + const createProxy = ( domain: string, config: Treaty.Config, @@ -407,7 +454,7 @@ const createProxy = ( response.headers.get('Content-Type')?.split(';')[0] ) { case 'text/event-stream': - data = streamResponse(response) + data = streamSSEResponse(response) break case 'application/json': From a8bfb8440ff7d30c97ae866f7f287c993b108a94 Mon Sep 17 00:00:00 2001 From: remorses Date: Sat, 20 Jul 2024 13:55:34 +0200 Subject: [PATCH 02/10] parse stream events --- src/treaty2/index.ts | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index ad3f2f2..6acb6be 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -158,7 +158,7 @@ export async function* streamSSEResponse(response: Response): AsyncGenerator 0) { - const dataString = data.join('\n'); - return { event, data: dataString, id }; - } - - return null; + return { event, data: tryParsingJson(data), id }; } +function tryParsingJson(data: string) { + try { + return JSON.parse(data) + } catch (error) { + return data + } +} + const createProxy = ( domain: string, config: Treaty.Config, From a1de16511e8de1669dc20e5c946e129ea69eee79 Mon Sep 17 00:00:00 2001 From: remorses Date: Sat, 20 Jul 2024 13:58:49 +0200 Subject: [PATCH 03/10] throw errors --- src/treaty2/index.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 6acb6be..b6b8f99 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -114,7 +114,7 @@ const processHeaders = ( interface SSEEvent { event: string; - data: string; + data: any; id?: string; } @@ -139,8 +139,11 @@ export async function* streamSSEResponse(response: Response): AsyncGenerator Date: Sat, 20 Jul 2024 14:49:04 +0200 Subject: [PATCH 04/10] return null if json cannot be parsed in case the stream ends while in progress --- src/treaty2/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index b6b8f99..68278c5 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -184,7 +184,7 @@ function tryParsingJson(data: string): any { try { return JSON.parse(data) } catch (error) { - return data + return null } } From 41f6ae6affc43fd9c5d60683024e7f8b75512de2 Mon Sep 17 00:00:00 2001 From: remorses Date: Sun, 21 Jul 2024 14:38:13 +0200 Subject: [PATCH 05/10] use eventsource-parser/stream --- bun.lockb | Bin 104038 -> 104420 bytes package.json | 3 ++ src/treaty2/index.ts | 93 +++++++++++++++++++------------------------ 3 files changed, 45 insertions(+), 51 deletions(-) diff --git a/bun.lockb b/bun.lockb index 50d67c89fb548492903f5d80936fc22298ff2578..7af577626fd200b28d184f6cd5ba3d1a9178c3df 100755 GIT binary patch delta 17428 zcmeHvd0bW1+W%e$jGdSlA|C9q=F*gP?+M-GMA{B zV`@sKXo^FMW{E>)-ELa9ymfEgtTb)LuKD{udjM~jd*Aze-`@AH_NVWD)_1LE4bQWl zwf1HoSD)=^_1C?vR)z*$%~|Xk{rqPG{_Hn&;l>~qd*%<*R(}~>`tv8>cZr$2KGpp( z!6dreSK0hZ2fArQ&5_zkQrTc6J3!8|7iLm!FTfc*7K&{lpD4(jl{urZAU`)d`^h{> zde1?Ux*-23q%Y(~oiB#;2A>Mq6>KWWdNBDI6E zC-Q%SHKiU%e1ZhY3%YzomzyCwA-@vR9kNK5nYv8XWs)v~blFLlOqVy|zK*E>fiBPL z@}RR+supZT2DyBdE=zTpqswu+OvF@>MTG_AcqA|?8KU6HjQsS1OsN2)rGe(z^V176 z3#6Q4(vqZ+untX>6=r45us=~G#p`^K2mDX2n~aKXkW;gBCsTz^UYbYDkgmvo8={|mvlnE}p&&`aM7KwIpU(PJq!(s|OwTO313y!}wwNaxXpVh`JukhW;5PD! zpDINGRrJJ!Qt*AQ7rYEk9dCuC0d7EqQH5Mc^1u)T1j$5jc)9EfIL+xrNa~ORhGabg ziR!!hYx=)#$NuYvh6ZQ@`E^5S@DL54KO}W@wwo4wdm$;x7a@Wve;qhf%4T66)ZjzN zldR{L9Xv}9fYER!pp=HKKvHDbu*j(OqV$|>xKo;zTa=MILy~AdCKpLk;scsFk&vYK z1thf^7O3&b`RN&$Z)pcO<==oLcUptA@|?{4shJT{XX%>Wki8&8o$KIWt->WkYv;XNHS`cE{&kc2B$$~a5u9h zby1i$v{||w8Low1`i!ES^umm+jPwlhfRqH+kO^ZVwAS8@Y_cwIMQLLk0#19y8jH4fGzFj@e~V(ZF@{2qrqdIWrqclu11bA6 z+940z;Q_5Y*)|^6s;{?%%VdfO+EnVk4FK^`;cLJs44P&Z5Ptxp|ovcj> zxj#L-K>9}KnFV?InOIBH_~llfrGrx>%$8IZ0ZIMuN82uB!6U(f{QLaA*rD$nVESZ z_5vwGuQ(P`&Cks!%(u_T%q)1a&0jI8v65V&({OI!8-;!a&!_8)B3|6eFG`MV}Hc_Xe-e+ySV)lGfWBRu5cUwbh z{pBl8@)&48gk~8p?_ptkd83<^{leoqSWN>NFYb^mFUCV0pJJ;Ef%u=I`(z z9>?!?NHC9Z#8$4Rb|R(aIhZBsQ8jNgQsdQBEmC$h<=aY<($!Q6Qd;TzNKIAq`nASW zS53Wu)I>F9b}~{^kkV?@H|6;wmb4KRA!VZ)>@6PWX*D^w;l-ZGrnENvh-Wfe&l^3h z>{A}+Wi|D1=EYvg=APJt9_A%p3FZQ%9#T^$no`biq?TuEN^M2zK~?VtQbW~LG*+3G zDrricX-c_Z570_SHKnSXQlB=ZLNK?3wXrm%>Y7sb)YNhBL`OXI2dZjIno<{=Qf}Ca zwbD^Wif!jbU9Bb;cYdU6GJBCXf;e~L@qWqdXg~mkfRysa0IU2V9`x9l&`y0< zi^<2E7k5i$3wT{Ot9$_aV}u$)^#K<7GqAx;EDn1XcAPQ?PGxh!DA*i03$Ug2=YdnB74@!`cilI03)zwywJc|T8!d>Kq@i+Ne(o>&{$bP&t+-7K;lj5Hm1KScIP zUf0tqoAB_bJSSD-b)B_V`*y}t06TD5D^6{t>dZ`xRr1DOR{3k>jaFNvu@1tE0vU|9 zz!vHZ=KEXZH^Im%EE+G1sk1+i4@@?X_DARZc-I7ZB~sKCI__$bFX&9`*a@!_DJV13 zf|N&tkx3G-hq>#(s7skMKZ|@tXQ&5rJ-TV8Ir8$TSk{df2PdEsO+FN{3#fDj!|V(?;iX}?__ySkNd~<4j63>FsYlx?AKG0!uVa!1bGBf+Kx~T zQ=bMSQxQL44Pe^n$tiL#EHI-5c@mg5A!OM)urQRFcsVrBf#JEJO}zXI7#^c#&Z>Wg z;9Y{OHW6csd1I)ReaGYaTIIMvtwpqfz6-##Hp=0TePEi88QT9DjQprhr#v)BtEWyp z1!Puy?Jp&vUwg}ep2{x zxad4mgV7kg`+!Au>mx~4)c}gwN5J$LN1Uw!Bd5U080L8}G692tLHEFD-DqngJVawK z0h%v?k!#@;u)l-RB&aTulS8#hz?7he`8v}yUj-vGu`2qa9$0hC$M!YcN%iu<$OWoH z%sati`Q4~QCnRW4h_hge+yPUFQPM18PLshj)B0if&;A2D2SziX8PhIYW7>c-z$i4q z{4K18*TvFK-56_?-O&}*f+^iBERz?-S>+?hBPYRSVHWu&7@EeKz8wb;2#|DsPOpntDd@_=IHhX+&B+1-3aF=8mEidxkeASmhtVA4Vm0 z2Qdw@@Z!W|dA~)HQjrOZ5g86>v?-=dNwGXWDOp~QO!BIv`u`l*Fs@@O%m#XyVv5t!x; ziobZgoN5b%wy-=ft#68)onTZCqemEguQN5m&C%$jzdGJBq%dUC!m>WXiw0QP&%ADc z)f8jpcLpTO1z0nrfPn8y!z>zTH94g4BLkD=Nof9Yb!620F);EZHpwuHxotnJJ$^hk zK^}=zbD%s$`Os3WmM?&5;fQH8x%TJ9gOlY)u!d;ha4np+6%00`e=_457>x#IfO!wl z+$E{5vFWUp`dE1mjOwX-HoMH@hFax#^hMK)2uI5cz-R*00n7WrG<{5md{bxY#w-U9 z(t5?~S)4_l0!B_(do`^aMDZ(MYEq%ujCDAk5%tFq(07USu{z zV=w__RxoNs)0_=Pt*A34AJ#Q7_^uZ74^+mJ{1fCpuutC@uoYy3!4i7JA%b59qxRIk z*yp@xxYg{BMKOYx3{NoUA~jr19cxO-*blV4(M_qXNGYn`O{9jYDT`vH%9>IQO{oqK z8KvWzQrntRH=0s0!zF2m+FnUh>b0hnGs0dg9f{N+RZldf{?wET7@?M?COWE#q=yro zkkHp%EFRGaQp84CTV5Y)F{gozRaZhSQly1x#rF3%Ui6sN6qL%3JeJI+^2W!kY&VY^ zMLS~AD68x}Qi}*|F8&tNqa*p9QORrrj~ks(>LRIsI^eEJXGkxA0eIOc>H}&?dZG-`u3rBvwf_H&!oRBc8|CZ$|3?EL*W0vyRoNQp2|B-D zl1I`3n!?Gt%z&ip*Rm&Nv;yo@ii@NPm`(&2Ne5m%#F4ViQ&3mQvPm$*2o@yNx%ov`BuNe414xk(`7b1?m~f*xHbW9`g&WOL zTOz-b)NVVbsQE}}72504Rj+WrBzJo2`6MaXS?458y!vs!Aj;^f=aQu1(3DYDH$9)E zBd^TV=JEkOw$fFjAh+b{rfVSIq8hU&BRNl zYJQpoMsx54B$d+jfAo0P{M1|P^S6AytAX0mW0|_5eC_e9_5U9|1*zg7y5aiGq~8Dk zsKE{x(0}*woRp#3^Uu#`%BPsH>*XYA?M&A>$=@H(zdxR(-yhGvKc0VoJgd9nzk6@^ z?>(Mf=yBZic;;183sX)ES#BQuN!Hlzt#0hO@=krp;=ni@Rhg%z>w6*_b2Whr5}} zb{lKO({XRj>u`7CZqsZ`=62lM@Os>xx%YG%Ys+Wi?!p^zZ^r|&ZLB>n!rhf$#@&qv z<=FU@92Z`hqp*(rYL1P$^Y9rq)`>5{-GhIOyC;v!wein$U3hh_!o2x)u(f$Ee0ZM1 ze0Wu!jrsE1xOd^hX4?2qGhKMiOojRJ#+f$e&r|blEP(IAy&Go*HrAb|;ogJq!@Vb$ z3vH|yPsjZMUWa=icYD&tg18;`U|x@VZ|*(I#`^G?xQFlt+(UW5Y#Zy#i*OI)mvIm0 zL33;@f|uhS$*<0_@roiBo>Zi;XuhP##$$_J_zf@%k1K{HVAaJ6i{sb9*3N|`a}}1r ztLEDH(0MNWE?5#DHV?Ld)yz|vl{bRzEOFuEOBB|R?<%qJG4oxx%Y20m;A!)1+^N)s z9|s%6upzt-EUV0gcPUfYFm5liaj$Y0ei2OJ-sLua7ObRPVZ(U?*qj9} zJY<2wM)0BqHXgXpg?|W^%7YeSmcS|(D(q2y6|7|3O;QGCfF*tZz=f!TQ6V%P^( zy;xys{5sg$r(oYx3LD3(o`QW(!#=R_eAv^l53J^Cg-zs*U^^>dUxmWb`K}7sw*>Yr zQCJ2~TLSx@fqh_^Tz&@jf#p1-;0sb6Sk_Y5w^U(vZeI%fmcc%->D+r6>;o%Vrm!5| z05+!*_Ejn@mlsvSzU8nFY$gv{4*S3=mn*D*Uj?gJ0sB@c>`A_41?*c1`@m-NxRtOE zta_!wiuiS~wX0y?DuvDERjXj%v#<}Wgb#Zb_JP$rtI)5AU^_YN;|eS1yEyDy4f|Fr zY#~ou4f~#hePD~Z{2c59%Xv=0ub6dUS!-b58ig(4_BF6iz&^00+*`mtuo9u*N4y5G zIcs6xT7|9PMQdSS73>3B#e=F~A6R9T!Z^PQRB}$Hp0H=Vc+u# z+sf0Phkcu1AJ}#-Z-RYbIhz#x2wMl1wHfwpR@g3X-wgYQ%6c?XYjV!VdB!+hN}eun+7Ik9z_3 zfmOetusVJnZ0!!%w?kn^dDRZsw-fe(9p}S#!alH?oeDd_8^Lz&f_=LbR?m0sf_*Q- zz84jCny0-8`*y=VurplV4g0`yb}Q@+UI&)72lnkzSOd53fqgZw59~bmu7Q1EB{d4W z$Q!`syafARQrIP4^b+jb3;Vz>^Ps)353F*p!rtLm!75&ceJ?BQUB2XH*tZY%fxXA$ z_Q5`|>U|1+3Azrpc0cUfudokz)qdDl3;V!62B-vNbv%y%7t zeFtIRK?T3FrX7TRufRUAPr3XG>;ubrMZxFXITK*5zl{5B9&`lO9f5U66!tB@dc=lbK*NvP@cZKu+<)L7 zK}L%_K2C^2BJy z_^uN+h95Xj+UPNcJ04@Wo4H)CKE`lw&Fku6$|;y~N}&hYDVTB^rhqwf@6#{^tmL#p z4>Pbiufdep6ndP!hN(J(sRDE3L1$nLSmhb@fp#W?pLBEQx7-|g%Ii+%ZFoLA^YC}9 zYR*dJ9LFQOeQo$VK8)v`okQ6H#=p>fuZ{MPUFh#9pP=aaQtWeO9ZSDLlCtR=lkq9zhMq^?=Zx=mH}yPvr@}8f z>h(45RG0o{9j2#mLDHZ3^t~Ftf~r?*D3P8MK;JlME}W17y;5~N^l5}HKV1>tmpbWr z{(2tftmzw`@iB}(lU@R7p!7kEs!#`)0lK>DdF_zisOI3`3+s99!PDqNFO9S(GHFz< zfFsh>8GTE`fAnvvq)dP+1nPP8=l@iIF8W+YdL04d(|53*=Z-w%^EZ8UBfU<*Y=Evl z^r4+HJ%Bj?H4>udksFHvx!A?f=leM!BCG|gZx1Z^Chk=_gWGVl^$0(SsrB7F|n0h|Sn z1N#ANj-~4F6dYtVunedKmIG4&3fnAT0x%Jn1S|sR-{SiM`;ewj*gGLt0ENJlKspeI zeEJ?v3z`49uyN*rNT#XD z$&{RcQXha;;oIPOz!reMjnOKk&+_?@(||d^5}+KY02Tuj^d&l{JSzK~q662Wt`y_F zhSJb!r=-91Xx}4#3zB^K0OIx|4^M7S60C{nzbW4n8f;GTM! zhZyd~tS%H9v<@juB1M%K8&^uvLs7IApeUlqp$Q@<7XV}}%{=vD2P{Agpzq`NXaqE> zDFAsS1+W6~KpYSYBmv1l0+6WlhaexJ$QX{qgFt_vA3!c20MO(N0R{nsfuR6ZQh!Kqx2@g7uW)92etuQ0h%tFf{Vb1zztvxa1J1=zXCo5J_F7I)X^#6ufSh` zmw-=z52%RduyaT)iybYWL$aM#R-9Rm{AE4b~5AY(e3)l%z`3`^v zMV-|EdjX1>mw|)83E(&l?G+>r0mpzNKpjAp$kM|A&>l&9C2d%a0BzdhD_=HrB>j>|J7*g}1}KDSv!oviY3FrB zZIT^;`*rC_P5MzNr@Dvf;fX7BfOw`0>*au?*xQB8bxKb14-X9w4HFSv*(+Xyk>3IF zZTz!t6PrM#5#pB~tn(T_Hkn-$8GbB)eIpk8v2tH}5VVE9oSXRi7pd!xn%Mm4(1@7O zD2bUwH-8r4Yn+lg>h95D{EDgtjw~uPJT^2O??NV#=g+*NjdNA0>u%mm*>w4rW)0)y zRjb0cr)0nRt*NXT31Y&-(Rr z`hLK`*P8XZJBXA33}OI0(gPaLmiwm7pY?rUvxaeg?6HqdY<~K?^3KgAi$x_G_c0Ef zbtxXfKArb&S+l}XQ4a<7p7p{6a!c-gn619hT*BIl(C*AD#yDQ) zb>~j_$a687%^Jo5v{TZ}A-%4fQkqMW+KPFo>th^Q3+@mY$(H_QbF)UK*xQ|XM;m9{ zc*0lRr&X@2XjU*z!JYk~%x~VdQM|civ$%_@KE|=Rk;!lM_U+=fqgmmC2=Ac=g9718 z=k7c()!CD_-23CIx_-@i?Oa4z59Vb`au@4*us3~-<543TPaizyIC&jrDk?OLVt0aA z*po$eG)`Il^{XSDPx{}zY+}m(|Au?RL<97VRTaggqd)Qq{0EDfBGj?rys93I{tV%YeKH~S=FL1b6bB2iE8G#6Bba;Za4c`h z>?PU-Vs?yUcQ3a6<1hCHyc?xPW*9~08=_wzJovU47l=qs^A;-u*+Zs2oyA9iEV%nO zoz<6@rek)uUMtDl@ZN=Ys6GM@4V)6agP6C;(^t&5FfWAZM9>k&DZ8CTzvM=4^1)1y znb?dZ<6Pa{Z1!?qw}J1YrD)oR@Iui={ez+Gr#o}APK{Z;-Nc3>#}lUKit^qtc{yl1 z&?#xBPi<_!$!uap$mxWf4dQ3?>RSiu0vg`t_Lsqf-5pJAJ902nWl~oW77RbM&^tCQ z@rNDb*K=0lgMIo|A|*cS{3T@mLYe2rs&lV|owN^x!&_dtQT25(L)z;x zBfogC@hj+Qb0Mu0m!J@B933pWeelW$)?-?bMTSO1Nyg#AMz_rAzeHWELe=n4cpyw# zB)arL+y6=-N}3^3p}-D^>3vuvs~6ArL6qBiiJN`c0bk?zV7PT#&v^^oI-_0mM`4}# zfOs_oo;ObVEuFG@-?8@_u0ewwhk3swZik@ZYr;7c4Syl55I)BF!%>sIx_4(~`VXpl zB>8!Vs0_tK7zYs@$3#4KvDJ%bprCux8YE7IvIrmJ1Y(=&kKbR^`Y(}c6SOF#y`p_z zHk_Rh%AU)_}m3`1gh!+kq!m6T+Ai0UOX4Z*24qg;b60b z5{~W`iPCU*I4n@S84mv$rx~X#du6o!PaAJGx3x%Md+{bk-w5m*Vt0I>`?1ZkS5~M-V+)j|+hS%EMv)P#ae_l3U{A7}lHpNw`?>yiupaJt2(V z6^t{c$rY(v8YgDYLmhn$o$^t?OqHdbxM|+-(Tza0weV02_D^kN_A$<(HdM}Q=V^L( zjtMUgYIk^E?6a^-=%_sQzIfRt4#u*nCMi*LjbrPuR_fvqlzMQS7p>#LJ{Jk`FvU2C z`qfX9Kb+Xni5@QWFvWDt7P(NsE9DYWpC*nIQ@!jVZp5=rtVTE_z({poJYGHQ$z`DU0foNC(b1Fb{`lV{*3t`McwA_B1YQX5qO}%Z z2sMROB0L%0Z50Yci!X$l-dRx!g%)23HHDU52(=Qt5WboGFJ1^Wjh0>rwGzA#`dYDJ zwe&)$X=I8K)_?Irs42AcLa3E&7S*WQ;tQdsa6w##0{%Mq5~9TyLQM}Zg#Ibm7O#o8 z6olq2k&}Ypmmd))`yuKkh@+rvi+C4>rdTH0^kWbGqpq)UUiQ-~LDR1#uIZ=-koqEH zw4ycIX3hZZ546~U^f}&L1ovls#F+jp3{kzLKl2OtxTE$?f9_k6G{3&+>+is2Y*0U1 zR=yDQf_LOy(Ezphzv4v(Z~tWh!Ut>O)rXqdW1r9ve|!3&^;seoyUmlVgG z$h+qKE|rp+w@krHS-(oV=&>@zE+?yV)XaRp&$@t}A9Z^BocI0ftWV$d%y;IQS@S&e z%*@&gr!RLqf3?GTNq9)B{pLG4ok}7etJvDEYyR@*w;c3%`;h1D1(kP_AOEmTy8DMO zGKntFGKYUr@1`12bEL+SR6I4iAd6Tq&=CA%$VQM)Q~ z$j^Xm1=&^S?IA7TGGuGWYbHr*3waVU0CF=V<e3mK6tAO) zlz#@29<&G27jgrnJLIH^`30l$^QG@(jeiO0hWrVc6DDSlmZYrwkyEmBMoW8Og*WnF zhHL>j8?uNJ6ZDLsx=e;_j{FEnFGycqy6EypxP|hs=<ijw+`N;{=19D8x#F5m>emH~7-wfH5>iKB(v-vucchNntCdfUH zu~faBSui$id{*H~Xwy@6wbc5Mn>`_WQf7WWM?UpwG=_%iRYFoY&DZk>V(928Jt3(* z8)Ap_Js`=p8eLvSIs9ByJPl)yy2TSA>4EE^Kynmffg0EcNfrN1L&Y}(w2u61JNI-a zehEp>85g95RT`v48YC45x6uj`kpb5g`#_N%*c8!Baug(uPC;I5XY%KyDH94uO&pyy z1o_lKiV;{+{4HXD!r*Q@?&j3G=rSf2+3^7+IbdOXt-1FgiI2?79F>(XNhuw){Kb&u z$g7Z4o|~07CM#N!N}x~eWao#eBYHMO)1Qoda`aP~`2~~Ft-P!$`4kRmp=vjRiko%R z`t>*@jZ~&yVRUw$+EOU5a`KN#3e$QW2}#zpg(PcybZJCOV{r0@nOn@(wBSgsvp%}~ z36f$ib3$QmX2GbjqcTU4tE4M%2pRQrl-7D)c5YtgQ<7xF=1~|(#&N#R>|fNilO(kR z3xcE-!AX}(W3@ikK+=j3XVq4Se~rc9IIWNGLXVcFN=O>=jgVfDvmnW3E8{i2f9Z`$ z>czV`M-;73&^q;0PG&(t)@UhDua=AXNdr7CKddk_H%H3M8#5_0FFz|2`Q$Pq5auUo z##(IJNRqLcIr-8uooD4w%FDvq+sLk!XO5bfHv!Jd95ZLc!^Rlr zB1y-QK?=KdSqVuEjn2xN6qcPYne~dl#PFA0TFb+FY3&c-$6UNE1A1#LD}VBoth_>L z)Y#0N@njgk<>Fmb-|GDTad9pxu4i@rQ;Ty}PGNp_rX+12ppEWQND8u16DN+(#%Pv+ z(-gqErut4w(|m^MuR6bn%DJ=LtZDc_&EF?tKW2@5I`626lrOrnG@jDT&ernMW_Ht8 z60dG%liOqKLrfQ&cv>?no4`{%?dIhTBxyKL^-M6|LMmNN^>s4xHX`+yns*1O3^g^_ zEJ@jF>P@6F)s(-pQ92!|F>2lyNM)(1J}#2;pwYSZ5Pv&kuHkQOweC%v4FZHpT-ge{FJ~s0y1Z;nP&L_d#9Lr8WHI-SL z+F6^rtEN)@5}gp~eO0x^NcB-uAJwM(F+a4tF}11PwW-^+DLdxBrZ=}X^$t?Kw60(P zwbbz1)VkW#b)@ zWS5Jud1Fy2cH$oXR?}f0UX7x*TyA5Rhafhi)zI+>vdZ(odeyQ|z_2tFH{ev}hoy#s ztN~98w6eiGrLEnxza^jF)+Ya;7rAhcW>z^AYi}1Q%Dl|mD(8b~O{V!+`$9lYlfE@Upg6Hj>N1cKH=O&sEhJi(Q^H&@#HP6AX)Ju`{Q(e$<(n7>nR3?d@_I z_Jbj6i==-YjO<2RVD0dRpnDaCiR%IXXal6dvqv zl_%&-d+;`}uBd8O&AkIgCP~}_ok|SU9wPHJ)SIR=)C;nj4}f*yseuWm9|C!GsEy6x za!0%Aqafb5qfJg~gTO^Q@HU({7mVD-sHW*$8}9aqO>Wut;Z98f>jxE#Y8$Kh7+54v z^+}L#A*HPb7XqwuNIT6=C!Pj29!&2#JhC0EztMth3N}WAI*YN%d@~ep$YtK_6>})wNjj+p~;8lV~4{hMt&GE*fwQ&K@ehiGp(uKlF zUIo?zOdU)4CRpvLQ=mrK<)L_=p{Hn0Uk--1EZ9zMeF#QvVJs1v&A1$GXQO!v?t+&_ zV={#D>S&v}bw?P_-Ql8Kq(LP^F9gd0DhoP6ngCpiZ=vJoP|KkqqsJIgw$Y4 zu}64HqFpXW0Q5&Cb@eb^@64+cZSo+z^Q9pZA&2qW2u6NW=alJ+mCsMI$&q*xxmHpK zAqOl4437@BnqLDOpoWN3yd*uU<)fiNV12-}1z-~xwSZ0@7u#BH^-DA1x(op>Sq;-HrlycPn&E)^CQ%r(bFFX zBTr(1jIx?{f%W3<@d@%zNYw>Ohi2t6PdS*%*Ed9?R#YeeCk5$fMy!gkvtW#gl0S)DFvo zz~D^^7j$fi&eU~TJ_|-qW&A?CRd(*FIbD6KDX}NTuRIx2iX*;O!VMbot4MVjiVBP9<5(@cl&%`Z_IUlfoapGQht z7-%VI3~SMMxRN4xAQ-i$K8wxgRjGFKapVo)=TZ~Q9+)1fYU*({wdk=#r`oJLwW*%i zycAV!2~sI)>QZegNHOxp*QQ>rP2I0e_3CHnEv`*nM5?#io_{K(Qqci5LA~2qoB9c< zo~l|rCW4llS(`dhn{pkXm&&O~=@SpbcK{4=P~3=n#9Phxz=o+aVK9Q1v@omzR#wWZ z2H8z#9_4O>ZA|7VgY7Jpm*PH`R}Hqy2eJ6ja3fj*tfqgY@xDWB%*IQHBosB0)SqAU zmQO!7B?kBabUiGafrqNO|4~__Hu+Z?Zh)F$dWY(hWDJ5yy?$S6&;JvO|E{6GSgt?+ z4?94vAELFdN*AOvbpEg;k30#`5N7IfBqUw+NoV92ja7>!iKGb_r*o3jU@kxnP5|g4 zNqiy^Tn|gCKUvNFtt95`I-aU92ZFknSNiu#v!&~)PJTbJ@1vhFtQRhjNiHjsH`-61;AC%gd z{RbOVJ6vCTG{e(%1L~w6Gj$o7Cx$_hJTU^2>SXBhaa}$ENu!dbb0q0wtBU`t9bbR` z|BF`u>eIh%w3l08Zd;fdKhyNem>gj>R z$s0=_0AFSr2dZR6v{>m`nuM!;%{Koh|<_ z?f7c^%~BtuZT<9s=Kzwl)zd|iEcoLszy6(`>eHJ5)qjBqE|O#*c6u%T|07#I8mw=p zN4AvgK2K+WM)zodC773!sZ6<^Qi8pSA_*zgs?e z0T;Q9u7@Ng|GVX@!SmlOAF)v1)=nW)Uw-{tzMa&6`z}C1_NTXees@d(KRH(62gbT{ zzifpy;Mv&@=ERTTj=zMDb1-K<8Fzdk#od($j(6bOK_TvL`~vO`c}R|fHR2_>yYox9 zH|9~f4%UR1;@*^B#odF)PjK+K3GRH=1ciC>nh6f(#k);(u;#oR_ZIvX?%q6Ql7sh| z5F||=gtr3Da@bC`3@GqGjR{(mAD6S zj{*m4!?SU3%a7sSj$5WUSTLWAdwX7mdj}pk)qy`z72+PsFW}yhhdk}T=h+h6!}ulK z!+F#*4*YSZ6!%De7569}U+7@bybSjkUQ_7cHwxW(>NJIQ;^osE{H1B`{63hKr#uTw zo`ogPDlDGg1-k>5FGiWT}J9N1~Fuo8u(@WK)Yf2PEpe+H)T zkXa7iVU{~zG)uwvo=adC!IGX+@IAcrIR~HjoIC#(ERDy{#wg9kD9u*bW4s3J23YDG z1%ERup9A~mz&}j4-2K&ljUzx%R`CYI(U>QpleBs}^6!tBJeOzHPcshrDNFN3( z;_@=sw+!|zQ&*!L3bdr4u7_$9E5U`gc)Tg*$#VP84y11sb4FT=i<(DQDuu1!TUWupRj_Zhg3spZt6|@2*axz!%^YpE-Z!7Er zJHq8{ux}ge+orH%yb|mHnBR7Vo#5HqVc&Mx2X>NMcEG+Juy2RLPVp+R(_mpc6;{Oy zcf!7%un+7E57`C#cEP?~3OmOyfn5Yk+O4p6cUEuM1VBa3tw?|>`^BS-l zV5xf*_5m;73;XuMKCq8?%0Ae)5BBX-*d=}!><(B)g~C4JTPt8+1?+oOVW08zS7G0) zun+7CmtTW@ufe|86n2$Yf*k>9W1hkg5D-+qN%=T%^*!NLwG ztcDjJfPDvGAK2GCq!RX3!oEs{eakO_T?9*dLt%gCrEfUc_xvjEw|M+PSa%TC9aPwl zyyl>T{lvQ+a$JA57Pclll1f97d#I@mqF7585_JK|vXc{=XD z^1VkKrUw#tJ353(JoBi7Fu!=L}EUj{FQRt@qQzm+IHvS>!LAQ$5RlhQ9S$=2U!NUFR znc=dVMf=w^UKMF)BV3SK^8VB2H}F?RceLh9t?^bL+%ccWA9fn^z^eVokd1#|m0j`K zhwqq7Xm;17EUVj+rrNvb(PpB*2eaYl0j^&kXcwh#0;ED;is&*p^^t!0)Acn#I`o^z z__FYgo<}eI#<#X_^*nk{=>^dB9YA&I=Sm;?c|_#z0R0Ge0UibDaz+7_xdQa*fjZ!V z4AP^{(fbTtt#v*4Qu5OC{PjGw!+KtTo=0t+2dJZgMS7+?$h!btL3&#O`XW6Yphk=r^zWOIHonq&i{qYbU`u*89SmUBNe+Nwg4#%8BHW7& zGrcE?`Ce?f2gOAXpeI1_kRqa*vo=M8kR(UZh|;G-ayB`aJ_S-Yx&R5lVd(fl(ul_c zu|Ow)K2UB$8GX>a4@v$Ze~_*8*MM=rcpwL$0ir1J2DXFmP`^`v`~q?gcn6?Js0Q8y zjsVAi6Tn-*KA-}46?h%k4;%n0fj5A1UsW0Qs<`=+=VSi}^kDBx~`2jEi$OSCO+XcBB*a^^2NfR&$=_+6oa0)mC z>;*OeW%OBMDH0r504xL+0iytl&@n&;@Hp@U@El-8qx40157P97c{Ah-00nOW@FWn2 zd^12(m!_&4um$-P%u|79fI?uJ)L#2+T?#TEC;|Eajey~(bRP035GFQyv*G5UwaLfn zLuE^VCfo&NPXyKhBLSLl^woSaWH#_LfEbsGfw{nJ;CW!W&MB`LxIw{ytJsgiIj9G9 zotDW)0L>rbe}^PrZUoi>u|N!fBKp_4YPi!f=LAq45oKZCZVwtD*@CjgU<>X$AP1mfr?4k)kY^~cD8QxzG}Kc7iqz+UVqgX^-4!=AG{t0Z=2 zK>D+RIRJ$d2Wa+`0pzG9z+!-A1I;3uP2{*`z;a+EK#@<+dl{f-lmjmTPQcT^3IKUU zDp>t}&afp35pfhvFu{TlcZxCWd8=t0MTFM!X1oxoM#V}M5C z5@(hfIGTiT8kafD6D8pctqCwgLNqy#TEP+kvgX7GN_#<(mNN6g_MQunV9t z*$uo3ya^lvUIShS4g!_H0bnP&o-ExDP{A8I4nvaFhP7lJS$qN@3+Z7;f#bkQ;4R>7 z;52Xss0OHWlztZ=Sl>MmYr&4R^&)@+(~#?o%zQ*p02|3l#GC*Y$d-xC0j$LIu_PYp z&U{Q8nCKD6qC4+FONg}MgI+C~Wh^eA<-}scqvFD&;3<@}Ldk0nzWKqeWp%r{lIu+D z2xLCK#tA0pg7-$}od3~Or_oki31pT|$tLY+Q{4B@4rttT?u9x9<51Ls#rIe4Z{u;Z zu4J|d3u2be#xbg|y0vrtM{J*HFR@U}AKOzIm6KU>RMNFrJ-1j zy1qX))XrywHtP_>7JRX;PQyi9pkDZjuOWPm17@fHS?oXkmBGADFIo7vgC~p=X#;KV zc5LP6vAIscIJ4&N-J@ppuBoT$N@j~(RPAh>a?5#g{eut7D!bJw7$+?*tZVA`R)?Aa zbtP9k#i4fWyy;s{u@KXPNn&F#i}5rLK7IMk8!e9p+`nLAjl|c%m@+}ay*2WByS+Fb1F zfYR&D#gim|YA&XAV4-eFE!4N2Vy!%y%@7d^#ED+qO0O@Ye^)AH?dB7>PF@AN8k5@uM?y4t=_}Vh2jBh*6Xjh#4U)#&pj| z>Wi8crlG-zG-@i3u(#ns{zE3S8zEE+rNDYN4mA+za930nElz_5< zVg*De{YYE!{T#MuQc#bNQLPi!qG-GtVUjh)JiGJrvty4BUAEE0Zem5j?p*94CO!h! zMna)6=;-tl$5%I5Yc?@ot`3FDDc0jT{U}?RmNW5y#hC zs!w>~sj-@oms^RPj_9R_xY~)eFvIK`Ke4tp$Kq=omHTG&u)&`t9_@juaaz@B;#+#^ zD1Ys#BPX{_>e{eL92DRMn&if*ybD9Oy_@pWnrkLDlh)l>Y+8OI@exdpdcC36)onIT z0uEy{4E^TXSUZye8K2tdk=kFbp;yaImE(b z@t`wn;e;KkM1)14?S&!}!q+$#IQYqL9{ju{^Pg&s81i+B*d2kHU>p#18X7(5ob$Gm zQ0S!j>uLw_Wdw`%HI4?lt-AX0Y?m)$)Fx=ENNFM}5;HMV6h*=j#>v3BE+MN^H|*OC zJsLc8-#9Cn;NR!*X-oSmYDqM8J5-#DL}$%{MZx8bg)fBf?@Tb*Rv-pIp_6fn@QiCs zaNqvbXVo65Gq7!#HtVN4qWu<4?_Z4)3S0CwQmmp!B#He|Y=v(L9EoMNc=huKlegS& zya|D(Pbwdg8;u?Xi*?Z$n)lj^@1wDv8z&J*FMNGS_NS}At9znxW^uZ6#)_nc8z-P7 z22aGaKwVo5R-~V!#LyV_6qcSNF_`5?qs5!?7|VK>pw86Q(fi$F*?8vr3oMq=$+yxY zZ@VpgeKDG*&_mN)q{T5WXLS`Yi=Ca|G~?7=z|_8b^1S*Ug>qzgJiW506V<{j;vVV! zj&m^5T9Y#iFWzN@gz@%iN2>LX&w%#E?4zZFAe9M9Zm@mG#{wsC<5{6tU_gYTY35jukyO&XNvk+2?})t%@qOaW%e{ ziUx74BbzBA;;^@=17u-wVtgFtm~pB({m6Ibm6yNmpw@~Cx8lWITk?I4Gp1FGrZ@IB zeejG4Zysv1(n|4u9D7G=(91=gc6tP{UgD#8Hpa9*L3HiHUdA;1qzfYV;n>I(AqmKu zFESF)dE+GMH+M#U_JpS^ZE5E zKYD3i@UOPv^JFv=Nh=~=((Ph;B6jF#u{#mZGY*W7Pg*u_^825jf?_9aBof3IiP$K2 zCkvk>7Hz6b7X6YiQ;sK#r@%WKCr6*D{D;-tC1pD*(lXZ>KXUMM$_-PZqN-?V_c1%# zp@aq(&qm$z)CStTfi}=>UmJ`VCOX+zs;_Z+G-9*u=vQ+_&QSY|H%t6f7ptJ>yB_-< zEeOBa?^FdzT(Dswt?#`vDn2|a8t0Qfd#6^?Q{2MZ&hPEN^WLdx)c4-0mEgV8))m2D-+QN~;UXq?MK65C0*HFwJ2kyzaRdtW zzISR0^}Tm$C9{QlH&m_fy;D=D@4ZtiDG|fFA!?V3#c08_eSp~34dX9~&p=rZaT|sD z@3MaQwf2y1E91oMwTmI+uOu$_RHI2lXWQJ^}PPS+;B^r5@!pD5^G!I$LE+HtoQy# JJ=w*9{|DUKIPCxc diff --git a/package.json b/package.json index 86069b3..f23d661 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,9 @@ "typescript": "^5.4.5", "vite": "^5.3.3" }, + "dependencies": { + "eventsource-parser": "^1.1.2" + }, "prettier": { "semi": false, "tabWidth": 4, diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 68278c5..9b4acc2 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -2,6 +2,8 @@ /* eslint-disable no-case-declarations */ /* eslint-disable prefer-const */ import type { Elysia } from 'elysia' +import { EventSourceParserStream } from 'eventsource-parser/stream' + import type { Treaty } from './types' import { EdenFetchError } from '../errors' @@ -118,65 +120,54 @@ interface SSEEvent { id?: string; } -export async function* streamSSEResponse(response: Response): AsyncGenerator { - const body = response.body; - if (!body) return; - - const reader = body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - let eventEnd: number; - while ((eventEnd = buffer.indexOf('\n\n')) >= 0) { - const eventData = buffer.slice(0, eventEnd); - buffer = buffer.slice(eventEnd + 2); - - const event = parseEvent(eventData); - if (event?.event === 'error') { - throw new EdenFetchError(500, event.data); - } - if (event) { - yield event.data; +export class TextDecoderStream extends TransformStream { + constructor() { + const decoder = new TextDecoder('utf-8', { + fatal: true, + ignoreBOM: true + }) + super({ + transform( + chunk: Uint8Array, + controller: TransformStreamDefaultController + ) { + const decoded = decoder.decode(chunk, { stream: true }) + if (decoded.length > 0) { + controller.enqueue(decoded) + } + }, + flush(controller: TransformStreamDefaultController) { + const output = decoder.decode() + if (output.length > 0) { + controller.enqueue(output) } } - } - - if (buffer.trim()) { - const event = parseEvent(buffer); - if (event) { - yield event; - } - } - } finally { - reader.releaseLock(); + }) } } -function parseEvent(eventData: string): SSEEvent | null { - let event: string = 'message'; - let data: string = ''; - let id: string | undefined; - - const lines = eventData.split('\n'); - for (const line of lines) { - if (line.startsWith('event:')) { - event = line.slice(6).trim(); - } else if (line.startsWith('data:')) { - data += line.slice(5).trim(); - } else if (line.startsWith('id:')) { - id = line.slice(3).trim(); +export async function* streamSSEResponse( + response: Response +): AsyncGenerator { + const body = response.body + if (!body) return + + const eventStream = response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + + let reader = eventStream.getReader() + while (true) { + const { done, value: event } = await reader.read() + if (done) break + if (event?.event === 'error') { + throw new EdenFetchError(500, event.data) + } + if (event) { + yield tryParsingJson(event.data) } } - if (!event || !data) return null; - - return { event, data: tryParsingJson(data), id }; } From 98e40db329b5e2362e144051b320ccde3a346b44 Mon Sep 17 00:00:00 2001 From: remorses Date: Sun, 21 Jul 2024 14:39:23 +0200 Subject: [PATCH 06/10] rename to streamResponse --- src/treaty2/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 9b4acc2..df97fdc 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -147,7 +147,7 @@ export class TextDecoderStream extends TransformStream { } } -export async function* streamSSEResponse( +export async function* streamResponse( response: Response ): AsyncGenerator { const body = response.body @@ -452,7 +452,7 @@ const createProxy = ( response.headers.get('Content-Type')?.split(';')[0] ) { case 'text/event-stream': - data = streamSSEResponse(response) + data = streamResponse(response) break case 'application/json': From 6a7aa3aa005d468f896a98e08a3fc1daf14da479 Mon Sep 17 00:00:00 2001 From: remorses Date: Sun, 21 Jul 2024 14:40:13 +0200 Subject: [PATCH 07/10] releaseLock --- src/treaty2/index.ts | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index df97fdc..789fd8d 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -158,15 +158,19 @@ export async function* streamResponse( .pipeThrough(new EventSourceParserStream()) let reader = eventStream.getReader() - while (true) { - const { done, value: event } = await reader.read() - if (done) break - if (event?.event === 'error') { - throw new EdenFetchError(500, event.data) - } - if (event) { - yield tryParsingJson(event.data) + try { + while (true) { + const { done, value: event } = await reader.read() + if (done) break + if (event?.event === 'error') { + throw new EdenFetchError(500, event.data) + } + if (event) { + yield tryParsingJson(event.data) + } } + } finally { + reader.releaseLock() } } From 2007092049c78d69b3577b13ee264dd6c094a619 Mon Sep 17 00:00:00 2001 From: remorses Date: Sun, 21 Jul 2024 14:40:29 +0200 Subject: [PATCH 08/10] remove export --- src/treaty2/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 789fd8d..9932d0a 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -121,7 +121,7 @@ interface SSEEvent { } -export class TextDecoderStream extends TransformStream { +class TextDecoderStream extends TransformStream { constructor() { const decoder = new TextDecoder('utf-8', { fatal: true, From 21d9fc44db954beeccc1dadf4a6be995dd2dc87d Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 30 Jul 2024 15:14:24 +0200 Subject: [PATCH 09/10] remove fatal: true --- src/treaty2/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 9932d0a..35c3c23 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -124,7 +124,6 @@ interface SSEEvent { class TextDecoderStream extends TransformStream { constructor() { const decoder = new TextDecoder('utf-8', { - fatal: true, ignoreBOM: true }) super({ From 0eb29c88c3f27f863622caadfa36bd1097f7c504 Mon Sep 17 00:00:00 2001 From: remorses Date: Tue, 30 Jul 2024 15:16:13 +0200 Subject: [PATCH 10/10] remove SSEEvent --- src/treaty2/index.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 35c3c23..7af212c 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -114,13 +114,6 @@ const processHeaders = ( } } -interface SSEEvent { - event: string; - data: any; - id?: string; -} - - class TextDecoderStream extends TransformStream { constructor() { const decoder = new TextDecoder('utf-8', { @@ -148,7 +141,7 @@ class TextDecoderStream extends TransformStream { export async function* streamResponse( response: Response -): AsyncGenerator { +): AsyncGenerator { const body = response.body if (!body) return