Skip to content

Commit 22939e2

Browse files
committed
Optimizing core classes (Stack and Continuation) -- remove insane logging overhead in critical operations; split Continuation to Restartable (previous behavior) and Optimized (no stack copy on resume but only single resume is possible -- 99.99999% of cases are single resume)
1 parent 0f497ab commit 22939e2

File tree

2 files changed

+254
-75
lines changed

2 files changed

+254
-75
lines changed

net.tascalate.javaflow.api/src/main/java/org/apache/commons/javaflow/api/Continuation.java

Lines changed: 199 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,21 @@
4747
* later.
4848
*
4949
*/
50-
public final class Continuation implements Serializable {
50+
abstract public class Continuation implements Serializable {
5151

5252
private static final Log log = LogFactory.getLog(Continuation.class);
53-
private static final long serialVersionUID = 2L;
53+
private static final long serialVersionUID = 3L;
5454

55-
private final StackRecorder stackRecorder;
5655
private final Object value;
5756

57+
final StackRecorder stackRecorder;
58+
5859
/**
5960
* Create a new continuation, which continue a previous continuation.
6061
*/
61-
private Continuation(StackRecorder stackRecorder, Object value ) {
62+
Continuation(StackRecorder stackRecorder, Object value) {
6263
this.stackRecorder = stackRecorder;
6364
this.value = value;
64-
6565
}
6666

6767

@@ -102,11 +102,34 @@ public static Object getContext() {
102102
* always return a non-null valid object.
103103
*/
104104
public static Continuation startSuspendedWith(Runnable target) {
105+
return startSuspendedWith(target, false);
106+
}
107+
108+
/**
109+
* Creates a new {@link Continuation} object from the specified {@link Runnable}
110+
* object.
111+
*
112+
* <p>
113+
* Unlike the {@link #startWith(Runnable)} method, this method doesn't actually
114+
* execute the <tt>Runnable</tt> object. It will be executed when
115+
* it's {@link #resume() resumed}.</p>
116+
*
117+
* @param target
118+
* The object whose <tt>run</tt> method will be executed.
119+
* @param optimized
120+
* If true then continuation constructed is performance-optimized but
121+
* may be resumed only once. Otherwise "restartable" continuation is created that may
122+
* be resumed multiple times.
123+
* @return
124+
* always return a non-null valid object.
125+
*/
126+
public static Continuation startSuspendedWith(Runnable target, boolean optimized) {
105127
if(target == null) {
106128
throw new IllegalArgumentException("target is null");
107129
}
108-
return new Continuation( new StackRecorder(target), null );
109-
}
130+
StackRecorder stackRecorder = new StackRecorder(target);
131+
return optimized ? new OptimizedContinuation(stackRecorder, null) : new RestartableContinuation(stackRecorder, null);
132+
}
110133

111134
/**
112135
* Starts executing the specified {@link Runnable} object in an environment
@@ -122,7 +145,28 @@ public static Continuation startSuspendedWith(Runnable target) {
122145
* @see #startWith(Runnable, Object)
123146
*/
124147
public static Continuation startWith(Runnable target) {
125-
return startWith(target, null);
148+
return startWith(target, false);
149+
}
150+
151+
/**
152+
* Starts executing the specified {@link Runnable} object in an environment
153+
* that allows {@link Continuation#suspend()}.
154+
*
155+
* <p>
156+
* This is a short hand for <tt>startWith(target,null)</tt>.
157+
*
158+
* @param target
159+
* The object whose <tt>run</tt> method will be executed.
160+
* @param optimized
161+
* If true then continuation constructed is performance-optimized but
162+
* may be resumed only once. Otherwise "restartable" continuation is created that may
163+
* be resumed multiple times.
164+
* @return
165+
* Continuation object if runnable supplied is supended, otherwise <code>null</code>
166+
* @see #startWith(Runnable, Object)
167+
*/
168+
public static Continuation startWith(Runnable target, boolean optimized) {
169+
return startWith(target, null, optimized);
126170
}
127171

128172
/**
@@ -133,7 +177,7 @@ public static Continuation startWith(Runnable target) {
133177
*
134178
* @param target
135179
* The object whose <tt>run</tt> method will be executed.
136-
* @param pContext
180+
* @param context
137181
* This value can be obtained from {@link #getContext()} until this method returns.
138182
* Can be null.
139183
* @return
@@ -142,13 +186,38 @@ public static Continuation startWith(Runnable target) {
142186
* a new non-null continuation is returned.
143187
* @see #getContext()
144188
*/
145-
public static Continuation startWith(Runnable target, Object pContext) {
189+
public static Continuation startWith(Runnable target, Object context) {
190+
return startWith(target, context, false);
191+
}
192+
193+
/**
194+
* Starts executing the specified {@link Runnable} object in an environment
195+
* that allows {@link Continuation#suspend()}.
196+
*
197+
* This method blocks until the continuation suspends or completes.
198+
*
199+
* @param target
200+
* The object whose <tt>run</tt> method will be executed.
201+
* @param context
202+
* This value can be obtained from {@link #getContext()} until this method returns.
203+
* Can be null.
204+
* @param optimized
205+
* If true then continuation constructed is performance-optimized but
206+
* may be resumed only once. Otherwise "restartable" continuation is created that may
207+
* be resumed multiple times.
208+
* @return
209+
* If the execution completes and there's nothing more to continue, return null.
210+
* Otherwise, the execution has been {@link #suspend() suspended}, in which case
211+
* a new non-null continuation is returned.
212+
* @see #getContext()
213+
*/
214+
public static Continuation startWith(Runnable target, Object context, boolean optimized) {
146215
if (log.isDebugEnabled()) {
147216
log.debug("starting new flow from " + ReflectionUtils.descriptionOfObject(target));
148217
}
149218

150-
return startSuspendedWith(target).resume(pContext);
151-
}
219+
return startSuspendedWith(target, optimized).resume(context);
220+
}
152221

153222
/**
154223
* This method is deprecated, please use {@link #resume()} instead
@@ -253,29 +322,6 @@ public Continuation resume(Object value) {
253322
public void terminate() {
254323
resumeWith(ResumeParameter.exit());
255324
}
256-
257-
protected Continuation resumeWith(ResumeParameter param) {
258-
if (log.isDebugEnabled()) {
259-
log.debug("continueing with continuation " + ReflectionUtils.descriptionOfObject(this));
260-
}
261-
262-
while(true) {
263-
StackRecorder nextStackRecorder = new StackRecorder(stackRecorder);
264-
SuspendResult result = nextStackRecorder.execute(param);
265-
if (SuspendResult.EXIT == result) {
266-
// no more thing to continue
267-
return null;
268-
} else if (SuspendResult.CANCEL == result) {
269-
// return immediately with itself
270-
return this;
271-
} else if (SuspendResult.AGAIN == result) {
272-
// re-execute immediately
273-
continue;
274-
}
275-
276-
return new Continuation(nextStackRecorder, result.value());
277-
}
278-
}
279325

280326
/**
281327
* Check if captured continuation is serializable
@@ -297,6 +343,24 @@ public Object value() {
297343
return value;
298344
}
299345

346+
/**
347+
* <p>View this continuation as a "restartable" continuation that may be resumed multiple times.
348+
* <p>Conversion to the restartable continuation is not always possible, i.e. already resumed
349+
* optimized continuation may not be converted to the restartable variant.
350+
*
351+
* @return self if this is already a restartable continuation or a newly constructed
352+
* restartable continuation
353+
*/
354+
abstract public Continuation restartable();
355+
356+
/**
357+
* <p>View this continuation as a performance-optimized continuation that may be resumed only once.
358+
* <p>Conversion to the optimized continuation is always possible
359+
*
360+
* @return self if this is already an optimized continuation or a newly constructed optimized continuation
361+
*/
362+
abstract public Continuation optimized();
363+
300364
/**
301365
* Stops the running continuation.
302366
*
@@ -423,7 +487,107 @@ public static void cancel() {
423487
StackRecorder.suspend(SuspendResult.CANCEL);
424488
}
425489

490+
@Override
426491
public String toString() {
427492
return "Continuation@" + hashCode() + "/" + ReflectionUtils.getClassLoaderName(this);
428493
}
494+
495+
abstract protected Continuation resumeWith(ResumeParameter param);
496+
497+
static final class RestartableContinuation extends Continuation {
498+
private static final long serialVersionUID = 1L;
499+
500+
RestartableContinuation(StackRecorder stackRecorder, Object value) {
501+
super(stackRecorder, value);
502+
}
503+
504+
@Override
505+
public Continuation restartable() {
506+
return this;
507+
}
508+
509+
@Override
510+
public Continuation optimized() {
511+
return new OptimizedContinuation(new StackRecorder(stackRecorder), value());
512+
}
513+
514+
@Override
515+
protected Continuation resumeWith(ResumeParameter param) {
516+
if (log.isDebugEnabled()) {
517+
log.debug("continueing with continuation " + ReflectionUtils.descriptionOfObject(this));
518+
}
519+
while(true) {
520+
StackRecorder nextStackRecorder = new StackRecorder(stackRecorder);
521+
SuspendResult result = nextStackRecorder.execute(param);
522+
if (SuspendResult.EXIT == result) {
523+
// no more thing to continue
524+
return null;
525+
} else if (SuspendResult.CANCEL == result) {
526+
// return immediately with itself
527+
return this;
528+
} else if (SuspendResult.AGAIN == result) {
529+
// re-execute immediately
530+
continue;
531+
}
532+
533+
return new RestartableContinuation(nextStackRecorder, result.value());
534+
}
535+
}
536+
}
537+
538+
static final class OptimizedContinuation extends Continuation {
539+
private static final long serialVersionUID = 1L;
540+
private boolean isResumed = false;
541+
542+
OptimizedContinuation(StackRecorder stackRecorder, Object value) {
543+
super(stackRecorder, value);
544+
}
545+
546+
@Override
547+
public Continuation restartable() {
548+
synchronized (this) {
549+
if (isResumed) {
550+
throw new IllegalStateException("Exclusive continuation may not be converted to restartable after resume");
551+
}
552+
}
553+
return new RestartableContinuation(new StackRecorder(stackRecorder), value());
554+
}
555+
556+
@Override
557+
public Continuation optimized() {
558+
return this;
559+
}
560+
561+
@Override
562+
protected Continuation resumeWith(ResumeParameter param) {
563+
if (log.isDebugEnabled()) {
564+
log.debug("continueing with continuation " + ReflectionUtils.descriptionOfObject(this));
565+
}
566+
synchronized (this) {
567+
if (isResumed) {
568+
if (param == ResumeParameter.exit()) {
569+
return null;
570+
} else {
571+
throw new IllegalStateException("Exclusive continuation may be resumed only once");
572+
}
573+
}
574+
isResumed = true;
575+
}
576+
StackRecorder nextStackRecorder = stackRecorder; // Use existing one, don't copy
577+
SuspendResult result = nextStackRecorder.execute(param);
578+
if (SuspendResult.EXIT == result) {
579+
// no more thing to continue
580+
return null;
581+
} else if (SuspendResult.CANCEL == result) {
582+
// return immediately with null -- stack is no longer valid
583+
return null;
584+
} else if (SuspendResult.AGAIN == result) {
585+
// invalid operation for exclusive continuation
586+
throw new IllegalStateException("Exclusive continuation may not be re-tried");
587+
}
588+
589+
return new OptimizedContinuation(nextStackRecorder, result.value());
590+
}
591+
}
592+
429593
}

0 commit comments

Comments
 (0)