Dynamic core.async/flow graphs for agent fan-out

2026-04-10 Fri 16:47 article clojure llm publish

core.async/flow runs a directed graph of processes connected by channels. The graph is described by a plain map, so it can be generated at runtime. I experimented with giving an LLM a tool to do just that.

The Plan

The LLM produces a TaskPlan via structured output: a vector of self-contained prompts and an instruction for synthesizing their results.

(def TaskPlanSchema
  [:map
   [:tasks
    [:vector {:min 2 :max 6}
     [:map [:prompt :string]]]]
   [:aggregation-prompt :string]])

m/explain returns nil for a valid plan. The tool checks it before any process starts and throws if the plan is invalid.
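As a rough illustration of that check, the sketch below validates two hand-written plans. It assumes malli is on the classpath and relies on malli's `:min`/`:max` vector properties for the size bounds. `plan-schema`, `good-plan`, and `bad-plan` are illustrative names, not part of the original scripts.

```clojure
(require '[malli.core :as m]
         '[malli.error :as me])

;; Same shape as TaskPlanSchema, repeated so the snippet runs on its own.
(def plan-schema
  [:map
   [:tasks
    [:vector {:min 2 :max 6}
     [:map [:prompt :string]]]]
   [:aggregation-prompt :string]])

(def good-plan
  {:tasks [{:prompt "Describe Osaka's food culture."}
           {:prompt "Describe Lyon's food culture."}]
   :aggregation-prompt "Compare the two cities."})

;; Only one task and no aggregation prompt: two separate violations.
(def bad-plan
  {:tasks [{:prompt "Describe Osaka's food culture."}]})

(m/explain plan-schema good-plan)              ; nil, meaning no errors
(some? (m/explain plan-schema bad-plan))       ; true, an explanation map exists
(me/humanize (m/explain plan-schema bad-plan)) ; readable errors, e.g. to send back to the model
```

Returning the humanized errors to the model, rather than only throwing, would give it a chance to repair the plan on the next step.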

Two Process Templates

llm-worker-fn receives a message map with a :prompt key, runs a tool-using agent on it, and emits the answer text on :out:

(defn llm-worker-fn
  ;; declares the input and output port names
  ([] {:ins {:in "map with a :prompt string"} :outs {:out "answer string"}})
  ;; returns initial state — stateless, so empty map
  ([_] {})
  ;; called on lifecycle transitions (pause/resume/stop) — passthrough
  ([state _] state)
  ;; called on each incoming message — runs the agent, emits the answer
  ([state _port {:keys [id prompt]}]
   (let [answer (:text (llm/run-agent ai
                                      {:tools worker-tools
                                       :system-prompt "Use kagi_search to find current information. Answer in 4-6 sentences."
                                       :max-steps 3}
                                      prompt))]
     [state {:out [answer]}])))

make-aggregator-fn takes n and aggregation-prompt. Each incoming message is added to state. When the nth arrives, it calls generate with all results and emits the synthesis:

(defn make-aggregator-fn [n aggregation-prompt]
  (fn
    ([] {:ins {:in "worker result"} :outs {:out "final answer"}})
    ([_] {:results []})
    ([state _] state)
    ([{:keys [results] :as state} _port msg]
     (let [results' (conj results msg)]
       (if (= (count results') n)
         (let [numbered (str/join "\n\n---\n\n"
                          (map-indexed #(str "Result " (inc %1) ":\n" %2) results'))
               answer   (:text (llm/generate ai (str aggregation-prompt "\n\n" numbered)))]
           [(assoc state :results results') {:out [answer]}])
         [(assoc state :results results') {}])))))

Flow threads each process's state from one transform call to the next, so workers can complete in any order and the aggregator fires once the count reaches n.
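To see the counting behaviour without starting a flow, the transform arity can be called by hand. The sketch below is a simplified stand-in: `make-counting-aggregator`, `simulate`, and the `synthesize` argument are hypothetical names, and `synthesize` replaces the LLM call so the example runs offline.

```clojure
(require '[clojure.string :as str])

;; Simplified aggregator: same four arities, but the synthesis step is a
;; plain function passed in by the caller (an assumption for this sketch).
(defn make-counting-aggregator [n synthesize]
  (fn
    ([] {:ins {:in "worker result"} :outs {:out "final answer"}})
    ([_] {:results []})
    ([state _] state)
    ([{:keys [results] :as state} _port msg]
     (let [results' (conj results msg)]
       (if (= (count results') n)
         [(assoc state :results results') {:out [(synthesize results')]}]
         [(assoc state :results results') {}])))))

;; Feed messages one at a time, threading the state from call to call,
;; and collect the output map produced by each call.
(defn simulate [step-fn msgs]
  (loop [state (step-fn {})
         msgs  msgs
         outs  []]
    (if-let [[msg & more] (seq msgs)]
      (let [[state' out] (step-fn state :in msg)]
        (recur state' more (conj outs out)))
      outs)))

(def agg (make-counting-aggregator 3 #(str/join " | " %)))

(simulate agg ["c" "a" "b"])
;; => [{} {} {:out ["c | a | b"]}]
```

Results reach the synthesis step in arrival order, so "Result 1" is whichever worker finished first, not necessarily task-1.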

Building the Graph

tasks->flow-config takes a validated plan and an output channel and returns a create-flow config. Worker ids are generated with map-indexed:

(defn tasks->flow-config [{:keys [tasks aggregation-prompt]} out-chan]
  (let [n          (count tasks)
        indexed    (map-indexed (fn [i t] [(keyword (str "task-" (inc i))) t]) tasks)
        worker-ids (mapv first indexed)]
    {:procs (merge
              (into {} (map (fn [[id {:keys [prompt]}]]
                              [id {:proc (flow/process llm-worker-fn {:workload :io})
                                   :args {:prompt prompt}}])
                            indexed))
              {:aggregate {:proc (flow/process (make-aggregator-fn n aggregation-prompt) {:workload :io})}
               :sink      {:proc (flow/process sink-fn {:workload :io}) :args {:out-chan out-chan}}})
     :conns (into (mapv #(vector [% :out] [:aggregate :in]) worker-ids)
                  [[[:aggregate :out] [:sink :in]]])}))

For 5 tasks:

[task-1] ──┐
[task-2] ──┤
[task-3] ──┼──► [aggregate] ──► [sink]
[task-4] ──┤
[task-5] ──┘

For a question that decomposes into 4 tasks, tasks->flow-config returns:

{:procs
 {:task-1    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect A..."}}
  :task-2    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect B..."}}
  :task-3    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect C..."}}
  :task-4    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect D..."}}
  :aggregate {:proc #<process make-aggregator-fn/4>}
  :sink      {:proc #<process sink-fn>
              :args {:out-chan #<ManyToManyChannel>}}}
 :conns
 [[[:task-1    :out] [:aggregate :in]]
  [[:task-2    :out] [:aggregate :in]]
  [[:task-3    :out] [:aggregate :in]]
  [[:task-4    :out] [:aggregate :in]]
  [[:aggregate :out] [:sink     :in]]]}
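The config refers to a sink-fn that is not shown in this article. A minimal sketch of what such a process could look like, assuming its only job is to forward the final answer to the channel passed in :args. This is a guess at the shape, not the repository's code.

```clojure
(require '[clojure.core.async :as a])

(defn sink-fn
  ;; describe: one input port, no outputs; :out-chan arrives via :args
  ([] {:params {:out-chan "channel that receives the final answer"}
       :ins    {:in "final answer"}})
  ;; init: keep the channel in state
  ([{:keys [out-chan]}] {:out-chan out-chan})
  ;; transition: nothing to do on pause/resume/stop
  ([state _] state)
  ;; transform: hand the message to the caller, emit nothing downstream
  ([{:keys [out-chan] :as state} _port msg]
   (a/>!! out-chan msg)
   [state {}]))
```

The blocking put is acceptable here because the process is created with an :io workload, and the caller's channel has a buffer of 1.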

The Tool

(defn fan-out-research
  {:malli/schema
   [:=> [:cat
         [:map {:name        "fan_out_research"
                :description "Decompose a question into parallel subtasks and synthesize results."}
          [:tasks
           {:description "2-6 independent subtasks. Each prompt must be self-contained."}
           [:vector {:min 2 :max 6}
            [:map [:prompt :string]]]]
          [:aggregation-prompt
           {:description "instruction for the synthesis step"}
           :string]]]
        :string]}
  [{:keys [tasks] :as plan}]
  (when-let [err (m/explain TaskPlanSchema plan)]
    (throw (ex-info "Invalid task plan" {:explain err})))
  (let [out-chan (a/chan 1)
        g        (flow/create-flow (tasks->flow-config plan out-chan))
        {:keys [error-chan]} (flow/start g)]
    (a/go-loop []
      (when-let [err (a/<! error-chan)]
        (println "FLOW ERROR:" (pr-str err))
        (recur)))
    (flow/resume g)
    (doseq [[id {:keys [prompt]}] (map-indexed (fn [i t] [(keyword (str "task-" (inc i))) t]) tasks)]
      (flow/inject g [id :in] [{:prompt prompt}]))
    (let [result (a/<!! out-chan)]
      (flow/stop g)
      result)))

In Action

(llm/run-agent ai
  {:tools        [#'fan-out-research]
   :system-prompt "Use fan_out_research to decompose complex questions into parallel subtasks."
   :max-steps    3}
  "Compare the food cultures of Osaka, Lyon, and Mexico City.")

The model decomposes the question into 5 tasks: one per city, plus cross-cutting tasks for street food comparison and historical influences.

[fan-out] spawning 5 parallel workers
  task-1: What are the key characteristics of Osaka's food culture?...
  task-2: What are the key characteristics of Lyon's food culture?...
  task-3: What are the key characteristics of Mexico City's food culture?...
  task-4: Compare and contrast street food, fine dining, and regional specialties...
  task-5: What are some historical influences on the food cultures of all three cities?...

  [worker task-1] running...
  [worker task-3] running...
  [worker task-4] running...
  [worker task-5] running...
  [worker task-2] running...
  [worker task-2] done
  [worker task-1] done
  [worker task-5] done
  [worker task-3] done
  [worker task-4] done
  [aggregate] all 5 results in — synthesizing...

=== Final Answer ===

The food cultures of Osaka, Lyon, and Mexico City each possess distinct
characteristics shaped by history and regional influences.

Osaka, known for its "kuidaore" ("eat 'til you drop") philosophy, emphasizes
affordability and variety, with signature dishes like takoyaki and okonomiyaki.
Its merchant city history fostered a focus on fresh ingredients and
presentation. Street food and regional variations thrive, while fine dining is
less prominent.

Lyon is characterized by its rich, traditional, hearty cuisine. Its historical
role as a silk trading hub influenced its access to diverse ingredients and
skilled chefs. Bouchons serve dishes like quenelles and coq au vin, utilizing
offal, butter, and regional produce. Fine dining and regional cuisine are
dominant, though street food is emerging.

Mexico City's food culture blends Indigenous, Spanish, and global influences,
reflected in dishes like tacos, enchiladas, and mole. Its history is marked by
Aztec origins, Spanish colonization, and indigenous agricultural practices. The
city boasts a dynamic street food scene, diverse regional influences, and a
growing fine dining sector that combines tradition and modern techniques.

The outer agent loop issues one tool call and receives one string. The flow graph, the parallel workers, and the aggregation are internal to the tool.

The pattern generalizes to any task that decomposes into independent subtasks: code review across multiple files, analysis across multiple datasets, research across multiple sources. The schema, the two process templates, and the graph construction stay the same. Only the prompts change.

Each worker is itself a clj-llm/run-agent call. Passing a fan-out tool to the worker agents allows subgraphs to spawn subgraphs. A depth counter in the task args, decremented at each level and refusing to fan out at zero, bounds the recursion.
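The depth bound can be illustrated without any LLM. In the sketch below, `run-task` and `decompose` are hypothetical stand-ins for a worker agent and its decision to fan out. A real worker would receive the fan-out tool only while its depth budget is positive.

```clojure
;; `decompose` returns subtask prompts for a prompt. It is only consulted
;; while the depth budget is positive; at zero the task is answered directly.
(defn run-task [decompose depth prompt]
  (let [subtasks (when (pos? depth) (decompose prompt))]
    (if (seq subtasks)
      ;; fan out: each child gets a budget one lower than its parent
      {:prompt   prompt
       :children (mapv #(run-task decompose (dec depth) %) subtasks)}
      ;; budget exhausted or nothing to split
      {:prompt prompt :children []})))

;; A decomposer that always splits in two would recurse forever
;; without the depth check.
(defn always-split [prompt]
  [(str prompt "/a") (str prompt "/b")])

(defn tree-depth [{:keys [children]}]
  (if (empty? children)
    0
    (inc (apply max (map tree-depth children)))))

(defn leaf-count [{:keys [children]}]
  (if (empty? children)
    1
    (reduce + (map leaf-count children))))

(def tree (run-task always-split 2 "q"))

(tree-depth tree) ; => 2
(leaf-count tree) ; => 4
```

With a branching limit of 6 per plan, a depth budget of d caps the number of leaf workers at 6^d, which is worth keeping in mind for cost.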

Beyond fan-out: arbitrary DAGs

The flat fan-out pattern has a built-in constraint: all tasks are independent and all feed the same aggregator. But create-flow can represent any DAG. The next step is letting the LLM generate one.

The key design decision is separating nodes from edges. Rather than embedding dependency lists inside each node, the plan is two flat vectors:

{:nodes [{:id "rent-shimo"    :type "search" :query "Shimokitazawa rent prices Tokyo"}
         {:id "food-shimo"    :type "search" :query "Shimokitazawa food bar scene"}
         {:id "commute-shimo" :type "search" :query "Shimokitazawa train commute options"}
         {:id "summary-shimo" :type "llm"    :prompt "Summarise Shimokitazawa for a prospective resident"}
         ;; ... same for Nakameguro and Koenji ...
         {:id "compare"       :type "llm"    :prompt "Side-by-side comparison to help decide where to live"}]
 :edges [{:from "rent-shimo"    :to "summary-shimo"}
         {:from "food-shimo"    :to "summary-shimo"}
         {:from "commute-shimo" :to "summary-shimo"}
         ;; ...
         {:from "summary-shimo"  :to "compare"}
         {:from "summary-nakame" :to "compare"}
         {:from "summary-koenji" :to "compare"}]}

Each node variant only carries the fields that apply to it. The schema reflects that:

[:map
 [:nodes [:vector
          [:or
           [:map [:id :string] [:type [:= "search"]] [:query  :string]]
           [:map [:id :string] [:type [:= "llm"]]    [:prompt :string]]]]]
 [:edges [:vector [:map [:from :string] [:to :string]]]]]
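The schema only checks shape. A generated plan can still name a node that does not exist or contain a cycle, and then the flow would either fail to build or leave a process waiting for an input that never arrives. One way to catch both before building the flow is a topological pass in the style of Kahn's algorithm. `plan-problems` is a hypothetical helper, not something from the scripts.

```clojure
(defn plan-problems
  "Returns a vector of problems, empty when every edge mentions known
  node ids and the graph has no cycle."
  [{:keys [nodes edges]}]
  (let [ids     (set (map :id nodes))
        unknown (vec (remove ids (distinct (mapcat (juxt :from :to) edges))))]
    (if (seq unknown)
      [{:problem :unknown-node :ids unknown}]
      ;; Repeatedly retire nodes whose inputs have all been retired.
      (loop [indegree (merge (zipmap ids (repeat 0))
                             (frequencies (map :to edges)))
             visited  #{}]
        (let [ready (set (filter #(and (zero? (indegree %))
                                       (not (visited %)))
                                 ids))]
          (if (seq ready)
            (recur (reduce (fn [deg {:keys [from to]}]
                             (if (ready from) (update deg to dec) deg))
                           indegree
                           edges)
                   (into visited ready))
            ;; Nothing left to retire: any unvisited node sits on a cycle
            ;; or depends on one.
            (let [stuck (remove visited ids)]
              (if (seq stuck)
                [{:problem :cycle :ids (vec (sort stuck))}]
                []))))))))

(plan-problems {:nodes [{:id "a"} {:id "b"} {:id "c"}]
                :edges [{:from "a" :to "c"} {:from "b" :to "c"}]})
;; => []

(plan-problems {:nodes [{:id "a"} {:id "b"}]
                :edges [{:from "a" :to "b"} {:from "b" :to "a"}]})
;; => [{:problem :cycle, :ids ["a" "b"]}]
```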

Each edge {:from A :to B} becomes two things in the flow config: a connection [[A :out] [B A]], where B's input port is named after A, and an expected input port on B's process. The llm process constructor receives the set of incoming port names derived from the edges. It accumulates one message per port in state and fires generation when the set is complete:

(defn make-llm-proc [in-ports prompt]
  (fn
    ;; describe: one input port per upstream node, a single output
    ([] {:ins  (zipmap in-ports (repeat "upstream result"))
         :outs {:out "generated answer"}})
    ;; init
    ([_] {:collected {}})
    ;; transition
    ([state _] state)
    ;; transform: accumulate inputs, fire when all ports have arrived
    ([{:keys [collected] :as state} port msg]
     (let [collected' (assoc collected port msg)]
       (if (= (set (keys collected')) in-ports)
         (let [context (str/join "\n\n---\n\n"
                         (map #(str (name %) ":\n" (get collected' %)) (sort in-ports)))
               answer  (:text (llm/generate ai (str prompt "\n\n" context)))]
           [(assoc state :collected {}) {:out [answer]}])
         [(assoc state :collected collected') {}])))))

The port name carries the provenance. When summary-shimo fires, its context is labelled rent-shimo:, food-shimo:, commute-shimo:, so the model can tell which result came from where.
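The labelling step on its own looks roughly like this. `labelled-context` is a hypothetical extraction of the string-building part of the transform, applied to a hand-built map of port to message:

```clojure
(require '[clojure.string :as str])

;; Builds the labelled context from the messages collected so far,
;; one section per port, in sorted port order.
(defn labelled-context [collected]
  (str/join "\n\n---\n\n"
            (map #(str (name %) ":\n" (get collected %))
                 (sort (keys collected)))))

(def collected
  (-> {}
      (assoc :rent-shimo "rent v1")
      (assoc :food-shimo "food")
      (assoc :rent-shimo "rent v2")))   ; same port again: overwrites

(println (labelled-context collected))
;; food-shimo:
;; food
;;
;; ---
;;
;; rent-shimo:
;; rent v2
```

Keying by port rather than counting messages means a duplicate on one port replaces the earlier value instead of triggering generation early.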

For the Shimokitazawa / Nakameguro / Koenji question, the model generates 13 nodes and 12 edges:

[llm] compare
├── [llm] summary-shimo
│   ├── [search] rent-shimo
│   ├── [search] food-shimo
│   └── [search] commute-shimo
├── [llm] summary-nakame
│   ├── [search] rent-nakame
│   ├── [search] food-nakame
│   └── [search] commute-nakame
└── [llm] summary-koenji
    ├── [search] rent-koenji
    ├── [search] food-koenji
    └── [search] commute-koenji

All 9 searches fire simultaneously. Each neighbourhood summary fires as soon as its 3 searches complete. The final comparison fires when all 3 summaries arrive:

[rent-shimo] searching...      [rent-nakame] searching...      [rent-koenji] searching...
[food-shimo] searching...      [food-nakame] searching...      [food-koenji] searching...
[commute-shimo] searching...   [commute-nakame] searching...   [commute-koenji] searching...

[commute-nakame] done
[food-nakame] done
[rent-nakame] done
[summary-nakame] all inputs arrived — generating...
[rent-shimo] done
[food-shimo] done
[commute-shimo] done
[summary-shimo] all inputs arrived — generating...
[food-koenji] done
[rent-koenji] done
[commute-koenji] done
[summary-koenji] all inputs arrived — generating...
[summary-nakame] done
[summary-shimo] done
[summary-koenji] done
[compare] all inputs arrived — generating...
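The layering visible in the log can be derived from the plan itself. The sketch below assigns each node a level (0 for nodes without inputs, otherwise one more than its deepest input) and groups ids by level. `node-level` and `waves` are hypothetical helpers, and the plan is assumed to be acyclic.

```clojure
(defn node-level [inputs id]
  (if-let [ins (seq (inputs id))]
    (inc (apply max (map #(node-level inputs %) ins)))
    0))

(defn waves [{:keys [nodes edges]}]
  (let [;; node id -> vector of the ids it depends on
        inputs (reduce (fn [m {:keys [from to]}]
                         (update m to (fnil conj []) from))
                       {}
                       edges)]
    (->> (map :id nodes)
         (group-by #(node-level inputs %))
         (into (sorted-map)))))

(def plan
  {:nodes [{:id "rent-shimo"} {:id "food-shimo"}
           {:id "summary-shimo"} {:id "compare"}]
   :edges [{:from "rent-shimo"    :to "summary-shimo"}
           {:from "food-shimo"    :to "summary-shimo"}
           {:from "summary-shimo" :to "compare"}]})

(waves plan)
;; => {0 ["rent-shimo" "food-shimo"], 1 ["summary-shimo"], 2 ["compare"]}
```

The levels are only a planning aid. The running flow is data-driven, so a summary starts as soon as its own inputs arrive, regardless of how far other branches have progressed.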

The LLM's job is graph architecture: which nodes exist, which depend on which. The process templates and the wiring logic are fixed code.
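A minimal sketch of that wiring logic, assuming node ids are strings in the plan and keywords in the flow config. `edges->conns` and `edges->in-ports` are hypothetical names for the two derivations: the connection list, and the set of expected input ports per node.

```clojure
;; Each edge becomes a connection from the source's :out port to an input
;; port on the target named after the source.
(defn edges->conns [edges]
  (mapv (fn [{:keys [from to]}]
          [[(keyword from) :out] [(keyword to) (keyword from)]])
        edges))

;; For each target node, the set of ports it must hear from before firing.
(defn edges->in-ports [edges]
  (reduce (fn [acc {:keys [from to]}]
            (update acc (keyword to) (fnil conj #{}) (keyword from)))
          {}
          edges))

(def edges [{:from "rent-shimo"    :to "summary-shimo"}
            {:from "food-shimo"    :to "summary-shimo"}
            {:from "summary-shimo" :to "compare"}])

(edges->conns edges)
;; => [[[:rent-shimo :out]    [:summary-shimo :rent-shimo]]
;;     [[:food-shimo :out]    [:summary-shimo :food-shimo]]
;;     [[:summary-shimo :out] [:compare :summary-shimo]]]

(edges->in-ports edges)
;; => {:summary-shimo #{:rent-shimo :food-shimo}
;;     :compare       #{:summary-shimo}}
```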

Try it Yourself

Both scripts are in the clj-llm repository. Workers use Kagi FastGPT for grounded results when KAGI_API_KEY is set, and fall back to the model's own knowledge without it. LLM_MODEL overrides the default model.

git clone https://github.com/minikomi/clj-llm
cd clj-llm

Flat fan-out

Pipe any question in and the LLM decomposes it into independent parallel tasks:

echo "Compare the food cultures of Osaka, Lyon, and Mexico City" | clojure -M:flow scripts/fan-out.clj

DAG research

Pipe a question that needs layered research — the LLM designs the graph, nodes run in parallel where possible:

echo "Shimokitazawa vs Nakameguro vs Koenji: search rent, food scene, and commute separately for each, summarise each, then compare." | clojure -M:flow scripts/dag-research.clj

Output for that question — 13 nodes, 12 edges, 34 seconds wall time:

[dag] 13 nodes — 9 search, 4 llm
      12 edges

[llm] compare
├── [llm] summary_shimo
│   ├── [search] rent_shimo
│   ├── [search] food_shimo
│   └── [search] commute_shimo
├── [llm] summary_naka
│   ├── [search] rent_naka
│   ├── [search] food_naka
│   └── [search] commute_naka
└── [llm] summary_koen
    ├── [search] rent_koen
    ├── [search] food_koen
    └── [search] commute_koen

  [rent_shimo] searching...  [rent_naka] searching...  [rent_koen] searching...
  [food_shimo] searching...  [food_naka] searching...  [food_koen] searching...
  [commute_shimo] searching...  [commute_naka] searching...  [commute_koen] searching...
  [food_shimo] done
  [rent_shimo] done
  [rent_koen] done
  [commute_koen] done
  [summary_koen] all inputs arrived — generating...
  [rent_naka] done
  [food_koen] done
  [food_naka] done
  [commute_naka] done
  [summary_naka] all inputs arrived — generating...
  [commute_shimo] done
  [summary_shimo] all inputs arrived — generating...
  [summary_koen] done
  [summary_naka] done
  [summary_shimo] done
  [compare] all inputs arrived — generating...
  [compare] done

Progress and the tree diagram go to stderr, the final answer to stdout, so results can be redirected:

echo "..." | clojure -M:flow scripts/dag-research.clj > answer.md
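The stream split is simple to reproduce in a script of your own. A minimal sketch, with `log-progress` and `emit-answer` as hypothetical helper names:

```clojure
;; Progress lines go to stderr so they never end up in redirected output.
(defn log-progress [& parts]
  (binding [*out* *err*]
    (apply println parts)))

;; The final answer is the only thing written to stdout.
(defn emit-answer [answer]
  (println answer))

(log-progress "[compare] all inputs arrived")
(emit-answer "Final comparison text")
```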