public class ZKWorkerController extends java.lang.Object implements IWorkerController, IWorkerStatusUpdater
The Job Master creates the following znodes (directories) for each job:
  persistent state znode (directory)
  ephemeral state znode (directory)
  default barrier znode (directory)
  init barrier znode (directory)
  events znode (directory)

A persistent znode is created for each worker under the job persistent state znode. It holds the WorkerInfo object, the latest WorkerState and the worker restart count. The Job Master watches the children of this znode for worker state changes.

An ephemeral znode is created for each worker under the job ephemeral state znode. The children of this znode are used to watch for worker failures and joins. The Job Master watches the children of this znode.

Persistent znodes for workers must be created before ephemeral znodes, because the Job Master gets the WorkerInfo from the persistent znode.

There are two types of barriers: default and init. Workers wait at the init barrier when they are starting. They also come back to the init barrier in case of a failure. After a failure, they all proceed from the init barrier.

A barrier znode is created for each worker under the barrier znode. Once every worker has created its znode under this directory, all workers have arrived at the barrier. The Job Master watches the children of this znode and informs all workers by publishing an event. Each worker is responsible for deleting its own znode under the barrier directory when it proceeds through the barrier. The Job Master deletes the worker barrier znodes in case of scaling down or job termination.
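The directory layout described above can be pictured with a small path-building sketch. The root path, the directory names and every helper name below are illustrative assumptions; the actual paths used by this class are not stated on this page.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the root path and directory names are assumptions,
// not the actual znode paths used by ZKWorkerController.
final class JobZnodeLayout {
  private final String jobRoot;

  JobZnodeLayout(String rootPath, String jobID) {
    this.jobRoot = rootPath + "/" + jobID;
  }

  // One directory per purpose, all under the job root.
  String persistentStateDir() { return jobRoot + "/pers-state"; }
  String ephemeralStateDir()  { return jobRoot + "/ephem-state"; }
  String defaultBarrierDir()  { return jobRoot + "/default-barrier"; }
  String initBarrierDir()     { return jobRoot + "/init-barrier"; }
  String eventsDir()          { return jobRoot + "/events"; }

  // Per-worker znodes: one child per worker under the matching directory.
  String workerPersistentZnode(int workerID) { return persistentStateDir() + "/" + workerID; }
  String workerEphemeralZnode(int workerID)  { return ephemeralStateDir() + "/" + workerID; }
  String workerBarrierZnode(int workerID)    { return defaultBarrierDir() + "/" + workerID; }

  // Creation order for a worker's state znodes: persistent first, because the
  // Job Master reads WorkerInfo from the persistent znode.
  List<String> workerStartupOrder(int workerID) {
    List<String> order = new ArrayList<>();
    order.add(workerPersistentZnode(workerID));
    order.add(workerEphemeralZnode(workerID));
    return order;
  }
}
```

The sketch only builds strings; it makes no ZooKeeper calls, so it shows the shape of the tree rather than how the znodes are created.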
Events: all events are published under the events znode (directory) as new child znodes with sequential numeric names.
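To illustrate how sequentially named children give readers a stable publication order, here is a minimal in-memory sketch. It makes no ZooKeeper calls. The class and method names are hypothetical, and the 10-digit zero padding mirrors the suffix ZooKeeper appends to sequential znodes; whether the event znodes here use exactly that form is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for the events directory. No ZooKeeper calls are made.
final class EventLogSketch {
  private final List<String> names = new ArrayList<>();
  private final List<String> bodies = new ArrayList<>();

  // Publish an event as a new "child" named with the next sequence number,
  // zero-padded to 10 digits (an assumption about the naming form).
  synchronized String publish(String eventBody) {
    String name = String.format("%010d", names.size());
    names.add(name);
    bodies.add(eventBody);
    return name;
  }

  // Return the bodies of all events published at or after the given index,
  // in publication order, so a late reader can catch up.
  synchronized List<String> eventsFrom(int index) {
    return new ArrayList<>(bodies.subList(index, bodies.size()));
  }
}
```

Because the names sort in publication order, a reader that remembers the index of the last event it processed can resume from the next one.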
Modifier and Type | Field and Description |
---|---|
static java.util.logging.Logger | LOG |
Constructor and Description |
---|
ZKWorkerController(Config config, java.lang.String jobID, int numberOfWorkers, WorkerInfo workerInfo) Construct the ZKWorkerController but do not initialize it yet. |
Modifier and Type | Method and Description |
---|---|
boolean | addAllJoinedListener(IAllJoinedListener iAllJoinedListener) Add a single IAllJoinedListener. If a listener has already been added, do not add another one and return false. |
boolean | addFailureListener(IWorkerFailureListener iWorkerFailureListener) Add a single IWorkerFailureListener. If a listener has already been added, do not add another one and return false. |
void | addJMFailureListener(IJobMasterFailureListener iJobMasterFailureListener) Supports multiple IJobMasterFailureListeners. TODO: JM restarted is implemented, but JM failed is not implemented yet. |
boolean | addScalerListener(IScalerListener iScalerListener) Add a single IScalerListener. If a listener has already been added, do not add another one and return false. |
protected java.util.List<WorkerInfo> | cloneWorkers() |
void | close() Close all local entities. |
java.util.List<WorkerInfo> | getAllWorkers() Get all workers in the job. |
CheckpointingClient | getCheckpointingClient() |
java.util.List<WorkerInfo> | getCurrentWorkers() Get the current list of workers. This list excludes workers that have failed or already completed. |
IWorkerFailureListener | getFailureListener() Get the failure listener. |
java.util.List<WorkerInfo> | getJoinedWorkers() Get all joined workers in this job, including those that have finished execution. Workers that have not joined yet may not be included in this list. |
int | getNumberOfCurrentWorkers() Get the number of current workers in the job as seen from this worker. |
int | getNumberOfWorkers() Return the number of all workers in this job, including those that have not started yet and those that have finished. |
WorkerInfo | getWorkerInfo() Return the WorkerInfo object for this worker. |
WorkerInfo | getWorkerInfoForID(int id) Return the WorkerInfo object for the given ID. |
WorkerState | getWorkerStatusForID(int id) Return the status of any worker in the job. |
void | initialize(int restartCount1, long jsTime) Initialize this ZKWorkerController: connect to the ZK server, create a persistent znode for this worker, set this WorkerInfo and its status in the body of that znode, create an ephemeral znode for this worker, and create a cache for the persistent znode of the job for watching job scaling events. initialState has to be either WorkerState.STARTED or WorkerState.RESTARTED. |
boolean | isRestarted() |
void | setCheckpointingClient(CheckpointingClient checkpointingClient) |
boolean | updateWorkerStatus(WorkerState newStatus) Update the worker status with the new state. Return true if successful. |
void | waitOnBarrier() Wait for all workers in the job to arrive at this barrier. If some workers still have not arrived after the timeout specified in ControllerContext.maxWaitTimeOnBarrier, throw an exception. |
void | waitOnBarrier(long timeLimit) All workers create a znode in the barrier directory. The Job Master watches znode creations and removals in this directory. When the number of znodes in that directory reaches the number of workers in the job, the Job Master publishes an AllArrivedOnBarrier event. Workers proceed when they get this event or when they time out. |
void | waitOnInitBarrier() Init barrier. Uses the same algorithm as the default barrier. |
int | workerRestartCount() Get the worker restart count. Zero means the worker is starting for the first time. |
public ZKWorkerController(Config config, java.lang.String jobID, int numberOfWorkers, WorkerInfo workerInfo)
public void initialize(int restartCount1, long jsTime) throws java.lang.Exception
The body of the persistent worker znode will be updated as the status of worker changes from STARTED, COMPLETED,
Throws: java.lang.Exception
public void setCheckpointingClient(CheckpointingClient checkpointingClient)
public boolean isRestarted()
public boolean updateWorkerStatus(WorkerState newStatus)
Initially, the worker status is set to STARTED or RESTARTED, so there is no need to call this method after starting this IWorkerController. Call this method to change the worker status to COMPLETED, FAILED, etc.
Specified by: updateWorkerStatus in interface IWorkerStatusUpdater
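The status contract for updateWorkerStatus can be sketched with a small stand-in class. Everything here is hypothetical: SketchState lists only the state names mentioned on this page, the mapping from restart count to initial state is an assumption drawn from the workerRestartCount description, and rejecting STARTED or RESTARTED in update is a choice of this sketch, not documented behavior of the real method.

```java
// Hypothetical stand-in for WorkerState; only the values named on this page.
enum SketchState { STARTED, RESTARTED, COMPLETED, FAILED }

final class WorkerStatusSketch {
  private final int restartCount;
  private SketchState state;

  // Assumption: restart count zero maps to STARTED, anything else to RESTARTED.
  WorkerStatusSketch(int restartCount) {
    this.restartCount = restartCount;
    this.state = restartCount == 0 ? SketchState.STARTED : SketchState.RESTARTED;
  }

  boolean isRestarted() { return restartCount > 0; }

  SketchState current() { return state; }

  // Callers only report later states such as COMPLETED or FAILED.
  // Sketch choice: initial states are set once at start-up and rejected here.
  boolean update(SketchState newState) {
    if (newState == SketchState.STARTED || newState == SketchState.RESTARTED) {
      return false;
    }
    state = newState;
    return true;
  }
}
```

A worker built this way never needs an explicit call to announce that it has started; it calls update only when it finishes or fails.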
public WorkerState getWorkerStatusForID(int id)
Specified by: getWorkerStatusForID in interface IWorkerStatusUpdater
public WorkerInfo getWorkerInfo()
Specified by: getWorkerInfo in interface IWorkerController
public WorkerInfo getWorkerInfoForID(int id)
Specified by: getWorkerInfoForID in interface IWorkerController
public int getNumberOfWorkers()
Specified by: getNumberOfWorkers in interface IWorkerController
public java.util.List<WorkerInfo> getJoinedWorkers()
Specified by: getJoinedWorkers in interface IWorkerController
public java.util.List<WorkerInfo> getAllWorkers() throws TimeoutException
Description copied from interface: IWorkerController
Return all workers in the job, including the ones that have already left, if any.
Specified by: getAllWorkers in interface IWorkerController
Throws: TimeoutException
protected java.util.List<WorkerInfo> cloneWorkers()
public boolean addFailureListener(IWorkerFailureListener iWorkerFailureListener)
public boolean addAllJoinedListener(IAllJoinedListener iAllJoinedListener)
public boolean addScalerListener(IScalerListener iScalerListener)
public void addJMFailureListener(IJobMasterFailureListener iJobMasterFailureListener)
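The add*Listener methods follow two registration policies: a single slot where a second registration is refused, and a list that accepts many listeners. A minimal sketch of both, using hypothetical listener interfaces and class names rather than the real ones:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener types standing in for the real listener interfaces.
interface FailureListenerSketch { void failed(int workerID); }
interface JMListenerSketch { void jmRestarted(); }

final class ListenerRegistrySketch {
  private FailureListenerSketch failureListener;                        // at most one
  private final List<JMListenerSketch> jmListeners = new ArrayList<>(); // many allowed

  // Single-slot policy: the first registration wins, later ones return false.
  synchronized boolean addFailureListener(FailureListenerSketch listener) {
    if (failureListener != null) {
      return false;
    }
    failureListener = listener;
    return true;
  }

  // Multi-listener policy: every registration is kept.
  synchronized void addJMFailureListener(JMListenerSketch listener) {
    jmListeners.add(listener);
  }

  synchronized FailureListenerSketch getFailureListener() { return failureListener; }

  synchronized int jmListenerCount() { return jmListeners.size(); }
}
```

Callers of the single-slot methods should check the boolean result, since a refused listener will never be notified.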
public java.util.List<WorkerInfo> getCurrentWorkers()
public int getNumberOfCurrentWorkers() throws java.lang.Exception
Throws: java.lang.Exception
public void waitOnBarrier() throws TimeoutException
Specified by: waitOnBarrier in interface IWorkerController
Throws: TimeoutException
public void waitOnBarrier(long timeLimit) throws TimeoutException
Workers remove their znodes after they proceed through the barrier so that they can wait on the barrier again. Workers are responsible for creating and removing their znodes on the barrier. The Job Master removes the barrier znode after job completion or scale-down. If the timeout is reached, a TimeoutException is thrown.
Specified by: waitOnBarrier in interface IWorkerController
Throws: TimeoutException
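The arrival counting behind the barrier can be imitated in memory. The sketch below is single-use and makes no ZooKeeper calls: a concurrent set stands in for the barrier directory, and a latch stands in for the all-arrived event that the Job Master publishes. The class and method names are hypothetical, and the real implementation's watch and event handling is not shown.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Single-use, in-memory stand-in for one barrier directory.
final class BarrierSketch {
  private final int numberOfWorkers;
  private final Set<Integer> arrived = ConcurrentHashMap.newKeySet();
  private final CountDownLatch allArrived = new CountDownLatch(1);

  BarrierSketch(int numberOfWorkers) {
    this.numberOfWorkers = numberOfWorkers;
  }

  void waitOnBarrier(int workerID, long timeLimitMs)
      throws TimeoutException, InterruptedException {
    arrived.add(workerID);                 // worker "creates its znode"
    if (arrived.size() == numberOfWorkers) {
      allArrived.countDown();              // stand-in for the all-arrived event
    }
    try {
      if (!allArrived.await(timeLimitMs, TimeUnit.MILLISECONDS)) {
        throw new TimeoutException("barrier timed out for worker " + workerID);
      }
    } finally {
      arrived.remove(workerID);            // worker "deletes its own znode"
    }
  }

  // Number of workers currently registered on the barrier.
  int arrivedCount() {
    return arrived.size();
  }
}
```

Removing the entry in a finally block mirrors the rule that each worker cleans up its own barrier znode, whether it proceeds or times out. A reusable barrier would also need to reset the all-arrived signal between rounds, which this sketch leaves out.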
public void waitOnInitBarrier() throws TimeoutException
Specified by: waitOnInitBarrier in interface IWorkerController
Throws: TimeoutException
public int workerRestartCount()
Specified by: workerRestartCount in interface IWorkerController
public IWorkerFailureListener getFailureListener()
Specified by: getFailureListener in interface IWorkerController
public CheckpointingClient getCheckpointingClient()
Specified by: getCheckpointingClient in interface IWorkerController
public void close()