Skip to content

Commit 92a8da8

Browse files
Merge pull request opendatahub-io#48 from kserve/main
[pull] main from kserve:main
2 parents c3d4fb8 + eaa2fde commit 92a8da8

File tree

4 files changed

+82
-47
lines changed

4 files changed

+82
-47
lines changed

src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,7 @@ public void onHalfClose() {
725725
String vModelId = null;
726726
String requestId = null;
727727
ModelResponse response = null;
728+
ByteBuf responsePayload = null;
728729
try (InterruptingListener cancelListener = newInterruptingListener()) {
729730
if (logHeaders != null) {
730731
logHeaders.addToMDC(headers); // MDC cleared in finally block
@@ -767,18 +768,20 @@ public void onHalfClose() {
767768
} finally {
768769
if (payloadProcessor != null) {
769770
processPayload(reqMessage.readerIndex(reqReaderIndex),
770-
requestId, resolvedModelId, methodName, headers, null, true);
771+
requestId, resolvedModelId, vModelId, methodName, headers, null);
771772
} else {
772773
releaseReqMessage();
773774
}
774775
reqMessage = null; // ownership released or transferred
775776
}
776777

777-
respReaderIndex = response.data.readerIndex();
778778
respSize = response.data.readableBytes();
779779
call.sendHeaders(response.metadata);
780+
if (payloadProcessor != null) {
781+
responsePayload = response.data.retainedSlice();
782+
}
780783
call.sendMessage(response.data);
781-
// response is released via ReleaseAfterResponse.releaseAll()
784+
// final response refcount is released via ReleaseAfterResponse.releaseAll()
782785
status = OK;
783786
} catch (Exception e) {
784787
status = toStatus(e);
@@ -795,17 +798,13 @@ public void onHalfClose() {
795798
evictMethodDescriptor(methodName);
796799
}
797800
} finally {
798-
final boolean releaseResponse = status != OK;
799801
if (payloadProcessor != null) {
800-
ByteBuf data = null;
801-
Metadata metadata = null;
802-
if (response != null) {
803-
data = response.data.readerIndex(respReaderIndex);
804-
metadata = response.metadata;
805-
}
806-
processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse);
807-
} else if (releaseResponse && response != null) {
808-
response.release();
802+
Metadata metadata = response != null ? response.metadata : null;
803+
processPayload(responsePayload, requestId, resolvedModelId, vModelId, methodName, metadata, status);
804+
}
805+
if (status != OK && response != null) {
806+
// An additional release is required if call.sendMessage() wasn't successful
807+
response.data.release();
809808
}
810809
ReleaseAfterResponse.releaseAll();
811810
clearThreadLocals();
@@ -820,23 +819,22 @@ public void onHalfClose() {
820819
}
821820

822821
/**
823-
* Invoke PayloadProcessor on the request/response data
822+
* Invoke PayloadProcessor on the request/response data. This method takes ownership
823+
* of the passed-in {@code ByteBuf}.
824+
*
824825
* @param data the binary data
825826
* @param payloadId the id of the request
826827
* @param modelId the id of the model
828+
* @param vModelId the id of the vModel
827829
* @param methodName the name of the invoked method
828830
* @param metadata the method name metadata
829831
* @param status null for requests, non-null for responses
830-
* @param takeOwnership whether the processor should take ownership
831832
*/
832-
private void processPayload(ByteBuf data, String payloadId, String modelId, String methodName,
833-
Metadata metadata, io.grpc.Status status, boolean takeOwnership) {
833+
private void processPayload(ByteBuf data, String payloadId, String modelId, String vModelId, String methodName,
834+
Metadata metadata, io.grpc.Status status) {
834835
Payload payload = null;
835836
try {
836837
assert payloadProcessor != null;
837-
if (!takeOwnership) {
838-
ReferenceCountUtil.retain(data);
839-
}
840838
payload = new Payload(payloadId, modelId, methodName, metadata, data, status);
841839
if (payloadProcessor.process(payload)) {
842840
data = null; // ownership transferred
@@ -1200,6 +1198,7 @@ public void getVModelStatus(GetVModelStatusRequest request, StreamObserver<VMode
12001198
} finally {
12011199
clearThreadLocals();
12021200
}
1201+
12031202
}
12041203

12051204
@Override

src/main/java/com/ibm/watson/modelmesh/payload/MatchingPayloadProcessor.java

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.ibm.watson.modelmesh.payload;
1818

1919
import java.io.IOException;
20+
import java.util.Objects;
2021

2122
/**
2223
* A {@link PayloadProcessor} that processes {@link Payload}s only if they match the given model ID, vModel ID and method name (a criterion that is not set matches everything).
@@ -29,10 +30,13 @@ public class MatchingPayloadProcessor implements PayloadProcessor {
2930

3031
private final String modelId;
3132

32-
MatchingPayloadProcessor(PayloadProcessor delegate, String methodName, String modelId) {
33+
private final String vModelId;
34+
35+
MatchingPayloadProcessor(PayloadProcessor delegate, String methodName, String modelId, String vModelId) {
3336
this.delegate = delegate;
3437
this.methodName = methodName;
3538
this.modelId = modelId;
39+
this.vModelId = vModelId;
3640
}
3741

3842
@Override
@@ -42,40 +46,49 @@ public String getName() {
4246

4347
@Override
4448
public boolean process(Payload payload) {
45-
boolean processed = false;
46-
boolean methodMatches = true;
47-
if (this.methodName != null) {
48-
methodMatches = payload.getMethod() != null && this.methodName.equals(payload.getMethod());
49-
}
49+
boolean methodMatches = this.methodName == null || Objects.equals(this.methodName, payload.getMethod());
5050
if (methodMatches) {
51-
boolean modelIdMatches = true;
52-
if (this.modelId != null) {
53-
modelIdMatches = this.modelId.equals(payload.getModelId());
54-
}
51+
boolean modelIdMatches = this.modelId == null || this.modelId.equals(payload.getModelId());
5552
if (modelIdMatches) {
56-
processed = delegate.process(payload);
53+
boolean vModelIdMatches = this.vModelId == null || this.vModelId.equals(payload.getVModelId());
54+
if (vModelIdMatches) {
55+
return delegate.process(payload);
56+
}
5757
}
5858
}
59-
return processed;
59+
return false;
6060
}
6161

6262
public static MatchingPayloadProcessor from(String modelId, String method, PayloadProcessor processor) {
63+
return from(modelId, null, method, processor);
64+
}
65+
66+
public static MatchingPayloadProcessor from(String modelId, String vModelId,
67+
String method, PayloadProcessor processor) {
6368
if (modelId != null) {
64-
if (modelId.length() > 0) {
69+
if (!modelId.isEmpty()) {
6570
modelId = modelId.replaceFirst("/", "");
66-
if (modelId.length() == 0 || modelId.equals("*")) {
71+
if (modelId.isEmpty() || modelId.equals("*")) {
6772
modelId = null;
6873
}
6974
} else {
7075
modelId = null;
7176
}
7277
}
73-
if (method != null) {
74-
if (method.length() == 0 || method.equals("*")) {
75-
method = null;
78+
if (vModelId != null) {
79+
if (!vModelId.isEmpty()) {
80+
vModelId = vModelId.replaceFirst("/", "");
81+
if (vModelId.isEmpty() || vModelId.equals("*")) {
82+
vModelId = null;
83+
}
84+
} else {
85+
vModelId = null;
7686
}
7787
}
78-
return new MatchingPayloadProcessor(processor, method, modelId);
88+
if (method != null && (method.isEmpty() || method.equals("*"))) {
89+
method = null;
90+
}
91+
return new MatchingPayloadProcessor(processor, method, modelId, vModelId);
7992
}
8093

8194
@Override

src/main/java/com/ibm/watson/modelmesh/payload/Payload.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public enum Kind {
3939

4040
private final String modelId;
4141

42+
private final String vModelId;
43+
4244
private final String method;
4345

4446
private final Metadata metadata;
@@ -48,10 +50,17 @@ public enum Kind {
4850
// null for requests, non-null for responses
4951
private final Status status;
5052

53+
5154
public Payload(@Nonnull String id, @Nonnull String modelId, @Nullable String method, @Nullable Metadata metadata,
5255
@Nullable ByteBuf data, @Nullable Status status) {
56+
this(id, modelId, null, method, metadata, data, status);
57+
}
58+
59+
public Payload(@Nonnull String id, @Nonnull String modelId, @Nullable String vModelId, @Nullable String method,
60+
@Nullable Metadata metadata, @Nullable ByteBuf data, @Nullable Status status) {
5361
this.id = id;
5462
this.modelId = modelId;
63+
this.vModelId = vModelId;
5564
this.method = method;
5665
this.metadata = metadata;
5766
this.data = data;
@@ -68,6 +77,16 @@ public String getModelId() {
6877
return modelId;
6978
}
7079

80+
@CheckForNull
81+
public String getVModelId() {
82+
return vModelId;
83+
}
84+
85+
@Nonnull
86+
public String getVModelIdOrModelId() {
87+
return vModelId != null ? vModelId : modelId;
88+
}
89+
7190
@CheckForNull
7291
public String getMethod() {
7392
return method;
@@ -101,6 +120,7 @@ public void release() {
101120
public String toString() {
102121
return "Payload{" +
103122
"id='" + id + '\'' +
123+
", vModelId=" + (vModelId != null ? ('\'' + vModelId + '\'') : "null") +
104124
", modelId='" + modelId + '\'' +
105125
", method='" + method + '\'' +
106126
", status=" + (status == null ? "request" : String.valueOf(status)) +

src/main/java/com/ibm/watson/modelmesh/payload/RemotePayloadProcessor.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,10 @@ public boolean process(Payload payload) {
5757
private static PayloadContent prepareContentBody(Payload payload) {
5858
String id = payload.getId();
5959
String modelId = payload.getModelId();
60+
String vModelId = payload.getVModelId();
6061
String kind = payload.getKind().toString().toLowerCase();
6162
ByteBuf byteBuf = payload.getData();
62-
String data;
63-
if (byteBuf != null) {
64-
data = encodeBinaryToString(byteBuf);
65-
} else {
66-
data = "";
67-
}
63+
String data = byteBuf != null ? encodeBinaryToString(byteBuf) : "";
6864
Metadata metadata = payload.getMetadata();
6965
Map<String, String> metadataMap = new HashMap<>();
7066
if (metadata != null) {
@@ -79,7 +75,7 @@ private static PayloadContent prepareContentBody(Payload payload) {
7975
}
8076
}
8177
String status = payload.getStatus() != null ? payload.getStatus().getCode().toString() : "";
82-
return new PayloadContent(id, modelId, data, kind, status, metadataMap);
78+
return new PayloadContent(id, modelId, vModelId, data, kind, status, metadataMap);
8379
}
8480

8581
private static String encodeBinaryToString(ByteBuf byteBuf) {
@@ -116,15 +112,17 @@ private static class PayloadContent {
116112

117113
private final String id;
118114
private final String modelid;
115+
private final String vModelId;
119116
private final String data;
120117
private final String kind;
121118
private final String status;
122119
private final Map<String, String> metadata;
123120

124-
private PayloadContent(String id, String modelid, String data, String kind, String status,
125-
Map<String, String> metadata) {
121+
private PayloadContent(String id, String modelid, String vModelId, String data, String kind,
122+
String status, Map<String, String> metadata) {
126123
this.id = id;
127124
this.modelid = modelid;
125+
this.vModelId = vModelId;
128126
this.data = data;
129127
this.kind = kind;
130128
this.status = status;
@@ -143,6 +141,10 @@ public String getModelid() {
143141
return modelid;
144142
}
145143

144+
public String getvModelId() {
145+
return vModelId;
146+
}
147+
146148
public String getData() {
147149
return data;
148150
}
@@ -160,6 +162,7 @@ public String toString() {
160162
return "PayloadContent{" +
161163
"id='" + id + '\'' +
162164
", modelid='" + modelid + '\'' +
165+
", vModelId=" + (vModelId != null ? ('\'' + vModelId + '\'') : "null") +
163166
", data='" + data + '\'' +
164167
", kind='" + kind + '\'' +
165168
", status='" + status + '\'' +

0 commit comments

Comments
 (0)