Package titan.network

Class RpcWorkerServer

java.lang.Object
titan.network.RpcWorkerServer

public class RpcWorkerServer extends Object
Represents a worker server in the Titan distributed system. This server listens for incoming RPC requests from a scheduler, executes various tasks (e.g., file staging, script execution, service management, PDF conversion), and reports job status and logs back to the scheduler.

Each worker server registers itself with a central scheduler, advertising its capabilities. It uses a thread pool to handle incoming client connections and a separate worker pool to manage the execution of tasks, ensuring that the server remains responsive while processing jobs.

  • Field Details

    • port

      private int port
      The port number on which this worker server listens for incoming connections.
    • threadPool

      private final ExecutorService threadPool
      The thread pool that handles incoming client connections. It is separate from the workerPool, which executes the tasks themselves.
    • isRunning

      private volatile boolean isRunning
      A volatile flag indicating whether the worker server is currently running. Set to false to initiate a graceful shutdown.
    • capability

      private String capability
      A string describing the capabilities of this worker, used by the scheduler to assign appropriate tasks. Examples include "GENERAL", "GPU", and "PDF_CONVERT".
    • isPermanent

      private boolean isPermanent
      A flag indicating whether this worker is a permanent instance (true) or an ephemeral instance (false) that can be scaled down by the scheduler.
    • schedulerPort

      private int schedulerPort
      The port number of the scheduler server to which this worker registers and sends callbacks.
    • schedulerHost

      private String schedulerHost
      The hostname or IP address of the scheduler server to which this worker registers and sends callbacks.
    • taskHanlderMap

      private Map<String,TaskHandler> taskHanlderMap
      A map storing various TaskHandler implementations, keyed by their task type. This allows the worker to dynamically dispatch tasks based on the received command.
    • MAX_THREADS

      private static final int MAX_THREADS
      The maximum number of concurrent tasks that this worker can execute simultaneously in its worker pool.
    • workerPool

      private final ExecutorService workerPool
      A fixed-size thread pool dedicated to executing actual tasks (jobs). This limits the concurrent workload on the worker.
    • activeJobs

      private final AtomicInteger activeJobs
      An atomic counter tracking the number of jobs currently being processed by the workerPool. Used to manage worker saturation.
  • Constructor Details

    • RpcWorkerServer

      public RpcWorkerServer(int myPort, String schedulerHost, int schedulerPort, String capability, boolean isPermanent)
      Constructs a new RpcWorkerServer instance.
      Parameters:
      myPort - The port number on which this worker server will listen.
      schedulerHost - The hostname or IP address of the scheduler.
      schedulerPort - The port number of the scheduler.
      capability - A string describing the capabilities of this worker (e.g., "GENERAL", "PDF_CONVERT").
      isPermanent - A boolean indicating if this worker is a permanent instance (true) or ephemeral (false).
  • Method Details

    • getSchedulerHost

      public String getSchedulerHost()
      Retrieves the hostname or IP address of the scheduler this worker is connected to.
      Returns:
      The scheduler's hostname or IP address.
    • getSchedulerPort

      public int getSchedulerPort()
      Retrieves the port number of the scheduler this worker is connected to.
      Returns:
      The scheduler's port number.
    • addTaskHandler

      public void addTaskHandler()
      Initializes and registers various TaskHandler implementations with the worker server. This method populates the taskHanlderMap with handlers for specific task types like PDF conversion, file staging, service management, and script execution.
    • start

      public void start() throws Exception
      Starts the worker server, binding it to the specified port and beginning to listen for incoming client connections. It also registers itself with the scheduler upon startup.

      This method enters a continuous loop, accepting new client sockets and submitting them to the internal thread pool for handling. The loop continues as long as the isRunning flag is true.

      Throws:
      Exception - If an error occurs during server startup or socket operations.
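
The accept loop described for start() can be sketched roughly as follows. This is a minimal illustration, not the Titan source: the class AcceptLoopSketch, its bind and handle methods, the cached thread pool, and the single-byte placeholder reply are all assumptions made for the example. Only the volatile isRunning flag and the hand-off of accepted sockets to a thread pool come from the description above.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the accept loop; names and reply format are assumptions. */
final class AcceptLoopSketch {
    private final ExecutorService threadPool = Executors.newCachedThreadPool();
    private volatile boolean isRunning = true;
    private volatile ServerSocket serverSocket;

    /** Binds to the given port (0 picks any free port) and returns the bound port. */
    int bind(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        return serverSocket.getLocalPort();
    }

    /** Blocks, handing each accepted socket to the pool until stop() is called. */
    void acceptLoop() {
        while (isRunning) {
            try {
                Socket client = serverSocket.accept();
                threadPool.submit(() -> handle(client));
            } catch (IOException e) {
                // accept() throws once stop() closes the server socket.
                // A real server would log other failures and possibly back off.
                if (!isRunning) {
                    break;
                }
            }
        }
    }

    /** Placeholder handler: writes one byte and closes the connection. */
    private void handle(Socket client) {
        try (Socket s = client) {
            s.getOutputStream().write('K');
        } catch (IOException ignored) {
            // Nothing useful to do for a failed placeholder reply.
        }
    }

    /** Clears the flag, unblocks accept() by closing the socket, and stops the pool. */
    void stop() throws IOException {
        isRunning = false;
        serverSocket.close();
        threadPool.shutdown();
    }
}
```

Closing the server socket in stop() is what unblocks a pending accept() call; clearing the flag alone would leave the loop waiting for one more connection.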
    • registerWithScheduler

      private void registerWithScheduler() throws Exception
      Registers this worker server with the central scheduler. It sends its port, capabilities, and permanence status to the scheduler and waits for a registration confirmation.
      Throws:
      Exception - If an I/O error occurs during communication with the scheduler or if registration fails.
    • clientHandler

      private void clientHandler(Socket socket)
      Handles communication with a single client socket (typically the scheduler). This method continuously reads TitanProtocol.TitanPackets from the client, processes them based on their operation code, and sends appropriate responses.

      Supported operations include heartbeats, running archive jobs, starting archived services, staging files, starting/stopping services, running scripts, and worker shutdown commands.

      Parameters:
      socket - The client socket connected to the worker.
    • handleExecution

      private void handleExecution(DataOutputStream out, String payload, String forceTaskType)
      Handles the execution of a task, either synchronously or asynchronously depending on the worker's saturation. If the worker pool is saturated, an error is returned. Otherwise, the task is submitted to the worker pool.

      This method is deprecated in favor of handleAsyncExecution(DataOutputStream, String) and handleSyncExecution(DataOutputStream, String, String) for clearer separation of concerns.

      Parameters:
      out - The output stream to send responses back to the client.
      payload - The task-specific data required for execution.
      forceTaskType - An optional parameter to explicitly specify the task type, overriding parsing from the payload.
    • handleArchiveJob

      private void handleArchiveJob(DataOutputStream out, String payload)
      Handles the execution of a job that involves staging and running an archived payload. The payload is expected to contain a job ID, an entry file path, and a Base64 encoded ZIP archive.

      This method first stages the archive using WorkspaceManager, resolves the entry file path, and then delegates to handleAsyncExecution(DataOutputStream, String) to run the job asynchronously.

      Parameters:
      out - The output stream to send responses back to the client.
      payload - The payload containing job ID, entry file, and Base64 ZIP data (format: "JOB_ID|ENTRY_FILE|BASE64_ZIP").
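
As an illustration of the "JOB_ID|ENTRY_FILE|BASE64_ZIP" format, the payload could be split and decoded along these lines. ArchivePayloadSketch and its parse method are hypothetical names, and the validation behaviour (throwing IllegalArgumentException) is an assumption; the actual method may report errors differently.

```java
import java.util.Base64;

/** Hypothetical holder for a parsed "JOB_ID|ENTRY_FILE|BASE64_ZIP" payload. */
final class ArchivePayloadSketch {
    final String jobId;
    final String entryFile;
    final byte[] zipBytes;

    private ArchivePayloadSketch(String jobId, String entryFile, byte[] zipBytes) {
        this.jobId = jobId;
        this.entryFile = entryFile;
        this.zipBytes = zipBytes;
    }

    static ArchivePayloadSketch parse(String payload) {
        // A limit of 3 stops splitting after the second separator,
        // so the large Base64 field is kept as a single piece.
        String[] parts = payload.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected JOB_ID|ENTRY_FILE|BASE64_ZIP");
        }
        // The decoder itself throws IllegalArgumentException on malformed Base64.
        byte[] zip = Base64.getDecoder().decode(parts[2]);
        return new ArchivePayloadSketch(parts[0], parts[1], zip);
    }
}
```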
    • handleArchiveService

      private void handleArchiveService(DataOutputStream out, String payload)
      Handles the deployment and startup of a service from an archived payload. The payload is expected to contain a service ID, an entry file path, a port, and a Base64 encoded ZIP archive.

      This method stages the archive, resolves the entry file, and then delegates to handleSyncExecution(DataOutputStream, String, String) to start the service synchronously using the "START_SERVICE" handler.

      Parameters:
      out - The output stream to send responses back to the client.
      payload - The payload containing service ID, entry file, port, and Base64 ZIP data (format: "SERVICE_ID|ENTRY_FILE|PORT|BASE64_ZIP").
    • processCommandExplicit

      private String processCommandExplicit(String taskType, String taskData)
      Processes a command by explicitly using a provided task type and task data. This method looks up the appropriate TaskHandler from the taskHanlderMap and executes it.
      Parameters:
      taskType - The explicit type of the task to execute (e.g., "PDF_CONVERT", "START_SERVICE").
      taskData - The data payload for the task handler.
      Returns:
      A string representing the result of the task execution, or an error message if the task type is unknown or execution fails.
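
The map-based dispatch can be pictured with the sketch below. The TaskHandler interface shown here is a guess at its shape (a single execute method taking the task data), and the "ERROR: ..." strings are placeholders; the real interface and error messages may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical dispatch table keyed by task type. */
final class DispatchSketch {
    /** Assumed handler shape; the real TaskHandler interface may differ. */
    interface TaskHandler {
        String execute(String taskData) throws Exception;
    }

    private final Map<String, TaskHandler> handlers = new HashMap<>();

    void register(String taskType, TaskHandler handler) {
        handlers.put(taskType, handler);
    }

    /** Looks up the handler for taskType and runs it, mapping failures to error strings. */
    String processExplicit(String taskType, String taskData) {
        TaskHandler handler = handlers.get(taskType);
        if (handler == null) {
            return "ERROR: unknown task type " + taskType;
        }
        try {
            return handler.execute(taskData);
        } catch (Exception e) {
            return "ERROR: " + e.getMessage();
        }
    }
}
```

Returning an error string instead of propagating the exception keeps one failing handler from tearing down the connection that delivered the command.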
    • notifyMasterOfServiceStop

      public void notifyMasterOfServiceStop(String serviceId)
      Notifies the scheduler (master) that a specific service has stopped on this worker. This is typically called by a ServiceHandler when a managed service terminates.
      Parameters:
      serviceId - The unique identifier of the service that has stopped.
    • processCommand

      private String processCommand(String payload)
      Parses a command payload to determine the task type and data, then dispatches it to the appropriate TaskHandler. The payload is expected to be in the format "TASK_TYPE|ARGS...".

      This method also includes logic for simulated test commands like "TEST" and "FAIL", and a sleep command for debugging.

      Parameters:
      payload - The full command string received from the client.
      Returns:
      A string representing the result of the task execution, or an error message if the format is invalid, task type is unknown, or execution fails.
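
Splitting a "TASK_TYPE|ARGS..." payload at the first separator might look like this. CommandParseSketch is a hypothetical helper, and treating a payload without any separator as a task type with empty data is an assumption; the documented method may instead reject such input as an invalid format.

```java
/** Hypothetical helper that separates the task type from the remaining arguments. */
final class CommandParseSketch {
    /** Returns {taskType, taskData}; taskData is "" when no arguments follow. */
    static String[] splitCommand(String payload) {
        if (payload == null || payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        int sep = payload.indexOf('|');
        if (sep < 0) {
            return new String[] {payload, ""};
        }
        // Only the first '|' is consumed; later separators stay inside the task data
        // so that each handler can parse its own arguments.
        return new String[] {payload.substring(0, sep), payload.substring(sep + 1)};
    }
}
```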
    • handleAsyncExecution

      private void handleAsyncExecution(DataOutputStream out, String payload)
      Handles the asynchronous execution of a task. If the worker pool is saturated, an error is immediately sent back. Otherwise, the task is submitted to the workerPool and a "JOB_ACCEPTED" acknowledgment is sent.

      The actual task execution happens in a separate thread. Upon completion (success or failure), a callback is sent to the scheduler using sendCallback(String, String, String).

      Parameters:
      out - The output stream to send the immediate acknowledgment back to the client.
      payload - The task-specific data, potentially including a job ID (format: "JOB_ID|TASK_DATA" or just "TASK_DATA").
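
One way to combine the saturation check, the immediate acknowledgment, and the later callback is sketched below. AsyncExecutionSketch, its submit signature, and the "ERROR_WORKER_SATURATED" reply are assumptions for illustration; "JOB_ACCEPTED", "COMPLETED", and "FAILED" are taken from this documentation. The callback parameter stands in for sendCallback.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical sketch of saturation-aware asynchronous submission. */
final class AsyncExecutionSketch {
    private final int maxThreads;
    private final ExecutorService workerPool;
    private final AtomicInteger activeJobs = new AtomicInteger();

    AsyncExecutionSketch(int maxThreads) {
        this.maxThreads = maxThreads;
        this.workerPool = Executors.newFixedThreadPool(maxThreads);
    }

    /** Returns the immediate reply; the callback later receives (status, result). */
    String submit(Supplier<String> task, BiConsumer<String, String> callback) {
        // Reserve a slot first, then roll back if that exceeded the limit.
        if (activeJobs.incrementAndGet() > maxThreads) {
            activeJobs.decrementAndGet();
            return "ERROR_WORKER_SATURATED"; // placeholder error reply
        }
        workerPool.submit(() -> {
            String status = "COMPLETED";
            String result;
            try {
                result = task.get();
            } catch (RuntimeException e) {
                status = "FAILED";
                result = String.valueOf(e.getMessage());
            }
            try {
                callback.accept(status, result);
            } finally {
                // Release the slot even if the callback itself fails.
                activeJobs.decrementAndGet();
            }
        });
        return "JOB_ACCEPTED";
    }

    void shutdown() {
        workerPool.shutdown();
    }
}
```

Incrementing before the comparison avoids the race in which two threads both read a count just under the limit and both submit.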
    • sendCallback

      private void sendCallback(String jobId, String status, String result)
      Sends a job completion or failure callback to the scheduler. This method establishes a new socket connection to the scheduler to report the job's final status and result.

      Callbacks are not sent for "UNKNOWN" or "TEST-JOB" job IDs.

      Parameters:
      jobId - The unique identifier of the job.
      status - The final status of the job (e.g., "COMPLETED", "FAILED").
      result - A string containing the output or error message from the job execution.
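
The skip rule and the connection-per-callback behaviour could be sketched as follows. CallbackSketch is a hypothetical class, and writeUTF with a pipe-delimited string is only a placeholder for the real TitanProtocol framing, which is not described here. Treating a null job ID as "do not send" is also an assumption.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

/** Hypothetical sketch: one short-lived connection per callback. */
final class CallbackSketch {
    /** Callbacks are skipped for the placeholder job IDs named in the documentation. */
    static boolean shouldSend(String jobId) {
        return jobId != null && !jobId.equals("UNKNOWN") && !jobId.equals("TEST-JOB");
    }

    /** Returns true if a callback was written, false if it was skipped. */
    static boolean send(String host, int port, String jobId, String status, String result)
            throws IOException {
        if (!shouldSend(jobId)) {
            return false;
        }
        try (Socket socket = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
            // Placeholder framing; the real packet layout is defined by TitanProtocol.
            out.writeUTF(jobId + "|" + status + "|" + result);
            out.flush();
        }
        return true;
    }
}
```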
    • handleSyncExecution

      private void handleSyncExecution(DataOutputStream out, String payload, String forceTaskType)
      Handles the synchronous execution of a task. The task is processed immediately using processCommandExplicit(String, String), and the result (ACK or ERROR) is sent back to the client before the method returns.
      Parameters:
      out - The output stream to send the response back to the client.
      payload - The task-specific data for execution.
      forceTaskType - The explicit type of the task to execute.
    • streamLogToMaster

      public void streamLogToMaster(String jobId, String line)
      Streams a single log line for a given job to the scheduler (master). This method establishes a new socket connection to send the log data.

      Errors during log streaming are silently ignored to prevent disrupting the main worker flow.

      Parameters:
      jobId - The unique identifier of the job to which the log line belongs.
      line - The log message to stream.
    • stop

      public void stop()
      Initiates a graceful shutdown of the worker server. It sets the isRunning flag to false to stop the main server loop and then shuts down the internal thread pool, preventing new tasks from being accepted.
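
For reference, a common ExecutorService shutdown pattern is shown below. The documentation only states that stop() shuts the thread pool down; waiting for termination and falling back to shutdownNow() are additions in this sketch, and ShutdownSketch is a hypothetical helper rather than part of RpcWorkerServer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing shutdown followed by a bounded wait. */
final class ShutdownSketch {
    /** Returns true if every submitted task finished within the timeout. */
    static boolean shutdownGracefully(ExecutorService pool, long timeoutMillis) {
        pool.shutdown(); // reject new tasks; already queued tasks still run
        try {
            if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow(); // interrupt tasks that are still running
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```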
    • main

      public static void main(String[] args) throws Exception
      The main entry point for the RpcWorkerServer application. This method parses command-line arguments to configure the worker's port, scheduler host/port, capabilities, and permanence status.

      It then initializes and starts an RpcWorkerServer instance.

      Parameters:
      args - Command-line arguments: [myPort] [schedulerHost] [schedulerPort] [capability] [isPermanent].
      Throws:
      Exception - If an error occurs during server initialization or startup.
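
The positional arguments could be read as in the sketch below. The argument order follows the documentation, but WorkerArgsSketch and every default value (8080, "localhost", 9090, "GENERAL", false) are hypothetical; the actual defaults, if any, are not documented here.

```java
/** Hypothetical parser for [myPort] [schedulerHost] [schedulerPort] [capability] [isPermanent]. */
final class WorkerArgsSketch {
    final int myPort;
    final String schedulerHost;
    final int schedulerPort;
    final String capability;
    final boolean isPermanent;

    private WorkerArgsSketch(int myPort, String schedulerHost, int schedulerPort,
                             String capability, boolean isPermanent) {
        this.myPort = myPort;
        this.schedulerHost = schedulerHost;
        this.schedulerPort = schedulerPort;
        this.capability = capability;
        this.isPermanent = isPermanent;
    }

    /** Missing trailing arguments fall back to illustrative defaults. */
    static WorkerArgsSketch parse(String[] args) {
        int myPort = args.length > 0 ? Integer.parseInt(args[0]) : 8080;          // assumed default
        String host = args.length > 1 ? args[1] : "localhost";                    // assumed default
        int schedulerPort = args.length > 2 ? Integer.parseInt(args[2]) : 9090;   // assumed default
        String capability = args.length > 3 ? args[3] : "GENERAL";                // assumed default
        boolean permanent = args.length > 4 && Boolean.parseBoolean(args[4]);     // assumed default: false
        return new WorkerArgsSketch(myPort, host, schedulerPort, capability, permanent);
    }
}
```

The parsed values map one-to-one onto the constructor parameters of RpcWorkerServer listed under Constructor Details.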