Class RpcWorkerServer
Each worker server registers itself with a central scheduler, advertising its capabilities. It uses a thread pool to handle incoming client connections and a separate worker pool to manage the execution of tasks, ensuring that the server remains responsive while processing jobs.
-
Field Summary
Fields (Modifier and Type, Field: Description)
private final AtomicInteger activeJobs: An atomic counter tracking the number of jobs currently being processed by the workerPool.
private String capability: A string describing the capabilities of this worker, used by the scheduler to assign appropriate tasks.
private boolean isPermanent: A flag indicating whether this worker is a permanent instance (true) or an ephemeral instance (false) that can be scaled down by the scheduler.
private boolean isRunning: A volatile flag indicating whether the worker server is currently running.
private static final int MAX_THREADS: The maximum number of concurrent tasks that this worker can execute simultaneously in its worker pool.
private int port
private String schedulerHost: The hostname or IP address of the scheduler server to which this worker registers and sends callbacks.
private int schedulerPort: The port number of the scheduler server to which this worker registers and sends callbacks.
private Map<String,TaskHandler> taskHanlderMap: A map storing various TaskHandler implementations, keyed by their task type.
private final ExecutorService threadPool
private final ExecutorService workerPool: A fixed-size thread pool dedicated to executing actual tasks (jobs).
-
Constructor Summary
Constructors (Constructor: Description)
RpcWorkerServer(int myPort, String schedulerHost, int schedulerPort, String capability, boolean isPermanent): Constructs a new RpcWorkerServer instance.
-
Method Summary
Methods (Modifier and Type, Method: Description)
void addTaskHandler(): Initializes and registers various TaskHandler implementations with the worker server.
private void clientHandler(Socket socket): Handles communication with a single client socket (typically the scheduler).
String getSchedulerHost(): Retrieves the hostname of the scheduler this worker is connected to.
int getSchedulerPort(): Retrieves the port number of the scheduler this worker is connected to.
private void handleArchiveJob(DataOutputStream out, String payload): Handles the execution of a job that involves staging and running an archived payload.
private void handleArchiveService(DataOutputStream out, String payload): Handles the deployment and startup of a service from an archived payload.
private void handleAsyncExecution(DataOutputStream out, String payload): Handles the asynchronous execution of a task.
private void handleExecution(DataOutputStream out, String payload, String forceTaskType): Handles the execution of a task, either synchronously or asynchronously depending on the worker's saturation.
private void handleSyncExecution(DataOutputStream out, String payload, String forceTaskType): Handles the synchronous execution of a task.
static void main(String[] args): The main entry point for the RpcWorkerServer application.
void notifyMasterOfServiceStop(String serviceId): Notifies the scheduler (master) that a specific service has stopped on this worker.
private String processCommand(String payload): Parses a command payload to determine the task type and data, then dispatches it to the appropriate TaskHandler.
private String processCommandExplicit(String taskType, String taskData): Processes a command by explicitly using a provided task type and task data.
private void registerWithScheduler(): Registers this worker server with the central scheduler.
private void sendCallback(String jobId, String status, String result): Sends a job completion or failure callback to the scheduler.
void start(): Starts the worker server, binding it to the specified port and beginning to listen for incoming client connections.
void stop(): Initiates a graceful shutdown of the worker server.
void streamLogToMaster(String jobId, String line): Streams a single log line for a given job to the scheduler (master).
-
Field Details
-
port
private int port
-
threadPool
-
isRunning
private volatile boolean isRunning
A volatile flag indicating whether the worker server is currently running. Set to false to initiate a graceful shutdown.
-
capability
A string describing the capabilities of this worker, used by the scheduler to assign appropriate tasks. Examples include "GENERAL", "GPU", "PDF_CONVERT", etc. -
isPermanent
private boolean isPermanent
A flag indicating whether this worker is a permanent instance (true) or an ephemeral instance (false) that can be scaled down by the scheduler.
-
schedulerPort
private int schedulerPort
The port number of the scheduler server to which this worker registers and sends callbacks.
-
schedulerHost
The hostname or IP address of the scheduler server to which this worker registers and sends callbacks. -
taskHanlderMap
A map storing various TaskHandler implementations, keyed by their task type. This allows the worker to dynamically dispatch tasks based on the received command.
-
MAX_THREADS
private static final int MAX_THREADS
The maximum number of concurrent tasks that this worker can execute simultaneously in its worker pool.
-
workerPool
A fixed-size thread pool dedicated to executing actual tasks (jobs). This limits the concurrent workload on the worker. -
activeJobs
An atomic counter tracking the number of jobs currently being processed by the workerPool. Used to manage worker saturation.
-
-
Constructor Details
-
RpcWorkerServer
public RpcWorkerServer(int myPort, String schedulerHost, int schedulerPort, String capability, boolean isPermanent)
Constructs a new RpcWorkerServer instance.
Parameters:
myPort - The port number on which this worker server will listen.
schedulerHost - The hostname or IP address of the scheduler.
schedulerPort - The port number of the scheduler.
capability - A string describing the capabilities of this worker (e.g., "GENERAL", "PDF_CONVERT").
isPermanent - A boolean indicating if this worker is a permanent instance (true) or ephemeral (false).
-
-
Method Details
-
getSchedulerHost
Retrieves the hostname of the scheduler this worker is connected to.
Returns:
The scheduler's hostname.
-
getSchedulerPort
public int getSchedulerPort()
Retrieves the port number of the scheduler this worker is connected to.
Returns:
The scheduler's port number.
-
addTaskHandler
public void addTaskHandler()
Initializes and registers various TaskHandler implementations with the worker server. This method populates the taskHanlderMap with handlers for specific task types like PDF conversion, file staging, service management, and script execution.
-
start
Starts the worker server, binding it to the specified port and beginning to listen for incoming client connections. It also registers itself with the scheduler upon startup.
This method enters a continuous loop, accepting new client sockets and submitting them to the internal thread pool for handling. The loop continues as long as the isRunning flag is true.
Throws:
Exception - If an error occurs during server startup or socket operations.
-
registerWithScheduler
Registers this worker server with the central scheduler. It sends its port, capabilities, and permanence status to the scheduler and waits for a registration confirmation.
Throws:
Exception - If an I/O error occurs during communication with the scheduler or if registration fails.
-
clientHandler
Handles communication with a single client socket (typically the scheduler). This method continuously reads TitanProtocol.TitanPackets from the client, processes them based on their operation code, and sends appropriate responses.
Supported operations include heartbeats, running archive jobs, starting archived services, staging files, starting/stopping services, running scripts, and worker shutdown commands.
Parameters:
socket - The client socket connected to the worker.
-
handleExecution
Handles the execution of a task, either synchronously or asynchronously depending on the worker's saturation. If the worker pool is saturated, an error is returned. Otherwise, the task is submitted to the worker pool.
This method is deprecated in favor of handleAsyncExecution(DataOutputStream, String) and handleSyncExecution(DataOutputStream, String, String) for clearer separation of concerns.
Parameters:
out - The output stream to send responses back to the client.
payload - The task-specific data required for execution.
forceTaskType - An optional parameter to explicitly specify the task type, overriding parsing from the payload.
-
handleArchiveJob
Handles the execution of a job that involves staging and running an archived payload. The payload is expected to contain a job ID, an entry file path, and a Base64 encoded ZIP archive.
This method first stages the archive using WorkspaceManager, resolves the entry file path, and then delegates to handleAsyncExecution(DataOutputStream, String) to run the job asynchronously.
Parameters:
out - The output stream to send responses back to the client.
payload - The payload containing job ID, entry file, and Base64 ZIP data (format: "JOB_ID|ENTRY_FILE|BASE64_ZIP").
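As an illustration of the "JOB_ID|ENTRY_FILE|BASE64_ZIP" format described above, the following is a minimal sketch of how such a payload could be split and decoded. The class name ArchiveJobPayloadSketch, its fields, and the error messages are hypothetical and not part of RpcWorkerServer; the staging step performed by WorkspaceManager is not shown because its API does not appear on this page.

```java
import java.util.Base64;

// Hypothetical helper, not part of RpcWorkerServer: parses "JOB_ID|ENTRY_FILE|BASE64_ZIP".
final class ArchiveJobPayloadSketch {
    final String jobId;
    final String entryFile;
    final byte[] zipBytes;

    private ArchiveJobPayloadSketch(String jobId, String entryFile, byte[] zipBytes) {
        this.jobId = jobId;
        this.entryFile = entryFile;
        this.zipBytes = zipBytes;
    }

    static ArchiveJobPayloadSketch parse(String payload) {
        if (payload == null) {
            throw new IllegalArgumentException("payload is null");
        }
        // A limit of 3 stops splitting after the second '|', so the last field is kept whole.
        String[] parts = payload.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected JOB_ID|ENTRY_FILE|BASE64_ZIP");
        }
        // Base64.Decoder.decode throws IllegalArgumentException on malformed input.
        byte[] zip = Base64.getDecoder().decode(parts[2]);
        return new ArchiveJobPayloadSketch(parts[0], parts[1], zip);
    }

    public static void main(String[] args) {
        String b64 = Base64.getEncoder().encodeToString(new byte[] {'P', 'K', 3, 4});
        ArchiveJobPayloadSketch p = parse("JOB-1|run.py|" + b64);
        System.out.println(p.jobId + " " + p.entryFile + " " + p.zipBytes.length + " bytes");
    }
}
```

Validating the field count before decoding keeps a malformed payload from reaching the staging step; how the real method reports such errors to the client is not specified here.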
-
handleArchiveService
Handles the deployment and startup of a service from an archived payload. The payload is expected to contain a service ID, an entry file path, a port, and a Base64 encoded ZIP archive.
This method stages the archive, resolves the entry file, and then delegates to handleSyncExecution(DataOutputStream, String, String) to start the service synchronously using the "START_SERVICE" handler.
Parameters:
out - The output stream to send responses back to the client.
payload - The payload containing service ID, entry file, port, and Base64 ZIP data (format: "SERVICE_ID|ENTRY_FILE|PORT|BASE64_ZIP").
-
processCommandExplicit
Processes a command by explicitly using a provided task type and task data. This method looks up the appropriate TaskHandler from the taskHanlderMap and executes it.
Parameters:
taskType - The explicit type of the task to execute (e.g., "PDF_CONVERT", "START_SERVICE").
taskData - The data payload for the task handler.
Returns:
A string representing the result of the task execution, or an error message if the task type is unknown or execution fails.
-
notifyMasterOfServiceStop
Notifies the scheduler (master) that a specific service has stopped on this worker. This is typically called by a ServiceHandler when a managed service terminates.
Parameters:
serviceId - The unique identifier of the service that has stopped.
-
processCommand
Parses a command payload to determine the task type and data, then dispatches it to the appropriate TaskHandler. The payload is expected to be in the format "TASK_TYPE|ARGS...".
This method also includes logic for simulated test commands like "TEST" and "FAIL", and a sleep command for debugging.
Parameters:
payload - The full command string received from the client.
Returns:
A string representing the result of the task execution, or an error message if the format is invalid, task type is unknown, or execution fails.
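The parse-then-dispatch flow described for processCommand and processCommandExplicit can be sketched roughly as follows. This is an illustration under assumptions, not the actual implementation: the SketchTaskHandler interface stands in for TaskHandler (whose real signature is not shown on this page), the error strings are invented, and the "TEST", "FAIL", and sleep commands are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskHandler; the real interface may differ.
interface SketchTaskHandler {
    String execute(String taskData) throws Exception;
}

final class CommandDispatchSketch {
    private final Map<String, SketchTaskHandler> handlers = new HashMap<>();

    void register(String taskType, SketchTaskHandler handler) {
        handlers.put(taskType, handler);
    }

    // Splits "TASK_TYPE|ARGS..." at the first '|' and dispatches on the task type.
    String processCommand(String payload) {
        if (payload == null || payload.isEmpty()) {
            return "ERROR: empty payload"; // hypothetical error text
        }
        int sep = payload.indexOf('|');
        String taskType = sep < 0 ? payload : payload.substring(0, sep);
        String taskData = sep < 0 ? "" : payload.substring(sep + 1);
        return processCommandExplicit(taskType, taskData);
    }

    // Looks up the handler for the given type and runs it, mapping failures to error strings.
    String processCommandExplicit(String taskType, String taskData) {
        SketchTaskHandler handler = handlers.get(taskType);
        if (handler == null) {
            return "ERROR: unknown task type " + taskType; // hypothetical error text
        }
        try {
            return handler.execute(taskData);
        } catch (Exception e) {
            return "ERROR: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        CommandDispatchSketch dispatcher = new CommandDispatchSketch();
        dispatcher.register("ECHO", data -> "OK:" + data);
        System.out.println(dispatcher.processCommand("ECHO|a|b"));
        System.out.println(dispatcher.processCommand("NOPE|x"));
    }
}
```

Splitting only at the first separator leaves any further '|' characters inside the task data, so each handler can define its own argument layout.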
-
handleAsyncExecution
Handles the asynchronous execution of a task. If the worker pool is saturated, an error is immediately sent back. Otherwise, the task is submitted to the workerPool and a "JOB_ACCEPTED" acknowledgment is sent.
The actual task execution happens in a separate thread. Upon completion (success or failure), a callback is sent to the scheduler using sendCallback(String, String, String).
Parameters:
out - The output stream to send the immediate acknowledgment back to the client.
payload - The task-specific data, potentially including a job ID (format: "JOB_ID|TASK_DATA" or just "TASK_DATA").
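The saturation check described above (an activeJobs counter guarding a fixed-size workerPool) might look like the sketch below. It is an assumption-laden illustration: the class SaturationSketch, the "ERROR_WORKER_SATURATED" and "ERROR_REJECTED" strings, and the reserve-then-roll-back counting strategy are all hypothetical; only the "JOB_ACCEPTED" acknowledgment comes from the description.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a saturation-guarded submit; not the actual RpcWorkerServer code.
final class SaturationSketch {
    private final int maxThreads;
    private final ExecutorService workerPool;
    private final AtomicInteger activeJobs = new AtomicInteger();

    SaturationSketch(int maxThreads) {
        this.maxThreads = maxThreads;
        this.workerPool = Executors.newFixedThreadPool(maxThreads);
    }

    // Returns the immediate acknowledgment; the job itself runs on the worker pool.
    String submit(Runnable job) {
        // Reserve a slot first, then roll back if that pushed the count over the limit.
        if (activeJobs.incrementAndGet() > maxThreads) {
            activeJobs.decrementAndGet();
            return "ERROR_WORKER_SATURATED"; // hypothetical error string
        }
        try {
            workerPool.submit(() -> {
                try {
                    job.run();
                } finally {
                    // Release the slot whether the job succeeded or failed.
                    // A completion callback to the scheduler would be sent around here.
                    activeJobs.decrementAndGet();
                }
            });
        } catch (RuntimeException e) {
            // For example RejectedExecutionException if the pool was already shut down.
            activeJobs.decrementAndGet();
            return "ERROR_REJECTED"; // hypothetical error string
        }
        return "JOB_ACCEPTED";
    }

    int activeJobs() {
        return activeJobs.get();
    }

    void shutdown() {
        workerPool.shutdown();
    }

    public static void main(String[] args) {
        SaturationSketch sketch = new SaturationSketch(2);
        System.out.println(sketch.submit(() -> System.out.println("working")));
        sketch.shutdown();
    }
}
```

Incrementing before the comparison avoids the race in which two threads both read a count below the limit and both submit; a separate read followed by an increment would not give that guarantee.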
-
sendCallback
Sends a job completion or failure callback to the scheduler. This method establishes a new socket connection to the scheduler to report the job's final status and result.
Callbacks are not sent for "UNKNOWN" or "TEST-JOB" job IDs.
Parameters:
jobId - The unique identifier of the job.
status - The final status of the job (e.g., "COMPLETED", "FAILED").
result - A string containing the output or error message from the job execution.
-
handleSyncExecution
Handles the synchronous execution of a task. The task is processed immediately using processCommandExplicit(String, String), and the result (ACK or ERROR) is sent back to the client before the method returns.
Parameters:
out - The output stream to send the response back to the client.
payload - The task-specific data for execution.
forceTaskType - The explicit type of the task to execute.
-
streamLogToMaster
Streams a single log line for a given job to the scheduler (master). This method establishes a new socket connection to send the log data.
Errors during log streaming are silently ignored to prevent disrupting the main worker flow.
Parameters:
jobId - The unique identifier of the job to which the log line belongs.
line - The log message to stream.
-
stop
public void stop()
Initiates a graceful shutdown of the worker server. It sets the isRunning flag to false to stop the main server loop and then shuts down the internal thread pool, preventing new tasks from being accepted.
-
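The two-step shutdown described for stop() can be sketched as below. The class GracefulStopSketch and its trySubmit helper are hypothetical, and the choice of a cached thread pool is an assumption, since this page does not say how threadPool is created.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of the stop() behavior; not the actual RpcWorkerServer code.
final class GracefulStopSketch {
    // volatile so the accept loop on another thread sees the updated value.
    private volatile boolean isRunning = true;
    // Assumption: the pool type is not documented, a cached pool is used for illustration.
    private final ExecutorService threadPool = Executors.newCachedThreadPool();

    boolean isRunning() {
        return isRunning;
    }

    // Returns false once the pool has been shut down and rejects new work.
    boolean trySubmit(Runnable task) {
        try {
            threadPool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void stop() {
        isRunning = false;     // ends the main loop at its next check
        threadPool.shutdown(); // already-submitted tasks finish, new ones are rejected
    }

    public static void main(String[] args) {
        GracefulStopSketch server = new GracefulStopSketch();
        System.out.println("accepted before stop: " + server.trySubmit(() -> {}));
        server.stop();
        System.out.println("accepted after stop: " + server.trySubmit(() -> {}));
    }
}
```

Note that ExecutorService.shutdown() lets already-submitted tasks run to completion, and a loop blocked in ServerSocket.accept() only re-checks the flag after accept() returns.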
main
The main entry point for the RpcWorkerServer application. This method parses command-line arguments to configure the worker's port, scheduler host/port, capabilities, and permanence status.
It then initializes and starts an RpcWorkerServer instance.
Parameters:
args - Command-line arguments: [myPort] [schedulerHost] [schedulerPort] [capability] [isPermanent].
Throws:
Exception - If an error occurs during server initialization or startup.
-