Twister2

Twister2

  • Getting Started
  • Docs
  • Tutorial
  • AI
  • Examples
  • Contribute
  • Download
  • Configurations
  • Java Docs
  • GitHub
  • Blog

›APIs

Compiling

  • Overview
  • Linux
  • MacOS
  • Maven Artifacts

App Development

  • API Overview
  • Developing Applications
  • Streaming Jobs
  • Batch Jobs

APIs

  • Worker API
  • Data API
  • Compute API
  • Operator API
  • Windowing API
  • Storm API
  • Apache Beam
  • Python API

Deployment

  • Job Submit
  • Standalone
  • Docker
  • Kubernetes
  • Minikube
  • Mesos
  • Nomad
  • Slurm
  • Dashboard
  • Logging
  • Configurations

Concepts

  • Overview
  • Architecture
  • Operators
  • Task System
  • Data Access

Resources

  • Publications

Operator API

This is the lowest level of API provided by Twister2. It provides the basic parallel operators required by a parallel programs in terms of both Bulk Synchronous Parallel (BSP) and DataFlow API.

The BSP APIs are provided by Harp and MPI specification (OpenMPI).

The DataFlow operators are implemented by Twister2 as a Twister:Net library.

We will focus on the DataFlow operators in this guide as Harp API's and MPI APIs are discussed in their own documentation.

DataFlow Operator Overview

Twister2 dataflow operators are designed as asynchronous operators with state. Every operator needs a LogicalPlan, a Communicator and set of sources and set of targets.

LogicalPlan gives the mapping from set of ids to workers processes. These logical ids are used by sources and targets. An operator can have sources in one set of processes and targets in another set of processes. Depending on the operator, there source set of target set can have one or more logical ids.

The communicator encapsulates the underlying channel that is being used for network data transfer. At the moment Twister:Net supports MPI based and TCP socket based implementations.

An Example Operator

Lets look at an example operator. For this example we have chosen the batch Reduce operator. This operator can have many sources and only one target.

Input to the operator can be any data type. All the sources input data and these values are reduced to a single value at the target. How to reduce multiple values to a single value is defined by a user defined function.

LogicalPlanBuilder logicalPlanBuilder = LogicalPlanBuilder.plan(
  3, // 3 sources
  3, // 3 targets
  workerEnv
).withFairDistribution();

// setup the batch operation
BReduce reduce = new BReduce(communicator, logicalPlanBuilder,
    reduceFunction, reduceReceiver, datatype);
// send the data
reduce.reduce(task, data, flag);

// lets call finish, every source need to call finish, in this example
// we assume one source in a single worker
reduce.finish(source);

// wait while operator completes
while (!reduce.isComplete()) {
    reduce.progressChannel();
}

After every participating source sends its data and calls finish, the final values will be received to the user defined reduceReceiver. At this point, the operation will be complete and the while loop at the end will exit.

We can call reduce.reduce multiple times.

Streaming & Batch Operators

There are separate set of streaming operators and batch operators. The streaming operators never end while batch operators have an end. Otherwise their programming is similar. So in our example above we won't have the finish call and isComplete will never be true for a streaming operator.

For batch operators user needs to indicate the end by using a special flag or a method.

Termination & Progress

The operator library doesn't created its own threads. So, unless user calls some function in the library, nothing will automatically happen. The progression of the operator is done using the progress and progressChannel methods. Unless these methods are called, data will not go through the network.

The data accepting functions of the operators can return true or false depending on weather they accept the data or not. If the method doesn't accept data, that means the internal data structures are full and user needs to call the progressChannel method to send the data to its targets. User doesn't need to wait until the method returns false to call the progression methods.

Batch Operators

The table below describes the batch operators supported by Twister2 and their semantics.

OperatorDescriptionClass
ReduceReduces a set of values into a single value with a user defined function.edu.iu.dsc.tws.comms.batch.BReduce
AllReduceReduces a set of values into a single value with a user defined function and broadcast it to all the targetsedu.iu.dsc.tws.comms.batch.BAllReduce
GatherGathers values from all sources and give it to a single target, optionally can use the disk to do large gathers that doesn't fit into memoryedu.iu.dsc.tws.comms.batch.BGather
AllGatherGathers values from all sources and broadcast it to all the targets, optionally can use the disk to do large gathers that doesn't fit into memoryedu.iu.dsc.tws.comms.batch.BAllGather
BroadcastBroadcast a value from a single source to multiple targetsedu.iu.dsc.tws.comms.batch.BBroadcast
DirectA peer to peer communication between set of sources and targets, each source is matched to a single targetedu.iu.dsc.tws.comms.batch.BDirect
KeyedReduceReduce based on a key specified by the user. Values with the same key are reduced togetheredu.iu.dsc.tws.comms.batch.BKeyedReduce
KeyedGatherGather based on a key specified by the user. Values with the same key are gathered togetheredu.iu.dsc.tws.comms.batch.BKeyedGather
JoinEqui join two data sets with a given keyedu.iu.dsc.tws.comms.batch.BJoin
OuterJoinOuter join two data sets with a given keyedu.iu.dsc.tws.comms.batch.BJoin
Left outer joinLeft outer join two data sets according to a keyedu.iu.dsc.tws.comms.batch.BJoin
Right outer joinRigght outer join two data sets according to a keyedu.iu.dsc.tws.comms.batch.BJoin
PartitionRedistributes data according to a user criteriaedu.iu.dsc.tws.comms.batch.BPartition

Streaming Operators

The table below describes the streaming operators supported by Twister2 and their semantics.

OperatorDescriptionClass
ReduceReduces a set of values into a single value with a user defined function.edu.iu.dsc.tws.comms.stream.SReduce
AllReduceReduces a set of values into a single value with a user defined function and broadcast it to all the targetsedu.iu.dsc.tws.comms.stream.SAllReduce
GatherGathers values from all sources and give it to a single target, optionally can use the disk to do large gathers that doesn't fit into memoryedu.iu.dsc.tws.comms.stream.SGather
AllGatherGathers values from all sources and broadcast it to all the targets, optionally can use the disk to do large gathers that doesn't fit into memoryedu.iu.dsc.tws.comms.stream.SAllGather
BroadcastBroadcast a value from a single source to multiple targetsedu.iu.dsc.tws.comms.stream.SBroadcast
DirectA peer to peer communication between set of sources and targets, each source is matched to a single targetedu.iu.dsc.tws.comms.stream.SDirect
PartitionRedistributes data according to a user criteriaedu.iu.dsc.tws.comms.stream.SPartition

Disk Based Operators

Some operators can be configured to use the disk in-order to support large data sets. Only batch operators support this mode. Join operations and KeyedGather operators can use the disk to do large data operations.

High performance communication layer

Because of the bottom up approach taken when designing and implementing Twister2 the communication layer performs extremely well. A complete study on the Twister2 communication layer can be found at Twister2:Net.

The image below which is extracted from Twister2:Net shows how Twister2 performs against Apache Spark and MPI. Please note that Spark KMeans example is written using the data level API while Twister2 and MPI implementations are communication level applications. However it is clear that Twister2 performs on the same level as OpenMPI which is an highly optimized communication library in the HPC world. And it out performs Spark by roughly a factor of x10.

Kmeans Performance Comparison

Notation :
DFW refers to Twister2
BSP refers to MPI (OpenMPI)

← Compute APIWindowing API →
  • DataFlow Operator Overview
  • An Example Operator
  • Streaming & Batch Operators
  • Termination & Progress
  • Batch Operators
  • Streaming Operators
  • Disk Based Operators
  • High performance communication layer
Twister2
Docs
Getting Started (Quickstart)Guides (Programming Guides)
Community
Stack OverflowProject Chat
More
BlogGitHubStar
Copyright © 2020 Indiana University