@@ -100,9 +100,13 @@ public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
100100 //If the request has a body it should be preserved an the query parameters should be written to it's own message part.
101101 if ( hasData )
102102 {
103+ string msgType = GetMessageType ( MakeMarkable ( pInMsg . BodyPart . Data ) ) ;
104+
103105 outMsg = pInMsg ;
104106 outMsg . BodyPart . Data = pInMsg . BodyPart . Data ;
107+
105108 outMsg . Context = PipelineUtil . CloneMessageContext ( pInMsg . Context ) ;
109+ outMsg . Context . Promote ( new ContextProperty ( SystemProperties . MessageType ) , msgType ) ;
106110 var factory = pContext . GetMessageFactory ( ) ;
107111 var queryPart = factory . CreateMessagePart ( ) ;
108112 queryPart . Data = ms ;
@@ -142,9 +146,60 @@ public IBaseMessage GetNext(IPipelineContext pContext)
142146 return null ;
143147 }
144148
149+ private MarkableForwardOnlyEventingReadStream MakeMarkable ( Stream stream )
150+ {
151+ MarkableForwardOnlyEventingReadStream eventingReadStream = null ;
152+
153+ if ( stream != null )
154+ {
155+ eventingReadStream = stream as MarkableForwardOnlyEventingReadStream ?? new MarkableForwardOnlyEventingReadStream ( stream ) ;
156+ }
157+
158+ return eventingReadStream ;
159+ }
160+
161+ private string GetMessageType ( MarkableForwardOnlyEventingReadStream stm )
162+ {
163+ string msgType = null ;
164+
165+ stm . MarkPosition ( ) ;
166+ try
167+ {
168+ XmlTextReader xmlTextReader = null ;
169+
170+ xmlTextReader = new XmlTextReader ( ( Stream ) stm ) ;
171+
172+ while ( msgType == null )
173+ {
174+ if ( xmlTextReader . Read ( ) )
175+ {
176+ if ( xmlTextReader . NodeType == XmlNodeType . Element && xmlTextReader . Depth == 0 )
177+ {
178+ if ( xmlTextReader . NamespaceURI == null || xmlTextReader . NamespaceURI . Length <= 0 )
179+ {
180+ msgType = xmlTextReader . LocalName ;
181+ }
182+ else
183+ {
184+ msgType = xmlTextReader . NamespaceURI + '#' + xmlTextReader . LocalName ;
185+ }
186+ }
187+ }
188+ else
189+ {
190+ break ;
191+ }
192+ }
193+ }
194+ finally
195+ {
196+ stm . ResetPosition ( ) ;
197+ }
198+ return msgType ;
199+ }
145200 private bool HasData ( Stream data )
146201 {
147- byte [ ] buffer = new byte [ 10 ] ;
202+ byte [ ] buffer = new byte [ 10 ] ;
148203 const int bufferSize = 0x280 ;
149204 const int thresholdSize = 0x100000 ;
150205
0 commit comments