Developer Guide — Building on Titan
This guide is for developers who want to understand the codebase, add features, or build new capabilities (like SAGA, new execution drivers, or custom schedulers) on top of Titan. It covers the code layout, execution model, extension points, and how the pieces fit together.
If you're looking for setup instructions and PR process, see Contributing.
Codebase Map
titan-orchestrator/
├── src/main/java/titan/ ← Core engine (Java 17+)
│ ├── scheduler/
│ │ ├── Scheduler.java ← The brain: DAG resolution, dispatch loop,
│ │ │ state persistence, auto-scaling, HITL
│ │ ├── Job.java ← Job model: state machine, metadata, DAG edges
│ │ ├── Worker.java ← Worker model: load, capabilities, heartbeat
│ │ ├── WorkerRegistry.java ← Tracks workers, capability matching
│ │ ├── TaskExecution.java ← Execution record (worker assignment, timing)
│ │ └── ScheduledJob.java ← DelayQueue wrapper for timed execution
│ │
│ ├── network/
│ │ ├── TitanProtocol.java ← Wire format: 8-byte header + payload
│ │ ├── SchedulerServer.java ← Master TCP server: routes opcodes to Scheduler
│ │ ├── RpcWorkerServer.java ← Worker TCP server: receives jobs, runs them
│ │ ├── RpcClient.java ← Client-side TCP connector
│ │ └── LogBatcher.java ← Buffers stdout lines before streaming to Master
│ │
│ ├── tasks/
│ │ ├── ScriptExecutorHandler.java ← Spawns job scripts as child processes
│ │ ├── ServiceHandler.java ← Deploys long-running services, auto-restart
│ │ ├── ProcessRegistry.java ← Tracks live child processes for cleanup
│ │ ├── FileHandler.java ← File transfer between Master and workers
│ │ └── TaskHandler.java ← Handler interface (implement this for new types)
│ │
│ ├── storage/
│ │ └── TitanJRedisAdapter.java ← Redis-protocol client to TitanStore
│ │
│ ├── filesys/
│ │ ├── AssetManager.java ← Manages perm_files/ (persistent scripts)
│ │ ├── WorkspaceManager.java ← Per-job workspace isolation
│ │ └── ZipUtils.java ← File transfer compression
│ │
│ ├── Main.java ← Entry point: SCHEDULER or WORKER mode
│ ├── TitanCLI.java ← Interactive CLI (status, logs, approve, store)
│ ├── TitanMaster.java ← Master bootstrap
│ └── TitanWorker.java ← Worker bootstrap
│
├── titan_sdk/ ← Python SDK
│ ├── titan_sdk.py ← TitanClient, TitanJob, HITL injection, KV ops
│ ├── titan_cli.py ← Python CLI entry point
│ ├── titan_mcp_server.py ← MCP server for Claude Desktop / Cursor
│ └── titan_yaml_parser.py ← YAML pipeline parser
│
├── perm_files/ ← Persistent file storage on Master
│ ├── server_dashboard.py ← Flask dashboard (routes + API)
│ ├── templates/ ← Dashboard HTML templates
│ ├── TitanStore.jar ← AOF-backed KV store (separate repo)
│ ├── Worker.jar ← Worker binary (copy of engine JAR)
│ └── hitl_gate.py ← HITL gate polling script
│
├── titan_test_suite/ ← All examples and tests
│ └── examples/
│ ├── agents_examples/ ← Multi-agent pipelines (research, code gen, etc.)
│ ├── dynamic_dag_custom/ ← Dynamic DAG, ETL, GPU routing examples
│ ├── hitl_example/ ← HITL demo
│ └── titan_ci/ ← Titan running CI on itself
│
└── titan-docs/ ← MkDocs documentation site
Execution Model
Understanding these four loops is enough to work on any part of Titan.
1. Dispatch loop (Scheduler.java)
The Master's main loop. Blocks on taskQueue.take(), finds a capable worker, sends the job.
All jobs flow through this single loop regardless of how they were submitted (SDK, CLI, YAML, MCP, Constructor). The queue is a PriorityBlockingQueue, so higher-priority jobs are dispatched first.
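As a rough illustration, the loop can be modelled in a few lines of Python (the engine itself is Java). The names `dispatch_all` and `send`, the dict-based workers, and the tuple layout are hypothetical. Only the priority ordering and the one-job-at-a-time dispatch come from the description above, and the FIFO tie-break counter is an addition of this sketch.

```python
import itertools
import queue

def dispatch_all(task_queue, workers, send):
    """Drain the queue, sending each job to the least-loaded worker."""
    while not task_queue.empty():
        _, _, job_id = task_queue.get()   # the real loop blocks on take()
        worker = min(workers, key=lambda w: w["load"])
        worker["load"] += 1               # incremented on dispatch
        send(worker["id"], job_id)

# queue.PriorityQueue pops the smallest tuple first, so the priority is
# negated to dispatch higher-priority jobs first. The counter keeps
# submission order between jobs that share a priority (sketch-only choice).
counter = itertools.count()
task_queue = queue.PriorityQueue()
for job_id, priority in [("low", 1), ("high", 10), ("mid", 5)]:
    task_queue.put((-priority, next(counter), job_id))

workers = [{"id": "w1", "load": 0}, {"id": "w2", "load": 0}]
sent = []
dispatch_all(task_queue, workers, lambda worker_id, job_id: sent.append((worker_id, job_id)))
print(sent)
```

Because a single consumer drains the queue, ordering decisions never race with each other, which is the property the real dispatch loop relies on.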
2. DAG resolution (Scheduler.java)
When a job with dependencies is submitted, it enters dagWaitingRoom instead of taskQueue. When a parent completes, unlockChildren() scans the waiting room and moves children whose dependencies are all met into taskQueue.
submitJob() → has parents? → dagWaitingRoom
completeJob() → unlockChildren() → all parents done? → taskQueue
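The waiting-room behaviour can be sketched as a small Python model. This is not the Java implementation: `DagModel` and its fields are hypothetical names that mirror `taskQueue`, `dagWaitingRoom`, `submitJob()`, `completeJob()` and `unlockChildren()` as described above.

```python
class DagModel:
    def __init__(self):
        self.task_queue = []      # jobs ready for dispatch
        self.waiting_room = {}    # job id -> set of parent ids
        self.completed = set()

    def submit_job(self, job_id, parents=()):
        if parents:
            self.waiting_room[job_id] = set(parents)
        else:
            self.task_queue.append(job_id)

    def complete_job(self, job_id):
        self.completed.add(job_id)
        self.unlock_children(job_id)

    def unlock_children(self, parent_id):
        # A child moves to the queue only when every parent has completed.
        ready = [child for child, parents in self.waiting_room.items()
                 if parent_id in parents and parents <= self.completed]
        for child in ready:
            del self.waiting_room[child]
            self.task_queue.append(child)

# Fan-out with fan-in: A -> [B, C, D] -> E
dag = DagModel()
dag.submit_job("A")
for child in ("B", "C", "D"):
    dag.submit_job(child, parents=["A"])
dag.submit_job("E", parents=["B", "C", "D"])

dag.task_queue.pop(0)     # A is dispatched
dag.complete_job("A")
print(dag.task_queue)     # ['B', 'C', 'D']; E is still waiting
```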
3. Job execution (RpcWorkerServer.java → ScriptExecutorHandler.java)
The worker receives a job via TCP, spawns a child process (ProcessBuilder), captures stdout via LogBatcher, and sends OP_JOB_COMPLETE back to the Master when the process exits.
receive OP_RUN → ScriptExecutorHandler.execute()
→ ProcessBuilder("python", script.py).start()
→ stream stdout to LogBatcher → OP_LOG_BATCH to Master
→ process exits → OP_JOB_COMPLETE callback to Master
4. State persistence (TitanJRedisAdapter.java → TitanStore)
Every state transition is written to TitanStore via the RESP protocol. The adapter's execute() is synchronized — all writes are serialized. TitanStore appends every write to an AOF file. On Master restart, recoverState() reads back all job states and re-queues incomplete jobs.
safeRedisSet("job:X:status", "RUNNING") → TitanJRedisAdapter.execute()
→ RESP SET command → TitanStore → AOF append
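For reference, this is how a SET command is framed under the standard RESP encoding (an array of bulk strings). The sketch is in Python and the helper name `encode_resp_command` is hypothetical. Whether TitanJRedisAdapter emits exactly these bytes is an assumption.

```python
def encode_resp_command(*args):
    """Encode a command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]           # array header: element count
    for arg in args:
        data = arg.encode("utf-8")
        parts.append(b"$%d\r\n" % len(data))    # bulk string header: byte length
        parts.append(data + b"\r\n")
    return b"".join(parts)

frame = encode_resp_command("SET", "job:X:status", "RUNNING")
print(frame)
# b'*3\r\n$3\r\nSET\r\n$12\r\njob:X:status\r\n$7\r\nRUNNING\r\n'
```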
Threading Model
| Component | Pool type | Size | What it handles |
|---|---|---|---|
| SchedulerServer | CachedThreadPool | Unbounded | One thread per TCP connection (SDK, CLI, worker callbacks) |
| Scheduler dispatch | Single thread | 1 | The runDispatchLoop(): all dispatch decisions are single-threaded |
| Worker connection handler | CachedThreadPool | Unbounded | Incoming TCP from Master |
| Worker job execution | FixedThreadPool | 4 (MAX_THREADS) | Concurrent job slots; rejects when full |
| TitanJRedisAdapter | synchronized | Effectively 1 | All TitanStore access serialized through one lock |
Key implication: the dispatch loop is single-threaded, so it processes one job at a time from the queue. This is simple and safe but means dispatch throughput is bounded by the time to find a worker + send the job.
Wire Protocol (TITAN_PROTO)
Every message between Master, workers, and clients uses the same format:
[ Version (1 byte) | OpCode (1 byte) | Flags (1 byte) | Spare (1 byte) | Length (4 bytes) ] + [ Payload ]
8-byte fixed header + UTF-8 payload. Defined in TitanProtocol.java.
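A minimal Python sketch of this framing using the standard `struct` module. The big-endian length field, the version value 1, and the zeroed flags are assumptions, since the guide does not state them. TitanProtocol.java is the authoritative definition, and `pack_message` / `unpack_message` are hypothetical helper names.

```python
import struct

# version, opcode, flags, spare (1 byte each) + payload length (4 bytes).
# ">" selects big-endian, which is an assumption of this sketch.
HEADER = struct.Struct(">BBBBI")
OP_RUN = 0x06   # OP_RUN as listed in this guide

def pack_message(opcode, payload, version=1, flags=0):
    body = payload.encode("utf-8")
    return HEADER.pack(version, opcode, flags, 0, len(body)) + body

def unpack_message(data):
    version, opcode, flags, _spare, length = HEADER.unpack_from(data)
    body = data[HEADER.size:HEADER.size + length]
    return version, opcode, flags, body.decode("utf-8")

frame = pack_message(OP_RUN, "job-1|script.py")
print(HEADER.size)              # 8
print(unpack_message(frame))    # (1, 6, 0, 'job-1|script.py')
```

Reading the fixed header first and then exactly `length` payload bytes is what lets one parser serve Master, workers, and clients.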
Key opcodes:
| OpCode | Hex | Direction | Purpose |
|---|---|---|---|
| OP_SUBMIT_DAG | 0x04 | Client → Master | Submit a multi-job DAG |
| OP_RUN | 0x06 | Master → Worker | Execute a script |
| OP_DEPLOY | 0x05 | Master → Worker | Deploy a long-running service |
| OP_JOB_COMPLETE | 0x12 | Worker → Master | Job finished (status + result) |
| OP_GET_JOB_STATUS | 0x55 | Client → Master | Query job state |
| OP_GET_LOGS | 0x16 | Client → Master | Fetch job stdout/stderr |
| OP_KV_SET / OP_KV_GET | 0x60/0x61 | Client → Master | TitanStore read/write |
| OP_CANCEL_JOB | 0x56 | Client → Master | Cancel job + cascade children |
| OP_LOG_BATCH | 0x17 | Worker → Master | Streamed stdout lines |
Full list in TitanProtocol.java. To add a new opcode: define it there, handle it in SchedulerServer.clientHandler(), and add the SDK method in titan_sdk.py.
Current Scheduling Model
The dispatch logic in Scheduler.selectBestWorker() is intentionally basic in v1:
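// Simplified view of current logic
Worker bestWorker = null;
int minLoad = Integer.MAX_VALUE;
for (Worker worker : availableWorkers) {
    if (worker.isSaturated()) continue;        // skip workers with no free slots
    if (worker.getCurrentLoad() < minLoad) {   // strict <, so the first of two tied workers wins
        minLoad = worker.getCurrentLoad();
        bestWorker = worker;
    }
}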
It picks the worker with the lowest currentLoad counter. If a job has preferredWorkerId set (affinity), it tries that worker first.
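The selection rule, including the affinity override, can be modelled in Python as follows. This is a sketch, not the Java method. The `Worker` dataclass, the saturation threshold of 4, and the fallback to least-loaded selection when the preferred worker is saturated are assumptions made for illustration.

```python
from dataclasses import dataclass

@dataclass
class Worker:
    worker_id: str
    current_load: int
    max_load: int = 4   # assumed threshold for "saturated"

    def is_saturated(self):
        return self.current_load >= self.max_load

def select_best_worker(workers, preferred_worker_id=None):
    # Affinity: try the preferred worker first if it has a free slot.
    if preferred_worker_id is not None:
        for w in workers:
            if w.worker_id == preferred_worker_id and not w.is_saturated():
                return w
    # Otherwise pick the lowest load; the first of two tied workers wins.
    best = None
    for w in workers:
        if w.is_saturated():
            continue
        if best is None or w.current_load < best.current_load:
            best = w
    return best

pool = [Worker("w1", 2), Worker("w2", 1), Worker("w3", 1)]
print(select_best_worker(pool).worker_id)                             # w2
print(select_best_worker(pool, preferred_worker_id="w1").worker_id)   # w1
```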
What the current scheduler does not consider:
| Signal | Status |
|---|---|
| Worker queue depth (jobs waiting locally) | Not tracked |
| Completion velocity (fast vs slow workers) | Not tracked |
| Network latency to worker | Not measured |
| Resource utilization (CPU/memory) | Not monitored |
| Tie-breaking (two workers at same load) | First in iteration order wins |
This is a known gap. Resource-aware scheduling (factoring in actual CPU/memory usage and completion rates) is planned for a future update. If you're building on the dispatch logic, be aware that currentLoad is an approximation — it's incremented on dispatch and decremented on callback, not based on real worker utilization.
If you want to improve the scheduler, selectBestWorker() in Scheduler.java is the single method to change. The Worker model already has fields for currentLoad, capabilities, idleDuration, and port — add more fields if your policy needs them (e.g., avgCompletionTime, cpuUsage).
Extension Points
Adding a new job type
Implement TaskHandler:
public class MyHandler implements TaskHandler {
    @Override
    public String execute(String payload) {
        // payload format: "arg1|arg2|..."
        // do work
        return "SUCCESS";
    }
}
Register it in RpcWorkerServer.addTaskHandler().
Submit jobs with that type by setting the payload prefix to MY_TYPE|....
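The idea of routing on a payload prefix can be illustrated with a small Python model. Everything here is hypothetical: the registry keyed by type string, the `add_task_handler` signature, and the error string are assumptions, and the real registration call in RpcWorkerServer may look different.

```python
handlers = {}   # task type -> callable taking the remaining payload

def add_task_handler(task_type, handler):
    handlers[task_type] = handler

def process_command(payload):
    # Split "MY_TYPE|arg1|arg2" into the type prefix and the rest.
    task_type, _, args = payload.partition("|")
    handler = handlers.get(task_type)
    if handler is None:
        return "FAILED: unknown task type " + task_type
    return handler(args)

def my_handler(args):
    fields = args.split("|")   # payload format: "arg1|arg2|..."
    return "SUCCESS" if len(fields) == 2 else "FAILED: expected 2 args"

add_task_handler("MY_TYPE", my_handler)
print(process_command("MY_TYPE|input.csv|output.csv"))   # SUCCESS
```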
Adding a new opcode
- Define in TitanProtocol.java: public static final byte OP_MY_THING = 0x70;
- Handle in SchedulerServer.clientHandler()
- Add SDK method in titan_sdk.py
- Optionally add CLI command in TitanCLI.java
Adding a new dashboard page
- Create perm_files/templates/my_page.html
- Add route in server_dashboard.py
Adding a new scheduling policy
The dispatch decision lives in Scheduler.selectBestWorker(). Currently it does least-connection with affinity override. To add a new policy (round-robin, weighted, response-time-aware), modify this method. The Worker model tracks currentLoad, capabilities, idleDuration, and port — add more fields if your policy needs them.
SDK Internals
The SDK Reference covers the public API. This section covers what happens under the hood.
Connection model. TitanClient._send_request(opcode, payload) opens a fresh TCP socket to the Master for every call, sends the 8-byte TITAN_PROTO header + payload, reads the response, and closes the socket. There's no persistent connection or connection pool. This is simple but means high-frequency SDK calls (e.g., polling get_job_status in a tight loop) pay the TCP handshake cost every time.
_send_request(opcode, payload)
└─ Socket(TITAN_HOST, TITAN_PORT) ← new connection
└─ TitanProtocol.send(out, opcode, payload)
└─ TitanProtocol.read(in) → response
└─ socket.close() ← connection closed
HITL injection. When you set hitl_message on a TitanJob, the SDK rewires the DAG before submission. _inject_hitl_gates(jobs) does this:
- Finds every job with hitl_message set
- Creates a gate job (hitl-gate-{job_id}) that runs hitl_gate.py with the gate ID and message as args
- Rewires all children of the original job to depend on the gate instead
- Clears any stale decision in TitanStore (store_put("titan:hitl:status:{gate_id}", "CLEARED"))
The engine never knows about HITL — it just sees a regular DAG with an extra script job in the middle.
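The rewiring steps can be sketched on plain dictionaries. This is an illustration rather than the SDK's `_inject_hitl_gates`: the dict layout (`id`, `parents`, `script`, `args`) and the function name `inject_hitl_gates` are hypothetical, and the TitanStore reset is left out.

```python
def inject_hitl_gates(jobs):
    """Return a new job list with a gate inserted after each HITL job."""
    gated = {j["id"]: f"hitl-gate-{j['id']}" for j in jobs if j.get("hitl_message")}
    result = []
    for job in jobs:
        # Children of a gated job now depend on the gate instead.
        parents = [gated.get(p, p) for p in job.get("parents", [])]
        result.append({**job, "parents": parents})
        if job["id"] in gated:
            gate_id = gated[job["id"]]
            result.append({
                "id": gate_id,
                "script": "hitl_gate.py",
                "args": [gate_id, job["hitl_message"]],
                "parents": [job["id"]],   # the gate waits for the original job
            })
    return result

jobs = [
    {"id": "preprocess", "hitl_message": "Review before training"},
    {"id": "train", "parents": ["preprocess"]},
]
rewired = inject_hitl_gates(jobs)
print([j["id"] for j in rewired])
# ['preprocess', 'hitl-gate-preprocess', 'train']
```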
Manifest file. On every submit_dag() call, the SDK writes .titan_dag_manifest.json in the project root. This file maps DAG names to their job IDs and metadata. The dashboard reads this file to discover DAGs and group them in the sidebar. Without the manifest, DAGs are discovered from worker stats only (slower, less metadata).
YAML submission. submit_yaml(path) parses a YAML file into TitanJob objects using titan_yaml_parser.py, then calls submit_dag() with the result. The YAML parser handles depends_on fields, requirement tags, priority, and HITL message fields.
State Machine — Job Lifecycle
PENDING → RUNNING → COMPLETED
→ FAILED → (retry < 3) → PENDING (re-queued)
→ (retry >= 3) → DEAD (dead letter queue)
PENDING → CANCELLED (via cancel command, cascades to children)
RUNNING → CANCELLED (kill signal sent to worker)
States are stored in TitanStore as job:{id}:status. The Job.java class has an enum Status with these values.
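The lifecycle above can be written down as a transition table, which is handy when checking whether a proposed state change is legal. This is a Python sketch and `TRANSITIONS` and `can_transition` are hypothetical names, not part of Job.java.

```python
# Allowed next states for each state, taken from the lifecycle diagram.
TRANSITIONS = {
    "PENDING": {"RUNNING", "CANCELLED"},
    "RUNNING": {"COMPLETED", "FAILED", "CANCELLED"},
    "FAILED": {"PENDING", "DEAD"},   # re-queued, or dead-lettered after retries
    "COMPLETED": set(),
    "CANCELLED": set(),
    "DEAD": set(),
}

def can_transition(current, new):
    return new in TRANSITIONS.get(current, set())

print(can_transition("RUNNING", "FAILED"))      # True
print(can_transition("COMPLETED", "RUNNING"))   # False
```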
Code Flow by Scenario
Each trace below shows the exact path through the codebase, file by file, method by method. Follow these when debugging or extending a specific flow.
Scenario 1: Single script job (SDK submit)
The simplest path. User submits one job, it runs on a worker, completes.
sequenceDiagram
participant U as titan_sdk.py
participant SS as SchedulerServer
participant S as Scheduler
participant TS as TitanStore
participant W as RpcWorkerServer
participant SE as ScriptExecutorHandler
participant P as Python Process
U->>SS: OP_SUBMIT_DAG (TCP 9090)
SS->>S: submitJob(job)
S->>TS: SET job:X:status PENDING
S->>S: taskQueue.add(job)
SS-->>U: DAG_ACCEPTED
S->>S: runDispatchLoop() → take()
S->>TS: SET job:X:status RUNNING
S->>W: OP_RUN (TCP 8080)
W->>SE: execute(payload)
SE->>P: ProcessBuilder.start()
P-->>SE: stdout lines
SE-->>SS: OP_LOG_BATCH
P-->>SE: exit 0
SE-->>W: SUCCESS
W->>SS: OP_JOB_COMPLETE
SS->>S: handleJobCallback()
S->>S: completeJob()
S->>TS: SET job:X:status COMPLETED
S->>S: unlockChildren()
titan_sdk.py TitanClient.submit_dag(name, [job])
└─ _inject_hitl_gates(jobs) ← checks for HITL, none here
└─ _build_dag_payload(name, jobs) ← serializes to "job_id|file|req|parents"
└─ _send_request(OP_SUBMIT_DAG, payload)
└─ _write_dag_manifest() ← writes .titan_dag_manifest.json for dashboard
│
▼ TCP port 9090
SchedulerServer.java clientHandler(socket)
└─ TitanProtocol.read(in) ← parse 8-byte header + payload
└─ case OP_SUBMIT_DAG:
└─ scheduler.submitDAG(payload) ← parses pipe-delimited jobs
│
▼
Scheduler.java submitJob(job)
└─ safeRedisSet("job:X:status", "PENDING")
└─ safeRedisSet("job:X:payload", ...)
└─ no parents → taskQueue.add(job)
│
▼
runDispatchLoop() ← blocked on taskQueue.take()
└─ taskQueue.take() → got job
└─ selectBestWorker(job, workers)
└─ WorkerRegistry.getWorkersByCapability("GENERAL")
└─ iterate, find lowest currentLoad
└─ safeRedisSet("job:X:status", "RUNNING")
└─ executeJobRequest(job, worker) ← TCP to worker
│
▼ TCP port 8080
RpcWorkerServer.java processCommand(payload)
└─ taskType = "RUN_SCRIPT"
└─ ScriptExecutorHandler.execute(payload)
│
▼
ScriptExecutorHandler ProcessBuilder("python", script.py).start()
└─ thread: read stdout line by line
└─ LogBatcher.addLog(line)
└─ LogBatcher flushes → OP_LOG_BATCH → SchedulerServer
└─ process.waitFor() → exit code 0
└─ return "SUCCESS"
│
▼
RpcWorkerServer.java sendCallback(OP_JOB_COMPLETE, "job-X|COMPLETED|result")
│
▼ TCP port 9090
SchedulerServer.java case OP_JOB_COMPLETE:
└─ scheduler.handleJobCallback(payload)
│
▼
Scheduler.java handleJobCallback(payload)
└─ parse jobId, status, result
└─ completeJob(job, result, record)
└─ safeRedisSet("job:X:status", "COMPLETED")
└─ safeRedisSrem("system:active_jobs", jobId)
└─ unlockChildren(jobId) ← no children, nothing to unlock
Scenario 2: DAG with dependencies (A → B → C)
Three jobs in a chain. B waits for A, C waits for B.
titan_sdk.py submit_dag("pipeline", [A, B(parents=[A]), C(parents=[B])])
│
▼
Scheduler.java submitJob(A) → no parents → taskQueue.add(A)
submitJob(B) → has parent A → dagWaitingRoom.put(B)
submitJob(C) → has parent B → dagWaitingRoom.put(C)
│
▼
runDispatchLoop() picks A → dispatches to worker
... A executes and completes ...
│
▼
completeJob(A)
└─ unlockChildren("A")
└─ scan dagWaitingRoom
└─ B has parent A → A is done → all parents met
└─ taskQueue.add(B) ← B is now dispatchable
└─ C has parent B → B not done → stays in waiting room
│
▼
runDispatchLoop() picks B → dispatches to worker
... B executes and completes ...
│
▼
completeJob(B)
└─ unlockChildren("B")
└─ C has parent B → B is done → all parents met
└─ taskQueue.add(C)
│
▼
runDispatchLoop() picks C → dispatches → completes
Scenario 3: Fan-out with fan-in (A → [B,C,D] → E)
Parallel execution with a collector that waits for all branches.
sequenceDiagram
participant S as Scheduler
participant W1 as Worker 1
participant W2 as Worker 2
Note over S: A completes → unlockChildren()
S->>S: taskQueue.add(B), add(C), add(D)
par Parallel dispatch
S->>W1: dispatch B
S->>W1: dispatch C
S->>W2: dispatch D
end
W1-->>S: B COMPLETED → unlockChildren(B)
Note over S: E needs [B,C,D] — B done, C,D pending → wait
W1-->>S: C COMPLETED → unlockChildren(C)
Note over S: E needs [B,C,D] — B,C done, D pending → wait
W2-->>S: D COMPLETED → unlockChildren(D)
Note over S: E needs [B,C,D] — all done → taskQueue.add(E)
S->>W1: dispatch E
W1-->>S: E COMPLETED
Scheduler.java submitJob(A) → taskQueue
submitJob(B) → parent A → dagWaitingRoom
submitJob(C) → parent A → dagWaitingRoom
submitJob(D) → parent A → dagWaitingRoom
submitJob(E) → parents [B,C,D] → dagWaitingRoom
│
▼
A completes → unlockChildren("A")
└─ B, C, D all have parent A met → taskQueue.add(B), add(C), add(D)
└─ E has parents [B,C,D] → none done yet → stays
│
▼
runDispatchLoop() picks B → worker 1
runDispatchLoop() picks C → worker 1 (or worker 2 if available)
runDispatchLoop() picks D → worker 2 (least loaded)
← B, C, D executing in parallel across workers →
│
▼
B completes → unlockChildren("B")
└─ E: parent B done, but C and D still pending → stays
C completes → unlockChildren("C")
└─ E: parents B,C done, D still pending → stays
D completes → unlockChildren("D")
└─ E: parents B,C,D all done → taskQueue.add(E)
│
▼
runDispatchLoop() picks E → dispatches → completes
Scenario 4: Service deployment (long-running)
A service is deployed and stays alive. Different path from script jobs.
sequenceDiagram
participant U as CLI / SDK
participant SS as SchedulerServer
participant S as Scheduler
participant W as RpcWorkerServer
participant SH as ServiceHandler
participant P as Service Process
U->>SS: OP_DEPLOY
SS->>S: submitJob(DEPLOY_PAYLOAD)
S->>S: dispatch to worker
S->>W: OP_RUN (TCP 8080)
W->>SH: execute(payload)
SH->>P: ProcessBuilder.start() — detached
SH->>SH: runningServices.put(id, process)
SH-->>W: DEPLOYED_SUCCESS
Note over P: Process stays alive — no completion callback
alt Process crashes
P--xSH: onExit fires
SH->>SH: not in intentionallyStopped → crash
SH->>SH: sleep 3s
SH->>P: relaunch same command
else Intentional stop
U->>SS: OP_STOP
SS->>W: forward stop
W->>SH: stopProcess(id)
SH->>SH: intentionallyStopped.add(id)
SH->>P: destroyForcibly()
P--xSH: onExit fires
SH->>SS: OP_UNREGISTER_SERVICE
end
titan_sdk.py submit with DEPLOY_PAYLOAD type
or
TitanCLI.java "deploy log_viewer.py 9991"
│
▼
SchedulerServer.java case OP_DEPLOY:
└─ scheduler.submitJob(job) ← payload starts with "DEPLOY_PAYLOAD"
│
▼
Scheduler.java runDispatchLoop() picks job
└─ dispatches to worker
│
▼ TCP port 8080
RpcWorkerServer.java processCommand(payload)
└─ taskType = "DEPLOY_PAYLOAD" or "START_SERVICE"
└─ ServiceHandler.execute(payload)
│
▼
ServiceHandler.java startProcess(filename, serviceId, port)
└─ if .jar → ProcessBuilder("java", "-jar", file, port)
└─ if .py → launchDetachedProcess("python", file)
│
▼
launchDetachedProcess(serviceId, dir, command)
└─ ProcessBuilder.start()
└─ thread: stdout → LogBatcher → stream to Master
└─ runningServices.put(serviceId, process)
└─ ProcessRegistry.register(serviceId, pid)
└─ process.onExit().thenRun(() → {
if intentionallyStopped → notify Master, done
else → crash detected → sleep 3s → relaunch same command
})
└─ return "DEPLOYED_SUCCESS"
│
▼
← process stays alive, no OP_JOB_COMPLETE callback →
← service appears in dashboard under worker's Running Services →
Stopping a service:
TitanCLI.java "stop service-id"
│
▼
SchedulerServer.java case OP_STOP → forward to worker
│
▼
ServiceHandler.java stopProcess(serviceId)
└─ intentionallyStopped.add(serviceId) ← prevents auto-restart
└─ process.destroyForcibly()
└─ onExit fires → sees intentionallyStopped → notifyMasterOfServiceStop()
Scenario 5: HITL gate (pause for human approval)
A DAG with a human checkpoint mid-execution.
sequenceDiagram
participant SDK as titan_sdk.py
participant S as Scheduler
participant W as Worker
participant Gate as hitl_gate.py
participant TS as TitanStore
participant H as Human (CLI / Dashboard)
SDK->>SDK: _inject_hitl_gates() → insert gate job
SDK->>S: submit DAG: preprocess → hitl-gate → train
Note over S: preprocess completes → unlock gate
S->>W: dispatch hitl-gate-preprocess
W->>Gate: ProcessBuilder.start()
loop Every 3 seconds
Gate->>TS: GET titan:hitl:status:hitl-gate-preprocess
TS-->>Gate: CLEARED (keep waiting)
end
H->>TS: SET titan:hitl:status:hitl-gate-preprocess APPROVED
Gate->>TS: GET titan:hitl:status:hitl-gate-preprocess
TS-->>Gate: APPROVED
Gate->>Gate: sys.exit(0)
W-->>S: OP_JOB_COMPLETE (COMPLETED)
S->>S: unlockChildren() → train enters taskQueue
titan_sdk.py submit_dag("pipeline", [
TitanJob("preprocess", ..., hitl_message="Review before training"),
TitanJob("train", ..., parents=["preprocess"])
])
└─ _inject_hitl_gates(jobs)
└─ detects hitl_message on "preprocess"
└─ creates gate job: "hitl-gate-preprocess"
└─ rewires: train.parents = ["hitl-gate-preprocess"]
└─ store_put("titan:hitl:status:hitl-gate-preprocess", "CLEARED")
│
▼
Final DAG: preprocess → hitl-gate-preprocess → train
│
▼
Scheduler.java preprocess runs and completes
└─ unlockChildren → hitl-gate-preprocess enters taskQueue
│
▼
hitl-gate-preprocess dispatched to worker
│
▼
Worker runs: perm_files/hitl_gate.py
└─ args: "hitl-gate-preprocess 172800 Review before training"
└─ store_put("titan:hitl:queue", gate_id) ← dashboard polls this
└─ loop:
store_get("titan:hitl:status:hitl-gate-preprocess")
if "APPROVED" → sys.exit(0) ← job COMPLETED
if "REJECTED" → sys.exit(1) ← job FAILED
else → sleep 3s, keep polling
│
▼ (meanwhile, human reviews)
Dashboard /api/hitl/pending → shows pending gates
or
TitanCLI.java "approve hitl-gate-preprocess"
└─ OP_KV_SET: "titan:hitl:status:hitl-gate-preprocess|APPROVED"
│
▼
hitl_gate.py next poll sees "APPROVED" → sys.exit(0)
│
▼
OP_JOB_COMPLETE: hitl-gate-preprocess COMPLETED
└─ unlockChildren → train enters taskQueue → dispatches → runs
If rejected: gate exits 1 → job FAILED → train stays in dagWaitingRoom forever (parent never completed).
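The gate's polling loop can be sketched as below. This is not the contents of hitl_gate.py: `run_gate` is a hypothetical function, the store lookup is passed in as a callable so the sketch runs without TitanStore, and treating the numeric argument as a timeout that ends with exit code 1 is an assumption.

```python
import time

def run_gate(gate_id, store_get, timeout_s, poll_s=3.0,
             sleep=time.sleep, clock=time.monotonic):
    """Poll the decision key and return the exit code the gate would use."""
    key = f"titan:hitl:status:{gate_id}"
    deadline = clock() + timeout_s
    while clock() < deadline:
        status = store_get(key)
        if status == "APPROVED":
            return 0            # job COMPLETED, children unlock
        if status == "REJECTED":
            return 1            # job FAILED
        sleep(poll_s)           # any other value: keep waiting
    return 1                    # assumed behaviour on timeout

# Fake store: two polls see CLEARED, the third sees the approval.
responses = iter(["CLEARED", "CLEARED", "APPROVED"])
exit_code = run_gate("hitl-gate-preprocess", lambda key: next(responses),
                     timeout_s=60, sleep=lambda s: None)
print(exit_code)   # 0
```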
Scenario 6: Job failure with retry
A job fails, gets retried up to 3 times, then goes to dead letter queue.
sequenceDiagram
participant W as Worker
participant S as Scheduler
participant TS as TitanStore
participant DLQ as Dead Letter Queue
W->>S: OP_JOB_COMPLETE (FAILED)
S->>S: handleJobFailure(job)
S->>S: retryCount++ → now 1
alt retryCount < 3
S->>TS: SET job:X:status PENDING
S->>S: taskQueue.add(job) — re-queued
Note over S: dispatch loop picks it up again
S->>W: re-dispatch to worker
W->>S: FAILED again → retry 2 → re-queue
S->>W: re-dispatch
W->>S: FAILED again → retry 3
end
S->>S: retryCount >= 3
S->>TS: SET job:X:status DEAD
S->>DLQ: deadLetterQueue.add(job)
S->>S: cancel all children in dagWaitingRoom
ScriptExecutorHandler process exits with code 1 (or crashes)
└─ return "FAILED: ..."
│
▼
RpcWorkerServer.java sendCallback(OP_JOB_COMPLETE, "job-X|FAILED|error message")
│
▼
Scheduler.java handleJobCallback(payload)
└─ statusStr = "FAILED"
└─ handleJobFailure(job)
└─ job.incrementRetry()
└─ if retryCount < 3:
job.setStatus(PENDING)
safeRedisSet("job:X:status", "PENDING")
taskQueue.add(job) ← re-queued for another attempt
└─ if retryCount >= 3:
safeRedisSet("job:X:status", "DEAD")
deadLetterQueue.add(job) ← no more retries
cancel all children in dagWaitingRoom
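The retry decision can be modelled in Python like this. It is a sketch under assumptions: jobs are plain dicts, `handle_job_failure` is a stand-in for the Java method, and only direct children in the waiting room are cancelled.

```python
MAX_RETRIES = 3

def handle_job_failure(job, task_queue, dead_letter_queue, waiting_room):
    """Re-queue the job, or dead-letter it and cancel its waiting children."""
    job["retries"] += 1
    if job["retries"] < MAX_RETRIES:
        job["status"] = "PENDING"
        task_queue.append(job)          # another attempt
        return []
    job["status"] = "DEAD"
    dead_letter_queue.append(job)       # no more retries
    cancelled = [child for child, parents in waiting_room.items()
                 if job["id"] in parents]
    for child in cancelled:
        del waiting_room[child]
    return cancelled

job = {"id": "X", "retries": 0, "status": "RUNNING"}
task_queue, dead_letter_queue = [], []
waiting_room = {"child": ["X"], "other": ["Y"]}
for _ in range(3):                      # three failed attempts in a row
    cancelled = handle_job_failure(job, task_queue, dead_letter_queue, waiting_room)
    if task_queue:
        task_queue.pop()                # the dispatch loop picks it up again
print(job["status"], cancelled)         # DEAD ['child']
```

With the counter incremented before the comparison, a job is attempted three times in total before it is dead-lettered.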
Scenario 7: Cancel with cascade
User cancels a running job. All downstream descendants are cancelled too.
TitanCLI.java "cancel parent-job"
│
▼
SchedulerServer.java case OP_CANCEL_JOB:
└─ scheduler.cancelJob("DAG-parent-job")
│
▼
Scheduler.java cancelJob(jobId)
└─ check: is it in taskQueue? → remove, mark CANCELLED
└─ check: is it in dagWaitingRoom? → remove, mark CANCELLED
└─ check: is it running on a worker?
└─ yes → send kill signal to worker
└─ worker kills the process
└─ safeRedisSet("job:X:status", "CANCELLED")
└─ cascade: find all jobs in dagWaitingRoom whose ancestors include X
└─ for each descendant:
safeRedisSet("job:child:status", "CANCELLED")
remove from dagWaitingRoom
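Finding the descendants to cancel is a graph walk over the waiting room. A minimal Python sketch, assuming the waiting room can be viewed as a mapping from job ID to parent IDs (`find_descendants` is a hypothetical helper):

```python
def find_descendants(job_id, waiting_room):
    """Return every waiting job whose ancestors include job_id."""
    descendants = []
    seen = {job_id}
    frontier = [job_id]
    while frontier:
        current = frontier.pop()
        for child, parents in waiting_room.items():
            if child not in seen and current in parents:
                seen.add(child)
                descendants.append(child)
                frontier.append(child)   # its own children are cancelled too
    return descendants

waiting_room = {"B": ["A"], "C": ["B"], "D": ["X"]}
print(find_descendants("A", waiting_room))   # ['B', 'C']
```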
Scenario 8: Worker crash and recovery
A worker dies while running a job. Master detects and reschedules.
Worker process killed (kill -9, OOM, hardware failure)
│
▼
Scheduler.java reconcileClusters() runs periodically
└─ checks heartbeat timestamps for all registered workers
└─ worker hasn't sent heartbeat in > threshold
└─ removes worker from WorkerRegistry
└─ any jobs assigned to that worker:
└─ handleJobFailure(job) → re-queues with retry increment
│
▼
RpcWorkerServer.java (on the worker side, if worker restarts)
└─ reRegister loop fires every 30s
└─ sends OP_REGISTER to Master
└─ Master re-adds to WorkerRegistry
└─ worker is available for dispatch again
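The heartbeat check can be sketched as a pure function over timestamps. This is a Python model, not the Java `reconcileClusters()`: the dict shapes and the `reconcile` name are hypothetical, and the threshold is a parameter because the guide does not state its value.

```python
def reconcile(last_heartbeat, assignments, now, threshold_s):
    """Drop workers with stale heartbeats and return their orphaned jobs.

    last_heartbeat: worker id -> time of last heartbeat (seconds)
    assignments:    job id -> worker id currently running it
    """
    dead = [w for w, seen in last_heartbeat.items() if now - seen > threshold_s]
    orphaned = []
    for worker_id in dead:
        del last_heartbeat[worker_id]    # remove from the registry
        orphaned.extend(j for j, w in assignments.items() if w == worker_id)
    for job_id in orphaned:
        del assignments[job_id]          # the caller re-queues these jobs
    return dead, orphaned

last_heartbeat = {"w1": 100.0, "w2": 70.0}
assignments = {"job-1": "w2", "job-2": "w1"}
print(reconcile(last_heartbeat, assignments, now=105.0, threshold_s=30.0))
# (['w2'], ['job-1'])
```

Each orphaned job would then go through the same failure path as a job that exited non-zero.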
Scenario 9: MCP submission (natural language)
Claude Desktop or Cursor submits a DAG through the MCP server.
sequenceDiagram
participant U as User
participant C as Claude Desktop
participant MCP as titan_mcp_server.py
participant SDK as TitanClient
participant S as Scheduler
participant W as Workers
U->>C: "Run 3 analysis jobs in parallel, combine results"
C->>C: generates Python scripts on the fly
C->>MCP: submit_dag_pipeline(name, jobs_json)
MCP->>MCP: auto-generate agent_run_id
MCP->>MCP: prepend _SCRIPT_PREAMBLE to each script
MCP->>MCP: write scripts to perm_files/
MCP->>SDK: submit_dag(name, jobs, agent_run_id)
SDK->>S: OP_SUBMIT_DAG (same path as Scenario 1)
S->>W: dispatch jobs across workers
loop Poll until done
C->>MCP: get_job_status(job_id)
MCP->>SDK: get_job_status()
SDK-->>MCP: COMPLETED
MCP-->>C: status
end
C->>MCP: get_job_logs(job_id)
MCP-->>C: logs content
C-->>U: rendered results
Claude Desktop user types: "Run three analysis jobs in parallel, combine results"
└─ Claude generates job scripts on the fly
└─ calls MCP tool: submit_dag_pipeline(dag_name, jobs_json)
│
▼ MCP stdio protocol
titan_mcp_server.py submit_dag_pipeline handler
└─ generates agent_run_id (auto: "mcp-{name}-{uuid}")
└─ for each job: prepend _SCRIPT_PREAMBLE (sys.path fix)
└─ write scripts to perm_files/
└─ TitanClient().submit_dag(name, jobs, agent_run_id=run_id)
│
▼
titan_sdk.py submit_dag() → same path as Scenario 1
└─ manifest includes agent_run_id → appears in Agent Runs tab
│
▼
← jobs execute across workers, logs stream back →
│
▼
Claude Desktop calls get_job_status, get_job_logs to poll
└─ reads results, renders response to user
Common Gotchas
JAR mismatch. After rebuilding with mvn package, the running Master and Worker still use the old JAR. Restart the cluster or verify the PID matches the new process.
Worker.jar is a copy. perm_files/Worker.jar is a copy of the engine JAR used when deploying workers remotely. After a rebuild: cp target/titan-orchestrator-1.0-SNAPSHOT.jar perm_files/Worker.jar.
TitanStore must start before Master. The Master connects to TitanStore on startup. If TitanStore isn't running, the Master falls back to in-memory mode (no persistence, no recovery). titan-dev.sh up handles the order correctly.
Dashboard template changes need restart. Flask doesn't auto-reload templates in production mode. Kill and restart the dashboard process after editing templates.
DAG- prefix. All job IDs submitted via submit_dag() are prefixed with DAG- by the SDK. The CLI auto-adds this prefix for status, logs, and cancel commands, but raw OP_GET_JOB_STATUS calls need it explicitly.
HITL gates are just scripts. hitl_gate.py is a regular job that polls titan:hitl:status:{gate_id} in TitanStore. Approving a gate from the CLI (approve hitl-gate-X) simply writes APPROVED to that key. The gate script sees it on its next poll and exits 0.
Key Methods Reference
If you're tracing a bug or adding a feature, these are the methods you'll spend the most time in:
| Method | File | What it does |
|---|---|---|
| runDispatchLoop() | Scheduler.java | Main loop: takes from queue, finds worker, dispatches |
| selectBestWorker() | Scheduler.java | Least-loaded worker selection with affinity |
| submitJob() | Scheduler.java | Persists job state, routes to queue or waiting room |
| completeJob() | Scheduler.java | Marks complete, unlocks children, updates history |
| handleJobFailure() | Scheduler.java | Retry or dead-letter decision |
| handleJobCallback() | Scheduler.java | Receives worker completion/failure callbacks |
| unlockChildren() | Scheduler.java | Moves dependency-met jobs from waiting room to queue |
| cancelJob() | Scheduler.java | Cancel + cascade to all descendants |
| clientHandler() | SchedulerServer.java | Opcode routing (the switch statement) |
| processCommand() | RpcWorkerServer.java | Worker-side command dispatch |
| execute() | ScriptExecutorHandler.java | Process spawn, stdout capture, exit handling |
| launchDetachedProcess() | ServiceHandler.java | Service deploy with auto-restart |
| submit_dag() | titan_sdk.py | SDK entry point: HITL injection, manifest write |
| discover_dags_from_stats() | server_dashboard.py | Dashboard DAG discovery from Master stats |