Skip to content

Commit cf783a0

Browse files
Blazer-007Will-Lo
authored andcommitted
[GOBBLIN-2227] Make dag action and spec store monitor initialization config driven (#4141)
* make consumer initialization config driven * add header lines * added java docs
1 parent ac6423a commit cf783a0

11 files changed

+148
-36
lines changed

gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@
8282
import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
8383
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
8484
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
85-
import org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
85+
import org.apache.gobblin.service.monitoring.DagActionChangeMonitor;
8686
import org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitorFactory;
8787
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
8888
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
8989
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
90+
import org.apache.gobblin.service.monitoring.JobStatusMonitor;
9091
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
91-
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
9292
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
93-
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
93+
import org.apache.gobblin.service.monitoring.SpecChangeMonitor;
9494
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitorFactory;
9595
import org.apache.gobblin.util.ClassAliasResolver;
9696
import org.apache.gobblin.util.ConfigUtils;
@@ -159,7 +159,7 @@ binding time (optionally bound classes cannot have names associated with them),
159159

160160
binder.bind(DagActionReminderScheduler.class);
161161
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
162-
binder.bind(DagManagementDagActionStoreChangeMonitor.class).toProvider(
162+
binder.bind(DagActionChangeMonitor.class).toProvider(
163163
DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
164164
binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
165165
binder.bind(DagManagementStateStore.class).to(MySqlDagManagementStateStore.class);
@@ -169,7 +169,7 @@ binding time (optionally bound classes cannot have names associated with them),
169169
binder.bind(DagProcessingEngineMetrics.class);
170170
binder.bind(FlowLaunchHandler.class);
171171
binder.bind(MultiActiveLeaseArbiter.class).toProvider(DagActionProcessingMultiActiveLeaseArbiterFactory.class);
172-
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
172+
binder.bind(SpecChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
173173

174174
binder.bind(RequesterService.class).to(NoopRequesterService.class);
175175
binder.bind(SharedFlowMetricsSingleton.class);
@@ -188,7 +188,7 @@ binding time (optionally bound classes cannot have names associated with them),
188188
}
189189

190190
if (serviceConfig.isJobStatusMonitorEnabled()) {
191-
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
191+
binder.bind(JobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
192192
binder.bind(ErrorClassifier.class);
193193
binder.bind(ErrorPatternStore.class)
194194
.to(getClassByNameOrAlias(ErrorPatternStore.class, serviceConfig.getInnerConfig(),

gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@
7777
import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
7878
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
7979
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
80-
import org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
80+
import org.apache.gobblin.service.monitoring.DagActionChangeMonitor;
8181
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
8282
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
83-
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
84-
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
83+
import org.apache.gobblin.service.monitoring.JobStatusMonitor;
84+
import org.apache.gobblin.service.monitoring.SpecChangeMonitor;
8585
import org.apache.gobblin.util.ConfigUtils;
8686

8787

@@ -150,7 +150,7 @@ public class GobblinServiceManager implements ApplicationLauncher {
150150
protected GitConfigMonitor gitConfigMonitor;
151151

152152
@Inject(optional = true)
153-
protected KafkaJobStatusMonitor jobStatusMonitor;
153+
protected JobStatusMonitor jobStatusMonitor;
154154

155155
@Inject
156156
protected MultiContextIssueRepository issueRepository;
@@ -166,10 +166,10 @@ public class GobblinServiceManager implements ApplicationLauncher {
166166
protected D2Announcer d2Announcer;
167167

168168
@Inject(optional = true)
169-
protected SpecStoreChangeMonitor specStoreChangeMonitor;
169+
protected SpecChangeMonitor specStoreChangeMonitor;
170170

171171
@Inject(optional = true)
172-
protected DagManagementDagActionStoreChangeMonitor dagManagementDagActionStoreChangeMonitor;
172+
protected DagActionChangeMonitor dagManagementDagActionStoreChangeMonitor;
173173

174174
@Inject(optional = true)
175175
protected DagProcessingEngine dagProcessingEngine;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.service.monitoring;
19+
20+
import com.google.common.util.concurrent.Service;
21+
22+
23+
/**
24+
* A marker interface for dag action change monitors to generalize initialization in {@link org.apache.gobblin.service.modules.core.GobblinServiceManager#dagManagementDagActionStoreChangeMonitor}
25+
*/
26+
public interface DagActionChangeMonitor extends Service {
27+
28+
/**
29+
* Set the monitor to active state where it can start processing events.
30+
* Should be called from {@link org.apache.gobblin.service.modules.core.GobblinServiceManager} after the service is started.
31+
*/
32+
void setActive();
33+
}

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
* connector between the API and execution layers of GaaS.
6060
*/
6161
@Slf4j
62-
public abstract class DagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> {
62+
public abstract class DagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> implements DagActionChangeMonitor {
6363
public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeStore";
6464

6565
// Metrics
@@ -166,6 +166,7 @@ protected void startUp() {}
166166
This method should be called once by the {@link GobblinServiceManager} only after the FlowGraph and
167167
SpecCompiler are initialized and running.
168168
*/
169+
@Override
169170
public synchronized void setActive() {
170171
if (this.isActive) {
171172
return;

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@
3030
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
3131
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
3232
import org.apache.gobblin.util.ConfigUtils;
33+
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
3334

3435

3536
/**
36-
* A factory implementation that returns a {@link DagManagementDagActionStoreChangeMonitor} instance.
37+
* A factory implementation that returns a {@link DagActionChangeMonitor} instance.
3738
*/
3839
@Slf4j
39-
public class DagManagementDagActionStoreChangeMonitorFactory implements Provider<DagManagementDagActionStoreChangeMonitor> {
40-
static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
40+
public class DagManagementDagActionStoreChangeMonitorFactory implements Provider<DagActionChangeMonitor> {
41+
private static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_KEY = "class";
42+
private static final String DEFAULT_DAG_ACTION_STORE_CHANGE_MONITOR_CLASS = DagManagementDagActionStoreChangeMonitor.class.getName();
43+
private static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
4144

4245
private final Config config;
4346
private final DagManagementStateStore dagManagementStateStore;
@@ -55,18 +58,25 @@ public DagManagementDagActionStoreChangeMonitorFactory(Config config, DagManagem
5558
this.dagProcEngineMetrics = dagProcEngineMetrics;
5659
}
5760

58-
private DagManagementDagActionStoreChangeMonitor createDagActionStoreMonitor() {
61+
private DagActionChangeMonitor createDagActionStoreMonitor() throws ReflectiveOperationException {
5962
Config dagActionStoreChangeConfig = this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
60-
log.info("DagActionStore will be initialized with config {}", dagActionStoreChangeConfig);
63+
Class<?> dagActionStoreChangeMonitorClass = Class.forName(
64+
ConfigUtils.getString(dagActionStoreChangeConfig, DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_KEY, DEFAULT_DAG_ACTION_STORE_CHANGE_MONITOR_CLASS));
65+
log.info("DagActionStore `{}` will be initialized with config {}", dagActionStoreChangeMonitorClass, dagActionStoreChangeConfig);
6166

6267
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
6368

64-
return new DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig, numThreads, dagManagementStateStore,
65-
this.dagManagement, this.dagActionReminderScheduler, dagProcEngineMetrics);
69+
return (DagActionChangeMonitor) GobblinConstructorUtils.invokeLongestConstructor(dagActionStoreChangeMonitorClass,
70+
dagActionStoreChangeConfig, numThreads, dagManagementStateStore, this.dagManagement,
71+
this.dagActionReminderScheduler, dagProcEngineMetrics);
6672
}
6773

6874
@Override
69-
public DagManagementDagActionStoreChangeMonitor get() {
70-
return createDagActionStoreMonitor();
75+
public DagActionChangeMonitor get() {
76+
try {
77+
return createDagActionStoreMonitor();
78+
} catch (ReflectiveOperationException e) {
79+
throw new RuntimeException("Failed to initialize DagActionStoreChangeMonitor due to ", e);
80+
}
7181
}
7282
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.service.monitoring;
19+
20+
import com.google.common.util.concurrent.Service;
21+
22+
23+
/**
24+
* A marker interface for job status monitors to generalize initialization in {@link org.apache.gobblin.service.modules.core.GobblinServiceManager#jobStatusMonitor}
25+
*/
26+
public interface JobStatusMonitor extends Service {
27+
}

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
* a {@link FileContextBasedFsStateStore}.
9090
*/
9191
@Slf4j
92-
public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], byte[]> {
92+
public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], byte[]> implements JobStatusMonitor {
9393
public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
9494
//We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
9595
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* A factory implementation that returns a {@link KafkaJobStatusMonitor} instance.
4444
*/
4545
@Slf4j
46-
public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMonitor> {
46+
public class KafkaJobStatusMonitorFactory implements Provider<JobStatusMonitor> {
4747
private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
4848
private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
4949

@@ -65,7 +65,7 @@ public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler jobIssue
6565
this.errorClassifier = errorClassifier;
6666
}
6767

68-
private KafkaJobStatusMonitor createJobStatusMonitor()
68+
private JobStatusMonitor createJobStatusMonitor()
6969
throws ReflectiveOperationException {
7070
Config jobStatusConfig = config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
7171

@@ -94,18 +94,19 @@ private KafkaJobStatusMonitor createJobStatusMonitor()
9494
GaaSJobObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
9595
GaaSJobObservabilityEventProducer observabilityEventProducer = (GaaSJobObservabilityEventProducer) GobblinConstructorUtils.invokeLongestConstructor(
9696
observabilityEventProducerClassName, ConfigUtils.configToState(config), this.issueRepository, this.instrumentationEnabled);
97+
log.info("JobStatusMonitor class `{}` will be initialized with config {}", jobStatusMonitorClass, jobStatusConfig);
9798

98-
return (KafkaJobStatusMonitor) GobblinConstructorUtils
99+
return (JobStatusMonitor) GobblinConstructorUtils
99100
.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
100101
dagManagementStateStore, errorClassifier);
101102
}
102103

103104
@Override
104-
public KafkaJobStatusMonitor get() {
105+
public JobStatusMonitor get() {
105106
try {
106107
return createJobStatusMonitor();
107108
} catch (ReflectiveOperationException e) {
108-
throw new RuntimeException(e);
109+
throw new RuntimeException("Failed to initialize JobStatusMonitor due to ", e);
109110
}
110111
}
111112
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.service.monitoring;
19+
20+
import com.google.common.util.concurrent.Service;
21+
22+
23+
/**
24+
* A marker interface for spec change monitors to generalize initialization in {@link org.apache.gobblin.service.modules.core.GobblinServiceManager#specStoreChangeMonitor}
25+
*/
26+
public interface SpecChangeMonitor extends Service {
27+
28+
/**
29+
* Set the monitor to active state where it can start processing events.
30+
* Should be called from {@link org.apache.gobblin.service.modules.core.GobblinServiceManager} after the service is started.
31+
*/
32+
void setActive();
33+
}

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* a connector between the API and execution layers of GaaS.
5252
*/
5353
@Slf4j
54-
public class SpecStoreChangeMonitor extends HighLevelConsumer<String, GenericStoreChangeEvent> {
54+
public class SpecStoreChangeMonitor extends HighLevelConsumer<String, GenericStoreChangeEvent> implements SpecChangeMonitor {
5555
public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX = "specStoreChangeMonitor";
5656

5757
// Metrics
@@ -108,6 +108,7 @@ protected void startUp() {}
108108
This method should be called once by the {@link GobblinServiceManager} only after the Scheduler is active to ensure
109109
calls to onAddSpec don't fail specCompilation.
110110
*/
111+
@Override
111112
public synchronized void setActive() {
112113
if (this.isActive) {
113114
return;

0 commit comments

Comments
 (0)