# Apply sum rule
import sys
import argparse
# NOTE(review): this name is rebound by `from PIL import Image` below, so the
# tkinter import looks like an accidental auto-import; kept to avoid changing
# the file's import set.
from tkinter import Image
import os
import os.path as osp

import numpy as np
import joblib
from tensorflow.keras.models import load_model
from PIL import Image
from tqdm import tqdm
import json
from scipy.special import logsumexp

# The project-local sound helpers live in a sibling directory.
sys.path.append('../sound')
from ikrlib import logpdf_gmm, mfcc, wavfile, logpdf_gauss
from sound_gmm import wav16khz2mfcc

# Default location of the class-indices JSON (shared by the CLI and main()).
DEFAULT_CLASS_INDICES = '../class_indices.json'


class Input:
    """One sound/image sample pair together with its prediction results."""

    def __init__(self, sound_path, image_path):
        self.image_path = image_path
        self.sound_path = sound_path
        # Output identifier: the image path with every ".png" removed.
        self.file_name = image_path.replace(".png", "")
        # Filled in by Inferencer.predict().
        self.cls = None
        self.likelihood = None
        self.posteriors = None

    def __str__(self):
        """Format as ``<basename> <class> <score> <score> ...``."""
        scores = ''.join(f' {str(like)}' for like in self.likelihood)
        return f"{osp.basename(self.file_name)} {int(self.cls)}{scores}"


def load_sound_model(model_path):
    """
    Load the sound model from the specified path.

    Returns the object stored by joblib, or None if loading fails
    (the error is printed, not raised).
    """
    # NOTE(security): joblib.load unpickles the file; only load trusted models.
    try:
        gmm_models = joblib.load(model_path)
        print(f"Loaded sound model from {model_path}")
        return gmm_models
    except Exception as e:
        print(f"Error loading sound model: {e}")
        return None


def load_image_model(model_path):
    """
    Load the image model from the specified path.

    Returns the Keras model, or None if loading fails
    (the error is printed, not raised).
    """
    try:
        model = load_model(model_path)
        print(f"Loaded image model from {model_path}")
        return model
    except Exception as e:
        print(f"Error loading image model: {e}")
        return None


class Inferencer:
    """Combine sound-model and image-model scores with a weighted sum rule."""

    def __init__(self, sound_model, image_model, alpha, class_indices, priors=None):
        """
        Args:
            sound_model: path to the joblib file with the sound models.
            image_model: path to the Keras image model.
            alpha: weight of the sound score; the image score gets 1 - alpha.
            class_indices: path to a JSON file mapping class label to the
                image model's output index.
            priors: optional per-class priors; uniform over the JSON labels
                when omitted.

        Raises:
            Exception: if either model could not be loaded.
        """
        self.sound_model = load_sound_model(sound_model)
        self.image_model = load_image_model(image_model)
        # Use a context manager so the file handle is closed deterministically.
        with open(class_indices, encoding="utf-8") as f:
            self.class_indicies = json.load(f)
        self.alpha = alpha

        if self.sound_model is None or self.image_model is None:
            raise Exception("Could not load sound model or image model")

        if priors is None:
            n = len(self.class_indicies)
            uniform = 1.0 / n
            self.priors = {cid: uniform for cid in self.class_indicies}
        else:
            self.priors = priors

    def __predict_sound(self, sound_file):
        """Return a dict mapping each sound-model key to its posterior."""
        mfccs = wav16khz2mfcc(sound_file)

        log_likelihoods = {}
        for person_id, model in self.sound_model.items():
            try:
                log_likelihoods[person_id] = np.sum(
                    logpdf_gmm(mfccs, model['weights'], model['means'], model['covariances']))
            except np.linalg.LinAlgError:
                # A model that cannot be evaluated gets zero probability.
                log_likelihoods[person_id] = -np.inf

        # uniform prior over N speakers
        log_prior = -np.log(len(log_likelihoods))

        # unnormalized log-posterior = log-likelihood + log-prior
        log_unnorm = {pid: ll + log_prior for pid, ll in log_likelihoods.items()}

        # normalization constant in log-domain
        log_evidence = logsumexp(list(log_unnorm.values()))

        # If the evidence is not finite (e.g. every model scored -inf), the
        # subtraction below would yield NaN; fall back to a uniform posterior
        # so the image model alone decides.
        if not np.isfinite(log_evidence):
            uniform = 1.0 / len(log_unnorm)
            return {pid: uniform for pid in log_unnorm}

        # compute final posteriors
        return {pid: np.exp(lu - log_evidence) for pid, lu in log_unnorm.items()}

    def __predict_image(self, image_file):
        """Return image-model scores re-ordered so index ``int(label) - 1`` holds each label's score."""
        image = Image.open(image_file)

        # Normalize
        image = image.convert("RGB")
        image_arr = np.asarray(image, dtype=np.float32) / 255.0
        image_arr = np.expand_dims(image_arr, axis=0)

        preds = self.image_model.predict(image_arr, verbose=0)[0]

        # Move each score from the model's output index to the label-based slot.
        new_predictions = np.zeros(len(self.class_indicies))
        for key, value in self.class_indicies.items():
            new_predictions[int(key) - 1] = preds[value]
        return new_predictions

    def __sum_rule(self, preds_sound, preds_image):
        """Return ``alpha * sound + (1 - alpha) * image`` per class as an array."""
        combined_preds = np.zeros(len(self.class_indicies))
        # NOTE(review): the sound-model key is used directly as an array index,
        # whereas the image scores are stored at int(label) - 1. This is only
        # consistent if the sound-model keys are 0-based ints - TODO confirm
        # against the code that trains/saves the sound models.
        for person_id, sound_score in preds_sound.items():
            image_score = preds_image[person_id]
            combined_preds[person_id] = self.alpha * sound_score + (1 - self.alpha) * image_score
        return combined_preds

    def predict(self, inputs):
        """Fill ``posteriors``, ``cls`` and ``likelihood`` on every input in place."""
        n_classes = len(self.class_indicies)
        for inp in tqdm(inputs):
            preds_sound = self.__predict_sound(inp.sound_path)
            preds_image = self.__predict_image(inp.image_path)
            preds = self.__sum_rule(preds_sound, preds_image)

            # Per-class likelihoods are written out as NaN placeholders.
            inp.likelihood = [np.nan] * n_classes
            inp.posteriors = preds
            # Array index 0 corresponds to class 1.
            inp.cls = np.argmax(preds) + 1


def main(sound_model_path, image_model_path, data_dir, output, alpha=0.5, class_indices=None):
    """
    Classify every ``.wav``/``.png`` pair in ``data_dir`` and write results.

    Args:
        sound_model_path: path to the sound model file.
        image_model_path: path to the image model file.
        data_dir: directory containing ``<name>.wav`` and ``<name>.png`` files.
        output: path of the text file to write, one line per sample.
        alpha: weight of the sound score in the sum rule.
        class_indices: path to the class-indices JSON; defaults to
            DEFAULT_CLASS_INDICES when None.

    Raises:
        AttributeError: if ``data_dir`` does not exist.
    """
    if not osp.exists(data_dir):
        raise AttributeError(f"Data directory {data_dir} does not exist")

    if class_indices is None:
        class_indices = DEFAULT_CLASS_INDICES

    inferencer = Inferencer(sound_model_path, image_model_path, alpha, class_indices)

    inputs = []
    # Sorted so the output order does not depend on the filesystem.
    for name in sorted(os.listdir(data_dir)):
        if not name.endswith(".wav"):
            continue

        sound_file = osp.join(data_dir, name)
        image_file = osp.join(data_dir, name.replace(".wav", ".png"))

        # Only queue samples that have both modalities; a missing image would
        # otherwise fail later when it is opened.
        if not osp.exists(image_file):
            print(f"Skipping {image_file}")
            continue

        inputs.append(Input(sound_file, image_file))

    inferencer.predict(inputs)

    with open(output, "w") as f:
        for inp in inputs:
            f.write(str(inp) + "\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Inference combination of models")
    parser.add_argument("-s", "--sound", type=str, required=True, help="Path to the sound model")
    parser.add_argument("-i", "--image", type=str, required=True, help="Path to the image model")
    parser.add_argument("-d", "--data", type=str, required=True, help="Path to the data directory")
    parser.add_argument("-o", "--output", type=str, default="output.txt", help="Path to the output directory")
    parser.add_argument("-a", "--alpha", type=float, default=0.5, help="Alpha parameter for sum rule")
    parser.add_argument("-ci", "--class_indices", type=str, default=DEFAULT_CLASS_INDICES, help="Path to class indices file")
    args = parser.parse_args()

    sound_model_path = args.sound
    image_model_path = args.image
    data_dir = args.data
    alpha = args.alpha
    output = args.output

    main(sound_model_path, image_model_path, data_dir, output, alpha, args.class_indices)