Package titan.tasks

Class ServiceHandler

java.lang.Object
titan.tasks.ServiceHandler
All Implemented Interfaces:
TaskHandler

public class ServiceHandler extends Object implements TaskHandler
ServiceHandler is an implementation of TaskHandler responsible for managing the lifecycle of services (processes) on a worker node. It supports starting and stopping various types of services, including Java JARs, Python scripts, and other executables.

This handler maintains a registry of currently running services and provides mechanisms for launching them in a detached manner, capturing their output, and streaming logs back to a master server. It also handles the cleanup of resources upon service termination.

  • Field Details

    • runningServices

      private static final Map<String,Process> runningServices
      A static, thread-safe map that stores references to currently running Process objects, keyed by their unique service IDs. This allows for tracking and managing active services.
    • WORKSPACE_DIR

      private static final String WORKSPACE_DIR
      The base directory where service files are expected to be located and where service-related logs and temporary files are stored.
    • operation

      private final String operation
      The specific operation this handler instance is configured to perform (e.g., "START", "STOP"). This field determines which internal method (startProcess or stopProcess) will be invoked by the execute(String) method.
    • parentServer

      private final RpcWorkerServer parentServer
      A reference to the parent RpcWorkerServer instance. This is used to facilitate communication back to the master server, such as streaming logs or notifying about service status changes.
  • Constructor Details

    • ServiceHandler

      public ServiceHandler(String op, RpcWorkerServer parentServer)
      Constructs a new ServiceHandler with a specified operation and a reference to its parent RPC server.
      Parameters:
      op - The operation this handler will perform (e.g., "START", "STOP").
      parentServer - The RpcWorkerServer instance that created this handler, used for callbacks.
  • Method Details

    • execute

      public String execute(String payload)
      Executes a service management task based on the handler's configured operation and the provided payload. The payload is expected to be a pipe-separated string containing service details.

      If the operation is "START", the method parses the payload for the file name, service ID, and port, then attempts to start a new process. Any other operation is treated as "STOP": the method parses the payload for a service ID and attempts to stop the corresponding process.

      Specified by:
      execute in interface TaskHandler
      Parameters:
      payload - A string containing service parameters, typically in the format "filename|serviceId|port" for START, or "serviceId" for STOP.
      Returns:
      A string indicating the result of the operation, such as "DEPLOYED_SUCCESS", "STOPPED", or an "ERROR" message.
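The payload handling described for execute can be illustrated with a minimal sketch. The class PayloadSketch, the StartRequest holder, and the describe method are hypothetical names introduced here for illustration; the handler's actual parsing code is not shown in this documentation and may differ.

```java
public class PayloadSketch {
    /** Hypothetical holder for the three START fields. */
    public static final class StartRequest {
        public final String fileName;
        public final String serviceId;
        public final String port;

        StartRequest(String fileName, String serviceId, String port) {
            this.fileName = fileName;
            this.serviceId = serviceId;
            this.port = port;
        }
    }

    /** Parses "filename|serviceId|port"; rejects anything else. */
    public static StartRequest parseStart(String payload) {
        // "|" is a regex metacharacter, so it must be escaped for split.
        // The -1 limit keeps trailing empty fields so they are counted.
        String[] parts = payload.split("\\|", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected filename|serviceId|port, got: " + payload);
        }
        return new StartRequest(parts[0].trim(), parts[1].trim(), parts[2].trim());
    }

    /** Mirrors the documented dispatch: "START" starts, anything else stops. */
    public static String describe(String operation, String payload) {
        if ("START".equals(operation)) {
            StartRequest r = parseStart(payload);
            return "start " + r.fileName + " as " + r.serviceId + " on port " + r.port;
        }
        // Assumption: a STOP payload is just the service ID.
        return "stop " + payload.trim();
    }
}
```

Validating the field count up front lets a malformed START payload be reported as an error rather than surfacing later as an ArrayIndexOutOfBoundsException.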
    • startProcess

      private String startProcess(String fileName, String serviceId, String port)
      Initiates the launch of a new service process.

      This method handles different types of service files:

      • If fileName is "Worker.jar", the file is launched as a Java JAR via java -jar, with OS-specific detachment.
      • If fileName ends with ".py", the file is run with the "python" interpreter.
      • For any other file, it attempts to execute it directly.

      The process is launched in a detached manner, and its output is redirected to log files.

      Parameters:
      fileName - The name or path of the script or executable file to run.
      serviceId - A unique identifier for the service being started.
      port - The port number to be used by the service, primarily for Java JAR workers.
      Returns:
      A status string indicating success or failure of the launch operation.
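The file-type rules listed for startProcess amount to choosing a command line from the file name. The sketch below shows one way to express that choice. CommandSketch and buildCommand are hypothetical names, passing the port as a trailing argument to the JAR is an assumption, and the OS-specific detachment wrapper is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class CommandSketch {
    /** Builds the command line for a service file, following the three documented cases. */
    public static List<String> buildCommand(String fileName, String port) {
        List<String> cmd = new ArrayList<>();
        if ("Worker.jar".equals(fileName)) {
            // Java worker: java -jar Worker.jar <port>
            // (assumption: the port is passed as the first program argument)
            cmd.add("java");
            cmd.add("-jar");
            cmd.add(fileName);
            cmd.add(port);
        } else if (fileName.endsWith(".py")) {
            // Python script: run through the "python" interpreter on the PATH.
            cmd.add("python");
            cmd.add(fileName);
        } else {
            // Anything else is executed directly.
            cmd.add(fileName);
        }
        return cmd;
    }
}
```

The resulting list can be handed to a ProcessBuilder, which accepts a command as a List<String>.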
    • launchDetachedProcess

      private String launchDetachedProcess(String serviceId, File executionDir, String... command)
      Launches a generic process in a detached mode, managing its lifecycle and log streaming.

      This method performs the following steps:

      1. Checks if a service with the given serviceId is already running.
      2. Configures a ProcessBuilder with the provided command and redirects standard error/output to a local log file.
      3. Sets the working directory for the process.
      4. Starts the process.
      5. Spawns a new thread to continuously read the process's output, write it to a local log file, and batch-stream it to the master server.
      6. Registers the process in the runningServices map and with the ProcessRegistry.
      7. Registers an onExit hook to clean up resources, remove the service from registries, and notify the master when the process terminates.
      Parameters:
      serviceId - A unique identifier for the service.
      executionDir - The directory in which the command should be executed. If null or non-existent, WORKSPACE_DIR is used.
      command - The command and its arguments to execute.
      Returns:
      A status string indicating success or failure, including the service ID and PID if successful.
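The launch steps described for launchDetachedProcess can be sketched with standard JDK APIs (ProcessBuilder, Process.onExit, ConcurrentHashMap; Java 9 or later). This is an illustration under stated assumptions, not the handler's actual code: LaunchSketch, RUNNING, and the status string format are hypothetical, the log goes to a temporary file rather than WORKSPACE_DIR, and the ProcessRegistry registration and streaming to the master are represented only by comments.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LaunchSketch {
    // Stand-in for the handler's runningServices map.
    public static final Map<String, Process> RUNNING = new ConcurrentHashMap<>();

    public static String launch(String serviceId, File executionDir, String... command) {
        // Step 1: refuse to start a second copy of a live service.
        Process existing = RUNNING.get(serviceId);
        if (existing != null && existing.isAlive()) {
            return "ERROR: service already running: " + serviceId;
        }
        try {
            // Assumption: a temp file stands in for the real log location.
            Path logPath = Files.createTempFile("svc-", ".log");

            // Steps 2-4: configure, set the working directory, start.
            ProcessBuilder pb = new ProcessBuilder(command);
            pb.redirectErrorStream(true); // merge stderr into stdout
            if (executionDir != null && executionDir.isDirectory()) {
                pb.directory(executionDir);
            }
            Process process = pb.start();

            // Step 5: a daemon thread copies the process output to the log file.
            Thread reader = new Thread(() -> {
                try (BufferedReader in = new BufferedReader(new InputStreamReader(
                         process.getInputStream(), StandardCharsets.UTF_8));
                     BufferedWriter out = Files.newBufferedWriter(logPath, StandardCharsets.UTF_8)) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        out.write(line);
                        out.newLine();
                        // A real handler would also batch lines for the master here.
                    }
                } catch (IOException e) {
                    // The stream closed; nothing more to copy.
                }
            }, "log-" + serviceId);
            reader.setDaemon(true);
            reader.start();

            // Step 6: register the process.
            RUNNING.put(serviceId, process);

            // Step 7: remove the entry when the process exits, but only if it
            // still maps to this same Process instance.
            process.onExit().thenRun(() -> RUNNING.remove(serviceId, process));

            // Hypothetical status format: status|serviceId|pid.
            return "DEPLOYED_SUCCESS|" + serviceId + "|" + process.pid();
        } catch (IOException e) {
            return "ERROR: " + e.getMessage();
        }
    }
}
```

Using the two-argument Map.remove(key, value) in the exit hook avoids deleting a newer process that was registered under the same service ID after the old one exited.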
    • stopProcess

      private String stopProcess(String serviceId)
      Attempts to stop a running service identified by its serviceId.

      It retrieves the Process object associated with the serviceId from runningServices, then attempts to destroy the process. If the process is found and terminated, it is removed from the map.

      Parameters:
      serviceId - The unique identifier of the service to stop.
      Returns:
      A status string indicating whether the service was stopped or if it was unknown.
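A stop operation along these lines might look like the following sketch. StopSketch, RUNNING, the "UNKNOWN_SERVICE" string, and the five-second grace period are assumptions made for illustration; only the "STOPPED" result appears in this documentation. The sketch also removes the map entry before destroying the process, which may not match the order used by the actual handler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class StopSketch {
    // Stand-in for the handler's runningServices map.
    public static final Map<String, Process> RUNNING = new ConcurrentHashMap<>();

    public static String stop(String serviceId) {
        // Removing first is atomic, so two concurrent STOP requests
        // cannot both act on the same process.
        Process process = RUNNING.remove(serviceId);
        if (process == null) {
            return "UNKNOWN_SERVICE: " + serviceId; // hypothetical status string
        }
        process.destroy(); // request normal termination
        try {
            // Assumption: allow five seconds before forcing termination.
            if (!process.waitFor(5, TimeUnit.SECONDS)) {
                process.destroyForcibly();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            process.destroyForcibly();
        }
        return "STOPPED";
    }
}
```

Calling destroy() first gives the service a chance to shut down cleanly; destroyForcibly() is the fallback when it does not exit within the grace period.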