Pipeline Module¶
The pipeline module provides a high-level API for running distance computation workflows. It supports both programmatic and configuration-based approaches.
Overview¶
The pipeline system offers three levels of usage:
- `quick_distance` - One-liner for simple computations
- `run_pipeline` - Configuration file-based execution
- `Pipeline` - Full programmatic control
Quick Distance¶
The simplest way to compute distances:
```python
from themap import quick_distance

results = quick_distance(
    data_dir="datasets",
    output_dir="output",
    molecule_featurizer="ecfp",
    molecule_method="euclidean",
)
```
Function Reference¶
themap.pipeline.orchestrator.quick_distance ¶
```python
quick_distance(
    data_dir: str,
    output_dir: str = "output",
    molecule_featurizer: str = "ecfp",
    molecule_method: str = "euclidean",
    n_jobs: int = 8,
) -> Dict[str, DistanceMatrix]
```
Quick distance computation with minimal configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data_dir` | `str` | Path to data directory with train/test folders. | *required* |
| `output_dir` | `str` | Path to output directory. | `'output'` |
| `molecule_featurizer` | `str` | Molecule featurizer name. | `'ecfp'` |
| `molecule_method` | `str` | Distance method. | `'euclidean'` |
| `n_jobs` | `int` | Number of parallel jobs. | `8` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, DistanceMatrix]` | Dictionary of distance matrices. |
Returns¶
Dictionary with distance matrices:
```python
{
    "molecule": {
        "target_task_1": {
            "source_task_1": 0.75,
            "source_task_2": 1.23,
        },
        "target_task_2": {...},
    }
}
```
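Given this nested structure, a common follow-up is finding the closest source task for each target. A plain-Python sketch, reusing the illustrative task names and distances from the structure above:

```python
# Nested results structure as documented above (values are illustrative)
results = {
    "molecule": {
        "target_task_1": {"source_task_1": 0.75, "source_task_2": 1.23},
        "target_task_2": {"source_task_1": 0.42, "source_task_2": 0.91},
    }
}

# For each target task, pick the source task with the smallest distance
for target, sources in results["molecule"].items():
    nearest = min(sources, key=sources.get)
    print(f"{target}: nearest source is {nearest} ({sources[nearest]:.2f})")
```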
Run Pipeline¶
Execute a pipeline from a YAML configuration file:
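In its simplest form (mirroring the error-handling example later on this page):

```python
from themap import run_pipeline

results = run_pipeline("config.yaml")
```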
Function Reference¶
themap.pipeline.orchestrator.run_pipeline ¶
Convenience function to run pipeline from config file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config_path` | `str` | Path to YAML config file. | *required* |

Returns:

| Type | Description |
|---|---|
| `Dict[str, DistanceMatrix]` | Dictionary of distance matrices. |
Configuration File Format¶
```yaml
# config.yaml
data:
  directory: "datasets"
  task_list: null  # Auto-discover tasks

molecule:
  enabled: true
  featurizer: "ecfp"
  method: "euclidean"

protein:
  enabled: false

output:
  directory: "output"
  format: "csv"
  save_features: true

compute:
  n_jobs: 8
  device: "auto"
```
Pipeline Class¶
For full programmatic control:
Pipeline¶
themap.pipeline.orchestrator.Pipeline ¶
Main pipeline orchestrator for THEMAP distance computation.
Orchestrates the complete workflow:

1. Load datasets from train/test directories
2. Compute molecule features (with caching)
3. Compute protein features (with caching, if enabled)
4. Compute distance matrices
5. Combine matrices (if needed)
6. Save results to output directory
Attributes:

| Name | Type | Description |
|---|---|---|
| `config` | `PipelineConfig` | Pipeline configuration |
| `loader` | | Dataset loader |
| `cache` | `Optional[FeatureCache]` | Feature cache (if save_features is enabled) |
| `mol_featurizer` | `MoleculeFeaturizer` | Molecule featurizer (if molecule distance enabled) |
| `prot_featurizer` | `ProteinFeaturizer` | Protein featurizer (if protein distance enabled) |
Examples:

```python
>>> config = PipelineConfig.from_yaml("config.yaml")
>>> pipeline = Pipeline(config)
>>> results = pipeline.run()
>>> print(results["molecule"])  # molecule distance matrix
```
`mol_featurizer` property ¶
Get molecule featurizer (lazy initialization).

`prot_featurizer` property ¶
Get protein featurizer (lazy initialization).
__init__ ¶
Initialize the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `PipelineConfig` | Pipeline configuration | *required* |
run ¶
Run the complete pipeline.
Returns:

| Type | Description |
|---|---|
| `Dict[str, DistanceMatrix]` | Dictionary mapping distance type to distance matrix. |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If no distance type is enabled |
Usage Example¶
```python
from pathlib import Path

from themap import Pipeline, PipelineConfig
from themap.config import (
    DataConfig,
    MoleculeDistanceConfig,
    OutputConfig,
    ComputeConfig,
)

# Build configuration
config = PipelineConfig(
    data=DataConfig(
        directory=Path("datasets"),
        task_list=None,
    ),
    molecule=MoleculeDistanceConfig(
        enabled=True,
        featurizer="ecfp",
        method="euclidean",
    ),
    output=OutputConfig(
        directory=Path("output"),
        format="csv",
        save_features=True,
    ),
    compute=ComputeConfig(
        n_jobs=8,
        device="auto",
    ),
)

# Create and run pipeline
pipeline = Pipeline(config)
results = pipeline.run()

# Access results
print(f"Computed distances for {len(results['molecule'])} target tasks")
```
Configuration Classes¶
PipelineConfig¶
themap.config.PipelineConfig dataclass ¶
Main configuration for the THEMAP pipeline.
Example YAML:

```yaml
data:
  directory: "datasets/TDC"
  task_list: "tasks.json"  # Optional

distances:
  molecule:
    enabled: true
    featurizer: "ecfp"
    method: "euclidean"
  protein:
    enabled: false
    featurizer: "esm2_t33_650M_UR50D"
    method: "cosine"
  combination:
    strategy: "weighted_average"
    weights:
      molecule: 0.7
      protein: 0.3

output:
  directory: "output/"
  save_features: true

compute:
  n_jobs: 8
  batch_size: 1000
```
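As a sketch of what the `weighted_average` combination strategy implies (the semantics are an assumption here; the actual combination logic lives inside the pipeline and is not shown on this page), the combined distance would be a convex combination of the per-modality distances:

```python
# Hypothetical illustration of the "weighted_average" combination strategy:
# combined = w_mol * d_mol + w_prot * d_prot (assumed semantics)
weights = {"molecule": 0.7, "protein": 0.3}
d_mol, d_prot = 0.75, 0.40  # illustrative per-modality distances

combined = weights["molecule"] * d_mol + weights["protein"] * d_prot
print(round(combined, 3))  # 0.645
```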
from_yaml classmethod ¶
Load configuration from a YAML file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[str, Path]` | Path to the YAML configuration file. | *required* |

Returns:

| Type | Description |
|---|---|
| `PipelineConfig` | PipelineConfig instance. |
from_dict classmethod ¶
Create configuration from a dictionary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config_dict` | `Dict[str, Any]` | Dictionary with configuration values. | *required* |

Returns:

| Type | Description |
|---|---|
| `PipelineConfig` | PipelineConfig instance. |
to_yaml ¶
Save configuration to a YAML file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[str, Path]` | Path to save the YAML file. | *required* |
validate ¶
Validate configuration and return list of warnings/errors.
Returns:

| Type | Description |
|---|---|
| `List[str]` | List of warning/error messages. |
DataConfig¶
```python
from pathlib import Path

from themap.config import DataConfig

data_config = DataConfig(
    directory=Path("datasets"),
    task_list=["CHEMBL123", "CHEMBL456"],  # Optional: specific tasks
    source_fold="train",
    target_fold="test",
)
```
MoleculeDistanceConfig¶
```python
from themap.config import MoleculeDistanceConfig

mol_config = MoleculeDistanceConfig(
    enabled=True,
    featurizer="ecfp",    # Fingerprint type
    method="euclidean",   # Distance method
    cache_features=True,  # Cache computed features
)
```
ProteinDistanceConfig¶
```python
from themap.config import ProteinDistanceConfig

prot_config = ProteinDistanceConfig(
    enabled=True,
    featurizer="esm2_t33_650M_UR50D",
    method="euclidean",
)
```
OutputConfig¶
```python
from pathlib import Path

from themap.config import OutputConfig

output_config = OutputConfig(
    directory=Path("output"),
    format="csv",        # csv, json, or parquet
    save_features=True,  # Save computed features
    save_matrices=True,  # Save distance matrices
)
```
ComputeConfig¶
```python
from themap.config import ComputeConfig

compute_config = ComputeConfig(
    n_jobs=8,         # Parallel workers
    device="auto",    # auto, cpu, or cuda
    batch_size=1000,  # Batch size for processing
)
```
Configuration from YAML¶
Loading Configuration¶
```python
from themap.config import PipelineConfig

# Load from file
config = PipelineConfig.from_yaml("config.yaml")

# Validate configuration
issues = config.validate()
if issues:
    for issue in issues:
        print(f"Warning: {issue}")
```
Saving Configuration¶
```python
from themap.config import PipelineConfig

config = PipelineConfig(...)

# Save to file
config.to_yaml("my_config.yaml")
```
Output Files¶
The pipeline generates these output files:
```
output/
├── molecule_distances.csv    # Distance matrix
├── molecule_distances.json   # JSON format (if enabled)
├── features/                 # Cached features (if enabled)
│   ├── ecfp_source.npz
│   └── ecfp_target.npz
└── pipeline_summary.json     # Execution summary
```
Distance Matrix Format (CSV)¶
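The exact CSV layout is not reproduced here. A plausible layout (an assumption, reusing the illustrative task names from the quick-start example) puts target tasks in rows and source tasks in columns, which can be read back with the standard library:

```python
import csv
import io

# Hypothetical contents of output/molecule_distances.csv (layout is assumed)
sample = """target_task,source_task_1,source_task_2
target_task_1,0.75,1.23
target_task_2,0.91,0.42
"""

# Parse into the nested {target: {source: distance}} shape used elsewhere on this page
rows = list(csv.DictReader(io.StringIO(sample)))
matrix = {
    r["target_task"]: {k: float(v) for k, v in r.items() if k != "target_task"}
    for r in rows
}
print(matrix["target_task_1"]["source_task_2"])  # 1.23
```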
Summary File¶
```json
{
  "pipeline_name": "molecule_distance",
  "execution_time": "2.34s",
  "datasets_processed": {
    "source": 10,
    "target": 3
  },
  "distance_computations": 30,
  "config": {...}
}
```
Advanced Usage¶
Custom Pipeline Steps¶
```python
from themap import Pipeline, PipelineConfig

class CustomPipeline(Pipeline):
    def run(self):
        # Pre-processing
        self.validate_data()

        # Run standard pipeline
        results = super().run()

        # Post-processing
        results = self.analyze_results(results)
        return results

    def validate_data(self):
        """Custom validation logic."""
        pass

    def analyze_results(self, results):
        """Custom analysis."""
        return results
```
Combining Multiple Runs¶
```python
from themap import quick_distance

# Run with different methods
methods = ["euclidean", "cosine"]
all_results = {}

for method in methods:
    results = quick_distance(
        data_dir="datasets",
        molecule_method=method,
        output_dir=f"output_{method}",
    )
    all_results[method] = results

# Compare methods
for method, results in all_results.items():
    print(f"\n{method.upper()} distances:")
    # Analyze results...
```
Incremental Processing¶
```python
from themap import Pipeline, PipelineConfig

# Process in batches
config = PipelineConfig(...)
pipeline = Pipeline(config)

# Get task lists
source_tasks = pipeline.get_source_tasks()
target_tasks = pipeline.get_target_tasks()

# Process incrementally
batch_size = 10
all_results = {}

for i in range(0, len(target_tasks), batch_size):
    batch_targets = target_tasks[i:i + batch_size]
    batch_results = pipeline.run_for_targets(batch_targets)
    all_results.update(batch_results)
    done = min(i + batch_size, len(target_tasks))
    print(f"Processed {done}/{len(target_tasks)} targets")
```
Error Handling¶
```python
from themap import run_pipeline
from themap.pipeline import PipelineError

try:
    results = run_pipeline("config.yaml")
except FileNotFoundError as e:
    print(f"Configuration file not found: {e}")
except PipelineError as e:
    print(f"Pipeline execution failed: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
Performance Tips¶
- **Use caching**: Set `save_features: true` for repeated runs
- **Choose fast methods**: Use `euclidean` for exploration, `otdd` for final analysis
- **Parallel processing**: Increase `n_jobs` on multi-core systems
- **GPU acceleration**: Use `device: cuda` for protein featurizers
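Applied together, these tips correspond to a configuration fragment along these lines (field names follow the YAML examples earlier on this page; the exact values are illustrative):

```yaml
output:
  save_features: true  # Use caching for repeated runs

molecule:
  method: "euclidean"  # Fast method for exploration

compute:
  n_jobs: 16           # More workers on multi-core systems
  device: "cuda"       # GPU acceleration for protein featurizers
```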