Skip to content

Commit b2b347d

Browse files
feat(interactive): support long_text in interactive (#4522)
Fixes #4526 #4368 --------- Co-authored-by: [email protected] <[email protected]>
1 parent b0b8099 commit b2b347d

File tree

6 files changed

+122
-9
lines changed

6 files changed

+122
-9
lines changed

flex/interactive/sdk/python/gs_interactive/tests/conftest.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,6 +1183,34 @@ def import_data_to_full_modern_graph(sess: Session, graph_id: str):
11831183
assert wait_job_finish(sess, job_id)
11841184

11851185

1186+
def import_long_string_data_data_to_vertex_only_modern_graph(
    sess: Session, graph_id: str
):
    """Bulk-load person vertices whose ``name`` values are very long strings.

    A temporary ``person_long_string.csv`` is written under
    ``MODERN_GRAPH_DATA_DIR`` and used as the input of the first vertex
    mapping of the vertex-only modern graph import config. The function
    waits for the loading job to finish and always removes the temporary
    file afterwards, including when loading or one of the assertions fails.

    :param sess: session used to submit the bulk loading job.
    :param graph_id: id of the graph to load the data into.
    """
    schema_mapping = SchemaMapping.from_dict(modern_graph_vertex_only_import_config)
    # The long string file lives under MODERN_GRAPH_DATA_DIR.
    long_string_file = os.path.join(MODERN_GRAPH_DATA_DIR, "person_long_string.csv")

    # By default, the max length of a string property is 256.
    # Repeating each sample 4096 times yields names of at least 16384
    # characters, which is enough to test the long string feature.
    def generate_large_string(sample: str):
        return sample * 4096

    try:
        with open(long_string_file, "w") as f:
            f.write("id|name|age\n")
            f.write("1|" + generate_large_string("marko") + "|29\n")
            f.write("2|" + generate_large_string("vadas") + "|27\n")
            f.write("4|" + generate_large_string("josh") + "|32\n")
            f.write("6|" + generate_large_string("peter") + "|35\n")

        schema_mapping.vertex_mappings[0].inputs[0] = "person_long_string.csv"
        resp = sess.bulk_loading(graph_id, schema_mapping)
        assert resp.is_ok()
        job_id = resp.get_value().job_id
        assert wait_job_finish(sess, job_id)
    finally:
        # Clean up the long string file even if loading failed, so that a
        # failing run does not leave a large file in the shared data directory.
        if os.path.exists(long_string_file):
            os.remove(long_string_file)
1212+
1213+
11861214
def import_data_to_full_graph_algo_graph(sess: Session, graph_id: str):
11871215
schema_mapping = SchemaMapping.from_dict(graph_algo_graph_import_config)
11881216
resp = sess.bulk_loading(graph_id, schema_mapping)

flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
from gs_interactive.tests.conftest import (
3939
import_data_to_vertex_only_modern_graph_no_wait,
4040
)
41+
from gs_interactive.tests.conftest import (
42+
import_long_string_data_data_to_vertex_only_modern_graph,
43+
)
4144
from gs_interactive.tests.conftest import run_cypher_test_suite
4245
from gs_interactive.tests.conftest import send_get_request_periodically
4346
from gs_interactive.tests.conftest import start_service_on_graph
@@ -540,3 +543,21 @@ def test_create_graph_with_temporal_type(
540543
result = neo4j_session.run("MATCH (n: person) return n.birthday AS birthday;")
541544
records = result.fetch(10)
542545
assert len(records) == 4
546+
547+
548+
def test_graph_with_long_text_property(
    interactive_session, neo4j_session, create_vertex_only_modern_graph
):
    """Load long-string person names and check they come back through Cypher.

    The graph is populated with the long string data set, the service is
    started on it, and every one of the four returned ``name`` values must
    be longer than 4096 characters.
    """
    print("[Test graph with long text property]")
    graph_id = create_vertex_only_modern_graph

    import_long_string_data_data_to_vertex_only_modern_graph(
        interactive_session, graph_id
    )
    start_service_on_graph(interactive_session, graph_id)
    ensure_compiler_schema_ready(interactive_session, neo4j_session, graph_id)

    query = "MATCH (n: person) return n.name AS name;"
    rows = neo4j_session.run(query).fetch(10)

    assert len(rows) == 4
    for row in rows:
        assert len(row["name"]) > 4096

flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,14 @@ bool check_primary_key_type(std::shared_ptr<arrow::DataType> data_type) {
4040

4141
void set_column_from_string_array(gs::ColumnBase* col,
4242
std::shared_ptr<arrow::ChunkedArray> array,
43-
const std::vector<size_t>& offset) {
43+
const std::vector<size_t>& offset,
44+
bool enable_resize) {
4445
auto type = array->type();
4546
auto size = col->size();
47+
auto typed_col = dynamic_cast<gs::TypedColumn<std::string_view>*>(col);
48+
if (enable_resize) {
49+
CHECK(typed_col != nullptr) << "Only support TypedColumn<std::string_view>";
50+
}
4651
CHECK(type->Equals(arrow::large_utf8()) || type->Equals(arrow::utf8()))
4752
<< "Inconsistent data type, expect string, but got " << type->ToString();
4853
size_t cur_ind = 0;
@@ -62,7 +67,11 @@ void set_column_from_string_array(gs::ColumnBase* col,
6267
if (offset[cur_ind] >= size) {
6368
cur_ind++;
6469
} else {
65-
col->set_any(offset[cur_ind++], std::move(sw));
70+
if (!enable_resize) {
71+
col->set_any(offset[cur_ind++], std::move(sw));
72+
} else {
73+
typed_col->set_value_safe(offset[cur_ind++], std::move(sw));
74+
}
6675
}
6776
}
6877
}
@@ -76,7 +85,11 @@ void set_column_from_string_array(gs::ColumnBase* col,
7685
if (offset[cur_ind] >= size) {
7786
cur_ind++;
7887
} else {
79-
col->set_any(offset[cur_ind++], std::move(sw));
88+
if (!enable_resize) {
89+
col->set_any(offset[cur_ind++], std::move(sw));
90+
} else {
91+
typed_col->set_value_safe(offset[cur_ind++], std::move(sw));
92+
}
8093
}
8194
}
8295
}
@@ -113,7 +126,7 @@ void set_properties_column(gs::ColumnBase* col,
113126
} else if (col_type.type_enum == impl::PropertyTypeImpl::kVarChar) {
114127
set_column_from_string_array(col, array, offset);
115128
} else if (col_type == PropertyType::kStringView) {
116-
set_column_from_string_array(col, array, offset);
129+
set_column_from_string_array(col, array, offset, true);
117130
} else {
118131
LOG(FATAL) << "Not support type: " << type->ToString();
119132
}

flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ void set_column(gs::ColumnBase* col, std::shared_ptr<arrow::ChunkedArray> array,
7272
// For String types.
7373
void set_column_from_string_array(gs::ColumnBase* col,
7474
std::shared_ptr<arrow::ChunkedArray> array,
75-
const std::vector<size_t>& offset);
75+
const std::vector<size_t>& offset,
76+
bool enable_resize = false);
7677

7778
void set_column_from_timestamp_array(gs::ColumnBase* col,
7879
std::shared_ptr<arrow::ChunkedArray> array,

flex/utils/property/column.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,45 @@ std::shared_ptr<ColumnBase> CreateColumn(
218218
}
219219
}
220220

221+
// Writes `value` into row `idx`, growing the backing string buffer first when
// the bytes reserved so far no longer fit into its data area.
//
// The call holds rw_mutex_ in shared mode while reserving space and writing.
// When the reserved position exceeds the buffer's data size, the shared lock
// is released, rw_mutex_ is taken exclusively, the size condition is checked
// again and the buffer is resized if it still does not fit; afterwards the
// shared lock is re-acquired before the value is stored.
//
// Rows below basic_size_ go to basic_buffer_ (positions tracked by
// basic_pos_); rows in [basic_size_, basic_size_ + extra_size_) go to
// extra_buffer_ (positions tracked by pos_). Any other index is fatal.
void TypedColumn<std::string_view>::set_value_safe(
    size_t idx, const std::string_view& value) {
  std::shared_lock<std::shared_mutex> read_guard(rw_mutex_);

  if (idx < basic_size_) {
    // Reserve value.size() bytes in the basic buffer's data area.
    size_t write_pos = basic_pos_.fetch_add(value.size());
    if (basic_pos_.load() > basic_buffer_.data_size()) {
      read_guard.unlock();
      {
        std::unique_lock<std::shared_mutex> write_guard(rw_mutex_);
        // Check again under the exclusive lock before resizing.
        if (basic_pos_.load() > basic_buffer_.data_size()) {
          // Average bytes per row so far, rounded up, scaled to all rows.
          size_t avg_width = (basic_pos_.load() + idx) / (idx + 1);
          size_t grown_len =
              std::max(basic_size_ * avg_width, basic_pos_.load());
          basic_buffer_.resize(basic_buffer_.size(), grown_len);
        }
      }  // exclusive lock released here
      read_guard.lock();
    }
    basic_buffer_.set(idx, write_pos, value);
    return;
  }

  if (idx < basic_size_ + extra_size_) {
    // Reserve value.size() bytes in the extra buffer's data area.
    size_t write_pos = pos_.fetch_add(value.size());
    if (pos_.load() > extra_buffer_.data_size()) {
      read_guard.unlock();
      {
        std::unique_lock<std::shared_mutex> write_guard(rw_mutex_);
        // Check again under the exclusive lock before resizing.
        if (pos_.load() > extra_buffer_.data_size()) {
          // Average bytes per extra row so far, rounded up, scaled to all
          // extra rows.
          size_t avg_width =
              (pos_.load() + idx - basic_size_) / (idx - basic_size_ + 1);
          size_t grown_len = std::max(extra_size_ * avg_width, pos_.load());
          extra_buffer_.resize(extra_buffer_.size(), grown_len);
        }
      }  // exclusive lock released here
      read_guard.lock();
    }
    extra_buffer_.set(idx - basic_size_, write_pos, value);
    return;
  }

  LOG(FATAL) << "Index out of range";
}
259+
221260
std::shared_ptr<RefColumnBase> CreateRefColumn(
222261
std::shared_ptr<ColumnBase> column) {
223262
auto type = column->type();

flex/utils/property/column.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#ifndef GRAPHSCOPE_PROPERTY_COLUMN_H_
1717
#define GRAPHSCOPE_PROPERTY_COLUMN_H_
1818

19+
#include <shared_mutex>
1920
#include <string>
2021
#include <string_view>
2122
#include "grape/utils/concurrent_queue.h"
@@ -345,9 +346,14 @@ class TypedColumn<grape::EmptyType> : public ColumnBase {
345346
template <>
346347
class TypedColumn<std::string_view> : public ColumnBase {
347348
public:
348-
TypedColumn(StorageStrategy strategy,
349-
uint16_t width = PropertyType::GetStringDefaultMaxLength())
350-
: strategy_(strategy), width_(width) {}
349+
TypedColumn(StorageStrategy strategy, uint16_t width)
350+
: strategy_(strategy),
351+
width_(width),
352+
type_(PropertyType::Varchar(width_)) {}
353+
TypedColumn(StorageStrategy strategy)
354+
: strategy_(strategy),
355+
width_(PropertyType::GetStringDefaultMaxLength()),
356+
type_(PropertyType::kStringView) {}
351357
~TypedColumn() { close(); }
352358

353359
void open(const std::string& name, const std::string& snapshot_dir,
@@ -478,6 +484,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
478484
size_t size() const override { return basic_size_ + extra_size_; }
479485

480486
void resize(size_t size) override {
487+
std::unique_lock<std::shared_mutex> lock(rw_mutex_);
481488
if (size < basic_buffer_.size()) {
482489
basic_size_ = size;
483490
extra_size_ = 0;
@@ -504,7 +511,7 @@ class TypedColumn<std::string_view> : public ColumnBase {
504511
}
505512
}
506513

507-
PropertyType type() const override { return PropertyType::Varchar(width_); }
514+
PropertyType type() const override { return type_; }
508515

509516
void set_value(size_t idx, const std::string_view& val) {
510517
auto copied_val = val;
@@ -547,6 +554,8 @@ class TypedColumn<std::string_view> : public ColumnBase {
547554
}
548555
}
549556

557+
void set_value_safe(size_t idx, const std::string_view& value);
558+
550559
inline std::string_view get_view(size_t idx) const {
551560
return idx < basic_size_ ? basic_buffer_.get(idx)
552561
: extra_buffer_.get(idx - basic_size_);
@@ -584,7 +593,9 @@ class TypedColumn<std::string_view> : public ColumnBase {
584593
std::atomic<size_t> pos_;
585594
std::atomic<size_t> basic_pos_;
586595
StorageStrategy strategy_;
596+
std::shared_mutex rw_mutex_;
587597
uint16_t width_;
598+
PropertyType type_;
588599
};
589600

590601
using StringColumn = TypedColumn<std::string_view>;

0 commit comments

Comments
 (0)