aboutsummaryrefslogtreecommitdiff
path: root/analysis/R/association.R
blob: d1c7b5ee6af1c2a8029ae8c24cbde694eb9d2f38 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

library(parallel)  # mclapply

# Sources an R file given its path relative to the RAPPOR repository root.
# The root is read from the RAPPOR_REPO environment variable; when it is
# unset, the relative path is sourced as-is from the working directory.
# NOTE(review): paste0() inserts no separator, so RAPPOR_REPO is presumably
# expected to end with a trailing slash -- confirm against callers.
source.rappor <- function(rel_path)  {
  repo_root <- Sys.getenv("RAPPOR_REPO", "")
  source(paste0(repo_root, rel_path))
}

source.rappor("analysis/R/util.R")  # for Log
source.rappor("analysis/R/decode.R")  # for ComputeCounts

#
# Tools for estimating the joint distribution of up to three variables
#     collected with RAPPOR. This file contains the functions relevant to
#     estimating joint distributions via the EM algorithm.

GetOtherProbs <- function(counts, map_by_cohort, marginal, params, pstar,
                          qstar) {
  # Computes the per-bit probabilities attributable to the "other" category
  # (strings that were reported but not decoded into the marginal).
  #
  # Args:
  #   counts: m x (k+1) matrix with counts of each bit for each
  #       cohort (m=#cohorts total, k=# bits in bloom filter), first column
  #       stores the total counts
  #   map_by_cohort: list of matrices (one per cohort) encoding the hash
  #       locations of each candidate string (rows = bits, cols = strings)
  #   marginal: object containing the estimated frequencies of known strings
  #       as well as the strings themselves ($proportion and $string),
  #       variance, etc.
  #   params: RAPPOR encoding parameters (uses $m and $k)
  #   pstar: probability a 0 bit is reported as 1, after noise
  #   qstar: probability a 1 bit is reported as 1, after noise
  #
  # Returns:
  #   List of vectors of probabilities that each bit was set by the "other"
  #   category.  The list is indexed by cohort.

  N <- sum(counts[, 1])

  # If no strings were found, all nonzero counts were set by "other".
  # (Checked first so marginal$proportion / marginal$string are never
  # touched when the marginal is empty.)
  if (length(marginal) == 0) {
    probs_other <- apply(counts, 1, function(cohort_row) {
      cohort_row[-1] / cohort_row[1]
    })
    return(as.list(as.data.frame(probs_other)))
  }

  # Counts of known strings to remove from each cohort.
  known_counts <- ceiling(marginal$proportion * N / params$m)
  sum_known <- sum(known_counts)

  # Select only the strings we care about from each cohort.
  # NOTE: drop = FALSE necessary if there is one candidate
  candidate_map <- lapply(map_by_cohort, function(map_for_cohort) {
    map_for_cohort[, marginal$string, drop = FALSE]
  })

  # Counts set by known strings without noise considerations.  Each column
  # of the sapply() result holds the k per-bit counts of one cohort.
  known_counts_by_cohort <- sapply(candidate_map, function(map_for_cohort) {
    as.vector(as.matrix(map_for_cohort) %*% known_counts)
  })

  # Force the k x m shape (this also covers the k=1 case, where sapply()
  # returns a plain vector), then transpose so rows correspond to cohorts.
  # BUG FIX: the previous code reshaped directly with dim <- c(m, k), which
  # silently scrambles the values whenever both k > 1 and m > 1, because a
  # column-major reshape is not a transpose.
  dim(known_counts_by_cohort) <- c(params$k, params$m)
  known_counts_by_cohort <- t(known_counts_by_cohort)

  # Counts set by known vals: zero bits adjusted by pstar plus true bits
  # adjusted by qstar.
  known_counts_by_cohort <- (sum_known - known_counts_by_cohort) * pstar +
                            known_counts_by_cohort * qstar

  # Add the left hand sums to make it a m x (k+1) "counts" matrix.
  known_counts_by_cohort <- cbind(sum_known, known_counts_by_cohort)

  # Counts set by the "other" category; negatives clamped to zero.
  reduced_counts <- counts - known_counts_by_cohort
  reduced_counts[reduced_counts < 0] <- 0
  probs_other <- apply(reduced_counts, 1, function(cohort_row) {
    cohort_row[-1] / cohort_row[1]
  })

  # Protect against R's matrix/vector confusion (k=1 again).
  dim(probs_other) <- c(params$k, params$m)

  # Clamp to [0, 1] and clean up divisions by zero totals.
  probs_other[probs_other > 1] <- 1
  probs_other[is.nan(probs_other)] <- 0
  probs_other[is.infinite(probs_other)] <- 0

  # Convert it from a k x m matrix to a list indexed by m cohorts.
  # as.data.frame makes each cohort a column, which can be indexed by
  # probs_other[[cohort]].
  result <- as.list(as.data.frame(probs_other))

  result
}

GetCondProbBooleanReports <- function(reports, pstar, qstar, num_cores) {
  # Computes conditional probabilities for a set of Boolean RAPPOR reports.
  #
  # Args:
  #   reports: RAPPOR reports as a list of bit arrays (each of length 1,
  #       since these are Boolean reports)
  #   pstar, qstar: standard params computed from the RAPPOR parameters
  #   num_cores: number of cores passed to mclapply for parallelism
  #
  # Returns:
  #   List with one entry per report: the conditional probability of that
  #   report given each candidate value (TRUE, FALSE)

  # Lookup table keyed by the observed bit:
  #   bit == 1: (p(report=1|TRUE), p(report=1|FALSE)) = (qstar, pstar)
  #   bit == 0: (p(report=0|TRUE), p(report=0|FALSE)) = (1-qstar, 1-pstar)
  probs_if_one <- c(qstar, pstar)
  probs_if_zero <- c(1 - qstar, 1 - pstar)

  mclapply(reports, function(single_report) {
    if (single_report[[1]] == 1) {
      probs_if_one
    } else {
      probs_if_zero
    }
  }, mc.cores = num_cores)
}

GetCondProbStringReports <- function(reports, cohorts, map, m, pstar, qstar,
                                     marginal, prob_other = NULL, num_cores) {
  # Wrapper around GetCondProb. Given a set of reports, cohorts, map and
  # parameters m, p*, and q*, it first computes bit indices by cohort, and
  # then applies GetCondProb individually to each report.
  #
  # Args:
  #   reports: RAPPOR reports as a list of bit arrays
  #   cohorts: cohorts corresponding to these reports as a list
  #   map: map file
  #   m, pstar, qstar: standard params computed from the RAPPOR parameters
  #   marginal: list containing marginal estimates (output of Decode)
  #   prob_other: per-cohort list of vectors of length k, indicating how
  #     often each bit in the Bloom filter was set by a string in the
  #     "other" category.  May be NULL (then no "other" entry is added).
  #   num_cores: number of cores passed to mclapply for parallelism
  #
  # Returns:
  #   Conditional probability of all reports given each of the strings in
  #   marginal$string

  # Get bit indices that are set per candidate per cohort.
  # seq_len() rather than 1:m guards against m == 0.
  bit_indices_by_cohort <- lapply(seq_len(m), function(cohort) {
    map_for_cohort <- map$map_by_cohort[[cohort]]
    # Find the bits set by the candidate strings
    bit_indices <- lapply(marginal$string, function(x) {
      which(map_for_cohort[, x])
    })
    bit_indices
  })

  # Apply GetCondProb over all reports.  seq_along() rather than
  # seq(length(reports)) is safe when 'reports' is empty.
  cond_report_dist <- mclapply(seq_along(reports), function(i) {
    cohort <- cohorts[i]
    # Log('Report %d, cohort %d', i, cohort)
    bit_indices <- bit_indices_by_cohort[[cohort]]
    # NOTE: prob_other[[cohort]] yields NULL when prob_other is NULL, so
    # GetCondProb's default path is taken.
    GetCondProb(reports[[i]], pstar, qstar, bit_indices,
                prob_other = prob_other[[cohort]])
  }, mc.cores = num_cores)
  cond_report_dist
}


GetCondProb <- function(report, pstar, qstar, bit_indices, prob_other = NULL) {
  # Given the observed bit array, estimate P(report | true value).
  # Probabilities are estimated for all candidate values.
  #
  # Args:
  #   report: A single observed RAPPOR report (binary vector of length k).
  #   pstar: probability a 0 bit is reported as 1, after noise.
  #   qstar: probability a 1 bit is reported as 1, after noise.
  #   bit_indices: list with one entry for each candidate.  Each entry is an
  #     integer vector of length h, specifying which bits are set for the
  #     candidate in the report's cohort.
  #   prob_other: vector of length k, indicating how often each bit in the
  #     Bloom filter was set by a string in the "other" category.
  #
  # Returns:
  #   Vector of conditional probabilities of the report given each candidate
  #   (with one extra trailing entry for "other" when prob_other is given).

  # Per-bit likelihood assuming the candidate does NOT set the bit.
  probs <- ifelse(report == 1, pstar, 1 - pstar)

  # Find the likelihood of report given each candidate string: bits the
  # candidate sets contribute qstar / (1 - qstar), all other bits pstar /
  # (1 - pstar).
  prob_obs_vals <- sapply(bit_indices, function(x) {
    prod(c(probs[-x], ifelse(report[x] == 1, qstar, 1 - qstar)))
  })

  # Account for the "other" category as one additional pseudo-candidate.
  if (!is.null(prob_other)) {
    prob_other <- prod(c(prob_other[which(report == 1)],
                         (1 - prob_other)[which(report == 0)]))
    c(prob_obs_vals, prob_other)
  } else {
    prob_obs_vals
  }
}

UpdatePij <- function(pij, cond_prob) {
  # Performs one EM update of the probability estimates.
  #
  # Args:
  #   pij: current estimate of the joint distribution (array)
  #   cond_prob: list of per-report conditional distributions
  #
  # Returns:
  #   Updated pij estimate (the M step: average of per-report posteriors)

  # NOTE: Not using mclapply here because we have a faster C++ implementation.
  # mclapply spawns multiple processes, and each process can take up 3 GB+ or 5
  # GB+ of memory.
  posteriors <- lapply(cond_prob, function(report_dist) {
    weighted <- report_dist * pij
    weighted <- weighted / sum(weighted)
    # A zero normalizer produces NaN; treat those cells as zero mass.
    weighted[is.nan(weighted)] <- 0
    weighted
  })
  Reduce(`+`, posteriors) / length(posteriors)
}

ComputeVar <- function(cond_prob, est) {
  # Computes the variance of the estimated pij's.
  #
  # Args:
  #   cond_prob: list of per-report conditional distributions
  #   est: estimated pij's
  #
  # Returns:
  #   List with the variance-covariance matrix (var_cov), per-cell standard
  #   deviations (sd, shaped like the estimate), and the information matrix
  #   (inform)

  # Accumulate the observed information matrix over reports.
  per_report <- lapply(cond_prob, function(report_dist) {
    flat <- as.vector(report_dist)
    outer(flat, flat) / (sum(report_dist * est))^2
  })
  inform <- Reduce(`+`, per_report)

  # Invert to obtain the covariance; sqrt of the diagonal gives the sd's,
  # reshaped back to the state-space dimensions.
  var_cov <- solve(inform)
  sd <- matrix(sqrt(diag(var_cov)), dim(cond_prob[[1]]))
  list(var_cov = var_cov, sd = sd, inform = inform)
}

EM <- function(cond_prob, starting_pij = NULL, estimate_var = FALSE,
               max_em_iters = 1000, epsilon = 10^-6, verbose = FALSE) {
  # Estimates the joint distribution by expectation maximization.
  #
  # Args:
  #   cond_prob: list of per-report conditional distributions (output of
  #       UpdateJointConditional)
  #   starting_pij: optional starting estimate; defaults to the uniform
  #       distribution over the state space
  #   estimate_var: flags whether we should estimate the variance
  #       of our computed distribution (via ComputeVar)
  #   max_em_iters: maximum number of EM iterations
  #   epsilon: convergence threshold on the max absolute per-cell change
  #   verbose: flags whether to display error data
  #       NOTE(review): currently unused -- Log() is called unconditionally.
  #
  # Returns:
  #   List with the final estimate (est), standard deviations (sd),
  #   variance-covariance matrix (var_cov), full iteration history (hist),
  #   and the number of EM iterations run (num_em_iters)

  pij <- list()
  # State space = dimensions of one conditional matrix, e.g. (#candidates
  # for var 1) x (#candidates for var 2).
  state_space <- dim(cond_prob[[1]])
  if (is.null(starting_pij)) {
    # Uniform starting distribution over all cells.
    pij[[1]] <- array(1 / prod(state_space), state_space)
  } else {
    pij[[1]] <- starting_pij
  }

  i <- 0  # visible outside loop
  if (nrow(pij[[1]]) > 0) {
    # Run EM
    for (i in 1:max_em_iters) {
      pij[[i + 1]] <- UpdatePij(pij[[i]], cond_prob)
      # Converged once no cell moved by more than epsilon.
      dif <- max(abs(pij[[i + 1]] - pij[[i]]))
      if (dif < epsilon) {
        break
      }
      Log('EM iteration %d, dif = %e', i, dif)
    }
  }
  # Compute the variance of the estimate.
  # The last history entry is the final estimate.
  est <- pij[[length(pij)]]
  if (estimate_var) {
    var_cov <- ComputeVar(cond_prob, est)
    sd <- var_cov$sd
    inform <- var_cov$inform
    var_cov <- var_cov$var_cov
  } else {
    var_cov <- NULL
    inform <- NULL
    sd <- NULL
  }
  list(est = est, sd = sd, var_cov = var_cov, hist = pij, num_em_iters = i)
}

TestIndependence <- function(est, inform) {
  # Tests the degree of independence between two variables.
  #
  # Args:
  #   est: estimated joint pij values (matrix)
  #   inform: information matrix (from ComputeVar)
  #
  # Returns:
  #   List with the chi-squared statistic (stat) and p-value (pval) for the
  #   null hypothesis that the two variables are independent

  # Expected cell probabilities under independence: product of marginals.
  expec <- outer(rowSums(est), colSums(est))
  diffs <- matrix(est - expec, ncol = 1)
  # Quadratic form of the deviations weighted by the information matrix.
  stat <- t(diffs) %*% inform %*% diffs
  df <- (nrow(est) - 1) * (ncol(est) - 1)
  # NOTE: lower.tail spelled out; the original relied on partial argument
  # matching ('lower = FALSE').
  list(stat = stat, pval = pchisq(stat, df, lower.tail = FALSE))
}

UpdateJointConditional <- function(cond_report_dist, joint_conditional = NULL) {
  # Folds one more variable into the per-report joint conditional
  # distribution.  Because variables are conditionally independent given the
  # reports, this is simply an outer product per report.
  #
  # Args:
  #   cond_report_dist: conditional distribution of the new variable, one
  #       entry per report, to be outer-producted into the joint.
  #   joint_conditional: current joint conditional distribution (a list with
  #       one entry per report), or NULL on the first variable.
  #
  # Returns:
  #   A list with one entry per report holding the joint conditional
  #   distribution of all variables so far.  For P(X'=x',Y=y'|X=x,Y=y),
  #   look at joint_conditional[x,x',y,y'].

  # First variable: promote each per-report vector to a 1-d array.
  if (is.null(joint_conditional)) {
    return(lapply(cond_report_dist, function(dist) array(dist)))
  }
  # Subsequent variables: per-report outer product with the running joint.
  Map("outer", joint_conditional, cond_report_dist)
}

ComputeDistributionEM <- function(reports, report_cohorts, maps,
                                  ignore_other = FALSE,
                                  params = NULL,
                                  params_list = NULL,
                                  marginals = NULL,
                                  estimate_var = FALSE,
                                  num_cores = 10,
                                  em_iter_func = EM,
                                  max_em_iters = 1000) {
  # Computes the distribution of num_variables variables, where
  #     num_variables is chosen by the client, using the EM algorithm.
  #
  # Args:
  #   reports: A list of num_variables elements, each a 2-dimensional array
  #       containing the counts of each bin for each report
  #   report_cohorts: A num_variables-element list; the ith element is an array
  #       containing the cohort of jth report for ith variable.
  #   maps: A num_variables-element list containing the map for each variable
  #   ignore_other: A boolean describing whether to compute the "other" category
  #   params: RAPPOR encoding parameters.  If set, all variables are assumed to
  #       be encoded with these parameters.
  #   params_list: A list of num_variables elements, each of which is the
  #       RAPPOR encoding parameters for a variable (a list itself).  If set,
  #       it must be the same length as 'reports'.
  #   marginals: List of estimated marginals for each variable
  #   estimate_var: A flag telling whether to estimate the variance.
  #   num_cores: number of cores passed to mclapply for parallelism
  #   em_iter_func: Function that implements the iterative EM algorithm.
  #   max_em_iters: maximum number of EM iterations
  #
  # Returns:
  #   NULL when nothing could be decoded for some variable; otherwise a list
  #   with the estimated joint distribution (fit), standard deviations (sd),
  #   EM timing / iteration counts, and the raw EM result (em).

  # Handle the case that the client wants to find the joint distribution of too
  # many variables.
  num_variables <- length(reports)

  # Exactly one of params / params_list must supply the encoding parameters.
  if (is.null(params) && is.null(params_list)) {
    stop("Either params or params_list must be passed")
  }

  Log('Computing joint conditional')

  # Compute the counts for each variable and then do conditionals.
  # joint_conditional accumulates one outer product per variable;
  # found_strings collects the candidate labels for the result's dimnames.
  joint_conditional = NULL
  found_strings <- list()

  for (j in (1:num_variables)) {
    Log('Processing var %d', j)

    var_report <- reports[[j]]
    var_cohort <- report_cohorts[[j]]
    var_map <- maps[[j]]
    # Per-variable parameters fall back to the shared 'params' when given.
    if (!is.null(params)) {
      var_params <- params
    } else {
      var_params <- params_list[[j]]
    }

    # var_counts is computed lazily: only when a marginal must be decoded
    # or when the "other" category is estimated below.
    var_counts <- NULL
    if (is.null(marginals)) {
      Log('\tSumming bits to gets observed counts')
      var_counts <- ComputeCounts(var_report, var_cohort, var_params)

      Log('\tDecoding marginal')
      marginal <- Decode(var_counts, var_map$all_cohorts_map, var_params,
                         quiet = TRUE)$fit
      Log('\tMarginal for var %d has %d values:', j, nrow(marginal))
      print(marginal[, c('estimate', 'proportion')])  # rownames are the string
      cat('\n')

      # Without at least one decoded value the joint cannot be estimated.
      if (nrow(marginal) == 0) {
        Log('ERROR: Nothing decoded for variable %d', j)
        return (NULL)
      }
    } else {
      marginal <- marginals[[j]]
    }
    found_strings[[j]] <- marginal$string

    p <- var_params$p
    q <- var_params$q
    f <- var_params$f
    # pstar and qstar needed to compute other probabilities as well as for
    # inputs to GetCondProb{Boolean, String}Reports subsequently
    pstar <- (1 - f / 2) * p + (f / 2) * q
    qstar <- (1 - f / 2) * q + (f / 2) * p
    k <- var_params$k

    # Ignore other probability if either ignore_other is set or k == 1
    # (Boolean RAPPOR)
    if (ignore_other || (k == 1)) {
      # List of NULLs: GetCondProb then skips the "other" pseudo-candidate.
      prob_other <- vector(mode = "list", length = var_params$m)
    } else {
      # Compute the probability of the "other" category
      if (is.null(var_counts)) {
        var_counts <- ComputeCounts(var_report, var_cohort, var_params)
      }
      prob_other <- GetOtherProbs(var_counts, var_map$map_by_cohort, marginal,
                                  var_params, pstar, qstar)
      found_strings[[j]] <- c(found_strings[[j]], "Other")
    }

    # Get the joint conditional distribution
    Log('\tGetCondProb for each report (%d cores)', num_cores)

    # TODO(pseudorandom): check RAPPOR type more systematically instead of by
    # checking if k == 1
    if (k == 1) {
      cond_report_dist <- GetCondProbBooleanReports(var_report, pstar, qstar,
                                                    num_cores)
    } else {
      cond_report_dist <- GetCondProbStringReports(var_report,
                                var_cohort, var_map, var_params$m, pstar, qstar,
                                marginal, prob_other, num_cores)
    }

    Log('\tUpdateJointConditional')

    # Update the joint conditional distribution of all variables
    joint_conditional <- UpdateJointConditional(cond_report_dist,
                                                joint_conditional)
  }

  N <- length(joint_conditional)
  dimensions <- dim(joint_conditional[[1]])
  # e.g. 2 x 3
  dimensions_str <- paste(dimensions, collapse = ' x ')
  total_entries <- prod(c(N, dimensions))

  Log('Starting EM with N = %d matrices of size %s (%d entries)',
      N, dimensions_str, total_entries)

  start_time <- proc.time()[['elapsed']]

  # Run expectation maximization to find joint distribution
  em <- em_iter_func(joint_conditional, max_em_iters=max_em_iters,
                     epsilon = 10 ^ -6, verbose = FALSE,
                     estimate_var = estimate_var)

  em_elapsed_time <- proc.time()[['elapsed']] - start_time

  # Label the result dimensions with the decoded candidate strings.
  dimnames(em$est) <- found_strings
  # Return results in a usable format
  list(fit = em$est,
       sd = em$sd,
       em_elapsed_time = em_elapsed_time,
       num_em_iters = em$num_em_iters,
       # This last field is implementation-specific; it can be used for
       # interactive debugging.
       em = em)
}