Skip to content
This repository was archived by the owner on Nov 22, 2020. It is now read-only.

Commit 22b4efd

Browse files
Add LoggingBackend; Add headers; Make json payload not a string; default configuration (#92)
* Add LoggingBackend; Add headers; Make json payload not a string Signed-off-by: Julien Le Dem <[email protected]> * removed unecessary class Signed-off-by: Julien Le Dem <[email protected]> * improve tests coverage Signed-off-by: Julien Le Dem <[email protected]> * improve tests coverage Signed-off-by: Julien Le Dem <[email protected]> Co-authored-by: Willy Lulciuc <[email protected]>
1 parent 50cf7f9 commit 22b4efd

16 files changed

+247
-17
lines changed

build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ repositories {
3232
mavenCentral()
3333
}
3434

35+
java {
36+
sourceCompatibility = JavaVersion.VERSION_11
37+
targetCompatibility = JavaVersion.VERSION_11
38+
}
39+
3540
ext {
3641
lombokVersion = '1.18.8'
3742
slf4jVersion = '1.7.26'

src/main/java/marquez/client/Backend.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package marquez.client;
22

3+
import java.io.Closeable;
34
import javax.annotation.Nullable;
45

56
/**
67
* The backend contract for sending Marquez instrumentation. Information operations can be sent
78
* synchronously or asynchronously over various protocols
89
*/
9-
public interface Backend {
10+
public interface Backend extends Closeable {
1011

1112
void put(String path, String json);
1213

src/main/java/marquez/client/Backends.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
@Slf4j
1414
public class Backends {
1515

16+
@VisibleForTesting static final String DEFAULT_URL = "http://localhost:8080";
17+
1618
/**
1719
* Will write to a file.
1820
*
@@ -26,17 +28,26 @@ public static Backend newFileBackend(File file) {
2628
/**
2729
* Will issue http requests.
2830
*
29-
* @param baseURL the base url for http requests
31+
* @param baseUrl the base url for http requests
3032
* @return the corresponding backend implementation
3133
*/
3234
public static Backend newHttpBackend(URL baseUrl) {
3335
return new HttpBackend(baseUrl);
3436
}
3537

38+
/**
39+
* Will log requests.
40+
*
41+
* @return the corresponding backend implementation
42+
*/
43+
public static Backend newLoggingBackend() {
44+
return new LoggingBackend();
45+
}
46+
3647
/**
3748
* Initializes the backend base on environment variable configuration.
3849
*
39-
* <p>
50+
* <p>configuration:
4051
*
4152
* <ul>
4253
* <li>MARQUEZ_BACKEND=FILE|HTTP
@@ -52,12 +63,12 @@ public static Backend newBackendFromEnv() {
5263

5364
@VisibleForTesting
5465
static Backend newBackendFromEnv(Map<String, String> env) {
55-
String backendName = env.get("MARQUEZ_BACKEND");
66+
String backendName = env.getOrDefault("MARQUEZ_BACKEND", "http");
5667
switch (backendName.toUpperCase(US)) {
5768
case "FILE":
5869
return newFileBackend(new File(env.get("MARQUEZ_FILE")));
5970
case "HTTP":
60-
String configuredBaseUrl = env.get("MARQUEZ_URL");
71+
String configuredBaseUrl = env.getOrDefault("MARQUEZ_URL", DEFAULT_URL);
6172
try {
6273
return newHttpBackend(new URL(configuredBaseUrl));
6374
} catch (MalformedURLException e) {
@@ -68,6 +79,8 @@ static Backend newBackendFromEnv(Map<String, String> env) {
6879
+ " Defaulting to doing nothing.");
6980
return new NullBackend();
7081
}
82+
case "LOG":
83+
return newLoggingBackend();
7184
default:
7285
log.error(
7386
"Could not initialize Marquez backend for "

src/main/java/marquez/client/FileBackend.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,42 @@
11
package marquez.client;
22

33
import static java.nio.charset.StandardCharsets.UTF_8;
4+
import static org.apache.http.protocol.HTTP.USER_AGENT;
45

6+
import com.fasterxml.jackson.core.type.TypeReference;
7+
import com.fasterxml.jackson.databind.JsonNode;
58
import com.google.common.annotations.VisibleForTesting;
9+
import com.google.common.collect.ImmutableMap;
610
import java.io.File;
711
import java.io.FileWriter;
812
import java.io.IOException;
913
import java.io.Writer;
1014
import java.util.LinkedHashMap;
1115
import java.util.Map;
1216
import lombok.extern.slf4j.Slf4j;
17+
import marquez.client.MarquezClient.Version;
18+
import marquez.client.MarquezHttp.UserAgent;
1319

1420
/** A backend that writes events to a file in json. */
1521
@Slf4j
1622
class FileBackend implements Backend {
1723

24+
private static final TypeReference<JsonNode> JSONNODE = new TypeReference<JsonNode>() {};
25+
1826
private final File file;
1927
private Writer writer = null;
28+
private final Map<String, String> headers;
2029

21-
public FileBackend(File file) {
30+
FileBackend(File file) {
2231
this(file, initWriter(file));
2332
}
2433

34+
@VisibleForTesting
2535
FileBackend(File file, Writer writer) {
2636
this.file = file;
2737
this.writer = writer;
38+
Version version = MarquezClient.Version.get();
39+
this.headers = ImmutableMap.of(USER_AGENT, UserAgent.of(version).getValue());
2840
}
2941

3042
@VisibleForTesting
@@ -58,10 +70,11 @@ public void post(String path, String json) {
5870

5971
private void write(String method, String path, String json) {
6072
if (writer != null) {
61-
Map<String, String> call = new LinkedHashMap<String, String>(3);
73+
Map<String, Object> call = new LinkedHashMap<>(3);
6274
call.put("method", method);
75+
call.put("headers", headers);
6376
call.put("path", path);
64-
call.put("payload", json);
77+
call.put("payload", Utils.fromJson(json, JSONNODE));
6578
String line = Utils.toJson(call) + "\n";
6679
try {
6780
writer.append(line).flush();
@@ -77,4 +90,9 @@ private void write(String method, String path, String json) {
7790
}
7891
}
7992
}
93+
94+
@Override
95+
public void close() throws IOException {
96+
writer.close();
97+
}
8098
}

src/main/java/marquez/client/HttpBackend.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package marquez.client;
22

33
import com.google.common.annotations.VisibleForTesting;
4+
import java.io.IOException;
45
import java.net.MalformedURLException;
56
import java.net.URL;
67

@@ -13,7 +14,7 @@ class HttpBackend implements Backend {
1314
private final URL baseUrl;
1415
private final MarquezHttp http;
1516

16-
public HttpBackend(URL baseUrl) {
17+
HttpBackend(URL baseUrl) {
1718
this(baseUrl, MarquezHttp.create(MarquezClient.Version.get()));
1819
}
1920

@@ -23,6 +24,10 @@ public HttpBackend(URL baseUrl) {
2324
this.http = http;
2425
}
2526

27+
public URL getBaseUrl() {
28+
return baseUrl;
29+
}
30+
2631
private URL url(String path) {
2732
try {
2833
return new URL(this.baseUrl.toString() + path);
@@ -40,4 +45,9 @@ public void put(String path, String json) {
4045
public void post(String path, String json) {
4146
http.post(url(path), json);
4247
}
48+
49+
@Override
50+
public void close() throws IOException {
51+
http.close();
52+
}
4353
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package marquez.client;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import java.io.IOException;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.slf4j.Logger;
7+
8+
@Slf4j
9+
class LoggingBackend implements Backend {
10+
11+
private final Logger logger;
12+
13+
LoggingBackend() {
14+
this(log);
15+
}
16+
17+
@VisibleForTesting
18+
LoggingBackend(Logger logger) {
19+
this.logger = logger;
20+
}
21+
22+
@Override
23+
public void close() throws IOException {
24+
this.logger.info("closing");
25+
}
26+
27+
@Override
28+
public void put(String path, String json) {
29+
this.logger.info("PUT " + path + " " + json);
30+
}
31+
32+
@Override
33+
public void post(String path, String json) {
34+
this.logger.info("POST " + path + (json == null ? "" : " " + json));
35+
}
36+
}

src/main/java/marquez/client/MarquezHttp.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.fasterxml.jackson.annotation.JsonCreator;
2323
import com.fasterxml.jackson.core.type.TypeReference;
24+
import java.io.Closeable;
2425
import java.io.IOException;
2526
import java.net.URISyntaxException;
2627
import java.net.URL;
@@ -34,11 +35,12 @@
3435
import org.apache.http.client.methods.HttpPost;
3536
import org.apache.http.client.methods.HttpPut;
3637
import org.apache.http.entity.StringEntity;
38+
import org.apache.http.impl.client.CloseableHttpClient;
3739
import org.apache.http.impl.client.HttpClientBuilder;
3840
import org.apache.http.util.EntityUtils;
3941

4042
@Slf4j
41-
class MarquezHttp {
43+
class MarquezHttp implements Closeable {
4244
private final HttpClient http;
4345

4446
MarquezHttp(final HttpClient http) {
@@ -47,7 +49,8 @@ class MarquezHttp {
4749

4850
static final MarquezHttp create(final MarquezClient.Version version) {
4951
final UserAgent userAgent = UserAgent.of(version);
50-
final HttpClient http = HttpClientBuilder.create().setUserAgent(userAgent.getValue()).build();
52+
final CloseableHttpClient http =
53+
HttpClientBuilder.create().setUserAgent(userAgent.getValue()).build();
5154
return new MarquezHttp(http);
5255
}
5356

@@ -122,6 +125,13 @@ private void throwOnHttpError(HttpResponse response) throws IOException {
122125
}
123126
}
124127

128+
@Override
129+
public void close() throws IOException {
130+
if (http instanceof Closeable) {
131+
((Closeable) http).close();
132+
}
133+
}
134+
125135
@Value
126136
static class UserAgent {
127137
@Getter String value;

src/main/java/marquez/client/MarquezWriteOnlyClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static marquez.client.models.RunState.FAILED;
66
import static marquez.client.models.RunState.RUNNING;
77

8+
import java.io.Closeable;
89
import java.time.Instant;
910
import javax.annotation.Nullable;
1011
import marquez.client.models.DatasetMeta;
@@ -18,7 +19,7 @@
1819
* The contract of a write only client to instrument jobs actions. Can be taken synchronously or
1920
* asynchronously
2021
*/
21-
public interface MarquezWriteOnlyClient {
22+
public interface MarquezWriteOnlyClient extends Closeable {
2223

2324
public void createNamespace(String namespaceName, NamespaceMeta namespaceMeta);
2425

src/main/java/marquez/client/MarquezWriteOnlyClientImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static marquez.client.MarquezPathV1.sourcePath;
1111

1212
import com.google.common.collect.ImmutableMap;
13+
import java.io.IOException;
1314
import java.net.URLEncoder;
1415
import java.time.Instant;
1516
import java.util.Map;
@@ -80,4 +81,9 @@ public void markRunAs(String runId, RunState runState, Instant at) {
8081
at == null ? null : ImmutableMap.of("at", ISO_INSTANT.format(at));
8182
backend.post(path(runTransitionPath(runId, runState), queryParams));
8283
}
84+
85+
@Override
86+
public void close() throws IOException {
87+
backend.close();
88+
}
8389
}

src/main/java/marquez/client/NullBackend.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package marquez.client;
22

3+
import java.io.IOException;
4+
35
/** A backend that does not do anything. */
46
class NullBackend implements Backend {
57

@@ -8,4 +10,7 @@ public void put(String path, String json) {}
810

911
@Override
1012
public void post(String path, String json) {}
13+
14+
@Override
15+
public void close() throws IOException {}
1116
}

0 commit comments

Comments
 (0)