Saturday, July 14, 2018

Best way to process a click stream to create features in Pandas


I am processing a dataframe with a click-stream and I'm extracting features for each user in the click-stream to be used in a Machine Learning project.

The dataframe is something like this:

data = pd.DataFrame({'id': ['A01', 'B01', 'A01', 'C01', 'A01', 'B01', 'A01'],
                     'event': ['search', 'search', 'buy', 'home', 'cancel', 'home', 'search'],
                     'date': ['2018-01-01', '2018-01-01', '2018-01-02', '2018-01-03', '2018-01-04', '2018-01-04', '2018-01-06'],
                     'product': ['tablet', 'dvd', 'tablet', 'tablet', 'tablet', 'book', 'book'],
                     'price': [103, 2, 203, 103, 203, 21, 21]})
data['date'] = pd.to_datetime(data['date'])

Since I have to create features for each user I'm using a groupby/apply with a custom function like:

featurized = data.groupby('id').apply(featurize) 

The featurize function takes a chunk of the dataframe (all rows for one user) and creates many (hundreds of) features. The whole process is just too slow, so I'm looking for a recommendation to do this more efficiently.

An example of the function used to create features:

def featurize(group):
    features = dict()

    # Userid
    features['id'] = group['id'].max()
    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5: max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6: min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7: mean events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8: std of events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9: total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10: max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11: min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12: mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13: std of price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.Series(features)

One potential problem is that each feature potentially scans the whole chunk, so with 100 features I scan the chunk 100 times instead of just once.

For example, one feature can be the number of "tablet" events the user has, another the number of "home" events, another the average time difference between "search" events, then the average time difference between "search" events for "tablets", and so on. Each feature can be coded as a function that takes a chunk (df) and produces the feature, but with hundreds of features each one scans the whole chunk when a single linear scan would suffice. The problem is that the code would get ugly if I wrote a manual for loop over each record in the chunk and coded all the features inside the loop (a sketch of that loop follows below).
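
To make the trade-off concrete, here is a minimal sketch (the name featurize_single_pass is just for illustration, not code I actually use) of what such a manual single-scan loop would look like for only three of the features; with hundreds of features the loop body quickly becomes unmanageable:

def featurize_single_pass(group):
    # Hypothetical single-scan version covering only three features;
    # every additional feature adds more state to this loop body.
    n_search = 0
    n_tablet = 0
    first_date = None
    last_date = None
    for row in group.itertuples(index=False):
        if row.event == 'search':
            n_search += 1
        if row.product == 'tablet':
            n_tablet += 1
        if first_date is None or row.date < first_date:
            first_date = row.date
        if last_date is None or row.date > last_date:
            last_date = row.date
    return pd.Series({
        'number_of_search_events': n_search,
        'number_of_tablets': n_tablet,
        'total_time': (last_date - first_date) / np.timedelta64(1, 'D'),
    })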

Questions:

  1. If I have to process a dataframe hundreds of times, is there a way to abstract this in a single scan that will create all the needed features?

  2. Is there a speed improvement over the groupby/apply approach I'm currently using?

2 Answers

Answer 1

Disclaimer: the following answer does not properly answer the above question. Just leaving it here for the sake of work invested. Maybe there will be some use for it at some point.

  1. parallelism (e.g. Parallelize apply after pandas groupby; see the code below)
  2. use a cache if you run the calculations multiple times (e.g. HDFStore; see the sketch after this list)
  3. avoid string operations; use native types which can be efficiently represented in numpy
  4. if you really need strings, use categorical columns (given they represent categorical data; also shown in the sketch after this list)
  5. if the frames are really big, consider using chunks (e.g. "Large data" work flows using pandas)
  6. use cython for further (potentially drastic) improvements
  7. re-use dataframe selections (e.g. group.loc[group['product']=='tablet','price'])
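
For points (2) and (4), a rough sketch of what the caching and the categorical conversion could look like (the file name, the store key and the build_events_somehow helper are placeholders, not part of the question):

import pandas as pd

def load_events(path='clickstream.h5', key='events'):
    # point (2): cache the expensive-to-build frame in an HDFStore
    with pd.HDFStore(path) as store:
        if key in store:
            df = store[key]               # later runs re-use the cached copy
        else:
            df = build_events_somehow()   # placeholder for the expensive step
            store[key] = df
    # point (4): categorical dtypes are much cheaper than raw Python strings
    for col in ['id', 'event', 'product']:
        df[col] = df[col].astype('category')
    return df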

As for (1), given your code from above, I could produce speedups of up to 43% (i7-7700HQ CPU, 16GB RAM).

Timings

using joblib: 68.86841534099949s
using multiprocessing: 71.53540843299925s
single-threaded: 119.05010353899888s

Code

import pandas as pd
import numpy as np
import timeit
import joblib
import multiprocessing


def make_data():
    # just some test data ...
    n_users = 100
    events = ['search', 'buy', 'home', 'cancel']
    products = ['tablet', 'dvd', 'book']
    max_price = 1000

    n_duplicates = 1000
    n_rows = 40000

    df = pd.DataFrame({
        'id': list(map(str, np.random.randint(0, n_users, n_rows))),
        'event': list(map(events.__getitem__, np.random.randint(0, len(events), n_rows))),
        'date': list(map(pd.to_datetime, np.random.randint(0, 100000, n_rows))),
        'product': list(map(products.__getitem__, np.random.randint(0, len(products), n_rows))),
        'price': np.random.random(n_rows) * max_price
    })
    df = pd.concat([df for _ in range(n_duplicates)])
    df.to_pickle('big_df.pkl')
    return df


def data():
    return pd.read_pickle('big_df.pkl')


def featurize(group):
    features = dict()

    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5: max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6: min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7: mean events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8: std of events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9: total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10: max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11: min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12: mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13: std of price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.DataFrame.from_records(features, index=[group['id'].max()])


# https://stackoverflow.com/questions/26187759/parallelize-apply-after-pandas-groupby
def apply_parallel_job(dfGrouped, func):
    retLst = joblib.Parallel(n_jobs=multiprocessing.cpu_count())(
        joblib.delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)


def apply_parallel_pool(dfGrouped, func):
    with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
        ret_list = list(p.map(func, [group for name, group in dfGrouped]))
    return pd.concat(ret_list)


featurized_job = lambda df: apply_parallel_job(df.groupby('id'), featurize)
featurized_pol = lambda df: apply_parallel_pool(df.groupby('id'), featurize)
featurized_sng = lambda df: df.groupby('id').apply(featurize)

make_data()
print(timeit.timeit("featurized_job(data())", "from __main__ import featurized_job, data", number=3))
print(timeit.timeit("featurized_sng(data())", "from __main__ import featurized_sng, data", number=3))
print(timeit.timeit("featurized_pol(data())", "from __main__ import featurized_pol, data", number=3))

As for (7), consider the following refactoring:

Timings

original: 112.0091859719978s
re-used prices: 83.85681765000118s

Code

# [...]
prices_ = group.loc[group['product'] == 'tablet', 'price']
features['tablet_price_sum'] = prices_.sum()
# Feature 10 max price for tablet products
features['tablet_price_max'] = prices_.max()
# Feature 11 min price for tablet products
features['tablet_price_min'] = prices_.min()
# Feature 12 mean price for tablet products
features['tablet_price_mean'] = prices_.mean()
# Feature 13 std price for tablet products
features['tablet_price_std'] = prices_.std()
# [...]

Answer 2

DataFrame.agg() is your friend here. You're right that the initial method iterates over the entire dataset for EACH call. So what we can do is define all of the heavy lifting we want up front and let pandas handle the internal optimizations. With these libraries, there is very rarely a time when you can hand-code something that beats the built-in routines.

What's nice about this method is that you only have to do the heavy calculation ONCE, and then you can do all of the fine-tuned feature creation on the aggregated dataset, since it's so much smaller.

This reduces runtime by 65%, which is quite large. Also, the next time you want a new statistic, you can just access the result of featurize2 and not have to run the computation again.

df = make_data()
# include this to be able to calculate standard deviations correctly
df['price_sq'] = df['price'] ** 2.


def featurize2(df):
    grouped = df.groupby(['id', 'product', 'event'])
    initial = grouped.agg({'price': ['count', 'max', 'min', 'mean', 'std', 'sum', 'size'],
                           'date': ['max', 'min'],
                           'price_sq': ['sum']}).reset_index()
    return initial


def featurize3(initial):
    # Features 5-8
    features = initial.groupby('product').sum()['price']['count'].agg(['max', 'min', 'mean', 'std']).rename({
        'max': 'max_product_events',
        'min': 'min_product_events',
        'mean': 'mean_product_events',
        'std': 'std_product_events'
    })

    searches = initial[initial['event'] == 'search']['price']

    # Feature 1: Number of search events
    features['number_of_search_events'] = searches['count'].sum()

    tablets = initial[initial['product'] == 'tablet']['price']
    tablets_sq = initial[initial['product'] == 'tablet']['price_sq']

    # Feature 2: Number of tablets
    features['number_of_tablets'] = tablets['count'].sum()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = tablets['sum'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = tablets['max'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = tablets['min'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = (
        tablets['mean'] * tablets['count']).sum() / tablets['count'].sum()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = np.sqrt(tablets_sq['sum'].sum(
    ) / tablets['count'].sum() - features['tablet_price_mean'] ** 2.)

    # Feature 3: Total time
    features['total_time'] = (initial['date']['max'].max(
    ) - initial['date']['min'].min()) / np.timedelta64(1, 'D')

    # Feature 4: Total number of events
    features['events'] = initial['price']['count'].sum()

    return features


def new_featurize(df):
    initial = featurize2(df)
    final = featurize3(initial)
    return final


original = featurize(df)
final = new_featurize(df)

print("featurize(df): {}".format(timeit.timeit("featurize(df)",
                                               "from __main__ import featurize, df", number=3)))
print("featurize2(df): {}".format(timeit.timeit("featurize2(df)",
                                                "from __main__ import featurize2, df", number=3)))
print("new_featurize(df): {}".format(timeit.timeit("new_featurize(df)",
                                                   "from __main__ import new_featurize, df", number=3)))

for x in final.index:
    print("outputs for index {} are equal: {}".format(
        x, np.isclose(final[x], original[x])))

Results

featurize(df): 76.0546050072
featurize2(df): 26.5458261967
new_featurize(df): 26.4640090466
outputs for index max_product_events are equal: [ True]
outputs for index min_product_events are equal: [ True]
outputs for index mean_product_events are equal: [ True]
outputs for index std_product_events are equal: [ True]
outputs for index number_of_search_events are equal: [ True]
outputs for index number_of_tablets are equal: [ True]
outputs for index tablet_price_sum are equal: [ True]
outputs for index tablet_price_max are equal: [ True]
outputs for index tablet_price_min are equal: [ True]
outputs for index tablet_price_mean are equal: [ True]
outputs for index tablet_price_std are equal: [ True]
outputs for index total_time are equal: [ True]
outputs for index events are equal: [ True]
