Package titan.scheduler
Class Scheduler
java.lang.Object
titan.scheduler.Scheduler
The main Scheduler class responsible for managing workers, dispatching jobs, and maintaining system state.
It handles job submission, scheduling, execution, and recovery, interacting with workers via RPC and
persisting state using Redis.
-
Field Summary
Fields
private final ExecutorService dispatchExecutor
private final Map<String,TaskExecution> executionHistory
private final ScheduledExecutorService heartBeatExecutor
private volatile boolean isRunning
private static final int MAX_LOG_LINES
(package private) int MAX_WORKERS
private int port
private final TitanJRedisAdapter redis
private String redisHost
private int redisPort
private final ScheduledExecutorService scalerExecutor
private volatile boolean scalingInProgress
private final RpcClient schedulerClient
private final SchedulerServer schedulerServer
private final ExecutorService serverExecutor
private final BlockingQueue<Job> taskQueue
private final BlockingQueue<ScheduledJob> waitingRoom
private final WorkerRegistry workerRegistry
(Additional fields, listed under Field Details below: dagWaitingRoom, deadLetterQueue, liveLogBuffer, liveServiceMap, portBlacklist, runningJobs, workerCompletionStats, workerRecentHistory.)
-
Constructor Summary
Constructors
Scheduler(int port)
Constructs a new Scheduler instance, initializing its core components.
-
Method Summary
Methods
private void appendLogToDisk(String jobId, String line)
Appends a log line to a job-specific log file on disk.
void cancelChildren(String failedParentId)
Recursively cancels child jobs whose parent job has failed.
void checkHeartBeat()
Sends heartbeat requests to all registered workers to verify their liveness and update their load status.
private void completeJob(Job job, String result, TaskExecution record)
Marks a job as completed.
private String executeDeploySequence(Job job, Worker worker, String payload)
Executes a deployment sequence on a worker.
private String executeJobRequest(Job job, Worker worker)
Executes a job request on a specified worker based on the job's payload type.
private String executeRunArchive(Job job, Worker worker, String payload)
Executes a job from an archived (ZIP) asset on a worker.
private String executeRunOneOff(Job job, Worker worker, String payload)
Executes a one-off script or program on a worker.
private String executeServiceArchive(Job job, Worker worker, String payload)
Starts a long-running service from an archived (ZIP) asset on a worker.
private String executeStandardTask(Job job, Worker worker, String payload)
Executes a standard task on a worker.
private String extractSkillRequirement(Job job)
Extracts the skill requirement (capability) from a job's payload.
private int findSafePort(List<Worker> currentWorkers, int minRange, int maxRange)
Finds an available port within a specified range for a new worker.
getDAGWaitingRoom()
Retrieves the map of jobs currently waiting for their DAG dependencies to be met.
getJobStatus(String id)
Retrieves the current status of a job based on its ID.
getLiveServiceMap()
Retrieves the map of currently running services (identified by job ID) to their assigned worker.
getLogs(String jobId)
Retrieves a snapshot of the recent log lines for a given job from the in-memory buffer.
getRedis()
Retrieves the Redis adapter instance used by the scheduler.
getSystemStats()
Generates a human-readable string containing various system statistics.
getSystemStatsJSON()
Generates a JSON string containing various system statistics.
getWorkerRegistry()
Retrieves the worker registry instance used by the scheduler.
void handleJobCallback(String payload)
Processes callbacks from workers regarding the completion or failure of asynchronous jobs.
private void handleJobFailure(Job job)
Handles the failure of a job.
private boolean isPortInUseLocally(int port)
Checks if a given port is currently in use on the local machine.
private boolean isWorkerAlive(String host, int port)
Checks if a worker is alive and reachable on a given host and port by attempting to open a socket connection.
void logStream(String jobId, String line)
Streams a log line for a specific job.
private void propagateAffinity(String parentId, String workerPortId)
Propagates worker affinity from a completed parent job to its dependent child jobs.
private void reconcileClusters()
Periodically checks the cluster's health and load to determine if scaling actions (up or down) are needed.
private void recoverState()
Recovers the scheduler's state from Redis upon startup.
redisKVGet(String key)
Retrieves the value associated with a given key from Redis.
void redisKVSet(String key, String value)
Sets a key-value pair in Redis.
void redisSetAdd(String key, String value)
Adds a member to a Redis set.
void registerWorker(String host, int port, String capability, boolean isPermanent)
Registers a new worker with the scheduler.
private void runDispatchLoop()
The main dispatch loop of the scheduler.
private void safeRedisSadd(String key, String member)
Safely adds a member to a Redis set, catching and logging any IOException.
private void safeRedisSet(String key, String value)
Safely sets a key-value pair in Redis, catching and logging any IOException.
safeRedisSMembers(String key)
Safely retrieves all members of a Redis set, catching and logging any exceptions.
private void safeRedisSrem(String key, String member)
Safely removes a member from a Redis set, catching and logging any exceptions.
private Worker selectBestWorker(Job job, List<Worker> availableWorkers)
Selects the most suitable worker for a given job from a list of available workers.
private String sendExecuteCommand(Worker worker, byte opCode, String payload)
Sends an RPC command to a specific worker and handles the response.
shutdownWorkerNode(String targetHost, int targetPort)
Initiates a graceful shutdown of a specific worker node.
void start()
Initiates the scheduler's operations.
void startAutoScaler()
Activates the auto-scaling mechanism for the Titan cluster.
void stop()
Shuts down the scheduler and all its associated executor services.
stopRemoteService(String serviceId)
Stops a remote service identified by its service ID.
void submitJob(Job job)
Submits a Job to the scheduler for processing.
void submitJob(String jobPayload)
Submits a job to the scheduler using a raw string payload.
private void unlockChildren(String parentId)
Unlocks child jobs that were dependent on a newly completed parent job.
-
Field Details
-
workerRegistry
-
schedulerClient
-
taskQueue
-
waitingRoom
-
deadLetterQueue
-
schedulerServer
-
heartBeatExecutor
-
dispatchExecutor
-
serverExecutor
-
isRunning
private volatile boolean isRunning -
port
private int port -
redisPort
private int redisPort -
redisHost
-
redis
-
executionHistory
-
liveServiceMap
-
workerCompletionStats
-
workerRecentHistory
-
runningJobs
-
dagWaitingRoom
-
MAX_WORKERS
int MAX_WORKERS -
scalingInProgress
private volatile boolean scalingInProgress -
scalerExecutor
-
portBlacklist
-
liveLogBuffer
-
MAX_LOG_LINES
private static final int MAX_LOG_LINES -
-
-
Constructor Details
-
Scheduler
public Scheduler(int port)
Constructs a new Scheduler instance, initializing its core components. This includes the worker registry, RPC client, job queues (task, dead-letter, waiting room, DAG waiting room), and executor services for heartbeats, dispatching, and the scheduler server. It also sets up the Redis adapter for persistence and starts a clock-watcher thread for delayed jobs.
- Parameters:
port - The port on which the scheduler server will listen for incoming requests.
- Throws:
RuntimeException - if the scheduler server fails to start due to an IOException.
-
-
Method Details
-
start
public void start()
Initiates the scheduler's operations. This method connects to Redis, attempts to recover any orphaned jobs, starts the scheduler server in a separate thread, and schedules periodic heartbeat checks for workers. It also starts the main job dispatch loop. -
getRedis
Retrieves the Redis adapter instance used by the scheduler.
- Returns:
The TitanJRedisAdapter instance.
-
getWorkerRegistry
Retrieves the worker registry instance used by the scheduler.
- Returns:
The WorkerRegistry instance.
-
startAutoScaler
public void startAutoScaler()
Activates the auto-scaling mechanism for the Titan cluster. This method schedules a periodic task to reconcile the cluster size based on worker load and availability. -
reconcileClusters
private void reconcileClusters()
Periodically checks the cluster's health and load to determine whether scaling actions (up or down) are needed. It identifies saturated worker pools to scale up, and detects idle, non-permanent workers to scale down. Scaling up submits a special 'DEPLOY_PAYLOAD' job that launches a new worker; scaling down gracefully shuts down an idle worker node. -
findSafePort
Finds an available port within a specified range for a new worker. It checks against currently registered workers and a blacklist of recently failed ports.
- Parameters:
currentWorkers - A list of currently active workers.
minRange - The minimum port number to consider.
maxRange - The maximum port number to consider.
- Returns:
An available port number, or -1 if no safe port is found within the scan range.
-
isPortInUseLocally
private boolean isPortInUseLocally(int port)
Checks if a given port is currently in use on the local machine.
- Parameters:
port - The port number to check.
- Returns:
true if the port is in use, false otherwise.
-
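The two port checks above can be sketched together. This is a minimal illustration rather than the class's actual code: the real findSafePort takes a List<Worker> and consults the portBlacklist field, which are reduced here to plain sets of port numbers (an assumption made for self-containment).

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Set;

public class PortProbe {

    /** Returns true if binding the port locally fails, i.e. it is already taken. */
    public static boolean isPortInUseLocally(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return false; // bind succeeded, so the port is free
        } catch (IOException e) {
            return true;  // bind failed, so the port is in use
        }
    }

    /**
     * Scans [minRange, maxRange] for a port that is not held by a worker,
     * not blacklisted, and not locally in use; returns -1 if none is found
     * (the sentinel documented above).
     */
    public static int findSafePort(Set<Integer> workerPorts, Set<Integer> blacklist,
                                   int minRange, int maxRange) {
        for (int p = minRange; p <= maxRange; p++) {
            if (workerPorts.contains(p) || blacklist.contains(p)) continue;
            if (!isPortInUseLocally(p)) return p;
        }
        return -1;
    }
}
```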
checkHeartBeat
public void checkHeartBeat()
Sends heartbeat requests to all registered workers to verify their liveness and update their load status. Workers that do not respond are marked as dead. Responding workers update their last-seen timestamp and load metrics. This method also updates Redis with the status of live workers. -
getLiveServiceMap
Retrieves the map of currently running services (identified by job ID) to their assigned worker.
- Returns:
A map where keys are service job IDs and values are the Worker instances hosting them.
-
registerWorker
Registers a new worker with the scheduler. This method adds the worker to the registry, clears any scalingInProgress flag, and removes the port from the blacklist. It also handles the 'promotion' of auto-scaled worker deployment jobs, marking the parent worker as idle.
- Parameters:
host - The hostname or IP address of the worker.
port - The port number the worker is listening on.
capability - A string describing the worker's capabilities (e.g., "GENERAL", "GPU").
isPermanent - true if the worker is a permanent part of the cluster and should not be scaled down, false otherwise.
-
getDAGWaitingRoom
Retrieves the map of jobs currently waiting for their DAG dependencies to be met.
- Returns:
A map where keys are job IDs and values are the Job instances waiting on dependencies.
-
submitJob
Submits a Job to the scheduler for processing. The job's state is persisted to Redis. If the job has dependencies, it is placed in the DAG waiting room. If it has a scheduled time in the future, it is placed in the waiting room. Otherwise, it is added directly to the active task queue.
- Parameters:
job - The Job object to be submitted.
-
submitJob
Submits a job to the scheduler using a raw string payload. This method parses the payload to extract the job ID, priority, and delay, then constructs a Job object and delegates to submitJob(Job). The payload format can include an optional ID, priority, and delay separated by pipes. Example: "JOB-101|RUN_PAYLOAD|script.py|data|GPU|10|1000" (ID|Payload|Priority|Delay)
- Parameters:
jobPayload - The raw string payload representing the job.
-
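A parser for that payload format might look like the sketch below. The field layout is inferred from the single documented example string, so treating the first field as the ID and the last two numeric fields as priority and delay is an assumption, as are the Parsed record and helper names; the real Scheduler may handle the middle fields differently.

```java
import java.util.Arrays;

public class JobPayloadParser {
    /** Hypothetical parsed view of a raw job payload. */
    public record Parsed(String id, String body, int priority, long delayMs) {}

    private static boolean isNumeric(String s) {
        return !s.isEmpty() && s.chars().allMatch(Character::isDigit);
    }

    public static Parsed parse(String raw) {
        String[] parts = raw.split("\\|");
        String id = parts[0];
        int priority = 0;
        long delayMs = 0;
        int end = parts.length;
        // Optional trailing "<priority>|<delayMs>" pair, per the documented example.
        if (end >= 4 && isNumeric(parts[end - 1]) && isNumeric(parts[end - 2])) {
            delayMs = Long.parseLong(parts[end - 1]);
            priority = Integer.parseInt(parts[end - 2]);
            end -= 2;
        }
        // Everything between the ID and the numeric suffix is the job body.
        String body = String.join("|", Arrays.copyOfRange(parts, 1, end));
        return new Parsed(id, body, priority, delayMs);
    }
}
```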
extractSkillRequirement
Extracts the skill requirement (capability) from a job's payload. This method parses the payload string to identify specific keywords or patterns that indicate a worker capability needed for the job (e.g., "GPU", "GENERAL"). It handles various payload formats and metadata to accurately determine the skill.
- Parameters:
job - The Job for which to extract the skill requirement.
- Returns:
A string representing the required skill (e.g., "GPU", "GENERAL"), defaulting to "GENERAL" if none is found.
-
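A keyword-scan sketch of this extraction is shown below. The set of recognized capability keywords beyond "GPU" and "GENERAL" is not documented, so KNOWN_SKILLS is illustrative; only the "GENERAL" default is taken from the description above.

```java
import java.util.Set;

public class SkillExtractor {
    // Illustrative capability keywords; the real method's keyword set is unknown.
    private static final Set<String> KNOWN_SKILLS = Set.of("GPU", "GENERAL");

    public static String extractSkill(String payload) {
        // Scan pipe-separated tokens for a known capability keyword.
        for (String token : payload.split("\\|")) {
            String t = token.trim().toUpperCase();
            if (KNOWN_SKILLS.contains(t)) return t;
        }
        return "GENERAL"; // documented default when no skill is found
    }
}
```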
redisKVSet
Sets a key-value pair in Redis. This is a public wrapper around safeRedisSet(String, String).
- Parameters:
key - The key to set.
value - The value to associate with the key.
-
redisSetAdd
Adds a member to a Redis set. This is a public wrapper around safeRedisSadd(String, String).
- Parameters:
key - The key of the set.
value - The member to add to the set.
-
redisKVGet
Retrieves the value associated with a given key from Redis.
- Parameters:
key - The key to retrieve.
- Returns:
The string value associated with the key, or null if the key does not exist or an error occurs.
-
safeRedisSet
Safely sets a key-value pair in Redis, catching and logging any IOException.
- Parameters:
key - The key to set.
value - The value to associate with the key.
-
safeRedisSadd
Safely adds a member to a Redis set, catching and logging any IOException.
- Parameters:
key - The key of the set.
member - The member to add to the set.
-
safeRedisSrem
Safely removes a member from a Redis set, catching and logging any exceptions.
- Parameters:
key - The key of the set.
member - The member to remove from the set.
-
safeRedisSMembers
Safely retrieves all members of a Redis set, catching and logging any exceptions.
- Parameters:
key - The key of the set.
- Returns:
A Set of strings representing the members of the set, or null if an error occurs.
-
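The "safe" wrappers above all follow one pattern: call the raw adapter operation and turn an IOException into a log line plus a benign return value, so a Redis outage never crashes the dispatch path. The sketch below illustrates that pattern; KvStore is a stand-in interface for TitanJRedisAdapter, whose actual method names are not given in this documentation.

```java
import java.io.IOException;
import java.util.Set;

public class SafeKv {
    /** Stand-in for TitanJRedisAdapter (assumed method names). */
    public interface KvStore {
        void set(String key, String value) throws IOException;
        Set<String> smembers(String key) throws IOException;
    }

    private final KvStore store;

    public SafeKv(KvStore store) { this.store = store; }

    /** Swallows the IOException after logging, matching safeRedisSet's contract. */
    public void safeSet(String key, String value) {
        try {
            store.set(key, value);
        } catch (IOException e) {
            System.err.println("redis set failed for " + key + ": " + e.getMessage());
        }
    }

    /** Returns null on error, matching safeRedisSMembers's documented contract. */
    public Set<String> safeSMembers(String key) {
        try {
            return store.smembers(key);
        } catch (IOException e) {
            System.err.println("redis smembers failed for " + key + ": " + e.getMessage());
            return null;
        }
    }
}
```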
recoverState
private void recoverState()
Recovers the scheduler's state from Redis upon startup. It scans for active jobs that were not marked as completed or dead and re-queues them into the appropriate scheduler queues (waiting room or task queue) based on their status and scheduled time. This ensures job continuity across scheduler restarts. -
runDispatchLoop
The main dispatch loop of the scheduler. This loop continuously polls the task queue for new jobs. When a job is available, it determines the required skill, selects the best available worker, dispatches the job to that worker, and handles the job's execution and potential failures. If no suitable worker is found or all workers are saturated, the job is re-queued.
- Throws:
InterruptedException - If the dispatch loop thread is interrupted.
-
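One pass of that loop can be sketched as below. Job and Worker here are stand-in records, not the real titan.scheduler types, and the RPC dispatch itself is omitted; the sketch only shows the poll, skill-matched selection, and documented re-queue-on-saturation behavior.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Queue;

public class DispatchSketch {
    public record Job(String id, String skill) {}
    public record Worker(String id, String capability, int load, int capacity) {
        boolean saturated() { return load >= capacity; }
    }

    /**
     * One dispatch pass: take the next job and pick the least-loaded,
     * non-saturated worker with a matching capability; if none exists,
     * put the job back for a later pass. Returns the chosen worker,
     * or null if nothing was dispatched.
     */
    public static Worker dispatchOnce(Queue<Job> taskQueue, List<Worker> workers) {
        Job job = taskQueue.poll();
        if (job == null) return null;
        Worker best = workers.stream()
                .filter(w -> w.capability().equals(job.skill()) && !w.saturated())
                .min(Comparator.comparingInt(Worker::load))
                .orElse(null);
        if (best == null) taskQueue.offer(job); // documented re-queue behavior
        return best;
    }
}
```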
handleJobFailure
Handles the failure of a job. Increments the job's retry count. If the retry count exceeds a threshold (3 retries), the job is marked as DEAD, moved to the dead-letter queue, and any dependent child jobs are cancelled. Otherwise, the job is re-queued for another attempt. Special handling is included for auto-scale jobs to prevent infinite retries on deployment issues.
- Parameters:
job - The Job that failed.
-
handleJobCallback
Processes callbacks from workers regarding the completion or failure of asynchronous jobs. The payload typically contains the job ID, status (COMPLETED/FAILED), and an optional result message. This method updates the job's status, clears the worker's active-job flag, and triggers completion or failure handling.
- Parameters:
payload - The callback string from the worker (e.g., "JOB-123|COMPLETED|Result: 5050").
-
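A parser for that callback string might look like the sketch below. The three-part split and the optional result segment are inferred from the quoted example; the Callback record is a hypothetical helper, not part of the class.

```java
public class CallbackParser {
    /** Hypothetical parsed view of a worker callback. */
    public record Callback(String jobId, String status, String result) {}

    public static Callback parse(String payload) {
        // Limit the split to 3 so a result message may itself contain '|'.
        String[] parts = payload.split("\\|", 3);
        String jobId = parts[0];
        String status = parts.length > 1 ? parts[1] : "UNKNOWN";
        String result = parts.length > 2 ? parts[2] : "";
        return new Callback(jobId, status, result);
    }
}
```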
completeJob
Marks a job as completed. Updates the job's status in Redis, removes it from the active-jobs set, and records its completion in the execution history. It also decrements the load on the assigned worker, updates worker completion statistics and history, propagates affinity to child jobs, and unlocks any dependent jobs.
- Parameters:
job - The Job that completed.
result - The result string returned by the worker.
record - The TaskExecution record associated with this job's execution.
-
selectBestWorker
Selects the most suitable worker for a given job from a list of available workers. Prioritizes workers with job affinity (sticky scheduling) if specified by the job. Otherwise, it selects the least loaded worker that is not saturated. -
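The selection policy just described, affinity first, then least-loaded fallback, can be sketched as follows. Job and Worker are again stand-in records with an assumed affinityWorkerId accessor; only the two-tier policy itself comes from the description above.

```java
import java.util.Comparator;
import java.util.List;

public class AffinitySelector {
    public record Job(String id, String affinityWorkerId) {}
    public record Worker(String id, int load, int capacity) {
        boolean saturated() { return load >= capacity; }
    }

    public static Worker selectBestWorker(Job job, List<Worker> workers) {
        // Sticky scheduling: prefer the worker the job is pinned to, if usable.
        if (job.affinityWorkerId() != null) {
            for (Worker w : workers) {
                if (w.id().equals(job.affinityWorkerId()) && !w.saturated()) return w;
            }
        }
        // Fallback: least-loaded worker that still has capacity.
        return workers.stream()
                .filter(w -> !w.saturated())
                .min(Comparator.comparingInt(Worker::load))
                .orElse(null);
    }
}
```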
propagateAffinity
Propagates worker affinity from a completed parent job to its dependent child jobs. If a child job requires affinity and its parent completed on a specific worker, the child job will be 'locked' to that same worker for execution, provided that worker is still alive.
- Parameters:
parentId - The ID of the completed parent job.
workerPortId - The port ID of the worker where the parent job was executed.
-
executeJobRequest
Executes a job request on a specified worker based on the job's payload type. This method acts as a dispatcher for different types of job execution (standard, deploy, run one-off, archive). It sets the worker's current job ID before execution. -
executeStandardTask
Executes a standard task on a worker. This typically involves sending a simple RUN command with the job ID and the task payload. -
executeDeploySequence
Executes a deployment sequence on a worker. This involves staging a file (e.g., a JAR or script) and then starting it as a service on a specified port. It includes checks for port availability and waits for the deployed service to become reachable. Special handling is included for internal auto-scaling deployments.
- Parameters:
job - The Job representing the deployment.
worker - The Worker on which to deploy.
payload - The deployment payload, including filename, base64 content, and optional target port.
- Returns:
A success message including the PID if available.
- Throws:
Exception - If staging fails, starting the service fails, or the deployed service does not become reachable.
-
isWorkerAlive
Checks if a worker is alive and reachable on a given host and port by attempting to open a socket connection.
- Parameters:
host - The hostname or IP address of the worker.
port - The port number of the worker.
- Returns:
true if the worker is reachable, false otherwise.
-
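The socket probe described above reduces to a few lines. The 500 ms connect timeout is an assumption; this documentation does not state what timeout the real method uses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class LivenessProbe {
    /** True if a TCP connection to host:port succeeds within the timeout. */
    public static boolean isWorkerAlive(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 500); // assumed timeout
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }
}
```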
executeRunOneOff
Executes a one-off script or program on a worker. This involves staging the script file (potentially with arguments) and then instructing the worker to run it asynchronously.
- Parameters:
job - The Job representing the one-off execution.
worker - The Worker on which to run the script.
payload - The payload containing the filename, optional arguments, and base64 script content.
- Returns:
The response from the worker, typically indicating job acceptance.
- Throws:
Exception - If staging fails or the run command fails.
-
executeRunArchive
Executes a job from an archived (ZIP) asset on a worker. This method resolves the archive pointer to get the entry point and base64 content of the archive, then sends it to the worker for execution.
- Parameters:
job - The Job to execute from an archive.
worker - The Worker on which to run the archive job.
payload - The payload containing the archive pointer (e.g., "zip_name.zip/entry.py").
- Returns:
The response from the worker.
- Throws:
Exception - If resolving the archive pointer fails or the execution command fails.
-
executeServiceArchive
Starts a long-running service from an archived (ZIP) asset on a worker. This method resolves the archive pointer, extracts the entry point and base64 content of the archive, and instructs the worker to start it as a detached service on a specified port.
- Parameters:
job - The Job representing the archived service.
worker - The Worker on which to start the service.
payload - The payload containing the archive pointer, optional arguments, and target port.
- Returns:
The response from the worker, typically indicating deployment success.
- Throws:
Exception - If resolving the archive pointer fails or the service start command fails.
-
sendExecuteCommand
Sends an RPC command to a specific worker and handles the response. This is a utility method for communicating with workers.
- Parameters:
worker - The Worker to send the command to.
opCode - The operation code (a TitanProtocol constant) for the command.
payload - The payload string for the command.
- Returns:
The response string from the worker.
- Throws:
Exception - If the response indicates an error or communication fails.
-
stopRemoteService
Stops a remote service identified by its service ID. It looks up the worker hosting the service and sends a STOP command to that worker. If successful, the service is removed from the live service map.
- Parameters:
serviceId - The ID of the service to stop.
- Returns:
A success or error message indicating the outcome of the stop operation.
-
shutdownWorkerNode
Initiates a graceful shutdown of a specific worker node. It first stops any services hosted by the target worker, then sends a KILL_WORKER command to the worker. Permanent workers cannot be shut down via this method.
- Parameters:
targetHost - The hostname or IP address of the worker to shut down.
targetPort - The port number of the worker to shut down.
- Returns:
A success or error message indicating the outcome of the shutdown operation.
-
unlockChildren
Unlocks child jobs that were dependent on a newly completed parent job. It iterates through jobs in the DAG waiting room, resolves the dependency for the given parent ID, and, if all dependencies for a child job are met, moves that child job to the active task queue.
- Parameters:
parentId - The ID of the parent job that has just completed.
-
cancelChildren
Recursively cancels child jobs whose parent job has failed. When a parent job fails, all of its direct and indirect dependent jobs are marked as DEAD and moved to the dead-letter queue.
- Parameters:
failedParentId - The ID of the parent job that failed.
-
logStream
Streams a log line for a specific job. The log line is added to an in-memory buffer for real-time retrieval and also appended to a persistent log file on disk. The in-memory buffer keeps at most a fixed number of lines (MAX_LOG_LINES) to prevent excessive memory usage.
- Parameters:
jobId - The ID of the job to which the log line belongs.
line - The log line to stream.
-
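The bounded-buffer behavior shared by logStream and getLogs can be sketched as below: appends evict the oldest line once the cap is reached, and reads hand back a copy (avoiding the ConcurrentModificationException mentioned under getLogs). The cap of 200 is an assumption; the documentation only says MAX_LOG_LINES exists, not its value, and disk persistence is omitted here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LiveLogBuffer {
    private static final int MAX_LOG_LINES = 200; // assumed value
    private final Map<String, Deque<String>> buffers = new ConcurrentHashMap<>();

    public void append(String jobId, String line) {
        Deque<String> buf = buffers.computeIfAbsent(jobId, k -> new ArrayDeque<>());
        synchronized (buf) {
            buf.addLast(line);
            // Trim the oldest lines once the cap is exceeded.
            while (buf.size() > MAX_LOG_LINES) buf.removeFirst();
        }
    }

    /** Returns a snapshot copy so readers never see a buffer mid-mutation. */
    public List<String> getLogs(String jobId) {
        Deque<String> buf = buffers.get(jobId);
        if (buf == null) return List.of();
        synchronized (buf) {
            return new ArrayList<>(buf);
        }
    }
}
```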
appendLogToDisk
Appends a log line to a job-specific log file on disk. Log files are stored in the 'titan_server_logs' directory.
- Parameters:
jobId - The ID of the job.
line - The log line to append.
-
getLogs
Retrieves a snapshot of the recent log lines for a given job from the in-memory buffer. This method returns a copy of the log list to prevent ConcurrentModificationExceptions.
- Parameters:
jobId - The ID of the job whose logs are to be retrieved.
- Returns:
A List of strings, each representing a log line for the specified job.
-
getSystemStats
Generates a human-readable string containing various system statistics, including the number of active workers, job queue sizes, and detailed status for each worker (load, capabilities, and hosted services).
- Returns:
A formatted string with system statistics.
-
getSystemStatsJSON
Generates a JSON string containing various system statistics, including the number of active workers, job queue sizes, and detailed status for each worker (port, capabilities, load, active job, recent history, and hosted services).
- Returns:
A JSON-formatted string with system statistics.
-
getJobStatus
Retrieves the current status of a job based on its ID.
- Parameters:
id - The ID of the job.
- Returns:
The Job.Status of the job, or Job.Status.PENDING if the job is not found in the execution history.
-
stop
public void stop()
Shuts down the scheduler and all its associated executor services. This method gracefully stops the scheduler server, heartbeat executor, and dispatch executor.
-