Fitting a logistic regression on a 100GB dataset on a laptop

EDIT: Thanks for the comments – I have created a repository with full end-to-end reproducible code. You can find it here – https://github.com/dselivanov/kaggle-outbrain.

This is a continuation of Lessons learned from the “Outbrain Click Prediction” Kaggle competition (part 1). As a quick recap – we achieved MAP@12 ≈ 0.67, which corresponds to roughly positions 90-100 on the leaderboard. And we did not use any information about page views from the 100GB (30GB compressed) page_views.csv.zip file.

Splitting

Since it is impossible to read a zip file line by line from R (at least I don't know of a way), we will split the file into many “mini-batches” so that each batch can be read efficiently from disk into RAM. This also lets us process the chunks in parallel. As mentioned in the first part, your best friends are data.table and the UNIX command line. We will use the split utility to split the file, and we will gzip each chunk so the result does not occupy 100GB on disk.

mkdir page_views_splits
unzip -p page_views.csv.zip | split --line-bytes=300m --filter='gzip --fast > ./page_views_splits/$FILE.gz'

In the pipe we:

  1. uncompress the zip stream
  2. create batches of ~300 MB with the --line-bytes=300m argument (unlike --bytes, it does not break lines)
  3. compress each batch with gzip (fast compression, lower compression ratio)
  4. save it to the page_views_splits directory we created with the first command

This will take ~ 20-30 minutes.

On OS X a couple of modifications are needed. First install the GNU command line utilities (coreutils) – the built-in tools are heavily outdated:

brew install coreutils

Then use gsplit instead of split:

mkdir page_views_splits
unzip -p page_views.csv.zip | gsplit --line-bytes=300m --filter='gzip --fast > ./page_views_splits/$FILE.gz'
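
As an optional sanity check (a quick sketch, assuming the commands above were run from the current working directory), we can count the produced chunks and peek at the first one, which should start with the csv header:

fls = list.files("page_views_splits", full.names = TRUE)
length(fls)
# the first chunk (xaa.gz) is the only one that contains the header line
readLines(gzfile(fls[1]), n = 2)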

Parsing

Now we can parse each file in memory, and we can do it in parallel. Interestingly, page_views.csv.zip contains views not only for the users in the train and test sets, but also for users that never appear there. While I am pretty sure this information could also be useful, in this post we will filter out the views of such users – this reduces the dataset significantly (we did not use this data in our solution, and as far as I know the winners did not use it either). So let's start.

First, load the events data we prepared in part 1 and collect all unique user ids:

library(data.table)
library(Matrix)
library(methods)
library(magrittr)
events = readRDS('~/projects/kaggle/outbrain/data/events.rds')
uuid_events = unique(events$uuid)
rm(events);gc()

We will hash the string-based uuids into integers, as we did in part 1:

hash_uuid = function(uuid_string, H_SIZE = 2**28) {
  text2vec:::hasher(uuid_string, H_SIZE)
}

We will process the data with 4 parallel workers and partition it by uuid into 50 buckets (the same number we used in part 1):

library(doParallel)
registerDoParallel(4)
N_PART = 50
for(x in 0:(N_PART - 1)) {
  # here we will put uuid partitioned views for each file
  dir.create(sprintf("~/projects/kaggle/outbrain/data/raw/views/%02d", x))
}
# by default saveRDS either compresses at level 6 (very slow) or not at all (large files);
# compression level 1 is a good speed/size trade-off
save_rds_compressed = function(x, file, compr_lvl = 1) {
  con = gzfile(file, open = "wb", compression = compr_lvl)
  saveRDS(x, file = con)
  close.connection(con)
}

We will read the file chunks, keep only the relevant columns and filter out unobserved uuids. On my laptop this took ~20 minutes:

colnames = c("uuid", "document_id", "timestamp", "platform", "geo_location", "traffic_source")
fls = list.files("data/raw/page_views_splits/", full.names = TRUE)
foreach(f = fls, .inorder = F, .combine = c, .multicombine = TRUE,
        .options.multicore = list(preschedule = FALSE)) %dopar% {
  # only the first chunk contains the csv header
  header = basename(f) == "xaa.gz"
  # we only need the first 3 columns: "uuid", "document_id", "timestamp"
  # fread can read directly from a UNIX pipe, which not many people know about
  dt = fread(paste("zcat < ", f), header = header, col.names = colnames[1:3], select = 1:3)
  dt[, uuid := hash_uuid(uuid)]
  # keep only uuids that appear in the events data
  j = dt[['uuid']] %in% uuid_events
  dt = dt[j, ]
  # partition by uuid and save
  for(i in 0:(N_PART - 1)) {
    out = sprintf("~/projects/kaggle/outbrain/data/raw/views/%02d/%s.rds", i, basename(f))
    # save_rds_compressed is defined above
    save_rds_compressed(dt[uuid %% N_PART == i, ], out)
  }
}
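
An optional sanity check after the loop finishes (a sketch that assumes the paths used above): every partition directory should contain exactly one .rds file per input chunk.

chunk_counts = sapply(0:(N_PART - 1), function(x)
  length(list.files(sprintf("~/projects/kaggle/outbrain/data/raw/views/%02d", x))))
# every partition should have one file per gzip chunk
all(chunk_counts == length(fls))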

Now for each uuid partition we have many files with views. It is convenient to collapse them into a single file per partition:

for(x in 0:(N_PART - 1))
  dir.create(sprintf("~/projects/kaggle/outbrain/data/raw/views_comb/%02d", x))
res = foreach(chunk = 0:(N_PART - 1), .inorder = FALSE, .multicombine = TRUE,
        .options.multicore = list(preschedule = FALSE)) %dopar% {
          dir = sprintf("~/projects/kaggle/outbrain/data/raw/views/%02d/", chunk)
          fls = list.files(dir)
          dt = fls %>% lapply(function(x) readRDS(paste0(dir, x))) %>% 
            rbindlist()
          save_rds_compressed(dt, sprintf("~/projects/kaggle/outbrain/data/raw/views_comb/%02d.rds", chunk))
        }

And we are done – we can join each chunk of page_views with the corresponding chunk of events, because they contain information about the same group of users: those whose hashed uuid has the same remainder after division by N_PART = 50.
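
Here is a tiny illustration (with made-up uuid strings) of why the per-partition join is valid: the partition index depends only on the hashed uuid, so a given user always ends up in the chunk with the same number in both the events data and the page views data.

toy_uuids = c("cb8c55702adb93", "79a85fa78311b9")  # hypothetical uuid strings
hash_uuid(toy_uuids) %% N_PART  # deterministic partition index for each uuid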

Adding page views to the model

Recap – in part 1 we created a sparse model matrix for each uuid partition. For each partition we also stored a data frame with uuid, ad_id, promo_document_id, campaign_id, advertiser_id, in such a way that each row of the data frame corresponds to a row of the feature matrix. We stored it here:

events_matrix_dir = "~/projects/kaggle/outbrain/data/events_matrix/"
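
Before building the interactions, here is a minimal peek at one of those partitions (partition 0; the structure – a list with the sparse feature matrix x, the ad context data.table dt and the click labels y – is inferred from how it is used in the loop below):

event_data = readRDS(sprintf("%s/%d.rds", events_matrix_dir, 0))
str(event_data, max.level = 1)
# each row of dt describes the corresponding row of the feature matrix x
stopifnot(nrow(event_data$dt) == nrow(event_data$x))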

Now we want to model the interaction between previous page views and whether the user clicks on a given advertisement. To do that, we will model interactions between each page the user viewed and each attribute of the advertisement – ad_id, promo_document_id, campaign_id, advertiser_id. We can express these interactions via hashing – identical interactions will be hashed to identical values. The hash function is the same one we used before: the interaction's column index is a hash of the feature name (acting as an offset) plus the product of the interacting ids, taken modulo the size of the hashing space.
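
To make the scheme concrete, here is a minimal sketch with toy ids (not real data) of how a single interaction column index is computed – it uses the same string_hasher definition as the full code below:

string_hasher = function(x, h_size = 2**24) 
  text2vec:::hasher(x, h_size)
views_h_size = 2**24
advertiser_id = 2084  # made-up advertiser id
document_id = 120     # made-up id of a viewed page
# offset derived from the feature name + product of the interacting ids, modulo the hash space size
j = (string_hasher("advertiser_id") + 1.0 * advertiser_id * document_id) %% views_h_size
j  # identical (advertiser_id, document_id) pairs always map to this same column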

Let’s start. Views data is here:

views_dir = "~/projects/kaggle/outbrain/data/raw/views_comb/"

We will:

  1. read a partition with the feature matrix and the corresponding ad context data frame (previously saved in part 1)
  2. read the matching partition with views
  3. join the views data frame (actually a data.table) with the ad context data frame and calculate interactions with the hashing function described above
  4. cbind the events matrix (from step 1) with the views interaction matrix (from step 3)
  5. optionally save the new matrix to disk, so we can tune model parameters later without rebuilding the feature matrix

string_hasher = function(x, h_size = 2**24) 
  text2vec:::hasher(x, h_size)
dir = "~/projects/kaggle/outbrain/data/events_views_matrix/"
if(!dir.exists(dir)) dir.create(dir)
N_PART = 50
for(i in (0:(N_PART - 1))) {
  event_data = readRDS(sprintf("%s/%d.rds", events_matrix_dir, i))
  # rows of the data frame correspond to row numbers in the feature matrix event_data$x
  event_data$dt[, i := 0L:(.N - 1L)]
  setkey(event_data$dt, "uuid")
  # views_data = fst::read.fst(sprintf("%s/%02d.fst", views_dir, i), as.data.table = TRUE)
  views_data = readRDS(sprintf("%s/%02d.rds", views_dir, i))
  # we only need 1-hot encoded page views, so deduplicate by (uuid, document_id) and drop the count N
  views_data = views_data[, .N, keyby = .(uuid, document_id)][, .(uuid, document_id)]
  # now build interactions - join views and event_data$dt by uuid
  views_h_size = 2**24
  views_interactions = views_data[event_data$dt, .(
    # row index
    i, 
    # column indices
    j1 = (string_hasher("promo_document_id") + 1.0 * promo_document_id * document_id) %% views_h_size,
    j2 = (string_hasher("campaign_id") + 1.0 * campaign_id * document_id) %% views_h_size,
    j3 = (string_hasher("advertiser_id") + 1.0 * advertiser_id * document_id) %% views_h_size,
    j4 = (string_hasher("advertiser_id + campaign_id") + 1.0 * advertiser_id * campaign_id * document_id) %% views_h_size
  ), 
  on = .(uuid = uuid), allow.cartesian=TRUE, nomatch = 0]
  # turn df into sparse matrix
  m_views = sparseMatrix(i = rep(views_interactions$i, 4), 
                         j = c(views_interactions$j1, views_interactions$j2, views_interactions$j3, views_interactions$j4), 
                         x = 1, dims = c(nrow(event_data$dt), views_h_size), index1 = F, giveCsparse = F, check = F)
  # cbind with events matrix and convert to CSR
  m = cbind(event_data$x, m_views) %>% as("RsparseMatrix")
  save_rds_compressed(list(x = m, y = event_data$y), file = sprintf("%s/%02d", dir, i))
  message(sprintf("%s - chunk %02d", Sys.time(), i))
}

We also need to prepare a matrix for cross-validation. First, read the cross-validation data from part 1:

event_data_cv = readRDS("~/projects/kaggle/outbrain/data/dt_cv_0.rds")
event_data_cv$dt[, i := 0L:(.N - 1L)]
setkey(event_data_cv$dt, "uuid")

As a reminder – for CV we saved partition 0.

cv_part = 0
views_data = readRDS(sprintf("%s/%02d.rds", views_dir, cv_part))
# we only need 1-hot encoded page views, so deduplicate by (uuid, document_id) and drop the count N
views_data = views_data[, .N, keyby = .(uuid, document_id)][, .(uuid, document_id)]

And now join in the same way:

views_h_size = 2**24
views_interactions = views_data[event_data_cv$dt, .(
  # row index
  i, 
  # column indices
  j1 = (string_hasher("promo_document_id") + 1.0 * promo_document_id * document_id) %% views_h_size,
  j2 = (string_hasher("campaign_id") + 1.0 * campaign_id * document_id) %% views_h_size,
  j3 = (string_hasher("advertiser_id") + 1.0 * advertiser_id * document_id) %% views_h_size,
  j4 = (string_hasher("advertiser_id + campaign_id") + 1.0 * advertiser_id * campaign_id * document_id) %% views_h_size
), 
on = .(uuid = uuid), allow.cartesian=TRUE, nomatch = 0]
# turn df into sparse matrix
m_views = sparseMatrix(i = rep(views_interactions$i, 4), 
                       j = c(views_interactions$j1, views_interactions$j2, views_interactions$j3, views_interactions$j4), 
                       x = 1, dims = c(nrow(event_data_cv$dt), views_h_size), index1 = F, giveCsparse = F, check = F)
# cbind with events matrix and convert to CSR
m = cbind(event_data_cv$x, m_views) %>% as("RsparseMatrix")
save_rds_compressed(list(x = m, y = event_data_cv$y, dt = event_data_cv$dt), file = "~/projects/kaggle/outbrain/data/cv_views_blog.rds")

Now we are done with data preparation and can continue with the FTRL model. I would like to highlight that in compressed form the matrix occupies ~25GB on disk; uncompressed it would be ~100GB.

Fitting FTRL from disk

The good thing is that the problem now is not very different from the one we already solved in part 1. We just read our matrix chunk by chunk and update the model:

library(FTRL)
m_dir = "~/projects/kaggle/outbrain/data/events_views_matrix/"
m_files = list.files(m_dir)
ftrl = FTRL$new(alpha = 0.05, beta = 0.5, lambda = 10, l1_ratio = 1, dropout = 0)
cv = readRDS("~/projects/kaggle/outbrain/data/cv_views_blog.rds")  # CV chunk with views interactions, prepared above
i = 1
for( fl in m_files) {
  data = readRDS(sprintf("%s/%s", m_dir, fl))
  ftrl$partial_fit(X = data$x, y = data$y, nthread = 8)
  if(i %% 5 == 0) {
    train_auc = glmnet::auc(data$y, ftrl$predict(data$x))
    p = ftrl$predict(cv$x)
    dt_cv = copy(cv$dt[, .(display_id, clicked = cv$y, p = -p)])
    setkey(dt_cv, display_id, p)
    mean_map12 = dt_cv[ , .(map_12 = 1 / which(clicked == 1)), by = display_id][['map_12']] %>% 
      mean %>% round(5)
    cv_auc = glmnet::auc(cv$y, p)
    message(sprintf("%s batch %d train_auc = %f, cv_auc = %f, map@12 = %f", Sys.time(), i, train_auc, cv_auc, mean_map12))
  }
  i = i + 1
}

2017-02-18 19:01:42 batch 5 train_auc = 0.751388, cv_auc = 0.732823, map@12 = 0.658050
2017-02-18 19:02:32 batch 10 train_auc = 0.760015, cv_auc = 0.740163, map@12 = 0.664360
2017-02-18 19:03:24 batch 15 train_auc = 0.767968, cv_auc = 0.743684, map@12 = 0.667110
2017-02-18 19:04:17 batch 20 train_auc = 0.771974, cv_auc = 0.746428, map@12 = 0.669090
2017-02-18 19:05:06 batch 25 train_auc = 0.776669, cv_auc = 0.748072, map@12 = 0.670730
2017-02-18 19:05:57 batch 30 train_auc = 0.780555, cv_auc = 0.749372, map@12 = 0.670810
2017-02-18 19:06:47 batch 35 train_auc = 0.782584, cv_auc = 0.750513, map@12 = 0.672070
2017-02-18 19:07:36 batch 40 train_auc = 0.784387, cv_auc = 0.751552, map@12 = 0.672830
2017-02-18 19:08:25 batch 45 train_auc = 0.787242, cv_auc = 0.752350, map@12 = 0.673350
2017-02-18 19:09:15 batch 50 train_auc = 0.789798, cv_auc = 0.752649, map@12 = 0.673850

This gives us MAP@12 ≈ 0.674. After applying the leak it rises to ~0.68, which is around 50th place on the leaderboard. Fitting took 9 minutes (including the time for reading from disk). Awesome, isn't it?

Conclusion

As we can see, it is possible to achieve quite good results on large, complex datasets with limited resources. The key components are:

  1. Smart partitioning
  2. Feature hashing (and feature-interaction extraction)
  3. Online learning

Another takeaway for me is a very interesting general-purpose machine learning method called Factorization Machines, which models feature interactions in a factorized way. Even more interestingly, it can learn these interactions in linear time (amortized linear, to be precise).
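
For reference, this is the standard second-order Factorization Machine formulation (not specific to this competition):

$$\hat{y}(\mathbf{x}) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle \, x_i x_j$$

The pairwise term can be rewritten as

$$\sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle \, x_i x_j = \frac{1}{2}\sum_{f=1}^{k}\left[\left(\sum_{i=1}^{n} v_{i,f}\, x_i\right)^2 - \sum_{i=1}^{n} v_{i,f}^{2}\, x_i^{2}\right]$$

which is why a prediction costs O(kn) – linear in the number of non-zero features – rather than O(kn^2). I will explore Factorization Machines in the next posts. Stay tuned!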