Source code for pipeline.preprocessing.feature_table_builder

import datetime
import logging
import pandas as pd
from ..queries import timeframe_queries, basicqueries
from .. import setup_environment
from . import feature_processor
import json
import pdb
import cStringIO

engine, config_db = setup_environment.get_database('pipeline/default_profile.yaml')

# Module-level raw DBAPI connection used by the pd.read_sql calls in this file.
try:
    con = engine.raw_connection()
    # missing schema information ..
except Exception:
    # Log with traceback instead of printing. `con` is left undefined in this
    # case, so any later database read in this module fails with NameError.
    logging.exception('cannot connect to database')



class Labeller(object):
    """Compute per-person outcome labels for one labelling window.

    Event counts for the events 'ems', 'mh' and 'booking' are queried per
    person twice: for the period ending at ``start_date`` (the training
    period) and for ``start_date`` .. ``end_date`` (the labelling window).
    After the left merge on the id column, the training counts carry the
    suffix ``_x`` and the labelling-window counts the suffix ``_y`` (the code
    reads columns such as ``ems_sum_x`` / ``booking_sum_y``).

    Each requested label selects a cohort from the ``_x`` counts. A cohort
    member gets 1 if the label's outcome event occurs in the labelling window
    (``<event>_sum_y > 0``) and 0 if it does not (``== 0``). People outside
    the cohort get NaN.
    """

    # (label name, cohort conditions on '<event>_sum_x', outcome event whose
    # '<event>_sum_y' count decides 1 vs 0). Order fixes the output column order.
    _LABEL_SPECS = [
        ('ems_jims', [('booking', '==0'), ('ems', '>0'), ('mh', '==0')], 'booking'),
        ('mh_jims', [('mh', '>0'), ('booking', '==0'), ('ems', '==0')], 'booking'),
        ('ems_mh_jims', [('mh', '>0'), ('ems', '>0'), ('booking', '==0')], 'booking'),
        ('ems_jims_mh', [('booking', '>0'), ('ems', '>0'), ('mh', '==0')], 'mh'),
        ('ems_ems', [('mh', '==0'), ('ems', '>0'), ('booking', '==0')], 'ems'),
        ('mh_ems', [('mh', '>0'), ('ems', '==0'), ('booking', '==0')], 'ems'),
        ('ems_mh', [('mh', '==0'), ('ems', '>0'), ('booking', '==0')], 'mh'),
        ('jims_mh_jims', [('mh', '>0'), ('ems', '==0'), ('booking', '>0')], 'booking'),
        ('jims_jims', [('mh', '==0'), ('ems', '==0'), ('booking', '>0')], 'booking'),
        ('anything_jims', [], 'booking'),
        ('jims_mh__ems___jims', [('mh', '>0'), ('ems', '>=0'), ('booking', '>0')], 'booking'),
        ('jims_mh_ems_jims', [('mh', '>0'), ('ems', '>0'), ('booking', '>0')], 'booking'),
    ]

    # Comparison applied to a Series of counts for each cohort condition.
    _CHECKS = {
        '>0': lambda counts: counts > 0,
        '==0': lambda counts: counts == 0,
        '>=0': lambda counts: counts >= 0,
    }

    def __init__(self, start_date, end_date, labels):
        """
        :param start_date: end of the training period / start of the labelling window
        :param end_date: end of the labelling window
        :param labels: names of the labels to compute (see ``_LABEL_SPECS``)
        """
        self.start_date = start_date
        self.end_date = end_date
        self.labels = labels

    def get_labels(self):
        """Query event counts and return the requested label columns.

        :returns: DataFrame with the id column followed by one column per
            requested label, in ``_LABEL_SPECS`` order. Values are 1.0, 0.0,
            or NaN for people outside that label's cohort.
        """
        people = self._load_event_counts()
        return self._compute_labels(people, self.labels, config_db["id_column"])

    def _load_event_counts(self):
        """Return training (``_x``) and labelling-window (``_y``) counts merged per person."""
        id_column = config_db["id_column"]
        training_timeframe = timeframe_queries.timeframe_table_end_date(
            config_db['personid_event_dates'], self.start_date, 'begin_date')
        training_query = basicqueries.count_vals_column_for_id(
            training_timeframe, id_column, "event", ['ems', 'mh', 'booking'])
        people_training = pd.read_sql(training_query, con=con)

        labelling_timeframe = timeframe_queries.timeframe_table_start_end_date(
            config_db['personid_event_dates'], self.start_date, self.end_date,
            'begin_date', 'begin_date')
        labelling_query = basicqueries.count_vals_column_for_id(
            labelling_timeframe, id_column, "event", ['ems', 'mh', 'booking'])
        people_labelling = pd.read_sql(labelling_query, con=con)

        # Left merge keeps everyone seen in the training period. People with
        # no labelling-window events get NaN counts, which are zeroed here.
        people = people_training.merge(people_labelling, on=id_column, how='left')
        people = people.fillna(0)
        people.drop_duplicates(inplace=True)
        return people

    @classmethod
    def _compute_labels(cls, people, labels, id_column):
        """Build the label frame from merged counts without mutating ``people``.

        :param people: frame with ``id_column`` and ``<event>_sum_x`` /
            ``<event>_sum_y`` count columns for the events the labels use
        :param labels: container of requested label names
        :param id_column: name of the person id column
        :returns: ``id_column`` plus one float column per requested label
        """
        outcomes = people[[id_column]].copy()
        for name, cohort, outcome_event in cls._LABEL_SPECS:
            if name not in labels:
                continue
            in_cohort = pd.Series(True, index=people.index)
            for event, check in cohort:
                in_cohort = in_cohort & cls._CHECKS[check](people['{}_sum_x'.format(event)])
            future = people['{}_sum_y'.format(outcome_event)]
            # NaN marks people outside the cohort. The previous implementation
            # also left them NaN, because its trailing fillna() result was
            # discarded.
            label = pd.Series(float('nan'), index=people.index)
            label.loc[in_cohort & (future > 0)] = 1.0
            label.loc[in_cohort & (future == 0)] = 0.0
            outcomes[name] = label.values
        return outcomes
def generate_fake_todays(fake_today, prediction_window, start_date):
    '''
    Given a final prediction window start date, the length of the prediction
    windows, and a training start date, return the start and end dates of the
    earlier labelling windows, latest first.

    Windows of ``prediction_window`` days are laid back-to-back going backwards
    from ``fake_today``. A window is included only while its start date is
    strictly after ``start_date``.

    :param fake_today: start date for the final prediction window
    :type fake_today: datetime
    :param prediction_window: length of the prediction windows in days; must be positive
    :type prediction_window: int
    :param start_date: start date for the training period
    :type start_date: datetime
    :returns: one dict per window with keys ``'start_date_labelling'`` and
        ``'end_date_labelling'``, latest window first
    :rtype: list of dict
    :raises ValueError: if ``prediction_window`` is not positive (the backwards
        walk would otherwise never terminate)
    '''
    if prediction_window <= 0:
        raise ValueError(
            'prediction_window must be positive, got {!r}'.format(prediction_window))
    step = datetime.timedelta(days=prediction_window)
    windows = []
    window_end = fake_today
    window_start = fake_today - step
    while window_start > start_date:
        windows.append({'start_date_labelling': window_start,
                        'end_date_labelling': window_end})
        window_end, window_start = window_start, window_start - step
    return windows
def label_feature_producer(start_date, end_date, features, labels):
    """Build the labelled feature table for one prediction window.

    Labels come from ``Labeller(start_date, end_date, labels)``. Two date
    columns are added. Then each requested feature, fetched through a
    ``feature_processor.FeatureGrabber`` constructed with ``start_date``, is
    de-duplicated and left-joined on the configured id column.

    :param start_date: end of the training period / start of the labelling window
    :param end_date: end of the labelling window
    :param features: names of the features to fetch
    :param labels: names of the labels to compute
    :returns: ``(dataset, feature_name_dictionary)``, where the dictionary maps
        each feature name to the column names reported by ``getFeature``
    """
    table = Labeller(start_date, end_date, labels).get_labels()
    table['training_end_date'] = start_date
    table['labeling_end_date'] = end_date

    grabber = feature_processor.FeatureGrabber(start_date, engine, config_db, con)
    columns_by_feature = {}
    for name in features:
        frame, column_names = grabber.getFeature(name)
        columns_by_feature[name] = column_names
        frame.drop_duplicates(inplace=True)
        table = pd.merge(table, frame, on=config_db['id_column'], how='left')
    return table, columns_by_feature
def generate_feature_list(config):
    """Return the names of the enabled features in the selected feature groups.

    :param config: mapping with ``'feature_groups'`` (group names to use) and
        ``'features'`` (group name -> {feature name: enabled flag})
    :returns: list of feature names whose flag is truthy, grouped in
        ``feature_groups`` order
    """
    return [name
            for group in config['feature_groups']
            for name, enabled in config['features'][group].items()
            if enabled]
def merge_feature_dictionaries(d1, d2):
    """Merge two ``{feature: [column names]}`` mappings key by key.

    Every key of either mapping appears in the result. Its value is the union
    of both column lists without duplicates, in first-seen order (``d1``'s
    entries first), so the output is deterministic. Neither input is modified.

    :param d1: accumulated feature -> column-names mapping
    :param d2: mapping to merge into it
    :returns: a new dict with the merged column lists
    """
    merged = {}
    keys = list(d1) + [key for key in d2 if key not in d1]
    for key in keys:
        seen = set()
        columns = []
        for column in list(d1.get(key, [])) + list(d2.get(key, [])):
            if column not in seen:
                seen.add(column)
                columns.append(column)
        merged[key] = columns
    return merged
def dataframe_merge(d1, d2):
    """Stack two frames row-wise, padding each with the other's missing columns.

    Columns present in only one frame are added, filled with ``None``, to a
    copy of the other frame before an outer ``pd.concat``. The result holds the
    union of both column sets. The input frames are not modified.

    :param d1: first frame (its rows come first)
    :param d2: second frame
    :returns: ``d1`` itself if ``d2`` has no columns, otherwise the concatenation
    """
    if len(d2.columns) == 0:
        return d1
    d1 = d1.copy()
    d2 = d2.copy()
    d1_columns = list(d1.columns)
    d2_columns = list(d2.columns)
    d1_set = set(d1_columns)
    d2_set = set(d2_columns)
    # Set-based difference; subtracting two lists raises TypeError.
    missing_in_d2 = [column for column in d1_columns if column not in d2_set]
    missing_in_d1 = [column for column in d2_columns if column not in d1_set]
    logging.debug('columns missing in d2: %s', missing_in_d2)
    for column in missing_in_d2:
        d2[column] = None
    logging.debug('columns missing in d1: %s', missing_in_d1)
    for column in missing_in_d1:
        d1[column] = None
    return pd.concat([d1, d2], join='outer')
def generate_feature_table(config, fake_today, prediction_window, start_date, feature_timestamp):
    """Build the training and test feature tables and persist them.

    Training rows are built for every labelling window returned by
    ``generate_fake_todays``. Test rows use the window starting at
    ``fake_today`` and lasting ``prediction_window`` days. Both tables are
    written to the ``feature_tables`` schema, the merged feature -> columns map
    is dumped as JSON, and the module engine is closed. The engine is closed
    even if a step fails.

    :param config: mapping read for ``'labelling'`` (label -> enabled flag),
        the feature-group keys used by ``generate_feature_list``, and
        ``'class_map_dictionary_directory'`` (prefix of the JSON file path)
    :param fake_today: start of the test labelling window, a ``"%d%b%Y"``
        string (e.g. ``"01Jan2016"``); also used in the output names
    :param prediction_window: length of every labelling window in days
    :param start_date: start of the training period, a ``"%d%b%Y"`` string
    :param feature_timestamp: suffix used in the output table names
    """
    try:
        fake_today_dt = datetime.datetime.strptime(fake_today, "%d%b%Y")
        start_date_dt = datetime.datetime.strptime(start_date, "%d%b%Y")
        logging.debug("generate labels")
        labels = [label for label, enabled in config['labelling'].items() if enabled]
        logging.debug(labels)
        feature_list = generate_feature_list(config)
        dates_for_fake_today = generate_fake_todays(fake_today_dt, prediction_window, start_date_dt)

        print('building training set')
        feature_col_dict = {}
        window_frames = []
        for date_info in dates_for_fake_today:
            print(date_info)
            dataset, features = label_feature_producer(
                date_info['start_date_labelling'], date_info['end_date_labelling'],
                feature_list, labels)
            dataset.drop_duplicates(inplace=True)
            # Each new window is prepended (as the previous repeated concat
            # did). Frames are collected and concatenated once to avoid
            # re-copying the growing table on every iteration.
            window_frames.insert(0, dataset)
            feature_col_dict = merge_feature_dictionaries(feature_col_dict, features)
        train_dataset = pd.concat(window_frames) if window_frames else pd.DataFrame()

        test_label_date = fake_today_dt + datetime.timedelta(days=prediction_window)
        test_dataset, _ = label_feature_producer(fake_today_dt, test_label_date, feature_list, labels)

        train_table_name = 'features_train_{}_{}_at_{}'.format(fake_today, prediction_window, feature_timestamp)
        test_table_name = 'features_test_{}_{}_at_{}'.format(fake_today, prediction_window, feature_timestamp)
        print('writing training database')
        print(train_dataset.shape)
        write_dataframe_to_sql(train_table_name, train_dataset, 'feature_tables')
        print('writing test database')
        write_dataframe_to_sql(test_table_name, test_dataset, 'feature_tables')

        print('writing json')
        json_file_name = '{}classmap_{}_{}.json'.format(
            config['class_map_dictionary_directory'], fake_today, prediction_window)
        with open(json_file_name, 'w') as fp:
            json.dump(feature_col_dict, fp)
    finally:
        # Release the module-level engine whether or not the build succeeded.
        setup_environment.close_engine(engine)
def chunker(seq, size):
    """Return a generator of consecutive slices of ``seq`` of length ``size``.

    The last slice may be shorter. ``seq`` must support ``len()`` and slicing.
    Uses ``range`` so it works on both Python 2 and Python 3.

    :param seq: sliceable sequence (list, str, DataFrame, ...)
    :param size: chunk length, at least 1
    :raises ValueError: if ``size`` is smaller than 1 (raised at call time)
    """
    if size < 1:
        raise ValueError('chunk size must be a positive integer, got {!r}'.format(size))
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
# http://stackoverflow.com/questions/31997859/
def write_dataframe_to_sql(df_name, df, schema):
    """Replace table ``schema."df_name"`` with the contents of ``df``.

    The table is (re)created from an empty slice of ``df`` via ``to_sql``, so
    pandas/SQLAlchemy choose the column types. The rows are then bulk-loaded
    with a COPY streamed from an in-memory CSV. NaN/None values are written as
    empty unquoted fields and loaded as NULL.

    NOTE(review): relies on the psycopg2 cursor COPY API (``copy_expert``),
    i.e. a PostgreSQL database behind ``engine`` — confirm for other backends.

    :param df_name: table name (quoted, so case is preserved)
    :param df: frame to write; the index is not written
    :param schema: target schema name (inserted unquoted)
    """
    print('writing ', df_name)
    # Hacky way to create the table with the necessary schema
    (df.iloc[[]]).to_sql(df_name, engine, schema=schema, index=False, if_exists='replace')

    # Serialise rows without header/index. CSV-format COPY understands pandas'
    # quoting, so embedded tabs, quotes, newlines and backslashes round-trip.
    buf = cStringIO.StringIO()
    df.to_csv(buf, sep='\t', header=False, index=False)
    buf.seek(0)

    target = '{}."{}"'.format(schema, df_name.replace('"', '""'))
    copy_sql = "COPY {} FROM STDIN WITH DELIMITER AS '\t' NULL AS '' CSV".format(target)
    connection = engine.raw_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.copy_expert(copy_sql, buf)
        finally:
            cursor.close()
        connection.commit()
    finally:
        # Return the raw connection to the pool even if the COPY fails.
        connection.close()