Takeaway: For robust GO prediction, start with homology + PLM baselines, add label smoothing on PPI, and only then graduate to GNNs/multimodal fusion.
Hierarchy: closure under is_a, part_of.
Feature sources:
1) Homology: BLAST/DIAMOND kNN; domain HMMs (Pfam).
2) Sequence embeddings: frozen PLMs (e.g., ESM-like, 1-3k dims).
3) Structure: DSSP features, secondary structure, solvent accessibility; optional 3D graph embeddings.
4) Networks: PPI/co-expression; A adjacency for smoothing or GNN.
5) Text: weak supervision from abstracts/full-text.
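Text weak supervision never gets its own code block later in the post, so here is a minimal sketch of one way to derive it: match GO term names/synonyms against abstracts to produce noisy extra labels (the function name and input shapes are my assumptions, not the post's):
def weak_labels_from_abstracts(abstracts: dict, go_term_names: dict) -> dict:
    """
    abstracts: protein_id -> abstract/full-text string
    go_term_names: GO id -> list of lowercase names/synonyms
    Returns protein_id -> set of weakly supported GO ids (noisy; use to augment, not replace, curated labels).
    """
    weak_labels = {}
    for protein_id, text in abstracts.items():
        text = text.lower()
        hits = {go_id for go_id, names in go_term_names.items()
                if any(name in text for name in names)}
        if hits:
            weak_labels[protein_id] = hits
    return weak_labels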
(a) Logistic baseline on PLM embeddings (balanced, calibrated)
(b) One-step label smoothing on normalized PPI
(c) Hierarchy closure + per-class threshold tuning (Fmax)
# X: PLM embeddings [N, D], Y: multi-hot GO [N, C], A: normalized PPI
clf = OneVsRestClassifier(LogisticRegression(max_iter=4000, class_weight="balanced"))
clf.fit(X_tr, Y_tr)
P0 = clf.predict_proba(X_val)
alpha = 0.2
P1 = (1 - alpha) * P0 + alpha * (A @ P0) # one-step smoothing
P1 = np.clip(P1, 0, 1)
P1 = close_under_ancestors(P1, go_dag) # hierarchy consistency
th = tune_thresholds(P1, Y_val, metric="Fmax") # per-class thresholds
Y_hat = (P1 >= th).astype(int)
Defaults: LogisticRegression with C=1.0, class_weight="balanced", solver="lbfgs", max_iter=4000; smoothing alpha = 0.1-0.3 (avoid oversmoothing hubs); threshold grid [0.05..0.95].
Homology-based features (BLAST/DIAMOND):
from Bio.Blast import NCBIWWW, NCBIXML
def get_blast_homologs(sequence: str, database: str = "nr", evalue: float = 1e-5):
"""
Get homologous proteins via BLAST.
Returns:
homolog_ids: List of hit accessions for significant hits
scores: List of E-values
"""
result = NCBIWWW.qblast("blastp", database, sequence, expect=evalue)
blast_record = NCBIXML.read(result)
homolog_ids = []
scores = []
for alignment in blast_record.alignments:
for hsp in alignment.hsps:
if hsp.expect < evalue:
homolog_ids.append(alignment.title.split('|')[1]) # hit accession (title format depends on the database)
scores.append(hsp.expect)
return homolog_ids, scores
def transfer_labels_from_homologs(protein_id: str, homolog_ids: list, go_annotations: dict):
"""
Transfer GO labels from homologous proteins.
Returns:
transferred_labels: Set of GO terms from homologs
"""
transferred_labels = set()
for homolog_id in homolog_ids:
if homolog_id in go_annotations:
transferred_labels.update(go_annotations[homolog_id])
return transferred_labels
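To use homology next to the other feature blocks (and in the end-to-end pipeline below), the transferred terms can be collapsed into a fixed-length per-GO-term score vector, weighting each term by its best hit's E-value. A minimal sketch; homologs_to_class_scores and term_index are my own names, not from the post:
import numpy as np

def homologs_to_class_scores(homolog_ids: list, evalues: list, go_annotations: dict, term_index: dict) -> np.ndarray:
    """
    Convert BLAST hits into a [C] vector of per-GO-term scores.
    Each GO term annotated on a hit receives -log10(E-value) of that hit; keep the max over hits.
    term_index: GO term -> column index (same column order as the label matrix Y).
    """
    scores = np.zeros(len(term_index))
    for hit_id, evalue in zip(homolog_ids, evalues):
        weight = -np.log10(max(evalue, 1e-300))  # clamp to avoid log10(0)
        for term in go_annotations.get(hit_id, ()):
            col = term_index.get(term)
            if col is not None:
                scores[col] = max(scores[col], weight)
    return scores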
PLM embeddings (ESM2):
from transformers import EsmModel, EsmTokenizer
import torch
def get_esm2_embedding(sequence: str, model_name: str = "facebook/esm2_t33_650M_UR50D"):
"""
Get ESM2 embedding for a protein sequence.
Returns:
embedding: numpy array [D] of L2-normalized embedding
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = EsmTokenizer.from_pretrained(model_name)
model = EsmModel.from_pretrained(model_name).to(device)
model.eval()
# Tokenize
encoded = tokenizer(sequence, return_tensors="pt", truncation=True, max_length=1024).to(device)
# Get embedding (mean pooling)
with torch.no_grad():
outputs = model(**encoded)
embedding = outputs.last_hidden_state.mean(dim=1).squeeze()
embedding = embedding / embedding.norm() # L2 normalize
return embedding.cpu().numpy()
Structure features (DSSP):
from Bio.PDB import PDBParser
from Bio.PDB.DSSP import DSSP
def extract_structure_features(pdb_file: str):
"""
Extract secondary structure and solvent accessibility from PDB.
Returns:
features: Dictionary with SS, SA, etc.
"""
parser = PDBParser(QUIET=True)
structure = parser.get_structure('protein', pdb_file)
model = structure[0]
dssp = DSSP(model, pdb_file)
features = {
'secondary_structure': [],
'solvent_accessibility': [],
'phi': [],
'psi': []
}
for key in dssp.keys():
    residue = dssp[key]  # (dssp index, aa, ss, relative ASA, phi, psi, ...)
    features['secondary_structure'].append(residue[2])  # 8-state DSSP code (H, G, I, E, B, T, S, -)
    features['solvent_accessibility'].append(residue[3])  # relative solvent accessibility
    features['phi'].append(residue[4])
    features['psi'].append(residue[5])
return features
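The per-residue lists above vary in length from protein to protein; before they can sit next to PLM embeddings (e.g., in the fusion step later), they need to be pooled into a fixed-length vector. A minimal sketch, with feature choices (3-state SS fractions, RSA statistics) that are mine rather than the post's:
import numpy as np

def pool_structure_features(features: dict) -> np.ndarray:
    """Pool per-residue DSSP outputs into a fixed-length protein-level feature vector."""
    ss = features['secondary_structure']
    n = max(len(ss), 1)
    # Collapse the 8-state DSSP alphabet into helix / strand / other fractions
    helix = sum(s in ('H', 'G', 'I') for s in ss) / n
    strand = sum(s in ('E', 'B') for s in ss) / n
    other = 1.0 - helix - strand
    # DSSP reports 'NA' for some residues; keep only numeric RSA values
    rsa = np.array([v for v in features['solvent_accessibility'] if isinstance(v, (int, float))], dtype=float)
    mean_rsa = float(rsa.mean()) if rsa.size else 0.0
    std_rsa = float(rsa.std()) if rsa.size else 0.0
    return np.array([helix, strand, other, mean_rsa, std_rsa])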
One-step label smoothing implementation:
import numpy as np
import scipy.sparse as sp
def label_smoothing_on_ppi(
P0: np.ndarray,
A: sp.csr_matrix,
alpha: float = 0.2
):
"""
Apply one-step label smoothing on PPI network.
Args:
P0: [N, C] initial predictions (probabilities)
A: [N, N] sparse PPI adjacency matrix (row-normalized inside this function)
alpha: Smoothing coefficient (0.2 = 20% from neighbors)
Returns:
P1: [N, C] smoothed predictions
"""
# Normalize adjacency (row-stochastic)
A_normalized = A.copy()
row_sums = np.array(A.sum(axis=1)).flatten()
row_sums[row_sums == 0] = 1.0 # Avoid division by zero
A_normalized = A_normalized.multiply(1.0 / row_sums[:, np.newaxis])
# One-step smoothing: (1-alpha) * P0 + alpha * A @ P0
P_smoothed = A_normalized @ P0
P1 = (1 - alpha) * P0 + alpha * P_smoothed
# Clip to [0, 1]
P1 = np.clip(P1, 0, 1)
return P1
Why it works: interacting proteins tend to share function (guilt by association), so blending a protein's scores with its neighbors' scores recovers signal for sparsely annotated proteins, while a small alpha keeps the sequence-based prediction dominant.
Tuning alpha: stay in the 0.1-0.3 range and use a single smoothing step; larger values oversmooth hub proteins. Pick it with a quick validation sweep, as in the sketch below.
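A minimal validation sweep over alpha; it reuses P0, A, and Y_val from the recipe above, and the selection metric (micro-averaged AUPRC) is my choice rather than the post's, Fmax works just as well:
from sklearn.metrics import average_precision_score

best_alpha, best_score = None, -1.0
for alpha in [0.0, 0.1, 0.2, 0.3, 0.4]:
    P1 = label_smoothing_on_ppi(P0, A, alpha=alpha)
    score = average_precision_score(Y_val, P1, average="micro")
    print(f"alpha={alpha:.1f}  micro-AUPRC={score:.3f}")
    if score > best_score:
        best_alpha, best_score = alpha, score
print(f"Selected alpha = {best_alpha}")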
Ancestor closure (from previous article):
def close_under_ancestors(P: np.ndarray, go_dag):
    """
    Ensure predictions respect the GO hierarchy by propagating scores upward.
    Args:
        P: [N, C] class probabilities
        go_dag: GO DAG object exposing topo_order() and ancestors()
    Returns:
        P_closed: [N, C] ancestor-closed probabilities (each ancestor scores at least
        as high as any of its descendants), ready for per-class thresholding
    """
    P_closed = P.copy()
    # Topological order (parents before children); visit children first so that
    # high scores propagate all the way up the DAG
    order = go_dag.topo_order()
    for term_idx in reversed(order):
        for ancestor_idx in go_dag.ancestors(term_idx):
            # If a descendant scores high, its ancestors must score at least as high
            P_closed[:, ancestor_idx] = np.maximum(P_closed[:, ancestor_idx], P_closed[:, term_idx])
    return P_closed
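close_under_ancestors (and the pipeline below) assumes a go_dag object exposing topo_order() and ancestors(); the post does not define it, so here is a minimal sketch of one way to back it with is_a/part_of parent edges over term column indices (SimpleGoDag and the edge format are my assumptions):
from collections import defaultdict, deque

class SimpleGoDag:
    """Minimal GO DAG over term column indices, built from (child_idx, parent_idx) is_a/part_of edges."""
    def __init__(self, num_terms: int, child_parent_edges: list):
        self.num_terms = num_terms
        self.parents = defaultdict(set)
        for child, parent in child_parent_edges:
            self.parents[child].add(parent)

    def ancestors(self, term_idx: int) -> set:
        """All ancestors of a term (transitive closure over parent edges)."""
        seen, stack = set(), list(self.parents[term_idx])
        while stack:
            parent = stack.pop()
            if parent not in seen:
                seen.add(parent)
                stack.extend(self.parents[parent])
        return seen

    def topo_order(self) -> list:
        """Term indices ordered parents before children (Kahn's algorithm)."""
        children = defaultdict(set)
        indegree = [0] * self.num_terms
        for child, parents in self.parents.items():
            for parent in parents:
                children[parent].add(child)
                indegree[child] += 1
        queue = deque(t for t in range(self.num_terms) if indegree[t] == 0)
        order = []
        while queue:
            term = queue.popleft()
            order.append(term)
            for child in children[term]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    queue.append(child)
        return order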
Per-class threshold optimization:
from sklearn.metrics import f1_score
def tune_per_class_thresholds(
P: np.ndarray,
Y_true: np.ndarray,
metric: str = "f1",
thresholds: np.ndarray = None
):
"""
Find optimal per-class thresholds for Fmax.
Returns:
best_thresholds: [C] array of optimal thresholds per class
best_fmax: Maximum F1 score achieved
"""
if thresholds is None:
thresholds = np.arange(0.05, 0.95, 0.01)
num_classes = Y_true.shape[1]
best_thresholds = np.full(num_classes, 0.5) # default for classes that cannot be tuned (no positives in Y_true)
for class_idx in range(num_classes):
if Y_true[:, class_idx].sum() == 0:
continue # Skip classes with no positives
best_f1 = 0.0
best_th = 0.5
for th in thresholds:
y_pred = (P[:, class_idx] >= th).astype(int)
f1 = f1_score(Y_true[:, class_idx], y_pred, zero_division=0)
if f1 > best_f1:
best_f1 = f1
best_th = th
best_thresholds[class_idx] = best_th
# Compute Fmax with best thresholds
Y_pred = (P >= best_thresholds).astype(int)
fmax = compute_fmax(Y_true, Y_pred)
return best_thresholds, fmax
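compute_fmax is called here and in the GNN training loop below but never defined in this post; a minimal protein-centric (CAFA-style) sketch that sweeps one global threshold and returns the best F1:
import numpy as np

def compute_fmax(Y_true: np.ndarray, P: np.ndarray, thresholds: np.ndarray = None) -> float:
    """
    Protein-centric Fmax: precision is averaged over proteins with >=1 predicted term,
    recall over all proteins, and the best F1 over a global threshold sweep is returned.
    """
    if thresholds is None:
        thresholds = np.arange(0.01, 1.0, 0.01)
    Y_true = Y_true.astype(bool)
    fmax = 0.0
    for t in thresholds:
        Y_pred = P >= t
        tp = (Y_pred & Y_true).sum(axis=1).astype(float)
        n_pred = Y_pred.sum(axis=1)
        n_true = np.maximum(Y_true.sum(axis=1), 1)
        covered = n_pred > 0
        if not covered.any():
            continue
        precision = float((tp[covered] / n_pred[covered]).mean())
        recall = float((tp / n_true).mean())
        if precision + recall > 0:
            fmax = max(fmax, 2 * precision * recall / (precision + recall))
    return fmax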
GCN for GO prediction:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
class GCNForGO(nn.Module):
def __init__(self, input_dim, hidden_dim, num_classes, num_layers=2):
super().__init__()
self.convs = nn.ModuleList()
self.convs.append(GCNConv(input_dim, hidden_dim))
for _ in range(num_layers - 2):
self.convs.append(GCNConv(hidden_dim, hidden_dim))
self.convs.append(GCNConv(hidden_dim, num_classes))
self.dropout = nn.Dropout(0.5)
def forward(self, x, edge_index):
"""
Args:
x: [N, D] node features (PLM embeddings)
edge_index: [2, E] edge indices (PPI network)
"""
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = F.relu(x)
x = self.dropout(x)
x = self.convs[-1](x, edge_index)
return torch.sigmoid(x) # Multi-label classification
Training loop:
def train_gnn(model, train_loader, val_loader, num_epochs=100):
"""
Train GCN for GO prediction.
"""
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCELoss()
best_fmax = 0.0
for epoch in range(num_epochs):
# Training
model.train()
train_loss = 0.0
for batch in train_loader:
optimizer.zero_grad()
out = model(batch.x, batch.edge_index)
loss = criterion(out, batch.y)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Validation
model.eval()
val_predictions = []
val_labels = []
with torch.no_grad():
for batch in val_loader:
out = model(batch.x, batch.edge_index)
val_predictions.append(out.cpu().numpy())
val_labels.append(batch.y.cpu().numpy())
val_predictions = np.vstack(val_predictions)
val_labels = np.vstack(val_labels)
fmax = compute_fmax(val_labels, val_predictions)
if fmax > best_fmax:
best_fmax = fmax
torch.save(model.state_dict(), 'best_model.pth')
print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, Val Fmax = {fmax:.4f}")
Late fusion of multiple features:
def multimodal_fusion(
plm_embeddings: np.ndarray,
structure_features: np.ndarray,
homology_scores: np.ndarray,
ppi_embeddings: np.ndarray
):
"""
Combine multiple feature modalities.
Returns:
fused_features: [N, D] combined feature vector
"""
# Normalize each modality
plm_norm = plm_embeddings / (np.linalg.norm(plm_embeddings, axis=1, keepdims=True) + 1e-10)
struct_norm = structure_features / (np.linalg.norm(structure_features, axis=1, keepdims=True) + 1e-10)
homology_norm = homology_scores / (np.linalg.norm(homology_scores, axis=1, keepdims=True) + 1e-10)
ppi_norm = ppi_embeddings / (np.linalg.norm(ppi_embeddings, axis=1, keepdims=True) + 1e-10)
# Concatenate or weighted average
# Option 1: Concatenate
fused = np.concatenate([plm_norm, struct_norm, homology_norm, ppi_norm], axis=1)
# Option 2: Weighted average (if same dimension)
# weights = [0.5, 0.2, 0.2, 0.1] # PLM gets most weight
# fused = weights[0] * plm_norm + weights[1] * struct_norm + ...
return fused
Early fusion (learned):
class MultimodalFusion(nn.Module):
def __init__(self, plm_dim, struct_dim, homology_dim, ppi_dim, hidden_dim):
super().__init__()
self.plm_proj = nn.Linear(plm_dim, hidden_dim)
self.struct_proj = nn.Linear(struct_dim, hidden_dim)
self.homology_proj = nn.Linear(homology_dim, hidden_dim)
self.ppi_proj = nn.Linear(ppi_dim, hidden_dim)
# Learnable fusion weights
self.fusion_weights = nn.Parameter(torch.ones(4) / 4)
def forward(self, plm, struct, homology, ppi):
# Project each modality to same dimension
plm_proj = F.relu(self.plm_proj(plm))
struct_proj = F.relu(self.struct_proj(struct))
homology_proj = F.relu(self.homology_proj(homology))
ppi_proj = F.relu(self.ppi_proj(ppi))
# Weighted combination
weights = F.softmax(self.fusion_weights, dim=0)
fused = (weights[0] * plm_proj +
weights[1] * struct_proj +
weights[2] * homology_proj +
weights[3] * ppi_proj)
return fused
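The fused vector still needs a classification head to produce GO scores; a minimal sketch of wiring the fusion module into a multi-label classifier (FusionClassifier, num_classes, and the choice of BCE-with-logits are my assumptions, not the post's):
class FusionClassifier(nn.Module):
    def __init__(self, plm_dim, struct_dim, homology_dim, ppi_dim, hidden_dim, num_classes):
        super().__init__()
        self.fusion = MultimodalFusion(plm_dim, struct_dim, homology_dim, ppi_dim, hidden_dim)
        self.head = nn.Linear(hidden_dim, num_classes)

    def forward(self, plm, struct, homology, ppi):
        # Returns raw logits; pair with nn.BCEWithLogitsLoss for stable multi-label training
        return self.head(self.fusion(plm, struct, homology, ppi))

# criterion = nn.BCEWithLogitsLoss()
# loss = criterion(model(plm, struct, homology, ppi), y_multi_hot.float())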
End-to-end pipeline:
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
# 1. Load data
sequences = load_sequences("data/uniprot.fasta")
go_annotations = load_go_annotations("data/goa.tsv")
ppi_network = load_ppi_network("data/string.tsv")
# 2. Generate features
print("Generating PLM embeddings...")
X_plm = np.array([get_esm2_embedding(seq) for seq in sequences])
print("Computing homology features...")
X_homology = np.array([get_blast_homologs(seq) for seq in sequences])
print("Building PPI adjacency...")
A = build_ppi_adjacency(ppi_network, sequences)
# 3. Train baseline
print("Training logistic regression...")
clf = OneVsRestClassifier(
LogisticRegression(max_iter=4000, class_weight="balanced", solver="lbfgs")
)
X_tr, X_val = X_plm[train_idx], X_plm[val_idx]  # assumes a train/validation index split prepared earlier
clf.fit(X_tr, Y_train)
P0 = clf.predict_proba(X_val)
# 4. Label smoothing
print("Applying label smoothing...")
alpha = 0.2
P1 = label_smoothing_on_ppi(P0, A, alpha=alpha)
P1 = np.clip(P1, 0, 1)
# 5. Hierarchy closure
print("Enforcing hierarchy consistency...")
P1 = close_under_ancestors(P1, go_dag)
# 6. Threshold tuning
print("Tuning thresholds...")
thresholds, fmax = tune_per_class_thresholds(P1, Y_val)
Y_pred = (P1 >= thresholds).astype(int)
# 7. Evaluate
print("Evaluating...")
fmax_bp, fmax_mf, fmax_cc = compute_fmax_by_ontology(Y_val, Y_pred, go_dag)
micro_auprc, macro_auprc = compute_auprc(Y_val, P1)
print(f"Fmax (BP): {fmax_bp:.3f}")
print(f"Fmax (MF): {fmax_mf:.3f}")
print(f"Fmax (CC): {fmax_cc:.3f}")
print(f"Micro AUPRC: {micro_auprc:.3f}")
print(f"Macro AUPRC: {macro_auprc:.3f}")
Stick with baseline if: it already meets your Fmax/AUPRC targets, the labeled dataset is small, or PPI coverage for your organisms is sparse and noisy.
Upgrade to GNNs when: the baseline plateaus and you have a reasonably dense, reliable PPI/co-expression network whose structure carries signal beyond one-step smoothing.
GNN architecture selection: GCN is a solid default; GAT adds per-neighbor attention, which helps when edge importance varies (noisy or hub-heavy networks).
Compare different approaches:
results = {}
# Baseline: PLM only
clf_plm = train_logistic(X_plm, Y_train)
P_plm = clf_plm.predict_proba(X_val)
results['PLM only'] = evaluate(Y_val, P_plm)
# + Homology
X_combined = np.concatenate([X_plm, X_homology], axis=1)
clf_hom = train_logistic(X_combined, Y_train)
P_hom = clf_hom.predict_proba(X_val)
results['PLM + Homology'] = evaluate(Y_val, P_hom)
# + PPI smoothing
P_smooth = label_smoothing_on_ppi(P_plm, A, alpha=0.2)
results['PLM + PPI smoothing'] = evaluate(Y_val, P_smooth)
# + Hierarchy closure
P_closed = close_under_ancestors(P_smooth, go_dag)
results['PLM + PPI + Hierarchy'] = evaluate(Y_val, P_closed)
# + Per-class thresholds
thresholds, _ = tune_per_class_thresholds(P_closed, Y_val)
Y_pred = (P_closed >= thresholds).astype(int)
results['Full pipeline'] = evaluate(Y_val, Y_pred)
# Print results table
print("\nAblation Results:")
print("=" * 60)
for method, metrics in results.items():
print(f"{method:30s} Fmax={metrics['fmax']:.3f} AUPRC={metrics['auprc']:.3f}")
Problem: Too many smoothing steps or a high alpha blurs the signal.
Solution: Use a single smoothing step and keep alpha around 0.1-0.3; hub proteins are the first to oversmooth, so check their predictions when raising alpha.
Problem: Predictions violate ancestor-child relationships.
Solution: Apply ancestor closure (close_under_ancestors) after smoothing and before thresholding, so every ancestor scores at least as high as its descendants.
Problem: The test set contains proteins whose annotations were already available at training time (temporal leakage).
Solution: Use a time-based (CAFA-style) split: train on annotations up to a cutoff date and evaluate only on proteins that gained annotations after it, as in the sketch below.
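A minimal sketch of such a split; annotation_dates (protein -> date of first experimental annotation) and the cutoff are assumed inputs:
from datetime import date

def temporal_split(protein_ids: list, annotation_dates: dict, cutoff: date = date(2022, 1, 1)):
    """Train on proteins annotated before the cutoff; evaluate only on those annotated after it."""
    train_ids = [p for p in protein_ids if annotation_dates[p] < cutoff]
    test_ids = [p for p in protein_ids if annotation_dates[p] >= cutoff]
    return train_ids, test_ids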
Bottom line: This pipeline is simple, strong, and extensible; add GNNs (GCN/GAT) or multimodal fusion when the baseline plateaus.