Workflows

A “Workflow” is a directed acyclic graph consisting of “Tasks” glued together with publishers.

The following simple example contains two tasks.

import krews.core.*
import krews.file.*
import krews.run
import reactor.core.publisher.*

// Bootstrap
fun main(args: Array<String>) = run(sampleWorkflow, args)

// The application's workflow, named "sample-workflow"
val sampleWorkflow = workflow("sample-workflow") {
    // A simple flux of the integers 1 through 10
    val range = (1..10).toFlux()

    // A task that creates base64 files
    val base64Out = task<Int, File>("base64", range) {
        dockerImage = "alpine:3.8"
        output = OutputFile("base64/$input.b64")
        command =
            """
            mkdir -p /data/base64
            echo "Hello world number $$input!" | base64 > /data/base64/$input.b64
            """
    }

    // A task that zips files
    task<File, File>("gzip", base64Out) {
        dockerImage = "alpine:3.8"
        output = OutputFile("gzip/${input.filename()}.gz")
        command =
            """
            mkdir -p /data/gzip
            gzip -c /data/${input.path} > /data/gzip/${input.filename()}.gz
            """
    }
}

Let’s break this down.

The main function is the application entrypoint. All it does is call the Krews run function, which does all the heavy lifting: parsing command line arguments and running the application with different configurations. More on that later.

Tasks

Tasks are objects that process data using docker containers. They have an input publisher and an output publisher. The input publisher may be a Mono or a Flux, but the output publisher is always a Flux.

If you read the Project Reactor documentation, you may have noticed that most operations are described in terms of marble diagrams (see the Flux docs for examples). Tasks can be conceptualized in a similar way.

Tasks take every item from the input publisher, process it, and emit the result on the output Flux.

Let’s take another look at the tasks from the above workflow.

    val base64Out = task<Int, File>("base64", range) {
        // Everything in this scope is calculated per-input 
        dockerImage = "alpine:3.8"
        output = OutputFile("base64/$input.b64")
        command =
            """
            mkdir -p /data/base64
            echo "Hello world number $input!" | base64 > /data/base64/$input.b64
            """
    }

Task Inputs and Outputs

Tasks are declared in workflows using the task function. This function has two generic type parameters that we must provide (the stuff in <>). The first represents the input type and the second represents the output type. These types may include:

  • Primitive types like Int, String, and Boolean
  • Krews Files
  • Collection classes like List, Set, and Map
  • Data Classes
  • Any combination of the above. For example, a List of data class objects.
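
For example, all of the following are valid type parameter combinations (the task names and input publishers here are hypothetical):

task<Int, String>("stringify", ints) { /* ... */ }                // primitives
task<List<File>, Map<String, File>>("merge", lists) { /* ... */ } // collections of Krews Files

data class Sample(val id: Int, val reads: File)
task<Sample, Sample>("trim", samples) { /* ... */ }               // data classes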

It’s a good idea to always use data classes, even if you’re only wrapping a single File or value.

More on this below.

Task Definitions

Tasks also require two arguments, the name and the input publisher, plus a builder function (everything in {}). This function runs for every element provided by the input publisher. The input element is available within the function as the variable “input”.

Within the function, you may set the following fields:

  • dockerImage (Required): The name of the docker image the task will run with.
  • output (Required): The output object. (More on this below.)
  • command: The command that will be executed in the given docker image.
  • env: An optional map of environment variables to run the docker container with.
  • inputsDir: The working directory that should contain input files.
  • outputsDir: The working directory that should contain output files.
  • cpus: The number of CPUs required for each task run.
  • memory: The amount of memory required for each task run.
  • diskSize: The disk space required for each task run.
  • time: The time required for each task run.
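
Here’s a sketch of a task that sets several of these fields. The task itself is hypothetical (filesIn is assumed to be a Flux<File> already in scope), and it assumes env takes a map of string pairs; the resource fields (cpus, memory, etc.) are omitted since their value types aren’t shown here.

task<File, File>("sort", filesIn) {
    dockerImage = "alpine:3.8"
    output = OutputFile("sorted/${input.filename()}")
    // An optional environment variable made available inside the container
    env = mapOf("LC_ALL" to "C")
    command =
        """
        mkdir -p /data/sorted
        sort /data/${input.path} > /data/sorted/${input.filename()}
        """
}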

Files

Caution

Krews Files in this section refer to the File classes in the package krews.file, NOT java.io. Make sure to import accordingly with:

import krews.file.*

Krews File objects are references to files that will be moved in and out of containers and File Storage. This is accomplished by using them in task input and output types, which can happen in 3 ways:

  • as the types directly

    task<File, File>("example", input) { /* ... */ }
    

  • in Collection Types

    task<List<File>, Map<String, File>>("example", input) { /* ... */ }
    

  • as fields in Data Classes

    data class MyTaskInput(val fileA: File, val fileB: File)
    data class MyTaskOutput(val fileX: File, val fileY: File)
    
    task<MyTaskInput, MyTaskOutput>("example", input) { /* ... */ }
    

Again, always using data classes for task inputs and outputs is a good idea.

File Paths

Files contain a path field. This is a partial path (i.e. some-dir/file.txt) that is used to determine real file paths as the file gets copied to and from docker containers.

When referencing a file in a task’s docker container, use the File.dockerPath utility to get the real path inside docker. This path will be equal to "${task.inputsDir}/${file.path}" if the file comes from your task input and "${task.outputsDir}/${file.path}" otherwise. inputsDir and outputsDir can be set, and default to /inputs and /outputs respectively.
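
For example, here’s a sketch (using a hypothetical task, and assuming dockerPath is accessed as a property) that reads an input file at its real in-container path and writes to the default outputs directory:

task<File, File>("count", files) {
    dockerImage = "alpine:3.8"
    output = OutputFile("counts/${input.filenameNoExt()}.txt")
    // input.dockerPath resolves to "/inputs/<input.path>" with the default inputsDir
    command =
        """
        mkdir -p /outputs/counts
        wc -l ${input.dockerPath} > /outputs/counts/${input.filenameNoExt()}.txt
        """
}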

Files also contain the following utility functions:

  • parentDir(): gets the parent directory in the File’s partial path if one exists.
  • filename(): gets the filename without parent directories.
  • filenameNoExt(): gets the filename without extensions.
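
For example, given a file whose partial path is base64/sample.b64:

val file = OutputFile("base64/sample.b64")
file.parentDir()     // "base64"
file.filename()      // "sample.b64"
file.filenameNoExt() // "sample"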

Input Files

In Krews, InputFile is a type of File. InputFiles refer to files that were not created by Krews workflows. Passing them to tasks in input elements will trigger copying the files into the task’s docker container.

There are several implementations, and you can create your own.

  • LocalInputFile refers to files on a locally accessible file system. For use with the Local executor, or with files on NFS for the Slurm executor.
  • GSInputFile refers to files in Google Cloud Storage. It may be used with any executor as long as the machine running the task has permission to access it.
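
As a rough sketch, feeding an external file into a workflow might look like the following. The constructor arguments shown are illustrative assumptions, not the exact signatures:

// Inside a workflow { } block; LocalInputFile's constructor is assumed here
val referenceIn = Flux.just(LocalInputFile("/home/me/data/reference.txt", "reference.txt"))
task<File, File>("index", referenceIn) { /* ... */ }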

Output Files

OutputFile is another implementation of File. These refer to files that are created by the Krews workflow.

  • They live in /outputs in your execution environment’s working directory.
  • Using OutputFiles in task inputs will trigger the file to be copied into the docker container.
  • Using OutputFiles in task outputs will cause the file to be copied out of the docker container into /outputs.

Important

Make sure to create an OutputFile for every file you want saved. Any file that is not in the output object WILL NOT be saved to File Storage.
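
For example, when a task run produces multiple files, every one of them should appear in the output object (a hypothetical task output):

// Declared alongside the workflow
data class AlignOutput(val bam: File, val bai: File)

// Inside the task builder: both files are in the output object, so both are saved
output = AlignOutput(
    OutputFile("align/sample.bam"),
    OutputFile("align/sample.bam.bai")
)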

Params

Both workflows and tasks contain values that can be passed in from configuration files. We call these “Params.”

Workflow params are passed into the workflow itself and task params are passed into each task function. Task functions run for each input, but the same task params are passed in each time.

Both types of params have a required generic type (again, the stuff in <>’s). This type just needs to be able to deserialize from our config. It’s highly recommended that you use a data class for this as well. See the configurations page for more on this.
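
As a rough sketch, the configuration supplying these params might look something like the following. The key names here are guesses to illustrate the shape; see the configurations page for the real format:

params {
    range-max = 10
}

task.base64.params {
    msg = "Hello world number"
}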

Here’s what this looks like on our previous example:

import krews.core.*
import krews.file.*
import krews.run
import reactor.core.publisher.*

fun main(args: Array<String>) = run(sampleWorkflow, args)

// Now we're using our best practice of data classes for everything
data class SampleParams(val rangeMax: Int)

data class Base64Params(val msg: String)
data class Base64Input(val index: Int)
data class Base64Output(val base64File: File)

data class ZipParams(val filenamePrefix: String)
data class ZipInput(val base64File: File)
data class ZipOutput(val zipFile: File)

val sampleWorkflow = workflow("sample-workflow") {
    // Here's our workflow level params
    val params = params<SampleParams>()

    val base64In = (1..params.rangeMax).toFlux().map { Base64Input(it) }
    val base64Out = task<Base64Input, Base64Output>("base64", base64In) {
        // and here's our task level params
        val taskParams = taskParams<Base64Params>()

        dockerImage = "alpine:3.8"
        output = Base64Output(OutputFile("base64/${input.index}.b64"))
        command =
            """
            mkdir -p /data/base64
            echo "${taskParams.msg} ${input.index}!" | base64 > /data/base64/${input.index}.b64
            """
    }

    val zipIn = base64Out.map { ZipInput(it.base64File) }
    task<ZipInput, ZipOutput>("gzip", zipIn) {
        val taskParams = taskParams<ZipParams>()

        dockerImage = "alpine:3.8"
        output = ZipOutput(OutputFile("gzip/${taskParams.filenamePrefix}-${input.base64File.filename()}.gz"))
        command =
            """
            mkdir -p /data/gzip
            gzip -c /data/${input.base64File.path} > /data/gzip/${taskParams.filenamePrefix}-${input.base64File.filename()}.gz
            """
    }
}

Grouping

Sometimes jobs are so small and fast that we spend more time and compute power on overhead than on the job itself. On Google, for example, this overhead includes spinning up new VMs, downloading docker images, downloading large input files that may be needed across many task runs, and Krews’ own tracking and polling.

For cases like this, Krews allows you to “group” multiple runs from the same task together to be submitted as a single job. This can be done via task-level configuration. For example:

task.my-task {
    grouping = 5
}

This will submit “my-task” task runs in batches of 5. On Google this means they will run sequentially on the same VM, in separate containers. On Slurm, it means they will run sequentially in the same sbatch job. For local docker runs this setting is ignored.

Code Organization

By now you might be thinking that our workflow is starting to look pretty busy, and as our workflow grows, this problem would only get worse.

Because this is an ordinary Kotlin project, we get to split our code up into multiple files. Here’s how we recommend you do it.

App.kt
task/
    Base64.kt
    Gzip.kt

Notice we’ve broken up our application into a top-level entrypoint file, App.kt, containing our workflow, and a file for each task in the task package.

Let’s take a look at one of the tasks first.

task/Base64.kt

package task

import krews.core.*
import krews.file.*
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux

data class Base64Params(val msg: String)
data class Base64Input(val index: Int)
data class Base64Output(val base64File: File)

fun WorkflowBuilder.base64Task(i: Publisher<Base64Input>): Flux<Base64Output> =
    this.task("base64", i) {
        val taskParams = taskParams<Base64Params>()
        val msg = taskParams.msg
        val index = input.index

        dockerImage = "alpine:3.9"
        output = Base64Output(OutputFile("base64/$index.b64"))
        command =
            """
            mkdir -p /data/base64
            echo "$msg $index!" | base64 > /data/base64/$index.b64
            """
    }

Now all our Base64 task-related code is in one place. We’ve also used a Kotlin extension function on a class called WorkflowBuilder, with single-expression function shorthand, for the task creation function itself. WorkflowBuilder is the class used under the hood when you call the “workflow” function. This is just some syntax sugar that allows us to make our workflow look like this:

App.kt

import krews.core.*
import krews.run
import reactor.core.publisher.*
import task.*

fun main(args: Array<String>) = run(sampleWorkflow, args)

data class SampleParams(
    val rangeMax: Int
)

val sampleWorkflow = workflow("sample-workflow") {
    val params = params<SampleParams>()

    val base64In = (1..params.rangeMax).toFlux().map { Base64Input(it) }
    // Here's our base64Task call referencing our extension function
    val base64Out = base64Task(base64In)

    val zipIn = base64Out.map { ZipInput(it.base64File) }
    zipTask(zipIn)
}

Now our application file is just concerned with a higher-level view of the workflow: which tasks are added and how they’ve been piped together.
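
For completeness, task/Gzip.kt (not shown above) follows the same pattern. Here’s a sketch that reuses the gzip task from the params example:

task/Gzip.kt

package task

import krews.core.*
import krews.file.*
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux

data class ZipParams(val filenamePrefix: String)
data class ZipInput(val base64File: File)
data class ZipOutput(val zipFile: File)

fun WorkflowBuilder.zipTask(i: Publisher<ZipInput>): Flux<ZipOutput> =
    this.task("gzip", i) {
        val taskParams = taskParams<ZipParams>()
        val prefix = taskParams.filenamePrefix
        val name = input.base64File.filename()

        dockerImage = "alpine:3.9"
        // The declared output path matches the file the command writes
        output = ZipOutput(OutputFile("gzip/$prefix-$name.gz"))
        command =
            """
            mkdir -p /data/gzip
            gzip -c /data/${input.base64File.path} > /data/gzip/$prefix-$name.gz
            """
    }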