Getting Started
The following tutorial will walk you through a very brief introduction to installing WaterFlow
and writing your first
HelloWorldWorkflow
.
For more advanced workflows and additional information consult the user-guide.
Requirements
- Java 1.8+
- Active
Amazon Web Services
account- AWS credentials must have full access to the SWF service
- Understanding of Amazon Simple Workflow concepts and the Amazon AWS SDK for Java
Installation
<dependency>
<groupId>com.github.fzakaria</groupId>
<artifactId>WaterFlow</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
HelloWorldWorkFlow
The following is a short tutorial on building a Maven application using WaterFlow
. It is a very high level overview and additional information can be found in accompanying sections of the user-guide.
You can find the code listed below on GitHub
Setup
We will assume a maven setup, however the only difference is transitioning the pom.xml
to the build tool of your choice.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>FILL ME IN/groupId>
<artifactId>FILL ME IN/artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<immutables.version>2.1.11</immutables.version>
</properties>
<dependencies>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<version>${immutables.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.fzakaria</groupId>
<artifactId>WaterFlow</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.5</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
</project>
Activities
The bread and butter of actions in SWF are the ActivityTasks, which represent a unit of asyncrhonous work. Activities can exist in any class that subclasses Activities
and annotated with ActivityMethod
public class ExampleActivities extends Activities {
@ActivityMethod(name = "Hello World", version = "1.0")
public String helloWorld(String name) {
if (name == null) {
throw new IllegalArgumentException("You must provide a non null name!");
}
return String.format("Hello World %s", name);
}
}
In the example above, we've created a very simple ActivityTask
which given an input string, appends it to a Hello World message. The example above is working solely with input/output of String however you are not limited to that! We will see in the following documentation that thanks to the DataConverter
we can pass arbitrary complex types to/from our Activities.
Workflow
The workflow class in the framework represents the Decider
. Through the interface public CompletionStage decide(DecisionContext decisionContext)
the Workflow can orchestrate the future actions that need to occur for the workflow given the previous history.
The example below is a simple example of implementing the Workflow interface. Each workflow has an associated name and version which is important when decided which instance the decider needs to find. As you can see in the decide
method, orchestrating future tasks is as simple as using the JDK8 interface for CompletionStage
.
@Value.Immutable
public class HelloWorldWorkflow extends Workflow<String,String> {
@Override
public Name name() {
return Name.of("Hello World");
}
@Override
public Version version() {
return Version.of("1.0");
}
@Override
public TypeToken<String> inputType() {
return TypeToken.of(String.class);
}
@Override
public TypeToken<String> outputType() {
return TypeToken.of(String.class);
}
@Override
public DataConverter dataConverter() {
return ImmutableJacksonDataConverter.builder().build();
}
final StringActivityAction step1 = StringActivityAction.builder().actionId(ActionId.of("step1"))
.name(Name.of("Hello World")).version(Version.of("1.0")).workflow(this).build();
@Override
public CompletionStage decide(DecisionContext decisionContext) {
CompletionStage<String> input = workflowInput(decisionContext.events());
return input.thenCompose(i -> step1.withInput(i).decide(decisionContext));
}
//Run me to submit this workflow!
public static void main(String[] args) throws IOException, InterruptedException {
Config config = new Config();
HelloWorldWorkflow workflow = ImmutableHelloWorldWorkflow.builder()
.description(Description.of("Starting my first workflow!")).build();
config.submit(workflow, "Jane Doe");
}
}
We've provided a helpful main
method that you can launch that will submit a new instance of this workflow!
Actors - Deciders and Activity Workers
You get to decide the composition of the actors. You can scale the deciders and acivity workers seperately and arrange how each one is configured when polling a particular TaskList
or Domain
. For the purposes of this example, we demonstrate a basic setup that initiates both Activity Workers
and Deciders
as separate threads on the same JVM.
public class ActivityDecisionPollerPool {
protected static final Logger log = LoggerFactory.getLogger(ActivityDecisionPollerPool.class);
private final Config config = new Config();
private final ActivityPollerPool activityPollerPool =
ImmutableActivityPollerPool.builder().domain(config.domain)
.taskList(config.taskListName)
.service(new ScheduledThreadPoolExecutor(config.numberOfWorkers))
.swf(config.swf)
.dataConverter(config.dataConverter)
.addActivities(new HelloWorldActivities()).build();
private final DecisionPollerPool decisionPollerPool =
ImmutableDecisionPollerPool.builder().domain(config.domain)
.taskList(config.taskListName)
.service(new ScheduledThreadPoolExecutor(config.numberOfWorkers))
.swf(config.swf)
.dataConverter(config.dataConverter)
.workflows(Lists.newArrayList(new HelloWorldWorkflow())).build();
public void start() {
activityPollerPool.start();
decisionPollerPool.start();
}
public void stop() {
activityPollerPool.stop();
decisionPollerPool.stop();
}
public static void main(String[] args) throws IOException, InterruptedException {
ActivityDecisionPollerPool activityAndDecisionPollerPool =
new ActivityDecisionPollerPool();
activityAndDecisionPollerPool.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Shutting down pool and exiting.");
activityAndDecisionPollerPool.stop();
}
});
log.info("activity pollers started:");
}
}
For the purposes of this demonstrates, the configuration is hard configured in a POJO. However not much is needed to change the Config
class to something that is wired by your favourite Dependency Injection framework (i.e. Guice or Spring)
public class Config {
public final Domain domain = Domain.of("swift");
public final TaskListName taskListName = SwfConstants.DEFAULT_TASK_LIST;
public final Integer numberOfWorkers = 2;
public final DataConverter dataConverter = ImmutableJacksonDataConverter.builder().build();
//SWF holds the connection for 60 seconds to see if a decision is available
final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
final Duration DEFAULT_SOCKET_TIMEOUT = DEFAULT_CONNECTION_TIMEOUT.plusSeconds(10);
public final AmazonSimpleWorkflow swf = new AmazonSimpleWorkflowClient(new DefaultAWSCredentialsProviderChain(),
new ClientConfiguration().withConnectionTimeout((int) DEFAULT_CONNECTION_TIMEOUT.toMillis())
.withSocketTimeout((int) DEFAULT_SOCKET_TIMEOUT.toMillis()));
public WorkflowExecution submit(Workflow workflow, WorkflowId workflowId, Optional<Object> input) {
Optional<Input> inputOptional = input.map( i -> dataConverter.toData(i)).map(Input::of);
StartWorkflowExecutionRequest request =
WorkflowExecutionRequestBuilder.builder().domain(domain)
.workflow(workflow).input(inputOptional)
.taskList(taskListName).workflowId(workflowId).build();
Run run = swf.startWorkflowExecution(request);
return new WorkflowExecution().withWorkflowId(workflowId.value()).withRunId(run.getRunId());
}
public <I, O> WorkflowExecution submit(Workflow<I,O> workflow, I input) {
WorkflowId workflowId = WorkflowId.randomUniqueWorkflowId(workflow);
return submit(workflow, workflowId, Optional.ofNullable(input));
}
}
Verifying The Results
We can use the aws cli
tool or the SWF dashboard to verify our results!
Scroll down to the bottom until you see the WorkflowExecutionCompleted
and check out the result!
{
"events": [
{
"eventId": 1,
"eventType": "WorkflowExecutionStarted",
"workflowExecutionStartedEventAttributes": {
"taskList": {
"name": "DEFAULT"
},
"parentInitiatedEventId": 0,
"taskStartToCloseTimeout": "60",
"childPolicy": "TERMINATE",
"executionStartToCloseTimeout": "30758400",
"input": "\"Jane Doe\"",
"workflowType": {
"version": "1.0",
"name": "Hello World"
}
},
"eventTimestamp": 1456343145.567
},
{
"eventId": 2,
"eventType": "DecisionTaskScheduled",
"decisionTaskScheduledEventAttributes": {
"startToCloseTimeout": "60",
"taskList": {
"name": "DEFAULT"
}
},
"eventTimestamp": 1456343145.567
},
{
"eventId": 3,
"eventType": "DecisionTaskStarted",
"eventTimestamp": 1456343145.654,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 2,
"identity": "DECIDER-0"
}
},
{
"eventId": 4,
"eventType": "DecisionTaskTimedOut",
"decisionTaskTimedOutEventAttributes": {
"startedEventId": 3,
"timeoutType": "START_TO_CLOSE",
"scheduledEventId": 2
},
"eventTimestamp": 1456343205.66
},
{
"eventId": 5,
"eventType": "DecisionTaskScheduled",
"decisionTaskScheduledEventAttributes": {
"startToCloseTimeout": "60",
"taskList": {
"name": "DEFAULT"
}
},
"eventTimestamp": 1456343205.66
},
{
"eventId": 6,
"eventType": "DecisionTaskStarted",
"eventTimestamp": 1456343205.708,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 5,
"identity": "DECIDER-0"
}
},
{
"eventId": 7,
"eventType": "DecisionTaskCompleted",
"decisionTaskCompletedEventAttributes": {
"startedEventId": 6,
"scheduledEventId": 5
},
"eventTimestamp": 1456343206.078
},
{
"eventId": 8,
"eventType": "ActivityTaskScheduled",
"activityTaskScheduledEventAttributes": {
"taskList": {
"name": "DEFAULT"
},
"scheduleToCloseTimeout": "NONE",
"activityType": {
"version": "1.0",
"name": "Hello World"
},
"decisionTaskCompletedEventId": 7,
"heartbeatTimeout": "NONE",
"activityId": "step1",
"scheduleToStartTimeout": "NONE",
"startToCloseTimeout": "NONE",
"input": "[ \"[Ljava.lang.Object;\", [ \"Jane Doe\" ] ]"
},
"eventTimestamp": 1456343206.078
},
{
"eventId": 9,
"eventType": "ActivityTaskStarted",
"eventTimestamp": 1456343206.185,
"activityTaskStartedEventAttributes": {
"scheduledEventId": 8,
"identity": "ACTIVITY-0"
}
},
{
"eventId": 10,
"eventType": "ActivityTaskCompleted",
"activityTaskCompletedEventAttributes": {
"startedEventId": 9,
"scheduledEventId": 8,
"result": "\"Hello World Jane Doe!\""
},
"eventTimestamp": 1456343206.39
},
{
"eventId": 11,
"eventType": "DecisionTaskScheduled",
"decisionTaskScheduledEventAttributes": {
"startToCloseTimeout": "60",
"taskList": {
"name": "DEFAULT"
}
},
"eventTimestamp": 1456343206.39
},
{
"eventId": 12,
"eventType": "DecisionTaskStarted",
"eventTimestamp": 1456343206.45,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 11,
"identity": "DECIDER-1"
}
},
{
"eventId": 13,
"eventType": "DecisionTaskCompleted",
"decisionTaskCompletedEventAttributes": {
"startedEventId": 12,
"scheduledEventId": 11
},
"eventTimestamp": 1456343206.621
},
{
"eventId": 14,
"eventType": "WorkflowExecutionCompleted",
"workflowExecutionCompletedEventAttributes": {
"result": "\"Hello World Jane Doe!\"",
"decisionTaskCompletedEventId": 13
},
"eventTimestamp": 1456343206.621
}
]
}
That's all it took to get using Simple Workflow Service! No need to fear complicated deciders and distributed work.
Continue reading the user-guide to learn some more advanced use cases and features of WaterFlow
.