Pipeline Module

The pipeline module provides a high-level API for running distance computation workflows. It supports both programmatic and configuration-based approaches.

Overview

The pipeline system offers three levels of usage:

  1. quick_distance - One-liner for simple computations
  2. run_pipeline - Configuration file-based execution
  3. Pipeline - Full programmatic control

Quick Distance

The simplest way to compute distances:

from themap import quick_distance

results = quick_distance(
    data_dir="datasets",
    output_dir="output",
    molecule_featurizer="ecfp",
    molecule_method="euclidean",
)

Function Reference

themap.pipeline.orchestrator.quick_distance

quick_distance(
    data_dir: str,
    output_dir: str = "output",
    molecule_featurizer: str = "ecfp",
    molecule_method: str = "euclidean",
    n_jobs: int = 8,
) -> Dict[str, DistanceMatrix]

Quick distance computation with minimal configuration.

Parameters:

Parameter Type Default Description
data_dir str required Path to data directory with train/test folders
output_dir str "output" Path to output directory for results
molecule_featurizer str "ecfp" Molecular fingerprint type
molecule_method str "euclidean" Distance metric
n_jobs int 8 Number of parallel workers

Returns:

Dict[str, DistanceMatrix]: dictionary of distance matrices, keyed by distance type:

{
    "molecule": {
        "target_task_1": {
            "source_task_1": 0.75,
            "source_task_2": 1.23,
        },
        "target_task_2": {...}
    }
}
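
The nested mapping returned above can be flattened into (distance type, target, source, value) rows for downstream analysis. A minimal stdlib sketch; the sample dict below mirrors the structure shown above and is not real pipeline output:

```python
# Flatten {distance_type: {target: {source: value}}} into tuples.
results = {
    "molecule": {
        "target_task_1": {"source_task_1": 0.75, "source_task_2": 1.23},
    }
}

rows = [
    (dist_type, target, source, value)
    for dist_type, targets in results.items()
    for target, sources in targets.items()
    for source, value in sources.items()
]

for row in rows:
    print(row)
```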

Run Pipeline

Execute a pipeline from a YAML configuration file:

from themap import run_pipeline

results = run_pipeline("config.yaml")

Function Reference

themap.pipeline.orchestrator.run_pipeline

run_pipeline(config_path: str) -> Dict[str, DistanceMatrix]

Convenience function to run pipeline from config file.

Parameters:

Parameter Type Default Description
config_path str required Path to YAML config file

Returns:

Dict[str, DistanceMatrix]: dictionary of distance matrices.

Configuration File Format

# config.yaml
data:
  directory: "datasets"
  task_list: null  # Auto-discover tasks

molecule:
  enabled: true
  featurizer: "ecfp"
  method: "euclidean"

protein:
  enabled: false

output:
  directory: "output"
  format: "csv"
  save_features: true

compute:
  n_jobs: 8
  device: "auto"

Pipeline Class

For full programmatic control:

Pipeline

themap.pipeline.orchestrator.Pipeline

Main pipeline orchestrator for THEMAP distance computation.

Orchestrates the complete workflow:

  1. Load datasets from train/test directories
  2. Compute molecule features (with caching)
  3. Compute protein features (with caching, if enabled)
  4. Compute distance matrices
  5. Combine matrices (if needed)
  6. Save results to output directory

Attributes:

Name Type Description
config PipelineConfig Pipeline configuration
loader - Dataset loader
cache Optional[FeatureCache] Feature cache (if save_features is enabled)
mol_featurizer MoleculeFeaturizer Molecule featurizer (if molecule distance enabled)
prot_featurizer ProteinFeaturizer Protein featurizer (if protein distance enabled)
Examples:

>>> config = PipelineConfig.from_yaml("config.yaml")
>>> pipeline = Pipeline(config)
>>> results = pipeline.run()
>>> print(results["molecule"])  # molecule distance matrix

mol_featurizer property

mol_featurizer: MoleculeFeaturizer

Get molecule featurizer (lazy initialization).

prot_featurizer property

prot_featurizer: ProteinFeaturizer

Get protein featurizer (lazy initialization).
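
The lazy initialization these properties describe follows the usual cached-property pattern: the featurizer is built on first access and reused afterwards. An illustrative stdlib sketch, not THEMAP's actual implementation:

```python
from functools import cached_property


class LazyExample:
    """Illustrative stand-in for Pipeline's lazy featurizer properties."""

    build_count = 0

    @cached_property
    def featurizer(self) -> dict:
        # Expensive construction runs only on first access;
        # cached_property stores the result on the instance.
        LazyExample.build_count += 1
        return {"name": "ecfp"}


obj = LazyExample()
obj.featurizer
obj.featurizer
print(LazyExample.build_count)  # built exactly once -> 1
```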

__init__

__init__(config: PipelineConfig)

Initialize the pipeline.

Parameters:

Parameter Type Default Description
config PipelineConfig required Pipeline configuration

run

run() -> Dict[str, DistanceMatrix]

Run the complete pipeline.

Returns:

Dict[str, DistanceMatrix]: dictionary mapping distance type to distance matrix:

  • "molecule": molecule distance matrix (if enabled)
  • "protein": protein distance matrix (if enabled)
  • "combined": combined matrix (if combination != "separate")

Raises:

ValueError: if no distance type is enabled.

Usage Example

from themap import Pipeline, PipelineConfig
from themap.config import (
    DataConfig,
    MoleculeDistanceConfig,
    OutputConfig,
    ComputeConfig,
)
from pathlib import Path

# Build configuration
config = PipelineConfig(
    data=DataConfig(
        directory=Path("datasets"),
        task_list=None,
    ),
    molecule=MoleculeDistanceConfig(
        enabled=True,
        featurizer="ecfp",
        method="euclidean",
    ),
    output=OutputConfig(
        directory=Path("output"),
        format="csv",
        save_features=True,
    ),
    compute=ComputeConfig(
        n_jobs=8,
        device="auto",
    ),
)

# Create and run pipeline
pipeline = Pipeline(config)
results = pipeline.run()

# Access results
print(f"Computed distances for {len(results['molecule'])} target tasks")

Configuration Classes

PipelineConfig

themap.config.PipelineConfig dataclass

Main configuration for the THEMAP pipeline.

Example YAML:

data:
  directory: "datasets/TDC"
  task_list: "tasks.json"  # Optional

distances:
  molecule:
    enabled: true
    featurizer: "ecfp"
    method: "euclidean"
  protein:
    enabled: false
    featurizer: "esm2_t33_650M_UR50D"
    method: "cosine"

combination:
  strategy: "weighted_average"
  weights:
    molecule: 0.7
    protein: 0.3

output:
  directory: "output/"
  save_features: true

compute:
  n_jobs: 8
  batch_size: 1000
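
The weighted_average strategy presumably combines the per-type matrices elementwise with the configured weights; the sketch below shows that arithmetic for nested-dict matrices using the weights from the YAML above. The real combination logic lives in the pipeline and may normalize differently:

```python
# Elementwise weighted average of two distance matrices stored as
# nested dicts. Illustrative values only.
weights = {"molecule": 0.7, "protein": 0.3}
molecule = {"target_1": {"source_1": 1.0, "source_2": 2.0}}
protein = {"target_1": {"source_1": 3.0, "source_2": 1.0}}

combined = {
    target: {
        source: weights["molecule"] * molecule[target][source]
        + weights["protein"] * protein[target][source]
        for source in molecule[target]
    }
    for target in molecule
}

print(round(combined["target_1"]["source_1"], 6))  # 0.7*1.0 + 0.3*3.0 -> 1.6
```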

from_yaml classmethod

from_yaml(path: Union[str, Path]) -> PipelineConfig

Load configuration from a YAML file.

Parameters:

Parameter Type Default Description
path Union[str, Path] required Path to the YAML configuration file

Returns:

PipelineConfig instance.

from_dict classmethod

from_dict(config_dict: Dict[str, Any]) -> PipelineConfig

Create configuration from a dictionary.

Parameters:

Parameter Type Default Description
config_dict Dict[str, Any] required Dictionary with configuration values

Returns:

PipelineConfig instance.

to_dict

to_dict() -> Dict[str, Any]

Convert configuration to dictionary.
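
The to_dict/from_dict pair follows the standard dataclass round-trip pattern. A self-contained sketch with a stand-in dataclass (MiniConfig is hypothetical, not the real PipelineConfig):

```python
from dataclasses import asdict, dataclass


@dataclass
class MiniConfig:
    """Stand-in for PipelineConfig, illustrating the round trip."""

    n_jobs: int = 8
    device: str = "auto"

    def to_dict(self) -> dict:
        # asdict recursively converts the dataclass to plain dicts.
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "MiniConfig":
        return cls(**d)


cfg = MiniConfig(n_jobs=16)
restored = MiniConfig.from_dict(cfg.to_dict())
print(restored == cfg)  # round trip preserves all fields -> True
```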

to_yaml

to_yaml(path: Union[str, Path]) -> None

Save configuration to a YAML file.

Parameters:

Parameter Type Default Description
path Union[str, Path] required Path to save the YAML file

validate

validate() -> List[str]

Validate configuration and return list of warnings/errors.

Returns:

List[str]: list of warning/error messages.

DataConfig

from themap.config import DataConfig
from pathlib import Path

data_config = DataConfig(
    directory=Path("datasets"),
    task_list=["CHEMBL123", "CHEMBL456"],  # Optional: specific tasks
    source_fold="train",
    target_fold="test",
)

MoleculeDistanceConfig

from themap.config import MoleculeDistanceConfig

mol_config = MoleculeDistanceConfig(
    enabled=True,
    featurizer="ecfp",      # Fingerprint type
    method="euclidean",     # Distance method
    cache_features=True,    # Cache computed features
)

ProteinDistanceConfig

from themap.config import ProteinDistanceConfig

prot_config = ProteinDistanceConfig(
    enabled=True,
    featurizer="esm2_t33_650M_UR50D",
    method="euclidean",
)

OutputConfig

from themap.config import OutputConfig
from pathlib import Path

output_config = OutputConfig(
    directory=Path("output"),
    format="csv",           # csv, json, or parquet
    save_features=True,     # Save computed features
    save_matrices=True,     # Save distance matrices
)

ComputeConfig

from themap.config import ComputeConfig

compute_config = ComputeConfig(
    n_jobs=8,               # Parallel workers
    device="auto",          # auto, cpu, or cuda
    batch_size=1000,        # Batch size for processing
)

Configuration from YAML

Loading Configuration

from themap.config import PipelineConfig

# Load from file
config = PipelineConfig.from_yaml("config.yaml")

# Validate configuration
issues = config.validate()
if issues:
    for issue in issues:
        print(f"Warning: {issue}")

Saving Configuration

from themap.config import PipelineConfig

config = PipelineConfig(...)

# Save to file
config.to_yaml("my_config.yaml")

Output Files

The pipeline generates these output files:

output/
├── molecule_distances.csv       # Distance matrix
├── molecule_distances.json      # JSON format (if enabled)
├── features/                    # Cached features (if enabled)
│   ├── ecfp_source.npz
│   └── ecfp_target.npz
└── pipeline_summary.json        # Execution summary

Distance Matrix Format (CSV)

,CHEMBL111111,CHEMBL222222,CHEMBL333333
CHEMBL123456,0.75,1.23,0.89
CHEMBL789012,1.45,0.67,1.12
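
The CSV matrix can be loaded back into the nested-dict shape with the stdlib alone (pandas.read_csv with index_col=0 works too). This sketch assumes the layout above, with row IDs in the first column and column IDs in the header:

```python
import csv
import io

# The exact CSV layout shown above.
text = """,CHEMBL111111,CHEMBL222222,CHEMBL333333
CHEMBL123456,0.75,1.23,0.89
CHEMBL789012,1.45,0.67,1.12
"""

matrix: dict[str, dict[str, float]] = {}
reader = csv.reader(io.StringIO(text))
header = next(reader)[1:]  # column task IDs
for row in reader:
    row_id, values = row[0], row[1:]
    matrix[row_id] = {col: float(v) for col, v in zip(header, values)}

print(matrix["CHEMBL123456"]["CHEMBL222222"])  # -> 1.23
```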

Summary File

{
    "pipeline_name": "molecule_distance",
    "execution_time": "2.34s",
    "datasets_processed": {
        "source": 10,
        "target": 3
    },
    "distance_computations": 30,
    "config": {...}
}
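
The summary is plain JSON and can be inspected with the stdlib json module. A sketch against the structure above (field names are taken from the example; the trailing "config" entry is omitted):

```python
import json

summary_text = """
{
    "pipeline_name": "molecule_distance",
    "execution_time": "2.34s",
    "datasets_processed": {"source": 10, "target": 3},
    "distance_computations": 30
}
"""

summary = json.loads(summary_text)

# In this example every source/target pair is computed exactly once:
processed = summary["datasets_processed"]
pairs = processed["source"] * processed["target"]
print(pairs == summary["distance_computations"])  # 10 * 3 == 30 -> True
```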

Advanced Usage

Custom Pipeline Steps

from themap import Pipeline, PipelineConfig

class CustomPipeline(Pipeline):
    def run(self):
        # Pre-processing
        self.validate_data()

        # Run standard pipeline
        results = super().run()

        # Post-processing
        results = self.analyze_results(results)

        return results

    def validate_data(self):
        """Custom validation logic."""
        pass

    def analyze_results(self, results):
        """Custom analysis."""
        return results

Combining Multiple Runs

from themap import quick_distance

# Run with different methods
methods = ["euclidean", "cosine"]
all_results = {}

for method in methods:
    results = quick_distance(
        data_dir="datasets",
        molecule_method=method,
        output_dir=f"output_{method}",
    )
    all_results[method] = results

# Compare methods
for method, results in all_results.items():
    print(f"\n{method.upper()} distances:")
    # Analyze results...

Incremental Processing

from themap import Pipeline, PipelineConfig

# Process in batches
config = PipelineConfig(...)
pipeline = Pipeline(config)

# Get task lists
source_tasks = pipeline.get_source_tasks()
target_tasks = pipeline.get_target_tasks()

# Process incrementally
batch_size = 10
all_results = {}

for i in range(0, len(target_tasks), batch_size):
    batch_targets = target_tasks[i:i+batch_size]

    batch_results = pipeline.run_for_targets(batch_targets)
    all_results.update(batch_results)

    print(f"Processed {min(i + batch_size, len(target_tasks))}/{len(target_tasks)} targets")

Error Handling

from themap import run_pipeline
from themap.pipeline import PipelineError

try:
    results = run_pipeline("config.yaml")
except FileNotFoundError as e:
    print(f"Configuration file not found: {e}")
except PipelineError as e:
    print(f"Pipeline execution failed: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Performance Tips

  1. Use caching: Set save_features: true for repeated runs
  2. Choose fast methods: Use euclidean for exploration, otdd for final analysis
  3. Parallel processing: Increase n_jobs for multi-core systems
  4. GPU acceleration: Use device: cuda for protein featurizers

# Optimized configuration
compute:
  n_jobs: 16
  device: "cuda"
  batch_size: 2000

output:
  save_features: true  # Cache for reuse