@@ -6,7 +6,7 @@ Property BodyClass As %Dictionary.CacheClassname;
6
6
7
7
Parameter SETTINGS = " BodyClass:Basic" ;
8
8
9
- /// Establish gateway connectionand init java API
9
+ /// Establish gateway connection and init java API.
10
10
Method OnInit () As %Status
11
11
{
12
12
Set sc = $$$OK
@@ -18,28 +18,31 @@ Method OnInit() As %Status
18
18
Quit sc
19
19
}
20
20
21
- /// Close connection
21
+ /// Close connection.
22
22
Method OnTearDown () As %Status
23
23
{
24
24
Do ..API .close ()
25
25
Quit $$$OK
26
26
}
27
27
28
- /// default InboundAdapter behavior: always call ProcessInput on CallInterval
28
+ /// Get Messages from RabbitMQ queue.
29
29
Method OnTask () As %Status
30
30
{
31
31
Set sc = $$$OK
32
32
33
33
Set messageCount = 1
34
34
35
35
While messageCount > 0 {
36
+ // List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message
36
37
#Dim messageList As %ListOfDataTypes
37
38
38
39
If ..BodyClass = " " {
39
40
Set messageList = ..API .readMessageString ()
40
41
} Else {
41
- Set tempStream = ..GetTempStream ()
42
- Set messageList = ..API .readMessageStream (.tempStream )
42
+ #Dim tempStream As %Library.GlobalBinaryStream
43
+ Set messageList = ##class (%ListOfDataTypes ).%New ()
44
+ For i =1 :1 :15 Do messageList .Insert (" " )
45
+ Set tempStream = ..API .readMessageStream (.messageList )
43
46
}
44
47
45
48
Set messageLength = messageList .GetAt (1 )
@@ -49,11 +52,13 @@ Method OnTask() As %Status
49
52
#Dim message As RabbitMQ.Message
50
53
Set message = ..ListToMessage (messageList )
51
54
If ..BodyClass = " " {
52
- Set message .Body = ..DecodeMessageBody (messageList .GetAt (16 ))
55
+ Set message .BodyString = ..DecodeMessageBody (messageList .GetAt (16 ))
53
56
} Else {
54
- Set message .Body = $classmethod (..BodyClass , " %New" )
55
- Do message .Body .Write (..DecodeMessageBody (tempStream .Read (messageLength )))
56
- Do message .Body .Rewind ()
57
+ Set message .BodyStream = $classmethod (..BodyClass , " %New" )
58
+ While 'tempStream .AtEnd {
59
+ Do message .BodyStream .Write (..DecodeMessageBody (tempStream .Read ($$$MaxStringLength)))
60
+ }
61
+ Do message .BodyStream .Rewind ()
57
62
}
58
63
Set sc = ..BusinessHost .ProcessInput (message )
59
64
} Else {
@@ -65,6 +70,7 @@ Method OnTask() As %Status
65
70
Quit sc
66
71
}
67
72
73
+ /// Convert list containing metainformation into RabbitMQ message
68
74
ClassMethod ListToMessage (list As %ListOfDataTypes ) As RabbitMQ .Message
69
75
{
70
76
Set message = ##class (RabbitMQ.Message ).%New ()
@@ -86,27 +92,12 @@ ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message
86
92
Quit message
87
93
}
88
94
95
+ /// Decode message body. May be full body or only a piece.
89
96
Method DecodeMessageBody (body As %String ) As %String
90
97
{
91
- If ..Encoding '= " " {
92
- If $isObject (body ) {
93
- // TODO streams
94
- } Else {
95
- Set body = $zcvt (body , " O" , ..Encoding )
96
- }
97
- }
98
+ Set :..Encoding '=" " body = $zcvt (body , " O" , ..Encoding )
98
99
Quit body
99
100
}
100
101
101
- ClassMethod GetTempStream () As %GlobalBinaryStream
102
- {
103
- Set stream =##class (%GlobalBinaryStream ).%New ()
104
- // TODO - work around that
105
- // we need to 'reserve' a number of bytes since we are passing the stream
106
- // by reference (Java's equivalent is byte[] ba = new byte[max];)
107
- For i =1 :1 :32000 Do stream .Write (" 0" )
108
- Quit stream
109
- }
110
-
111
102
}
112
103
0 commit comments