Skip to content

Commit 41d4a0d

Browse files
author
Varada M
committed
8352392: AIX: implement attach API v2 and streaming output
Reviewed-by: mdoerr, jkern, amenkov
1 parent 1c2a553 commit 41d4a0d

File tree

3 files changed

+80
-219
lines changed

3 files changed

+80
-219
lines changed

src/hotspot/os/aix/attachListener_aix.cpp

Lines changed: 53 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,7 @@ class AixAttachListener: AllStatic {
7373

7474
static bool _atexit_registered;
7575

76-
// reads a request from the given connected socket
77-
static AixAttachOperation* read_request(int s);
78-
7976
public:
80-
enum {
81-
ATTACH_PROTOCOL_VER = 1 // protocol version
82-
};
83-
enum {
84-
ATTACH_ERROR_BADVERSION = 101 // error codes
85-
};
8677

8778
static void set_path(char* path) {
8879
if (path == nullptr) {
@@ -107,25 +98,65 @@ class AixAttachListener: AllStatic {
10798
static void set_shutdown(bool shutdown) { _shutdown = shutdown; }
10899
static bool is_shutdown() { return _shutdown; }
109100

110-
// write the given buffer to a socket
111-
static int write_fully(int s, char* buf, size_t len);
112-
113101
static AixAttachOperation* dequeue();
114102
};
115103

104+
class SocketChannel : public AttachOperation::RequestReader, public AttachOperation::ReplyWriter {
105+
private:
106+
int _socket;
107+
public:
108+
SocketChannel(int socket) : _socket(socket) {}
109+
~SocketChannel() {
110+
close();
111+
}
112+
113+
bool opened() const {
114+
return _socket != -1;
115+
}
116+
117+
void close() {
118+
if (opened()) {
119+
// SHUT_RDWR is not available
120+
::shutdown(_socket, 2);
121+
::close(_socket);
122+
_socket = -1;
123+
}
124+
}
125+
126+
// RequestReader
127+
int read(void* buffer, int size) override {
128+
ssize_t n;
129+
RESTARTABLE(::read(_socket, buffer, (size_t)size), n);
130+
return checked_cast<int>(n);
131+
}
132+
133+
// ReplyWriter
134+
int write(const void* buffer, int size) override {
135+
ssize_t n;
136+
RESTARTABLE(::write(_socket, buffer, size), n);
137+
return checked_cast<int>(n);
138+
}
139+
140+
void flush() override {
141+
}
142+
};
143+
116144
class AixAttachOperation: public AttachOperation {
117145
private:
118146
// the connection to the client
119-
int _socket;
147+
SocketChannel _socket_channel;
120148

121149
public:
122-
void complete(jint res, bufferedStream* st);
150+
AixAttachOperation(int socket) : AttachOperation(), _socket_channel(socket) {}
123151

124-
void set_socket(int s) { _socket = s; }
125-
int socket() const { return _socket; }
152+
void complete(jint res, bufferedStream* st) override;
126153

127-
AixAttachOperation(char* name) : AttachOperation(name) {
128-
set_socket(-1);
154+
ReplyWriter* get_reply_writer() override {
155+
return &_socket_channel;
156+
}
157+
158+
bool read_request() {
159+
return _socket_channel.read_request(this, &_socket_channel);
129160
}
130161
};
131162

@@ -137,34 +168,6 @@ bool AixAttachListener::_atexit_registered = false;
137168
// Shutdown marker to prevent accept blocking during clean-up
138169
volatile bool AixAttachListener::_shutdown = false;
139170

140-
// Supporting class to help split a buffer into individual components
141-
class ArgumentIterator : public StackObj {
142-
private:
143-
char* _pos;
144-
char* _end;
145-
public:
146-
ArgumentIterator(char* arg_buffer, size_t arg_size) {
147-
_pos = arg_buffer;
148-
_end = _pos + arg_size - 1;
149-
}
150-
char* next() {
151-
if (*_pos == '\0') {
152-
// advance the iterator if possible (null arguments)
153-
if (_pos < _end) {
154-
_pos += 1;
155-
}
156-
return nullptr;
157-
}
158-
char* res = _pos;
159-
char* next_pos = strchr(_pos, '\0');
160-
if (next_pos < _end) {
161-
next_pos++;
162-
}
163-
_pos = next_pos;
164-
return res;
165-
}
166-
};
167-
168171
// On AIX if sockets block until all data has been transmitted
169172
// successfully in some communication domains a socket "close" may
170173
// never complete. We have to take care that after the socket shutdown
@@ -258,106 +261,6 @@ int AixAttachListener::init() {
258261
return 0;
259262
}
260263

261-
// Given a socket that is connected to a peer we read the request and
262-
// create an AttachOperation. As the socket is blocking there is potential
263-
// for a denial-of-service if the peer does not response. However this happens
264-
// after the peer credentials have been checked and in the worst case it just
265-
// means that the attach listener thread is blocked.
266-
//
267-
AixAttachOperation* AixAttachListener::read_request(int s) {
268-
char ver_str[8];
269-
os::snprintf_checked(ver_str, sizeof(ver_str), "%d", ATTACH_PROTOCOL_VER);
270-
271-
// The request is a sequence of strings so we first figure out the
272-
// expected count and the maximum possible length of the request.
273-
// The request is:
274-
// <ver>0<cmd>0<arg>0<arg>0<arg>0
275-
// where <ver> is the protocol version (1), <cmd> is the command
276-
// name ("load", "datadump", ...), and <arg> is an argument
277-
int expected_str_count = 2 + AttachOperation::arg_count_max;
278-
const size_t max_len = (sizeof(ver_str) + 1) + (AttachOperation::name_length_max + 1) +
279-
AttachOperation::arg_count_max*(AttachOperation::arg_length_max + 1);
280-
281-
char buf[max_len];
282-
int str_count = 0;
283-
284-
// Read until all (expected) strings have been read, the buffer is
285-
// full, or EOF.
286-
287-
size_t off = 0;
288-
size_t left = max_len;
289-
290-
do {
291-
ssize_t n;
292-
// Don't block on interrupts because this will
293-
// hang in the clean-up when shutting down.
294-
n = read(s, buf+off, left);
295-
assert(n <= checked_cast<ssize_t>(left), "buffer was too small, impossible!");
296-
buf[max_len - 1] = '\0';
297-
if (n == -1) {
298-
return nullptr; // reset by peer or other error
299-
}
300-
if (n == 0) {
301-
break;
302-
}
303-
for (int i=0; i<n; i++) {
304-
if (buf[off+i] == 0) {
305-
// EOS found
306-
str_count++;
307-
308-
// The first string is <ver> so check it now to
309-
// check for protocol mismatch
310-
if (str_count == 1) {
311-
if ((strlen(buf) != strlen(ver_str)) ||
312-
(atoi(buf) != ATTACH_PROTOCOL_VER)) {
313-
char msg[32];
314-
os::snprintf_checked(msg, sizeof(msg), "%d\n", ATTACH_ERROR_BADVERSION);
315-
write_fully(s, msg, strlen(msg));
316-
return nullptr;
317-
}
318-
}
319-
}
320-
}
321-
off += n;
322-
left -= n;
323-
} while (left > 0 && str_count < expected_str_count);
324-
325-
if (str_count != expected_str_count) {
326-
return nullptr; // incomplete request
327-
}
328-
329-
// parse request
330-
331-
ArgumentIterator args(buf, (max_len)-left);
332-
333-
// version already checked
334-
char* v = args.next();
335-
336-
char* name = args.next();
337-
if (name == nullptr || strlen(name) > AttachOperation::name_length_max) {
338-
return nullptr;
339-
}
340-
341-
AixAttachOperation* op = new AixAttachOperation(name);
342-
343-
for (int i=0; i<AttachOperation::arg_count_max; i++) {
344-
char* arg = args.next();
345-
if (arg == nullptr) {
346-
op->set_arg(i, nullptr);
347-
} else {
348-
if (strlen(arg) > AttachOperation::arg_length_max) {
349-
delete op;
350-
return nullptr;
351-
}
352-
op->set_arg(i, arg);
353-
}
354-
}
355-
356-
op->set_socket(s);
357-
return op;
358-
}
359-
360-
361264
// Dequeue an operation
362265
//
363266
// In the Aix implementation there is only a single operation and clients
@@ -402,31 +305,16 @@ AixAttachOperation* AixAttachListener::dequeue() {
402305
}
403306

404307
// peer credential look okay so we read the request
405-
AixAttachOperation* op = read_request(s);
406-
if (op == nullptr) {
407-
::close(s);
308+
AixAttachOperation* op = new AixAttachOperation(s);
309+
if (!op->read_request()) {
310+
delete op;
408311
continue;
409312
} else {
410313
return op;
411314
}
412315
}
413316
}
414317

415-
// write the given buffer to the socket
416-
int AixAttachListener::write_fully(int s, char* buf, size_t len) {
417-
do {
418-
ssize_t n = ::write(s, buf, len);
419-
if (n == -1) {
420-
if (errno != EINTR) return -1;
421-
} else {
422-
buf += n;
423-
len -= n;
424-
}
425-
}
426-
while (len > 0);
427-
return 0;
428-
}
429-
430318
// Complete an operation by sending the operation result and any result
431319
// output to the client. At this time the socket is in blocking mode so
432320
// potentially we can block if there is a lot of data and the client is
@@ -436,24 +324,6 @@ int AixAttachListener::write_fully(int s, char* buf, size_t len) {
436324
// socket could be made non-blocking and a timeout could be used.
437325

438326
void AixAttachOperation::complete(jint result, bufferedStream* st) {
439-
JavaThread* thread = JavaThread::current();
440-
ThreadBlockInVM tbivm(thread);
441-
442-
// write operation result
443-
char msg[32];
444-
os::snprintf_checked(msg, sizeof(msg), "%d\n", result);
445-
int rc = AixAttachListener::write_fully(this->socket(), msg, strlen(msg));
446-
447-
// write any result data
448-
if (rc == 0) {
449-
// Shutdown the socket in the cleanup function to enable more than
450-
// one agent attach in a sequence (see comments to listener_cleanup()).
451-
AixAttachListener::write_fully(this->socket(), (char*) st->base(), st->size());
452-
}
453-
454-
// done
455-
::close(this->socket());
456-
457327
delete this;
458328
}
459329

@@ -493,6 +363,7 @@ void AttachListener::vm_start() {
493363
}
494364

495365
int AttachListener::pd_init() {
366+
AttachListener::set_supported_version(ATTACH_API_V2);
496367
JavaThread* thread = JavaThread::current();
497368
ThreadBlockInVM tbivm(thread);
498369

0 commit comments

Comments
 (0)