Skip to content

Commit 8136701

Browse files
authored
[fix](sql function) Fix the unix_timestamp return value type in Nereids (#57424)
1 parent dcec829 commit 8136701

File tree

6 files changed

+102
-0
lines changed

6 files changed

+102
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
311311
? taskInfo.isMemtableOnSinkNode()
312312
: false;
313313
queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
314+
queryOptions.setNewVersionUnixTimestamp(true);
314315
params.setQueryOptions(queryOptions);
315316
TQueryGlobals queryGlobals = new TQueryGlobals();
316317
queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));

fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ public static CoordinatorContext buildForLoad(
306306
queryOptions.setEnableProfile(enableProfile);
307307
queryOptions.setProfileLevel(2);
308308
queryOptions.setBeExecVersion(Config.be_exec_version);
309+
queryOptions.setNewVersionUnixTimestamp(true);
309310

310311
TQueryGlobals queryGlobals = new TQueryGlobals();
311312
queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));

fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ private void setForInsert(long jobId) {
487487
this.coordinatorContext.setJobProcessor(jobProc);
488488
// Set this field to true to avoid data entering the normal cache LRU queue
489489
this.coordinatorContext.queryOptions.setDisableFileCache(true);
490+
this.coordinatorContext.queryOptions.setNewVersionUnixTimestamp(true);
490491
}
491492

492493
private void setForQuery() {

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2174,6 +2174,7 @@ private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, Conne
21742174
: false;
21752175
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
21762176
}
2177+
// coord.getQueryOptions().setNewVersionUnixTimestamp(true);
21772178
httpStreamParams.setParams(coord.getStreamLoadPlan());
21782179
} catch (UserException e) {
21792180
LOG.warn("exec sql error", e);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"a": "foo", "b": "bar"}
2+
{"a": "baz", "b": "qux"}
3+
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_unix_timestamp_func_load") {
19+
def s3BucketName = getS3BucketName()
20+
def s3Endpoint = getS3Endpoint()
21+
def s3Region = getS3Region()
22+
def ak = getS3AK()
23+
def sk = getS3SK()
24+
25+
sql """
26+
CREATE TABLE IF NOT EXISTS t (
27+
id BIGINT
28+
)
29+
PROPERTIES("replication_num" = "1");
30+
"""
31+
32+
try {
33+
streamLoad {
34+
table "t"
35+
set 'format', 'json'
36+
set 'columns', 'a,b,id=unix_timestamp()'
37+
file 'data_by_line.json'
38+
time 10000
39+
40+
check { result, exception, startTime, endTime ->
41+
if (exception != null) {
42+
throw exception
43+
}
44+
log.info("Stream load result: ${result}".toString())
45+
def json = parseJson(result)
46+
assertEquals("success", json.Status.toLowerCase())
47+
}
48+
}
49+
50+
def res1 = sql """
51+
insert into t
52+
select unix_timestamp();
53+
"""
54+
log.info("select frm s3 result: ${res1}".toString())
55+
assertTrue(res1.size() == 1)
56+
57+
def label = "s3_load_default_" + UUID.randomUUID().toString().replaceAll("-", "")
58+
sql """
59+
LOAD LABEL ${label} (
60+
DATA INFILE("s3://${s3BucketName}/load/data_by_line.json")
61+
INTO TABLE t
62+
FORMAT AS "json"
63+
(a, b)
64+
SET (
65+
id = unix_timestamp()
66+
)
67+
)
68+
WITH S3 (
69+
"s3.access_key" = "${ak}",
70+
"s3.secret_key" = "${sk}",
71+
"s3.endpoint" = "${s3Endpoint}",
72+
"s3.region" = "${s3Region}"
73+
);
74+
"""
75+
76+
// Wait for load to complete
77+
def max_try_time = 60000
78+
while (max_try_time > 0) {
79+
def result = sql "SHOW LOAD WHERE label = '${label}'"
80+
if (result[0][2] == "FINISHED") {
81+
break
82+
} else if (result[0][2] == "CANCELLED") {
83+
throw new Exception("Load job cancelled: " + result[0][7])
84+
}
85+
Thread.sleep(1000)
86+
max_try_time -= 1000
87+
if (max_try_time <= 0) {
88+
throw new Exception("Load job timeout")
89+
}
90+
}
91+
} finally {
92+
try_sql("DROP TABLE IF EXISTS t")
93+
}
94+
95+
}

0 commit comments

Comments
 (0)