Skip to content

Commit 511b982

Browse files
authored
Http post sink (#99)
* Http post sink * Fixed copyright comments and year.
1 parent 41c5959 commit 511b982

16 files changed

+1076
-98
lines changed

README.md

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ For example:
147147
"name": "my_application_telemetry_sink"
148148
},
149149
{
150-
"type": "com.arpnetworking.tsdcore.sinks.AggregationServerSink",
151-
"name": "my_application_aggregation_server_sink",
152-
"serverAddress": "192.168.0.2"
150+
"type": "com.arpnetworking.tsdcore.sinks.AggregationServerHttpSink",
151+
"name": "my_application_aggregation_server_http_sink",
152+
"uri": "http://192.168.0.2:7066/metrics/v1/data"
153153
}
154154
]
155155
}
@@ -284,6 +284,27 @@ interval="10s"
284284
[[inputs.kernel]]
285285
```
286286

287+
### Graphite
288+
289+
Example MAD source configuration:
290+
291+
```json
292+
{
293+
type="com.arpnetworking.metrics.mad.sources.MappingSource"
294+
name="graphitetcp_mapping_source"
295+
actorName="graphite-tcp-source"
296+
findAndReplace={
297+
"\\."=["/"]
298+
}
299+
source={
300+
type="com.arpnetworking.metrics.common.sources.TcpLineSource"
301+
name="graphitetcp_source"
302+
host="0.0.0.0"
303+
port="2003"
304+
}
305+
}
306+
```
307+
287308
Development
288309
-----------
289310

jdk-wrapper.sh

Lines changed: 106 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,45 +40,109 @@ safe_command() {
4040
fi
4141
}
4242

43-
download() {
43+
checksum() {
44+
l_file="$1"
45+
checksum_exec=""
46+
if command -v sha256sum > /dev/null; then
47+
checksum_exec="sha256sum"
48+
elif command -v shasum > /dev/null; then
49+
checksum_exec="shasum -a 256"
50+
elif command -v sha1sum > /dev/null; then
51+
checksum_exec="sha1sum"
52+
elif command -v md5 > /dev/null; then
53+
checksum_exec="md5"
54+
fi
55+
if [ -z "${checksum_exec}" ]; then
56+
log_err "ERROR: No supported checksum command found!"
57+
exit 1
58+
fi
59+
cat "${l_file}" | ${checksum_exec}
60+
}
61+
62+
rand() {
63+
awk 'BEGIN {srand();printf "%d\n", (rand() * 10^8);}'
64+
}
65+
66+
download_if_needed() {
4467
file="$1"
45-
if [ ! -f "${JDKW_PATH}/${file}" ]; then
46-
jdkw_url="${JDKW_URI}/${file}"
68+
path="$2"
69+
if [ ! -f "${path}/${file}" ]; then
70+
jdkw_url="${JDKW_BASE_URI}/releases/download/${JDKW_RELEASE}/${file}"
4771
log_out "Downloading ${file} from ${jdkw_url}"
48-
safe_command "curl ${CURL_OPTIONS} -f -k -L -o \"${JDKW_PATH}/${file}\" \"${jdkw_url}\""
49-
safe_command "chmod +x \"${JDKW_PATH}/${file}\""
72+
safe_command "curl ${curl_options} -f -k -L -o \"${path}/${file}\" \"${jdkw_url}\""
73+
safe_command "chmod +x \"${path}/${file}\""
5074
fi
5175
}
5276

5377
# Default curl options
54-
CURL_OPTIONS=""
78+
curl_options=""
79+
80+
# Process (but do not load) properties from environment
81+
env_configuration=
82+
l_fifo="${TMPDIR:-/tmp}/$$.$(rand)"
83+
safe_command "mkfifo \"${l_fifo}\""
84+
env > "${l_fifo}" &
85+
while IFS='=' read -r name value
86+
do
87+
jdkw_arg=$(echo "${name}" | grep '^JDKW_.*')
88+
jdkw_base_dir_arg=$(echo "${name}" | grep '^JDKW_BASE_DIR')
89+
if [ -n "${jdkw_base_dir_arg}" ]; then
90+
eval "${name}=\"${value}\""
91+
fi
92+
if [ -n "${jdkw_arg}" ]; then
93+
env_configuration="${env_configuration}${name}=\"${value}\" "
94+
fi
95+
done < "${l_fifo}"
96+
safe_command "rm \"${l_fifo}\""
97+
98+
# Process (but do not load) properties from command line arguments
99+
command=
100+
cmd_configuration=
101+
for arg in "$@"; do
102+
if [ -z ${in_command} ]; then
103+
jdkw_arg=$(echo "${arg}" | grep '^JDKW_.*')
104+
jdkw_base_dir_arg=$(echo "${arg}" | grep '^JDKW_BASE_DIR.*')
105+
if [ -n "${jdkw_base_dir_arg}" ]; then
106+
eval ${arg}
107+
fi
108+
if [ -n "${jdkw_arg}" ]; then
109+
cmd_configuration="${cmd_configuration}${arg} "
110+
fi
111+
fi
112+
case "${arg}" in
113+
*\'*)
114+
arg=`printf "%s" "$arg" | sed "s/'/'\"'\"'/g"`
115+
;;
116+
*) : ;;
117+
esac
118+
command="${command} '${arg}'"
119+
done
120+
121+
# Default base directory to current working directory
122+
if [ -z "${JDKW_BASE_DIR}" ]; then
123+
JDKW_BASE_DIR="."
124+
fi
55125

56126
# Load properties file in home directory
57127
if [ -f "${HOME}/.jdkw" ]; then
58128
. "${HOME}/.jdkw"
59129
fi
60130

61-
# Load properties file in working directory
62-
if [ -f ".jdkw" ]; then
63-
. "./.jdkw"
131+
# Load properties file in base directory
132+
if [ -f "${JDKW_BASE_DIR}/.jdkw" ]; then
133+
. "${JDKW_BASE_DIR}/.jdkw"
64134
fi
65135

66-
# Process command line arguments
67-
for ARG in "$@"; do
68-
JDKW_ARG=$(echo "${ARG}" | grep 'JDKW_.*')
69-
if [ -n "${JDKW_ARG}" ]; then
70-
eval ${ARG}
71-
else
72-
break
73-
fi
74-
done
136+
# Load properties from environment
137+
eval "${env_configuration}"
75138

76-
# Globals
77-
JDKW_BASE_URI="https://github.com/KoskiLabs/jdk-wrapper"
78-
JDKW_IMPL="jdkw-impl.sh"
79-
JDKW_WRAPPER="jdk-wrapper.sh"
139+
# Load properties from command line arguments
140+
eval "${cmd_configuration}"
80141

81142
# Process configuration
143+
if [ -z "${JDKW_BASE_URI}" ]; then
144+
JDKW_BASE_URI="https://github.com/KoskiLabs/jdk-wrapper"
145+
fi
82146
if [ -z "${JDKW_RELEASE}" ]; then
83147
JDKW_RELEASE="latest"
84148
log_out "Defaulted to version ${JDKW_RELEASE}"
@@ -88,38 +152,42 @@ if [ -z "${JDKW_TARGET}" ]; then
88152
log_out "Defaulted to target ${JDKW_TARGET}"
89153
fi
90154
if [ -z "${JDKW_VERBOSE}" ]; then
91-
CURL_OPTIONS="${CURL_OPTIONS} --silent"
155+
curl_options="${curl_options} --silent"
92156
fi
93157

94158
# Resolve latest version
95159
if [ "${JDKW_RELEASE}" = "latest" ]; then
96-
JDKW_RELEASE=$(curl ${CURL_OPTIONS} -f -k -L -H 'Accept: application/json' "${JDKW_BASE_URI}/releases/latest" | sed -e 's/.*"tag_name":"\([^"]*\)".*/\1/')
160+
latest_version_json="${TMPDIR:-/tmp}/jdkw-latest-version-$$.$(rand)"
161+
safe_command "curl ${curl_options} -f -k -L -o \"${latest_version_json}\" -H 'Accept: application/json' \"${JDKW_BASE_URI}/releases/latest\""
162+
JDKW_RELEASE=$(cat "${latest_version_json}" | sed -e 's/.*"tag_name":"\([^"]*\)".*/\1/')
163+
rm -f "${latest_version_json}"
97164
log_out "Resolved latest version to ${JDKW_RELEASE}"
98165
fi
99166

100-
# Define source and target
101-
JDKW_URI="${JDKW_BASE_URI}/releases/download/${JDKW_RELEASE}"
102-
JDKW_PATH="${JDKW_TARGET}/jdkw/${JDKW_RELEASE}"
103-
104167
# Ensure target directory exists
105-
if [ ! -d "${JDKW_PATH}" ]; then
106-
log_out "Creating target directory ${JDKW_PATH}"
107-
safe_command "mkdir -p \"${JDKW_PATH}\""
168+
jdkw_path="${JDKW_TARGET}/jdkw/${JDKW_RELEASE}"
169+
if [ ! -d "${jdkw_path}" ]; then
170+
log_out "Creating target directory ${jdkw_path}"
171+
safe_command "mkdir -p \"${jdkw_path}\""
108172
fi
109173

110174
# Download the jdk wrapper version
111-
download "${JDKW_IMPL}"
112-
download "${JDKW_WRAPPER}"
175+
jdkw_impl="jdkw-impl.sh"
176+
jdkw_wrapper="jdk-wrapper.sh"
177+
download_if_needed "${jdkw_impl}" "${jdkw_path}"
178+
download_if_needed "${jdkw_wrapper}" "${jdkw_path}"
179+
180+
# Execute the provided command
181+
eval ${jdkw_path}/${jdkw_impl} ${command}
182+
result=$?
113183

114184
# Check whether this wrapper is the one specified for this version
115-
jdkw_download="${JDKW_PATH}/${JDKW_WRAPPER}"
185+
jdkw_download="${jdkw_path}/${jdkw_wrapper}"
116186
jdkw_current="$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)/$(basename "$0")"
117-
if [ "$(cat "${jdkw_download}" | sha1sum )" != "$(cat "${jdkw_current}" | sha1sum)" ]; then
187+
if [ "$(checksum "${jdkw_download}")" != "$(checksum "${jdkw_current}")" ]; then
118188
printf "\e[0;31m[WARNING]\e[0m Your jdk-wrapper.sh file does not match the one in your JDKW_RELEASE.\n"
119189
printf "\e[0;32mUpdate your jdk-wrapper.sh to match by running:\e[0m\n"
120190
printf "cp \"%s\" \"%s\"\n" "${jdkw_download}" "${jdkw_current}"
121191
fi
122192

123-
# Execute the provided command
124-
${JDKW_PATH}/${JDKW_IMPL} $@
125-
exit $?
193+
exit ${result}

pom.xml

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
<apache.httpcore.version>4.4.3</apache.httpcore.version>
7878
<arpnetworking.commons.version>1.16.0</arpnetworking.commons.version>
7979
<aspectjrt.version>1.8.9</aspectjrt.version>
80+
<asynchttpclient.version>2.4.8</asynchttpclient.version>
8081
<cglib.version>3.2.5</cglib.version>
8182
<client.protocol.version>0.10.0</client.protocol.version>
8283
<fastutil.version>8.1.1</fastutil.version>
@@ -576,16 +577,21 @@
576577
<artifactId>scala-library</artifactId>
577578
<version>${scala.library.version}</version>
578579
</dependency>
579-
<dependency>
580-
<groupId>io.vertx</groupId>
581-
<artifactId>vertx-core</artifactId>
582-
<version>${vertx.core.version}</version>
583-
</dependency>
584580
<dependency>
585581
<groupId>org.apache.httpcomponents</groupId>
586582
<artifactId>httpclient</artifactId>
587583
<version>${apache.httpclient.version}</version>
588584
</dependency>
585+
<dependency>
586+
<groupId>org.asynchttpclient</groupId>
587+
<artifactId>async-http-client</artifactId>
588+
<version>${asynchttpclient.version}</version>
589+
</dependency>
590+
<dependency>
591+
<groupId>io.vertx</groupId>
592+
<artifactId>vertx-core</artifactId>
593+
<version>${vertx.core.version}</version>
594+
</dependency>
589595
<dependency>
590596
<groupId>org.apache.httpcomponents</groupId>
591597
<artifactId>httpcore</artifactId>
@@ -769,7 +775,7 @@
769775
<directory>/opt/mad</directory>
770776
<sources>
771777
<source>
772-
<location>target/appassembler</location>
778+
<location>${buildDirectory}/appassembler</location>
773779
</source>
774780
</sources>
775781
</mapping>

src/main/docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ ENTRYPOINT [ \
4343
"-XX:HeapDumpPath=/opt/mad/logs/mad.oom.hprof", \
4444
"-XX:+PrintGCDetails", \
4545
"-XX:+PrintGCDateStamps", \
46-
"-Xloggc:logs/mad.gc.log", \
46+
"-Xloggc:/opt/mad/logs/mad.gc.log", \
4747
"-XX:NumberOfGCLogFiles=2", \
4848
"-XX:GCLogFileSize=50M", \
4949
"-XX:+UseGCLogFileRotation", \

src/main/java/com/arpnetworking/tsdcore/model/AggregationMessage.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
*/
1616
package com.arpnetworking.tsdcore.model;
1717

18+
import akka.util.ByteString;
19+
import akka.util.ByteStringBuilder;
1820
import com.arpnetworking.metrics.aggregation.protocol.Messages;
1921
import com.google.protobuf.GeneratedMessage;
2022
import org.vertx.java.core.buffer.Buffer;
2123

24+
import java.io.IOException;
25+
import java.nio.ByteOrder;
26+
2227
/**
23-
* Class for building messages from the raw, on-the-wire bytes in the TCP stream.
28+
* Class for building on-the-wire bytes for messages.
2429
*
2530
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
2631
*/
@@ -41,7 +46,7 @@ public static AggregationMessage create(final GeneratedMessage message) {
4146
*
4247
* @return <code>Buffer</code> containing serialized message.
4348
*/
44-
public Buffer serialize() {
49+
public Buffer serializeToBuffer() {
4550
final Buffer b = new Buffer();
4651
b.appendInt(0);
4752
if (_message instanceof Messages.HostIdentification) {
@@ -64,6 +69,40 @@ public Buffer serialize() {
6469
return b;
6570
}
6671

72+
/**
73+
* Serialize the message into a <code>ByteString</code>.
74+
*
75+
* @return <code>Buffer</code> containing serialized message.
76+
*/
77+
public ByteString serializeToByteString() {
78+
final ByteStringBuilder b = ByteString.createBuilder();
79+
if (_message instanceof Messages.HostIdentification) {
80+
b.putByte((byte) 0x01);
81+
} else if (_message instanceof Messages.HeartbeatRecord) {
82+
b.putByte((byte) 0x03);
83+
} else if (_message instanceof Messages.StatisticSetRecord) {
84+
b.putByte((byte) 0x04);
85+
} else if (_message instanceof Messages.SamplesSupportingData) {
86+
b.putByte((byte) 0x05);
87+
b.putByte((byte) 0x01);
88+
} else if (_message instanceof Messages.SparseHistogramSupportingData) {
89+
b.putByte((byte) 0x05);
90+
b.putByte((byte) 0x02);
91+
} else {
92+
throw new IllegalArgumentException(
93+
String.format("Unsupported message; messageClass=%s", _message.getClass()));
94+
}
95+
try {
96+
_message.writeTo(b.asOutputStream());
97+
} catch (final IOException e) {
98+
throw new RuntimeException(e);
99+
}
100+
final ByteString bs = b.result();
101+
final ByteStringBuilder sizePrefix = ByteString.createBuilder();
102+
sizePrefix.putInt(bs.size() + INTEGER_SIZE_IN_BYTES, ByteOrder.BIG_ENDIAN);
103+
return sizePrefix.result().concat(bs);
104+
}
105+
67106
public GeneratedMessage getMessage() {
68107
return _message;
69108
}

0 commit comments

Comments
 (0)