"""Speaker identification with per-speaker GMMs seeded from a UBM.

From v9, which performed best on the jack knife in v11 (70% accuracy).

Now with:
1. Cepstral Mean Normalization (CMN)
2. Universal Background Model (UBM) + MAP Adaptation
   (implemented as EM training of each speaker GMM initialised from the UBM)
3. Feature Augmentation (Pitch Shift, Time Stretch)
4. Delta + Delta-Delta Features
"""
import os

# Set before the numeric libraries are imported so the value is already in
# place when their threading runtimes start up.
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np
from scipy.io import wavfile
from python_speech_features import mfcc, delta
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
import librosa
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Constants
TRAIN_DIR = "./Separate_data/train/sounds"
DEV_DIR = "./Separate_data/dev/sounds"
NUM_CEPS = 20
NFFT = 1024
SILENCE_TOP_DB = 25
UBM_COMPONENTS = 16  # For Universal Background Model
SPEAKER_COMPONENTS = 8  # For speaker-adapted models
# Number of leading samples dropped from every recording.
# NOTE(review): the original comment called this "first 2 seconds", which is
# only true for 10 kHz audio -- confirm against the dataset's sample rate.
SKIP_SAMPLES = 20000


def _list_class_files(src_folder):
    """Return ``[(label, [file paths])]`` for each class sub-directory.

    Labels and file names are visited in sorted order. Entries of
    ``src_folder`` that are not directories are skipped so that a stray file
    does not abort the whole run.
    """
    listing = []
    for label in sorted(os.listdir(src_folder)):
        class_dir = os.path.join(src_folder, label)
        if not os.path.isdir(class_dir):
            continue
        paths = [
            os.path.join(class_dir, file_name)
            for file_name in sorted(os.listdir(class_dir))
        ]
        listing.append((label, paths))
    return listing


def _trim_leading(signal):
    """Drop the first ``SKIP_SAMPLES`` samples when the clip is long enough.

    Clips with ``SKIP_SAMPLES`` samples or fewer are returned unchanged, so
    training and evaluation never end up with an empty signal.
    """
    if len(signal) > SKIP_SAMPLES:
        return signal[SKIP_SAMPLES:]
    return signal


def load_audio_features(src_folder, augment=True):
    """Load and stack the features of every recording under ``src_folder``.

    Args:
        src_folder: Directory containing one sub-directory per class.
        augment: Forwarded to :func:`extract_features`.

    Returns:
        A 2-D array with the frames of all recordings stacked row-wise.

    Raises:
        ValueError: If no features could be collected.
    """
    all_feats = []
    for _, audio_paths in _list_class_files(src_folder):
        for audio_path in audio_paths:
            all_feats.extend(extract_features(audio_path, augment=augment))
    if not all_feats:
        raise ValueError(f"No audio features found under {src_folder!r}")
    return np.vstack(all_feats)


def extract_features(audio_path, augment=True):
    """Extract features for one WAV file, optionally with augmented copies.

    Args:
        audio_path: Path of the WAV file to read.
        augment: When true, also try a time-stretched (rate 0.9) and a
            pitch-shifted (+2 steps) version of the signal.

    Returns:
        A list of feature matrices: the original first, followed by whichever
        augmented versions succeeded. Augmentation is best-effort; a failure
        is printed and the features collected so far are returned.
    """
    fs, signal = wavfile.read(audio_path)
    # Convert to float32 for librosa processing
    signal = _trim_leading(signal).astype(np.float32)

    # Original features
    features = [get_feats(signal, fs)]
    if not augment:
        return features

    try:
        # librosa effects run on a peak-normalised copy; the result is scaled
        # back so all variants share the original amplitude range.
        peak = np.max(np.abs(signal))
        if peak == 0:
            raise ValueError("signal is silent, cannot normalise")
        normalized = signal / peak

        # Time stretch
        stretched = librosa.effects.time_stretch(normalized, rate=0.9)
        features.append(get_feats(stretched * peak, fs))

        # Pitch shift
        pitched = librosa.effects.pitch_shift(normalized, sr=fs, n_steps=2)
        features.append(get_feats(pitched * peak, fs))
    except Exception as e:
        print(f"Augmentation failed for {audio_path}: {str(e)}")
    return features


def get_feats(audio_sig, freq_sampling):
    """Extract and normalize features.

    Silence is removed, MFCCs are computed, their per-coefficient mean is
    subtracted (CMN), and delta and delta-delta features are appended.

    Args:
        audio_sig: 1-D audio signal.
        freq_sampling: Sampling rate of ``audio_sig``.

    Returns:
        Array with ``3 * NUM_CEPS`` columns: MFCC, delta, delta-delta.
    """
    audio_sig = remove_silence(audio_sig)
    mfcc_feats = mfcc(audio_sig, freq_sampling, numcep=NUM_CEPS,
                      appendEnergy=False, nfft=NFFT)
    # Apply CMN
    mfcc_feats -= np.mean(mfcc_feats, axis=0, keepdims=True)
    # Add deltas
    delta_feats = delta(mfcc_feats, 2)
    delta_delta_feats = delta(delta_feats, 2)
    return np.hstack((mfcc_feats, delta_feats, delta_delta_feats))


def remove_silence(audio_sig, top_db=SILENCE_TOP_DB):
    """Remove silent stretches from ``audio_sig``.

    Args:
        audio_sig: 1-D audio signal.
        top_db: Threshold passed to ``librosa.effects.split``.

    Returns:
        The concatenation of all non-silent intervals, or ``audio_sig``
        unchanged when no such interval is reported.
    """
    intervals = librosa.effects.split(audio_sig, top_db=top_db)
    if len(intervals) == 0:
        # np.concatenate rejects an empty list; keep the signal as it is.
        return audio_sig
    return np.concatenate([audio_sig[start:end] for start, end in intervals])


def train_ubm(features):
    """Train the Universal Background Model on ``features``.

    Returns:
        A fitted diagonal-covariance ``GaussianMixture`` with
        ``UBM_COMPONENTS`` components.
    """
    print("Training UBM...")
    ubm = GaussianMixture(n_components=UBM_COMPONENTS, covariance_type='diag',
                          max_iter=200, verbose=1)
    ubm.fit(features)
    return ubm


def adapt_speaker_gmm(ubm, features, n_components=SPEAKER_COMPONENTS):
    """Train a speaker GMM whose EM run starts from the UBM parameters.

    The first ``n_components`` UBM components are handed to scikit-learn via
    ``weights_init`` / ``means_init`` / ``precisions_init``. Assigning
    ``means_`` etc. on an unfitted estimator has no effect, because ``fit``
    re-initialises the parameters unless ``warm_start`` is used.

    Note that this is UBM-seeded EM, not strict MAP adaptation.

    Args:
        ubm: Fitted ``GaussianMixture`` used as background model.
        features: Scaled feature matrix of one speaker.
        n_components: Number of components of the speaker model.

    Returns:
        The fitted speaker ``GaussianMixture``.
    """
    init_kwargs = {}
    can_seed = (
        ubm.covariance_type == 'diag'
        and n_components <= ubm.means_.shape[0]
    )
    if can_seed:
        weights = ubm.weights_[:n_components]
        init_kwargs = {
            # A subset of the UBM weights no longer sums to 1, which
            # scikit-learn requires for weights_init.
            "weights_init": weights / weights.sum(),
            "means_init": ubm.means_[:n_components],
            "precisions_init": ubm.precisions_[:n_components],
        }
    # Otherwise fall back to scikit-learn's default initialisation.

    gmm = GaussianMixture(n_components=n_components, covariance_type='diag',
                          max_iter=100, **init_kwargs)
    gmm.fit(features)
    return gmm


def train_speaker_models(src_folder, ubm, scaler):
    """Train one speaker-adapted model per sub-directory of ``src_folder``.

    Args:
        src_folder: Directory containing one sub-directory per speaker.
        ubm: Fitted background model passed to :func:`adapt_speaker_gmm`.
        scaler: Fitted scaler applied to the speaker features.

    Returns:
        Dict mapping speaker name to its fitted ``GaussianMixture``.
        Speakers without any features are left out.
    """
    speaker_models = {}
    speakers = _list_class_files(src_folder)
    for speaker, audio_paths in tqdm(speakers, desc="Training speakers"):
        feats = []
        for audio_path in audio_paths:
            feats.extend(extract_features(audio_path, augment=True))
        if not feats:
            continue
        # Apply global normalization
        scaled = scaler.transform(np.vstack(feats))
        speaker_models[speaker] = adapt_speaker_gmm(ubm, scaled)
    return speaker_models


def evaluate_models(models, test_dir, scaler):
    """Evaluate trained GMM models on test data.

    Each recording is assigned to the model with the highest ``score``.
    Recordings that fail to process are reported and skipped.

    Args:
        models: Dict mapping class name to a fitted ``GaussianMixture``.
        test_dir: Directory containing one sub-directory per class.
        scaler: Fitted scaler applied to the test features.

    Returns:
        Tuple ``(true_labels, pred_labels)`` of equally long lists.
    """
    true_labels = []
    pred_labels = []
    for person_class, audio_paths in _list_class_files(test_dir):
        for audio_record_pth in audio_paths:
            try:
                freq_sampling, audio_sig = wavfile.read(audio_record_pth)
                audio_sig = _trim_leading(audio_sig)
                test_features = get_feats(audio_sig.astype(np.float32),
                                          freq_sampling)
                # Scale once, then score against all models
                scaled = scaler.transform(test_features)
                scores = {name: gmm.score(scaled)
                          for name, gmm in models.items()}
                predicted_class = max(scores, key=scores.get)
            except Exception as e:
                print(f"Error processing {audio_record_pth}: {str(e)}")
                continue
            true_labels.append(person_class)
            pred_labels.append(predicted_class)
    return true_labels, pred_labels


def plot_confusion_matrix(true_labels, pred_labels, classes):
    """Plot the confusion matrix of the predictions as a heatmap.

    Args:
        true_labels: Ground-truth class names.
        pred_labels: Predicted class names.
        classes: Class names fixing the row/column order.
    """
    cm = confusion_matrix(true_labels, pred_labels, labels=classes)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=classes, yticklabels=classes)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()


def main():
    """Train the UBM and speaker models, then evaluate on the dev set."""
    # 1. Load and normalize all training data
    print("Loading training data...")
    scaler = StandardScaler()
    all_features = load_audio_features(TRAIN_DIR, augment=False)
    scaler.fit(all_features)

    # 2. Train UBM on normalized features
    norm_features = scaler.transform(all_features)
    ubm = train_ubm(norm_features)

    # 3. Train speaker-adapted models
    trained_models = train_speaker_models(TRAIN_DIR, ubm, scaler)
    print(f"Training complete. Models for {len(trained_models)} speakers created.")

    # Evaluate on development set
    print("\nEvaluating on development set...")
    true_labels, pred_labels = evaluate_models(trained_models, DEV_DIR, scaler)

    # Get unique class names
    classes = sorted(trained_models)

    # Print classification report. `labels` is passed explicitly so that
    # `target_names` always lines up, even if some class never occurs in the
    # dev-set labels or predictions.
    print("\nClassification Report:")
    print(classification_report(true_labels, pred_labels, labels=classes,
                                target_names=classes, zero_division=0))

    # Plot confusion matrix
    plot_confusion_matrix(true_labels, pred_labels, classes)


if __name__ == "__main__":
    main()