Gather nodes

A gather node runs a tool once per item of an upstream collection and gathers the per-run outputs into parallel lists. Use it when an upstream produces many items but the tool’s input is cardinality = "one", i.e. the tool can only consume one item per invocation.

rf   = g.add_node("rfdiffusion")                  # output `designs`: a dir of N backbones
mpnn = g.add_gather_node("proteinmpnn", split_key="designs")
g.add_edge((rf, mpnn))

split_key names the upstream output key whose value is the collection to fan out over.

Finding split_key

split_key is the name of the upstream tool’s output that holds the collection, read it from the producing tool’s manifest, not the consuming tool’s inputs:

bv show rfdiffusion --format json

RFdiffusion declares one output, designs (a directory of backbones), so split_key="designs". The rule of thumb: pick the upstream output that is a directory or list of the items you want to process one at a time. You also need a gather node (rather than a plain tool node) precisely when that upstream output is a collection but the downstream tool’s matching input is cardinality = "one".

        flowchart LR
    RF["rfdiffusion<br/>runs once → N backbones"] --> SC{{"scatter<br/>split into N inputs"}}
    K(["knobs<br/>num_seq_per_target=2"]) -. into every run .-> RUNS
    SC --> RUNS["proteinmpnn ×N<br/>one backbone each"]
    RUNS --> GA{{"gather<br/>N outputs → lists"}}
    GA --> OUT(["sequences: [s1 … sN]"])
    

Execution

For rfdiffusion gather(proteinmpnn) with N backbones, the executor:

  1. runs the single upstream once, producing e.g. {"designs": "/dir"};

  2. calls the edge’s scatter connector once on that output; it returns a list of N per-item input dicts, e.g. [{"pdb_path": f0}, {"pdb_path": f1}, …];

  3. runs the tool once per dict via run_one, each invocation gets its own numbered output directory, and the tool itself is unaware of the fan-out;

  4. gathers the N result dicts into a dict of lists, {"sequences": [s0, s1, …]}.

Shared parameters (a separate input node) are applied verbatim into every run, not scattered:

mpnn_in = g.add_input_node(num_seq_per_target="2", sampling_temp="0.1")
g.add_edge((rfdiffusion, proteinmpnn), (mpnn_in, proteinmpnn))

A gather node requires exactly one non-input upstream (the collection source); additional input nodes for scalars are allowed.

How the split happens

The split is done by the connector, not by hand-written code. Because the downstream is a gather node, the connector is generated with a scatter instruction and returns list[dict] instead of a single dict. It infers the items from the real runtime data: a directory’s files (grouped by shared prefix, excluding trajectory/temp artifacts), a list’s elements, or a multi-record file’s records. There is no fixed splitter table, a new collection shape works because the connector writes the appropriate crawling code (Connectors).

The connector is generated once per edge and reused for all N items; an N-way fan-out does not generate N connectors.

Chaining gather nodes

gather emits lists, so a gather node can feed another gather node:

proteinmpnn = g.add_gather_node("proteinmpnn", split_key="designs")
colabfold   = g.add_gather_node("colabfold",   split_key="sequences")
g.add_edge((rfdiffusion, proteinmpnn))
g.add_edge((proteinmpnn, colabfold))

The second scatter receives an already-gathered list of directories and flattens it into per-item inputs. This two-level pattern (backbones → one sequence design each → one structure prediction each) is the basis of the self-consistency example (Example pipelines).

Reference

class biocomposer.GatherNode(tool_name: str, split_key: str, inNodes: list = None, args_override: str = None, entrypoint_override: str = None)

Bases: Node

Run a cardinality-one tool once per item of an upstream collection, gathering outputs into a dict-of-lists. split_key names the upstream output key holding the collection. Partitioning is done by the edge connector (see _execute_gather_node).

static gather(results: list) dict

list-of-dicts -> dict-of-lists, so downstream sees parallel lists.

run_one(mapped_inputs: dict) dict

One scattered item; Node.run gives each its own numbered output_dir.