import torch
import optuna
import platform
import sys
import random
import string
import os
from optuna import Trial
from torch.utils.data import DataLoader
from src.data_handlers import SiameseNetworkDataset
from src.net import SiameseNetwork, ContrastiveLoss
from src.train import train
from src.suggestions import suggest_params_general
from src.builders import build_layers
from src.evaluation import evaluate
import torch.nn as nn
import torch.nn.functional as f
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_curve, confusion_matrix, fbeta_score, recall_score, precision_score, precision_recall_curve, auc
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torchvision

###############################################################################

# DataLoader worker processes: 0 on Windows (worker subprocesses are
# problematic there), 2 elsewhere.
NUM_WORKERS = 0 if platform.system() == "Windows" else 2
IS_CUDA_CAPABLE = torch.cuda.is_available()
# CLI layout: argv[1] = max seconds per trial, argv[2] = dataset name,
# argv[3] = directory to save models into.
MAX_SECONDS_PER_TRIAL = 7200 if len(sys.argv) < 2 else int(sys.argv[1])
# FIX: the original read `"ICDAR"` instead of sys.argv[2], so the second
# CLI argument was silently ignored (its siblings above/below do read argv).
EVALUATE_ON = "CEDAR" if len(sys.argv) < 3 else sys.argv[2]
MODEL_DIR = "." if len(sys.argv) < 4 else sys.argv[3]

###############################################################################


def imshow(img, text=None, should_save=False):
    """Display a CPU tensor image (C, H, W) with matplotlib.

    @param img: torch.Tensor in CHW layout
    @param text: unused, kept for backward compatibility
    @param should_save: unused, kept for backward compatibility
    """
    npimg = img.numpy()
    plt.axis("off")
    # CHW -> HWC, the layout matplotlib expects.
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


def load_and_evaluate(model_path: str) -> None:
    """Load the saved model, evaluate it on the ICDAR test split and print metrics.

    @param model_path: path to a model saved via torch.save(model, ...)
    """
    try:
        # FIX: map_location lets a GPU-trained model load on a CPU-only host
        # (the original torch.load would raise there).
        device = "cuda" if IS_CUDA_CAPABLE else "cpu"
        model = torch.load(model_path, map_location=device)
        # FIX: the network contains Dropout layers, so eval mode is required
        # for deterministic predictions.
        model.eval()

        # Create the dataloader for testing.
        dataset_path = os.path.join("dataset", "ICDAR")
        test_dataset = SiameseNetworkDataset(dataset_path, False, 128)
        test_dataloader = DataLoader(test_dataset, shuffle=False,
                                     num_workers=NUM_WORKERS, batch_size=1)

        predicted = []
        actual = []

        # Score each pair: the pairwise distance between the two embeddings
        # serves as the (unnormalized) dissimilarity prediction.
        with torch.no_grad():
            for i, data in enumerate(test_dataloader, 0):
                image1, image2, label = data
                # Transfer to the computation device.
                if IS_CUDA_CAPABLE:
                    image1, image2 = image1.cuda(), image2.cuda()
                output1, output2 = model(image1, image2)
                distance = f.pairwise_distance(output1, output2).item()
                predicted.append(distance)
                actual.append(label.item())

        # Normalize predictions into [0, 1].
        max_distance = max(predicted)
        if max_distance == 0:
            raise ValueError("All predicted distances are zero; cannot normalize.")
        predicted = [float(prediction) / max_distance for prediction in predicted]

        # Precision-Recall curve and the area under it.
        precision, recall, thresholds = precision_recall_curve(actual, predicted)
        auc_prc = auc(recall, precision)

        # F2-score for every threshold on the PR curve. FIX: 0/0 yields NaN
        # (not inf), so the original `fscore == np.inf` mask missed it;
        # nan_to_num zeroes both cases without a runtime warning.
        with np.errstate(divide="ignore", invalid="ignore"):
            fscore = (5 * precision * recall) / (4 * precision + recall)
        fscore = np.nan_to_num(fscore, nan=0.0, posinf=0.0, neginf=0.0)

        # Pick the threshold that yields the highest F2-score.
        best_fscore_index = np.argmax(fscore)
        recall_value = round(recall[best_fscore_index], ndigits=4)
        precision_value = round(precision[best_fscore_index], ndigits=4)

        # Assign hard classes based on that threshold.
        optimal_threshold = thresholds[best_fscore_index]
        predicted_thresholded = [0 if prediction < optimal_threshold else 1
                                 for prediction in predicted]

        # Confusion matrix, accuracy and F-scores at the chosen threshold.
        tn, fp, fn, tp = confusion_matrix(actual, predicted_thresholded).ravel()
        accuracy_value = accuracy_score(actual, predicted_thresholded)
        # NOTE(review): f2 is taken from the PR-curve maximum, while f1/f0.5
        # are recomputed on the thresholded labels — slightly different bases.
        f2 = np.max(fscore)
        f1 = fbeta_score(actual, predicted_thresholded, beta=1, zero_division=0)
        f0_5 = fbeta_score(actual, predicted_thresholded, beta=0.5, zero_division=0)

        print("Threshold: {}".format(optimal_threshold))
        print("AU PRC: {}, F2-score: {}, F1-score: {}, F0.5-score: {}".format(auc_prc, f2, f1, f0_5))
        print("TN: {}, FP: {}, FN: {}, TP: {}".format(tn, fp, fn, tp))
        print("Accuracy: {}, Precision: {}, Recall: {}".format(accuracy_value, precision_value, recall_value))
    except ValueError as e:
        print(e)


def train_best():
    """Train a network with the best hyper-parameters found so far and save it.

    @return: float, area under the Precision-Recall Curve, or 0.0 on failure
    """
    try:
        # Convolutional feature extractor (best architecture found by the study).
        c_layers_built = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout(0.6),
            nn.Conv2d(64, 224, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout(0.55),
            nn.Conv2d(224, 320, kernel_size=7, stride=2, padding=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Dropout(0.65),
            nn.Conv2d(320, 640, kernel_size=11, stride=2, padding=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Dropout(0.8),
        )
        # Probe the extractor with a dummy 128x128 input to size the first
        # fully-connected layer.
        in1, in2, in3, in4 = c_layers_built(torch.empty(1, 1, 128, 128)).size()
        f_layers_built = nn.Sequential(
            nn.Linear(in1 * in2 * in3 * in4, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, 768),
            nn.ReLU(inplace=True),
            nn.Linear(768, 32),
        )
        net = SiameseNetwork(c_layers_built, f_layers_built)

        # Build the dataloader for training.
        dataset_path = os.path.join("dataset", "ICDAR")
        train_dataset = SiameseNetworkDataset(dataset_path, True, 128)
        train_dataloader = DataLoader(train_dataset, shuffle=True,
                                      num_workers=NUM_WORKERS, batch_size=12)

        # Optimizer and loss with the tuned hyper-parameters.
        optimizer = torch.optim.Adam(net.parameters(), lr=0.0009079,
                                     betas=(0.8909, 0.8937),
                                     weight_decay=0.0008969)
        loss_function = ContrastiveLoss(1.9447)

        # Train the model (30 epochs, 9000-second budget).
        model = train(net, train_dataloader, optimizer, loss_function, 30,
                      IS_CUDA_CAPABLE, NUM_WORKERS, 9000)
        # Render the run invalid if training failed.
        if model is None:
            return 0.0

        # Build the dataloader for testing.
        test_dataset = SiameseNetworkDataset(dataset_path, False, 128)
        test_dataloader = DataLoader(test_dataset, shuffle=True,
                                     num_workers=NUM_WORKERS, batch_size=1)

        # Evaluate the model.
        auc_prc, f2, f1, f0_5, conf_matrix, acc_prec_rec = evaluate(
            net, test_dataloader, IS_CUDA_CAPABLE, NUM_WORKERS)
        tn, fp, fn, tp = conf_matrix
        accuracy_value, precision_value, recall_value = acc_prec_rec

        # Save the model under a random name.
        if MODEL_DIR is not None:
            model_name = ''.join(random.choice(string.ascii_lowercase)
                                 for _ in range(16))
            # The full model object is saved (not the state_dict) so that
            # load_and_evaluate can torch.load it directly.
            torch.save(model, os.path.join(MODEL_DIR, model_name))
            print("Model saved as: {}".format(model_name))

        return auc_prc
    except (RuntimeError, ValueError) as e:
        print(e)
        return 0.0


def objective(trial: Trial) -> float:
    """Perform one trial of the study.

    @param trial: Trial
    @return: area under the Precision-Recall Curve, or 0.0 if the trial failed
    """
    # FIX: annotated -> float; the function returns 0.0 or a float AUC,
    # never an int.
    try:
        # Ask Optuna for this trial's hyper-parameter suggestions.
        suggest_params_general(trial)

        # Build layers and network from the suggested parameters.
        c_layers_built, f_layers_built = build_layers(trial)
        net = SiameseNetwork(c_layers_built, f_layers_built)

        # Build the dataloader for training.
        dataset_path = os.path.join("dataset", "ICDAR")
        train_dataset = SiameseNetworkDataset(dataset_path, True, 128)
        train_dataloader = DataLoader(train_dataset, shuffle=True,
                                      num_workers=NUM_WORKERS,
                                      batch_size=trial.params["batch_size"])

        # Build the optimizer and loss from the suggested parameters.
        optimizer = torch.optim.Adam(net.parameters(),
                                     lr=trial.params["lr"],
                                     betas=(trial.params["beta1"],
                                            trial.params["beta2"]),
                                     weight_decay=trial.params["weight_decay"])
        loss_function = ContrastiveLoss(trial.params["margin"])

        # Train the model within the per-trial time budget.
        model = train(net, train_dataloader, optimizer, loss_function,
                      trial.params["epochs"], IS_CUDA_CAPABLE, NUM_WORKERS,
                      MAX_SECONDS_PER_TRIAL)
        # Render the current trial invalid if training failed.
        if model is None:
            return 0.0

        # Build the dataloader for testing.
        test_dataset = SiameseNetworkDataset(dataset_path, False, 128)
        test_dataloader = DataLoader(test_dataset, shuffle=True,
                                     num_workers=NUM_WORKERS, batch_size=1)

        # Evaluate the model.
        auc_prc, f2, f1, f0_5, conf_matrix, acc_prec_rec = evaluate(
            net, test_dataloader, IS_CUDA_CAPABLE, NUM_WORKERS)
        tn, fp, fn, tp = conf_matrix
        accuracy_value, precision_value, recall_value = acc_prec_rec

        # Save the model under a random name.
        if MODEL_DIR is not None:
            model_name = ''.join(random.choice(string.ascii_lowercase)
                                 for _ in range(16))
            # NOTE(review): only the state_dict is saved here, while
            # load_and_evaluate expects a full pickled model — confirm which
            # format downstream tooling relies on.
            torch.save(model.state_dict(), os.path.join(MODEL_DIR, model_name))
            print("Model saved as: {}".format(model_name))

        return auc_prc
    except (RuntimeError, ValueError) as e:
        print(e)
        return 0.0
############################################################################### if __name__ == '__main__': #train_best() load_and_evaluate("tqsauyvpyafremyr") study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler()) #study.optimize(objective) Loading
import torch
import optuna
import platform
import sys
import random
import string
import os
from optuna import Trial
from torch.utils.data import DataLoader
from src.data_handlers import SiameseNetworkDataset
from src.net import SiameseNetwork, ContrastiveLoss
from src.train import train
from src.suggestions import suggest_params_general
from src.builders import build_layers
from src.evaluation import evaluate
import torch.nn as nn
import torch.nn.functional as f
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_curve, confusion_matrix, fbeta_score, recall_score, precision_score, precision_recall_curve, auc
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torchvision

###############################################################################

# Worker processes for the DataLoaders; none on Windows.
NUM_WORKERS = 0 if platform.system() == "Windows" else 2
IS_CUDA_CAPABLE = torch.cuda.is_available()
MAX_SECONDS_PER_TRIAL = 7200 if len(sys.argv) < 2 else int(sys.argv[1])
EVALUATE_ON = "CEDAR" if len(sys.argv) < 3 else "ICDAR"
MODEL_DIR = "." if len(sys.argv) < 4 else sys.argv[3]

###############################################################################


def imshow(img, text=None, should_save=False):
    """Render a CHW image tensor with matplotlib, axes hidden."""
    as_numpy = img.numpy()
    plt.axis("off")
    plt.imshow(np.transpose(as_numpy, (1, 2, 0)))
    plt.show()


def load_and_evaluate(model_path: str) -> None:
    """Load the saved model, evaluate and print the performance.

    @param model_path: string
    """
    try:
        # Restore the full pickled model object.
        net = torch.load(model_path)

        # Test-split dataloader, one pair per batch.
        data_root = os.path.join("dataset", "ICDAR")
        eval_set = SiameseNetworkDataset(data_root, False, 128)
        eval_loader = DataLoader(eval_set, shuffle=False,
                                 num_workers=NUM_WORKERS, batch_size=1)

        scores = []
        labels = []

        # Score every pair with the embedding distance.
        for _, sample in enumerate(eval_loader, 0):
            left, right, truth = sample
            if IS_CUDA_CAPABLE:
                left, right = left.cuda(), right.cuda()
            else:
                left, right = left.cpu(), right.cpu()
            emb_a, emb_b = net(left, right)
            gap = f.pairwise_distance(emb_a, emb_b).item()
            if gap is not None:
                scores.append(gap)
                labels.append(truth.item())

        # Scale scores into [0, 1]; a zero maximum cannot be normalized.
        if max(scores) == 0:
            raise ValueError
        scores = [float(raw) / max(scores) for raw in scores]

        # PR curve and its area.
        precision, recall, thresholds = precision_recall_curve(labels, scores)
        auc_prc = auc(recall, precision)

        # F2 over all PR-curve thresholds; zero out infinities from 0-division.
        fscore = (5 * precision * recall) / (4 * precision + recall)
        fscore[fscore == np.inf] = 0

        # Operating point with the best F2.
        best_idx = np.argmax(fscore)
        recall_value = round(recall[best_idx], ndigits=4)
        precision_value = round(precision[best_idx], ndigits=4)

        optimal_threshold = thresholds[best_idx]
        hard_calls = [0 if raw < optimal_threshold else 1 for raw in scores]

        # Confusion matrix and summary scores at that operating point.
        tn, fp, fn, tp = confusion_matrix(labels, hard_calls).ravel()
        accuracy_value = accuracy_score(labels, hard_calls)
        f2 = np.max(fscore)
        f1 = fbeta_score(labels, hard_calls, beta=1, zero_division=0)
        f0_5 = fbeta_score(labels, hard_calls, beta=0.5, zero_division=0)

        print("Threshold: {}".format(optimal_threshold))
        print("AU PRC: {}, F2-score: {}, F1-score: {}, F0.5-score: {}".format(auc_prc, f2, f1, f0_5))
        print("TN: {}, FP: {}, FN: {}, TP: {}".format(tn, fp, fn, tp))
        print("Accuracy: {}, Precision: {}, Recall: {}".format(accuracy_value, precision_value, recall_value))
    except ValueError as e:
        print(e)


def train_best():
    """Train with the best hyper-parameters found and save the result."""
    try:
        # Hand-coded best convolutional stack.
        conv_stack = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout(0.6),
            nn.Conv2d(64, 224, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout(0.55),
            nn.Conv2d(224, 320, kernel_size=7, stride=2, padding=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Dropout(0.65),
            nn.Conv2d(320, 640, kernel_size=11, stride=2, padding=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Dropout(0.8),
        )
        # Dummy forward pass to discover the flattened feature size.
        dim0, dim1, dim2, dim3 = conv_stack(torch.empty(1, 1, 128, 128)).size()
        fc_stack = nn.Sequential(
            nn.Linear(dim0 * dim1 * dim2 * dim3, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, 768),
            nn.ReLU(inplace=True),
            nn.Linear(768, 32),
        )
        net = SiameseNetwork(conv_stack, fc_stack)

        # Training dataloader.
        data_root = os.path.join("dataset", "ICDAR")
        fit_set = SiameseNetworkDataset(data_root, True, 128)
        fit_loader = DataLoader(fit_set, shuffle=True,
                                num_workers=NUM_WORKERS, batch_size=12)

        # Tuned Adam settings and contrastive-loss margin.
        optimizer = torch.optim.Adam(net.parameters(), lr=0.0009079,
                                     betas=(0.8909, 0.8937),
                                     weight_decay=0.0008969)
        loss_function = ContrastiveLoss(1.9447)

        model = train(net, fit_loader, optimizer, loss_function, 30,
                      IS_CUDA_CAPABLE, NUM_WORKERS, 9000)
        if model is None:
            # Training failed; report a zero score.
            return 0.0

        # Test dataloader and evaluation.
        eval_set = SiameseNetworkDataset(data_root, False, 128)
        eval_loader = DataLoader(eval_set, shuffle=True,
                                 num_workers=NUM_WORKERS, batch_size=1)
        auc_prc, f2, f1, f0_5, conf_matrix, acc_prec_rec = evaluate(
            net, eval_loader, IS_CUDA_CAPABLE, NUM_WORKERS)
        tn, fp, fn, tp = conf_matrix
        accuracy_value, precision_value, recall_value = acc_prec_rec

        # Persist the whole model under a random 16-letter name.
        if MODEL_DIR is not None:
            tag = ''.join(random.choice(string.ascii_lowercase)
                          for _ in range(16))
            torch.save(model, os.path.join(MODEL_DIR, tag))
            print("Model saved as: {}".format(tag))

        return auc_prc
    except (RuntimeError, ValueError) as e:
        print(e)
        return 0.0


def objective(trial: Trial) -> int:
    """Perform one trial of the study.

    @param trial: Trial
    @return: int, area under Precision-Recall Curve, or 0 if the trial failed
    """
    try:
        # Draw this trial's hyper-parameters.
        suggest_params_general(trial)

        # Assemble the network from the suggestions.
        conv_stack, fc_stack = build_layers(trial)
        net = SiameseNetwork(conv_stack, fc_stack)

        # Training dataloader with the suggested batch size.
        data_root = os.path.join("dataset", "ICDAR")
        fit_set = SiameseNetworkDataset(data_root, True, 128)
        fit_loader = DataLoader(fit_set, shuffle=True,
                                num_workers=NUM_WORKERS,
                                batch_size=trial.params["batch_size"])

        # Optimizer and loss built from the suggestions.
        optimizer = torch.optim.Adam(net.parameters(),
                                     lr=trial.params["lr"],
                                     betas=(trial.params["beta1"],
                                            trial.params["beta2"]),
                                     weight_decay=trial.params["weight_decay"])
        loss_function = ContrastiveLoss(trial.params["margin"])

        model = train(net, fit_loader, optimizer, loss_function,
                      trial.params["epochs"], IS_CUDA_CAPABLE, NUM_WORKERS,
                      MAX_SECONDS_PER_TRIAL)
        if model is None:
            # Training failed; this trial scores zero.
            return 0.0

        # Test dataloader and evaluation.
        eval_set = SiameseNetworkDataset(data_root, False, 128)
        eval_loader = DataLoader(eval_set, shuffle=True,
                                 num_workers=NUM_WORKERS, batch_size=1)
        auc_prc, f2, f1, f0_5, conf_matrix, acc_prec_rec = evaluate(
            net, eval_loader, IS_CUDA_CAPABLE, NUM_WORKERS)
        tn, fp, fn, tp = conf_matrix
        accuracy_value, precision_value, recall_value = acc_prec_rec

        # Persist the weights under a random 16-letter name.
        if MODEL_DIR is not None:
            tag = ''.join(random.choice(string.ascii_lowercase)
                          for _ in range(16))
            torch.save(model.state_dict(), os.path.join(MODEL_DIR, tag))
            print("Model saved as: {}".format(tag))

        return auc_prc
    except (RuntimeError, ValueError) as e:
        print(e)
        return 0.0
############################################################################### if __name__ == '__main__': #train_best() load_and_evaluate("tqsauyvpyafremyr") study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler()) #study.optimize(objective)