Skip to content

Commit 912ba24

Browse files
committed
Finish remove implementation and listeners.
1 parent 2809916 commit 912ba24

File tree

5 files changed

+386
-84
lines changed

5 files changed

+386
-84
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.entities;
7+
8+
import java.util.Collection;
9+
import java.util.Collections;
10+
import java.util.List;
11+
import java.util.concurrent.Callable;
12+
import java.util.concurrent.ExecutionException;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Future;
15+
import java.util.concurrent.FutureTask;
16+
import java.util.concurrent.RejectedExecutionException;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.stream.Collectors;
19+
20+
/**
21+
* An executor service that runs all jobs immediately on the current thread.
22+
*
23+
* <p>We use this so SDK users can determine how to isolate {@link EntityListener}s with the default
24+
* being no isolation of events.
25+
*/
26+
final class CurrentThreadExecutorService implements ExecutorService {
27+
private volatile boolean shutdown = false;
28+
29+
@Override
30+
public void execute(Runnable command) {
31+
if (shutdown) {
32+
throw new RejectedExecutionException("ExecutorService is shut down");
33+
}
34+
command.run();
35+
}
36+
37+
@Override
38+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
39+
if (shutdown) {
40+
throw new RejectedExecutionException("ExecutorService is shut down");
41+
}
42+
// Execute all tasks synchronously and collect their Futures
43+
return tasks.stream().map(task -> submit(task)).collect(Collectors.toList());
44+
}
45+
46+
@Override
47+
public <T> List<Future<T>> invokeAll(
48+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
49+
return invokeAll(tasks);
50+
}
51+
52+
@Override
53+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
54+
throws InterruptedException, ExecutionException {
55+
if (shutdown) {
56+
throw new RejectedExecutionException("ExecutorService is shut down");
57+
}
58+
// Execute all tasks synchronously and return first success.
59+
for (Callable<T> task : tasks) {
60+
try {
61+
// We wrap the task in a `submit` call to get ExecutionExceptions.
62+
return submit(task).get();
63+
} catch (ExecutionException e) {
64+
// Ignore this error, and try the next one.
65+
}
66+
}
67+
throw new ExecutionException("No tasks completed successfully", null);
68+
}
69+
70+
@Override
71+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
72+
throws InterruptedException, ExecutionException {
73+
if (shutdown) {
74+
throw new RejectedExecutionException("ExecutorService is shut down");
75+
}
76+
return invokeAny(tasks);
77+
}
78+
79+
@Override
80+
public <T> Future<T> submit(Callable<T> task) {
81+
if (shutdown) {
82+
throw new RejectedExecutionException("ExecutorService is shut down");
83+
}
84+
FutureTask<T> future = new FutureTask<>(task);
85+
// Run in this thread.
86+
future.run();
87+
return future;
88+
}
89+
90+
@Override
91+
public Future<?> submit(Runnable task) {
92+
if (shutdown) {
93+
throw new RejectedExecutionException("ExecutorService is shut down");
94+
}
95+
FutureTask<?> future = new FutureTask<>(task, null);
96+
// Run in this thread.
97+
future.run();
98+
return future;
99+
}
100+
101+
@Override
102+
public <T> Future<T> submit(Runnable task, T result) {
103+
if (shutdown) {
104+
throw new RejectedExecutionException("ExecutorService is shut down");
105+
}
106+
FutureTask<T> future = new FutureTask<>(task, result);
107+
// Run in this thread.
108+
future.run();
109+
return future;
110+
}
111+
112+
@Override
113+
public boolean isShutdown() {
114+
return shutdown;
115+
}
116+
117+
@Override
118+
public boolean isTerminated() {
119+
return shutdown;
120+
}
121+
122+
@Override
123+
public void shutdown() {
124+
shutdown = true;
125+
}
126+
127+
@Override
128+
public List<Runnable> shutdownNow() {
129+
shutdown = true;
130+
return Collections.emptyList();
131+
}
132+
133+
@Override
134+
public boolean awaitTermination(long timeout, TimeUnit unit) {
135+
return isTerminated();
136+
}
137+
}

sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/entities/SdkEntityProvider.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@
77

88
import io.opentelemetry.api.incubator.entities.EntityBuilder;
99
import io.opentelemetry.api.incubator.entities.EntityProvider;
10+
import io.opentelemetry.sdk.common.CompletableResultCode;
1011
import io.opentelemetry.sdk.resources.Resource;
12+
import java.util.concurrent.TimeUnit;
1113

1214
/** The SDK implementation of {@link EntityProvider}. */
1315
public final class SdkEntityProvider implements EntityProvider {
14-
private final SdkResourceState state = new SdkResourceState();
16+
// TODO - Give control over listener execution model.
17+
// For now, just run everything on the same thread as the entity-attach call.
18+
private final SdkResourceSharedState state =
19+
new SdkResourceSharedState(new CurrentThreadExecutorService());
1520

1621
/**
1722
* Obtains the current {@link Resource} for the SDK.
@@ -22,6 +27,11 @@ public Resource getResource() {
2227
return state.getResource();
2328
}
2429

30+
/**
31+
* Creates a builder for SdkEntityProvider.
32+
*
33+
* @return The new builder.
34+
*/
2535
public static SdkEntityProviderBuilder builder() {
2636
return new SdkEntityProviderBuilder();
2737
}
@@ -33,12 +43,27 @@ public String toString() {
3343

3444
@Override
3545
public boolean removeEntity(String entityType) {
36-
// TODO Auto-generated method stub
37-
throw new UnsupportedOperationException("Unimplemented method 'removeEntity'");
46+
return state.removeEntity(entityType);
3847
}
3948

4049
@Override
4150
public EntityBuilder attachOrUpdateEntity(String entityType) {
42-
return new SdkEntityBuilder(entityType, state::attachEntityOnEmit);
51+
return new SdkEntityBuilder(entityType, state::addOrUpdateEntity);
52+
}
53+
54+
public void onChange(EntityListener listener) {
55+
state.addListener(listener);
56+
}
57+
58+
/**
59+
* Shutdown the provider. The resulting {@link CompletableResultCode} completes when all complete.
60+
*/
61+
public CompletableResultCode shutdown() {
62+
return state.shutdown();
63+
}
64+
65+
/** Close the provider. Calls {@link #shutdown()} and blocks waiting for it to complete. */
66+
public void close() {
67+
shutdown().join(10, TimeUnit.SECONDS);
4368
}
4469
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.entities;
7+
8+
import io.opentelemetry.api.internal.GuardedBy;
9+
import io.opentelemetry.sdk.common.CompletableResultCode;
10+
import io.opentelemetry.sdk.internal.ThrottlingLogger;
11+
import io.opentelemetry.sdk.resources.Resource;
12+
import io.opentelemetry.sdk.resources.internal.Entity;
13+
import io.opentelemetry.sdk.resources.internal.EntityUtil;
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import java.util.concurrent.CopyOnWriteArrayList;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.logging.Level;
20+
import java.util.logging.Logger;
21+
import javax.annotation.Nullable;
22+
23+
/**
24+
* This class does all state and listener management for a {@link Resource} constructed of {@link
25+
* Entity}s.
26+
*/
27+
final class SdkResourceSharedState {
28+
29+
// The currently advertised Resource to other SDK providers.
30+
private final AtomicReference<Resource> resource = new AtomicReference<>(Resource.empty());
31+
private final Object writeLock = new Object();
32+
private final List<EntityListener> listeners = new CopyOnWriteArrayList<>();
33+
private final ExecutorService listenerExecutor;
34+
35+
// Our internal storage of registered entities.
36+
@GuardedBy("writeLock")
37+
private final ArrayList<Entity> entities = new ArrayList<>();
38+
39+
private static final ThrottlingLogger logger =
40+
new ThrottlingLogger(Logger.getLogger(SdkResourceSharedState.class.getName()));
41+
42+
SdkResourceSharedState(ExecutorService listenerExecutor) {
43+
this.listenerExecutor = listenerExecutor;
44+
}
45+
46+
/**
47+
* Shutdown the provider. The resulting {@link CompletableResultCode} completes when all complete.
48+
*/
49+
CompletableResultCode shutdown() {
50+
// TODO - Actually figure out how to wait for shutdown and deal with pending tasks.
51+
listenerExecutor.shutdown();
52+
return CompletableResultCode.ofSuccess();
53+
}
54+
55+
/** Returns the currently active resource. */
56+
public Resource getResource() {
57+
Resource result = resource.get();
58+
// We do this to make NullAway happy.
59+
if (result == null) {
60+
throw new IllegalStateException("SdkResource should never have null resource");
61+
}
62+
return result;
63+
}
64+
65+
private static boolean hasSameSchemaUrl(Entity lhs, Entity rhs) {
66+
if (lhs.getSchemaUrl() != null) {
67+
return lhs.getSchemaUrl().equals(rhs.getSchemaUrl());
68+
}
69+
return rhs.getSchemaUrl() == null;
70+
}
71+
72+
/**
73+
* Removes an entity by type and notifies listeners.
74+
*
75+
* @param entityType The entity type to remove.
76+
*/
77+
boolean removeEntity(String entityType) {
78+
synchronized (writeLock) {
79+
@Nullable Entity removed = null;
80+
for (Entity existing : entities) {
81+
if (existing.getType().equals(entityType)) {
82+
removed = existing;
83+
}
84+
}
85+
if (removed == null) {
86+
return false;
87+
}
88+
entities.remove(removed);
89+
Resource result = EntityUtil.createResource(entities);
90+
resource.lazySet(result);
91+
publishEntityDelete(new SdkEntityState(removed), result);
92+
return true;
93+
}
94+
}
95+
96+
/**
97+
* Adds an entity and notifies listeners.
98+
*
99+
* <p>Note: This will not add an entity on conflict. This will update the description if the
100+
* entity already exists.
101+
*
102+
* @param e The entity type to add.
103+
*/
104+
void addOrUpdateEntity(Entity e) {
105+
synchronized (writeLock) {
106+
@Nullable Entity conflict = null;
107+
for (Entity existing : entities) {
108+
if (existing.getType().equals(e.getType())) {
109+
conflict = existing;
110+
}
111+
}
112+
Entity newState = e;
113+
if (conflict != null) {
114+
if (hasSameSchemaUrl(conflict, e) && conflict.getId().equals(e.getId())) {
115+
// We can merge descriptive attributes.
116+
entities.remove(conflict);
117+
io.opentelemetry.sdk.resources.internal.EntityBuilder newEntity =
118+
Entity.builder(conflict.getType())
119+
.withId(conflict.getId())
120+
.withDescription(
121+
conflict.getDescription().toBuilder().putAll(e.getDescription()).build());
122+
if (conflict.getSchemaUrl() != null) {
123+
newEntity.setSchemaUrl(conflict.getSchemaUrl());
124+
}
125+
newState = newEntity.build();
126+
entities.add(newState);
127+
} else {
128+
logger.log(Level.INFO, "Ignoring new entity, conflicts with existing: " + e);
129+
return;
130+
}
131+
} else {
132+
entities.add(e);
133+
}
134+
Resource result = EntityUtil.createResource(entities);
135+
resource.lazySet(result);
136+
publishEntityStateChange(new SdkEntityState(newState), result);
137+
}
138+
}
139+
140+
@SuppressWarnings("FutureReturnValueIgnored")
141+
private void publishEntityStateChange(EntityState state, Resource resource) {
142+
for (EntityListener listener : listeners) {
143+
// We isolate listener execution via executor, if configured.
144+
// We ignore failures on futures to avoid having one listener block others.
145+
listenerExecutor.submit(() -> listener.onEntityState(state, resource));
146+
}
147+
}
148+
149+
@SuppressWarnings("FutureReturnValueIgnored")
150+
private void publishEntityDelete(EntityState deleted, Resource resource) {
151+
for (EntityListener listener : listeners) {
152+
// We isolate listener execution via executor, if configured.
153+
// We ignore failures on futures to avoid having one listener block others.
154+
listenerExecutor.submit(() -> listener.onEntityDelete(deleted, resource));
155+
}
156+
}
157+
158+
public void addListener(EntityListener listener) {
159+
listeners.add(listener);
160+
}
161+
}

0 commit comments

Comments
 (0)