@@ -14,6 +14,7 @@ const finishChannel = channel('apm:http:client:request:finish')
1414const endChannel = channel ( 'apm:http:client:request:end' )
1515const asyncStartChannel = channel ( 'apm:http:client:request:asyncStart' )
1616const errorChannel = channel ( 'apm:http:client:request:error' )
17+ const responseFinishChannel = channel ( 'apm:http:client:response:finish' )
1718
1819const names = [ 'http' , 'https' , 'node:http' , 'node:https' ]
1920
@@ -39,6 +40,112 @@ function normalizeCallback (inputOptions, callback, inputURL) {
3940 return typeof inputOptions === 'function' ? [ inputOptions , inputURL || { } ] : [ callback , inputOptions ]
4041}
4142
43+ /**
44+ * Wires the downstream response so we can observe when the customer consumes
45+ * the body and when the stream finishes
46+ *
47+ * @param {object } ctx - Instrumentation context
48+ * @param {import('http').IncomingMessage } res - The downstream response object.
49+ * @returns {{ finalizeIfNeeded: () => void }|null } Cleanup helper used for drain.
50+ */
51+ function setupResponseInstrumentation ( ctx , res ) {
52+ const shouldInstrumentFinish = responseFinishChannel . hasSubscribers
53+
54+ if ( ! shouldInstrumentFinish ) {
55+ return null
56+ }
57+
58+ let bodyConsumed = false
59+ let finishCalled = false
60+ let originalRead = null
61+ let dataListenerAdded = false
62+ let dataReadStarted = false
63+
64+ const { shouldCollectBody } = ctx
65+ const bodyChunks = shouldCollectBody ? [ ] : null
66+
67+ const collectChunk = chunk => {
68+ if ( ! shouldCollectBody || ! chunk ) return
69+
70+ if ( typeof chunk === 'string' ) {
71+ bodyChunks . push ( chunk )
72+ } else if ( Buffer . isBuffer ( chunk ) ) {
73+ bodyChunks . push ( chunk )
74+ } else {
75+ // Handle Uint8Array or other array-like types
76+ bodyChunks . push ( Buffer . from ( chunk ) )
77+ }
78+ }
79+
80+ // Listen for body consumption
81+ const onNewListener = ( eventName ) => {
82+ if ( eventName === 'data' || eventName === 'readable' ) {
83+ bodyConsumed = true
84+
85+ // For 'data' events, add our own listener to collect chunks
86+ if ( eventName === 'data' && ! dataListenerAdded && ! dataReadStarted ) {
87+ dataListenerAdded = true
88+ res . on ( 'data' , collectChunk )
89+ }
90+
91+ // For 'readable' events, wrap the read() method
92+ if ( eventName === 'readable' && ! originalRead && ! dataListenerAdded && typeof res . read === 'function' ) {
93+ originalRead = res . read
94+ res . read = function ( ) {
95+ const chunk = originalRead . apply ( this , arguments )
96+ if ( ! dataListenerAdded ) {
97+ dataReadStarted = true
98+ collectChunk ( chunk )
99+ }
100+ return chunk
101+ }
102+ }
103+ }
104+ }
105+
106+ res . on ( 'newListener' , onNewListener )
107+
108+ // Cleanup function to restore original behavior
109+ const cleanup = ( ) => {
110+ res . off ( 'newListener' , onNewListener )
111+ res . off ( 'data' , collectChunk )
112+
113+ if ( originalRead ) {
114+ res . read = originalRead
115+ originalRead = null
116+ }
117+ }
118+
119+ const notifyFinish = ( ) => {
120+ if ( finishCalled ) return
121+ finishCalled = true
122+
123+ // Combine collected chunks into a single body
124+ let body = null
125+ if ( bodyChunks ?. length ) {
126+ const firstChunk = bodyChunks [ 0 ]
127+ body = typeof firstChunk === 'string'
128+ ? bodyChunks . join ( '' )
129+ : Buffer . concat ( bodyChunks )
130+ }
131+
132+ responseFinishChannel . publish ( { ctx, res, body } )
133+ cleanup ( )
134+ }
135+
136+ res . once ( 'end' , notifyFinish )
137+ res . once ( 'close' , notifyFinish )
138+
139+ return {
140+ finalizeIfNeeded ( ) {
141+ if ( ! bodyConsumed ) {
142+ // Body not consumed, resume to complete the response
143+ notifyFinish ( )
144+ }
145+ } ,
146+ }
147+ }
148+
42149function patch ( http , methodName ) {
43150 shimmer . wrap ( http , methodName , instrumentRequest )
44151
@@ -103,7 +210,18 @@ function patch (http, methodName) {
103210 ctx . res = res
104211 res . once ( 'end' , finish )
105212 res . once ( errorMonitor , finish )
106- break
213+
214+ const instrumentation = setupResponseInstrumentation ( ctx , res )
215+
216+ if ( ! instrumentation ) {
217+ break
218+ }
219+
220+ const result = emit . apply ( this , arguments )
221+
222+ instrumentation . finalizeIfNeeded ( )
223+
224+ return result
107225 }
108226 case 'connect' :
109227 case 'upgrade' :
0 commit comments