# Performance Optimization
This tutorial covers best practices for optimizing THEMAP performance when working with large datasets and complex distance computations.
## Overview
THEMAP offers several strategies to optimize performance:
- Method selection: Choose appropriate distance metrics
- Caching: Leverage feature caching for repeated computations
- Memory management: Handle large datasets efficiently
- Parallel processing: Utilize multiple cores when possible
- Data preprocessing: Optimize data loading and storage
## Distance Method Performance

### Speed Comparison
| Method | Speed | Memory | Accuracy | Best For |
|---|---|---|---|---|
| Euclidean | ⚡⚡⚡ | 💾 | ⭐⭐ | Initial exploration, large datasets |
| Cosine | ⚡⚡⚡ | 💾 | ⭐⭐⭐ | High-dimensional features |
| OTDD | ⚡ | 💾💾💾 | ⭐⭐⭐⭐⭐ | Detailed analysis, small-medium datasets |
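All of these metrics are selected through the same `molecule_method` argument used throughout this guide, so switching between them is a one-line change. A minimal sketch using the constructor shown below:

```python
from themap.distance import MoleculeDatasetDistance

# "euclidean" and "cosine" are cheap first passes; "otdd" trades speed for detail
distance_calc = MoleculeDatasetDistance(tasks=None, molecule_method="euclidean")
```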
### Choosing the Right Method

```python
import time
from themap.distance import MoleculeDatasetDistance
from themap.data import MoleculeDataset
def benchmark_methods(datasets, methods=["euclidean", "cosine", "otdd"]):
"""Benchmark different distance methods."""
results = {}
for method in methods:
print(f"Benchmarking {method}...")
start_time = time.time()
try:
distance_calc = MoleculeDatasetDistance(
tasks=None,
molecule_method=method
)
distance_calc.source_molecule_datasets = datasets[:2]
distance_calc.target_molecule_datasets = datasets[:2]
distance_calc.source_task_ids = [d.task_id for d in datasets[:2]]
distance_calc.target_task_ids = [d.task_id for d in datasets[:2]]
distances = distance_calc.get_distance()
elapsed = time.time() - start_time
results[method] = {
'time': elapsed,
'success': True,
'distances': distances
}
print(f" β
{method}: {elapsed:.2f}s")
except Exception as e:
elapsed = time.time() - start_time
results[method] = {
'time': elapsed,
'success': False,
'error': str(e)
}
print(f" β {method}: Failed after {elapsed:.2f}s")
return results
# Load sample datasets
datasets = [
MoleculeDataset.load_from_file(f"datasets/train/{task_id}.jsonl.gz")
for task_id in ["CHEMBL1023359", "CHEMBL1613776"]
]
# Run benchmark
benchmark_results = benchmark_methods(datasets)
```
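To compare the timings at a glance, the returned dictionary can be folded into a short report; this sketch only relies on the keys populated by `benchmark_methods` above:

```python
# Summarize benchmark results (keys come from benchmark_methods above)
for method, res in benchmark_results.items():
    status = "ok" if res["success"] else f"failed: {res['error']}"
    print(f"{method:>10}: {res['time']:6.2f}s ({status})")
```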
## Caching Strategies

### Feature Caching

```python
from themap.data.tasks import Tasks
# Enable comprehensive caching
tasks = Tasks.from_directory(
directory="datasets/",
task_list_file="datasets/sample_tasks_list.json",
load_molecules=True,
load_proteins=True,
cache_dir="cache/", # All computed features cached here
force_reload=False # Use existing cache when available
)
print("Features will be cached for future use")
### Custom Cache Management

```python
import os
import shutil
from pathlib import Path
def manage_cache(cache_dir="cache/"):
"""Manage THEMAP cache directory."""
cache_path = Path(cache_dir)
if cache_path.exists():
# Get cache size
total_size = sum(
f.stat().st_size for f in cache_path.rglob('*') if f.is_file()
)
print(f"Cache directory: {cache_dir}")
print(f"Cache size: {total_size / (1024**3):.2f} GB")
print(f"Cached files: {len(list(cache_path.rglob('*')))}")
# List cache contents
for subdir in cache_path.iterdir():
if subdir.is_dir():
n_files = len(list(subdir.rglob('*')))
print(f" {subdir.name}: {n_files} files")
else:
print(f"Cache directory {cache_dir} does not exist")
def clear_cache(cache_dir="cache/", confirm=True):
"""Clear the cache directory."""
if confirm:
response = input(f"Clear cache directory {cache_dir}? (y/N): ")
if response.lower() != 'y':
return
cache_path = Path(cache_dir)
if cache_path.exists():
shutil.rmtree(cache_path)
print(f"Cache cleared: {cache_dir}")
else:
print(f"Cache directory {cache_dir} does not exist")
# Example usage
manage_cache()
```
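The two helpers can also be combined into a simple housekeeping routine that clears the cache only once it grows beyond a size budget (the 10 GB threshold here is an arbitrary example):

```python
from pathlib import Path

def clear_cache_if_large(cache_dir="cache/", max_gb=10.0):
    """Clear the cache automatically once it exceeds max_gb."""
    cache_path = Path(cache_dir)
    if not cache_path.exists():
        return
    total_gb = sum(
        f.stat().st_size for f in cache_path.rglob('*') if f.is_file()
    ) / (1024**3)
    if total_gb > max_gb:
        print(f"Cache is {total_gb:.2f} GB (limit {max_gb} GB), clearing...")
        clear_cache(cache_dir, confirm=False)
    else:
        print(f"Cache is {total_gb:.2f} GB, within the {max_gb} GB limit")
```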
## Memory Management

### Large Dataset Handling

```python
def process_large_datasets(dataset_dir, batch_size=5, method="euclidean"):
"""Process large numbers of datasets in batches."""
from pathlib import Path
import gc
# Find all dataset files
dataset_files = list(Path(dataset_dir).glob("*.jsonl.gz"))
print(f"Found {len(dataset_files)} datasets")
# Process in batches
all_results = {}
for i in range(0, len(dataset_files), batch_size):
batch_files = dataset_files[i:i + batch_size]
print(f"Processing batch {i//batch_size + 1}: {len(batch_files)} files")
# Load batch
batch_datasets = []
for file_path in batch_files:
try:
dataset = MoleculeDataset.load_from_file(str(file_path))
batch_datasets.append(dataset)
except Exception as e:
print(f"Failed to load {file_path}: {e}")
if len(batch_datasets) < 2:
continue
# Compute distances for this batch
try:
distance_calc = MoleculeDatasetDistance(
tasks=None,
molecule_method=method
)
distance_calc.source_molecule_datasets = batch_datasets
distance_calc.target_molecule_datasets = batch_datasets
distance_calc.source_task_ids = [d.task_id for d in batch_datasets]
distance_calc.target_task_ids = [d.task_id for d in batch_datasets]
batch_results = distance_calc.get_distance()
all_results.update(batch_results)
print(f" β
Computed {len(batch_results)} distances")
except Exception as e:
print(f" β Batch failed: {e}")
# Clean up memory
del batch_datasets
gc.collect()
return all_results
# Process large dataset directory
results = process_large_datasets("datasets/train/", batch_size=3)
```
### Memory Monitoring

```python
import psutil
import os
def monitor_memory():
"""Monitor current memory usage."""
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
print(f"RSS Memory: {memory_info.rss / (1024**3):.2f} GB")
print(f"VMS Memory: {memory_info.vms / (1024**3):.2f} GB")
# System memory
system_memory = psutil.virtual_memory()
print(f"System Memory: {system_memory.percent}% used")
return memory_info
def memory_efficient_processing(tasks, max_memory_gb=8):
"""Process tasks with memory monitoring."""
initial_memory = monitor_memory()
    # Current baseline memory usage of this process (GB)
    baseline_memory_gb = initial_memory.rss / (1024**3)
    # Calculate a safe batch size from the remaining memory budget
    available_memory = max_memory_gb - baseline_memory_gb
estimated_memory_per_task = 0.1 # GB, adjust based on your data
safe_batch_size = max(1, int(available_memory / estimated_memory_per_task))
print(f"Using batch size: {safe_batch_size}")
# Process in safe batches
task_ids = tasks.get_task_ids()
for i in range(0, len(task_ids), safe_batch_size):
batch_ids = task_ids[i:i + safe_batch_size]
print(f"Processing batch {i//safe_batch_size + 1}")
monitor_memory()
# Process batch here
# ... your processing code ...
# Force garbage collection
import gc
gc.collect()
# Example usage
# memory_efficient_processing(tasks, max_memory_gb=16)
```
## Parallel Processing

### Multi-Processing for Distance Computation

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
def compute_single_distance(args):
"""Compute distance between two datasets (for multiprocessing)."""
source_path, target_path, method = args
try:
# Load datasets
source = MoleculeDataset.load_from_file(source_path)
target = MoleculeDataset.load_from_file(target_path)
# Compute distance
distance_calc = MoleculeDatasetDistance(
tasks=None,
molecule_method=method
)
distance_calc.source_molecule_datasets = [source]
distance_calc.target_molecule_datasets = [target]
distance_calc.source_task_ids = [source.task_id]
distance_calc.target_task_ids = [target.task_id]
result = distance_calc.get_distance()
return {
'source': source.task_id,
'target': target.task_id,
'distance': list(result.values())[0][source.task_id],
'status': 'success'
}
except Exception as e:
return {
'source': str(source_path),
'target': str(target_path),
'distance': None,
'status': f'error: {str(e)}'
}
def parallel_distance_computation(
source_files,
target_files,
method="euclidean",
max_workers=None
):
"""Compute distances in parallel."""
if max_workers is None:
max_workers = min(mp.cpu_count(), 8) # Don't use all cores
# Create all combinations
tasks = [
(source, target, method)
for source in source_files
for target in target_files
]
print(f"Computing {len(tasks)} distances using {max_workers} workers")
results = []
completed = 0
with ProcessPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_task = {
executor.submit(compute_single_distance, task): task
for task in tasks
}
# Collect results
for future in as_completed(future_to_task):
result = future.result()
results.append(result)
completed += 1
if completed % 10 == 0:
print(f"Completed: {completed}/{len(tasks)}")
return results
# Example usage
source_files = ["datasets/train/CHEMBL1023359.jsonl.gz"]
target_files = ["datasets/test/CHEMBL2219358.jsonl.gz", "datasets/test/CHEMBL1963831.jsonl.gz"]
parallel_results = parallel_distance_computation(
source_files,
target_files,
method="euclidean",
max_workers=4
)
```
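Because each worker returns a plain dictionary, the results are easy to aggregate afterwards, for example into a nested `{source: {target: distance}}` mapping:

```python
# Collect successful results into a nested {source: {target: distance}} mapping
distance_matrix = {}
for r in parallel_results:
    if r["status"] == "success":
        distance_matrix.setdefault(r["source"], {})[r["target"]] = r["distance"]
    else:
        print(f"Skipped {r['source']} -> {r['target']}: {r['status']}")

print(distance_matrix)
```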
## Data Preprocessing Optimization

### Efficient Data Loading

```python
def optimize_data_loading(data_dir, cache_dir="cache/"):
"""Optimize data loading with preprocessing."""
from pathlib import Path
import pickle
cache_path = Path(cache_dir)
cache_path.mkdir(exist_ok=True)
# Check for preprocessed data
preprocessed_file = cache_path / "preprocessed_datasets.pkl"
if preprocessed_file.exists():
print("Loading preprocessed datasets...")
with open(preprocessed_file, 'rb') as f:
return pickle.load(f)
print("Preprocessing datasets...")
# Load and preprocess all datasets
datasets = {}
dataset_files = list(Path(data_dir).glob("*.jsonl.gz"))
for i, file_path in enumerate(dataset_files):
try:
dataset = MoleculeDataset.load_from_file(str(file_path))
# Precompute features if needed
# This could include molecular descriptors, embeddings, etc.
datasets[dataset.task_id] = dataset
if (i + 1) % 10 == 0:
print(f"Processed {i + 1}/{len(dataset_files)} datasets")
except Exception as e:
print(f"Failed to load {file_path}: {e}")
# Cache preprocessed data
with open(preprocessed_file, 'wb') as f:
pickle.dump(datasets, f)
print(f"Preprocessed {len(datasets)} datasets")
return datasets
# Load optimized datasets
datasets = optimize_data_loading("datasets/train/")
```
### Feature Pre-computation

```python
from themap.distance import TaskDistance  # assumed import path, alongside MoleculeDatasetDistance

def precompute_features(tasks, force_recompute=False):
"""Precompute and cache molecular/protein features."""
print("Precomputing features...")
# This will trigger feature computation and caching
task_distance = TaskDistance(
tasks=tasks,
molecule_method="euclidean", # Fast method for precomputation
protein_method="euclidean"
)
# Force feature computation by attempting distance calculation
try:
# This will compute and cache all features
sample_distances = task_distance.get_molecule_distances()
print("β
Molecular features precomputed")
sample_distances = task_distance.get_protein_distances()
print("β
Protein features precomputed")
except Exception as e:
print(f"β οΈ Feature precomputation failed: {e}")
print("Feature precomputation complete")
# Precompute features for faster subsequent operations
# precompute_features(tasks)
```
## OTDD Optimization

### OTDD Parameter Tuning

```python
def optimize_otdd_parameters(datasets, max_samples_range=[100, 500, 1000]):
"""Find optimal OTDD parameters for your datasets."""
import time
results = {}
for max_samples in max_samples_range:
print(f"Testing OTDD with max_samples={max_samples}")
start_time = time.time()
try:
distance_calc = MoleculeDatasetDistance(
tasks=None,
molecule_method="otdd"
)
            # OTDD parameters are handled internally; max_samples is not passed to
            # the calculator here. To limit cost, subsample each dataset to roughly
            # max_samples molecules before loading it.
distance_calc.source_molecule_datasets = datasets[:2]
distance_calc.target_molecule_datasets = datasets[:2]
distance_calc.source_task_ids = [d.task_id for d in datasets[:2]]
distance_calc.target_task_ids = [d.task_id for d in datasets[:2]]
distances = distance_calc.get_distance()
elapsed = time.time() - start_time
results[max_samples] = {
'time': elapsed,
'success': True,
'distances': distances
}
print(f" β
Completed in {elapsed:.2f}s")
except Exception as e:
elapsed = time.time() - start_time
results[max_samples] = {
'time': elapsed,
'success': False,
'error': str(e)
}
print(f" β Failed after {elapsed:.2f}s: {e}")
return results
# Find optimal parameters
# otdd_results = optimize_otdd_parameters(datasets)
```
## Performance Monitoring

### Benchmarking Suite

```python
import time
import psutil
import os
class PerformanceBenchmark:
"""Comprehensive performance benchmarking for THEMAP operations."""
def __init__(self):
self.results = {}
def benchmark_distance_methods(self, datasets, methods=None):
"""Benchmark different distance methods."""
if methods is None:
methods = ["euclidean", "cosine"] # Skip OTDD for speed
for method in methods:
print(f"\nπ Benchmarking {method}...")
# Memory before
memory_before = self._get_memory_usage()
start_time = time.time()
try:
distance_calc = MoleculeDatasetDistance(
tasks=None,
molecule_method=method
)
distance_calc.source_molecule_datasets = datasets
distance_calc.target_molecule_datasets = datasets
distance_calc.source_task_ids = [d.task_id for d in datasets]
distance_calc.target_task_ids = [d.task_id for d in datasets]
distances = distance_calc.get_distance()
# Calculate metrics
elapsed = time.time() - start_time
memory_after = self._get_memory_usage()
memory_used = memory_after - memory_before
n_comparisons = sum(len(d) for d in distances.values())
self.results[method] = {
'time': elapsed,
'memory_mb': memory_used,
'comparisons': n_comparisons,
'comparisons_per_second': n_comparisons / elapsed,
'status': 'success'
}
print(f" β
Time: {elapsed:.2f}s")
print(f" πΎ Memory: {memory_used:.1f}MB")
print(f" π {n_comparisons} comparisons ({n_comparisons/elapsed:.1f}/s)")
except Exception as e:
elapsed = time.time() - start_time
self.results[method] = {
'time': elapsed,
'memory_mb': 0,
'comparisons': 0,
'comparisons_per_second': 0,
'status': f'failed: {str(e)}'
}
print(f" β Failed: {e}")
def _get_memory_usage(self):
"""Get current memory usage in MB."""
process = psutil.Process(os.getpid())
return process.memory_info().rss / (1024 * 1024)
def print_summary(self):
"""Print benchmark summary."""
print("\nπ Benchmark Summary:")
print("-" * 80)
print(f"{'Method':<12} {'Time (s)':<10} {'Memory (MB)':<12} {'Comp/sec':<10} {'Status'}")
print("-" * 80)
for method, results in self.results.items():
print(f"{method:<12} {results['time']:<10.2f} {results['memory_mb']:<12.1f} "
f"{results['comparisons_per_second']:<10.1f} {results['status']}")
# Recommendations
successful = {k: v for k, v in self.results.items() if v['status'] == 'success'}
if successful:
fastest = min(successful.items(), key=lambda x: x[1]['time'])
most_efficient = min(successful.items(), key=lambda x: x[1]['memory_mb'])
print(f"\nπ Recommendations:")
print(f"β‘ Fastest: {fastest[0]} ({fastest[1]['time']:.2f}s)")
print(f"πΎ Most memory efficient: {most_efficient[0]} ({most_efficient[1]['memory_mb']:.1f}MB)")
# Run comprehensive benchmark
benchmark = PerformanceBenchmark()
# benchmark.benchmark_distance_methods(datasets[:3]) # Test with 3 datasets
# benchmark.print_summary()
```
## Best Practices Summary

### Quick Optimization Checklist

- ✅ Use caching: Always specify a cache directory
- ✅ Choose appropriate methods: Start with euclidean for exploration
- ✅ Process in batches: Handle large datasets in smaller chunks
- ✅ Monitor memory: Keep track of memory usage during processing
- ✅ Parallel processing: Use multiple cores for independent computations
- ✅ Precompute features: Cache expensive feature computations
- ✅ Profile your workflow: Identify bottlenecks with benchmarking
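Putting the checklist together, here is a minimal sketch of an optimized first pass that reuses only calls shown earlier in this guide (the dataset list is a placeholder for datasets loaded in batches as above):

```python
from themap.data.tasks import Tasks
from themap.distance import MoleculeDatasetDistance

# 1. Load with caching enabled so repeated runs reuse computed features
tasks = Tasks.from_directory(
    directory="datasets/",
    task_list_file="datasets/sample_tasks_list.json",
    load_molecules=True,
    load_proteins=True,
    cache_dir="cache/",
    force_reload=False,
)

# 2. First pass with a cheap metric, using the pattern from the examples above
datasets = [...]  # MoleculeDataset objects, e.g. loaded in batches as shown earlier
distance_calc = MoleculeDatasetDistance(tasks=None, molecule_method="euclidean")
distance_calc.source_molecule_datasets = datasets
distance_calc.target_molecule_datasets = datasets
distance_calc.source_task_ids = [d.task_id for d in datasets]
distance_calc.target_task_ids = [d.task_id for d in datasets]
distances = distance_calc.get_distance()

# 3. Re-run only the most promising pairs with molecule_method="otdd"
```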
### Method Selection Guidelines

```python
def choose_distance_method(n_datasets, dataset_sizes, time_budget_minutes=60):
"""Helper function to choose appropriate distance method."""
total_comparisons = n_datasets * (n_datasets - 1) // 2
avg_dataset_size = sum(dataset_sizes) / len(dataset_sizes)
print(f"Analysis: {total_comparisons} comparisons, avg {avg_dataset_size:.0f} molecules/dataset")
if total_comparisons > 100 or avg_dataset_size > 1000:
recommendation = "euclidean"
reason = "Large number of comparisons or large datasets"
elif total_comparisons > 50:
recommendation = "cosine"
reason = "Moderate number of comparisons"
elif time_budget_minutes > 30:
recommendation = "otdd"
reason = "High accuracy needed and sufficient time budget"
else:
recommendation = "euclidean"
reason = "Limited time budget"
print(f"Recommended method: {recommendation}")
print(f"Reason: {reason}")
return recommendation
# Example usage
# method = choose_distance_method(
# n_datasets=10,
# dataset_sizes=[250, 180, 500, 300],
# time_budget_minutes=30
# )
```
This comprehensive guide should help you optimize THEMAP performance for your specific use case. Start with the basic optimizations and gradually apply more advanced techniques as needed.