Twister2

Twister2

  • Getting Started
  • Docs
  • Tutorial
  • AI
  • Examples
  • Contribute
  • Download
  • Configurations
  • Java Docs
  • GitHub
  • Blog

›Machine Learning

AI

  • Overview

Twister2Flow

  • Overview on Twister2Flow

Machine Learning

  • Overview on Machine Learning
  • K-Means
  • SVM

Deep Learning

  • Overview on Deep Learning
  • Pytorch Integration

K-Means

The need to process large am​​ounts of continuously arriving information has led to the exploration and application of big data analytics techniques. Likewise, the painstaking process of clustering numerous datasets containing large numbers of records with high dimensions calls for innovative methods. Traditional sequential clustering algorithms are unable to handle it. They are not scalable in relation to larger sizes of data sets, and they are most often computationally expensive in memory space and time complexities. Yet, the parallelization of data clustering algorithms is paramount when dealing with big data. K-Means clustering is an iterative algorithm hence, it requires a large number of iterative steps to find an optimal solution, and this procedure increases the processing time of clustering. Twister2 provides a dataflow task graph approach to distribute the tasks in a parallel manner and aggregate the results which reduce the processing time of K-Means Clustering process.

K-Means Clustering Implementation Details

The implementation details of k-means clustering in Twister2 is discussed below.

DataObjectConstants

The constants which are used by the k-means algorithm to specify the number of workers, parallelism, dimension, size of datapoints, size of centroids, file system, number of iterations, datapoints, and centroids directory.

  public static final String WORKERS = "workers";
  public static final String DIMENSIONS = "dim";
  public static final String PARALLELISM_VALUE = "parallelism";
  public static final String SHARED_FILE_SYSTEM = "fShared";
  public static final String DSIZE = "dsize";
  public static final String CSIZE = "csize";
  public static final String DINPUT_DIRECTORY = "dinput";
  public static final String CINPUT_DIRECTORY = "cinput";
  public static final String OUTPUT_DIRECTORY = "output";
  public static final String NUMBER_OF_FILES = "nFiles";
  public static final String FILE_SYSTEM = "filesys";
  public static final String ITERATIONS = "iter";

KMeansMain

The entry point for the K-Means clustering algorithm is implemented in KMeansMain class. It parses the command line parameters submitted by the user for running the K-Means clustering algorithm. It first set the submitted variables in the JobConfig object and put the JobConfig object into the Twister2Job Builder, set the worker class (KMeansComputeJob.java in this example) and submit the job.

edu.iu.dsc.tws.examples.batch.kmeans.KMeansMain

KMeansComputeJob

It is the main class for the K-Means clustering which consists of four main tasks namely generation of datapoints and centroids, partition and read the partitioned data points, partition and read the centroids, and perform the distance calculation between the datapoints and the centroids. It extends the IWorker class which has the execute() method that invokes the KMeansUtils class to generate the datapoints and the centroids in their respective filesystem and their directories. Then, the execute() method of KMeansComputeJob invokes "datapointsTaskgraph", "centroidsTaskGraph", and "kmeansTaskGraph". We will briefly discuss the functionalities of each task graph defined in the KMeansComputeJob.

Reading and partitioning the Datapoints

The main functionality of the first task graph is to partition the data points, convert the partitioned datapoints into two-dimensional array, and write the two-dimensional array into their respective task index values.

    /* First Graph to partition and read the partitioned data points **/
   PointDataSource ps = new PointDataSource(Context.TWISTER2_DIRECT_EDGE,
           dataDirectory, "points", dimension);

First, add point data source to the task graph builder for the first task graph. Then, set the operation mode and the task graph name.

     datapointsComputeGraphBuilder.addSource("datapointsource", ps, parallelismValue);
     datapointsComputeGraphBuilder.setMode(OperationMode.BATCH);
     datapointsComputeGraphBuilder.setTaskGraphName("datapointsTG");

Finally, invoke the computeGraphBuilder to build the first task graph, get the task schedule plan and execution plan for the first task graph, and call the execute() method to execute the datapoints task graph.

    //Build the first taskgraph
    DataFlowTaskGraph datapointsTaskGraph = computeGraphBuilder.build();
    //Get the execution plan for the first task graph
    ExecutionPlan firstGraphExecutionPlan = taskExecutor.plan(datapointsTaskGraph);
    //Actual execution for the first taskgraph
    taskExecutor.execute(datapointsTaskGraph, firstGraphExecutionPlan); 

PointDataSource

This class partition the datapoints which is based on the task parallelism value. It may either use the "LocalTextInputPartitioner" or "LocalFixedInputPartitioner" to partition the datapoints. Finally, write the partitioned datapoints into their respective edges. The LocalTextInputPartitioner partition the datapoints based on the block whereas the LocalFixedInputPartitioner partition the datapoints based on the length of the file. For example, if the task parallelism is 4, if there are 16 datapoints each task will get 4 datapoints to process. The partitioned datapoints are converted into two-dimensional array.

 @Override
  public void prepare(Config cfg, TaskContext context) {
    super.prepare(cfg, context);
    ExecutionRuntime runtime = (ExecutionRuntime) cfg.get(ExecutorContext.TWISTER2_RUNTIME_OBJECT);
    this.source = runtime.createInput(cfg, context, new LocalTextInputPartitioner(
        new Path(getDataDirectory()), context.getParallelism(), config));
  }

Finally, write the appropriate datapoints into their respective task index values with the entity partition values.

 @Override
  public DataPartition<double[][]> get() {
    return new EntityPartition<>(context.taskIndex(), dataPointsLocal);
  }

Reading and partitioning the Centroids

Similar to the datapoints, the second task graph perform three processes namely partitioning, converting the partitioned centroids into array, and writing into respective task index values but, with one major difference of read the complete file as one partition.

      PointDataSource cs = new PointDataSource(Context.TWISTER2_DIRECT_EDGE, centroidDirectory,
            "centroids", dimension);
      ComputeGraphBuilder centroidsComputeGraphBuilder = ComputeGraphBuilder.newBuilder(conf);

Similar to the first task graph, it add point data source to the task graph builder for the second graph and then it set the operation mode and the task graph name.

   centroidsComputeGraphBuilder.addSource("centroidsource", cs, parallelismValue);
   centroidsComputeGraphBuilder.setMode(OperationMode.BATCH);
   centroidsComputeGraphBuilder.setTaskGraphName("centTG");

Finally, invoke the build() method to build the second task graph, get the task schedule plan and execution plan for the second task graph, and call the execute() method to execute the centroids task graph.

    //Build the second taskgraph
    DataFlowTaskGraph centroidsTaskGraph = computeGraphBuilder.build();
    //Get the execution plan for the second task graph
    ExecutionPlan secondGraphExecutionPlan = taskExecutor.plan(centroidsTaskGraph);
    //Actual execution for the second taskgraph
    taskExecutor.execute(centroidsTaskGraph, secondGraphExecutionPlan); 

KMeans Clustering

The third task graph has the following classes namely KMeansSource, KMeansAllReduceTask, and CentroidAggregator.

    /* Third Graph to do the actual calculation **/
       KMeansSourceTask kMeansSourceTask = new KMeansSourceTask();
       KMeansAllReduceTask kMeansAllReduceTask = new KMeansAllReduceTask();
       ComputeGraphBuilder kmeansComputeGraphBuilder = ComputeGraphBuilder.newBuilder(conf);
   
       //Add source, and sink tasks to the task graph builder for the third task graph
       kmeansComputeGraphBuilder.addSource("kmeanssource", kMeansSourceTask, parallelismValue);
       ComputeConnection kMeanscomputeConnection = kmeansComputeGraphBuilder.addCompute(
           "kmeanssink", kMeansAllReduceTask, parallelismValue);
   
       //Creating the communication edges between the tasks for the third task graph
       kMeanscomputeConnection.allreduce("kmeanssource")
           .viaEdge("all-reduce")
           .withReductionFunction(new CentroidAggregator())
           .withDataType(MessageTypes.OBJECT);
       kmeansComputeGraphBuilder.setMode(OperationMode.BATCH);
       kmeansComputeGraphBuilder.setTaskGraphName("kmeansTG");

Assigning datapoints and initial centroids

The datapoints and centroids are sent to the KMeansTaskGraph as "points" object and "centroids" object as an input for further processing through receivable name set. Finally, it invokes the execute() method of the task executor to do the clustering process.

    //Perform the iterations from 0 to 'n' number of iterations
   IExecutor ex = taskExecutor.createExecution(kmeansTaskGraph);
    for (int i = 0; i < iterations; i++) {
         //actual execution of the third task graph
         ex.execute(i == iterations - 1);
    }

New Centroid Updation

This process repeats for ‘n’ number of iterations as specified by the user. For every iteration, the new centroid value is calculated and the calculated value is distributed across all the task instances. At the end of every iteration, the centroid value is updated and the iteration continues with the new centroid value.

KMeansSourceTask

First, the execute method in KMeansSource retrieve the partitioned data points into their respective task index values and the complete centroid values into their respective task index values. The retrieved data points and centroids are sent to the KMeansUtils to find the nearest centers using the Euclidean distance.

   @Override
   public void execute() {
      int dim = config.getIntegerValue("dim", 2);
      double[][] datapoints = (double[][]) dataPartition.first();
      double[][] centroid = (double[][]) centroidPartition.first();
      double[][] kMeansCenters = KMeansUtils.findNearestCenter(dim, datapoints, centroid);
      context.writeEnd("all-reduce", kMeansCenters);
   }

KMeansAllReduceTask

The KMeansAllReduceTask write the calculated centroid values of their partitioned datapoints into their respective task index values.

    @Override
    public boolean execute(IMessage message) {
      LOG.log(Level.FINE, "Received centroids: " + context.getWorkerId()
          + ":" + context.taskId());
      centroids = (double[][]) message.getContent();
      newCentroids = new double[centroids.length][centroids[0].length - 1];
      for (int i = 0; i < centroids.length; i++) {
        for (int j = 0; j < centroids[0].length - 1; j++) {
          double newVal = centroids[i][j] / centroids[i][centroids[0].length - 1];
          newCentroids[i][j] = newVal;
        }
      }
      return true;
    }

    @Override
    public DataPartition<double[][]> get() {
      return new EntityPartition<>(context.taskIndex(), newCentroids);
    }

CentroidAggregator

The CentroidAggregator implements the IFunction and the function OnMessage which accepts two objects as an argument.

public Object onMessage(Object object1, Object object2)

It sums the corresponding centroid values and return the same.

ret.setCenters(newCentroids); 

To Run K-Means Clustering using Task Graph

This command generate and write the datapoints and centroids in the local filesystem and run the K-Means clustering process. If the user wants to generate and do the processing with csv file, they have to specify the csv file type as "-ftype csv" or else if the user wants to do the processing with text file, they have to specify the file type as "-ftype txt".

./bin/twister2 submit standalone jar examples/libexamples-java.jar edu.iu.dsc.tws.examples.batch.kmeans.KMeansMain -dinput /tmp/dinput -cinput /tmp/cinput -fShared false -nFiles 1 -output /tmp/output -workers 2 -dim 2 -parallelism 4 -filesys local -dsize 1000 -csize 4 -iter 100 -type graph -ftype txt

This command generate and write the datapoints and centroids in the HDFS and run the run the K-Means clustering process.

./bin/twister2 submit standalone jar examples/libexamples-java.jar edu.iu.dsc.tws.examples.batch.kmeans.KMeansMain -dinput hdfs://namenode:9000/tmp/dinput -cinput hdfs://namenode:9000/tmp/cinput -fShared false -nFiles 1 -output hdfs://namenode:9000/tmp/output -workers 2 -dim 2 -parallelism 4 -filesys hdfs -dsize 1000 -csize 4 -iter 100 -type graph -ftype txt

To Run K-Means Clustering using TSet

This command generate and write the datapoints and centroids in the local filesystem and run the K-Means clustering process.

./bin/twister2 submit standalone jar examples/libexamples-java.jar edu.iu.dsc.tws.examples.batch.kmeans.KMeansMain -dinput /tmp/dinput -cinput /tmp/cinput -fShared false -nFiles 1 -output /tmp/output -workers 2 -dim 2 -parallelism 4 -filesys local -dsize 1000 -csize 4 -iter 100 -type tset -ftype txt

This command generate and write the datapoints and centroids in the HDFS and run the run the K-Means clustering process.

./bin/twister2 submit standalone jar examples/libexamples-java.jar edu.iu.dsc.tws.examples.batch.kmeans.KMeansMain -dinput hdfs://namenode:9000/tmp/dinput -cinput hdfs://namenode:9000/tmp/cinput -fShared false -nFiles 1 -output hdfs://namenode:9000/tmp/output -workers 2 -dim 2 -parallelism 4 -filesys hdfs -dsize 1000 -csize 4 -iter 100 -type tset -ftype txt

Sample Output

[2019-11-22 10:41:15 -0500] [INFO] [-] [main] edu.iu.dsc.tws.rsched.schedulers.standalone.MPILauncher: Starting the job master: 127.0.1.1:44675  
[2019-11-22 10:41:15 -0500] [WARNING] [-] [main] edu.iu.dsc.tws.master.server.JobMaster: Dashboard host address is null. Not connecting to Dashboard  
[2019-11-22 10:41:15 -0500] [INFO] [-] [main] edu.iu.dsc.tws.common.net.tcp.Server: Starting server on kannan-Precision-5820-Tower-X-Series:44675  
[2019-11-22 10:41:15 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.JobMaster: JobMaster [127.0.1.1] started and waiting worker messages on port: 44675  
[2019-11-22 10:41:15 -0500] [INFO] [-] [Thread-4] edu.iu.dsc.tws.rsched.schedulers.standalone.MPIController: Working directory: /home/kannan/.twister2/jobs  
[2019-11-22 10:41:15 -0500] [INFO] [-] [Thread-4] edu.iu.dsc.tws.rsched.schedulers.standalone.MPIController: Launching job in standalone scheduler with no of containers = 2  
[2019-11-22 10:41:15 -0500] [INFO] [-] [Thread-4] edu.iu.dsc.tws.rsched.schedulers.standalone.MPICommand: Job class path: /home/kannan/.twister2/jobs/KMeans-job/libexamples-java.jar  
[2019-11-22 10:41:15 -0500] [INFO] [-] [Thread-4] edu.iu.dsc.tws.rsched.schedulers.standalone.StandaloneCommand: Java version : 8  
[2019-11-22 10:41:16 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerMonitor: Worker: 1 joined the job.  
[2019-11-22 10:41:16 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerMonitor: Worker: 0 joined the job.  
[2019-11-22 10:41:16 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerHandler: All workers joined the job. Worker IDs: [0, 1]  
[2019-11-22 10:41:16 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerHandler: Sending WorkersJoined messages ...  
[2019-11-22 10:41:17 -0500] [INFO] [worker-0] [main] edu.iu.dsc.tws.examples.batch.kmeans.KMeansComputeJob: Total K-Means Execution Time: 352   Data Load time : 109    Compute Time : 243  
[2019-11-22 10:41:17 -0500] [INFO] [worker-0] [main] edu.iu.dsc.tws.rsched.schedulers.standalone.MPIWorker: Worker finished executing - 0  
[2019-11-22 10:41:17 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerMonitor: Worker:0 COMPLETED.  
[2019-11-22 10:41:17 -0500] [INFO] [worker-1] [main] edu.iu.dsc.tws.examples.batch.kmeans.KMeansComputeJob: Total K-Means Execution Time: 354   Data Load time : 130    Compute Time : 224  
[2019-11-22 10:41:17 -0500] [INFO] [worker-1] [main] edu.iu.dsc.tws.rsched.schedulers.standalone.MPIWorker: Worker finished executing - 1  
[2019-11-22 10:41:17 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerMonitor: Worker:1 COMPLETED.  
[2019-11-22 10:41:17 -0500] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.WorkerMonitor: All 2 workers COMPLETED. Terminating the job.  
[2019-11-22 10:41:17 -0500] [INFO] [-] [main] edu.iu.dsc.tws.rsched.core.ResourceAllocator: CLEANED TEMPORARY DIRECTORY......:/tmp/twister2-KMeans-job-7364072149483599729  
← Overview on Machine LearningSVM →
  • K-Means Clustering Implementation Details
    • DataObjectConstants
    • KMeansMain
    • KMeansComputeJob
    • Reading and partitioning the Datapoints
    • Reading and partitioning the Centroids
    • KMeans Clustering
    • KMeansSourceTask
    • KMeansAllReduceTask
    • CentroidAggregator
  • To Run K-Means Clustering using Task Graph
  • To Run K-Means Clustering using TSet
    • Sample Output
Twister2
Docs
Getting Started (Quickstart)Guides (Programming Guides)
Community
Stack OverflowProject Chat
More
BlogGitHubStar
Copyright © 2020 Indiana University