📚 Python SDK Reference

The Titan Python SDK allows you to programmatically define jobs, interact with the TitanStore data bus, and manage distributed artifacts.


1. Core Classes

TitanClient

The main entry point for connecting to the cluster.

from titan_sdk import TitanClient

# No constructor args — host and port are configured via
# TITAN_HOST / TITAN_PORT environment variables (default: 127.0.0.1:9090)
client = TitanClient()

Job and DAG submission

| Method | Description |
| --- | --- |
| client.submit_job(job) | Dispatches a single TitanJob to the cluster. |
| client.submit_dag(name, jobs, agent_run_id=None) | Submits a list of linked TitanJob objects as a single named DAG. Pass agent_run_id to link multiple DAG submissions into one logical agent run visible in the Dashboard. |
| client.get_job_status(job_id) | Securely queries the Master for a job's internal system status. |
| client.fetch_logs(job_id) | Retrieves the stdout/stderr logs for a specific job ID. |

File transfer

| Method | Description |
| --- | --- |
| client.upload_file(filepath) | Uploads a single file to the Master's uploads/ directory. Returns "UPLOAD_SUCCESS" on success. |
| client.upload_project_folder(path) | Zips and uploads a local folder to the Master's artifact registry. |
| client.publish_artifact(key, filename) | Worker-side. Uploads filename to Master and registers the basename in TitanStore under key. Pair with get_artifact on the orchestrator side. |
| client.get_artifact(key, save_path=None) | Orchestrator-side. Reads the filename registered under key, downloads it from Master, and saves to save_path (defaults to /tmp/<filename>). Returns True on success. |
| client.fetch_artifact(filename, save_path=None) | Low-level download by filename from Master's uploads/ directory. |
| client.deploy_script(filepath) | Deploys a worker script to Master's perm_files/ directory. Returns "DEPLOY_SUCCESS" on success. |

TitanStore (shared KV)

| Method | Description |
| --- | --- |
| client.store_put(key, value) | Saves a string value to the distributed TitanStore. |
| client.store_get(key) | Retrieves a string value from the distributed TitanStore. |
| client.store_sadd(key, member) | Adds a member to a distributed Set. Returns 1 if new, 0 if already exists. |
| client.store_smembers(key) | Returns a Python list of all members in the specified Set. |
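
Because store_sadd reports whether the member was new, it can double as a simple "first caller wins" check. The sketch below is illustrative only: claim_once and process_new_files are hypothetical helpers, client is any object exposing store_sadd, and the code assumes the return value converts cleanly to the integers 1 and 0. Whether the check is atomic across nodes is not stated in this reference.

```python
def claim_once(client, set_key, member):
    """Return True only for the first caller that adds `member` to `set_key`.

    Hypothetical helper: relies on store_sadd returning 1 for a new member
    and 0 for an existing one, as described in the table above.
    """
    return int(client.store_sadd(set_key, member)) == 1


def process_new_files(client, filenames, set_key="processed_files"):
    """Return the filenames this caller claimed, skipping ones already recorded."""
    return [name for name in filenames if claim_once(client, set_key, name)]
```

With a real connection this would be called as process_new_files(TitanClient(), ["batch_A.csv", "batch_B.csv"]).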

TitanJob

Represents a unit of work to be executed on the cluster.

from titan_sdk import TitanJob

job = TitanJob(
    job_id="train_v1",
    filename="scripts/train.py",
    requirement="GPU",     # Optional: "GPU" or "GENERAL"
    priority=10,           # Optional: Higher numbers schedule first
    parents=["data_prep"], # Optional: List of parent Job IDs
    is_archive=False       # Set True if deploying a ZIP/Service
)

These are the constructor parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str | Required | Unique identifier for this execution step. |
| filename | str | Required | Absolute or relative path to the script or artifact. |
| job_type | str | "RUN_PAYLOAD" | Defines execution mode (e.g., use "SERVICE" for long-running processes). |
| args | str | None | Command-line arguments passed to the executed script. |
| parents | list | None | List of parent job_ids that must complete successfully before this task runs. |
| port | int | 0 | Port number to bind to (required if deploying a long-running Service). |
| is_archive | bool | False | Set to True if deploying a zipped project folder. |
| priority | int | 1 | Queue priority. Higher numbers are scheduled first. |
| delay | int | 0 | Artificial delay before execution. The unit (seconds or milliseconds) depends on the scheduler. |
| affinity | bool | False | If True, Titan attempts to route this task to the exact same physical node as its parent task. |
| requirement | str | "GENERAL" | Hardware capability routing tag (e.g., "GPU", "HIGH_MEM"). |
| hitl_message | str | None | When set, the SDK automatically injects a Human-in-the-Loop gate after this job. The string is shown to the operator in the Dashboard. See HITL Pipelines. |
| max_wait_seconds | int | 172800 | Maximum time (in seconds) the HITL gate will wait for a human decision before auto-failing. Default is 48 hours. Only applies when hitl_message is set. |
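
Some of these parameters only make sense in combination: a "SERVICE" job needs a port, and max_wait_seconds is ignored without hitl_message. The following is a minimal client-side sketch of those rules. check_job_kwargs is a hypothetical helper, not part of the SDK, and the job ID, path, and port are made-up values.

```python
def check_job_kwargs(kwargs):
    """Return a list of warnings for a dict of TitanJob keyword arguments."""
    problems = []
    for required in ("job_id", "filename"):
        if not kwargs.get(required):
            problems.append(f"missing required parameter: {required}")
    # The table marks port as required for long-running services.
    if kwargs.get("job_type") == "SERVICE" and not kwargs.get("port"):
        problems.append("job_type='SERVICE' needs a non-zero port")
    # max_wait_seconds only applies when a HITL gate is requested.
    if "max_wait_seconds" in kwargs and not kwargs.get("hitl_message"):
        problems.append("max_wait_seconds has no effect without hitl_message")
    return problems


service_kwargs = {
    "job_id": "inference_api",       # hypothetical job ID
    "filename": "services/api.zip",  # hypothetical path
    "job_type": "SERVICE",
    "port": 8080,                    # hypothetical port
    "is_archive": True,
}
print(check_job_kwargs(service_kwargs))  # an empty list means no warnings
```

If the list is empty, the same dict can be passed on as TitanJob(**service_kwargs).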

2. Defining DAGs Programmatically

You can build dependency graphs using the SDK's API instead of YAML.

from titan_sdk import TitanClient, TitanJob

client = TitanClient()

# Step 1: Define the Root Job (No parents)
task_a = TitanJob(
    job_id="extract_data",
    filename="etl/extract.py",
    priority=5
)

# Step 2: Define a Dependent Job
task_b = TitanJob(
    job_id="train_model",
    filename="ml/train.py",
    requirement="GPU",
    parents=["extract_data"]  # <--- Defines the dependency
)

# Step 3: Submit them as a unified DAG
client.submit_dag("nightly_pipeline", [task_a, task_b])
print("DAG Submitted!")
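
Before submitting, it can help to confirm locally that every parent exists and that the graph has no cycles. The sketch below uses Python's standard graphlib module (Python 3.9+). execution_order is a hypothetical helper, and it assumes each job object exposes job_id and parents attributes matching the constructor arguments and that all parents belong to the same submission. Neither assumption is confirmed by this reference.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(jobs):
    """Return job IDs in a valid execution order, or raise ValueError."""
    # Map each job ID to the IDs it depends on.
    graph = {job.job_id: list(job.parents or []) for job in jobs}
    if len(graph) != len(jobs):
        raise ValueError("duplicate job_id in DAG")
    for job_id, parents in graph.items():
        missing = [p for p in parents if p not in graph]
        if missing:
            raise ValueError(f"{job_id} depends on unknown jobs: {missing}")
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The second element of args holds the detected cycle.
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc
```

Calling execution_order([task_a, task_b]) before client.submit_dag(...) would surface a misspelled parent ID without a round trip to the cluster.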

Linking multiple DAGs into one Agent Run

For agentic workflows that submit several DAGs sequentially, pass the same agent_run_id to each submit_dag call. This links all stages into a single run entry in the Dashboard's Agent Runs view.

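import uuid

# Assumes client, planner_job, executor_jobs, and synthesis_job are already defined
run_id = uuid.uuid4().hex[:12]  # One short ID shared by every stage of this run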

client.submit_dag("PLAN",    [planner_job],   agent_run_id=run_id)
# ... wait for planner, read results ...
client.submit_dag("EXECUTE", executor_jobs,   agent_run_id=run_id)
# ... wait, evaluate ...
client.submit_dag("SYNTH",   [synthesis_job], agent_run_id=run_id)
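
The "wait" steps above can be sketched as a polling loop around get_job_status. This reference does not list the status strings the Master returns, so the "COMPLETED" and "FAILED" defaults below are placeholders (assumptions), and wait_for_job is a hypothetical helper rather than an SDK method.

```python
import time


def wait_for_job(client, job_id, done_states=("COMPLETED",),
                 failed_states=("FAILED",), poll_seconds=2.0, timeout=600.0):
    """Poll get_job_status until the job reaches a terminal state.

    The state names are placeholders (assumptions); replace them with the
    strings your Titan version actually returns.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = client.get_job_status(job_id)
        if status in done_states:
            return status
        if status in failed_states:
            raise RuntimeError(f"job {job_id} ended with status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {status!r} after {timeout}s")
        time.sleep(poll_seconds)
```

A timeout keeps the orchestrator from blocking indefinitely if a stage never reaches a terminal state.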

3. Using the Distributed Data Bus (TitanStore)

Tasks running on completely different physical nodes can share state, pass intermediate variables, or track metrics using Titan's built-in persistence layer.

File 1: task_a.py (Producer)

from titan_sdk import TitanClient

client = TitanClient()
# Save a result globally before the task exits
client.store_put("task_123_accuracy", "98.5")
client.store_sadd("processed_files", "batch_A.csv")

File 2: task_b.py (Consumer)

from titan_sdk import TitanClient

client = TitanClient()
# Retrieve the data passed from Task A
accuracy = client.store_get("task_123_accuracy")
completed_files = client.store_smembers("processed_files")

print(f"Downstream task received accuracy: {accuracy}")
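
Since TitanStore values are strings, structured data has to be serialized before it is stored. One possible approach, sketched here with the standard json module, wraps store_put and store_get. put_json and get_json are hypothetical helpers, and the handling of a missing key (None or an empty string) is an assumption, because this reference does not say what store_get returns in that case.

```python
import json


def put_json(client, key, value):
    """Serialize `value` to a JSON string and store it under `key`."""
    client.store_put(key, json.dumps(value))


def get_json(client, key, default=None):
    """Load a JSON value stored by put_json; return `default` if absent."""
    raw = client.store_get(key)
    if raw is None or raw == "":  # assumed representations of a missing key
        return default
    return json.loads(raw)
```

A producer could then call put_json(client, "task_123_metrics", {"accuracy": 98.5, "epochs": 10}) and the consumer would get a dict back from get_json instead of parsing strings by hand.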

4. File Artifacts

Use artifacts when a worker produces a file that the orchestrator (or a downstream worker) needs to read. TitanStore only holds strings — for binary files or large text outputs, use the artifact system.

Pattern: worker publishes → orchestrator downloads

# worker_script.py (runs on a worker node)
from titan_sdk import TitanClient

client = TitanClient()

# Write the output file to the local workspace
# (final_report and run_id are assumed to be set earlier in the script)
with open("report.md", "w") as f:
    f.write(final_report)

# Upload to master and register under a TitanStore key
client.publish_artifact(f"run:{run_id}:report", "report.md")

# orchestrator.py (runs on your machine)
from titan_sdk import TitanClient

client = TitanClient()

# Download the file by key. Without save_path it would be saved to /tmp/report.md
client.get_artifact(f"run:{run_id}:report", save_path=f"/tmp/report_{run_id}.md")
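
Both sides of this pattern must agree on the key, and get_artifact signals success through its return value rather than an exception. A small wrapper, sketched below, keeps the key format in one place and turns an unsuccessful download into an error. report_key and fetch_report are hypothetical helpers. What get_artifact returns on failure is not documented here, so the code treats any falsy value as a failure.

```python
import os


def report_key(run_id):
    """Build the TitanStore key used in the example above."""
    return f"run:{run_id}:report"


def fetch_report(client, run_id, directory="/tmp"):
    """Download the report for `run_id`; raise if get_artifact does not succeed."""
    save_path = os.path.join(directory, f"report_{run_id}.md")
    if not client.get_artifact(report_key(run_id), save_path=save_path):
        raise RuntimeError(f"no artifact downloaded for key {report_key(run_id)!r}")
    return save_path
```

The worker side can reuse the same function for the key: client.publish_artifact(report_key(run_id), "report.md").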

Remote workers must upload files explicitly

The titan_workspace/shared directory is local to the worker node. For local workers (same machine as Master), files appear in the Dashboard automatically. For remote workers (RunPod, GCP, SSH tunnel), files written to disk stay on the remote machine and never appear in Dashboard → Workspace Files unless they are explicitly uploaded.

Use upload_file when a human only needs to download the file from the Dashboard, or publish_artifact / get_artifact when the orchestrator needs to read the file programmatically:

  • upload_file("output.txt") — file appears in Dashboard Workspace Files, downloadable by a human
  • publish_artifact(key, "output.txt") — file uploaded and registered in TitanStore; retrieve with get_artifact(key) from the orchestrator
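
On a remote worker, a script that writes several output files can upload them in one pass before it exits. The sketch below loops over a directory and calls upload_file on each regular file, checking for the "UPLOAD_SUCCESS" return value documented above. upload_outputs is a hypothetical helper, and the directory layout is an assumption.

```python
from pathlib import Path


def upload_outputs(client, directory, pattern="*"):
    """Upload every regular file in `directory` matching `pattern`.

    Returns the list of paths whose upload did not return "UPLOAD_SUCCESS".
    """
    failed = []
    for path in sorted(Path(directory).glob(pattern)):
        if not path.is_file():
            continue  # skip subdirectories
        if client.upload_file(str(path)) != "UPLOAD_SUCCESS":
            failed.append(str(path))
    return failed
```

An empty return value means every file was accepted; anything else lists the paths worth retrying or logging.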