diff --git a/source/dnode/mnode/impl/src/mndXnode.c b/source/dnode/mnode/impl/src/mndXnode.c index 53571c978572..8f342bdce6f9 100644 --- a/source/dnode/mnode/impl/src/mndXnode.c +++ b/source/dnode/mnode/impl/src/mndXnode.c @@ -18,13 +18,16 @@ #include "mndDef.h" #include "tdatablock.h" #include "types.h" -#ifndef WINDOWS #include +#ifndef WINDOWS #include #include #include #include #include +#else +#include "tsha.h" +#include "tbase64.h" #endif #include "audit.h" #include "mndDnode.h" @@ -39,7 +42,11 @@ #include "xnode.h" #define TSDB_XNODE_RESERVE_SIZE 64 +#ifdef WINDOWS +#define XNODED_PIPE_SOCKET_URL "http://localhost:6051" +#else #define XNODED_PIPE_SOCKET_URL "http://localhost" +#endif typedef enum { HTTP_TYPE_GET = 0, HTTP_TYPE_POST, @@ -2701,14 +2708,14 @@ int32_t mndXnodeUserPassActionUpdate(SSdb *pSdb, SXnodeUserPassObj *pOld, SXnode tmp = pOld->token; pOld->token = pNew->token; pNew->token = tmp; - + // swapFields(&pNew->userLen, &pNew->user, &pOld->userLen, &pOld->user); // swapFields(&pNew->passLen, &pNew->pass, &pOld->passLen, &pOld->pass); // swapFields(&pNew->tokenLen, &pNew->token, &pOld->tokenLen, &pOld->token); // SXnodeUserPassObj* tmp = pNew; // pNew = pOld; // pOld = tmp; - + taosWUnLockLatch(&pOld->lock); return 0; } @@ -3862,7 +3869,6 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void } } -#ifndef WINDOWS static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) { CURL *curl = NULL; int32_t code = 0; @@ -3874,7 +3880,9 @@ static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t time return -1; } + #ifndef WINDOWS TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER); + #endif TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp), &lino, _OVER); @@ -3919,7 +3927,9 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char } headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8"); + #ifndef WINDOWS TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER); + #endif TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER); @@ -3972,7 +3982,9 @@ static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t t return -1; } + #ifndef WINDOWS if (curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath)) goto _OVER; + #endif if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE") != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; @@ -4003,25 +4015,20 @@ static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t t XND_LOG_END(code, lino); return code; } -#else -static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) { return 0; } -static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout, - const char *socketPath) { - return 0; -} -static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) { return 0; } -#endif + SJson *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const char *buf, int64_t bufLen) { SJson *pJson = NULL; SCurlResp curlRsp = {0}; char socketPath[PATH_MAX] = {0}; getXnodedPipeName(socketPath, sizeof(socketPath)); + #ifndef WINDOWS if (!taosCheckExistFile(socketPath)) { uError("xnode failed to send request, socket path:%s not exist", socketPath); terrno = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS; goto _EXIT; } + #endif if (type == HTTP_TYPE_GET) { if ((terrno = taosCurlGetRequest(url, &curlRsp, timeout, socketPath)) != 0) { goto _OVER; @@ -4352,7 +4359,6 @@ static int32_t mndCheckXnodeAgentExists(SMnode *pMnode, const char *name) { return TSDB_CODE_SUCCESS; } -#ifndef WINDOWS typedef struct { int64_t sub; // agent ID int64_t iat; // issued at time @@ -4362,6 +4368,8 @@ const unsigned char MNDXNODE_DEFAULT_SECRET[] = {126, 222, 130, 137, 43, 122, 4 138, 153, 244, 251, 99, 50, 55, 140, 238, 218, 232, 15, 161, 226, 54, 130, 40, 211, 234, 111, 171}; +#ifndef WINDOWS + agentTokenField mndXnodeCreateAgentTokenField(long agent_id, time_t issued_at) { agentTokenField field = {0}; field.sub = agent_id; @@ -4653,18 +4661,264 @@ char *mndXnodeCreateAgentToken(const agentTokenField *claims, const unsigned cha return token; } +#else + +static int32_t mndXnodeHmacSha256Windows(const unsigned char *key, size_t key_len, const unsigned char *message, + size_t message_len, unsigned char *output) { + unsigned char k_pad[64]; + unsigned char k_ipad[64]; + unsigned char k_opad[64]; + unsigned char inner_hash[SHA256_DIGEST_SIZE]; + sha256_ctx ctx; + + if (key_len > 64) { + sha256_init(&ctx); + sha256_update(&ctx, key, key_len); + sha256_final(&ctx, k_pad); + memset(k_pad + SHA256_DIGEST_SIZE, 0, 64 - SHA256_DIGEST_SIZE); + } else { + memcpy(k_pad, key, key_len); + memset(k_pad + key_len, 0, 64 - key_len); + } + + for (int i = 0; i < 64; i++) { + k_ipad[i] = k_pad[i] ^ 0x36; + k_opad[i] = k_pad[i] ^ 0x5c; + } + + sha256_init(&ctx); + sha256_update(&ctx, k_ipad, 64); + sha256_update(&ctx, message, message_len); + sha256_final(&ctx, inner_hash); + + sha256_init(&ctx); + sha256_update(&ctx, k_opad, 64); + sha256_update(&ctx, inner_hash, SHA256_DIGEST_SIZE); + sha256_final(&ctx, output); + + return 0; +} + +static char *mndXnodeBase64UrlEncodeWindows(const unsigned char *input, size_t input_len) { + char *base64_str = NULL; + + int32_t ret = base64_encode(input, input_len, &base64_str); + if (ret != 0 || !base64_str) { + return NULL; + } + + for (size_t i = 0; base64_str[i] != '\0'; i++) { + if (base64_str[i] == '+') { + base64_str[i] = '-'; + } else if (base64_str[i] == '/') { + base64_str[i] = '_'; + } + } + + size_t len = strlen(base64_str); + while (len > 0 && base64_str[len - 1] == '=') { + base64_str[len - 1] = '\0'; + len--; + } + + return base64_str; +} + +static char *mndXnodeCreateTokenHeaderWindows() { + int32_t code = 0, lino = 0; + cJSON *headerJson = NULL; + char *headerJsonStr = NULL; + char *encoded = NULL; + + headerJson = tjsonCreateObject(); + if (!headerJson) { + code = terrno; + goto _exit; + } + + TAOS_CHECK_EXIT(tjsonAddStringToObject(headerJson, "alg", "HS256")); + TAOS_CHECK_EXIT(tjsonAddStringToObject(headerJson, "typ", "JWT")); + + headerJsonStr = tjsonToUnformattedString(headerJson); + if (!headerJsonStr) { + code = terrno; + goto _exit; + } + encoded = mndXnodeBase64UrlEncodeWindows((const unsigned char *)headerJsonStr, strlen(headerJsonStr)); + if (!encoded) { + code = terrno; + goto _exit; + } + +_exit: + if (code != TSDB_CODE_SUCCESS) { + mError("xnode agent: line: %d failed to create header since %s", lino, tstrerror(code)); + taosMemoryFree(encoded); + encoded = NULL; + } + + if (headerJsonStr) { + taosMemoryFree(headerJsonStr); + } + if (headerJson) { + tjsonDelete(headerJson); + } + + return encoded; +} + +static char *mndXnodeCreateTokenPayloadWindows(const agentTokenField *claims) { + int32_t code = 0, lino = 0; + cJSON *payloadJson = NULL; + char *payloadStr = NULL; + char *encoded = NULL; + + if (!claims) { + code = TSDB_CODE_INVALID_PARA; + terrno = code; + return NULL; + } + + payloadJson = tjsonCreateObject(); + if (!payloadJson) { + code = terrno; + goto _exit; + } + + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(payloadJson, "iat", claims->iat)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(payloadJson, "sub", claims->sub)); + + payloadStr = tjsonToUnformattedString(payloadJson); + if (!payloadStr) { + code = terrno; + goto _exit; + } + encoded = mndXnodeBase64UrlEncodeWindows((const unsigned char *)payloadStr, strlen(payloadStr)); + if (!encoded) { + code = terrno; + goto _exit; + } + +_exit: + if (code != TSDB_CODE_SUCCESS) { + mError("xnode agent line: %d failed to create payload since %s", lino, tstrerror(code)); + taosMemoryFree(encoded); + encoded = NULL; + } + if (payloadStr) { + taosMemoryFree(payloadStr); + } + if (payloadJson) { + tjsonDelete(payloadJson); + } + return encoded; +} + +static char *mndXnodeCreateTokenSignatureWindows(const char *header_payload, const unsigned char *secret, + size_t secret_len) { + int32_t code = 0, lino = 0; + unsigned char hash[SHA256_DIGEST_SIZE] = {0}; + char *encoded = NULL; + + int32_t ret = mndXnodeHmacSha256Windows(secret, secret_len, (const unsigned char *)header_payload, + strlen(header_payload), hash); + if (ret != 0) { + code = terrno; + goto _exit; + } + + encoded = mndXnodeBase64UrlEncodeWindows(hash, SHA256_DIGEST_SIZE); + if (!encoded) { + code = terrno; + goto _exit; + } + +_exit: + if (code != TSDB_CODE_SUCCESS) { + mError("xnode agent line: %d failed create signature since %s", lino, tstrerror(code)); + taosMemoryFree(encoded); + encoded = NULL; + } + return encoded; +} + +char *mndXnodeCreateAgentToken(const agentTokenField *claims, const unsigned char *secret, size_t secret_len) { + int32_t code = 0, lino = 0; + char *header = NULL, *payload = NULL; + char *headerPayload = NULL; + char *signature = NULL; + char *token = NULL; + + if (!claims) { + code = TSDB_CODE_INVALID_PARA; + goto _exit; + } + + if (!secret || secret_len == 0) { + secret = MNDXNODE_DEFAULT_SECRET; + secret_len = sizeof(MNDXNODE_DEFAULT_SECRET); + } + + header = mndXnodeCreateTokenHeaderWindows(); + if (!header) { + code = terrno; + goto _exit; + } + + payload = mndXnodeCreateTokenPayloadWindows(claims); + if (!payload) { + code = terrno; + goto _exit; + } + + size_t header_payload_len = strlen(header) + strlen(payload) + 2; + headerPayload = taosMemoryMalloc(header_payload_len); + if (!headerPayload) { + code = terrno; + goto _exit; + } + snprintf(headerPayload, header_payload_len, "%s.%s", header, payload); + + signature = mndXnodeCreateTokenSignatureWindows(headerPayload, secret, secret_len); + if (!signature) { + code = terrno; + goto _exit; + } + + size_t token_len = strlen(headerPayload) + strlen(signature) + 2; + token = taosMemoryCalloc(1, token_len); + if (!token) { + code = terrno; + goto _exit; + } + + snprintf(token, token_len, "%s.%s", headerPayload, signature); + +_exit: + if (code != TSDB_CODE_SUCCESS) { + mError("xnode agent line: %d failed create token since %s", lino, tstrerror(code)); + taosMemoryFree(token); + token = NULL; + } + taosMemoryFree(signature); + taosMemoryFree(headerPayload); + taosMemoryFree(payload); + taosMemoryFree(header); + + return token; +} + #endif int32_t mndXnodeGenAgentToken(const SXnodeAgentObj *pAgent, char *pTokenBuf) { int32_t code = 0, lino = 0; - #ifndef WINDOWS - // char *token = - // "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Njc1OTc3NzIsInN1YiI6MTIzNDV9.i7HvYf_S-yWGEExDzQESPUwVX23Ok_" - // "7Fxo93aqgKrtw"; + agentTokenField claims = { .iat = pAgent->createTime, .sub = pAgent->id, }; + // token be like: + // "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Njc1OTc3NzIsInN1YiI6MTIzNDV9.i7HvYf_S-yWGEExDzQESPUwVX23Ok_7Fxo93aqgKrtw" char *token = mndXnodeCreateAgentToken(&claims, MNDXNODE_DEFAULT_SECRET, sizeof(MNDXNODE_DEFAULT_SECRET)); if (!token) { code = terrno; @@ -4678,7 +4932,6 @@ int32_t mndXnodeGenAgentToken(const SXnodeAgentObj *pAgent, char *pTokenBuf) { mError("xnode agent line: %d failed gen token since %s", lino, tstrerror(code)); } taosMemoryFree(token); - #endif TAOS_RETURN(code); } diff --git a/source/dnode/xnode/src/xnode.c b/source/dnode/xnode/src/xnode.c index 445fbadc82b9..b9462a4583ae 100644 --- a/source/dnode/xnode/src/xnode.c +++ b/source/dnode/xnode/src/xnode.c @@ -74,15 +74,19 @@ int32_t mndOpenXnd(const SXnodeOpt *pOption) { void mndCloseXnd() { xnodeMgmtStopXnoded(); } void getXnodedPipeName(char *pipeName, int32_t size) { -#ifdef _WIN32 - snprintf(pipeName, size, "%s.%x", XNODED_MGMT_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir))); -#else int32_t len = strlen(tsDataDir); - if (len > 0 && tsDataDir[len - 1] != '/') { + if (len > 0 && (tsDataDir[len - 1] != '/' && tsDataDir[len - 1] != '\\')) { + #ifdef _WIN32 + snprintf(pipeName, size, "%s\\.%s.%x", tsDataDir, XNODED_MGMT_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir))); + #else snprintf(pipeName, size, "%s/%s", tsDataDir, XNODED_MGMT_LISTEN_PIPE_NAME_PREFIX); + #endif } else { + #ifdef _WIN32 + snprintf(pipeName, size, "%s.%s.%x", tsDataDir, XNODED_MGMT_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir))); + #else snprintf(pipeName, size, "%s%s", tsDataDir, XNODED_MGMT_LISTEN_PIPE_NAME_PREFIX); + #endif } -#endif xndDebug("xnode get unix socket pipe path:%s", pipeName); } diff --git a/source/libs/txnode/src/txnodeMgmt.c b/source/libs/txnode/src/txnodeMgmt.c index b32297bf9533..e2069c720033 100644 --- a/source/libs/txnode/src/txnodeMgmt.c +++ b/source/libs/txnode/src/txnodeMgmt.c @@ -63,16 +63,17 @@ SXnodedData xnodedGlobal = {0}; static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData); static void getXnodedPidPath(char *pipeName, int32_t size) { -#ifdef _WIN32 - snprintf(pipeName, size, "%s%s", tsDataDir, XNODED_XNODED_PID_NAME); -#else int32_t len = strlen(tsDataDir); - if (len > 0 && tsDataDir[len - 1] != '/') { + + if (len > 0 && (tsDataDir[len - 1] != '/' && tsDataDir[len - 1] != '\\')) { + #ifdef _WIN32 + snprintf(pipeName, size, "%s\\%s", tsDataDir, XNODED_XNODED_PID_NAME); + #else snprintf(pipeName, size, "%s/%s", tsDataDir, XNODED_XNODED_PID_NAME); + #endif } else { snprintf(pipeName, size, "%s%s", tsDataDir, XNODED_XNODED_PID_NAME); } -#endif xndDebug("xnode get xnoded pid path:%s", pipeName); } @@ -318,7 +319,7 @@ static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData) { } tstrncpy(envXnodedWithPEnv[numEnviron + j], envXnoded[i], len); j++; - } + } } envXnodedWithPEnv[numEnviron + lenEnvXnoded - 1] = NULL;