"""Face classification with a small ResNet-style CNN.

Supports jackknife (leave-one-session-out) cross-validation training
(``--train``) and scoring of an unlabeled evaluation set (``--eval``).
"""

import argparse
import math
import os
import random

import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms


class FaceDataset(Dataset):
    """
    Dataset loading for face images.

    root_dirs can be a string or list of directories (e.g., ['train', 'dev'])
    containing subfolders '1'..'31'.  Each filename looks like
    'f401_01_f12_i0_0.png'; the session is the second underscore-separated
    field ('01').
    """

    def __init__(self, root_dirs, transform=None):
        if isinstance(root_dirs, str):
            root_dirs = [root_dirs]
        self.transform = transform
        self.samples = []  # each entry: (path, label, session)
        for root in root_dirs:
            # Numeric sort so '10' does not precede '2'.
            for label in sorted(os.listdir(root), key=lambda x: int(x)):
                label_dir = os.path.join(root, label)
                if not os.path.isdir(label_dir):
                    continue
                for fname in os.listdir(label_dir):
                    if not fname.lower().endswith(".png"):
                        continue
                    session = fname.split("_")[1]
                    path = os.path.join(label_dir, fname)
                    self.samples.append((path, int(label), session))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, label, _ = self.samples[idx]
        img = Image.open(path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, label


class EvalDataset(Dataset):
    """
    Dataset for unlabeled evaluation images stored in a single folder.
    Files should follow the pattern 'eval_XXXXX.png'.

    Returns: (image_tensor, filename)
    """

    def __init__(self, eval_dir, transform=None):
        self.eval_dir = eval_dir
        self.transform = transform
        # List all files matching eval_*.png, sorted for deterministic order.
        self.files = sorted(
            f for f in os.listdir(eval_dir)
            if f.lower().endswith(".png") and f.startswith("eval_")
        )

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        fname = self.files[idx]
        path = os.path.join(self.eval_dir, fname)
        img = Image.open(path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, fname


class ResBlock(nn.Module):
    """Basic two-conv residual block (ResNet-18 style, no expansion)."""

    def __init__(self, in_ch, out_ch, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_ch)
        # Optional 1x1 projection so the skip path matches shape/stride.
        self.downsample = downsample

    def forward(self, x):
        identity = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        if self.downsample:
            identity = self.downsample(x)
        out += identity
        return self.relu(out)


class CNN(nn.Module):
    """Small ResNet-like classifier: stem + 3 stages of 2 ResBlocks each."""

    def __init__(self, num_classes=31):
        super().__init__()
        self.in_ch = 32
        self.conv = nn.Conv2d(3, 32, 7, 2, 3, bias=False)
        self.bn = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)
        self.pool = nn.MaxPool2d(3, 2, 1)
        self.layer1 = self._make_layer(64, 2, 1)
        self.layer2 = self._make_layer(128, 2, 2)
        self.layer3 = self._make_layer(256, 2, 2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, out_ch, blocks, stride):
        """Build one stage; first block may downsample, the rest keep shape."""
        downsample = None
        if stride != 1 or self.in_ch != out_ch:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_ch, out_ch, 1, stride, bias=False),
                nn.BatchNorm2d(out_ch),
            )
        layers = [ResBlock(self.in_ch, out_ch, stride, downsample)]
        self.in_ch = out_ch
        for _ in range(1, blocks):
            layers.append(ResBlock(out_ch, out_ch))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.pool(self.relu(self.bn(self.conv(x))))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)  # raw logits, shape (N, num_classes)


def train_model(model, device, train_loader, val_loader, epochs, lr, wd=1e-2,
                patience=18, save_path="model.pth"):
    """
    Train `model`, early-stopping on validation accuracy.

    Saves the best-scoring weights to `save_path` and returns
    (best_val_acc, per-epoch train losses, per-epoch val accuracies).
    """
    criterion = nn.CrossEntropyLoss()
    # BUG FIX: the `lr` and `wd` arguments were previously ignored
    # (Adam was constructed with its defaults).
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=wd)
    # BUG FIX: the scheduler's patience must be shorter than the
    # early-stopping patience, otherwise the LR can never decay before
    # training stops and the scheduler is dead code.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="max", factor=0.5, patience=max(1, patience // 2)
    )
    best_acc, no_imp = 0.0, 0
    train_losses = []
    val_accs = []
    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0.0
        for imgs, labels in train_loader:
            # Dataset labels are 1..31; CrossEntropyLoss expects 0..30.
            imgs, labels = imgs.to(device), labels.to(device) - 1
            optimizer.zero_grad()
            preds = model(imgs)
            loss = criterion(preds, labels)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            total_loss += loss.item() * imgs.size(0)
        train_loss = total_loss / len(train_loader.dataset)
        train_losses.append(train_loss)

        model.eval()
        correct = 0
        with torch.no_grad():
            for imgs, labels in val_loader:
                imgs = imgs.to(device)
                # Shift argmax back to the 1..31 label space.
                preds = torch.argmax(model(imgs), 1) + 1
                correct += (preds.cpu() == labels).sum().item()
        val_acc = correct / len(val_loader.dataset)
        val_accs.append(val_acc)
        print(f"Epoch {epoch}: train_loss={train_loss:.4f}, val_acc={val_acc:.4f}")

        scheduler.step(val_acc)
        if val_acc > best_acc:
            best_acc, no_imp = val_acc, 0
            torch.save(model.state_dict(), save_path)
        else:
            no_imp += 1
            if no_imp >= patience:
                print(f"Early stopping: no improvement for {no_imp} epochs")
                print(f"Stopping at epoch {epoch}")
                break
    return best_acc, train_losses, val_accs


def jackknife_cross_validation(train_root, dev_root, transform_train,
                               transform_val, batch_size, epochs, lr, device):
    """
    Leave-one-session-out cross-validation over train+dev.

    For each recording session, trains a fresh model on all other sessions
    and validates on the held-out one; saves per-fold weights and a
    loss/accuracy plot.
    """
    # Probe pass (no transforms) just to enumerate the session IDs.
    ds_all = FaceDataset([train_root, dev_root], transform=None)
    sessions = sorted({sess for _, _, sess in ds_all.samples})
    results = []
    fold_losses = []
    fold_accs = []
    for sess in sessions:
        # Two dataset views over the same files, differing only in transform.
        ds_train = FaceDataset([train_root, dev_root], transform=transform_train)
        ds_val = FaceDataset([train_root, dev_root], transform=transform_val)
        idx_tr = [i for i, (_, _, s) in enumerate(ds_train.samples) if s != sess]
        idx_va = [i for i, (_, _, s) in enumerate(ds_val.samples) if s == sess]
        loader_tr = DataLoader(Subset(ds_train, idx_tr), batch_size=batch_size, shuffle=True)
        loader_va = DataLoader(Subset(ds_val, idx_va), batch_size=batch_size, shuffle=False)
        print(f"Fold session {sess}: train={len(idx_tr)}, val={len(idx_va)} samples")
        model = CNN(num_classes=31).to(device)
        model_save_path = f"fold_{sess}.pth"
        acc, train_losses, val_accs = train_model(
            model, device, loader_tr, loader_va, epochs, lr, save_path=model_save_path
        )
        print(f"Session {sess} best accuracy={acc:.4f}\n")
        results.append(acc)
        fold_losses.append(train_losses)
        fold_accs.append(val_accs)
    mean_acc = sum(results) / len(results)
    print(f"Jackknife per-session accuracy: {results}")
    print(f"Jackknife mean accuracy: {mean_acc:.4f}")
    plot_all_folds(fold_losses, fold_accs)


def plot_all_folds(fold_losses, fold_accs):
    """Save a per-fold grid of training-loss / validation-accuracy curves."""
    num_folds = len(fold_losses)
    cols = 2
    rows = (num_folds + 1) // cols
    plt.rc("axes", labelsize=13)
    plt.rc("xtick", labelsize=11)
    plt.rc("ytick", labelsize=11)
    plt.rc("legend", fontsize=11)
    fig = plt.figure(figsize=(12, 8))
    # Spacer columns (width 0.1 / 0) keep the twin-axis panels apart.
    gs = gridspec.GridSpec(rows, cols * 2, width_ratios=[1, 0.1, 1, 0])
    axes = []
    for r in range(rows):
        for c in [0, 2]:
            idx = r * cols + c // 2
            if idx < num_folds:
                ax = fig.add_subplot(gs[r, c])
                axes.append(ax)
    for i, ax1 in enumerate(axes):
        ax2 = ax1.twinx()
        epochs = np.arange(1, len(fold_losses[i]) + 1)
        (line1,) = ax1.plot(epochs, fold_losses[i], color="blue",
                            label="Training Loss", linestyle="--")
        (line2,) = ax2.plot(epochs, fold_accs[i], color="red",
                            label="Validation Accuracy")
        # find and highlight best epoch
        best_epoch = int(np.argmax(fold_accs[i])) + 1
        best_acc = fold_accs[i][best_epoch - 1]
        ax2.scatter(best_epoch, best_acc, color="green", s=40, zorder=5,
                    label="Early Stopping Point")
        # BUG FIX: the label was placed at y = best_acc + 5, far outside the
        # 0..1 accuracy axis; anchor at the point with a small text offset.
        ax2.annotate(f"{best_acc:.2f}", xy=(best_epoch, best_acc),
                     xytext=(best_epoch + 5, best_acc + 0.05), color="green")
        ax1.set_xlabel("Epoch")
        ax1.set_ylabel("Loss")
        ax2.set_ylabel("Accuracy")
        ax1.grid(True)
        ax1.set_title(f"Session Fold {i + 1}", fontsize=14)
        ax2.set_yticks(np.arange(0, 1.1, 0.1))
        # legends
        ax1.legend(loc="upper left")
        ax2.legend(loc="lower left")
    plt.suptitle("Training Loss & Validation Accuracy per Session Fold", fontsize=18)
    plt.tight_layout(rect=[0, 0, 1, 0.97])
    plt.savefig("loss_accuracy_per_fold.pdf")
    plt.close()


def plot_sample_predictions(model, dataset, device, num_images=15):
    """Save a grid of random validation images with true vs. predicted labels."""
    indices = random.sample(range(len(dataset)), num_images)
    images, labels = zip(*(dataset[i] for i in indices))
    inputs = torch.stack(images).to(device)
    model.eval()
    with torch.no_grad():
        outputs = model(inputs)
    # +1 shifts the 0-based argmax back to the 1..31 label space.
    preds = (torch.argmax(outputs, dim=1) + 1).cpu().numpy()
    # Undo the ImageNet normalization used by the transforms for display.
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    cols = 5
    rows = math.ceil(num_images / cols)
    fig, axes = plt.subplots(rows, cols, figsize=(3 * cols, 3 * rows))
    axes = axes.flatten()
    for i in range(num_images):
        ax = axes[i]
        img = inputs[i].cpu() * std + mean
        img = img.permute(1, 2, 0).numpy()
        ax.imshow(img)
        ax.axis("off")
        ax.set_title(f"Ground Truth: {labels[i]}\nPredicted: {preds[i]}", fontsize=8)
    plt.tight_layout()
    plt.savefig("sample_predictions.pdf")
    plt.close()


def save_results(predictions, logit_scores, output_path="image_CNN.txt"):
    """
    Writes a text file with one line per segment:
        <segment> <hard_label> <score_1> ... <score_31>
    - predictions: hard label (1-31)
    - logit_scores: list of 31 floats
    """
    with open(output_path, "w") as f:
        for fname in sorted(predictions):
            segment = os.path.splitext(fname)[0]
            hard = predictions[fname]
            # Missing scores are padded with NaN so every line has 31 values.
            scores = logit_scores.get(fname, [float("nan")] * 31)
            # format
            fields = [segment, str(hard)] + [
                f"{s:.6f}" if not math.isnan(s) else "NaN" for s in scores
            ]
            f.write(" ".join(fields) + "\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Face classifier: --train for training, --eval for evaluation"
    )
    parser.add_argument("--train", action="store_true",
                        help="Run training using jackknife cross-validation")
    parser.add_argument("--eval", action="store_true",
                        help="Load saved model and evaluate on a given dataset")
    parser.add_argument("--model_path", type=str, default="fold_03.pth",
                        help="Path to .pth file for evaluation")
    args = parser.parse_args()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_root, dev_root = "train", "dev"
    batch_size, epochs, lr = 16, 200, 0.001

    transform_train = transforms.Compose(
        [
            transforms.Resize((80, 80)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(7),
            transforms.GaussianBlur(3, (0.01, 1)),
            transforms.RandomGrayscale(),
            transforms.ColorJitter(0.2, 0.2, 0.2, 0.1),
            transforms.RandomAdjustSharpness(2),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    )
    transform_val = transforms.Compose(
        [
            transforms.Resize((80, 80)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    )

    if args.train:
        print("Starting training")
        jackknife_cross_validation(train_root, dev_root, transform_train,
                                   transform_val, batch_size, epochs, lr, device)
        val_dataset = FaceDataset("dev", transform_val)
        model = CNN(num_classes=31).to(device)
        # map_location lets CUDA-saved checkpoints load on CPU-only hosts.
        model.load_state_dict(torch.load("fold_03.pth", map_location=device))
        plot_sample_predictions(model, val_dataset, device, num_images=15)
    elif args.eval:
        print(f"Evaluating with model {args.model_path}")
        eval_dataset = EvalDataset("eval", transform_val)
        eval_loader = DataLoader(eval_dataset, batch_size=batch_size,
                                 shuffle=False, pin_memory=True)
        model = CNN(num_classes=31).to(device)
        model.load_state_dict(torch.load(args.model_path, map_location=device))
        predictions = {}  # hard label (int 1-31)
        logit_scores = {}  # list of 31 floats (softmax probabilities)
        model.eval()
        with torch.no_grad():
            for imgs, fnames in eval_loader:
                imgs = imgs.to(device)
                outputs = model(imgs)
                # NOTE: these are softmax probabilities, not log-probs/logits.
                probs = torch.softmax(outputs, dim=1).cpu().tolist()
                hard_pred = (torch.argmax(outputs, dim=1) + 1).cpu().tolist()
                for fn, h, p in zip(fnames, hard_pred, probs):
                    predictions[fn] = h
                    logit_scores[fn] = p
        save_results(predictions, logit_scores, output_path="image_CNN.txt")