Motivation

The randomization test p-value proposed by Al-Abadleh et al. (2021) is useful for estabilishing statistical significance of differences in air quality measures under minimal modeling assumptions. However, the computation of the p-value can be time-consumming for each of numerous pollutants, stations, years, months, etc.

The following R code provides an example of computing these p-values on multiple cores in parallel (e.g., over a server). Essentially this will consist of a for-loop over all combinations of pollutants, stations, years, and months in the pollutant_data dataset. Each iteration of the for-loop is independent of the others, such that the whole loop is embarassingly parallelizable and thus can be easily run on parallel cores.

Setup

First we load the required packages:

# packages
require(aq2020)
#> Loading required package: aq2020
require(tidyr)
#> Loading required package: tidyr
require(dplyr)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
require(lubridate)
#> Loading required package: lubridate
#> 
#> Attaching package: 'lubridate'
#> The following objects are masked from 'package:base':
#> 
#>     date, intersect, setdiff, union
require(parallel)
#> Loading required package: parallel

In large-scale computations, it is best to save results after each iteration of the for-loop rather than wait for the entire loop to terminate. This way if anything goes wrong part way through we don’t need to restart from scratch.

# where to save pvalue calculations
data_path <- file.path("data", "pollutant", "pvalue")

#' Helper function to standardize file names.
#'
#' @param path Path to the file.
#' @param ... Elements of the file name, to be concatenated into a single string separated by "_".
#' @param ext File extension.
#' @return A string representing the file name.
get_filename <- function(path, ..., ext = "rds") {
  fname <- paste0(c(...), collapse = "_")
  file.path(path, paste0(fname, ".", ext))
}

Now let’s create the functions to compute the absolute difference of medians and range of medians test statistics. For more information on these please see vignette("aq2020").

#' Calculate the absolute difference in medians statistic.
#'
#' @param value A vector of length `n_obs` of daily median data.
#' @param group A grouping vector of length `n_obs` indicating whether the observation is pre or post lockdown data.  Any vector for which `length(unique(group)) == 2` will work.
#' @return The difference in medians statistic.
#' @details When `length(unique(group)) = n_group < 2`, the function returns `NA`.  This is so the p-value calculation doesn't crash when e.g., there is no data in one of the groups.  When `n_group > 2` an error is thrown.
med_diff <- function(value, group) {
  n_group <- length(unique(group))
  if(n_group > 2) {
    stop("group must consist of at most 2 unique groups.")
  }
  if(n_group < 2) return(NA)
  meds <- tapply(value, group, median)
  dmeds <- abs(meds[1] - meds[2])
  setNames(dmeds, nm = "med_diff")
}

#' Calculate the range between group medians statistic.
#' 
#' @param value A vector of length `n_obs` of daily median data.
#' @param group A grouping vector of length `n_obs` indicating whether the observation is pre or post lockdown data.  Any vector for which `length(unique(group)) >= 2` will work.
#' @return The range between group medians statistic.
#' @details When `length(unique(group)) = n_group < 2`, the function returns `NA`.  This is so the p-value calculation doesn't crash when e.g., there is no data in one of the groups.
med_range <- function(value, group) {
  if(length(unique(group)) < 2) return(NA)
  meds <- tapply(value, group, median)
  rmeds <- max(meds) - min(meds)
  setNames(rmeds, nm = "med_range")
}

Now we define the core function pval_calc() which will be applied at each step of the for-loop to each combination of pollutants.

#' Compute the randomization p-value for each combination of station, pollutant and month.
#'
#' @param station Station name.
#' @param pollutant Pollutant name.
#' @param months Vector of integers between 1 and 12.
#' @param no_wknd Logical, whether to exclude weekends.
#' @param nsim Number of permutations to use in the Monte Carlo calculation.
#'
#' @return A tibble with columns:
#' \describe{
#'   \item{`Station`}{Station name.}
#'   \item{`Pollutant`}{Pollutant name.}
#'   \item{`Month`}{Month of the year.}
#'   \item{`Period`}{Either "2017-2019" or "2020".}
#'   \item{`Stat`}{Name of the computed statistic.  These are: `Tobs`, `pval`, `med`, and `n`.}
#'   \item{`Value`}{Value of the computed statistic.}
#' }
pval_calc <- function(station, pollutant, months, no_wknd = TRUE, nsim) {
  # helper function for summarize() with tibbles
  set_tibble <- function(x, nm) {
    tibble(Value = setNames(x, NULL),
           Stat = nm)
  }
  message("Station: ", station, ", Pollutant: ", pollutant)
  # prepare data for p-values
  poll_data <- pollutant_data %>%
    filter(Station == station & Pollutant == pollutant) %>%
    mutate(Year = year(Date),
           Month = month(Date, label = TRUE, abbr = FALSE),
           Day = day(Date)) %>%
    filter(Month %in% month(months, label = TRUE, abbr = FALSE)) %>%
    filter(!is.na(Concentration))
  if(no_wknd) {
    poll_data <- poll_data %>%
      mutate(Weekday = wday(Date, label = TRUE)) %>%
      filter(!Weekday %in% c("Sat", "Sun")) %>%
      select(-Weekday)
  }
  # pvalue calculations
  stat_names <- c("Tobs", "pval", "median", "n")
  tm <- system.time({
    # 2017-2019
    pv_ref <- poll_data %>%
      filter(Year != 2020) %>%
      group_by(Month) %>%
      summarize(set_tibble(
        x = c(fisher_pv(group = Year,
                        value = Concentration,
                        nsim = nsim,
                        Tfun = med_range),
              median(Concentration),
              length(Concentration)),
        nm = stat_names),
        .groups = "drop") %>%
      mutate(Period = "2017-2019")
    # 2020
    pv_2020 <- poll_data %>%
      mutate(Year = ifelse(Year == 2020, "2020", "2017-2019")) %>%
      group_by(Month) %>%
      summarize(set_tibble(
        x = c(fisher_pv(group = Year,
                        value = Concentration,
                        nsim = nsim,
                        Tfun = med_diff),
              median(Concentration[Year == "2020"]),
              length(Concentration[Year == "2020"])),
        nm = stat_names),
        .groups = "drop") %>%
      mutate(Period = "2020")
    pv_data <- bind_rows(pv_ref, pv_2020)
  })
  message("Time: ", round(tm[3], 1), " seconds")
  # append station and pollutant information
  pv_data <- pv_data %>%
    mutate(Station = station,
           Pollutant = pollutant) %>%
    select(Station, Pollutant, Month, Period, Stat, Value)
}

Now let’s check what pval_calc() computes:

pv <- pval_calc(station = "Milton",
                pollutant = "O3",
                months = 11:12,
                no_wknd = TRUE,
                nsim = 100)
#> Station: Milton, Pollutant: O3
#> Time: 0.1 seconds
pv %>% pivot_wider(names_from = "Stat", values_from = "Value")
#> # A tibble: 4 x 8
#>   Station Pollutant Month    Period     Tobs  pval median     n
#>   <chr>   <chr>     <ord>    <chr>     <dbl> <dbl>  <dbl> <dbl>
#> 1 Milton  O3        November 2017-2019 NA    NA      22.8    21
#> 2 Milton  O3        December 2017-2019  1.47  0.7    19.2    28
#> 3 Milton  O3        November 2020       1.71  0.75   21.0    21
#> 4 Milton  O3        December 2020       1.15  0.42   20.4    23

As we can see, the output consists of the station, pollutant and months requested, the period, and four computed statistics. These are:

  • Tobs: The value of the test statistic, which is difference in medians for the 2020 period and range of medians for the 2017-2019 period.
  • pval: The corresponding randomization p-value.
  • median: The median concentration value during either period.
  • n: The sample size in either period.

Note that Tobs and pval in November 2017-2019 are NA. Let’s take a closer look at why this is:

pollutant_data %>%
  mutate(Year = factor(year(Date))) %>%
  filter(
    Station == "Milton",
    Pollutant == "O3",
    month(Date) == 11
    ) %>%
  group_by(Year, .drop = FALSE) %>%
  summarize(n = n())
#> # A tibble: 4 x 2
#>   Year      n
#>   <fct> <int>
#> 1 2017      0
#> 2 2018      0
#> 3 2019     30
#> 4 2020     30

As we can see it is because there is no \(\text{O}_3\) pollutant data for Milton in November of 2017 and 2018, such that the reference set 2017-2019 consists of only one year and thus the range-of-medians statistic becomes meaningless.

P-value Calculation

We are now ready to calculate the p-value for all station/pollutant/month combinations. Since pval_calc() accepts a vector of months, the for-loop need only run over all 123 possible station/pollutant combinations:

# all station/pollutant combinations
pollutant_info %>% filter(has_poll)
#> # A tibble: 123 x 5
#>    station    station_id pollutant poll_id has_poll
#>    <chr>           <dbl> <chr>       <dbl> <lgl>   
#>  1 Barrie          47045 O3            122 TRUE    
#>  2 Barrie          47045 PM25          124 TRUE    
#>  3 Barrie          47045 NO2            36 TRUE    
#>  4 Belleville      54012 O3            122 TRUE    
#>  5 Belleville      54012 PM25          124 TRUE    
#>  6 Belleville      54012 NO2            36 TRUE    
#>  7 Brantford       21005 O3            122 TRUE    
#>  8 Brantford       21005 PM25          124 TRUE    
#>  9 Brantford       21005 NO2            36 TRUE    
#> 10 Burlington      44008 O3            122 TRUE    
#> # … with 113 more rows

First let’s set up our R session for parallel processing the calculations in the for-loop:

# parallel job specification
nsim <- 1e4 # number of random permutations
months <- 1:12 # months for which to calculate p-values
no_wknd <- TRUE # exclude weekends
# which station/pollutant combinations
# from pollutant_info to run
job_ind <- which(pollutant_info$has_poll)
# cluster setup

# number of cores to use.
# this uses all available cores, but a different number can be set.
ncores <- detectCores(logical = FALSE) 

# parallel processing seed
RNGkind("L'Ecuyer-CMRG") 

# create the parallel cluster
cl <- makeCluster(spec = ncores) 


# load all packages on each cluster.
invisible(
  clusterEvalQ(cl, {
    require(aq2020)
    require(tidyr)
    require(dplyr)
    require(lubridate)
    require(parallel)
  })
)

# copy all R objects required for the calculation onto each core.
# it's often simplest to just copy everything in the workspace.
clusterExport(cl, varlist = ls()[ls() != "cl"])

Now we’re ready to run the p-value computation in parallel. This is done with the function parallel::parLapply(). Unlike regular R for-loops, parLapply() does the entire for-loop computation in a single function call. Therefore, if at step there is an error all previous computations are lost. Therefore, the parallel computation is set up so that:

  1. The p-value calculation is saved at the end of each step.
  2. parLapply() is instructed to flag an error at each step rather than throw one, using tryCatch(). This way parLapply() will never throw an error, but you can find out after the fact which steps of the computation failed by inspecting the contents of bad_ind.
system.time({
  bad_ind <- parLapply(cl, job_ind, fun = function(ii) {
    station <- pollutant_info$station[ii]
    pollutant <- pollutant_info$pollutant[ii]
    # calculate p-value in tryCatch block to catch/flag any errors
    pv_data <- tryCatch(
      pval_calc(station = station, pollutant = pollutant,
                months = months, no_wknd = no_wknd, nsim = nsim),
      error = function(e) NULL
    )
    # save data
    if(!is.null(pv_data)) {
      saveRDS(pv_data,
              file = get_filename(path = data_path,
                                  station, pollutant))
    }
    # return TRUE if p-value computation failed and FALSE otherwise
    is.null(pv_data) 
  })
})

Once the calculation is done, we can check whether any of the jobs failed:

# if you are in an interactive R session, you can simply do this: 
## any(unlist(bad_ind))

# if you ran R as a batch job which exists after the computation is done,
# you can regenerate the contents of bad_ind like this: 
bad_ind <- lapply(job_ind, function(ii) {
  station <- pollutant_info$station[ii]
  pollutant <- pollutant_info$pollutant[ii]
  pv_data <- tryCatch(
    readRDS(file = get_filename(path = data_path,
                                station, pollutant)),
    error = function(e) NULL
  )
  is.null(pv_data)
})

any(unlist(bad_ind))
#> [1] FALSE

If the result is other than FALSE, you can rerun parLapply() over the jobs in which(unlist(bad_ind)).

Finally, when the parallel computation is done you must shut down the cluster:

Combining Results

The computation above were saved in separate files for each station/pollutant combination. The following code puts these together into a single dataset containing the following columns:

  • Station: Station name.
  • Pollutant: Pollutant name.
  • Period: 2017-2019 or 2020.
  • Month: Name of the month.
  • Ndays: Number of days in the given Month and Period.
  • Median: Median concentration over all days in the given Month and Period.
  • Pval: P-value of the randomization test.
pollutant_pval <- lapply(job_ind, function(ii) {
  station <- pollutant_info$station[ii]
  pollutant <- pollutant_info$pollutant[ii]
  pv_data <- readRDS(file = get_filename(path = data_path,
                                         station, pollutant))
  pv_data %>%
    pivot_wider(names_from = "Stat", values_from = "Value") %>%
    select(Station, Pollutant, Period, Month,
           Ndays = n, Median = median, Pval = pval)
}) %>% bind_rows()

This dataset is available in the aq2020 dataset as pollutant_pval:

aq2020::pollutant_pval # dataset inside the aq2020 package
#> # A tibble: 2,943 x 7
#>    Station Pollutant Period    Month     Ndays Median   Pval
#>    <chr>   <chr>     <chr>     <ord>     <dbl>  <dbl>  <dbl>
#>  1 Barrie  O3        2017-2019 January      68   24.5 0.341 
#>  2 Barrie  O3        2017-2019 February     60   29.7 0.245 
#>  3 Barrie  O3        2017-2019 March        66   34.4 0.0094
#>  4 Barrie  O3        2017-2019 April        63   34.4 0.129 
#>  5 Barrie  O3        2017-2019 May          69   31.7 0.0027
#>  6 Barrie  O3        2017-2019 June         63   26.6 0.820 
#>  7 Barrie  O3        2017-2019 July         66   26.6 0.0777
#>  8 Barrie  O3        2017-2019 August       68   23.5 0.957 
#>  9 Barrie  O3        2017-2019 September    62   20   0.899 
#> 10 Barrie  O3        2017-2019 October      68   18.6 0.0392
#> # … with 2,933 more rows

References

Al-Abadleh, H.A., Lysy, M., Neil, L., Patel, P., Mohammed, W., and Khalaf, Y., 2021. Rigorous quantification of statistical significance of the covid-19 lockdown effect on air quality: The case from ground-based measurements in Ontario, Canada. Journal of Hazardous Materials, 413, 125445.