# Ablation study for approaches based on Beam Search with the CayleyPy notebook by Dmytro Fedoriaka and Chervov, A. et al. "A Machine Learning Approach That Beats Large Rubik's Cubes: The CayleyPy Project". arXiv:2502.13266 [cs.LG]. 2025.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import time
import json
from datetime import datetime
import time
import gc
from tqdm import tqdm
from itertools import product
import shap
from sklearn.ensemble import RandomForestRegressor
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
from cayleypy import PermutationGroups, CayleyGraph, Predictor, prepare_graph
import warnings
warnings.filterwarnings('ignore')


def seed_everything(seed: int = 42):
    """Seed all RNGs (torch CPU/CUDA, numpy) for reproducible experiments."""
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    # Trade cuDNN autotuning speed for bit-for-bit determinism.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset locations (Kaggle-style layout).
DATA_ROOT = Path('../data')
assert DATA_ROOT.exists(), f'Dataset directory not found: {DATA_ROOT!s}'
TEST_PATH = DATA_ROOT / 'test.csv'
SUBMISSION_PATH = Path('submission.csv')  # Kaggle expects this in the working directory

test_df = pd.read_csv(TEST_PATH)

# Baseline
def pancake_sort_path(perm: list[int]) -> list[str]:
    """Return a sequence of prefix reversals that sorts `perm` to the identity.

    Moves are encoded as 'Rk' = reverse the first k elements. Classic pancake
    sort: for each suffix position (largest first), bring the desired value to
    the front (if needed) and then flip it into place — at most two flips per
    element, so at most 2n - 3 moves total.
    """
    arr = list(perm)
    n = len(arr)
    moves: list[str] = []
    for target in range(n, 1, -1):
        desired_value = target - 1
        idx = arr.index(desired_value)
        if idx == target - 1:
            continue  # already in place
        if idx != 0:
            # First flip: bring the desired value to the front.
            moves.append(f'R{idx + 1}')
            arr[: idx + 1] = reversed(arr[: idx + 1])
        # Second flip: put it at its final position `target - 1`.
        moves.append(f'R{target}')
        arr[:target] = reversed(arr[:target])
    return moves

# Model
# Model: the original stub (`pass`) could not be constructed with the three
# positional arguments used at call sites — Net(input_size, num_classes, [n_dim])
# would raise TypeError from nn.Module.__init__. Implemented as a small MLP
# that regresses the distance-to-identity (trained with MSELoss in train_one_epoch).
class Net(torch.nn.Module):
    """MLP predictor: state encoding -> scalar distance estimate."""

    def __init__(self, input_size: int, num_classes: int, hidden_dims: list[int]):
        super().__init__()
        layers: list[torch.nn.Module] = []
        prev = input_size
        for h in hidden_dims:
            layers.append(torch.nn.Linear(prev, h))
            layers.append(torch.nn.ReLU())
            prev = h
        # Single regression head; targets are random-walk depths.
        layers.append(torch.nn.Linear(prev, 1))
        self.net = torch.nn.Sequential(*layers)
        self.num_classes = num_classes  # kept for call-site compatibility

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Squeeze the trailing unit dim so predictions match (batch,) targets.
        return self.net(x.float()).squeeze(-1)

# Utils
def train_one_epoch(model, X, y, learning_rate=0.001):
    """Run one optimization epoch over (X, y) with a fresh 90/10 train/val split.

    NOTE: the optimizer is re-created on every call, so Adam moment estimates
    do not carry across epochs (this matches the original behavior; callers
    invoke this once per epoch from train_n).

    Returns:
        (avg_train_loss, avg_val_loss) — per-sample MSE averages; the val loss
        is 0.0 when the dataset is too small to hold out a validation split.
    """
    val_ratio = 0.1
    batch_size = 512  # 1024
    dataset = TensorDataset(X, y.float())
    val_size = int(len(dataset) * val_ratio)
    train_size = len(dataset) - val_size
    train_ds, val_ds = random_split(dataset, [train_size, val_size])
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)
    loss_fn = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    total_train_loss = 0.0
    for xb, yb in train_loader:
        pred = model(xb)
        loss = loss_fn(pred, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Weight each batch loss by its size so the epoch mean is per-sample.
        total_train_loss += loss.item() * xb.size(0)

    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for xb, yb in val_loader:
            pred = model(xb)
            loss = loss_fn(pred, yb)
            total_val_loss += loss.item() * xb.size(0)

    avg_train_loss = total_train_loss / train_size if train_size else 0.0
    # Bug fix: with fewer than 10 samples val_size is 0 and the original
    # divided by zero here.
    avg_val_loss = total_val_loss / val_size if val_size else 0.0
    return avg_train_loss, avg_val_loss
def train_n(n, width=1000, length=20, n_dim=128, epochs=30):
    """Train a distance predictor on random walks of the pancake-n Cayley graph.

    Args:
        n: permutation size (pancake group P_n).
        width: number of parallel random walks.
        length: walk length, capped at 2*n (the pancake diameter is < 2n).
        n_dim: hidden layer width of the MLP.
        epochs: training epochs.

    Returns:
        (model moved to CPU, the CayleyGraph used for training).
    """
    print(f"train for {n} permutations")
    # Bug fix: the original hard-coded device="cuda" and crashed on CPU-only hosts.
    dev = "cuda" if torch.cuda.is_available() else "cpu"
    graph = CayleyGraph(PermutationGroups.pancake(n), device=dev, random_seed=42)
    # BFS-mode walks label each state with its (approximate) distance to identity.
    X, y = graph.random_walks(width=width, length=min(length, 2 * n), mode="bfs")
    input_size = graph.definition.state_size
    num_classes = n
    train_losses, val_losses = [], []
    start_time = time.time()
    model = Net(input_size, num_classes, [n_dim]).to(graph.device)
    for i in range(epochs):
        avg_train_loss, avg_val_loss = train_one_epoch(model, X, y)
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        print(f"Epoch {i} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
    print(time.time() - start_time)

    # Loss curves for a quick training-health check.
    sns.set_theme(style="whitegrid")
    plt.figure(figsize=(10, 5))
    sns.lineplot(x=range(epochs), y=train_losses, label='Train Loss', marker='o')
    sns.lineplot(x=range(epochs), y=val_losses, label='Val Loss', marker='o')
    plt.title(f'Loss for n={n}')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    # Hand back a CPU model and release GPU memory for the next experiment.
    model = model.cpu()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return model, graph


def find_prefix_length(generator_perm: list[int]) -> int:
    """Recover k from a prefix-reversal generator.

    An Rk generator looks like [k-1, k-2, ..., 1, 0, k, k+1, ...]: the reversed
    prefix decreases by exactly 1 at each step, and the first break ends it.
    """
    n = len(generator_perm)
    for i in range(1, n):
        if generator_perm[i] != generator_perm[i - 1] - 1:
            return i
    return n  # the whole permutation is one reversal


def convert_to_rk_format(internal_path: list[int], graph: "CayleyGraph") -> list[str]:
    """Map CayleyPy generator indices along a beam-search path to 'Rk' move names.

    Out-of-range indices are silently skipped (defensive: beam_search paths
    should only contain valid generator indices).
    """
    moves = []
    generators = graph.definition.generators
    for move_index in internal_path:
        if 0 <= move_index < len(generators):
            generator_perm = generators[move_index]
            k = find_prefix_length(generator_perm)
            moves.append(f'R{k}')
    return moves
def solve(permutation: list[int], graph: "CayleyGraph", model: torch.nn.Module, beam_width: int = 10000) -> str:
    """Beam-search a move sequence that sorts `permutation`.

    Returns the '.'-joined Rk move string on success, or 'UNSOLVED' if no
    path is found within 3*n steps.
    """
    start_state = np.array(permutation)
    n = len(permutation)
    result = graph.beam_search(
        start_state=start_state,
        beam_width=beam_width,
        max_steps=3 * n,  # generous cap: heuristic solutions need < 2n moves
        predictor=Predictor(graph, model),
        return_path=True
    )
    if result.path_found:
        return '.'.join(convert_to_rk_format(result.path, graph))
    return 'UNSOLVED'


def measure_success(predictor, beam_width, test_start_states, heurestic_path, graph, max_steps_multiplier=2):
    """Evaluate beam search with `predictor` over a fixed set of test states.

    `heurestic_path` holds the pancake-sort move lists for the same states;
    "optimal" below means matching that heuristic length (an upper bound on
    the true optimum, so optimality_rate is conservative).

    Returns:
        (success_rate, avg_solution_length, avg_diff_from_heuristic,
         optimality_rate) — the last three averaged over solved states only.
    """
    success_count = 0
    sum_length = 0
    diff_length = 0
    optimal_count = 0
    for state, hp in tqdm(zip(test_start_states, heurestic_path),
                          total=len(test_start_states), desc=f"Beam {beam_width}"):
        try:
            with torch.no_grad():
                result = graph.beam_search(
                    start_state=state,
                    beam_width=beam_width,
                    max_steps=len(state) * max_steps_multiplier,
                    predictor=predictor,
                    return_path=True
                )
            if result.path_found:
                success_count += 1
                path_length = len(result.path)
                sum_length += path_length
                diff_length += path_length - len(hp)
                if path_length == len(hp):
                    optimal_count += 1
        except Exception as e:
            # Best-effort evaluation: a failed search counts as unsolved.
            print(f"Error in beam search: {e}")
            continue
    success_rate = success_count / len(test_start_states) if len(test_start_states) else 0
    avg_length = sum_length / success_count if success_count > 0 else 0
    avg_diff = diff_length / success_count if success_count > 0 else 0
    optimality_rate = optimal_count / success_count if success_count > 0 else 0
    return success_rate, avg_length, avg_diff, optimality_rate
def statistical_analysis(df):
    """Correlation heatmap plus SHAP feature-importance for success_rate.

    Expects the flattened results frame (one row per experiment). Saves
    'correlation_matrix.png' and, when SHAP succeeds, 'shap_summary.png'.
    """
    print("\n=== STATISTICAL ANALYSIS ===")
    numeric_cols = ['success_rate', 'optimality_rate', 'avg_solution_length',
                    'width', 'length', 'n_dim', 'epochs', 'beam_width', 'n']
    numeric_df = df[numeric_cols].select_dtypes(include=[np.number])
    if not numeric_df.empty:
        # Bug fix: the original called numeric_df.cr(), which does not exist;
        # the intended method is DataFrame.corr().
        correlation_matrix = numeric_df.corr()
        plt.figure(figsize=(10, 8))
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, fmt='.3f')
        plt.title('Correlation Matrix')
        plt.tight_layout()
        plt.savefig('correlation_matrix.png', dpi=300, bbox_inches='tight')
        plt.show()
    try:
        shap.initjs()
        X = df[['width', 'length', 'n_dim', 'epochs', 'beam_width', 'n']]
        y_success = df['success_rate']
        # Surrogate model: explains hyperparameter influence on success rate.
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y_success)
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X)
        plt.figure(figsize=(10, 8))
        shap.summary_plot(shap_values, X, show=False)
        plt.title('SHAP Summary Plot - Success Rate')
        plt.tight_layout()
        plt.savefig('shap_summary.png', dpi=300, bbox_inches='tight')
        plt.show()
    except Exception as e:
        # SHAP is optional; the analysis continues without it.
        print(f"SHAP analysis failed: {e}")
def save_intermediate_results(results, experiment_id):
    """Flatten experiment records and checkpoint them to a per-id CSV."""
    rows = []
    for record in results:
        # One flat row per experiment: id/n, then hyperparameters, then metrics.
        row = {'experiment_id': record['experiment_id'], 'n': record['n']}
        row.update(record['hyperparameters'])
        row.update(record['metrics'])
        rows.append(row)
    pd.DataFrame(rows).to_csv(f'intermediate_results_{experiment_id}.csv', index=False)
def save_final_results(results):
    """Persist the full ablation results: flat CSV plus raw nested JSON."""
    df = pd.DataFrame([{
        **{'experiment_id': r['experiment_id'], 'n': r['n']},
        **r['hyperparameters'],
        **r['metrics']
    } for r in results])
    df.to_csv('final_ablation_results.csv', index=False)
    # JSON keeps the nested structure (and timestamps) for later re-analysis.
    with open('final_ablation_results.json', 'w') as f:
        json.dump(results, f, indent=2)

# Plotting
def plot_comprehensive_analysis(df):
    """Render a 2x3 dashboard of ablation metrics from the flattened results.

    Expects one row per experiment with columns: n, beam_width, width, n_dim,
    success_rate, optimality_rate, avg_solution_length. Saves the figure to
    'comprehensive_analysis.png' and shows it.
    """
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    # 1. Success rate vs n (one curve per beam width, averaged over other hparams)
    for bw in df['beam_width'].unique():
        subset = df[df['beam_width'] == bw]
        success_by_n = subset.groupby('n')['success_rate'].mean()
        axes[0,0].plot(success_by_n.index, success_by_n.values, 'o-', label=f'beam={bw}')
    axes[0,0].set_xlabel('n')
    axes[0,0].set_ylabel('Success Rate')
    axes[0,0].set_title('Success Rate vs n')
    axes[0,0].legend()
    # 2. Effect of width on success rate (only the first three n values, to keep it readable)
    for n in df['n'].unique()[:3]:
        subset = df[df['n'] == n]
        axes[0,1].scatter(subset['width'], subset['success_rate'], label=f'n={n}', alpha=0.6)
    axes[0,1].set_xlabel('Training Width')
    axes[0,1].set_ylabel('Success Rate')
    axes[0,1].set_title('Effect of Training Width')
    axes[0,1].legend()
    # 3. Effect of n_dim on performance (mean success rate per hidden size)
    n_dim_performance = df.groupby('n_dim')['success_rate'].mean()
    axes[0,2].bar(n_dim_performance.index, n_dim_performance.values)
    axes[0,2].set_xlabel('Hidden Dimension')
    axes[0,2].set_ylabel('Average Success Rate')
    axes[0,2].set_title('Effect of Hidden Dimension')
    # 4. Heatmap: beam_width vs n
    pivot_success = df.pivot_table(values='success_rate', index='n', columns='beam_width', aggfunc='mean')
    sns.heatmap(pivot_success, annot=True, fmt='.2f', ax=axes[1,0])
    axes[1,0].set_title('Success Rate Heatmap')
    # 5. Optimality rate analysis (share of solutions matching the heuristic length)
    optimal_by_n = df.groupby('n')['optimality_rate'].mean()
    axes[1,1].plot(optimal_by_n.index, optimal_by_n.values, 'o-', color='red')
    axes[1,1].set_xlabel('n')
    axes[1,1].set_ylabel('Optimality Rate')
    axes[1,1].set_title('Optimality Rate vs n')
    # 6. Trade-off: success rate vs solution length, points colored by n
    axes[1,2].scatter(df['success_rate'], df['avg_solution_length'], c=df['n'], alpha=0.6)
    axes[1,2].set_xlabel('Success Rate')
    axes[1,2].set_ylabel('Average Solution Length')
    axes[1,2].set_title('Success Rate vs Solution Length')
    plt.colorbar(axes[1,2].collections[0], ax=axes[1,2], label='n')
    plt.tight_layout()
    plt.savefig('comprehensive_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()
def plot_baseline_comparison(baseline_results):
    """Plot Hamming baseline vs trained-NN success/optimality across beam sizes.

    `baseline_results` is the output of compare_with_baselines: a list of dicts
    with keys n, beam_size_range, hamming_success, hamming_optimal, nn_success,
    nn_optimal. Saves 'baseline_comparison.png' and shows it.

    NOTE(review): a 2x2 grid is allocated but only the top row is populated.
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    for i, result in enumerate(baseline_results):
        n = result['n']
        # Success rate comparison (dashed = Hamming baseline, solid = NN)
        axes[0,0].plot(result['beam_size_range'], result['hamming_success'], 'o--', label=f'Hamming n={n}')
        axes[0,0].plot(result['beam_size_range'], result['nn_success'], 'o-', label=f'NN n={n}')
        # Optimality rate comparison
        axes[0,1].plot(result['beam_size_range'], result['hamming_optimal'], 'o--', label=f'Hamming n={n}')
        axes[0,1].plot(result['beam_size_range'], result['nn_optimal'], 'o-', label=f'NN n={n}')
    axes[0,0].set_xlabel('Beam Size')
    axes[0,0].set_ylabel('Success Rate')
    axes[0,0].set_title('Success Rate Comparison')
    axes[0,0].legend()
    axes[0,0].set_xscale('log')  # beam sizes typically span orders of magnitude
    axes[0,1].set_xlabel('Beam Size')
    axes[0,1].set_ylabel('Optimality Rate')
    axes[0,1].set_title('Optimality Rate Comparison')
    axes[0,1].legend()
    axes[0,1].set_xscale('log')
    plt.tight_layout()
    plt.savefig('baseline_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
# Ablation
def ablation_study(n_range: list[int], hyperparameter_space: dict):
    """Grid-search training/search hyperparameters over several pancake sizes.

    For each n in `n_range`: builds 100 random test permutations (re-seeded so
    every configuration is scored on the same states), then for every
    combination in `hyperparameter_space` (keys: width, length, n_dim, epochs,
    beam_width) trains a model via train_n and evaluates it with
    measure_success. Results are checkpointed every 50 experiments, saved at
    the end, and summarized via analyze_results.

    Returns:
        list of result dicts (hyperparameters, metrics, timestamp).
    """
    seed_everything(42)
    results = []
    experiment_id = 1
    total_experiments = len(n_range) * len(list(product(*hyperparameter_space.values())))
    print(f"Total experiments: {total_experiments}")
    for n in n_range:
        print(f"\n=== Experiment for {n} ===")
        # Fixed seed so the test states are identical across all configurations.
        seed_everything(0)
        test_start_states = [np.random.permutation(n) for _ in range(100)]
        # Heuristic pancake-sort lengths serve as the "optimal" reference.
        heurestic_path = [pancake_sort_path(x) for x in test_start_states]
        hyperparam_combinations = list(product(
            hyperparameter_space['width'],
            hyperparameter_space['length'],
            hyperparameter_space['n_dim'],
            hyperparameter_space['epochs'],
            hyperparameter_space['beam_width']
        ))
        for width, length, n_dim, epochs, beam_width in tqdm(hyperparam_combinations, desc=f"n={n}"):
            try:
                model, graph = train_n(n, width, length, n_dim, epochs)
                predictor = Predictor(graph, model)
                success_rate, avg_length, avg_diff, optimality_rate = measure_success(
                    predictor, beam_width, test_start_states, heurestic_path, graph
                )
                result = {
                    'experiment_id': experiment_id,
                    'n': n,
                    'hyperparameters': {
                        'width': width,
                        'length': length,
                        'n_dim': n_dim,
                        'epochs': epochs,
                        'beam_width': beam_width
                    },
                    'metrics': {
                        'success_rate': success_rate,
                        'avg_solution_length': avg_length,
                        'avg_diff_from_optimal': avg_diff,
                        'optimality_rate': optimality_rate
                    },
                    'timestamp': datetime.now().isoformat()
                }
                results.append(result)
                experiment_id += 1
                # Periodic checkpoint so a crash doesn't lose the whole sweep.
                if experiment_id % 50 == 0:
                    save_intermediate_results(results, experiment_id)
                # Free GPU memory before the next configuration.
                del model, predictor
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            except Exception as e:
                # Best-effort sweep: log the failed configuration and move on.
                print(f"Exception while compute {experiment_id}: {e}")
                continue
    save_final_results(results)
    analyze_results(results)
    return results
def compare_with_baselines(n_range, beam_size_range):
    """Compare the trained NN predictor against the Hamming-distance baseline.

    For each n, both predictors are evaluated over `beam_size_range` on the
    same 50 seeded test permutations.

    Improvements over the original:
      * the NN model was retrained inside the beam-size loop even though it
        does not depend on beam size — it is now trained once per n;
      * device falls back to CPU when CUDA is unavailable.

    Returns:
        list of per-n dicts with keys n, beam_size_range, hamming_success,
        hamming_optimal, nn_success, nn_optimal.
    """
    baseline_results = []
    for n in n_range:
        print(f"\n=== Compare with baseline for n={n} ===")
        seed_everything(0)
        test_start_states = [np.random.permutation(n) for _ in range(50)]
        heurestic_path = [pancake_sort_path(x) for x in test_start_states]
        dev = "cuda" if torch.cuda.is_available() else "cpu"
        graph = CayleyGraph(PermutationGroups.pancake(n), device=dev, random_seed=42)
        # Train the NN once per n (loop-invariant w.r.t. beam size).
        best_width, best_length, best_n_dim, best_epochs = 1000, 20, 128, 30
        nn_model, _ = train_n(n, best_width, best_length, best_n_dim, best_epochs)
        nn_predictor = Predictor(graph, nn_model)
        hamming_success_rates = []
        hamming_optimal_rates = []
        best_nn_success_rates = []
        best_nn_optimal_rates = []
        for beam_size in beam_size_range:
            hamming_predictor = Predictor(graph, "hamming")
            hamming_success, _, _, hamming_optimal = measure_success(
                hamming_predictor, beam_size, test_start_states, heurestic_path, graph
            )
            hamming_success_rates.append(hamming_success)
            hamming_optimal_rates.append(hamming_optimal)
            nn_success, _, _, nn_optimal = measure_success(
                nn_predictor, beam_size, test_start_states, heurestic_path, graph
            )
            best_nn_success_rates.append(nn_success)
            best_nn_optimal_rates.append(nn_optimal)
        # Release the model and GPU memory before moving to the next n.
        del nn_model, nn_predictor
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        baseline_results.append({
            'n': n,
            'beam_size_range': beam_size_range,
            'hamming_success': hamming_success_rates,
            'hamming_optimal': hamming_optimal_rates,
            'nn_success': best_nn_success_rates,
            'nn_optimal': best_nn_optimal_rates
        })
    return baseline_results
def analyze_results(results):
    """Summarize the ablation: best configuration per n, then plots and stats."""
    # Flatten nested result records into one row per experiment.
    rows = []
    for r in results:
        row = {'experiment_id': r['experiment_id'], 'n': r['n']}
        row.update(r['hyperparameters'])
        row.update(r['metrics'])
        rows.append(row)
    df = pd.DataFrame(rows)
    # For each n, keep the row with the highest success rate.
    best_idx = df.groupby('n')['success_rate'].idxmax()
    best_by_n = df.loc[best_idx]
    print("\nЛучшие результаты по n:")
    print(best_by_n[['n', 'success_rate', 'optimality_rate', 'width', 'length', 'n_dim', 'epochs', 'beam_width']])
    plot_comprehensive_analysis(df)
    statistical_analysis(df)
# Final
# code