# audio_classifier_cli_refactored.py ## # @file audio_classifier_cli_refactored.py # @brief Command-line tool for audio-based person identification using MFCC features and GMMs. # # This script provides modes to: # 1. Optimize MFCC parameters (deltas, cmvn, nceps, nbanks, nfft) and GMM components # by evaluating performance on a development set. # 2. Train a set of GMMs (one per speaker) with specified MFCC & GMM parameters # on a training set, save the models, and evaluate on train/dev sets. # 3. Evaluate a pre-trained set of GMM models on the development set. # 4. Predict identities for audio files in an input directory using pre-trained GMMs # and output results in the specified project format. # # It uses a custom library (expected to be 'my_audio_lib.py' or aliased as 'ikrlib') # containing Python 3 compatible MFCC extraction and GMM training steps based on the # original ikrlib provided for the course. Librosa is used optionally for delta features. # import os import glob import numpy as np import time import pickle import argparse import re import itertools import logging from typing import List, Tuple, Optional, Dict, Any # --- Logging Setup --- logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') # --- Library Imports --- # Import custom audio library (expected to contain MFCC/GMM logic) try: import ikrlib as ikrlib # Use the Python 3 compatible version logging.info("Successfully imported ikrlib.") except ImportError: logging.error("Could not import my_audio_lib.py. Make sure it exists and is in the Python path.") exit(1) except Exception as e: logging.error(f"Error importing my_audio_lib: {e}") exit(1) # Required third-party libraries from scipy.io import wavfile try: import librosa # Optional, for delta features HAS_LIBROSA = True except ImportError: logging.warning("librosa not found. Delta features (--add-deltas) will not be available.") HAS_LIBROSA = False from sklearn.metrics import accuracy_score, classification_report try: from tabulate import tabulate # Optional, for formatted tables HAS_TABULATE = True except ImportError: HAS_TABULATE = False logging.info("Optional dependency 'tabulate' not found. Using basic table format.") # --- Constants and Default Configuration --- ## @brief Default path to the training data directory. DEFAULT_TRAIN_DIR: str = '../train' ## @brief Default path to the development (validation) data directory. DEFAULT_DEV_DIR: str = '../dev' ## @brief Expected audio sampling frequency in Hz. DEFAULT_FS: int = 16000 ## @brief Default filename for saving/loading trained GMM models. DEFAULT_MODEL_FILENAME: str = 'trained_model_audio.pkl' ## @brief Default filename for saving prediction results. DEFAULT_PREDICTIONS_FILENAME: str = 'audio_predictions.txt' # Default MFCC Parameters ## @brief Default MFCC window length in samples (e.g., 25ms at 16kHz). DEFAULT_WINDOW: int = 400 ## @brief Default MFCC window overlap in samples (e.g., 10ms shift at 16kHz). DEFAULT_NOVERLAP: int = 240 ## @brief Default FFT length for MFCC calculation. DEFAULT_NFFT: int = 512 ## @brief Default number of Mel filter banks. DEFAULT_NBANKS: int = 23 ## @brief Default number of base Mel-frequency cepstral coefficients. DEFAULT_NCEPS: int = 16 ## @brief Default flag for adding delta and delta-delta features. DEFAULT_ADD_DELTAS: bool = True ## @brief Default flag for applying Cepstral Mean and Variance Normalization. DEFAULT_USE_CMVN: bool = True # Default GMM Parameters ## @brief Default number of Gaussian components per speaker GMM. DEFAULT_GMM_COMPONENTS: int = 32 ## @brief Default maximum number of EM iterations for GMM training. DEFAULT_MAX_ITER: int = 20 ## @brief Default convergence tolerance for GMM EM algorithm. DEFAULT_TOL: float = 1e-4 ## @brief Default covariance type for GMMs ('diag' recommended for speech). DEFAULT_COV_TYPE: str = 'diag' # --- Parameter Lists for Optimization Mode --- # WARNING: Modifying these drastically impacts runtime! ## @brief List of GMM component counts to test during optimization. GMM_COMPONENTS_TO_TEST: List[int] = [16, 32, 64] ## @brief List of boolean flags for adding delta features to test during optimization. ADD_DELTAS_TO_TEST: List[bool] = [True, False] ## @brief List of boolean flags for using CMVN to test during optimization. USE_CMVN_TO_TEST: List[bool] = [True, False] ## @brief List of base MFCC coefficient counts (NCEPS) to test during optimization. NCEPS_TO_TEST: List[int] = [13, 16] ## @brief List of Mel filter bank counts (NBANKS) to test during optimization. NBANKS_TO_TEST: List[int] = [23, 30] ## @brief List of FFT lengths (NFFT) to test during optimization. NFFT_TO_TEST: List[int] = [512, 1024] # Note: Other parameters like window, overlap, max_iter, tol, cov_type are kept fixed during optimization for simplicity. ## @brief Total number of classes (persons) expected in the dataset. NUM_CLASSES: int = 31 # --- Helper Functions --- def load_audio_data(data_dir: str, expect_labels: bool = True) -> Tuple[Optional[List[str]], Optional[np.ndarray], Optional[List[str]]]: """! @brief Loads audio file paths, segment names, and optionally labels from structured directories. Expects subdirectories named with integer class labels (e.g., '1', '2', ...) if labels=True. Falls back to loading WAV files directly from data_dir if no class subdirectories are found. @param data_dir Path to the root data directory. @param expect_labels If True, attempts to parse labels from subdirectory names. @return Tuple of (audio_paths, labels_array, segment_names). Labels array is None if expect_labels is False or labels cannot be determined. Returns (None, None, None) if data loading fails. """ audio_paths: List[str] = [] labels: List[Optional[int]] = [] segment_names: List[str] = [] logging.info(f"Loading audio data from: {data_dir} {'(expecting labels)' if expect_labels else '(paths only)'}") if not os.path.isdir(data_dir): logging.error(f"Data directory not found: {data_dir}") return None, None, None # Prefer class subdirectories if they exist and are digits class_dirs = sorted([d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d)) and d.isdigit()]) if class_dirs: logging.debug(f"Found class subdirectories: {len(class_dirs)}") for class_label_str in class_dirs: try: class_label = int(class_label_str) if expect_labels else None current_dir = os.path.join(data_dir, class_label_str) for wav_path in glob.glob(os.path.join(current_dir, '*.[wW][aA][vV]')): # Case-insensitive WAV if os.path.isfile(wav_path): audio_paths.append(wav_path) segment_names.append(os.path.splitext(os.path.basename(wav_path))[0]) if expect_labels: labels.append(class_label) except ValueError: logging.warning(f"Skipping non-integer subdir '{class_label_str}'") except Exception as e: logging.error(f"Error loading {current_dir}: {e}") else: # Fallback to loading files directly logging.info("No class subdirectories found, loading WAV files directly.") found_files = glob.glob(os.path.join(data_dir, '*.[wW][aA][vV]')) if not found_files: logging.warning(f"No WAV files found directly in {data_dir}") if expect_labels and found_files: logging.warning("Cannot extract labels when loading directly.") for wav_path in found_files: if os.path.isfile(wav_path): audio_paths.append(wav_path) segment_names.append(os.path.splitext(os.path.basename(wav_path))[0]) if expect_labels: labels.append(None) if not audio_paths: logging.error(f"No WAV files found in {data_dir}") return None, None, None logging.info(f"Found {len(audio_paths)} audio paths.") # Prepare final labels array, filtering out None values if necessary final_labels_array = None if expect_labels: if any(l is not None for l in labels): valid_mask = [(l is not None) for l in labels] if not all(valid_mask): logging.warning("Filtering out audio files loaded without labels.") original_count = len(audio_paths) audio_paths = [p for i, p in enumerate(audio_paths) if valid_mask[i]] segment_names = [s for i, s in enumerate(segment_names) if valid_mask[i]] logging.info(f" Retained {len(audio_paths)}/{original_count} files with labels.") # Create array only from non-None labels final_labels_array = np.array([l for l in labels if l is not None], dtype=int) return audio_paths, final_labels_array, segment_names def extract_mfcc(audio_path: str, fs: int = DEFAULT_FS, window: int = DEFAULT_WINDOW, noverlap: int = DEFAULT_NOVERLAP, nfft: int = DEFAULT_NFFT, nbanks: int = DEFAULT_NBANKS, nceps: int = DEFAULT_NCEPS, add_deltas: bool = DEFAULT_ADD_DELTAS, use_cmvn: bool = DEFAULT_USE_CMVN) -> Optional[np.ndarray]: """! @brief Extracts MFCC features for a single audio file. Optionally adds delta features (if librosa is available) and applies CMVN. @param audio_path Path to the input WAV file. @param fs Expected sampling frequency. @param window Window length in samples. @param noverlap Overlap length in samples. @param nfft FFT length. @param nbanks Number of Mel filter banks. @param nceps Number of base cepstral coefficients. @param add_deltas If True, attempts to add delta and delta-delta features. @param use_cmvn If True, applies Cepstral Mean and Variance Normalization. @return Numpy array of features (num_frames x feature_dim), or None if an error occurs. """ try: rate, sig = wavfile.read(audio_path) if rate != fs: logging.debug(f"Sample rate mismatch for {os.path.basename(audio_path)}. Expected {fs}, got {rate}. Skipping.") return None # Ensure signal is float for calculations sig = sig.astype(np.float64) # Use ikrlib's mfcc function (assumes it's Python 3 compatible) mfccs = ikrlib.mfcc(sig, window, noverlap, nfft, fs, nbanks, nceps) if mfccs is None or mfccs.shape[0] == 0: return None # Handle empty output from mfcc features = mfccs # Add Deltas if add_deltas: if HAS_LIBROSA: try: delta1 = librosa.feature.delta(mfccs, axis=0, width=9) delta2 = librosa.feature.delta(mfccs, axis=0, order=2, width=9) features = np.hstack((mfccs, delta1, delta2)) except Exception as e: logging.warning(f"Error calculating deltas for {os.path.basename(audio_path)}: {e}. Using base MFCCs.") features = mfccs # Fallback else: # Warning about missing librosa already given at import pass # Cannot add deltas # Apply CMVN if use_cmvn and features.shape[0] > 1: # Need >1 frame mean = np.mean(features, axis=0) std_dev = np.std(features, axis=0) std_dev[std_dev < 1e-6] = 1e-6 # Avoid division by zero features = (features - mean) / std_dev # Final check for empty features after processing if features.shape[0] == 0: logging.debug(f"No features remaining after processing for {os.path.basename(audio_path)}.") return None return features except FileNotFoundError: logging.error(f"Audio file not found: {audio_path}") return None except Exception as e: logging.warning(f"Error processing audio {os.path.basename(audio_path)}: {e}") return None def preprocess_data_mfcc(audio_paths: List[str], config_name: str, fs: int, window: int, noverlap: int, nfft: int, nbanks: int, nceps: int, add_deltas: bool, use_cmvn: bool) -> Tuple[Optional[List[np.ndarray]], Optional[List[int]]]: """! @brief Extracts MFCC features for a list of audio files using specified parameters. @param audio_paths List of paths to audio files. @param config_name String identifier for logging (e.g., "Train", "Dev"). @param fs Expected sampling frequency. @param window MFCC window length in samples. @param noverlap MFCC overlap length in samples. @param nfft MFCC FFT length. @param nbanks Number of Mel filter banks. @param nceps Number of base cepstral coefficients. @param add_deltas Boolean flag to add delta features. @param use_cmvn Boolean flag to apply CMVN. @return Tuple containing (List of feature arrays [one per successfully processed file], List of original indices corresponding to the successful extractions). Returns (None, None) if extraction fails completely. """ features_list: List[np.ndarray] = [] processed_indices: List[int] = [] # Original indices from audio_paths error_count = 0 logging.info(f"Extracting MFCC for {len(audio_paths)} files ({config_name})...") start_time = time.time() for i, path in enumerate(audio_paths): features = extract_mfcc(path, fs, window, noverlap, nfft, nbanks, nceps, add_deltas, use_cmvn) if features is not None and features.shape[0] > 0: features_list.append(features) processed_indices.append(i) else: error_count += 1 # Reduce progress printing frequency if (i + 1) % 20 == 0 or i == len(audio_paths) - 1: logging.debug(f" Processed {i+1}/{len(audio_paths)} audio files...") elapsed_time = time.time() - start_time logging.info(f"MFCC extraction done in {elapsed_time:.2f}s.") if error_count > 0: logging.warning(f"Failed to extract features for {error_count} files.") if not features_list: logging.error(f"No features extracted for config '{config_name}'.") return None, None logging.info(f"Successfully extracted features for {len(processed_indices)} files.") return features_list, processed_indices def train_single_gmm(X_class_features: np.ndarray, n_components: int, max_iter: int, tol: float, cov_type: str = 'diag') -> Optional[Tuple[np.ndarray, np.ndarray, np.ndarray]]: """! @brief Trains a single Gaussian Mixture Model using iterative EM steps from the ikrlib wrapper. Assumes diagonal covariances are used primarily. @param X_class_features Numpy array of all feature frames for one class (num_frames x num_features). @param n_components The number of Gaussian components in the mixture. @param max_iter Maximum number of EM iterations. @param tol Convergence tolerance based on change in total log likelihood. @param cov_type Covariance type ('diag' strongly recommended and default). @return Tuple of (weights, means, covs) if successful, otherwise None. covs will be a diagonal covariance matrix (n_components x n_features). """ if X_class_features is None or X_class_features.shape[0] < n_components: logging.warning(f"Insufficient data ({0 if X_class_features is None else X_class_features.shape[0]} frames) for {n_components} components. Skipping GMM.") return None n_samples, n_features = X_class_features.shape logging.debug(f" Training GMM with {n_components} components on {n_samples} frames...") # --- Initialization --- ws_init = np.ones(n_components) / n_components try: # Select distinct initial means mus_init_indices = np.random.choice(n_samples, n_components, replace=False) mus_init = X_class_features[mus_init_indices, :] except ValueError as e: logging.error(f" Error selecting initial means (need n_samples >= n_components): {e}. Skipping.") return None if cov_type != 'diag': logging.warning("Only 'diag' covariance is fully supported by this wrapper. Forcing 'diag'.") cov_type = 'diag' # Initialize diagonal covariances based on overall variance overall_var = np.var(X_class_features, axis=0) + 1e-6 # Epsilon for stability if np.all(overall_var < 1e-5): # Handle cases with zero variance features logging.warning(" Feature variance very low. Using small constant diagonal covs.") overall_var = np.full(n_features, 1e-4) covs_init = np.tile(overall_var, (n_components, 1)) covs_init[covs_init <= 1e-8] = 1e-8 # Floor initial variances # --- EM Iteration Loop --- ws, mus, covs = ws_init, mus_init, covs_init prev_tll = -np.inf converged = False logging.debug(f" Iterating (max={max_iter}, tol={tol}):") for i in range(max_iter): try: # Apply variance floor before each step if cov_type == 'diag': covs[covs <= 1e-8] = 1e-8 # Call the single EM step function from the library ws_new, mus_new, covs_new, tll = ikrlib.train_gmm_diag_step(X_class_features, ws, mus, covs) # Check for numerical instability if not np.isfinite(tll) or np.any(~np.isfinite(ws_new)) or np.any(~np.isfinite(mus_new)) or np.any(~np.isfinite(covs_new)): logging.warning(f"\n NaN/Inf detected during EM iteration {i+1}. Stopping training for this GMM.") return None # Indicate failure # Apply variance floor after update if cov_type == 'diag': covs_new[covs_new <= 1e-8] = 1e-8 # Update parameters ws, mus, covs = ws_new, mus_new, covs_new # Check for convergence delta_tll = tll - prev_tll logging.debug(f" Iter {i+1}: TLL={tll:.4f}, Delta={delta_tll:.4f}") if abs(delta_tll) < tol: logging.debug(" Converged.") converged = True break prev_tll = tll except np.linalg.LinAlgError as e: logging.warning(f"\n Linear algebra error during EM (iter {i+1}): {e}. Stopping training.") return None except Exception as e: logging.error(f"\n Unexpected error during EM (iter {i+1}): {e}", exc_info=True) # Log traceback return None if not converged: logging.debug(" Max iterations reached.") # Final check for NaNs just in case if np.any(~np.isfinite(ws)) or np.any(~np.isfinite(mus)) or np.any(~np.isfinite(covs)): logging.error(" Error: Final GMM parameters contain NaN/Inf.") return None return ws, mus, covs # Return the trained model parameters tuple def score_gmm_utterance(gmm_params: Tuple[np.ndarray, np.ndarray, np.ndarray], X_utterance: np.ndarray) -> float: """! @brief Calculates the average log-likelihood for an utterance given a single GMM's parameters. Assumes diagonal covariance GMM. @param gmm_params Tuple containing (weights, means, covs_diag). @param X_utterance Numpy array of feature frames for the utterance (num_frames x num_features). @return Average log-likelihood score, or -infinity on error. """ ws, mus, covs_diag = gmm_params if X_utterance is None or X_utterance.shape[0] == 0: logging.debug("Cannot score empty utterance.") return -np.inf try: # Call the logpdf function from the library frame_logliks = ikrlib.logpdf_gmm_diag(X_utterance, ws, mus, covs_diag) # Handle potential non-finite values robustly finite_mask = np.isfinite(frame_logliks) if not np.all(finite_mask): num_nonfinite = len(frame_logliks) - np.sum(finite_mask) logging.warning(f"Non-finite frame log-likelihoods encountered ({num_nonfinite}). Using only finite values.") finite_logliks = frame_logliks[finite_mask] if len(finite_logliks) == 0: return -np.inf # All frames were non-finite return np.mean(finite_logliks) else: return np.mean(frame_logliks) # Common case except Exception as e: logging.warning(f"Error during logpdf_gmm scoring: {e}") return -np.inf def save_gmm_models(gmm_models: Dict[int, tuple], filename: str): """! @brief Saves the dictionary of trained GMM models using pickle. """ logging.info(f"Saving {len(gmm_models)} GMM models to {filename}") try: with open(filename, 'wb') as f: pickle.dump(gmm_models, f) except Exception as e: logging.error(f"Error saving GMM models to {filename}: {e}") def load_gmm_models(filename: str) -> Optional[Dict[int, tuple]]: """! @brief Loads a dictionary of trained GMM models from a pickle file. @param filename Path to the pickle file. @return Dictionary {class_label: (ws, mus, covs)}, or None if loading fails. """ logging.info(f"Loading GMM models from {filename}") if not os.path.exists(filename): logging.error(f"GMM models file not found: {filename}") return None try: with open(filename, 'rb') as f: gmm_models = pickle.load(f) # Basic validation if not isinstance(gmm_models, dict): raise TypeError("Loaded object is not a dictionary") if not gmm_models: logging.warning("Loaded GMM dictionary is empty.") # Allow numpy integers as keys too if not all(isinstance(k, (int, np.integer)) for k in gmm_models.keys()): raise TypeError("Keys are not integers") # Optional: Add more checks on the structure of the tuples if needed logging.info(f"Successfully loaded {len(gmm_models)} GMM models.") return gmm_models except Exception as e: logging.error(f"Error loading GMM models from {filename}: {e}") return None # --- Mode Functions --- def run_optimize(args: argparse.Namespace): """! @brief Performs the GMM parameter optimization loop. Tests combinations of GMM components and MFCC parameters (deltas, cmvn, nceps, nbanks, nfft). Evaluates each combination on the dev set and reports the best performer. @param args Parsed command-line arguments. """ try: train_paths, y_train_all, _ = load_audio_data(args.train_dir, expect_labels=True) dev_paths, y_dev_all, _ = load_audio_data(args.dev_dir, expect_labels=True) if train_paths is None or dev_paths is None or y_train_all is None or y_dev_all is None: logging.error("Cannot run optimization: data loading failed.") return except FileNotFoundError as e: logging.error(e) return # Use fixed MFCC settings from global defaults for window/overlap window = DEFAULT_WINDOW noverlap = DEFAULT_NOVERLAP # Use fixed GMM settings from global defaults for iter/tol/cov max_iter = DEFAULT_MAX_ITER tol = DEFAULT_TOL cov_type = DEFAULT_COV_TYPE # Create parameter combinations to test param_combinations = list(itertools.product( GMM_COMPONENTS_TO_TEST, ADD_DELTAS_TO_TEST, USE_CMVN_TO_TEST, NCEPS_TO_TEST, NBANKS_TO_TEST, NFFT_TO_TEST )) results: List[Dict[str, Any]] = [] total_start_time = time.time() logging.info(f"Starting GMM parameter optimization ({len(param_combinations)} combinations)...") logging.info(f" Fixed GMM: Iter={max_iter}, Tol={tol}, Cov={cov_type}. Components varied.") logging.info(f" Fixed MFCC: Win={window}, NOverlap={noverlap}. NCEPS/NBANKS/NFFT/Deltas/CMVN varied.") # --- Pre-extract features --- train_features_cache: Dict[tuple, Tuple[Optional[List], Optional[List], Optional[np.ndarray]]] = {} dev_features_cache: Dict[tuple, Tuple[Optional[List], Optional[List], Optional[np.ndarray]]] = {} logging.info("Pre-extracting features for unique MFCC settings...") mfcc_settings_to_test = set() for combo in param_combinations: mfcc_key = (combo[1], combo[2], combo[3], combo[4], combo[5]) # (deltas, cmvn, nceps, nbanks, nfft) mfcc_settings_to_test.add(mfcc_key) logging.info(f" Need to extract features for {len(mfcc_settings_to_test)} unique MFCC configurations.") for mfcc_idx, mfcc_key in enumerate(mfcc_settings_to_test): add_deltas, use_cmvn, nceps, nbanks, nfft = mfcc_key if nceps > nbanks: logging.warning(f" Skipping extraction for invalid config: NCEPS({nceps}) > NBANKS({nbanks})") train_features_cache[mfcc_key] = (None, None, None) dev_features_cache[mfcc_key] = (None, None, None) continue mfcc_config_name = f"MFCC {mfcc_idx+1}/{len(mfcc_settings_to_test)} D{add_deltas}_C{use_cmvn}_N{nceps}_B{nbanks}_F{nfft}" logging.info(f" Extracting for: {mfcc_config_name}") train_feat_list, train_idx = preprocess_data_mfcc(train_paths, "Train", DEFAULT_FS, window, noverlap, nfft, nbanks, nceps, add_deltas, use_cmvn) y_train_filt = y_train_all[train_idx] if train_idx is not None else None train_features_cache[mfcc_key] = (train_feat_list, train_idx, y_train_filt) dev_feat_list, dev_idx = preprocess_data_mfcc(dev_paths, "Dev", DEFAULT_FS, window, noverlap, nfft, nbanks, nceps, add_deltas, use_cmvn) y_dev_filt = y_dev_all[dev_idx] if dev_idx is not None else None dev_features_cache[mfcc_key] = (dev_feat_list, dev_idx, y_dev_filt) logging.info("Feature extraction complete.") # --- Run evaluation loop --- for i, params in enumerate(param_combinations): gmm_components, add_deltas, use_cmvn, nceps, nbanks, nfft = params mfcc_key = (add_deltas, use_cmvn, nceps, nbanks, nfft) if nceps > nbanks: continue # Skip invalid combo config_name = f"GMM:{gmm_components}c, D:{add_deltas}, CMVN:{use_cmvn}, N:{nceps}, B:{nbanks}, F:{nfft}" iteration_start_time = time.time() logging.info(f"--- Testing Combo {i+1}/{len(param_combinations)}: {config_name} ---") train_features_list, _, y_train = train_features_cache.get(mfcc_key, (None, None, None)) dev_features_list, _, y_dev_true = dev_features_cache.get(mfcc_key, (None, None, None)) if train_features_list is None or dev_features_list is None or y_train is None or y_dev_true is None: logging.warning(" Skipping: Feature extraction failed for this setting.") results.append({'params': params, 'accuracy': -1.0, 'error': 'Feature extraction failed'}) continue # Group training features features_by_class = {} successful_grouping = True for feat_idx, label in enumerate(y_train): if label not in features_by_class: features_by_class[label] = [] if train_features_list[feat_idx] is not None and train_features_list[feat_idx].shape[0] > 0: features_by_class[label].append(train_features_list[feat_idx]) else: successful_grouping = False break # Handle potential None feature here if not successful_grouping: logging.warning(" Skipping: Issue grouping training features.") results.append({'params': params, 'accuracy': -1.0, 'error': 'Feature grouping error'}) continue # Train GMMs gmm_models: Dict[int, tuple] = {} train_gmm_start = time.time() all_classes = sorted(features_by_class.keys()) logging.info(f" Training {len(all_classes)} GMMs...") successful_models_count = 0 for label in all_classes: if not features_by_class.get(label): continue valid_features = [f for f in features_by_class[label] if f is not None and f.shape[0] > 0] if not valid_features: continue try: X_class = np.vstack(valid_features) except ValueError: continue gmm_result = train_single_gmm(X_class, gmm_components, max_iter, tol, cov_type) if gmm_result is not None: gmm_models[label] = gmm_result successful_models_count += 1 logging.info(f" Training done ({successful_models_count} models) in {time.time()-train_gmm_start:.2f}s.") if not gmm_models: logging.warning(" Skipping evaluation: No GMMs trained.") results.append({'params': params, 'accuracy': -1.0, 'error': 'GMM training failed'}) continue # Evaluate on Dev set y_pred = [] eval_start_time = time.time() valid_labels = sorted(gmm_models.keys()) logging.info(f" Evaluating on {len(dev_features_list)} dev files...") for X_dev_utt in dev_features_list: scores = [score_gmm_utterance(gmm_models[lbl], X_dev_utt) if lbl in gmm_models else -np.inf for lbl in valid_labels] if not scores: pred_label = -1 else: pred_label = valid_labels[np.argmax(scores)] y_pred.append(pred_label) try: accuracy = accuracy_score(y_dev_true, y_pred) logging.info(f" Evaluation done in {time.time()-eval_start_time:.2f}s -> Accuracy: {accuracy:.4f}") results.append({'params': params, 'accuracy': accuracy, 'error': None}) except ValueError as e: logging.error(f" Error calculating accuracy: {e}. Maybe label mismatch?") results.append({'params': params, 'accuracy': -1.0, 'error': 'Accuracy calculation error'}) logging.info(f" Iteration finished in {time.time() - iteration_start_time:.2f}s") total_time = time.time() - total_start_time logging.info(f"--- Optimization Finished in {total_time:.2f} seconds ---") # --- Reporting Results --- best_result = None if results: results.sort(key=lambda x: x['accuracy'], reverse=True) best_result = results[0] if best_result['accuracy'] >= 0: logging.info("\nBest Configuration Found:") logging.info(f" Parameters (GMM_C, Deltas, CMVN, NCEPS, NBANKS, NFFT): {best_result['params']}") logging.info(f" Dev Set Accuracy: {best_result['accuracy']:.4f}") else: logging.warning("No successful configurations found.") print("\n--- Optimization Summary Table ---") # Print table to stdout headers = ["GMM_C", "Deltas", "CMVN", "NCEPS", "NBANKS", "NFFT", "Accuracy", "Error"] table_data = [[p[0], p[1], p[2], p[3], p[4], p[5], f"{res['accuracy']:.4f}" if res['accuracy'] >= 0 else "FAIL", res['error'] or ""] for res in results for p in [res['params']]] table_data.sort(key=lambda row: float(row[6]) if row[6] != "FAIL" else -1.0, reverse=True) if HAS_TABULATE: print(tabulate(table_data, headers=headers, tablefmt="grid")) else: print(" | ".join(headers)) print("-" * 80) [print(f" {row[0]:<5} | {str(row[1]):<6} | {str(row[2]):<4} | {row[3]:<5} | {row[4]:<6} | {row[5]:<4} | {row[6]:<8} | {row[7]}") for row in table_data] # --- Retrain and Save Best --- if best_result and best_result['accuracy'] >= 0: logging.info("Retraining best model configuration and saving...") best_gmm_c, best_deltas, best_cmvn, best_nceps, best_nbanks, best_nfft = best_result['params'] mfcc_key = (best_deltas, best_cmvn, best_nceps, best_nbanks, best_nfft) train_features_list, _, y_train = train_features_cache.get(mfcc_key, (None, None, None)) if train_features_list is not None and y_train is not None: features_by_class = {} for feat_idx, label in enumerate(y_train): if label not in features_by_class: features_by_class[label] = [] if train_features_list[feat_idx] is not None and train_features_list[feat_idx].shape[0]>0: features_by_class[label].append(train_features_list[feat_idx]) best_gmm_models: Dict[int, tuple] = {} all_classes = sorted(features_by_class.keys()) logging.info(f" Retraining {len(all_classes)} GMMs with best params...") for label in all_classes: if not features_by_class.get(label): continue valid_features = [f for f in features_by_class[label] if f is not None and f.shape[0] > 0] if not valid_features: continue try: X_class = np.vstack(valid_features) except ValueError: continue gmm_result = train_single_gmm(X_class, best_gmm_c, max_iter, tol, cov_type) if gmm_result is not None: best_gmm_models[label] = gmm_result # Construct filename reflecting parameters save_path = f'gmm_BEST_C{best_gmm_c}_D{best_deltas}_CMVN{best_cmvn}_N{best_nceps}B{best_nbanks}F{best_nfft}.pkl' save_gmm_models(best_gmm_models, save_path) else: logging.error("Could not retrain best model: feature cache miss or error.") def run_train(args: argparse.Namespace): """! @brief Trains GMMs for each class, saves them, and evaluates on train/dev sets. @param args Parsed command-line arguments containing MFCC and GMM parameters. """ logging.info("--- Training & Evaluating GMM Mode ---") gmm_config_name = f"GMM{args.gmm_components}c_{args.cov_type}_Iter{args.max_iter}" mfcc_config_name = f"MFCC_N{args.nceps}B{args.nbanks}F{args.nfft}_D{args.add_deltas}_C{args.use_cmvn}" logging.info(f"Using GMM parameters: {gmm_config_name}") logging.info(f"Using MFCC parameters: {mfcc_config_name}") logging.info(f"Output models path: {args.output_model}") # --- Training Phase --- try: train_paths, y_train_all, _ = load_audio_data(args.train_dir, True) assert train_paths and y_train_all is not None except Exception as e: logging.error(f"Error loading train data: {e}") return logging.info("Extracting Training Features...") train_features_list, train_processed_indices = preprocess_data_mfcc( train_paths, "Train", DEFAULT_FS, args.window, args.noverlap, args.nfft, args.nbanks, args.nceps, args.add_deltas, args.use_cmvn ) if train_features_list is None: logging.error("Training failed: Feature extraction error.") return try: y_train = y_train_all[train_processed_indices] assert len(y_train) > 0 except (IndexError, AssertionError): logging.error("Training failed: Label filtering or no valid data.") return # Group features features_by_class = {} class_feature_lengths = {} for i, label in enumerate(y_train): if label not in features_by_class: features_by_class[label] = [] if train_features_list[i] is not None and train_features_list[i].shape[0] > 0: features_by_class[label].append(train_features_list[i]) class_feature_lengths[label] = class_feature_lengths.get(label, 0) + train_features_list[i].shape[0] logging.info(f"Features extracted for {len(y_train)} training files across {len(features_by_class)} classes.") # Train GMMs gmm_models: Dict[int, tuple] = {} training_start_time = time.time() all_classes = sorted(features_by_class.keys()) logging.info(f"Training GMMs for classes: {all_classes}...") successful_training_count = 0 for label in all_classes: logging.info(f" Training GMM for class {label} (Frames: {class_feature_lengths.get(label, 0)})...") if not features_by_class.get(label): logging.warning(" No features. Skipping.") continue try: valid_features = [f for f in features_by_class[label] if f is not None and f.shape[0] > 0] if not valid_features: logging.warning(" No stackable features. Skipping.") continue X_class = np.vstack(valid_features) except ValueError as e: logging.warning(f" Stack error: {e}. Skipping.") continue gmm_result = train_single_gmm(X_class, args.gmm_components, args.max_iter, args.tol, args.cov_type) if gmm_result is not None: gmm_models[label] = gmm_result successful_training_count += 1 logging.info(" Done.") else: logging.warning(f" Failed training for class {label}.") training_time = time.time() - training_start_time logging.info(f"GMM training finished in {training_time:.2f}s. Trained {successful_training_count}/{len(all_classes)} models.") # Save models if gmm_models: save_gmm_models(gmm_models, args.output_model) else: logging.error("No models trained, skipping save.") return print("-" * 30) # Separator # --- Evaluate on Training Set --- logging.info("--- Evaluating Trained GMMs on TRAINING Set ---") y_pred_train = [] train_eval_start_time = time.time() valid_labels_trained = sorted(gmm_models.keys()) logging.info(f"Scoring {len(train_features_list)} training files...") for i, X_train_utt in enumerate(train_features_list): if X_train_utt is None: continue # Skip if features were None originally scores = [score_gmm_utterance(gmm_models[lbl], X_train_utt) if lbl in gmm_models else -np.inf for lbl in valid_labels_trained] pred_label = valid_labels_trained[np.argmax(scores)] if scores else -1 y_pred_train.append(pred_label) train_eval_time = time.time() - train_eval_start_time logging.info(f"Train set scoring done in {train_eval_time:.2f}s.") # Need to filter y_train again to match only those utterances that were scored y_train_scored = [y for i, y in enumerate(y_train) if train_features_list[i] is not None] if len(y_train_scored) != len(y_pred_train): logging.warning("Mismatch between scored training utterances and labels. Cannot calculate training accuracy accurately.") else: train_accuracy = accuracy_score(y_train_scored, y_pred_train) logging.info(f"Accuracy on Training Set: {train_accuracy:.4f}") print("-" * 30) # --- Evaluate on Dev Set --- logging.info("--- Evaluating Trained GMMs on DEV Set ---") run_evaluate(args, loaded_gmm_models=gmm_models) # Pass models print("-" * 30) logging.info("Training and Evaluation complete.") def run_evaluate(args: argparse.Namespace, loaded_gmm_models: Optional[Dict[int, tuple]] = None): """! @brief Loads trained GMM models (or uses provided ones) and evaluates them on the dev set. @param args Parsed command-line arguments. @param loaded_gmm_models Optional dictionary of pre-loaded GMM models (used when called from run_train). """ gmm_models = None if loaded_gmm_models is None: logging.info("--- Evaluate Mode ---") logging.info(f"Loading models: {args.input_model}") logging.info(f"Evaluating on data from: {args.dev_dir}") gmm_models = load_gmm_models(args.input_model) else: gmm_models = loaded_gmm_models logging.info(f"Evaluating models just trained using MFCC params:") logging.info(f" Win={args.window}, NOverlap={args.noverlap}, NFFT={args.nfft}, NBanks={args.nbanks}") logging.info(f" NCeps={args.nceps}, Deltas={args.add_deltas}, CMVN={args.use_cmvn}") if not gmm_models: logging.error("Evaluation failed: GMM models not available.") return # Load Dev Data try: dev_paths, y_dev_all, _ = load_audio_data(args.dev_dir, True) assert dev_paths and y_dev_all is not None except Exception as e: logging.error(f"Eval failed loading dev data: {e}") return # Extract features for Dev Data dev_features_list, dev_processed_indices = preprocess_data_mfcc( dev_paths, "Dev Eval", DEFAULT_FS, args.window, args.noverlap, args.nfft, args.nbanks, args.nceps, args.add_deltas, args.use_cmvn ) if dev_features_list is None: logging.error("Eval failed: MFCC dev.") return if dev_processed_indices is None or len(dev_processed_indices) == 0: logging.error("Eval failed: No valid dev audio.") return try: y_dev_true = y_dev_all[dev_processed_indices] except IndexError: logging.error("Eval failed: Index filtering labels.") return if len(y_dev_true) != len(dev_features_list): logging.error(f"Eval failed: Label/feature mismatch.") return logging.info(f"Evaluating on {len(dev_features_list)} dev files.") # Predict for each dev utterance y_pred = [] eval_start_time = time.time() num_classes_trained = len(gmm_models) logging.info(f"Scoring against {num_classes_trained} trained GMMs...") valid_labels = sorted(gmm_models.keys()) for i, X_dev_utt in enumerate(dev_features_list): if X_dev_utt is None: # Double check y_pred.append(-1) continue scores = [score_gmm_utterance(gmm_models[lbl], X_dev_utt) if lbl in gmm_models else -np.inf for lbl in valid_labels] pred_label = valid_labels[np.argmax(scores)] if scores else -1 y_pred.append(pred_label) # logging.debug(f" Processed {i+1}/{len(dev_features_list)}") # Reduce verbosity eval_time = time.time() - eval_start_time logging.info(f"Evaluation scoring finished in {eval_time:.2f}s.") # Calculate and Print Metrics accuracy = accuracy_score(y_dev_true, y_pred) logging.info(f"Accuracy on Dev Set: {accuracy:.4f}") print("\n--- Classification Report (Dev Set) ---") # Print report to stdout all_possible_labels = np.arange(1, NUM_CLASSES + 1) present_labels = sorted(list(set(y_dev_true) | set(gmm_models.keys()))) report_labels = sorted(list(set(present_labels) | set(all_possible_labels))) print(classification_report(y_dev_true, y_pred, labels=report_labels, zero_division=0)) def run_predict(args: argparse.Namespace): """! @brief Loads trained GMM models and predicts identities for audio files in a directory. Outputs predictions in the specified project format. @param args Parsed command-line arguments containing prediction parameters (input_model, input_data, output_predictions) and MFCC parameters used for training. """ logging.info("--- Prediction Mode ---") logging.info(f"Loading models: {args.input_model}") logging.info(f"Input data dir: {args.input_data}") logging.info(f"Output predictions file: {args.output_predictions}") gmm_models = load_gmm_models(args.input_model) if not gmm_models: logging.error("Predict failed: Models empty or failed to load.") return pred_paths, _, segment_names = load_audio_data(args.input_data, expect_labels=False) if pred_paths is None or segment_names is None: logging.error("Predict failed: No input data.") return # Extract features using MFCC parameters specified in args logging.info(f"Using MFCC params: Win={args.window}, Shift={args.window-args.noverlap}, NCEP={args.nceps}, Deltas={args.add_deltas}, CMVN={args.use_cmvn}") pred_features_list, pred_processed_indices = preprocess_data_mfcc( pred_paths, "Predict", DEFAULT_FS, args.window, args.noverlap, args.nfft, args.nbanks, args.nceps, args.add_deltas, args.use_cmvn ) if pred_features_list is None: logging.error("Predict failed: MFCC error.") return if pred_processed_indices is None or len(pred_processed_indices) == 0: logging.error("Predict failed: No audio files processed successfully.") return # Align segment names segment_names_processed = [segment_names[i] for i in pred_processed_indices] results_lines = [] logging.info(f"Predicting for {len(pred_features_list)} successfully processed files...") predict_start_time = time.time() valid_gmm_labels = sorted(gmm_models.keys()) for i, (X_pred_utt, seg_name) in enumerate(zip(pred_features_list, segment_names_processed)): if (i + 1) % 100 == 0 or i == len(pred_features_list)-1: logging.info(f" Processed {i+1}/{len(pred_features_list)}") if X_pred_utt is None: # Handle potential None feature hard_decision = np.nan final_scores = [np.nan] * NUM_CLASSES else: scores_for_utt = {label: score_gmm_utterance(gmm_models[label], X_pred_utt) for label in valid_gmm_labels if label in gmm_models} if not scores_for_utt: hard_decision = np.nan final_scores = [np.nan] * NUM_CLASSES else: hard_decision = max(scores_for_utt, key=scores_for_utt.get) final_scores = [scores_for_utt.get(lbl_idx, np.nan) for lbl_idx in range(1, NUM_CLASSES + 1)] log_prob_str = " ".join([f"{sc:.6f}" if not np.isnan(sc) else "nan" for sc in final_scores]) hard_decision_str = str(int(hard_decision)) if not np.isnan(hard_decision) else "nan" results_lines.append(f"{seg_name} {hard_decision_str} {log_prob_str}") predict_time = time.time() - predict_start_time logging.info(f"Prediction finished in {predict_time:.2f}s.") try: with open(args.output_predictions, 'w') as f: [f.write(line + '\n') for line in results_lines] logging.info(f"Predictions saved to {args.output_predictions}") except Exception as e: logging.error(f"Error saving predictions: {e}") # --- Argument Parser Setup --- def create_parser() -> argparse.ArgumentParser: """! @brief Creates the argument parser for the command-line interface. """ parser = argparse.ArgumentParser( description="Train, evaluate, optimize or predict using GMMs on MFCC features.", formatter_class=argparse.ArgumentDefaultsHelpFormatter ) # Common args parser.add_argument('--train-dir', type=str, default=DEFAULT_TRAIN_DIR, help="Path to training data directory.") parser.add_argument('--dev-dir', type=str, default=DEFAULT_DEV_DIR, help="Path to development data directory.") # Mode selection mode_group = parser.add_mutually_exclusive_group(required=True) mode_group.add_argument('--train', action='store_true', help="Train GMMs, save, and evaluate on train/dev.") mode_group.add_argument('--evaluate', action='store_true', help="Evaluate pre-trained GMMs on the dev set.") mode_group.add_argument('--optimize', action='store_true', help="Optimize GMM components & MFCC parameters.") mode_group.add_argument('--predict', action='store_true', help="Predict using pre-trained GMMs on new data.") # Feature Arguments (used by all modes) feature_group = parser.add_argument_group('Feature Extraction Options') feature_group.add_argument('--window', type=int, default=DEFAULT_WINDOW, help="MFCC window length (samples).") feature_group.add_argument('--noverlap', type=int, default=DEFAULT_NOVERLAP, help="MFCC window overlap (samples).") feature_group.add_argument('--nfft', type=int, default=DEFAULT_NFFT, help="MFCC FFT length.") feature_group.add_argument('--nbanks', type=int, default=DEFAULT_NBANKS, help="Number of Mel filter banks.") feature_group.add_argument('--nceps', type=int, default=DEFAULT_NCEPS, help="Number of cepstral coefficients (base).") feature_group.add_argument('--add-deltas', action=argparse.BooleanOptionalAction, default=DEFAULT_ADD_DELTAS, help="Add delta features.") feature_group.add_argument('--use-cmvn', action=argparse.BooleanOptionalAction, default=DEFAULT_USE_CMVN, help="Apply CMVN.") # GMM Arguments (used by --train) gmm_group = parser.add_argument_group('GMM Training Options (for --train)') gmm_group.add_argument('--gmm-components', type=int, default=DEFAULT_GMM_COMPONENTS, help="Number of Gaussian components per GMM.") gmm_group.add_argument('--max-iter', type=int, default=DEFAULT_MAX_ITER, help="Maximum EM iterations.") gmm_group.add_argument('--tol', type=float, default=DEFAULT_TOL, help="EM convergence tolerance.") gmm_group.add_argument('--cov-type', type=str, default=DEFAULT_COV_TYPE, choices=['diag'], help="GMM covariance type ('diag' only).") gmm_group.add_argument('--output-model', type=str, default=DEFAULT_MODEL_FILENAME, help="Path to save trained GMM models dictionary.") # Evaluate/Predict Arguments evalpred_group = parser.add_argument_group('Evaluation/Prediction Options') evalpred_group.add_argument('--input-model', type=str, default=DEFAULT_MODEL_FILENAME, help="Path to saved GMM models (.pkl).") evalpred_group.add_argument('--input-data', type=str, help="Path to audio data directory (for --predict).") evalpred_group.add_argument('--output-predictions', type=str, default=DEFAULT_PREDICTIONS_FILENAME, help="Output prediction file (for --predict).") return parser # --- Main Execution --- def main(): """! @brief Main function to parse arguments and dispatch execution to the appropriate mode function. """ parser = create_parser() args = parser.parse_args() # --- Argument Validation --- if args.predict and not args.input_data: parser.error("--input-data required for --predict") if args.cov_type != 'diag': logging.warning("Only 'diag' covariance supported. Using 'diag'.") args.cov_type = 'diag' # Validate MFCC params (basic check) if args.nceps > args.nbanks: parser.error(f"Number of cepstral coefficients (nceps={args.nceps}) cannot exceed number of filter banks (nbanks={args.nbanks}).") if args.add_deltas and not HAS_LIBROSA: logging.warning("Cannot add deltas because librosa is not installed. Proceeding without deltas.") args.add_deltas = False # --- Execute Selected Mode --- if args.train: run_train(args) elif args.evaluate: run_evaluate(args) elif args.optimize: run_optimize(args) elif args.predict: run_predict(args) else: logging.error("No execution mode selected. Use --help for options.") parser.print_help() if __name__ == "__main__": main()