Throwables
Things don't always go as planned and neither do your ActivityTask
. Through the help of the DataConverter
, Throwables
are marshalled from the Activity to the decider. That means you can write decider logic that approriately accounts for exceptions that may occur during the execution of an ActivityTask.
A common scenario may be that you would want to run some cleanup actions if an exception is thrown.
@Override
public CompletionStage<String> decide(DecisionContext decisionContext) {
CompletionStage<String> input = workflowInput(decisionContext.events());
return input.thenCompose(i -> step1.withInput(i).decide(decisionContext)).handle((r, t) -> {
if (t != null) {
CompletionStage<String> resultDefault = step1.withActionId(ActionId.of("step1-default"))
.withInput("Default Name").decide(decisionContext);
return step2.withInput((Object) new String[]{"I am missing name."}).decide(decisionContext)
.thenCompose(v -> resultDefault);
}
return CompletableFuture.completedFuture(r);
}).thenCompose(x -> x);
}
In the above example, perhaps, the first invocation of step1
throws an exception. We might now want to rerun step1
(or any other ActivityTask) as a result. CompletionStage
offered by the JDK8 is not the most eloquent beat, and I am considering replacing it with a Future/Promise library that is more monadic such as from javaslang
In this particular case don't forget the
withActionId
since you need to change the name of the previous step.
Heartbeat
During Activity registration (occurs automatically at the start of the ActivityPollerPool
) a taskHeartbeatTimeout
is set. The taskHeartbeatTimeout
is defined as the longest time an AcitivityTask
may perform work before either responding with the result or heartbeat. For activities that run extra long (since a Workflow is allowed to run for up to 1 year!) you'll need to periodically send heartbeats during the Acitvity.
All ActivityMethods must be present in a class that subclasses from com.github.fzakaria.waterflow.Activities
which includes the protected method
protected void recordHeartbeat(String details)
@ActivityMethod(name = "Heartbeat Example", version = "1.0", heartbeatTimeout = "5")
public Void heartbeatExample() throws InterruptedException{
final LongAdder adder = new LongAdder();
final ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.scheduleAtFixedRate(() -> {
adder.increment();
recordHeartbeat(format("This is the %s heartbeat", adder.intValue()));
}, 0, 1, TimeUnit.SECONDS);
Thread.sleep(Duration.ofSeconds(10).toMillis());
service.shutdownNow();
return null;
}
Higher level abstractions may find it necessary to wrap the heartbeat functionality in a Watch Dog Thread.
Retries
It would be a PITA if you had to correctly handdle transient exceptions thrown in your ActivityTask - such as service intermittent outage or consistency check or even IOExceptions during HTTP requests.
Since the concept of retrying intermittent failures is so common, WaterFlow
provides a simple way to register a RetryStrategy
to the ActivityAction
.
If any Throwable
escapes during the invocation of an ActivityTask
, the RetryStrategy
is used to determine how long a Timer
should be created to delay until the next attempt.
Several strategies are included in the framework:
- TimeLimitRetryStrategy
- MaxAttemptsRetryStrategy
- ExponentialBackoffStrategy
- NoRetryStrategy
- MaxLimitRetryStrategy
- FixedDelayRetryStrategy
final IntegerActivityAction step1 = IntegerActivityAction.builder()
.actionId(ActionId.of("step1")).retryStrategy(new FixedDelayRetryStrategy(Duration.ofSeconds(3)))
.name(Name.of("Hello World")).version(Version.of("1.0")).workflow(this).build();
DataConverters
The WaterFlow
framework marshals data across activities & deciders by using a DataConverter
. By default, the framework offers a data converter that is based on the Jackson JSON processor - similar to the AWS Flow Framework.
If the default converter isn't sufficient for your application, you can implement a custom data converter. A good example is offering a DataConverter
that might encrypt/decrypt data sent over the wire or marshalling data through S3
to overcome the size limitation of the fields put in palce by SWF