Distributed LLM Pipeline — Parallel Analyst
A parallel, fan-out/fan-in pipeline where multiple Gemini agents analyse competing frameworks simultaneously and a synthesis agent consolidates the results into a structured report.
This example demonstrates Titan as a distributed LLM pipeline runner — the same pattern used in data engineering (MapReduce, ETL) but with LLM calls as the compute step.
What It Does
Given a topic and a list of frameworks or products to compare, the pipeline:
- Spawns N analyst agents in parallel — one per framework — each calling Gemini independently
- Each analyst writes its result to the shared workspace
- A synthesis agent (dependent on all analysts completing) reads all results and calls Gemini to produce a structured comparison report
```mermaid
flowchart TD
O["🐍 Orchestrator\n(your machine)"]
M["Titan Master"]
O -->|"submit_dag()"| M
M --> A0["analyst-0\ngemini-2.5-flash\nFramework 0"]
M --> A1["analyst-1\ngemini-2.5-flash\nFramework 1"]
M --> A2["analyst-2\ngemini-2.5-flash\nFramework 2"]
A0 -->|"intel_result_0.txt"| WS[("titan_workspace\n/shared")]
A1 -->|"intel_result_1.txt"| WS
A2 -->|"intel_result_2.txt"| WS
A0 --> S["synthesizer\ngemini-2.5-flash\nFan-in report"]
A1 --> S
A2 --> S
WS -->|"reads all results"| S
S --> RPT["📄 comp_intel_report.md"]
style O fill:#1e293b,stroke:#1de9b6,stroke-width:2px,color:#ffffff
style M fill:#1e293b,stroke:#64748b,stroke-width:2px,color:#ffffff
style A0 fill:#1e293b,stroke:#818cf8,stroke-width:2px,color:#ffffff
style A1 fill:#1e293b,stroke:#818cf8,stroke-width:2px,color:#ffffff
style A2 fill:#1e293b,stroke:#818cf8,stroke-width:2px,color:#ffffff
style WS fill:#1e293b,stroke:#f9a826,stroke-width:2px,color:#ffffff
style S fill:#1e293b,stroke:#22c55e,stroke-width:2px,color:#ffffff
style RPT fill:#1e293b,stroke:#22c55e,stroke-width:2px,color:#ffffff
```
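The fan-out/fan-in flow above is the map/reduce shape on a single machine. The sketch below reproduces it with the Python standard library only, to show the ordering guarantees, not Titan's implementation. Assumptions: `call_llm` is a hypothetical stub standing in for the Gemini call, a temporary directory stands in for `titan_workspace/shared`, and a thread pool stands in for Titan workers.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def call_llm(prompt: str) -> str:
    """Hypothetical stand-in for a Gemini call; returns a canned string."""
    return f"ANALYSIS: {prompt}"


def analyst(index: int, topic: str, framework: str, workspace: Path) -> Path:
    """Map step: analyse one framework and write the result to the shared workspace."""
    text = call_llm(f"Analyse {framework} in the context of {topic}")
    out = workspace / f"intel_result_{index}.txt"
    out.write_text(text, encoding="utf-8")
    return out


def synthesizer(topic: str, workspace: Path, n: int) -> str:
    """Reduce step: read every analyst result (in index order) and build one report."""
    results = [
        (workspace / f"intel_result_{i}.txt").read_text(encoding="utf-8")
        for i in range(n)
    ]
    return call_llm(f"Compare for {topic}:\n" + "\n".join(results))


def run_pipeline(topic: str, frameworks: list[str]) -> str:
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        # Fan-out: one analyst per framework, all submitted at once.
        with ThreadPoolExecutor(max_workers=len(frameworks)) as pool:
            futures = [
                pool.submit(analyst, i, topic, fw, workspace)
                for i, fw in enumerate(frameworks)
            ]
            # Fan-in barrier: .result() blocks until each analyst has finished
            # and re-raises any exception an analyst hit.
            for f in futures:
                f.result()
        # Synthesis runs only after every result file exists.
        return synthesizer(topic, workspace, len(frameworks))


if __name__ == "__main__":
    print(run_pipeline("Agent Frameworks", ["LangGraph", "CrewAI", "AutoGen"]))
```

In Titan the barrier is expressed as DAG dependencies rather than a blocking loop, so the orchestrator does not need to stay in the wait.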
Titan Features Demonstrated
| Feature | How it appears |
|---|---|
| Parallel fan-out | All N analyst jobs dispatch simultaneously to available workers |
| Fan-in dependency | Synthesizer declares parents=[analyst-0, analyst-1, ..., analyst-(N-1)] and cannot start until every analyst completes |
| Dynamic DAG | Number of analyst jobs equals number of frameworks passed — DAG shape is decided at runtime |
| Live log streaming | Each analyst streams its Gemini response preview to the Dashboard log viewer in real time |
| Shared workspace | Analyst results written to titan_workspace/shared/ — accessible by all workers without TitanStore |
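The "Dynamic DAG" and "Fan-in dependency" rows describe two rules: the number of analyst jobs comes from the input, and a job becomes runnable only when all of its parents have completed. A minimal sketch of both, assuming a hypothetical `JobSpec` record that is not Titan's `TitanJob` API:

```python
from dataclasses import dataclass, field


@dataclass
class JobSpec:
    """Hypothetical, minimal job description (not Titan's TitanJob class)."""
    job_id: str
    parents: list[str] = field(default_factory=list)


def build_dag(frameworks: list[str]) -> list[JobSpec]:
    # Fan-out: one analyst per framework, so the DAG width is decided at runtime.
    analysts = [JobSpec(f"analyst-{i}") for i in range(len(frameworks))]
    # Fan-in: the synthesizer depends on analyst-0 .. analyst-(N-1).
    synth = JobSpec("synthesizer", parents=[a.job_id for a in analysts])
    return analysts + [synth]


def ready_jobs(dag: list[JobSpec], completed: set[str]) -> list[str]:
    """Jobs not yet completed whose parents have all completed."""
    return [
        job.job_id
        for job in dag
        if job.job_id not in completed and all(p in completed for p in job.parents)
    ]


dag = build_dag(["LangGraph", "CrewAI", "AutoGen"])
print(ready_jobs(dag, set()))                       # ['analyst-0', 'analyst-1', 'analyst-2']
print(ready_jobs(dag, {"analyst-0", "analyst-1"}))  # ['analyst-2'] (synthesizer still blocked)
print(ready_jobs(dag, {"analyst-0", "analyst-1", "analyst-2"}))  # ['synthesizer']
```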
Prerequisites
Run It
```bash
# Default: compares LangGraph, CrewAI, AutoGen
python titan_test_suite/examples/agents_examples/comp_intel/comp_intel_pipeline.py

# Custom topic + frameworks (wrap multi-word names in quotes)
python titan_test_suite/examples/agents_examples/comp_intel/comp_intel_pipeline.py \
  "Cloud AI Platforms" "AWS SageMaker" "Google Vertex AI" "Azure ML"

# Any domain
python titan_test_suite/examples/agents_examples/comp_intel/comp_intel_pipeline.py \
  "JavaScript Frameworks" "React" "Vue" "Svelte"
```
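The first argument is the topic and every later argument is one framework; quoting matters because the shell, not the script, decides where one argument ends. A hedged sketch of that mapping follows. `DEFAULT_TOPIC` is a hypothetical placeholder (the default topic string is not documented here), and rejecting a topic with no frameworks is an assumption, not confirmed behaviour of `comp_intel_pipeline.py`.

```python
import shlex

# Hypothetical defaults: the README lists the default frameworks, but the
# default topic string is not shown, so DEFAULT_TOPIC is a placeholder.
DEFAULT_TOPIC = "AI Agent Frameworks"
DEFAULT_FRAMEWORKS = ["LangGraph", "CrewAI", "AutoGen"]


def parse_args(argv: list[str]) -> tuple[str, list[str]]:
    """Split CLI args (script name excluded) into (topic, frameworks)."""
    if not argv:
        return DEFAULT_TOPIC, list(DEFAULT_FRAMEWORKS)
    topic, *frameworks = argv
    if not frameworks:
        # Assumption: a topic without frameworks is treated as an error.
        raise ValueError("pass at least one framework after the topic")
    return topic, frameworks


# A real script would call parse_args(sys.argv[1:]); here shlex.split
# mimics how a POSIX shell turns the quoted command line into argv.
argv = shlex.split('"Cloud AI Platforms" "AWS SageMaker" "Google Vertex AI" "Azure ML"')
topic, frameworks = parse_args(argv)
print(topic)       # Cloud AI Platforms
print(frameworks)  # ['AWS SageMaker', 'Google Vertex AI', 'Azure ML']
print([f"analyst-{i}" for i in range(len(frameworks))])  # ['analyst-0', 'analyst-1', 'analyst-2']
```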
Watch it run at http://localhost:5000 — analyst nodes turn orange as they execute in parallel, then green as they complete. The synthesizer node activates only after all analysts are green.
Output
The synthesis report is saved to:
Report structure (generated by Gemini):
- Executive Summary
- Head-to-Head Comparison table
- Key Differentiators (one paragraph per framework)
- Recommendation Matrix ("Choose X if...")
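One way to obtain a report with exactly these four sections is to list them explicitly in the synthesis prompt together with every analyst's output. The actual prompt used by comp_intel_synthesizer.py is not shown here, so `build_synthesis_prompt` below is a hypothetical illustration of the idea, not the worker's code.

```python
# Section titles taken from the report structure above.
SECTIONS = [
    "Executive Summary",
    "Head-to-Head Comparison table",
    "Key Differentiators (one paragraph per framework)",
    'Recommendation Matrix ("Choose X if...")',
]


def build_synthesis_prompt(topic: str, analyses: dict[str, str]) -> str:
    """Hypothetical prompt builder: ask for the four sections, then attach each analysis."""
    lines = [
        f"You are writing a competitive-intelligence report on {topic}.",
        "Use only the analyst notes below. Produce Markdown with these sections, in order:",
    ]
    lines += [f"{i}. {name}" for i, name in enumerate(SECTIONS, start=1)]
    for framework, notes in analyses.items():
        lines += ["", f"### Analyst notes: {framework}", notes.strip()]
    return "\n".join(lines)


prompt = build_synthesis_prompt(
    "JavaScript Frameworks",
    {"React": "(analyst output for React)", "Vue": "(analyst output for Vue)"},
)
print(prompt)
```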
Files
| File | Role |
|---|---|
| titan_test_suite/examples/agents_examples/comp_intel/comp_intel_pipeline.py | Orchestrator: builds the DAG and submits it |
| perm_files/comp_intel_analyst.py | Worker: analyses one framework with Gemini |
| perm_files/comp_intel_synthesizer.py | Worker: reads all results and synthesizes the final report |
Extend It
Add more frameworks by passing more arguments:

```bash
python comp_intel_pipeline.py "Database Engines" \
  "PostgreSQL" "MongoDB" "Cassandra" "CockroachDB" "DynamoDB"
```
Up to 6 analysts run in parallel. Each additional framework adds one parallel job with zero code changes.
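If the 6-analyst limit behaves like a fixed pool of worker slots (an assumption; this README does not say where the limit comes from), extra frameworks queue for a free slot instead of failing. The sketch below measures peak concurrency on a thread pool capped at a hypothetical `MAX_PARALLEL = 6`, with `time.sleep` standing in for Gemini latency.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL = 6  # assumed slot count, mirroring "up to 6 analysts"


def run_capped(n_jobs: int, max_parallel: int = MAX_PARALLEL) -> int:
    """Run n_jobs dummy analysts on a capped pool; return the peak concurrency seen."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def fake_analyst(_: int) -> None:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for the Gemini call latency
        with lock:
            active -= 1

    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        list(pool.map(fake_analyst, range(n_jobs)))  # wait for all jobs
    return peak


print(run_capped(5))  # 5 frameworks: all can run in one wave
print(run_capped(9))  # 9 frameworks: never more than 6 at once
```

Exact peak values depend on thread scheduling; the guarantee is only that the peak never exceeds the cap.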
Add a human-in-the-loop (HITL) gate before synthesis to review the raw analyses before committing to the Gemini synthesis call:

```python
# HITL gate: runs after every analyst and holds the DAG for human review.
gate_job = TitanJob(
    job_id = "review-gate",
    filename = _HITL_GATE_SCRIPT,
    args = "review-gate 3600 Analyses ready. Approve synthesis?",
    parents = [f"analyst-{i}" for i in range(len(frameworks))],  # fan-in on all analysts
)

synthesizer_job = TitanJob(
    ...
    parents = ["review-gate"],  # depends on the gate, not on the analysts directly
)
```
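Inserting the gate is a small graph rewrite: the gate inherits the synthesizer's parents, and the synthesizer's only parent becomes the gate. A self-contained sketch of that rewiring, using a hypothetical `JobSpec` record and `insert_gate` helper (neither is part of Titan's API):

```python
from dataclasses import dataclass, field


@dataclass
class JobSpec:
    """Hypothetical minimal job record; not Titan's TitanJob class."""
    job_id: str
    parents: list[str] = field(default_factory=list)


def insert_gate(dag: list[JobSpec], gate_id: str, before: str) -> list[JobSpec]:
    """Return a new DAG where gate_id sits between `before` and its parents."""
    target = next(job for job in dag if job.job_id == before)  # StopIteration if absent
    gate = JobSpec(gate_id, parents=list(target.parents))  # gate inherits the fan-in
    rewired = [
        JobSpec(job.job_id, parents=[gate_id]) if job.job_id == before else job
        for job in dag
    ]
    # Place the gate just before the job it guards.
    idx = next(i for i, job in enumerate(rewired) if job.job_id == before)
    return rewired[:idx] + [gate] + rewired[idx:]


analysts = [JobSpec(f"analyst-{i}") for i in range(3)]
dag = analysts + [JobSpec("synthesizer", parents=[a.job_id for a in analysts])]
gated = insert_gate(dag, "review-gate", before="synthesizer")
for job in gated:
    print(job.job_id, job.parents)
# analyst-0 []
# analyst-1 []
# analyst-2 []
# review-gate ['analyst-0', 'analyst-1', 'analyst-2']
# synthesizer ['review-gate']
```

Building a new list rather than mutating `dag` keeps the ungated DAG available, for example to submit without review.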
Distributed LLM Pipeline vs Agentic Pipeline
This example is a distributed LLM pipeline — the DAG is fixed at submission time and each worker executes a predetermined prompt. No agent makes a routing decision based on what it observes.
For an example where agents dynamically decide what happens next, see the Research Agent.