Skip to content

Commit beba9ac

Browse files
committed
feat: support non jms message body for JSONRecordBuilder
Signed-off-by: Joel Hanson <[email protected]>
1 parent ef7f9bb commit beba9ac

File tree

3 files changed

+145
-10
lines changed

3 files changed

+145
-10
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,4 +1497,49 @@ public void testPollWithShortMaxPollTime() throws Exception {
14971497

14981498
assertThat(records.size() < 100);
14991499
}
1500+
1501+
@Test
1502+
public void shouldSetJmsPropertiesWithJsonRecordBuilderWhenJMSIsDisabled() throws Exception {
1503+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1504+
1505+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1506+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1507+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false");
1508+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
1509+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1510+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1511+
1512+
connectTask.start(connectorConfigProps);
1513+
1514+
final TextMessage message = getJmsContext()
1515+
.createTextMessage("{ \"id\": 123, \"name\": \"test\", \"active\": true }");
1516+
message.setStringProperty("source", "system-a");
1517+
message.setIntProperty("version", 2);
1518+
message.setLongProperty("timestamp", 1234567890L);
1519+
1520+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
1521+
1522+
final List<SourceRecord> processedRecords = connectTask.poll();
1523+
SourceRecord record = processedRecords.get(0);
1524+
assertThat(processedRecords).hasSize(1);
1525+
assertThat(record).isNotNull();
1526+
assertThat(record.topic()).isEqualTo("mytopic");
1527+
assertThat(record.value()).isInstanceOf(Map.class);
1528+
assertNull(record.valueSchema()); // JSON with no schema
1529+
1530+
// Verify JSON data
1531+
@SuppressWarnings("unchecked")
1532+
Map<String, Object> value = (Map<String, Object>) record.value();
1533+
assertEquals(123L, value.get("id"));
1534+
assertEquals("test", value.get("name"));
1535+
assertEquals(true, value.get("active"));
1536+
1537+
// TODO: uncomment the following change after merging #171
1538+
// final Headers headers = record.headers();
1539+
1540+
// // Verify JMS properties are copied even when JMS body processing is disabled
1541+
// assertThat(headers.lastWithName("source").value()).isEqualTo("system-a");
1542+
// assertThat(headers.lastWithName("version").value()).isEqualTo("2");
1543+
// assertThat(headers.lastWithName("timestamp").value()).isEqualTo("1234567890");
1544+
}
15001545
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
import java.io.File;
1919
import java.net.MalformedURLException;
20-
import java.net.URL;
20+
import java.net.URI;
21+
import java.net.URISyntaxException;
2122
import java.util.ArrayList;
2223
import java.util.List;
2324
import java.util.Locale;
@@ -244,7 +245,7 @@ public class MQSourceConnector extends SourceConnector {
244245
* @param props configuration settings
245246
*/
246247
@Override public void start(final Map<String, String> props) {
247-
log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props);
248+
log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().threadId(), this.getClass().getName(), props);
248249

249250
configProps = props;
250251
for (final Entry<String, String> entry : props.entrySet()) {
@@ -257,7 +258,7 @@ public class MQSourceConnector extends SourceConnector {
257258
log.debug("Connector props entry {} : {}", entry.getKey(), value);
258259
}
259260

260-
log.trace("[{}] Exit {}.start", Thread.currentThread().getId(), this.getClass().getName());
261+
log.trace("[{}] Exit {}.start", Thread.currentThread().threadId(), this.getClass().getName());
261262
}
262263

263264
/**
@@ -277,15 +278,15 @@ public Class<? extends Task> taskClass() {
277278
*/
278279
@Override
279280
public List<Map<String, String>> taskConfigs(final int maxTasks) {
280-
log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().getId(), this.getClass().getName(),
281+
log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().threadId(), this.getClass().getName(),
281282
maxTasks);
282283

283284
final List<Map<String, String>> taskConfigs = new ArrayList<>();
284285
for (int i = 0; i < maxTasks; i++) {
285286
taskConfigs.add(configProps);
286287
}
287288

288-
log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().getId(), this.getClass().getName(),
289+
log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().threadId(), this.getClass().getName(),
289290
taskConfigs);
290291
return taskConfigs;
291292
}
@@ -295,8 +296,8 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {
295296
*/
296297
@Override
297298
public void stop() {
298-
log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName());
299-
log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
299+
log.trace("[{}] Entry {}.stop", Thread.currentThread().threadId(), this.getClass().getName());
300+
log.trace("[{}] Exit {}.stop", Thread.currentThread().threadId(), this.getClass().getName());
300301
}
301302

302303
/**
@@ -709,8 +710,8 @@ public void ensureValid(final String name, final Object value) {
709710
}
710711

711712
try {
712-
new URL(strValue);
713-
} catch (final MalformedURLException exc) {
713+
new URI(strValue).toURL();
714+
} catch (final MalformedURLException | URISyntaxException exc) {
714715
throw new ConfigException(name, value, "Value must be a valid URL");
715716
}
716717
}

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import static java.nio.charset.StandardCharsets.UTF_8;
1919

20+
import java.nio.ByteBuffer;
21+
import java.nio.ByteOrder;
2022
import java.util.HashMap;
2123
import javax.jms.BytesMessage;
2224
import javax.jms.JMSContext;
@@ -32,12 +34,21 @@
3234
/**
3335
* Builds Kafka Connect SourceRecords from messages. It parses the bytes of the payload of JMS
3436
* BytesMessage and TextMessage as JSON and creates a SourceRecord with a null schema.
37+
*
38+
* When messageBodyJms is false, this builder can handle MQ messages with RFH2 headers by
39+
* automatically detecting and skipping them to extract the JSON payload.
3540
*/
3641
public class JsonRecordBuilder extends BaseRecordBuilder {
3742
private static final Logger log = LoggerFactory.getLogger(JsonRecordBuilder.class);
3843

3944
private JsonConverter converter;
4045

46+
// RFH2 header constants
47+
private static final String RFH2_STRUC_ID = "RFH ";
48+
private static final int RFH2_STRUCT_ID_LENGTH = 4;
49+
private static final int RFH2_STRUC_LENGTH_OFFSET = 8;
50+
private static final int RFH2_MIN_HEADER_SIZE = 36;
51+
4152
public JsonRecordBuilder() {
4253
log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
4354
converter = new JsonConverter();
@@ -65,7 +76,7 @@ public JsonRecordBuilder() {
6576
@Override
6677
public SchemaAndValue getValue(final JMSContext context, final String topic, final boolean messageBodyJms,
6778
final Message message) throws JMSException {
68-
final byte[] payload;
79+
byte[] payload;
6980

7081
if (message instanceof BytesMessage) {
7182
payload = message.getBody(byte[].class);
@@ -77,6 +88,84 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin
7788
throw new RecordBuilderException("Unsupported JMS message type");
7889
}
7990

91+
// When messageBodyJms is false, the message may contain RFH2 headers that need to be skipped
92+
if (!messageBodyJms) {
93+
payload = stripRFH2Header(payload);
94+
}
8095
return converter.toConnectData(topic, payload);
8196
}
97+
98+
/**
99+
* Skips RFH2 (Rules and Formatting Header version 2) if present in the payload.
100+
* RFH2 headers are used by IBM MQ to carry additional message properties and metadata.
101+
*
102+
* When messageBodyJms is false (WMQ_MESSAGE_BODY_MQ), JMS does not automatically
103+
* strip RFH2 headers, so we need to parse and skip them manually.
104+
*
105+
* RFH2 structure:
106+
* - StrucId (4 bytes): "RFH " (with trailing space)
107+
* - Version (4 bytes): Version number (typically 2)
108+
* - StrucLength (4 bytes): Total length of RFH2 header including all folders
109+
* - Encoding (4 bytes): Numeric encoding
110+
* - CodedCharSetId (4 bytes): Character set identifier
111+
* - Format (8 bytes): Format name of data following the header
112+
* - Flags (4 bytes): Flags
113+
* - NameValueCCSID (4 bytes): CCSID of name-value data
114+
* - Variable length folders containing name-value pairs
115+
*
116+
* Inspired from https://github.com/CommunityHiQ/Frends.Community.IBMMQ/blob/master/Frends.Community.IBMMQ/Helpers/IBMMQHelpers.cs
117+
*
118+
* @param payload the original message payload
119+
* @return the payload with RFH2 header removed if present, otherwise the original payload
120+
*/
121+
private byte[] stripRFH2Header(final byte[] payload) {
122+
if (payload == null || payload.length < RFH2_MIN_HEADER_SIZE) {
123+
return payload;
124+
}
125+
126+
// Check if the message starts with RFH2 structure ID
127+
final String strucId = new String(payload, 0, RFH2_STRUCT_ID_LENGTH, UTF_8);
128+
if (!RFH2_STRUC_ID.equals(strucId)) {
129+
log.debug("No RFH2 header detected");
130+
return payload;
131+
}
132+
133+
try {
134+
// Read version to detect endianness
135+
final ByteBuffer buffer = ByteBuffer.wrap(payload);
136+
buffer.order(ByteOrder.LITTLE_ENDIAN); // Default to little-endian
137+
buffer.position(RFH2_STRUCT_ID_LENGTH); // Skip StrucId (4 bytes)
138+
int version = buffer.getInt();
139+
140+
// Detect endianness: if version is not 1 or 2, it's likely big-endian
141+
if (version > 2 || version < 1) {
142+
version = Integer.reverseBytes(version);
143+
buffer.order(ByteOrder.BIG_ENDIAN);
144+
log.debug("Detected big-endian RFH2 header");
145+
} else {
146+
log.debug("Detected little-endian RFH2 header");
147+
}
148+
149+
// Read the RFH2 structure length, Skip StrucId (4 bytes) and Version (4 bytes)
150+
buffer.position(RFH2_STRUC_LENGTH_OFFSET);
151+
final int strucLength = buffer.getInt();
152+
153+
if (strucLength < RFH2_MIN_HEADER_SIZE || strucLength > payload.length) {
154+
log.warn("Invalid RFH2 structure length: {}. Treating entire payload as message.", strucLength);
155+
return payload;
156+
}
157+
158+
log.debug("RFH2 header detected (version: {}, length: {} bytes). Stripping header.", version, strucLength);
159+
160+
// Extract the actual message payload after the RFH2 header
161+
final byte[] actualPayload = new byte[payload.length - strucLength];
162+
System.arraycopy(payload, strucLength, actualPayload, 0, actualPayload.length);
163+
164+
return actualPayload;
165+
166+
} catch (final Exception e) {
167+
log.error("Error parsing RFH2 header: {}. Returning original payload.", e.getMessage());
168+
return payload;
169+
}
170+
}
82171
}

0 commit comments

Comments
 (0)