-
Notifications
You must be signed in to change notification settings - Fork 0
Description
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torchvision import transforms
from torch.utils.data import DataLoader
import numpy as np
from tqdm import tqdm
import cv2
from PIL import Image
import argparse
from model import SHADOW
from image_loader import ImageFolder, make_dataset
from utils import MyWcploss
from misc import crf_refine, check_mkdir
from Evaluation_codes import evaluate_pairs
import glob
Device configuration
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
Hyperparameters
input_size = 416
num_epochs = 30
batch_size = 8
learning_rate = 0.001
pattern = 'CN'
num_of_interaction = 4
fix_backbone = False
to_pil = transforms.ToPILImage()
Data transforms
transform = transforms.Compose([
transforms.Resize((input_size, input_size)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
target_transform = transforms.Compose([
transforms.Resize((input_size, input_size)),
transforms.ToTensor()
])
def joint_transform(img, target):
return img, target
def train():
# Paths (modify these to match your directory structure)
data_root = r'./train' #训练数据地址,包含shadow和mask
save_results_path = r'./ckpt/train/image' #保存每一个epoch的结果
checkpoint_path = r'./ckpt/train' #保存每一个epoch的参数
gt_root = r'val/mask' #评估数据集的mask
# Create directories
check_mkdir(save_results_path)
check_mkdir(checkpoint_path)
# Load dataset
train_dataset = ImageFolder(
root=data_root,
joint_transform=joint_transform,
transform=transform,
target_transform=target_transform
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
# Initialize model
net = SHADOW(
backbone='efficientnet-b3',
proj_planes=16,
pred_planes=32,
use_pretrained=True,
fix_backbone=fix_backbone,
has_se=False,
num_of_layers=num_of_interaction,
pattern=pattern
).to(device)
net.train()
# Loss and optimizer
criterion = MyWcploss().to(device)
optimizer = optim.Adam(net.parameters(), lr=learning_rate)
# Training loop
for epoch in range(num_epochs):
print(f'Epoch [{epoch+1}/{num_epochs}]')
running_loss = 0.0
for i, (images, targets) in enumerate(tqdm(train_loader, desc="Training")):
images = Variable(images).to(device)
targets = Variable(targets).to(device)
# Forward pass
optimizer.zero_grad()
S_out, NS_out = net(images)
loss = criterion(S_out, targets) # Use shadow output
# Backward and optimize
loss.backward()
optimizer.step()
running_loss += loss.item()
if (i + 1) % 10 == 0:
print(f'Step [{i+1}/{len(train_loader)}], Loss: {running_loss / 10:.4f}')
running_loss = 0.0
# Save checkpoint
checkpoint = {
'epoch': epoch + 1,
'state_dict': net.state_dict(),
'optimizer': optimizer.state_dict(),
'configs': {
'backbone': 'efficientnet-b3',
'proj_planes': 16,
'pred_planes': 32,
'use_pretrained': True,
'fix_backbone': fix_backbone,
'has_se': False,
'num_of_layers': num_of_interaction,
'pattern': pattern
}
}
torch.save(checkpoint, os.path.join(checkpoint_path, f'epoch_{epoch+1}.pth'))
# Evaluate on test set
net.eval()
test_results_path = os.path.join(save_results_path, f'test_epoch_{epoch+1}')
check_mkdir(test_results_path)
images_list = glob.glob('./val/shadow/*.*') #评估数据集的shadow
for i, path in tqdm(enumerate(images_list), desc="Valing"):
img = Image.open(path)
input_img = img
W, H = img.size
img = Variable(transform(img).unsqueeze(0)).to(device)
S_out, _ = net(img)
S_out = torch.sigmoid(S_out)
prediction = np.array(transforms.Resize((H, W))(to_pil(S_out.data.squeeze(0).cpu())))
prediction = crf_refine(np.array(input_img.convert('RGB')), prediction)
Image.fromarray(prediction).save(
os.path.join(test_results_path, os.path.basename(path)[:-4] + '.png')
)
# Calculate metrics
pos_err, neg_err, ber, acc, df = evaluate_pairs(test_results_path, gt_root)
print(f'Epoch [{epoch+1}] Test Metrics - BER: {ber:.2f}, pErr: {pos_err:.2f}, nErr: {neg_err:.2f}, acc: {acc:.4f}')
net.train()
if name == 'main':
train()