Skip to content

Commit de901df

Browse files
committed
fix (node-delta-sharing): Fix issue with storing read delta sharing tables
Signed-off-by: Ritesh.K <riteshkarki6@gmail.com>
1 parent 6ad4419 commit de901df

File tree

2 files changed

+56
-30
lines changed

2 files changed

+56
-30
lines changed

include/villas/nodes/delta_sharing/delta_sharing.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ struct delta_sharing {
4848
shares;
4949

5050
enum class TableOp { TABLE_NOOP, TABLE_READ, TABLE_WRITE } table_op;
51+
52+
size_t current_row;
5153
};
5254

5355
char *deltaSharing_print(NodeCompat *n);

lib/nodes/delta_sharing/delta_sharing.cpp

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <arrow/array/array_base.h>
1616
#include <arrow/array/array_binary.h>
1717
#include <arrow/builder.h>
18+
#include <arrow/scalar.h>
1819
#include <arrow/table.h>
1920
#include <arrow/type.h>
2021
#include <arrow/type_fwd.h>
@@ -28,6 +29,8 @@
2829
#include <villas/nodes/delta_sharing/protocol.hpp>
2930
#include <villas/timing.hpp>
3031

32+
#include "villas/log.hpp"
33+
3134
using namespace villas;
3235
using namespace villas::node;
3336

@@ -152,6 +155,7 @@ int villas::node::deltaSharing_init(NodeCompat *n) {
152155
// d->cache_dir = "";
153156
// d->table_path = "";
154157
d->batch_size = 0;
158+
d->current_row = 0;
155159

156160
d->client.reset();
157161
d->table_ptr.reset();
@@ -226,12 +230,12 @@ int villas::node::deltaSharing_read(NodeCompat *n, struct Sample *const smps[],
226230
d->table_ptr = d->client->LoadAsArrowTable(files->at(0).url);
227231

228232
if (!d->table_ptr) {
229-
n->logger->error("Failed to laod table from Delta Sharing server");
233+
n->logger->error("Failed to load table from Delta Sharing server");
234+
return -1;
230235
}
231236
}
232237

233-
unsigned samples_read = 0;
234-
auto num_rows = d->table_ptr->num_rows();
238+
unsigned num_rows = d->table_ptr->num_rows();
235239
unsigned num_cols = d->table_ptr->num_columns();
236240

237241
auto signals = n->getInputSignals(false);
@@ -240,54 +244,74 @@ int villas::node::deltaSharing_read(NodeCompat *n, struct Sample *const smps[],
240244
return -1;
241245
}
242246

243-
for (unsigned i = 0; i < cnt && i < num_rows; i++) {
244-
auto *smp = smps[i];
245-
n->logger->info("Row name {}", d->table_ptr->ColumnNames().at(3));
246-
smp->length = signals->size();
247-
smp->capacity = signals->size();
247+
unsigned samples_read = 0;
248+
while (samples_read < cnt && d->current_row < num_rows) {
249+
auto *smp = smps[samples_read];
250+
// Set smp length and capacity to the number of columns in the table.
251+
smp->length = d->table_ptr->num_columns();
252+
smp->capacity = d->table_ptr->num_columns();
248253
smp->ts.origin = time_now();
254+
smp->flags = (int)SampleFlags::HAS_DATA;
255+
smp->sequence = d->current_row;
249256

250257
for (unsigned col = 0; col < num_cols && col < signals->size(); col++) {
251258
auto chunked_array = d->table_ptr->column(col);
252-
auto first_chunk = chunked_array->chunk(0);
253-
switch (first_chunk->type_id()) {
259+
auto scalar_result = chunked_array->GetScalar(d->current_row);
260+
261+
if (!scalar_result.ok()) {
262+
n->logger->warn("Failed to get scalar at row {}, col {}: {}",
263+
d->current_row, col,
264+
scalar_result.status().ToString());
265+
continue;
266+
}
267+
268+
auto scalar = *scalar_result;
269+
auto sig_type = signals->at(col)->type;
270+
271+
switch (scalar->type->id()) {
254272
case arrow::Type::DOUBLE: {
255-
auto double_array =
256-
std::static_pointer_cast<arrow::DoubleArray>(first_chunk);
257-
smp->data[col].f = double_array->Value(i);
273+
auto double_scalar =
274+
std::static_pointer_cast<arrow::DoubleScalar>(scalar)->value;
275+
smp->data[col].f = double_scalar;
258276
break;
259277
}
260278
case arrow::Type::FLOAT: {
261-
auto float_array =
262-
std::static_pointer_cast<arrow::FloatArray>(first_chunk);
263-
smp->data[col].f = float_array->Value(i);
279+
auto float_scalar =
280+
std::static_pointer_cast<arrow::FloatScalar>(scalar)->value;
281+
smp->data[col].f = float_scalar;
282+
break;
264283
}
265284
case arrow::Type::INT64: {
266-
auto int_array =
267-
std::static_pointer_cast<arrow::Int64Array>(first_chunk);
268-
smp->data[col].i = int_array->Value(i);
285+
auto int64_scalar =
286+
std::static_pointer_cast<arrow::Int64Scalar>(scalar)->value;
287+
smp->data[col].f = int64_scalar;
269288
break;
270289
}
271290
case arrow::Type::INT32: {
272-
auto int_array =
273-
std::static_pointer_cast<arrow::Int32Array>(first_chunk);
274-
smp->data[col].i = int_array->Value(i);
291+
auto int32_scalar =
292+
std::static_pointer_cast<arrow::Int32Scalar>(scalar)->value;
293+
smp->data[col].f = int32_scalar;
275294
break;
276295
}
277-
/* case arrow::Type::STRING: {
278-
auto string_array =
279-
std::static_pointer_cast<arrow::StringArray>(first_chunk);
280-
smp->data[col].
281-
} */
282296
default:
283-
n->logger->warn("Unsupported data type for column {}", col);
284-
smp->data[col].f = 0.0;
297+
n->logger->warn("Unsupported arrow data type for column {}", col);
298+
if (sig_type == SignalType::FLOAT)
299+
smp->data[col].f = 0.0;
300+
else if (sig_type == SignalType::INTEGER)
301+
smp->data[col].i = 0;
285302
}
286303
}
304+
d->current_row++;
287305
samples_read++;
288306
}
289307

290-
n->logger->debug("Read {} samples from Delta Sharing table", samples_read);
308+
if (samples_read < cnt && d->current_row >= num_rows) {
309+
n->logger->info("End of table reached at row {}", d->current_row);
310+
}
311+
312+
n->logger->debug(
313+
"Read {} samples from Delta Sharing table (current row: {})",
314+
samples_read, d->current_row);
291315
return samples_read;
292316
} catch (const std::exception &e) {
293317
n->logger->error("Error reading from Delta Sharing table: {}", e.what());

0 commit comments

Comments
 (0)