Skip to content

Commit f321248

Browse files
yaauieandsel
andauthored
PQ settings refactor: propagate builder upward (#18180)
* noop: refactor pq settings to make future changes simpler The `ackedqueue.SettingsImpl` uses an _immutable_ builder, which makes adding options cumbersome; each additional property added needs to modify code from all existing options. By introducing an api-internal temporary mutable builder, we can simplify the process of creating an immutable copy that has a single component modified. * pq settings: validate while building * noop: refacor pq-related constructors to take ackedqueue.Settings * pq init refactor: assertion style Co-authored-by: Andrea Selva <[email protected]> * pq settings: builder need not be immutable, but settings should be --------- Co-authored-by: Andrea Selva <[email protected]>
1 parent 8228fe5 commit f321248

File tree

8 files changed

+186
-129
lines changed

8 files changed

+186
-129
lines changed

logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,17 @@
2828
let(:output_strings) { [] }
2929
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }
3030

31+
let(:queue_settings) do
32+
LogStash::AckedQueue.file_settings_builder(path)
33+
.capacity(page_capacity)
34+
.checkpointMaxAcks(queue_checkpoint_acks)
35+
.checkpointMaxWrites(queue_checkpoint_writes)
36+
.queueMaxBytes(queue_capacity)
37+
.build
38+
end
39+
3140
let(:queue) do
32-
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity)
41+
described_class.new(queue_settings)
3342
end
3443

3544
let(:writer_threads) do

logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,18 @@ def threaded_read_client
125125

126126
context "WrappedAckedQueue" do
127127
let(:path) { Stud::Temporary.directory }
128-
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) }
128+
129+
let(:queue_settings) do
130+
LogStash::AckedQueue.file_settings_builder(path)
131+
.capacity(1024)
132+
.maxUnread(10)
133+
.checkpointMaxAcks(1024)
134+
.checkpointMaxWrites(1024)
135+
.queueMaxBytes(4096)
136+
.build
137+
end
138+
139+
let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
129140

130141
before do
131142
read_client.set_events_metric(metric.namespace([:stats, :events]))

logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,18 @@
5353
let(:checkpoint_acks) { 1024 }
5454
let(:checkpoint_writes) { 1024 }
5555
let(:path) { Stud::Temporary.directory }
56-
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) }
56+
57+
let(:queue_settings) do
58+
LogStash::AckedQueue.file_settings_builder(path)
59+
.capacity(page_capacity)
60+
.maxUnread(max_events)
61+
.checkpointMaxAcks(checkpoint_acks)
62+
.checkpointMaxWrites(checkpoint_writes)
63+
.queueMaxBytes(max_bytes)
64+
.build
65+
end
66+
67+
let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
5768

5869
after do
5970
queue.close

logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,15 @@
3131
import org.jruby.anno.JRubyMethod;
3232
import org.jruby.runtime.ThreadContext;
3333
import org.jruby.runtime.builtin.IRubyObject;
34+
import org.logstash.Event;
3435
import org.logstash.RubyUtil;
3536
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
3637
import org.logstash.common.SettingKeyDefinitions;
3738
import org.logstash.execution.AbstractWrappedQueueExt;
3839
import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
3940

41+
import static org.logstash.common.SettingKeyDefinitions.*;
42+
4043
/**
4144
* Persistent queue factory JRuby extension.
4245
* */
@@ -69,29 +72,16 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
6972
final IRubyObject settings) throws IOException {
7073
final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString();
7174
if (PERSISTED_TYPE.equals(type)) {
72-
final Path queuePath = Paths.get(
73-
getSetting(context, settings, SettingKeyDefinitions.PATH_QUEUE).asJavaString(),
74-
getSetting(context, settings, SettingKeyDefinitions.PIPELINE_ID).asJavaString()
75-
);
75+
final Settings queueSettings = extractQueueSettings(settings);
76+
final Path queuePath = Paths.get(queueSettings.getDirPath());
7677

7778
// Files.createDirectories raises a FileAlreadyExistsException
7879
// if pipeline queue path is a symlink, so worth checking against Files.exists
7980
if (Files.exists(queuePath) == false) {
8081
Files.createDirectories(queuePath);
8182
}
8283

83-
return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS)
84-
.initialize(
85-
context, new IRubyObject[]{
86-
context.runtime.newString(queuePath.toString()),
87-
getSetting(context, settings, SettingKeyDefinitions.QUEUE_PAGE_CAPACITY),
88-
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS),
89-
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES),
90-
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS),
91-
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY),
92-
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES)
93-
}
94-
);
84+
return JRubyWrappedAckedQueueExt.create(context, queueSettings);
9585
} else if (MEMORY_TYPE.equals(type)) {
9686
return new JrubyWrappedSynchronousQueueExt(
9787
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
@@ -118,4 +108,21 @@ private static IRubyObject getSetting(final ThreadContext context, final IRubyOb
118108
final String name) {
119109
return settings.callMethod(context, "get_value", context.runtime.newString(name));
120110
}
111+
112+
private static Settings extractQueueSettings(final IRubyObject settings) {
113+
final ThreadContext context = settings.getRuntime().getCurrentContext();
114+
final Path queuePath = Paths.get(
115+
getSetting(context, settings, PATH_QUEUE).asJavaString(),
116+
getSetting(context, settings, PIPELINE_ID).asJavaString()
117+
);
118+
return SettingsImpl.fileSettingsBuilder(queuePath.toString())
119+
.elementClass(Event.class)
120+
.capacity(getSetting(context, settings, QUEUE_PAGE_CAPACITY).toJava(Integer.class))
121+
.maxUnread(getSetting(context, settings, QUEUE_MAX_EVENTS).toJava(Integer.class))
122+
.checkpointMaxWrites(getSetting(context, settings, QUEUE_CHECKPOINT_WRITES).toJava(Integer.class))
123+
.checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
124+
.checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
125+
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
126+
.build();
127+
}
121128
}

logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
package org.logstash.ackedqueue;
2222

23+
import java.util.ArrayList;
24+
import java.util.List;
25+
2326
/**
2427
* Persistent queue settings definition.
2528
* */
@@ -41,6 +44,32 @@ public interface Settings {
4144

4245
boolean getCheckpointRetry();
4346

47+
/**
48+
* Validate and return the settings, or throw descriptive {@link QueueRuntimeException}
49+
* @param settings the settings to validate
50+
* @return the settings that were provided
51+
*/
52+
static Settings ensureValid(final Settings settings) {
53+
final List<String> errors = new ArrayList<>();
54+
55+
if (settings == null) {
56+
errors.add("settings cannot be null");
57+
} else {
58+
if (settings.getDirPath() == null) {
59+
errors.add("dirPath cannot be null");
60+
}
61+
if (settings.getElementClass() == null) {
62+
errors.add("elementClass cannot be null");
63+
}
64+
}
65+
66+
if (!errors.isEmpty()) {
67+
throw new QueueRuntimeException(String.format("Invalid Queue Settings: %s", errors));
68+
}
69+
70+
return settings;
71+
}
72+
4473
/**
4574
* Persistent queue Setting's fluent builder definition
4675
* */

logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java

Lines changed: 57 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,38 @@
1717
* under the License.
1818
*/
1919

20-
2120
package org.logstash.ackedqueue;
2221

2322
/**
2423
* Persistent queue settings implementation.
2524
* */
2625
public class SettingsImpl implements Settings {
27-
private String dirForFiles;
28-
private Class<? extends Queueable> elementClass;
29-
private int capacity;
30-
private long queueMaxBytes;
31-
private int maxUnread;
32-
private int checkpointMaxAcks;
33-
private int checkpointMaxWrites;
34-
private boolean checkpointRetry;
26+
private final String dirForFiles;
27+
private final Class<? extends Queueable> elementClass;
28+
private final int capacity;
29+
private final long queueMaxBytes;
30+
private final int maxUnread;
31+
private final int checkpointMaxAcks;
32+
private final int checkpointMaxWrites;
33+
private final boolean checkpointRetry;
3534

3635
public static Builder builder(final Settings settings) {
37-
return new BuilderImpl(settings.getDirPath(), settings.getElementClass(), settings.getCapacity(),
38-
settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
39-
settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
40-
);
36+
return new BuilderImpl(settings);
4137
}
4238

4339
public static Builder fileSettingsBuilder(final String dirForFiles) {
4440
return new BuilderImpl(dirForFiles);
4541
}
4642

47-
private SettingsImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
48-
final int capacity, final long queueMaxBytes, final int maxUnread,
49-
final int checkpointMaxAcks, final int checkpointMaxWrites, boolean checkpointRetry) {
50-
this.dirForFiles = dirForFiles;
51-
this.elementClass = elementClass;
52-
this.capacity = capacity;
53-
this.queueMaxBytes = queueMaxBytes;
54-
this.maxUnread = maxUnread;
55-
this.checkpointMaxAcks = checkpointMaxAcks;
56-
this.checkpointMaxWrites = checkpointMaxWrites;
57-
this.checkpointRetry = checkpointRetry;
43+
private SettingsImpl(final BuilderImpl builder) {
44+
this.dirForFiles = builder.dirForFiles;
45+
this.elementClass = builder.elementClass;
46+
this.capacity = builder.capacity;
47+
this.queueMaxBytes = builder.queueMaxBytes;
48+
this.maxUnread = builder.maxUnread;
49+
this.checkpointMaxAcks = builder.checkpointMaxAcks;
50+
this.checkpointMaxWrites = builder.checkpointMaxWrites;
51+
this.checkpointRetry = builder.checkpointRetry;
5852
}
5953

6054
@Override
@@ -132,103 +126,87 @@ private static final class BuilderImpl implements Builder {
132126

133127
private final String dirForFiles;
134128

135-
private final Class<? extends Queueable> elementClass;
129+
private Class<? extends Queueable> elementClass;
136130

137-
private final int capacity;
131+
private int capacity;
138132

139-
private final long queueMaxBytes;
133+
private long queueMaxBytes;
140134

141-
private final int maxUnread;
135+
private int maxUnread;
142136

143-
private final int checkpointMaxAcks;
137+
private int checkpointMaxAcks;
144138

145-
private final int checkpointMaxWrites;
139+
private int checkpointMaxWrites;
146140

147-
private final boolean checkpointRetry;
141+
private boolean checkpointRetry;
148142

149143
private BuilderImpl(final String dirForFiles) {
150-
this(dirForFiles, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
151-
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES, false
152-
);
144+
this.dirForFiles = dirForFiles;
145+
this.elementClass = null;
146+
this.capacity = DEFAULT_CAPACITY;
147+
this.queueMaxBytes = DEFAULT_MAX_QUEUE_BYTES;
148+
this.maxUnread = DEFAULT_MAX_UNREAD;
149+
this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS;
150+
this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES;
151+
this.checkpointRetry = false;
153152
}
154153

155-
private BuilderImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
156-
final int capacity, final long queueMaxBytes, final int maxUnread,
157-
final int checkpointMaxAcks, final int checkpointMaxWrites, final boolean checkpointRetry) {
158-
this.dirForFiles = dirForFiles;
159-
this.elementClass = elementClass;
160-
this.capacity = capacity;
161-
this.queueMaxBytes = queueMaxBytes;
162-
this.maxUnread = maxUnread;
163-
this.checkpointMaxAcks = checkpointMaxAcks;
164-
this.checkpointMaxWrites = checkpointMaxWrites;
165-
this.checkpointRetry = checkpointRetry;
154+
private BuilderImpl(final Settings settings) {
155+
this.dirForFiles = settings.getDirPath();
156+
this.elementClass = settings.getElementClass();
157+
this.capacity = settings.getCapacity();
158+
this.queueMaxBytes = settings.getQueueMaxBytes();
159+
this.maxUnread = settings.getMaxUnread();
160+
this.checkpointMaxAcks = settings.getCheckpointMaxAcks();
161+
this.checkpointMaxWrites = settings.getCheckpointMaxWrites();
162+
this.checkpointRetry = settings.getCheckpointRetry();
166163
}
167164

168165
@Override
169166
public Builder elementClass(final Class<? extends Queueable> elementClass) {
170-
return new BuilderImpl(
171-
this.dirForFiles, elementClass, this.capacity, this.queueMaxBytes, this.maxUnread,
172-
this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
173-
);
167+
this.elementClass = elementClass;
168+
return this;
174169
}
175170

176171
@Override
177172
public Builder capacity(final int capacity) {
178-
return new BuilderImpl(
179-
this.dirForFiles, this.elementClass, capacity, this.queueMaxBytes, this.maxUnread,
180-
this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
181-
);
173+
this.capacity = capacity;
174+
return this;
182175
}
183176

184177
@Override
185178
public Builder queueMaxBytes(final long size) {
186-
return new BuilderImpl(
187-
this.dirForFiles, this.elementClass, this.capacity, size, this.maxUnread,
188-
this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
189-
);
179+
this.queueMaxBytes = size;
180+
return this;
190181
}
191182

192183
@Override
193184
public Builder maxUnread(final int maxUnread) {
194-
return new BuilderImpl(
195-
this.dirForFiles, this.elementClass,
196-
this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
197-
this.checkpointMaxWrites, this.checkpointRetry
198-
);
185+
this.maxUnread = maxUnread;
186+
return this;
199187
}
200188

201189
@Override
202190
public Builder checkpointMaxAcks(final int checkpointMaxAcks) {
203-
return new BuilderImpl(
204-
this.dirForFiles, this.elementClass,
205-
this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
206-
this.checkpointMaxWrites, this.checkpointRetry
207-
);
191+
this.checkpointMaxAcks = checkpointMaxAcks;
192+
return this;
208193
}
209194

210195
@Override
211196
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
212-
return new BuilderImpl(
213-
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
214-
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, this.checkpointRetry
215-
);
197+
this.checkpointMaxWrites = checkpointMaxWrites;
198+
return this;
216199
}
217200

218201
@Override
219202
public Builder checkpointRetry(final boolean checkpointRetry) {
220-
return new BuilderImpl(
221-
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
222-
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, checkpointRetry
223-
);
203+
this.checkpointRetry = checkpointRetry;
204+
return this;
224205
}
225206

226207
@Override
227208
public Settings build() {
228-
return new SettingsImpl(
229-
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
230-
this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
231-
);
209+
return Settings.ensureValid(new SettingsImpl(this));
232210
}
233211
}
234212
}

0 commit comments

Comments
 (0)