Skip to content

Commit a0cef28

Browse files
authored
[GOBBLIN-2188] Define Initializer.AfterInitializeMemento for GoT to tunnel state from GenerateWorkUnits to CommitActivity (#4091)
1 parent 87c8ab4 commit a0cef28

File tree

12 files changed

+479
-48
lines changed

12 files changed

+479
-48
lines changed

gobblin-api/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ apply plugin: 'java'
2020
dependencies {
2121
compile externalDependency.guava
2222
compile externalDependency.gson
23+
compile externalDependency.jacksonCore
24+
compile externalDependency.jacksonMapper
2325
compile externalDependency.jasypt
2426
compile externalDependency.jodaTime
2527
compile externalDependency.commonsLang3

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ public class ConfigurationKeys {
203203
public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir";
204204
public static final String SOURCE_CLASS_KEY = "source.class";
205205
public static final String CONVERTER_CLASSES_KEY = "converter.classes";
206+
public static final String CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY = "converter.initializers.serialized.mementos";
206207
public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY = "recordStreamProcessor.classes";
207208
public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class";
208209
public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
@@ -434,6 +435,7 @@ public class ConfigurationKeys {
434435
public static final String WRITER_TRUNCATE_STAGING_TABLE = WRITER_PREFIX + ".truncate.staging.table";
435436
public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir";
436437
public static final String WRITER_BUILDER_CLASS = WRITER_PREFIX + ".builder.class";
438+
public static final String WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY = "writer.initializer.serialized.memento";
437439
public static final String DEFAULT_WRITER_BUILDER_CLASS = "org.apache.gobblin.writer.AvroDataWriterBuilder";
438440
public static final String WRITER_FILE_NAME = WRITER_PREFIX + ".file.name";
439441
public static final String WRITER_FILE_PATH = WRITER_PREFIX + ".file.path";

gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,121 @@
1818
package org.apache.gobblin.initializer;
1919

2020
import java.io.Closeable;
21+
import java.util.Optional;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
27+
import com.fasterxml.jackson.core.JsonProcessingException;
28+
import com.fasterxml.jackson.databind.ObjectMapper;
29+
2130

2231
public interface Initializer extends Closeable {
2332

2433
/**
25-
* Initialize for the writer.
34+
* Marker interface to convey an opaque snapshot of the internal state of any concrete {@link Initializer}, thus affording state serialization for
35+
* eventual "revival" as a new `Initializer` holding equivalent internal state. {@link #commemorate()} (i.e. create) the memento after
36+
* {@link #initialize()} and subsequently {@link #recall(AfterInitializeMemento)} the state it preserved before performing {@link #close()}.
37+
*
38+
* When synchronous and the same instance throughout, the "Initializer Lifecycle" is:
39+
* [concrete `My_T implements Initializer`, instance A] -
40+
* `.initialize()`; ==PROCESSING RUNS==; `.close()`;
41+
*
42+
* When trading `AfterInitializeMemento` between instances (even memory-space boundaries) it becomes:
43+
* [concrete `My_T implements Initializer`, instance A] -
44+
* `.initialize()`; `.commemorate()`; ==PERSIST/TRANSMIT MEMENTO==
45+
* ==PROCESSING RUNS==;
46+
* [concrete `My_T implements Initializer`, instance B] -
47+
* ==RECEIVE MEMENTO==; `.recall()`; `.close()`
2648
*
27-
* @param state
28-
* @param workUnits WorkUnits created by Source
49+
* Both for backwards compatibility and because not every concrete `Initializer` has internal state worth capturing, not every `Initializer`
50+
* impl will implement an `AfterInitializeMemento`. Those that do will supply a unique impl capturing self-aware impl details of their
51+
* `Initializer`.
52+
*
53+
* An `AfterInitializeMemento` impl need only be (de)serializable by {@link ObjectMapper}.
54+
*
55+
* An `Initializer` impl with an `AfterInitializeMemento` impl MUST NOT (re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s
56+
* during its {@link #close()} method: `WorkUnit` processing MUST occur entirely within {@link #initialize()}.
57+
*/
58+
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle variety of concrete impls
59+
public interface AfterInitializeMemento {
60+
static Logger logger = LoggerFactory.getLogger(AfterInitializeMemento.class);
61+
62+
/**
63+
* Convey attempt to work with a concrete {@link AfterInitializeMemento} of type other than the single expected companion type known to `forInitializer`.
64+
* @see #castAsOrThrow(Class, Initializer)
65+
*/
66+
static class MismatchedMementoException extends RuntimeException {
67+
public MismatchedMementoException(AfterInitializeMemento memento, Class<?> asClass, Initializer forInitializer) {
68+
super(String.format("Memento '%s' for Initializer '%s' of class '%s' - NOT '%s'", memento, forInitializer.getClass().getName(),
69+
memento.getClass().getName(), asClass.getName()));
70+
}
71+
}
72+
73+
/** stringify as JSON */
74+
static String serialize(AfterInitializeMemento memento) {
75+
ObjectMapper objectMapper = new ObjectMapper();
76+
try {
77+
String result = objectMapper.writeValueAsString(memento);
78+
logger.info("Serializing AfterInitializeMemento {} as '{}'", memento, result);
79+
return result;
80+
} catch (JsonProcessingException e) {
81+
logger.error("Failed to serialize AfterInitializeMemento '" + memento + "'", e);
82+
throw new RuntimeException(e);
83+
}
84+
}
85+
86+
/** destringify JSON */
87+
static AfterInitializeMemento deserialize(String serialized) {
88+
ObjectMapper objectMapper = new ObjectMapper();
89+
try {
90+
AfterInitializeMemento result = objectMapper.readValue(serialized, AfterInitializeMemento.class);
91+
logger.info("Deserializing AfterInitializeMemento '{}' as {}", serialized, result);
92+
return result;
93+
} catch (JsonProcessingException e) {
94+
logger.error("Failed to deserialize AfterInitializeMemento '" + serialized + "'", e);
95+
throw new RuntimeException(e);
96+
}
97+
}
98+
99+
/** cast `this` (concrete `AfterInitializeMemento`) to `castClass`, else {@link MismatchedMementoException} */
100+
default <T extends AfterInitializeMemento> T castAsOrThrow(Class<T> castClass, Initializer forInitializer)
101+
throws MismatchedMementoException {
102+
if (castClass.isAssignableFrom(this.getClass())) {
103+
return (T) this;
104+
} else {
105+
throw new AfterInitializeMemento.MismatchedMementoException(this, castClass, forInitializer);
106+
}
107+
}
108+
}
109+
110+
/**
111+
* Initialize the writer/converter (e.g. using the state and/or {@link org.apache.gobblin.source.workunit.WorkUnit}s provided when constructing the instance)
29112
*/
30113
public void initialize();
31114

32115
/**
33116
* Removed checked exception.
34117
* {@inheritDoc}
35118
* @see java.io.Closeable#close()
119+
*
120+
* NOTE: An `Initializer` impl with an `AfterInitializeMemento` impl MUST NOT (re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s
121+
* during its {@link #close()} method: `WorkUnit` processing MUST occur entirely within {@link #initialize()}.
36122
*/
37123
@Override
38124
public void close();
125+
126+
/** @return the `Initializer`-specific companion memento, to convey internal state after {@link #initialize()}, and as needed to {@link #close()} */
127+
default Optional<AfterInitializeMemento> commemorate() {
128+
return Optional.empty();
129+
}
130+
131+
/**
132+
* "reinitialize" a fresh instance, per `Initializer`-specific companion `memento`, with (equivalent) post {@link #initialize()} internal state needed
133+
* to {@link #close()}
134+
*/
135+
default void recall(AfterInitializeMemento memento) {
136+
// noop
137+
}
39138
}

gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,39 @@
1818
package org.apache.gobblin.converter.initializer;
1919

2020
import java.util.List;
21+
import java.util.Optional;
2122

2223
import lombok.ToString;
24+
2325
import org.apache.gobblin.initializer.Initializer;
2426
import org.apache.gobblin.initializer.MultiInitializer;
2527

2628

2729
@ToString
2830
public class MultiConverterInitializer implements ConverterInitializer {
29-
private final Initializer intializer;
31+
private final Initializer initializer;
3032

3133
public MultiConverterInitializer(List<ConverterInitializer> converterInitializers) {
32-
this.intializer = new MultiInitializer(converterInitializers);
34+
this.initializer = new MultiInitializer(converterInitializers);
3335
}
3436

3537
@Override
3638
public void initialize() {
37-
this.intializer.initialize();
39+
this.initializer.initialize();
3840
}
3941

4042
@Override
4143
public void close() {
42-
this.intializer.close();
44+
this.initializer.close();
45+
}
46+
47+
@Override
48+
public Optional<AfterInitializeMemento> commemorate() {
49+
return this.initializer.commemorate();
50+
}
51+
52+
@Override
53+
public void recall(AfterInitializeMemento memento) {
54+
this.initializer.recall(memento);
4355
}
4456
}

gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,46 @@
1919

2020
import java.io.IOException;
2121
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.stream.Collectors;
2224

25+
import lombok.AccessLevel;
26+
import lombok.Data;
27+
import lombok.NoArgsConstructor;
28+
import lombok.NonNull;
29+
import lombok.RequiredArgsConstructor;
30+
import lombok.Setter;
2331
import lombok.ToString;
2432

2533
import com.google.common.collect.ImmutableList;
34+
import com.google.common.collect.Streams;
2635
import com.google.common.io.Closer;
2736

2837

2938
/**
30-
* Wraps multiple writer initializer behind its interface. This is useful when there're more than one branch.
39+
* Wraps multiple writer initializers, which is useful when there is more than one branch.
3140
*/
3241
@ToString
3342
public class MultiInitializer implements Initializer {
43+
44+
/** Commemorate each (`Optional`) {@link org.apache.gobblin.initializer.Initializer.AfterInitializeMemento} of every wrapped {@link Initializer} */
45+
@Data
46+
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
47+
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
48+
@RequiredArgsConstructor
49+
private static class Memento implements AfterInitializeMemento {
50+
// WARNING: not possible to use `List<Optional<AfterInitializeMemento>>`, as first attempted, due to:
51+
// com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "present" (class java.util.Optional), not marked as
52+
// ignorable (0 known properties: ])
53+
// at [Source:(String)"{\"@class\":\"org.apache.gobblin.initializer.MultiInitializer$Memento\",\"orderedInitializersMementos\":[{\"present\":false}]}"]
54+
// (through reference chain: org.apache.gobblin.initializer.MultiInitializer$Memento[\"orderedInitializersMementos\"]->java.util.ArrayList[0]
55+
// ->java.util.Optional[\"present\"])",
56+
// the following does NOT fix, probably due to `Optional`'s nesting with `List`:
57+
// @JsonIgnoreProperties(ignoreUnknown = true)
58+
@NonNull private List<AfterInitializeMemento> orderedInitializersMementos;
59+
}
60+
61+
3462
private final List<Initializer> initializers;
3563
private final Closer closer;
3664

@@ -57,4 +85,21 @@ public void close() {
5785
throw new RuntimeException(e);
5886
}
5987
}
60-
}
88+
89+
@Override
90+
public Optional<AfterInitializeMemento> commemorate() {
91+
return Optional.of(new MultiInitializer.Memento(this.initializers.stream()
92+
.map(Initializer::commemorate)
93+
.map(opt -> opt.orElse(null))
94+
.collect(Collectors.toList())));
95+
}
96+
97+
@Override
98+
public void recall(AfterInitializeMemento memento) {
99+
Memento recollection = memento.castAsOrThrow(MultiInitializer.Memento.class, this);
100+
Streams.zip(this.initializers.stream(), recollection.orderedInitializersMementos.stream(), (initializer, nullableInitializerMemento) -> {
101+
Optional.ofNullable(nullableInitializerMemento).ifPresent(initializer::recall);
102+
return null;
103+
}).count(); // force evaluation, since `Streams.zip` used purely for side effects
104+
}
105+
}

gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,41 @@
1717

1818
package org.apache.gobblin.writer.initializer;
1919

20-
import org.apache.gobblin.initializer.Initializer;
21-
import org.apache.gobblin.initializer.MultiInitializer;
22-
20+
import java.util.Optional;
2321
import java.util.List;
2422

2523
import lombok.ToString;
2624

25+
import org.apache.gobblin.initializer.Initializer;
26+
import org.apache.gobblin.initializer.MultiInitializer;
27+
2728

2829
@ToString
2930
public class MultiWriterInitializer implements WriterInitializer {
3031

31-
private final Initializer intializer;
32+
private final Initializer initializer;
3233

3334
public MultiWriterInitializer(List<WriterInitializer> writerInitializers) {
34-
this.intializer = new MultiInitializer(writerInitializers);
35+
this.initializer = new MultiInitializer(writerInitializers);
3536
}
3637

3738
@Override
3839
public void initialize() {
39-
this.intializer.initialize();
40+
this.initializer.initialize();
4041
}
4142

4243
@Override
4344
public void close() {
44-
this.intializer.close();
45+
this.initializer.close();
4546
}
4647

48+
@Override
49+
public Optional<AfterInitializeMemento> commemorate() {
50+
return this.initializer.commemorate();
51+
}
52+
53+
@Override
54+
public void recall(AfterInitializeMemento memento) {
55+
this.initializer.recall(memento);
56+
}
4757
}

0 commit comments

Comments
 (0)