Skip to article frontmatterSkip to article content
earth and related environmental sciences

NDVI Time-Series Analysis of Potato Fields

A step-by-step guide to extracting and analyzing temporal NDVI profiles from potato fields for crop-type studies.

Authors
Affiliations
Eurac Research
Eurac Research
ESA EOPF Zarr Logo

🚀 Launch in JupyterHub

Run this notebook interactively with all dependencies pre-installed

Preface

The original notebook used as a starting point for this work is a openEO platform example, available here. The example has been adapted to use the data provided by the EOPF Zarr Samples project instead of the openEO API.

Introduction

This notebook demonstrates how to perform NDVI time-series analysis for potato fields using EOPF-ZARR Sentinel-2 Level-2A data. The focus is on extracting clean vegetation profiles that highlight the characteristic temporal patterns of potato growth. These profiles can later serve as input features for crop-type classification.

The workflow is structured as follows:

  • Data access: Connect to a STAC catalog and query Sentinel-2 scenes for a specific field and time range.
  • Preprocessing: Use the Scene Classification Layer (SCL) with an openEO-style mask_scl_dilation to remove clouds, shadows, snow, and other unwanted contamination.
  • NDVI computation: Derive NDVI from red (B04) and near-infrared (B08) bands, optionally scaling to match openEO conventions.
  • Temporal aggregation: Aggregate NDVI values into dekads (10-day periods), interpolate missing data, and filter to a specific season.
  • Analysis & visualization: Plot the field-mean NDVI profile with uncertainty, revealing the phenological stages of potato development.

This pipeline demonstrates how to transform raw Earth Observation data into interpretable temporal signals, laying the groundwork for future rule-based or machine learning-based crop classification.

Timeseries analysis of vegetation greenness

Different land cover types can be identified by their seasonal green-up and green-down patterns. NDVI time series capture these phenological dynamics:

  • Coniferous forests remain green all year round, showing little variation in NDVI.
  • Grasslands show moderate seasonal cycles with higher values in spring and summer.
  • Annual crops (e.g., sugar beet, maize, potato) exhibit strong seasonal fluctuations, with steep green-up during growth, a pronounced peak, and rapid decline after harvest.

An overview of NDVI trajectories for several crop types is shown in the plots below. These illustrate how temporal profiles can serve as fingerprints for distinguishing between vegetation classes.

Crop_Type_NDVI_Profiles

Setup

Start importing the necessary libraries

import os
import xarray as xr
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pystac_client
from scipy import ndimage as ndi
from distributed import LocalCluster
from pyproj import Transformer
cluster = LocalCluster(processes=False)
client = cluster.get_client()
cluster
Loading...

Area of Interest

The region of interest is located over the Belgium 31N tile, referenced in EPSG:32631. For the STAC query, the required input is the bounding box in EPSG:4326 (bbox_4326). Meanwhile, UTM-based slices are generated to subset the dataset, which is natively stored in UTM coordinates.

# Define AOI and convert to raster CRS
spatial_extent = {
    "west": 3.547150,
    "south": 50.865238,
    "east": 3.548602,
    "north": 50.866717,
}

bbox_4326 = [
    spatial_extent["west"],
    spatial_extent["south"],
    spatial_extent["east"],
    spatial_extent["north"],
]

# Convert AOI to UTM 33N (same as Sentinel-2 data)
transformer = Transformer.from_crs("EPSG:4326", "EPSG:32631", always_xy=True)
west_utm, south_utm = transformer.transform(
    spatial_extent["west"], spatial_extent["south"]
)
east_utm, north_utm = transformer.transform(
    spatial_extent["east"], spatial_extent["north"]
)

# Spatial slice parameters
x_slice = slice(west_utm, east_utm)
y_slice = slice(north_utm, south_utm)

STAC Search

# Connect to the STAC catalog
catalog = pystac_client.Client.open("https://stac.core.eopf.eodc.eu")

# Search for Sentinel-2 L2A items within a specific bounding box and date range
search = catalog.search(
    collections=["sentinel-2-l2a"],
    bbox=bbox_4326,
    datetime="2018-11-01/2020-02-01",
)

# Retrieve the list of matching items
items = list(search.items())
hrefs = [item.assets["product"].href for item in items]
print(f"List of first Five Items returned from the STAC query: {items[:5]}")
print(f"List of first Five Zarr URLs returned from the STAC query: {hrefs[:5]}")
List of first Five Items returned from the STAC query: [<Item id=S02MSIL2A_20200131T105251_0000_A051_T973>, <Item id=S02MSIL2A_20200129T110209_0000_B094_T881>, <Item id=S02MSIL2A_20200126T105229_0000_B051_T215>, <Item id=S02MSIL2A_20200124T110331_0000_A094_T671>, <Item id=S02MSIL2A_20200121T105341_0000_A051_T512>]
List of first Five Zarr URLs returned from the STAC query: ['https://objects.eodc.eu/e05ab01a9d56408d82ac32d69a5aae2a:notebook-data/tutorial_data/cpm_v260/S2A_MSIL2A_20200131T105251_N0500_R051_T31UES_20230426T102522.zarr', 'https://objects.eodc.eu:443/e05ab01a9d56408d82ac32d69a5aae2a:notebook-data/tutorial_data/cpm_v260/S2B_MSIL2A_20200129T110209_N0500_R094_T31UES_20230628T193235.zarr', 'https://objects.eodc.eu:443/e05ab01a9d56408d82ac32d69a5aae2a:notebook-data/tutorial_data/cpm_v260/S2B_MSIL2A_20200126T105229_N0500_R051_T31UES_20230428T121217.zarr', 'https://objects.eodc.eu:443/e05ab01a9d56408d82ac32d69a5aae2a:notebook-data/tutorial_data/cpm_v260/S2A_MSIL2A_20200124T110331_N0500_R094_T31UES_20230428T065124.zarr', 'https://objects.eodc.eu:443/e05ab01a9d56408d82ac32d69a5aae2a:notebook-data/tutorial_data/cpm_v260/S2A_MSIL2A_20200121T105341_N0500_R051_T31UES_20230427T135726.zarr']

DataCube Creation

def extract_time(ds):
    date_format = "%Y%m%dT%H%M%S"
    filename = ds.encoding["source"]
    date_str = os.path.basename(filename).split("_")[2]
    time = datetime.strptime(date_str, date_format)
    return ds.assign_coords(time=time)


datacube = xr.open_mfdataset(
    hrefs,
    engine="zarr",
    chunks={},
    group="/measurements/reflectance/r10m",
    concat_dim="time",
    combine="nested",
    preprocess=extract_time,
    mask_and_scale=True,
).sortby("time", ascending=True)

datacube = datacube[["b04", "b08"]].sel(x=x_slice, y=y_slice)
datacube
Loading...
scl = xr.open_mfdataset(
    hrefs,
    engine="zarr",
    chunks={},
    group="/conditions/mask/l2a_classification/r20m",  # Adjust if necessary
    concat_dim="time",
    combine="nested",
    preprocess=extract_time,
    mask_and_scale=True,
).sortby("time", ascending=True)[["scl"]]
scl_resampled = scl.scl.interp_like(datacube, method="nearest")
datacube["scl"] = scl_resampled
datacube = datacube.compute()
datacube
Loading...

Applying Custom Mask SCL Dilation Process

def circular_kernel(radius: int) -> np.ndarray:
    """Create a 2D circular (disk) kernel with given radius in pixels."""
    r = int(radius)
    y, x = np.ogrid[-r : r + 1, -r : r + 1]
    k = (x * x + y * y) <= (r * r)
    return k.astype(np.float32)


def _dilate_with_convolve(mask_2d: np.ndarray, kernel: np.ndarray) -> np.ndarray:
    """
    Dilate a boolean mask using convolution with a (0/1) kernel.
    Returns a boolean array where any overlap with the kernel sets True.
    """
    if mask_2d.dtype != np.float32:
        mask_2d = mask_2d.astype(np.float32, copy=False)
    # Convolution counts how many True pixels fall under the kernel footprint.
    conv = ndi.convolve(mask_2d, kernel, mode="constant", cval=0.0)
    return conv > 0.0


def _apply_dilation_block(
    scl_block: np.ndarray, k1: np.ndarray, k2: np.ndarray
) -> np.ndarray:
    """
    Core block function (works on 2D arrays). Returns combined dilated mask (bool).
    Implements the openEO SCL logic used by mask_scl_dilation.
    """
    # S2 Sen2Cor SCL classes (for reference):
    #  0 NODATA, 1 SAT/DEF, 2 DARK, 3 CLOUD_SHADOW, 4 VEG, 5 NOT_VEG,
    #  6 WATER, 7 UNCLASS, 8 CLOUD_MED, 9 CLOUD_HIGH, 10 THIN_CIRRUS, 11 SNOW
    # openEO mask_scl_dilation uses:
    #   mask1 = everything except {2,4,5,6,7}  (aggressive neighbourhood removal)
    #   mask2 = {3,8,9,10,11}                  (clouds, shadows, snow)
    scl = scl_block.astype(np.int16, copy=False)

    mask1 = (scl != 2) & (scl != 4) & (scl != 5) & (scl != 6) & (scl != 7)
    mask2 = (scl == 3) | (scl == 8) | (scl == 9) | (scl == 10) | (scl == 11)

    dil1 = _dilate_with_convolve(mask1, k1)
    dil2 = _dilate_with_convolve(mask2, k2)

    combined = dil1 | dil2
    return combined


def mask_scl_dilation(
    ds: xr.Dataset, *, time_band_name: str = "time", scl_band_name: str = "scl"
) -> xr.Dataset:
    """
    Apply the mask_scl_dilation to an xarray Dataset containing a Sentinel-2 SCL band.

    Parameters
    ----------
    ds : xr.Dataset
        Must contain an integer SCL classification band.
        Expected dims include (y, x) and optionally time-like dims (e.g. t).
    time_band_name: str
        Name of the time dimension in 'ds'
    scl_band_name : str
        Name of the SCL band in `ds`.

    Returns
    -------
    xr.Dataset
        Same as input, with selected bands masked (set to NaN) wherever the dilated mask is True.
    """

    if scl_band_name not in ds:
        raise ValueError(
            f"{scl_band_name!r} not found in dataset variables: {list(ds.data_vars)}"
        )

    # Validate time dimension exists in dataset
    if time_band_name not in ds.dims:
        raise ValueError(
            f"{time_band_name!r} not found in dataset dimensions: {list(ds.dims)}"
        )

    kernel1 = circular_kernel(radius=8)
    kernel2 = circular_kernel(radius=100)

    scl = ds[scl_band_name]

    # Figure out which dims are spatial:
    # Try common names first, fallback to guessing the last two dims are (y, x)
    cand_xy = [d for d in ["y", "x"] if d in scl.dims]
    if len(cand_xy) != 2:
        # Guess last two dims in order
        cand_xy = list(scl.dims[-2:])
    ydim, xdim = cand_xy

    # Build dilated mask with apply_ufunc so it's Dask-friendly and vectorized over non-spatial dims.
    dilated = xr.apply_ufunc(
        _apply_dilation_block,
        scl,
        input_core_dims=[[ydim, xdim]],
        output_core_dims=[[ydim, xdim]],
        kwargs={"k1": kernel1.astype(np.float32), "k2": kernel2.astype(np.float32)},
        dask="parallelized",
        vectorize=True,
        output_dtypes=[bool],
        dask_gufunc_kwargs={"allow_rechunk": True},
    ).rename("dilated_mask")

    bands_to_mask = [v for v in ds.data_vars if v != scl_band_name]

    # Align masking array to each band (xarray broadcasting handles extra dims)
    out = ds.copy()
    for var in bands_to_mask:
        da = out[var]
        # Mask (set to NaN where dilated mask is True)
        # Preserve dtype for integer bands by promoting to float to allow NaNs
        if np.issubdtype(da.dtype, np.integer):
            da = da.astype(np.float32)
        out[var] = da.where(~dilated)

    out = out.drop_vars(scl_band_name)

    # Build a keep mask over time dimension
    keep = xr.concat(
        [out[v].notnull().any((ydim, xdim)) for v in bands_to_mask], dim="vars"
    ).any("vars")

    # Apply time filtering
    out = out.sel({time_band_name: keep})

    return out


# Usage examples
masked_ds = mask_scl_dilation(datacube, time_band_name="time", scl_band_name="scl")
masked_ds
Loading...

Compute NDVI

def compute_ndvi(ds: xr.Dataset) -> xr.DataArray:
    nir = ds["b08"].astype("float32")
    red = ds["b04"].astype("float32")
    num = nir - red
    den = nir + red
    ndvi = xr.where(den != 0, num / den, np.nan)
    ndvi.name = "NDVI"

    return ndvi


ndvi_dataset = compute_ndvi(masked_ds)
ndvi_dataset
Loading...

Temporal Aggregation of NDVI

def aggregate_to_dekad(ds: xr.Dataset, time_dim="time") -> xr.Dataset:
    """
    Group by dekad (10-day periods) and compute mean NDVI.
    """
    # Create dekad labels
    time_index = ds[time_dim].to_index()
    dekad_labels = [f"{d.year}-D{(d.dayofyear - 1) // 10 + 1:02}" for d in time_index]
    ds = ds.assign_coords(dekad=("time", dekad_labels))

    # Group by dekad and reduce
    agg_dekad = ds.groupby("dekad").mean(dim="time", keep_attrs=True)

    # Convert dekad labels back to timestamps (approximate mid-dekad date)
    dekad_dates = [
        pd.to_datetime(label.split("-D")[0])
        + pd.to_timedelta((int(label.split("-D")[1]) - 1) * 10 + 5, unit="D")
        for label in agg_dekad.dekad.values
    ]

    # Assign new 'time' coordinate and set it as index
    agg_dekad = (
        agg_dekad.assign_coords(time=("dekad", dekad_dates))
        .swap_dims({"dekad": "time"})
        .drop_vars("dekad")
    )

    return agg_dekad


agg_dekad = aggregate_to_dekad(ndvi_dataset)
agg_dekad
Loading...

Temporal Gap-Filling of Aggregated Mean NDVI

def interpolate_and_filter(ds: xr.Dataset, year: int, time_dim="time") -> xr.Dataset:
    """
    Linearly interpolate over missing dekads and filter to one calendar year.
    """
    # Ensure time is one chunk
    ds = ds.chunk({time_dim: -1})

    # Interpolate over time
    interpolated = ds.interpolate_na(
        dim=time_dim, method="linear", fill_value="extrapolate"
    )

    # Filter to calendar year
    start = f"{year}-01-01"
    end = f"{year}-12-31"
    return interpolated.sel({time_dim: slice(start, end)})


ndvi_dekad = interpolate_and_filter(agg_dekad, year=2019)
ndvi_dekad
Loading...

Reducing Spatial Dimension

mean_df = ndvi_dekad.mean(dim=["x", "y"]).to_series()
std_df = ndvi_dekad.std(dim=["x", "y"]).to_series()

Visualising the Pattern

timeser = pd.concat([mean_df, std_df], axis=1).dropna()
timeser.columns = ["Mean NDVI", "SD"]
timeser.index = pd.to_datetime(timeser.index)
timeser = timeser.sort_index()

plt.plot(timeser["Mean NDVI"], "o-")
plt.fill_between(
    timeser.index,
    timeser["Mean NDVI"] - timeser["SD"],
    timeser["Mean NDVI"] + timeser["SD"],
    color="gray",
    alpha=0.2,
)
plt.ylim(0, 1)
plt.xticks(rotation=45)
plt.title("Potato")
plt.show()
<Figure size 640x480 with 1 Axes>
cluster.close()