This commit is contained in:
Maciej Sobkowiak 2022-02-15 19:06:43 +01:00
commit bc67061d69
5 changed files with 424 additions and 1 deletions

2
.gitignore vendored
View File

@ -2,4 +2,4 @@ venv/
**/images*
**/data/train_features*
**/data/train_labels*
**/__pychache__*
**/__pycache__*

View File

@ -0,0 +1,68 @@
import numpy as np
import pandas as pd
import rasterio
import torch
from typing import Optional, List
class CloudDataset(torch.utils.data.Dataset):
"""Reads in images, transforms pixel values, and serves a
dictionary containing chip ids, image tensors, and
label masks (where available).
"""
def __init__(
self,
x_paths: pd.DataFrame,
bands: List[str],
y_paths: Optional[pd.DataFrame] = None,
transforms: Optional[list] = None,
):
"""
Instantiate the CloudDataset class.
Args:
x_paths (pd.DataFrame): a dataframe with a row for each chip. There must be a column for chip_id,
and a column with the path to the TIF for each of bands
bands (list[str]): list of the bands included in the data
y_paths (pd.DataFrame, optional): a dataframe with a for each chip and columns for chip_id
and the path to the label TIF with ground truth cloud cover
transforms (list, optional): list of transforms to apply to the feature data (eg augmentations)
"""
self.data = x_paths
self.label = y_paths
self.transforms = transforms
self.bands = bands
def __len__(self):
return len(self.data)
def __getitem__(self, idx: int):
# Loads an n-channel image from a chip-level dataframe
img = self.data.loc[idx]
band_arrs = []
for band in self.bands:
with rasterio.open(img[f"{band}_path"]) as b:
band_arr = b.read(1).astype("float32")
band_arrs.append(band_arr)
x_arr = np.stack(band_arrs, axis=-1)
# Apply data augmentations, if provided
if self.transforms:
x_arr = self.transforms(image=x_arr)["image"]
x_arr = np.transpose(x_arr, [2, 0, 1])
# Prepare dictionary for item
item = {"chip_id": img.chip_id, "chip": x_arr}
# Load label if available
if self.label is not None:
label_path = self.label.loc[idx].label_path
with rasterio.open(label_path) as lp:
y_arr = lp.read(1).astype("float32")
# Apply same data augmentations to the label
if self.transforms:
y_arr = self.transforms(image=y_arr)["image"]
item["label"] = y_arr
return item

197
benchmark/cloud_model.py Normal file
View File

@ -0,0 +1,197 @@
from typing import Optional, List
import pandas as pd
import pytorch_lightning as pl
import segmentation_models_pytorch as smp
import torch
try:
from cloud_dataset import CloudDataset
from losses import intersection_over_union
except ImportError:
from benchmark_src.cloud_dataset import CloudDataset
from benchmark_src.losses import intersection_over_union
class CloudModel(pl.LightningModule):
def __init__(
self,
bands: List[str],
x_train: Optional[pd.DataFrame] = None,
y_train: Optional[pd.DataFrame] = None,
x_val: Optional[pd.DataFrame] = None,
y_val: Optional[pd.DataFrame] = None,
hparams: dict = {},
):
"""
Instantiate the CloudModel class based on the pl.LightningModule
(https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html).
Args:
bands (list[str]): Names of the bands provided for each chip
x_train (pd.DataFrame, optional): a dataframe of the training features with a row for each chip.
There must be a column for chip_id, and a column with the path to the TIF for each of bands.
Required for model training
y_train (pd.DataFrame, optional): a dataframe of the training labels with a for each chip
and columns for chip_id and the path to the label TIF with ground truth cloud cover.
Required for model training
x_val (pd.DataFrame, optional): a dataframe of the validation features with a row for each chip.
There must be a column for chip_id, and a column with the path to the TIF for each of bands.
Required for model training
y_val (pd.DataFrame, optional): a dataframe of the validation labels with a for each chip
and columns for chip_id and the path to the label TIF with ground truth cloud cover.
Required for model training
hparams (dict, optional): Dictionary of additional modeling parameters.
"""
super().__init__()
self.hparams.update(hparams)
self.save_hyperparameters()
# required
self.bands = bands
# optional modeling params
self.backbone = self.hparams.get("backbone", "resnet34")
self.weights = self.hparams.get("weights", "imagenet")
self.learning_rate = self.hparams.get("lr", 1e-3)
self.patience = self.hparams.get("patience", 4)
self.num_workers = self.hparams.get("num_workers", 2)
self.batch_size = self.hparams.get("batch_size", 32)
self.gpu = self.hparams.get("gpu", False)
self.transform = None
# Instantiate datasets, model, and trainer params if provided
self.train_dataset = CloudDataset(
x_paths=x_train,
bands=self.bands,
y_paths=y_train,
transforms=self.transform,
)
self.val_dataset = CloudDataset(
x_paths=x_val,
bands=self.bands,
y_paths=y_val,
transforms=None,
)
self.model = self._prepare_model()
## Required LightningModule methods ##
def forward(self, image: torch.Tensor):
# Forward pass
return self.model(image)
def training_step(self, batch: dict, batch_idx: int):
"""
Training step.
Args:
batch (dict): dictionary of items from CloudDataset of the form
{'chip_id': list[str], 'chip': list[torch.Tensor], 'label': list[torch.Tensor]}
batch_idx (int): batch number
"""
if self.train_dataset.data is None:
raise ValueError(
"x_train and y_train must be specified when CloudModel is instantiated to run training"
)
# Switch on training mode
self.model.train()
torch.set_grad_enabled(True)
# Load images and labels
x = batch["chip"]
y = batch["label"].long()
if self.gpu:
x, y = x.cuda(non_blocking=True), y.cuda(non_blocking=True)
# Forward pass
preds = self.forward(x)
# Log batch loss
loss = torch.nn.CrossEntropyLoss(reduction="none")(preds, y).mean()
self.log(
"loss",
loss,
on_step=True,
on_epoch=True,
prog_bar=True,
logger=True,
)
return loss
def validation_step(self, batch: dict, batch_idx: int):
"""
Validation step.
Args:
batch (dict): dictionary of items from CloudDataset of the form
{'chip_id': list[str], 'chip': list[torch.Tensor], 'label': list[torch.Tensor]}
batch_idx (int): batch number
"""
if self.val_dataset.data is None:
raise ValueError(
"x_val and y_val must be specified when CloudModel is instantiated to run validation"
)
# Switch on validation mode
self.model.eval()
torch.set_grad_enabled(False)
# Load images and labels
x = batch["chip"]
y = batch["label"].long()
if self.gpu:
x, y = x.cuda(non_blocking=True), y.cuda(non_blocking=True)
# Forward pass & softmax
preds = self.forward(x)
preds = torch.softmax(preds, dim=1)[:, 1]
preds = (preds > 0.5) * 1 # convert to int
# Log batch IOU
batch_iou = intersection_over_union(preds, y)
self.log(
"iou", batch_iou, on_step=True, on_epoch=True, prog_bar=True, logger=True
)
return batch_iou
def train_dataloader(self):
# DataLoader class for training
return torch.utils.data.DataLoader(
self.train_dataset,
batch_size=self.batch_size,
num_workers=self.num_workers,
shuffle=True,
pin_memory=True,
)
def val_dataloader(self):
# DataLoader class for validation
return torch.utils.data.DataLoader(
self.val_dataset,
batch_size=self.batch_size,
num_workers=0,
shuffle=False,
pin_memory=True,
)
def configure_optimizers(self):
opt = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=10)
return [opt], [sch]
## Convenience Methods ##
def _prepare_model(self):
# Instantiate U-Net model
unet_model = smp.Unet(
encoder_name=self.backbone,
encoder_weights=self.weights,
in_channels=4,
classes=2,
)
if self.gpu:
unet_model.cuda()
return unet_model

23
benchmark/losses.py Normal file
View File

@ -0,0 +1,23 @@
import numpy as np
def intersection_over_union(pred, true):
"""
Calculates intersection and union for a batch of images.
Args:
pred (torch.Tensor): a tensor of predictions
true (torc.Tensor): a tensor of labels
Returns:
intersection (int): total intersection of pixels
union (int): total union of pixels
"""
valid_pixel_mask = true.ne(255) # valid pixel mask
true = true.masked_select(valid_pixel_mask).to("cpu")
pred = pred.masked_select(valid_pixel_mask).to("cpu")
# Intersection and union totals
intersection = np.logical_and(true, pred)
union = np.logical_or(true, pred)
return intersection.sum() / union.sum()

135
benchmark/main.py Normal file
View File

@ -0,0 +1,135 @@
import os
from pathlib import Path
from typing import List
from loguru import logger
import pandas as pd
from PIL import Image
import torch
import typer
try:
from cloud_dataset import CloudDataset
from cloud_model import CloudModel
except ImportError:
from benchmark.cloud_dataset import CloudDataset
from benchmark.cloud_model import CloudModel
ROOT_DIRECTORY = Path("/codeexecution")
PREDICTIONS_DIRECTORY = ROOT_DIRECTORY / "predictions"
ASSETS_DIRECTORY = Path("./submission/assets")
DATA_DIRECTORY = ROOT_DIRECTORY / "data"
INPUT_IMAGES_DIRECTORY = DATA_DIRECTORY / "test_features"
# Set the pytorch cache directory and include cached models in your submission.zip
os.environ["TORCH_HOME"] = str(ASSETS_DIRECTORY / "assets/torch")
def get_metadata(features_dir: os.PathLike, bands: List[str]):
"""
Given a folder of feature data, return a dataframe where the index is the chip id
and there is a column for the path to each band's TIF image.
Args:
features_dir (os.PathLike): path to the directory of feature data, which should have
a folder for each chip
bands (list[str]): list of bands provided for each chip
"""
chip_metadata = pd.DataFrame(index=[f"{band}_path" for band in bands])
chip_ids = (
pth.name for pth in features_dir.iterdir() if not pth.name.startswith(".")
)
for chip_id in chip_ids:
chip_bands = [features_dir / chip_id / f"{band}.tif" for band in bands]
chip_metadata[chip_id] = chip_bands
return chip_metadata.transpose().reset_index().rename(columns={"index": "chip_id"})
def make_predictions(
model: CloudModel,
x_paths: pd.DataFrame,
bands: List[str],
predictions_dir: os.PathLike,
):
"""Predicts cloud cover and saves results to the predictions directory.
Args:
model (CloudModel): an instantiated CloudModel based on pl.LightningModule
x_paths (pd.DataFrame): a dataframe with a row for each chip. There must be a column for chip_id,
and a column with the path to the TIF for each of bands provided
bands (list[str]): list of bands provided for each chip
predictions_dir (os.PathLike): Destination directory to save the predicted TIF masks
"""
test_dataset = CloudDataset(x_paths=x_paths, bands=bands)
test_dataloader = torch.utils.data.DataLoader(
test_dataset,
batch_size=model.batch_size,
num_workers=model.num_workers,
shuffle=False,
pin_memory=True,
)
for batch_index, batch in enumerate(test_dataloader):
logger.debug(f"Predicting batch {batch_index} of {len(test_dataloader)}")
x = batch["chip"]
preds = model.forward(x)
preds = torch.softmax(preds, dim=1)[:, 1]
preds = (preds > 0.5).detach().numpy().astype("uint8")
for chip_id, pred in zip(batch["chip_id"], preds):
chip_pred_path = predictions_dir / f"{chip_id}.tif"
chip_pred_im = Image.fromarray(pred)
chip_pred_im.save(chip_pred_path)
def main(
model_weights_path: Path = ASSETS_DIRECTORY / "cloud_model.pt",
test_features_dir: Path = DATA_DIRECTORY / "test_features",
predictions_dir: Path = PREDICTIONS_DIRECTORY,
bands: List[str] = ["B02", "B03", "B04", "B08"],
fast_dev_run: bool = False,
):
"""
Generate predictions for the chips in test_features_dir using the model saved at
model_weights_path.
Predictions are saved in predictions_dir. The default paths to all three files are based on
the structure of the code execution runtime.
Args:
model_weights_path (os.PathLike): Path to the weights of a trained CloudModel.
test_features_dir (os.PathLike, optional): Path to the features for the test data. Defaults
to 'data/test_features' in the same directory as main.py
predictions_dir (os.PathLike, optional): Destination directory to save the predicted TIF masks
Defaults to 'predictions' in the same directory as main.py
bands (List[str], optional): List of bands provided for each chip
"""
if not test_features_dir.exists():
raise ValueError(
f"The directory for test feature images must exist and {test_features_dir} does not exist"
)
predictions_dir.mkdir(exist_ok=True, parents=True)
logger.info("Loading model")
model = CloudModel(bands=bands, hparams={"weights": None})
try:
model.load_state_dict(torch.load(model_weights_path))
except RuntimeError:
model.load_state_dict(torch.load(model_weights_path, map_location=torch.device('cpu')))
logger.info("Loading test metadata")
test_metadata = get_metadata(test_features_dir, bands=bands)
train_metadata = get_metadata(Path('data/train_features'), bands=bands)
if fast_dev_run:
test_metadata = test_metadata.head()
logger.info(f"Found {len(test_metadata)} chips")
logger.info("Generating predictions in batches")
make_predictions(model, test_metadata, bands, predictions_dir)
make_predictions(model, train_metadata, bands, Path('data/predictions'))
logger.info(f"""Saved {len(list(predictions_dir.glob("*.tif")))} predictions""")
if __name__ == "__main__":
typer.run(main)