Skip to content

Commit 4f11c7c

Browse files
authored
fix grpc send large package failed for over package size limit (#82)
* fix grpc send large package failed for over package size limit * sdk support set grpc related options
1 parent 2dfb5ce commit 4f11c7c

File tree

8 files changed

+612
-0
lines changed

8 files changed

+612
-0
lines changed

cpp/ppc-framework/protocol/GrpcConfig.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
* @date 2024-09-02
1919
*/
2020
#pragma once
21+
#include "ppc-framework/Common.h"
2122
#include "ppc-framework/protocol/EndPoint.h"
2223
#include <memory>
24+
#include <sstream>
2325
#include <string>
2426

2527
namespace ppc::protocol
@@ -64,9 +66,63 @@ class GrpcConfig
6466

6567
bool enableDnslookup() const { return m_enableDnslookup; }
6668

69+
uint64_t maxSendMessageSize() const { return m_maxSendMessageSize; }
70+
uint64_t maxReceivedMessageSize() const { return m_maxReceivedMessageSize; }
71+
72+
void setMaxSendMessageSize(uint64_t maxSendMessageSize)
73+
{
74+
m_maxSendMessageSize = maxSendMessageSize;
75+
}
76+
void setMaxReceivedMessageSize(uint64_t maxReceivedMessageSize)
77+
{
78+
m_maxReceivedMessageSize = maxReceivedMessageSize;
79+
}
80+
81+
/*
82+
typedef enum {
83+
GRPC_COMPRESS_NONE = 0,
84+
GRPC_COMPRESS_DEFLATE,
85+
GRPC_COMPRESS_GZIP,
86+
GRPC_COMPRESS_ALGORITHMS_COUNT
87+
} grpc_compression_algorithm;
88+
*/
89+
int compressAlgorithm() const { return m_compressAlgorithm; }
90+
91+
void setCompressAlgorithm(int compressAlgorithm)
92+
{
93+
if (compressAlgorithm < 0 || compressAlgorithm > 2)
94+
{
95+
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
96+
"Invalid compress algorithm, must between 0-3"));
97+
}
98+
m_compressAlgorithm = compressAlgorithm;
99+
}
100+
67101
protected:
68102
bool m_enableHealthCheck = true;
69103
std::string m_loadBalancePolicy = "round_robin";
70104
bool m_enableDnslookup = false;
105+
106+
// the max send message size in bytes
107+
uint64_t m_maxSendMessageSize = 1024 * 1024 * 1024;
108+
// the max received message size in bytes
109+
uint16_t m_maxReceivedMessageSize = 1024 * 1024 * 1024;
110+
int m_compressAlgorithm = 0;
71111
};
112+
113+
inline std::string printGrpcConfig(ppc::protocol::GrpcConfig::Ptr const& grpcConfig)
114+
{
115+
if (!grpcConfig)
116+
{
117+
return "nullptr";
118+
}
119+
std::ostringstream stringstream;
120+
stringstream << LOG_KV("loadBalancePolicy", grpcConfig->loadBalancePolicy())
121+
<< LOG_KV("enableHealthCheck", grpcConfig->enableHealthCheck())
122+
<< LOG_KV("enableDnslookup", grpcConfig->enableDnslookup())
123+
<< LOG_KV("maxSendMessageSize", grpcConfig->maxSendMessageSize())
124+
<< LOG_KV("maxReceivedMessageSize", grpcConfig->maxReceivedMessageSize())
125+
<< LOG_KV("compressAlgorithm", grpcConfig->compressAlgorithm());
126+
return stringstream.str();
127+
}
72128
} // namespace ppc::protocol

cpp/wedpr-protocol/grpc/Common.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
#pragma once
2121
#include "ppc-framework/Common.h"
2222
#include "ppc-framework/protocol/GrpcConfig.h"
23+
#include <grpc/compression.h>
2324
#include <grpcpp/grpcpp.h>
2425

2526
namespace ppc::protocol
2627
{
28+
#define GRPC_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC]"
29+
2730
inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr const& grpcConfig)
2831
{
2932
grpc::ChannelArguments args;
@@ -47,6 +50,11 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
4750
{
4851
args.SetInt("grpc.enable_dns_srv_lookup", 1);
4952
}
53+
args.SetMaxReceiveMessageSize(grpcConfig->maxReceivedMessageSize());
54+
args.SetMaxSendMessageSize(grpcConfig->maxSendMessageSize());
55+
// the compress algorithm
56+
args.SetCompressionAlgorithm((grpc_compression_algorithm)(grpcConfig->compressAlgorithm()));
57+
GRPC_LOG(INFO) << LOG_DESC("toChannelConfig") << printGrpcConfig(grpcConfig);
5058
return args;
5159
}
5260
} // namespace ppc::protocol

cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/GrpcConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,30 @@ public void setEnableDnslookup(boolean enableDnslookup) {
6767
public boolean enableDnslookup() {
6868
return wedpr_java_transportJNI.GrpcConfig_enableDnslookup(swigCPtr, this);
6969
}
70+
71+
public java.math.BigInteger maxSendMessageSize() {
72+
return wedpr_java_transportJNI.GrpcConfig_maxSendMessageSize(swigCPtr, this);
73+
}
74+
75+
public java.math.BigInteger maxReceivedMessageSize() {
76+
return wedpr_java_transportJNI.GrpcConfig_maxReceivedMessageSize(swigCPtr, this);
77+
}
78+
79+
public void setMaxSendMessageSize(java.math.BigInteger maxSendMessageSize) {
80+
wedpr_java_transportJNI.GrpcConfig_setMaxSendMessageSize(
81+
swigCPtr, this, maxSendMessageSize);
82+
}
83+
84+
public void setMaxReceivedMessageSize(java.math.BigInteger maxReceivedMessageSize) {
85+
wedpr_java_transportJNI.GrpcConfig_setMaxReceivedMessageSize(
86+
swigCPtr, this, maxReceivedMessageSize);
87+
}
88+
89+
public int compressAlgorithm() {
90+
return wedpr_java_transportJNI.GrpcConfig_compressAlgorithm(swigCPtr, this);
91+
}
92+
93+
public void setCompressAlgorithm(int compressAlgorithm) {
94+
wedpr_java_transportJNI.GrpcConfig_setCompressAlgorithm(swigCPtr, this, compressAlgorithm);
95+
}
7096
}

cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/wedpr_java_transport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ public static String printFrontDesc(FrontConfig config) {
1313
return wedpr_java_transportJNI.printFrontDesc(FrontConfig.getCPtr(config), config);
1414
}
1515

16+
public static String printGrpcConfig(GrpcConfig grpcConfig) {
17+
return wedpr_java_transportJNI.printGrpcConfig(GrpcConfig.getCPtr(grpcConfig), grpcConfig);
18+
}
19+
1620
public static String printOptionalField(MessageOptionalHeader optionalHeader) {
1721
return wedpr_java_transportJNI.printOptionalField(
1822
MessageOptionalHeader.getCPtr(optionalHeader), optionalHeader);

cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/java/com/webank/wedpr/sdk/jni/generated/wedpr_java_transportJNI.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,25 @@ public static final native void GrpcConfig_setEnableDnslookup(
404404

405405
public static final native boolean GrpcConfig_enableDnslookup(long jarg1, GrpcConfig jarg1_);
406406

407+
public static final native java.math.BigInteger GrpcConfig_maxSendMessageSize(
408+
long jarg1, GrpcConfig jarg1_);
409+
410+
public static final native java.math.BigInteger GrpcConfig_maxReceivedMessageSize(
411+
long jarg1, GrpcConfig jarg1_);
412+
413+
public static final native void GrpcConfig_setMaxSendMessageSize(
414+
long jarg1, GrpcConfig jarg1_, java.math.BigInteger jarg2);
415+
416+
public static final native void GrpcConfig_setMaxReceivedMessageSize(
417+
long jarg1, GrpcConfig jarg1_, java.math.BigInteger jarg2);
418+
419+
public static final native int GrpcConfig_compressAlgorithm(long jarg1, GrpcConfig jarg1_);
420+
421+
public static final native void GrpcConfig_setCompressAlgorithm(
422+
long jarg1, GrpcConfig jarg1_, int jarg2);
423+
424+
public static final native String printGrpcConfig(long jarg1, GrpcConfig jarg1_);
425+
407426
public static final native void delete_MessageOptionalHeader(long jarg1);
408427

409428
public static final native void MessageOptionalHeader_encode(

cpp/wedpr-transport/sdk-wrapper/java/src/wedpr_java_transportJAVA_wrap.cxx

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4453,6 +4453,208 @@ SWIGEXPORT jboolean JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_
44534453
}
44544454

44554455

4456+
SWIGEXPORT jobject JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1maxSendMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
4457+
jobject jresult = 0 ;
4458+
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
4459+
std::shared_ptr< ppc::protocol::GrpcConfig const > *smartarg1 = 0 ;
4460+
uint64_t result;
4461+
4462+
(void)jenv;
4463+
(void)jcls;
4464+
(void)jarg1_;
4465+
4466+
smartarg1 = *(std::shared_ptr< const ppc::protocol::GrpcConfig > **)&jarg1;
4467+
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
4468+
result = (uint64_t)((ppc::protocol::GrpcConfig const *)arg1)->maxSendMessageSize();
4469+
{
4470+
jbyteArray ba = jenv->NewByteArray(9);
4471+
jbyte* bae = jenv->GetByteArrayElements(ba, 0);
4472+
jclass clazz = jenv->FindClass("java/math/BigInteger");
4473+
jmethodID mid = jenv->GetMethodID(clazz, "<init>", "([B)V");
4474+
jobject bigint;
4475+
int i;
4476+
4477+
bae[0] = 0;
4478+
for(i=1; i<9; i++ ) {
4479+
bae[i] = (jbyte)(result>>8*(8-i));
4480+
}
4481+
4482+
jenv->ReleaseByteArrayElements(ba, bae, 0);
4483+
bigint = jenv->NewObject(clazz, mid, ba);
4484+
jenv->DeleteLocalRef(ba);
4485+
jresult = bigint;
4486+
}
4487+
return jresult;
4488+
}
4489+
4490+
4491+
SWIGEXPORT jobject JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1maxReceivedMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
4492+
jobject jresult = 0 ;
4493+
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
4494+
std::shared_ptr< ppc::protocol::GrpcConfig const > *smartarg1 = 0 ;
4495+
uint64_t result;
4496+
4497+
(void)jenv;
4498+
(void)jcls;
4499+
(void)jarg1_;
4500+
4501+
smartarg1 = *(std::shared_ptr< const ppc::protocol::GrpcConfig > **)&jarg1;
4502+
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
4503+
result = (uint64_t)((ppc::protocol::GrpcConfig const *)arg1)->maxReceivedMessageSize();
4504+
{
4505+
jbyteArray ba = jenv->NewByteArray(9);
4506+
jbyte* bae = jenv->GetByteArrayElements(ba, 0);
4507+
jclass clazz = jenv->FindClass("java/math/BigInteger");
4508+
jmethodID mid = jenv->GetMethodID(clazz, "<init>", "([B)V");
4509+
jobject bigint;
4510+
int i;
4511+
4512+
bae[0] = 0;
4513+
for(i=1; i<9; i++ ) {
4514+
bae[i] = (jbyte)(result>>8*(8-i));
4515+
}
4516+
4517+
jenv->ReleaseByteArrayElements(ba, bae, 0);
4518+
bigint = jenv->NewObject(clazz, mid, ba);
4519+
jenv->DeleteLocalRef(ba);
4520+
jresult = bigint;
4521+
}
4522+
return jresult;
4523+
}
4524+
4525+
4526+
SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1setMaxSendMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jobject jarg2) {
4527+
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
4528+
uint64_t arg2 ;
4529+
std::shared_ptr< ppc::protocol::GrpcConfig > *smartarg1 = 0 ;
4530+
4531+
(void)jenv;
4532+
(void)jcls;
4533+
(void)jarg1_;
4534+
4535+
smartarg1 = *(std::shared_ptr< ppc::protocol::GrpcConfig > **)&jarg1;
4536+
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
4537+
{
4538+
jclass clazz;
4539+
jmethodID mid;
4540+
jbyteArray ba;
4541+
jbyte* bae;
4542+
jsize sz;
4543+
int i;
4544+
4545+
if (!jarg2) {
4546+
SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "BigInteger null");
4547+
return ;
4548+
}
4549+
clazz = jenv->GetObjectClass(jarg2);
4550+
mid = jenv->GetMethodID(clazz, "toByteArray", "()[B");
4551+
ba = (jbyteArray)jenv->CallObjectMethod(jarg2, mid);
4552+
bae = jenv->GetByteArrayElements(ba, 0);
4553+
sz = jenv->GetArrayLength(ba);
4554+
arg2 = 0;
4555+
if (sz > 0) {
4556+
arg2 = (uint64_t)(signed char)bae[0];
4557+
for(i=1; i<sz; i++) {
4558+
arg2 = (arg2 << 8) | (uint64_t)(unsigned char)bae[i];
4559+
}
4560+
}
4561+
jenv->ReleaseByteArrayElements(ba, bae, 0);
4562+
}
4563+
(arg1)->setMaxSendMessageSize(arg2);
4564+
}
4565+
4566+
4567+
SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1setMaxReceivedMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jobject jarg2) {
4568+
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
4569+
uint64_t arg2 ;
4570+
std::shared_ptr< ppc::protocol::GrpcConfig > *smartarg1 = 0 ;
4571+
4572+
(void)jenv;
4573+
(void)jcls;
4574+
(void)jarg1_;
4575+
4576+
smartarg1 = *(std::shared_ptr< ppc::protocol::GrpcConfig > **)&jarg1;
4577+
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
4578+
{
4579+
jclass clazz;
4580+
jmethodID mid;
4581+
jbyteArray ba;
4582+
jbyte* bae;
4583+
jsize sz;
4584+
int i;
4585+
4586+
if (!jarg2) {
4587+
SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "BigInteger null");
4588+
return ;
4589+
}
4590+
clazz = jenv->GetObjectClass(jarg2);
4591+
mid = jenv->GetMethodID(clazz, "toByteArray", "()[B");
4592+
ba = (jbyteArray)jenv->CallObjectMethod(jarg2, mid);
4593+
bae = jenv->GetByteArrayElements(ba, 0);
4594+
sz = jenv->GetArrayLength(ba);
4595+
arg2 = 0;
4596+
if (sz > 0) {
4597+
arg2 = (uint64_t)(signed char)bae[0];
4598+
for(i=1; i<sz; i++) {
4599+
arg2 = (arg2 << 8) | (uint64_t)(unsigned char)bae[i];
4600+
}
4601+
}
4602+
jenv->ReleaseByteArrayElements(ba, bae, 0);
4603+
}
4604+
(arg1)->setMaxReceivedMessageSize(arg2);
4605+
}
4606+
4607+
4608+
SWIGEXPORT jint JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1compressAlgorithm(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
4609+
jint jresult = 0 ;
4610+
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
4611+
std::shared_ptr< ppc::protocol::GrpcConfig const > *smartarg1 = 0 ;
4612+
int result;
4613+
4614+
(void)jenv;
4615+
(void)jcls;
4616+
(void)jarg1_;
4617+
4618+
smartarg1 = *(std::shared_ptr< const ppc::protocol::GrpcConfig > **)&jarg1;
4619+
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
4620+
result = (int)((ppc::protocol::GrpcConfig const *)arg1)->compressAlgorithm();
4621+
jresult = (jint)result;
4622+
return jresult;
4623+
}
4624+
4625+
4626+
SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1setCompressAlgorithm(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jint jarg2) {
4627+
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
4628+
int arg2 ;
4629+
std::shared_ptr< ppc::protocol::GrpcConfig > *smartarg1 = 0 ;
4630+
4631+
(void)jenv;
4632+
(void)jcls;
4633+
(void)jarg1_;
4634+
4635+
smartarg1 = *(std::shared_ptr< ppc::protocol::GrpcConfig > **)&jarg1;
4636+
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
4637+
arg2 = (int)jarg2;
4638+
(arg1)->setCompressAlgorithm(arg2);
4639+
}
4640+
4641+
4642+
SWIGEXPORT jstring JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_printGrpcConfig(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
4643+
jstring jresult = 0 ;
4644+
ppc::protocol::GrpcConfig::Ptr *arg1 = 0 ;
4645+
ppc::protocol::GrpcConfig::Ptr tempnull1 ;
4646+
std::string result;
4647+
4648+
(void)jenv;
4649+
(void)jcls;
4650+
(void)jarg1_;
4651+
arg1 = jarg1 ? *(ppc::protocol::GrpcConfig::Ptr **)&jarg1 : &tempnull1;
4652+
result = ppc::protocol::printGrpcConfig((std::shared_ptr< ppc::protocol::GrpcConfig > const &)*arg1);
4653+
jresult = jenv->NewStringUTF((&result)->c_str());
4654+
return jresult;
4655+
}
4656+
4657+
44564658
SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_delete_1MessageOptionalHeader(JNIEnv *jenv, jclass jcls, jlong jarg1) {
44574659
ppc::protocol::MessageOptionalHeader *arg1 = (ppc::protocol::MessageOptionalHeader *) 0 ;
44584660
std::shared_ptr< ppc::protocol::MessageOptionalHeader > *smartarg1 = 0 ;

0 commit comments

Comments
 (0)