Skip to content

Error: Connection failed: ArgumentError("invalid value for Enum WireType: 7") #282

@rzhli

Description

@rzhli

I'm trying to build a Futu OpenAPI SDK, but encounter with an invalid value for Enum WireType: 7 error when decoding the proto message received.

The .proto file:
 syntax = "proto2";
 package InitConnect;
 option java_package = "com.futu.openapi.pb";
 option go_package = "github.com/futuopen/ftapi4go/pb/initconnect";
 
 import "Common.proto";
 
 message C2S
 {
 	required int32 clientVer = 1; //客户端版本号,clientVer = "."以前的数 * 100 + "."以后的,举例:1.1版本的clientVer为1 * 100 + 1 = 101,2.21版本为2 * 100 + 21 = 221
 	required string clientID = 2; //客户端唯一标识,无生具体生成规则,客户端自己保证唯一性即可
 	optional bool recvNotify = 3; //此连接是否接收市场状态、交易需要重新解锁等等事件通知,true代表接收,FutuOpenD就会向此连接推送这些通知,反之false代表不接收不推送
 	//如果通信要加密,首先得在FutuOpenD和客户端都配置RSA密钥,不配置始终不加密
 	//如果配置了RSA密钥且指定的加密算法不为PacketEncAlgo_None则加密(即便这里不设置,配置了RSA密钥,也会采用默认加密方式),默认采用FTAES_ECB算法
 	optional int32 packetEncAlgo = 4; //指定包加密算法,参见Common.PacketEncAlgo的枚举定义
 	optional int32 pushProtoFmt = 5; //指定这条连接上的推送协议格式,若不指定则使用push_proto_type配置项
 	optional string programmingLanguage = 6; //接口编程语言,用于统计语言偏好
 }
 
 message S2C
 {
 	required int32 serverVer = 1; //FutuOpenD的版本号
 	required uint64 loginUserID = 2; //FutuOpenD登陆的牛牛用户ID
 	required uint64 connID = 3; //此连接的连接ID,连接的唯一标识
 	required string connAESKey = 4; //此连接后续AES加密通信的Key,固定为16字节长字符串
 	required int32 keepAliveInterval = 5; //心跳保活间隔
 	optional string aesCBCiv = 6; //AES加密通信CBC加密模式的iv,固定为16字节长字符串
 	optional int32 userAttribution = 7; //用户类型,牛牛用户或MooMoo用户
 }
 
 message Request
 {
 	required C2S c2s = 1;
 }
 
 message Response
 {
 	required int32 retType = 1 [default = -400]; //返回结果,参见Common.RetType的枚举定义
 	optional string retMsg = 2; //返回结果描述
 	optional int32 errCode = 3; //错误码,客户端一般通过retType和retMsg来判断结果和详情,errCode只做日志记录,仅在个别协议失败时对账用
 	
 	optional S2C s2c = 4;
 }
and .jl file generated:
module InitConnect

using ProtoBuf
import ProtoBuf as PB
using ..Common

mutable struct C2S
    clientVer::Int32
    clientID::String
    recvNotify::Bool
    packetEncAlgo::Common.PacketEncAlgo.T
    pushProtoFmt::Common.ProtoFmt.T
    programmingLanguage::String
end
C2S() = C2S(zero(Int32), "", false, Common.PacketEncAlgo.PacketEncAlgo_FTAES_ECB, Common.ProtoFmt.ProtoFmt_Protobuf, "")
PB.default_values(::Type{C2S}) = (;clientVer = zero(Int32), clientID = "", recvNotify = false, packetEncAlgo = Common.PacketEncAlgo_FTAES_ECB, pushProtoFmt = Common.ProtoFmt.ProtoFmt_Protobuf, programmingLanguage = "")
PB.field_numbers(::Type{C2S}) = (;clientVer = 1, clientID = 2, recvNotify = 3, packetEncAlgo = 4, pushProtoFmt = 5, programmingLanguage = 6)
function PB.encode(e::PB.AbstractProtoEncoder, x::C2S)
    initpos = position(e.io)
    x.clientVer != zero(Int32) && PB.encode(e, 1, x.clientVer)
    x.clientID != "" && PB.encode(e, 2, x.clientID)
    x.recvNotify != false && PB.encode(e, 3, x.recvNotify)
    x.packetEncAlgo != Common.PacketEncAlgo.PacketEncAlgo_FTAES_ECB && PB.encode(e, 4, x.packetEncAlgo)
    x.pushProtoFmt != Common.ProtoFmt.ProtoFmt_Protobuf && PB.encode(e, 5, x.pushProtoFmt)
    x.programmingLanguage != "" && PB.encode(e, 6, x.programmingLanguage)
    return position(e.io) - initpos
end

mutable struct S2C
    serverVer::Int32
    loginUserID::UInt64
    connID::UInt64
    connAESKey::String
    keepAliveInterval::Int32
    aesCBCiv::String
    userAttribution::Int32
end
S2C() = S2C(zero(Int32), zero(UInt64), zero(UInt64), "", zero(Int32), "", zero(Int32))
PB.default_values(::Type{S2C}) = (;serverVer = zero(Int32), loginUserID = zero(UInt64), connID = zero(UInt64), connAESKey = "", keepAliveInterval = zero(Int32), aesCBCiv = "", userAttribution = zero(Int32))
PB.field_numbers(::Type{S2C}) = (;serverVer = 1, loginUserID = 2, connID = 3, connAESKey = 4, keepAliveInterval = 5, aesCBCiv = 6, userAttribution = 7)
function PB.decode(d::PB.AbstractProtoDecoder, ::Type{<:S2C})
    serverVer = zero(Int32)
    loginUserID = zero(UInt64)
    connID = zero(UInt64)
    connAESKey = ""
    keepAliveInterval = zero(Int32)
    aesCBCiv = ""
    userAttribution = zero(Int32)
    while !PB.message_done(d)
        field_number, wire_type = PB.decode_tag(d)
        if field_number == 1
            serverVer = PB.decode(d, Int32)
        elseif field_number == 2
            loginUserID = PB.decode(d, UInt64)
        elseif field_number == 3
            connID = PB.decode(d, UInt64)
        elseif field_number == 4
            connAESKey = PB.decode(d, String)
        elseif field_number == 5
            keepAliveInterval = PB.decode(d, Int32)
        elseif field_number == 6
            aesCBCiv = PB.decode(d, String)
        elseif field_number == 7
            userAttribution = PB.decode(d, Int32)
        else
            PB.skip(d, wire_type)
        end
    end
    return S2C(serverVer, loginUserID, connID, connAESKey, keepAliveInterval, aesCBCiv, userAttribution)
end

mutable struct Request
    c2s::C2S
end
PB.default_values(::Type{Request}) = (;c2s = C2S())
PB.field_numbers(::Type{Request}) = (;c2s = 1)
function PB.encode(e::PB.AbstractProtoEncoder, x::Request)
    initpos = position(e.io)
    PB.encode(e, 1, x.c2s)
    return position(e.io) - initpos
end

mutable struct Response
    retType::Int32
    retMsg::String
    errCode::Int32
    s2c::S2C
end
PB.default_values(::Type{Response}) = (;retType = Common.RetType.RetType_Unknown, retMsg = "", errCode = 0, s2c = S2C())
PB.field_numbers(::Type{Response}) = (;retType = 1, retMsg = 2, errCode = 3, s2c = 4)
function PB.decode(d::PB.AbstractProtoDecoder, ::Type{<:Response})
    retType = Common.RetType.RetType_Unknown
    retMsg = ""
    errCode = 0
    s2c = S2C()
    while !PB.message_done(d)
        field_number, wire_type = PB.decode_tag(d)
        if field_number == 1
            retType = PB.decode(d, Int32)
        elseif field_number == 2
            retMsg = PB.decode(d, String)
        elseif field_number == 3
            errCode = PB.decode(d, Int32)
        elseif field_number == 4
            s2c = PB.decode(d, S2C)
        else
            PB.skip(d, wire_type)
        end
    end
    return Response(retType, retMsg, errCode, s2c)
end

export C2S, S2C, Request, Response

end
revelant function:
function connect!(conn::OpenDConnection)
    lock(conn.lock) do
        if conn.connected
            @warn "Already connected to OpenD"
            return true
        end

        try
            conn.socket = Sockets.connect(conn.host, conn.port)
            conn.connected = true
            conn.serial_no = UInt32(0)

            # Determine encryption algorithm
            is_encrypt = !isempty(conn.rsa_private_key_path)
            enc_algo = is_encrypt ? Common.PacketEncAlgo.PacketEncAlgo_AES_CBC : Common.PacketEncAlgo.PacketEncAlgo_None

            # Send InitConnect request
            c2s = InitConnect.C2S(
                310,
                "FutuAPI.jl_v1.0",
                true,
                enc_algo,
                Common.ProtoFmt.ProtoFmt_Protobuf,
                "Julia"
            )
            req = InitConnect.Request(c2s)

            # Serialize the body
            io = IOBuffer()
            encode(ProtoEncoder(io), req)
            req_body_bytes = take!(io)

            # Calculate SHA1 of original body (before encryption)
            body_sha1 = sha1(req_body_bytes)

            # Encrypt body with RSA public key if private key is configured
            final_body_bytes = req_body_bytes
            if !isempty(conn.rsa_private_key_path)
                # Replace private.pem with public.pem to get public key path
                pub_key_path = replace(conn.rsa_private_key_path, "private.pem" => "public.pem")
                final_body_bytes = Encryption.encrypt_rsa(req_body_bytes, pub_key_path)
            end

            header = ProtoHeader(
                UInt32(PROTO_ID_INIT_CONNECT),
                UInt32(length(final_body_bytes)),
                get_next_serial_no(conn)
            )
            header.body_sha1 .= body_sha1

            packet = vcat(serialize_header(header), final_body_bytes)
            write(conn.socket, packet)

            header, body_bytes = recv_packet(conn, is_init_connect=true)

            # Decrypt InitConnect response if RSA encryption is enabled
            final_body_bytes = body_bytes
            if !isempty(conn.rsa_private_key_path)
                final_body_bytes = Encryption.decrypt_rsa(body_bytes, conn.rsa_private_key_path)
                @info "Decrypted body length: $(length(final_body_bytes)), first 20 bytes: $(final_body_bytes[1:min(20, length(final_body_bytes))])"

                # Debug: show hex representation
                hex_str = join([string(b, base=16, pad=2) for b in final_body_bytes[1:min(50, length(final_body_bytes))]], " ")
                @info "Decrypted body hex (first 50 bytes): $hex_str"
            end

            resp = decode(ProtoDecoder(IOBuffer(final_body_bytes)), InitConnect.Response)    # Error with this line
            if resp.retType != Int32(Common.RetType.RetType_Succeed)
                throw(ConnectionError("Failed to initialize connection: retType=$(resp.retType), retMsg='$(resp.retMsg)', errCode=$(resp.errCode)"))
            end

            # Store connection info
            s2c = resp.s2c
            conn.server_ver = s2c.serverVer
            conn.login_user_id = s2c.loginUserID
            conn.conn_id = s2c.connID
            conn.conn_aes_key = s2c.connAESKey
            conn.conn_aes_iv = s2c.aesCBCiv
            conn.keep_alive_interval = s2c.keepAliveInterval
            conn.last_keep_alive = time()

            # @info "AES Key: $(conn.conn_aes_key), AES IV: $(conn.conn_aes_iv)"

            # Start keep-alive task
            conn.keep_alive_task = @async begin
                while is_connected(conn)
                    try
                        sleep(conn.keep_alive_interval / 2) # Send heartbeat more frequently
                        if time() - conn.last_keep_alive >= conn.keep_alive_interval
                            keep_alive(conn)
                        end
                    catch e
                        if !(e isa EOFError || e isa IOError || e isa InterruptException)
                            @error "Keep-alive task failed: $e"
                        end
                        break
                    end
                end
            end

            @info "Connected to OpenD at $(conn.host):$(conn.port) (Server v$(conn.server_ver), ConnID: $(conn.conn_id))"
            return true
        catch e
            @error "Connection failed: $e"
            conn.connected = false
            conn.socket = nothing
            throw(ConnectionError("Failed to connect to OpenD: $e"))
        end
    end
end

function recv_packet(conn::OpenDConnection; is_init_connect::Bool=false)::Tuple{ProtoHeader, Vector{UInt8}}
    if !is_connected(conn) throw(ConnectionError("Not connected to OpenD")) end

    try
        header_buffer = read(conn.socket, PROTO_HEADER_SIZE)
        if length(header_buffer) < PROTO_HEADER_SIZE
            throw(EOFError())
        end
        header = deserialize_header(header_buffer)

        body_bytes = read(conn.socket, header.body_len)
        if length(body_bytes) < header.body_len
            throw(EOFError())
        end

        # For debugging: check if SHA1 is all zeros (meaning no validation needed)
        is_sha1_empty = all(x -> x == 0, header.body_sha1)

        # Decrypt if needed
        decrypted_body = body_bytes
        if !is_init_connect && !isempty(conn.conn_aes_key)
            if !isempty(conn.conn_aes_iv)
                decrypted_body = decrypt_aes_cbc(body_bytes, conn.conn_aes_key, conn.conn_aes_iv)
            else
                decrypted_body = decrypt_aes_ecb(body_bytes, conn.conn_aes_key)
            end
        end

        # SHA1 validation - only validate if SHA1 is not empty
        # Some responses may not include SHA1 validation
        # For InitConnect, we skip SHA1 validation as it seems to be inconsistent
        if !is_sha1_empty && !is_init_connect
            calculated_sha1 = sha1(decrypted_body)
            if calculated_sha1 != header.body_sha1
                # Try SHA1 of encrypted body as well (some old protocols may use this)
                encrypted_sha1 = sha1(body_bytes)
                if encrypted_sha1 != header.body_sha1
                    @error "Body SHA1 mismatch" header_sha1=bytes2hex(header.body_sha1) decrypted_sha1=bytes2hex(calculated_sha1) encrypted_sha1=bytes2hex(encrypted_sha1)
                    throw(ProtocolError("Body SHA1 mismatch"))
                end
            end
        end

        return header, decrypted_body
    catch e
        if !(e isa InterruptException)
            disconnect!(conn)
        end
        throw(ConnectionError("Failed to receive packet: $e"))
    end
end

Error and Stacktrace:

[ Info: Decrypted body length: 66, first 20 bytes: UInt8[0x08, 0x00, 0x12, 0x00, 0x18, 0x00, 0x22, 0x3a, 0x08, 0x88, 0x07, 0x10, 0xdf, 0xae, 0x97, 0x0e, 0x18, 0xd6, 0x81, 0xb8]
[ Info: Decrypted body hex (first 50 bytes): 08 00 12 00 18 00 22 3a 08 88 07 10 df ae 97 0e 18 d6 81 b8 85 d3 f0 bd b2 66 22 10 31 39 31 43 43 30 43 32 46 44 39 46 38 43 34 42 28 0a 32 10 44 33
┌ Error: Connection failed: ArgumentError("invalid value for Enum WireType: 7")
└ @ FutuAPI.Connection ~/文档/投资/Futu/FutuAPI/src/core/connection.jl:242
ERROR: ConnectionError: Failed to connect to OpenD: ArgumentError("invalid value for Enum WireType: 7")
Stacktrace:
 [1] (::FutuAPI.Connection.var"#5#8"{FutuAPI.Connection.OpenDConnection})()
   @ FutuAPI.Connection ~/文档/投资/Futu/FutuAPI/src/core/connection.jl:245
 [2] lock(f::FutuAPI.Connection.var"#5#8"{FutuAPI.Connection.OpenDConnection}, l::ReentrantLock)
   @ Base ./lock.jl:232
 [3] connect!
   @ ~/文档/投资/Futu/FutuAPI/src/core/connection.jl:140 [inlined]
 [4] connect!(client::OpenDClient)
   @ FutuAPI.Client ~/文档/投资/Futu/FutuAPI/src/core/client.jl:45
 [5] top-level scope
   @ ~/文档/投资/Futu/FutuAPI/test.jl:7

caused by: ArgumentError: invalid value for Enum WireType: 7
Stacktrace:
  [1] enum_argument_error(typename::Symbol, x::UInt32)
    @ Base.Enums ./Enums.jl:93
  [2] WireType
    @ ./Enums.jl:212 [inlined]
  [3] decode_tag
    @ ~/.julia/packages/ProtoBuf/vW0fR/src/codec/decode.jl:4 [inlined]
  [4] decode(d::ProtoBuf.Codecs.ProtoDecoder{IOBuffer, typeof(eof)}, ::Type{FutuAPI.AllProtos.InitConnect.S2C})
    @ FutuAPI.AllProtos.InitConnect ~/文档/投资/Futu/FutuAPI/src/protocol/proto_generated/InitConnect.jl:50
  [5] decode(d::ProtoBuf.Codecs.ProtoDecoder{IOBuffer, typeof(eof)}, ::Type{FutuAPI.AllProtos.InitConnect.Response})
    @ FutuAPI.AllProtos.InitConnect ~/文档/投资/Futu/FutuAPI/src/protocol/proto_generated/InitConnect.jl:105
  [6] (::FutuAPI.Connection.var"#5#8"{FutuAPI.Connection.OpenDConnection})()
    @ FutuAPI.Connection ~/文档/投资/Futu/FutuAPI/src/core/connection.jl:205
  [7] lock(f::FutuAPI.Connection.var"#5#8"{FutuAPI.Connection.OpenDConnection}, l::ReentrantLock)
    @ Base ./lock.jl:232
  [8] connect!
    @ ~/文档/投资/Futu/FutuAPI/src/core/connection.jl:140 [inlined]
  [9] connect!(client::OpenDClient)
    @ FutuAPI.Client ~/文档/投资/Futu/FutuAPI/src/core/client.jl:45
 [10] top-level scope
    @ ~/文档/投资/Futu/FutuAPI/test.jl:7

It‘s the same error with #276, actually I don't know how to solve it.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions