Skip to content

Commit 104e918

Browse files
authored
Add Plasma Metrics API (#4)
* add jni metrics interface * add metrics related protocol, update fbs file and generated header file * add Plasma metrics message functions * impl metrcis method (server side) * impl metrics method (client side) * impl metrics method (jni layer) * add ut * fix code style
1 parent 2393932 commit 104e918

File tree

13 files changed

+353
-5
lines changed

13 files changed

+353
-5
lines changed

cpp/src/plasma/client.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
276276

277277
int64_t store_capacity() { return store_capacity_; }
278278

279+
Status Metrics(PlasmaMetrics* metrics);
280+
279281
private:
280282
/// Check if store_fd has already been received from the store. If yes,
281283
/// return it. Otherwise, receive it from the store (see analogous logic
@@ -1114,6 +1116,14 @@ std::string PlasmaClient::Impl::DebugString() {
11141116
return debug_string;
11151117
}
11161118

1119+
Status PlasmaClient::Impl::Metrics(PlasmaMetrics* metrics) {
1120+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
1121+
RETURN_NOT_OK(SendMetricsRequest(store_conn_));
1122+
std::vector<uint8_t> buffer;
1123+
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaMetricsReply, &buffer));
1124+
return ReadMetricsReply(buffer.data(), buffer.size(), metrics);
1125+
}
1126+
11171127
// ----------------------------------------------------------------------
11181128
// PlasmaClient
11191129

@@ -1221,4 +1231,6 @@ bool PlasmaClient::IsInUse(const ObjectID& object_id) {
12211231

12221232
int64_t PlasmaClient::store_capacity() { return impl_->store_capacity(); }
12231233

1234+
Status PlasmaClient::Metrics(PlasmaMetrics* metrics) { return impl_->Metrics(metrics); }
1235+
12241236
} // namespace plasma

cpp/src/plasma/client.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,15 @@ class ARROW_EXPORT PlasmaClient {
293293
/// \return Memory capacity of the store in bytes.
294294
int64_t store_capacity();
295295

296+
/// Get PlasmaStore memory usage metrics.
297+
///
298+
/// This API is experimental and might change in the future.
299+
///
300+
/// \param[out] metrics PlasmaStore memory uasge, including total share memory,
301+
/// used share memory, total external memory, used external memory.
302+
/// \return The return status.
303+
Status Metrics(PlasmaMetrics* metrics);
304+
296305
private:
297306
friend class PlasmaBuffer;
298307
friend class PlasmaMutableBuffer;

cpp/src/plasma/common.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,14 @@ typedef std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> ObjectTa
145145
/// by making it possible to pass a context object through dlmalloc.
146146
struct PlasmaStoreInfo;
147147
extern const PlasmaStoreInfo* plasma_config;
148+
149+
/// This type is used to transfer plasma metrics between server and client.
150+
struct PlasmaMetrics {
151+
int64_t share_mem_total;
152+
int64_t share_mem_used;
153+
int64_t external_total;
154+
int64_t external_used;
155+
};
148156
} // namespace plasma
149157

150158
namespace std {

cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,3 +261,27 @@ Java_org_apache_arrow_plasma_PlasmaClientJNI_list(JNIEnv* env, jclass cls, jlong
261261

262262
return ret;
263263
}
264+
265+
JNIEXPORT jint JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_metrics(
266+
JNIEnv* env, jclass cls, jlong conn, jlongArray result) {
267+
plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
268+
int64_t* metrics;
269+
plasma::PlasmaMetrics plasmaMetrics;
270+
client->Metrics(&plasmaMetrics);
271+
if (result != nullptr) {
272+
metrics = (int64_t*)env->GetPrimitiveArrayCritical(result, 0);
273+
}
274+
if (metrics == nullptr) {
275+
return -1;
276+
}
277+
metrics[0] = plasmaMetrics.share_mem_total;
278+
metrics[1] = plasmaMetrics.share_mem_used;
279+
metrics[2] = plasmaMetrics.external_total;
280+
metrics[3] = plasmaMetrics.external_used;
281+
282+
if (result != nullptr) {
283+
env->ReleasePrimitiveArrayCritical(result, (void*)metrics, 0);
284+
}
285+
286+
return 0;
287+
}

cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,15 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_list
135135
jclass,
136136
jlong);
137137

138+
/*
139+
* Class: org_apache_arrow_plasma_PlasmaClientJNI
140+
* Method: metrics
141+
* Signature: (J[J)I
142+
*/
143+
JNIEXPORT jint JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_metrics(JNIEnv*,
144+
jclass, jlong,
145+
jlongArray);
146+
138147
#ifdef __cplusplus
139148
}
140149
#endif

cpp/src/plasma/plasma.fbs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ enum MessageType:long {
8181
// Touch a number of objects to bump their position in the LRU cache.
8282
PlasmaRefreshLRURequest,
8383
PlasmaRefreshLRUReply,
84+
// Get Plasma metrics info
85+
PlasmaMetricsRequest,
86+
PlasmaMetricsReply,
8487
}
8588

8689
enum PlasmaError:int {
@@ -116,6 +119,17 @@ struct PlasmaObjectSpec {
116119
device_num: int;
117120
}
118121

122+
struct PlasmaMetricsSpec {
123+
// total share memory allocated.
124+
share_mem_total: long;
125+
// used share memory currently.
126+
share_mem_used: long;
127+
// total external store size.
128+
external_total: long;
129+
// used external store size.
130+
external_used: long;
131+
}
132+
119133
table PlasmaSetOptionsRequest {
120134
// The name of the client.
121135
client_name: string;
@@ -355,3 +369,11 @@ table PlasmaRefreshLRURequest {
355369

356370
table PlasmaRefreshLRUReply {
357371
}
372+
373+
table PlasmaMetricsRequest {
374+
}
375+
376+
table PlasmaMetricsReply {
377+
// Metrics info to reply
378+
metrics: PlasmaMetricsSpec;
379+
}

0 commit comments

Comments
 (0)