Class Scheduler

java.lang.Object
titan.scheduler.Scheduler

public class Scheduler extends Object
The main Scheduler class responsible for managing workers, dispatching jobs, and maintaining system state. It handles job submission, scheduling, execution, and recovery, interacting with workers via RPC and persisting state using Redis.
  • Field Details

  • Constructor Details

    • Scheduler

      public Scheduler(int port)
      Constructs a new Scheduler instance, initializing its core components. This includes the worker registry, RPC client, job queues (task, dead-letter, waiting room, DAG waiting room), and executor services for heartbeats, dispatching, and the scheduler server. It also sets up the Redis adapter for persistence and starts a clock watcher thread for delayed jobs.
      Parameters:
      port - The port on which the scheduler server will listen for incoming requests.
      Throws:
      RuntimeException - if the Scheduler Server fails to start due to an IOException.
  • Method Details

    • start

      public void start()
      Initiates the scheduler's operations. This method connects to Redis, attempts to recover any orphaned jobs, starts the scheduler server in a separate thread, and schedules periodic heartbeat checks for workers. It also starts the main job dispatch loop.
    • getRedis

      public TitanJRedisAdapter getRedis()
      Retrieves the Redis adapter instance used by the scheduler.
      Returns:
      The TitanJRedisAdapter instance.
    • getWorkerRegistry

      public WorkerRegistry getWorkerRegistry()
      Retrieves the worker registry instance used by the scheduler.
      Returns:
      The WorkerRegistry instance.
    • startAutoScaler

      public void startAutoScaler()
      Activates the auto-scaling mechanism for the Titan cluster. This method schedules a periodic task to reconcile the cluster size based on worker load and availability.
    • reconcileClusters

      private void reconcileClusters()
      Periodically checks the cluster's health and load to determine whether scaling actions (up or down) are needed. It identifies saturated worker pools that require additional capacity and detects idle, non-permanent workers that are candidates for scale-down. Scaling up involves submitting a special 'DEPLOY_PAYLOAD' job to launch a new worker; scaling down involves gracefully shutting down an idle worker node.
    • findSafePort

      private int findSafePort(List<Worker> currentWorkers, int minRange, int maxRange)
      Finds an available port within a specified range for a new worker. It checks against currently registered workers and a blacklist of recently failed ports.
      Parameters:
      currentWorkers - A list of currently active workers.
      minRange - The minimum port number to consider.
      maxRange - The maximum port number to consider.
      Returns:
      An available port number, or -1 if no safe port is found within the scan range.
    • isPortInUseLocally

      private boolean isPortInUseLocally(int port)
      Checks if a given port is currently in use on the local machine.
      Parameters:
      port - The port number to check.
      Returns:
      true if the port is in use, false otherwise.
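The two port-management helpers above can be sketched as follows. This is a minimal illustration of the described behavior, not Titan's actual implementation: the class name, the set-based parameters, and the bind-probe approach are assumptions.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Set;

// Illustrative sketch of the port-scanning logic described above.
class PortScan {
    // A port counts as "in use" if we cannot bind a ServerSocket to it.
    static boolean isPortInUseLocally(int port) {
        try (ServerSocket ignored = new ServerSocket(port)) {
            return false; // bind succeeded, so the port is free
        } catch (IOException e) {
            return true;  // bind failed, so something is already listening
        }
    }

    // Scan [minRange, maxRange] for a port not taken by a known worker,
    // not blacklisted, and not in use locally; return -1 if none is found.
    static int findSafePort(Set<Integer> workerPorts, Set<Integer> blacklist,
                            int minRange, int maxRange) {
        for (int p = minRange; p <= maxRange; p++) {
            if (!workerPorts.contains(p) && !blacklist.contains(p)
                    && !isPortInUseLocally(p)) {
                return p;
            }
        }
        return -1;
    }
}
```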
    • checkHeartBeat

      public void checkHeartBeat()
      Sends heartbeat requests to all registered workers to verify their liveness and update their load status. Workers that do not respond are marked as dead. Responding workers update their last seen timestamp and load metrics. This method also updates Redis with the status of live workers.
    • getLiveServiceMap

      public Map<String,Worker> getLiveServiceMap()
      Retrieves the map of currently running services (identified by job ID) to their assigned worker.
      Returns:
      A map where keys are service job IDs and values are the Worker instances hosting them.
    • registerWorker

      public void registerWorker(String host, int port, String capability, boolean isPermanent)
      Registers a new worker with the scheduler. This method adds the worker to the registry, clears any `scalingInProgress` flag, and removes the port from the blacklist. It also handles the 'promotion' of auto-scaled worker deployment jobs, marking the parent worker as idle.
      Parameters:
      host - The hostname or IP address of the worker.
      port - The port number the worker is listening on.
      capability - A string describing the worker's capabilities (e.g., "GENERAL", "GPU").
      isPermanent - true if the worker is a permanent part of the cluster and should not be scaled down, false otherwise.
    • getDAGWaitingRoom

      public Map<String,Job> getDAGWaitingRoom()
      Retrieves the map of jobs currently waiting for their DAG dependencies to be met.
      Returns:
      A map where keys are job IDs and values are the Job instances waiting on dependencies.
    • submitJob

      public void submitJob(Job job)
      Submits a Job to the scheduler for processing. The job's state is persisted to Redis. If the job has dependencies, it's placed in the DAG waiting room. If it has a scheduled time in the future, it's placed in the waiting room. Otherwise, it's added directly to the active task queue.
      Parameters:
      job - The Job object to be submitted.
    • submitJob

      public void submitJob(String jobPayload)
      Submits a job to the scheduler using a raw string payload. This method parses the payload to extract the job ID, priority, and delay, then constructs a Job object and delegates to submitJob(Job). The pipe-separated format can include an optional ID, priority, and delay surrounding the payload itself, which may contain further pipe-separated fields. Example: "JOB-101|RUN_PAYLOAD|script.py|data|GPU|10|1000" (ID|Payload|Priority|Delay)
      Parameters:
      jobPayload - The raw string payload representing the job.
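One plausible reading of the pipe-delimited format can be sketched as below. The field conventions here (an optional leading "JOB-" id, optional trailing numeric priority and delay, a default priority of 5) are assumptions for illustration; Titan's actual parsing rules may differ.

```java
import java.util.Arrays;

// Illustrative sketch of parsing a pipe-delimited job payload.
class PayloadParse {
    record ParsedJob(String id, String payload, int priority, long delayMs) {}

    static ParsedJob parse(String raw) {
        String[] parts = raw.split("\\|");
        int start = 0, end = parts.length;
        String id = "AUTO"; // placeholder when no explicit id is given
        if (parts[0].startsWith("JOB-")) { id = parts[0]; start = 1; }

        long delay = 0;
        int priority = 5; // assumed default priority
        // Trailing numeric fields are read as priority then delay.
        if (end - start > 2 && isNumeric(parts[end - 1]) && isNumeric(parts[end - 2])) {
            delay = Long.parseLong(parts[end - 1]);
            priority = Integer.parseInt(parts[end - 2]);
            end -= 2;
        }
        // Whatever remains, pipes included, is the payload itself.
        String payload = String.join("|", Arrays.copyOfRange(parts, start, end));
        return new ParsedJob(id, payload, priority, delay);
    }

    private static boolean isNumeric(String s) {
        return !s.isEmpty() && s.chars().allMatch(Character::isDigit);
    }
}
```

Parsing the documented example, "JOB-101|RUN_PAYLOAD|script.py|data|GPU|10|1000", would then yield id JOB-101, priority 10, delay 1000, and payload "RUN_PAYLOAD|script.py|data|GPU".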
    • extractSkillRequirement

      private String extractSkillRequirement(Job job)
      Extracts the skill requirement (capability) from a job's payload. This method parses the payload string to identify specific keywords or patterns that indicate a worker capability needed for the job (e.g., "GPU", "GENERAL"). It handles various payload formats and metadata to accurately determine the skill.
      Parameters:
      job - The Job for which to extract the skill requirement.
      Returns:
      A string representing the required skill (e.g., "GPU", "GENERAL"), defaulting to "GENERAL" if none is found.
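The keyword-matching described above might look like the following sketch. The known-skill list and the field-by-field scan are assumptions; only the "GENERAL" default is documented.

```java
import java.util.List;

// Illustrative sketch of keyword-based skill extraction from a payload.
class SkillExtract {
    // Assumed capability keywords; the real set may be larger.
    private static final List<String> KNOWN_SKILLS = List.of("GPU", "GENERAL");

    static String extractSkillRequirement(String payload) {
        for (String field : payload.split("\\|")) {
            for (String skill : KNOWN_SKILLS) {
                if (field.trim().equalsIgnoreCase(skill)) return skill;
            }
        }
        return "GENERAL"; // documented default when no skill keyword is found
    }
}
```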
    • redisKVSet

      public void redisKVSet(String key, String value)
      Sets a key-value pair in Redis. This is a public wrapper around safeRedisSet(String, String).
      Parameters:
      key - The key to set.
      value - The value to associate with the key.
    • redisSetAdd

      public void redisSetAdd(String key, String value)
      Adds a member to a Redis set. This is a public wrapper around safeRedisSadd(String, String).
      Parameters:
      key - The key of the set.
      value - The member to add to the set.
    • redisKVGet

      public String redisKVGet(String key)
      Retrieves the value associated with a given key from Redis.
      Parameters:
      key - The key to retrieve.
      Returns:
      The string value associated with the key, or null if the key does not exist or an error occurs.
    • safeRedisSet

      private void safeRedisSet(String key, String value)
      Safely sets a key-value pair in Redis, catching and logging any IOException.
      Parameters:
      key - The key to set.
      value - The value to associate with the key.
    • safeRedisSadd

      private void safeRedisSadd(String key, String member)
      Safely adds a member to a Redis set, catching and logging any IOException.
      Parameters:
      key - The key of the set.
      member - The member to add to the set.
    • safeRedisSrem

      private void safeRedisSrem(String key, String member)
      Safely removes a member from a Redis set, catching and logging any exceptions.
      Parameters:
      key - The key of the set.
      member - The member to remove from the set.
    • safeRedisSMembers

      public Set<String> safeRedisSMembers(String key)
      Safely retrieves all members of a Redis set, catching and logging any exceptions.
      Parameters:
      key - The key of the set.
      Returns:
      A Set of strings representing the members of the set, or null if an error occurs.
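All of the safeRedis* methods above share the same catch-and-log pattern: swallow the exception, log it, and return a neutral value. A sketch with a stand-in client interface (RedisClient here is illustrative, not Titan's adapter type):

```java
import java.io.IOException;
import java.util.Set;

// Sketch of the catch-and-log wrapper pattern used by the safeRedis* methods.
class SafeRedis {
    interface RedisClient {
        void set(String key, String value) throws IOException;
        Set<String> smembers(String key) throws IOException;
    }

    private final RedisClient client;
    SafeRedis(RedisClient client) { this.client = client; }

    // Never propagates the IOException; logs it and carries on.
    void safeSet(String key, String value) {
        try {
            client.set(key, value);
        } catch (IOException e) {
            System.err.println("[Redis] SET failed for " + key + ": " + e.getMessage());
        }
    }

    // Returns null on failure, matching the documented behavior.
    Set<String> safeSMembers(String key) {
        try {
            return client.smembers(key);
        } catch (IOException e) {
            System.err.println("[Redis] SMEMBERS failed for " + key + ": " + e.getMessage());
            return null;
        }
    }

    // Demonstrates the wrappers against a client that always fails.
    static Set<String> demoFailure() {
        RedisClient broken = new RedisClient() {
            public void set(String key, String value) throws IOException {
                throw new IOException("connection refused");
            }
            public Set<String> smembers(String key) throws IOException {
                throw new IOException("connection refused");
            }
        };
        SafeRedis redis = new SafeRedis(broken);
        redis.safeSet("k", "v"); // logged, not thrown
        return redis.safeSMembers("k");
    }
}
```

The design choice is that Redis outages degrade persistence but never crash the dispatch loop; callers must therefore tolerate null from the read-side wrappers.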
    • recoverState

      private void recoverState()
      Recovers the scheduler's state from Redis upon startup. It scans for active jobs that were not marked as completed or dead, and re-queues them into the appropriate scheduler queues (waiting room or task queue) based on their status and scheduled time. This ensures job continuity across scheduler restarts.
    • runDispatchLoop

      private void runDispatchLoop() throws InterruptedException
      The main dispatch loop of the scheduler. This loop continuously polls the task queue for new jobs. When a job is available, it determines the required skill, selects the best available worker, dispatches the job to that worker, and handles the job's execution and potential failures. If no suitable worker is found or all workers are saturated, the job is re-queued.
      Throws:
      InterruptedException - If the dispatch loop thread is interrupted.
    • handleJobFailure

      private void handleJobFailure(Job job)
      Handles the failure of a job. Increments the job's retry count. If the retry count exceeds a threshold (3 retries), the job is marked as DEAD, moved to the dead-letter queue, and any dependent child jobs are cancelled. Otherwise, the job is re-queued for another attempt. Special handling is included for auto-scale jobs to prevent infinite retries on deployment issues.
      Parameters:
      job - The Job that failed.
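The retry-threshold logic described above can be sketched as follows; the queue types, the Outcome enum, and the Job shape are illustrative stand-ins (only the threshold of 3 retries is documented).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch of the documented retry/dead-letter policy.
class RetryPolicy {
    static final int MAX_RETRIES = 3; // documented threshold

    enum Outcome { REQUEUED, DEAD }

    static class Job {
        final String id;
        int retries;
        Job(String id) { this(id, 0); }
        Job(String id, int retries) { this.id = id; this.retries = retries; }
    }

    final Deque<Job> taskQueue = new ArrayDeque<>();
    final Deque<Job> deadLetterQueue = new ArrayDeque<>();

    // Increment the retry count; re-queue until the threshold is exceeded,
    // then move the job to the dead-letter queue.
    Outcome handleJobFailure(Job job) {
        job.retries++;
        if (job.retries > MAX_RETRIES) {
            deadLetterQueue.add(job);
            return Outcome.DEAD; // dependent children would be cancelled here
        }
        taskQueue.add(job);
        return Outcome.REQUEUED;
    }
}
```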
    • handleJobCallback

      public void handleJobCallback(String payload)
      Processes callbacks from workers regarding the completion or failure of asynchronous jobs. The payload typically contains the job ID, status (COMPLETED/FAILED), and an optional result message. This method updates the job's status, clears the worker's active job flag, and triggers completion or failure handling.
      Parameters:
      payload - The callback string from the worker (e.g., "JOB-123|COMPLETED|Result: 5050").
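Parsing the callback payload can be sketched as a three-way split; using a split limit keeps any pipes inside the result message intact. The record name and the "UNKNOWN" fallback are assumptions.

```java
// Illustrative sketch of parsing a "jobId|STATUS|optional result" callback.
class Callback {
    record ParsedCallback(String jobId, String status, String result) {}

    static ParsedCallback parse(String payload) {
        String[] parts = payload.split("\\|", 3); // at most three segments
        String jobId = parts[0];
        String status = parts.length > 1 ? parts[1] : "UNKNOWN";
        String result = parts.length > 2 ? parts[2] : "";
        return new ParsedCallback(jobId, status, result);
    }
}
```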
    • completeJob

      private void completeJob(Job job, String result, TaskExecution record)
      Marks a job as completed. Updates the job's status in Redis, removes it from the active jobs set, and records its completion in the execution history. It also decrements the load on the assigned worker, updates worker completion statistics and history, propagates affinity to child jobs, and unlocks any dependent jobs.
      Parameters:
      job - The Job that completed.
      result - The result string returned by the worker.
      record - The TaskExecution record associated with this job's execution.
    • selectBestWorker

      private Worker selectBestWorker(Job job, List<Worker> availableWorkers)
      Selects the most suitable worker for a given job from a list of available workers. Prioritizes workers with job affinity (sticky scheduling) if specified by the job. Otherwise, it selects the least loaded worker that is not saturated.
      Parameters:
      job - The Job to be dispatched.
      availableWorkers - A list of Worker instances capable of executing the job.
      Returns:
      The selected Worker, or null if no suitable worker is found.
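The selection policy reads as: prefer the affinity worker if the job names one, otherwise take the least-loaded unsaturated worker. A sketch under those assumptions (the Worker record and the capacity-based saturation check are stand-ins):

```java
import java.util.Comparator;
import java.util.List;

// Illustrative sketch of the documented worker-selection policy.
class WorkerSelect {
    record Worker(String id, int load, int capacity) {
        boolean saturated() { return load >= capacity; }
    }

    static Worker selectBestWorker(String affinityWorkerId, List<Worker> available) {
        // Sticky scheduling: honor the affinity worker if it can take the job.
        if (affinityWorkerId != null) {
            for (Worker w : available) {
                if (w.id().equals(affinityWorkerId) && !w.saturated()) return w;
            }
        }
        // Otherwise pick the least-loaded worker that is not saturated.
        return available.stream()
                .filter(w -> !w.saturated())
                .min(Comparator.comparingInt(Worker::load))
                .orElse(null); // null when every worker is saturated
    }
}
```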
    • propagateAffinity

      private void propagateAffinity(String parentId, String workerPortId)
      Propagates worker affinity from a completed parent job to its dependent child jobs. If a child job requires affinity and its parent completed on a specific worker, the child job will be 'locked' to that same worker for execution, if that worker is still alive.
      Parameters:
      parentId - The ID of the completed parent job.
      workerPortId - The port ID of the worker where the parent job was executed.
    • executeJobRequest

      private String executeJobRequest(Job job, Worker worker) throws Exception
      Executes a job request on a specified worker based on the job's payload type. This method acts as a dispatcher for different types of job execution (standard, deploy, run one-off, archive). It sets the worker's current job ID before execution.
      Parameters:
      job - The Job to execute.
      worker - The Worker on which to execute the job.
      Returns:
      The response string from the worker after executing the job.
      Throws:
      Exception - If an error occurs during job execution or communication with the worker.
    • executeStandardTask

      private String executeStandardTask(Job job, Worker worker, String payload) throws Exception
      Executes a standard task on a worker. This typically involves sending a simple RUN command with the job ID and the task payload.
      Parameters:
      job - The Job to execute.
      worker - The Worker on which to execute the task.
      payload - The raw payload for the task.
      Returns:
      The response from the worker.
      Throws:
      Exception - If an error occurs during execution.
    • executeDeploySequence

      private String executeDeploySequence(Job job, Worker worker, String payload) throws Exception
      Executes a deployment sequence on a worker. This involves staging a file (e.g., a JAR or script) and then starting it as a service on a specified port. It includes checks for port availability and waits for the deployed service to become reachable. Special handling is included for internal auto-scaling deployments.
      Parameters:
      job - The Job representing the deployment.
      worker - The Worker on which to deploy.
      payload - The deployment payload, including filename, base64 content, and optional target port.
      Returns:
      A success message including the PID if available.
      Throws:
      Exception - If staging fails, starting the service fails, or the deployed service does not become reachable.
    • isWorkerAlive

      private boolean isWorkerAlive(String host, int port)
      Checks if a worker is alive and reachable on a given host and port by attempting to open a socket connection.
      Parameters:
      host - The hostname or IP address of the worker.
      port - The port number of the worker.
      Returns:
      true if the worker is reachable, false otherwise.
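The liveness probe amounts to a short TCP connect attempt where any failure means "not alive". A sketch of that check; the 500 ms timeout is an assumed value, not documented:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative sketch of the socket-based worker liveness probe.
class Probe {
    static boolean isWorkerAlive(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 500);
            return true;  // connection accepted, worker is reachable
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable
        }
    }
}
```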
    • executeRunOneOff

      private String executeRunOneOff(Job job, Worker worker, String payload) throws Exception
      Executes a one-off script or program on a worker. This involves staging the script file (potentially with arguments) and then instructing the worker to run it asynchronously.
      Parameters:
      job - The Job representing the one-off execution.
      worker - The Worker on which to run the script.
      payload - The payload containing filename, optional arguments, and base64 script content.
      Returns:
      The response from the worker, typically indicating job acceptance.
      Throws:
      Exception - If staging fails or the run command fails.
    • executeRunArchive

      private String executeRunArchive(Job job, Worker worker, String payload) throws Exception
      Executes a job from an archived (ZIP) asset on a worker. This method resolves the archive pointer to get the entry point and base64 content of the archive, then sends it to the worker for execution.
      Parameters:
      job - The Job to execute from an archive.
      worker - The Worker on which to run the archive job.
      payload - The payload containing the archive pointer (e.g., "zip_name.zip/entry.py").
      Returns:
      The response from the worker.
      Throws:
      Exception - If resolving the archive pointer fails or the execution command fails.
    • executeServiceArchive

      private String executeServiceArchive(Job job, Worker worker, String payload) throws Exception
      Starts a long-running service from an archived (ZIP) asset on a worker. This method resolves the archive pointer, extracts the entry point and base64 content of the archive, and instructs the worker to start it as a detached service on a specified port.
      Parameters:
      job - The Job representing the archived service.
      worker - The Worker on which to start the service.
      payload - The payload containing the archive pointer, optional arguments, and target port.
      Returns:
      The response from the worker, typically indicating deployment success.
      Throws:
      Exception - If resolving the archive pointer fails or the service start command fails.
    • sendExecuteCommand

      private String sendExecuteCommand(Worker worker, byte opCode, String payload) throws Exception
      Sends an RPC command to a specific worker and handles the response. This is a utility method for communicating with workers.
      Parameters:
      worker - The Worker to send the command to.
      opCode - The operation code (TitanProtocol constant) for the command.
      payload - The payload string for the command.
      Returns:
      The response string from the worker.
      Throws:
      Exception - If the response indicates an error or communication fails.
    • stopRemoteService

      public String stopRemoteService(String serviceId)
      Stops a remote service identified by its service ID. It looks up the worker hosting the service and sends a STOP command to that worker. If successful, the service is removed from the live service map.
      Parameters:
      serviceId - The ID of the service to stop.
      Returns:
      A success or error message indicating the outcome of the stop operation.
    • shutdownWorkerNode

      public String shutdownWorkerNode(String targetHost, int targetPort)
      Initiates a graceful shutdown of a specific worker node. It first stops any services hosted by the target worker, then sends a KILL_WORKER command to the worker. Permanent workers cannot be shut down via this method.
      Parameters:
      targetHost - The hostname or IP address of the worker to shut down.
      targetPort - The port number of the worker to shut down.
      Returns:
      A success or error message indicating the outcome of the shutdown operation.
    • unlockChildren

      private void unlockChildren(String parentId)
      Unlocks child jobs that were dependent on a newly completed parent job. It iterates through jobs in the DAG waiting room, resolves the dependency for the given parent ID, and if all dependencies for a child job are met, it moves that child job to the active task queue.
      Parameters:
      parentId - The ID of the parent job that has just completed.
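The dependency-resolution step described above can be sketched as removing the completed parent from each waiting child's pending set and releasing children whose sets become empty. The map-of-sets representation here is an assumption about the DAG waiting room:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Illustrative sketch of releasing children once dependencies are met.
class DagRoom {
    final Map<String, Set<String>> waitingRoom = new HashMap<>(); // childId -> pending parents
    final Queue<String> taskQueue = new ArrayDeque<>();

    void unlockChildren(String parentId) {
        Iterator<Map.Entry<String, Set<String>>> it = waitingRoom.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> entry = it.next();
            entry.getValue().remove(parentId);
            if (entry.getValue().isEmpty()) { // all dependencies met
                taskQueue.add(entry.getKey());
                it.remove();
            }
        }
    }

    // Demonstrates a child waiting on two parents being released by the second.
    static Queue<String> demo() {
        DagRoom room = new DagRoom();
        room.waitingRoom.put("child", new HashSet<>(Set.of("p1", "p2")));
        room.unlockChildren("p1"); // child still waits on p2
        room.unlockChildren("p2"); // child moves to the task queue
        return room.taskQueue;
    }
}
```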
    • cancelChildren

      public void cancelChildren(String failedParentId)
      Recursively cancels child jobs whose parent job has failed. When a parent job fails, all its direct and indirect dependent jobs are marked as DEAD and moved to the dead-letter queue.
      Parameters:
      failedParentId - The ID of the parent job that failed.
    • logStream

      public void logStream(String jobId, String line)
      Streams a log line for a specific job. The log line is added to an in-memory buffer for real-time retrieval and also appended to a persistent log file on disk. The in-memory buffer maintains a maximum number of lines to prevent excessive memory usage.
      Parameters:
      jobId - The ID of the job to which the log line belongs.
      line - The log line to stream.
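The bounded in-memory buffer described above can be sketched as follows; the 500-line cap is an assumed value, and the disk-append step is omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the per-job bounded log buffer.
class LogBuffer {
    static final int MAX_LINES = 500; // assumed cap, not documented

    private final Map<String, List<String>> logs = new HashMap<>();

    synchronized void logStream(String jobId, String line) {
        List<String> buf = logs.computeIfAbsent(jobId, k -> new ArrayList<>());
        buf.add(line);
        if (buf.size() > MAX_LINES) buf.remove(0); // evict the oldest line
        // appendLogToDisk(jobId, line) would also run here for persistence
    }

    // Return a defensive copy so callers never see concurrent mutation.
    synchronized List<String> getLogs(String jobId) {
        return new ArrayList<>(logs.getOrDefault(jobId, List.of()));
    }

    // Demonstrates the cap: 600 appends leave only the newest 500 lines.
    static List<String> demo() {
        LogBuffer b = new LogBuffer();
        for (int i = 0; i < 600; i++) b.logStream("j", "line-" + i);
        return b.getLogs("j");
    }
}
```

Returning a copy from getLogs is what lets readers iterate safely while workers keep streaming new lines into the buffer.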
    • appendLogToDisk

      private void appendLogToDisk(String jobId, String line)
      Appends a log line to a job-specific log file on disk. Log files are stored in the 'titan_server_logs' directory.
      Parameters:
      jobId - The ID of the job.
      line - The log line to append.
    • getLogs

      public List<String> getLogs(String jobId)
      Retrieves a snapshot of the recent log lines for a given job from the in-memory buffer. This method returns a copy of the log list to avoid a ConcurrentModificationException while the buffer is still being appended to.
      Parameters:
      jobId - The ID of the job whose logs are to be retrieved.
      Returns:
      A List of strings, each representing a log line for the specified job.
    • getSystemStats

      public String getSystemStats()
      Generates a human-readable string containing various system statistics. This includes the number of active workers, job queue sizes, and detailed status for each worker (load, capabilities, and hosted services).
      Returns:
      A formatted string with system statistics.
    • getSystemStatsJSON

      public String getSystemStatsJSON()
      Generates a JSON string containing various system statistics. This includes the number of active workers, job queue sizes, and detailed status for each worker (port, capabilities, load, active job, recent history, and hosted services).
      Returns:
      A JSON formatted string with system statistics.
    • getJobStatus

      public Job.Status getJobStatus(String id)
      Retrieves the current status of a job based on its ID.
      Parameters:
      id - The ID of the job.
      Returns:
      The Job.Status of the job, or Job.Status.PENDING if the job is not found in execution history.
    • stop

      public void stop()
      Shuts down the scheduler and all its associated executor services. This method gracefully stops the scheduler server, heartbeat executor, and dispatch executor.