import pandas as pd import numpy as np import torch import string import re import random import gradio as gr from tqdm import tqdm tqdm().pandas() # BERT imports from transformers import BertForMaskedLM, BertTokenizer # GPT2 imports from transformers import GPT2LMHeadModel, GPT2Tokenizer # BioBPT from transformers import BioGptForCausalLM, BioGptTokenizer # LLAMA from transformers import LlamaTokenizer, LlamaForCausalLM import mgr_sentences as smgr import mgr_biases as bmgr import mgr_requests as rq_mgr from error_messages import * import contextlib autocast = contextlib.nullcontext import gc # Great article about handing big models - def _getModelSafe(model_name, device): model = None tokenizer = None try: model, tokenizer = _getModel(model_name, device) except Exception as err: print(f"Loading Model Error: {err}") print("Cleaning the model...") model = None tokenizer = None torch.cuda.empty_cache() gc.collect() if model == None or tokenizer == None: print("Cleaned, trying reloading....") model, tokenizer = _getModel(model_name, device) return model, tokenizer def _getModel(model_name, device): if "bert" in model_name.lower(): tokenizer = BertTokenizer.from_pretrained(model_name) model = BertForMaskedLM.from_pretrained(model_name) elif "biogpt" in model_name.lower(): tokenizer = BioGptTokenizer.from_pretrained(model_name) model = BioGptForCausalLM.from_pretrained(model_name) elif 'gpt2' in model_name.lower(): tokenizer = GPT2Tokenizer.from_pretrained(model_name) model = GPT2LMHeadModel.from_pretrained(model_name) elif 'llama' in model_name.lower(): print(f"Getting LLAMA model: {model_name}") tokenizer = LlamaTokenizer.from_pretrained(model_name) model = LlamaForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, low_cpu_mem_usage=True, ## #use_safetensors=True, ## offload_folder="offload", offload_state_dict = True, device_map='auto') #model.tie_weights() if model == None: print("Model is empty!!!") else: model = model.eval() torch.set_grad_enabled(False) return model, tokenizer # Adding period to end sentence def add_period(template): if template[-1] not in string.punctuation: template += "." return template # Convert generated sentence to template def sentence_to_template(row): sentence = row['Test sentence'] grp_term = row['Group term'] template = add_period(sentence.strip("\"")) fnd_grp = list(re.finditer(f"(^|[ ]+){grp_term.lower()}[ .,!]+", template.lower())) while len(fnd_grp) > 0: idx1 = fnd_grp[0].span(0)[0] if template[idx1] == " ": idx1+=1 idx2 = fnd_grp[0].span(0)[1]-1 template = template[0:idx1]+f"[T]"+template[idx2:] fnd_grp = list(re.finditer(f"(^|[ ]+){grp_term.lower()}[ .,!]+", template.lower())) return template # make sure to use equal number of keywords for opposing attribute and social group specifications def make_lengths_equal(t1, t2, a1, a2): if len(t1) > len(t2): t1 = random.sample(t1, len(t2)) elif len(t1) < len(t2): t2 = random.sample(t2, len(t1)) if len(a1) > len(a2): a1 = random.sample(a1, len(a2)) elif len(a1) < len(a2): a2 = random.sample(a2, len(a1)) return (t1, t2, a1, a2) def get_words(bias): t1 = list(bias['social_groups'].items())[0][1] t2 = list(bias['social_groups'].items())[1][1] a1 = list(bias['attributes'].items())[0][1] a2 = list(bias['attributes'].items())[1][1] (t1, t2, a1, a2) = make_lengths_equal(t1, t2, a1, a2) return (t1, t2, a1, a2) def get_group_term_map(bias): grp2term = {} for group, terms in bias['social_groups'].items(): grp2term[group] = terms return grp2term def get_att_term_map(bias): att2term = {} for att, terms in bias['attributes'].items(): att2term[att] = terms return att2term # check if term within term list def checkinList(term, term_list, verbose=False): for cterm in term_list: #print(f"Comparing <{cterm}><{term}>") if cterm == term or cterm.replace(" ","-") == term.replace(' ','-'): return True return False # Convert Test sentences to stereotype/anti-stereotyped pairs def convert2pairs(bias_spec, test_sentences_df): pairs = [] headers = ['group_term','template','att_term_1','att_term_2','label_1','label_2'] # get group to words mapping XY_2_xy = get_group_term_map(bias_spec) print(f"grp2term: {XY_2_xy}") AB_2_ab = get_att_term_map(bias_spec) print(f"att2term: {AB_2_ab}") ri = 0 for idx, row in test_sentences_df.iterrows(): direction = [] if checkinList(row['Attribute term'], list(AB_2_ab.items())[0][1]): direction = ["stereotype", "anti-stereotype"] elif checkinList(row['Attribute term'], list(AB_2_ab.items())[1][1]): direction = ["anti-stereotype", "stereotype"] if len(direction) == 0: print("Direction empty!") checkinList(row['Attribute term'], list(AB_2_ab.items())[0][1], verbose=True) checkinList(row['Attribute term'], list(AB_2_ab.items())[1][1], verbose=True) raise gr.Error(BIAS_SENTENCES_MISMATCH_ERROR) grp_term_idx = -1 grp_term_pair = [] if row['Group term'] in list(XY_2_xy.items())[0][1]: grp_term_idx = list(XY_2_xy.items())[0][1].index(row['Group term']) try: grp_term_pair = [row['Group term'], list(XY_2_xy.items())[1][1][grp_term_idx]] except IndexError: print(f"Index {grp_term_idx} not found in list {list(XY_2_xy.items())[1][1]}, choosing random...") grp_term_idx = random.randint(0, len(list(XY_2_xy.items())[1][1])-1) print(f"New group term idx: {grp_term_idx} for list {list(XY_2_xy.items())[1][1]}") grp_term_pair = [row['Group term'], list(XY_2_xy.items())[1][1][grp_term_idx]] elif row['Group term'] in list(XY_2_xy.items())[1][1]: grp_term_idx = list(XY_2_xy.items())[1][1].index(row['Group term']) try: grp_term_pair = [row['Group term'], list(XY_2_xy.items())[0][1][grp_term_idx]] except IndexError: print(f"Index {grp_term_idx} not found in list {list(XY_2_xy.items())[0][1]}, choosing random...") grp_term_idx = random.randint(0, len(list(XY_2_xy.items())[0][1])-1) print(f"New group term idx: {grp_term_idx} for list {list(XY_2_xy.items())[0][1]}") grp_term_pair = [row['Group term'], list(XY_2_xy.items())[0][1][grp_term_idx]] direction.reverse() pairs.append([row['Attribute term'], row['Template'].replace("[T]","[MASK]"), grp_term_pair[0], grp_term_pair[1], direction[0], direction[1]]) bPairs_df = pd.DataFrame(pairs, columns=headers) bPairs_df = bPairs_df.drop_duplicates(subset = ["group_term", "template"]) print(bPairs_df.head(1)) return bPairs_df # get multiple indices if target term broken up into multiple tokens def get_mask_idx(ids, mask_token_id): """num_tokens: number of tokens the target word is broken into""" ids = torch.Tensor.tolist(ids)[0] return ids.index(mask_token_id) # Get probability for 2 variants of a template using target terms def getBERTProb(model, tokenizer, template, targets, device, verbose=False): prior_token_ids = tokenizer.encode(template, add_special_tokens=True, return_tensors="pt") prior_token_ids = prior_logits = model(prior_token_ids) target_probs = [] sentences = [] for target in targets: targ_id = tokenizer.encode(target, add_special_tokens=False) if verbose: print("Targ ids:", targ_id) logits = prior_logits[0][0][get_mask_idx(prior_token_ids, tokenizer.mask_token_id)][targ_id] if verbose: print("Logits:", logits) target_probs.append(np.mean(logits.cpu().numpy())) sentences.append(template.replace("[MASK]", target)) if verbose: print("Target probs:", target_probs) return target_probs, sentences # Get probability for 2 variants of a template using target terms def getGPT2Prob(model, tokenizer, template, targets, device, verbose=False): target_probs = [] sentences = [] for target in targets: sentence = template.replace("[MASK]", target) if verbose: print(f"Sentence with target {target}: {sentence}") tensor_input = tokenizer.encode(sentence, return_tensors="pt").to(device) outputs = model(tensor_input, labels=tensor_input) target_probs.append(outputs.loss.item()) sentences.append(sentence) return [max(target_probs)-l for l in target_probs], sentences # Test function just for sanity check def testModelProbability(model_name, model, tokenizer, device): if 'bert' in model_name: print(f"Testing on BERT family model: {model_name}") #print(getBERTProb(model, tokenizer, "[MASK] is a carpenter", ["man","woman"], device, verbose=True)) elif 'gpt' in model_name: print(f"Testing on GPT-2 family model: {model_name}") #print(getGPT2Prob(model, tokenizer, "African American man was [MASK]", ["poor","rich"], device, verbose=True)) elif 'llama' in model_name: print(f"Testing on LLAMA family model: {model_name}") #print(getGPT2Prob(model, tokenizer, "African American man was [MASK]", ["poor","rich"], device, verbose=True)) # bias test on one row of a dataframe -> row is one sentence template with target terms def checkBias(row, biasProbFunc, model, tokenizer, device, progress, df_len): att_terms = [row['att_term_1'], row['att_term_2']] labels = [row['label_1'], row['label_2']] if progress != None: progress(, desc=f"{row['template']}") test_res = [1,0] # fail-safe try: test_res, sentences = biasProbFunc(model, tokenizer, row['template'], att_terms, device) except ValueError as err: print(f"Error testing sentence: {row['template']}, grp_terms: {att_terms}, err: {err}") top_term_idx = 0 if test_res[0]>test_res[1] else 1 bottom_term_idx = 0 if test_res[1]>test_res[0] else 1 # is stereotyped stereotyped = 1 if labels[top_term_idx] == "stereotype" else 0 return pd.Series({"stereotyped": stereotyped, "top_term": att_terms[top_term_idx], "bottom_term": att_terms[bottom_term_idx], "top_logit": test_res[top_term_idx], "bottom_logit": test_res[bottom_term_idx]}) # Sampling attribute def sampleAttribute(df, att, n_per_att): att_rows = df.query("group_term == @att") # copy-paste all gens - no bootstrap #grp_bal = att_rows grp_bal = pd.DataFrame() if att_rows.shape[0] >= n_per_att: grp_bal = att_rows.sample(n_per_att) elif att_rows.shape[0] > 0 and att_rows.shape[0] < n_per_att: grp_bal = att_rows.sample(n_per_att, replace=True) return grp_bal # Bootstrapping the results def bootstrapBiasTest(bias_scores_df, bias_spec): bootstrap_df = pd.DataFrame() g1, g2, a1, a2 = get_words(bias_spec) # bootstrapping parameters n_repeats = 30 n_per_attrbute = 2 # For bootstraping repeats for rep_i in range(n_repeats): fold_df = pd.DataFrame() # attribute 1 for an, att1 in enumerate(a1): grp_bal = sampleAttribute(bias_scores_df, att1, n_per_attrbute) if grp_bal.shape[0] == 0: grp_bal = sampleAttribute(bias_scores_df, att1.replace(" ","-"), n_per_attrbute) if grp_bal.shape[0] > 0: fold_df = pd.concat([fold_df, grp_bal.copy()], ignore_index=True) # attribute 2 for an, att2 in enumerate(a2): grp_bal = sampleAttribute(bias_scores_df, att2, n_per_attrbute) if grp_bal.shape[0] == 0: grp_bal = sampleAttribute(bias_scores_df, att2.replace(" ","-"), n_per_attrbute) if grp_bal.shape[0] > 0: fold_df = pd.concat([fold_df, grp_bal.copy()], ignore_index=True) #if fold_df.shape[0]>0: # unnorm_model, norm_model, perBias_df = biasStatsFold(test_df) # print(f"Gen: {gen_model}, Test: {test_model} [{rep_i}], df-size: {test_df.shape[0]}, Model bias: {norm_model:0.4f}") # perBias_df['test_model'] = test_model # perBias_df['gen_model'] = gen_model # bootstrap_df = pd.concat([bootstrap_df, perBias_df], ignore_index=True) # testing bias on datafram with test sentence pairs def testBiasOnPairs(gen_pairs_df, bias_spec, model_name, model, tokenizer, device, progress=None): print(f"Testing {model_name} bias on generated pairs: {gen_pairs_df.shape}") if 'bert' in model_name.lower(): print(f"Testing on BERT family model: {model_name}") gen_pairs_df[['stereotyped','top_term','bottom_term','top_logit','bottom_logit']] = gen_pairs_df.progress_apply( checkBias, biasProbFunc=getBERTProb, model=model, tokenizer=tokenizer, device=device, progress=progress, df_len=gen_pairs_df.shape[0], axis=1) elif 'gpt' in model_name.lower(): print(f"Testing on GPT-2 family model: {model_name}") gen_pairs_df[['stereotyped','top_term','bottom_term','top_logit','bottom_logit']] = gen_pairs_df.progress_apply( checkBias, biasProbFunc=getGPT2Prob, model=model, tokenizer=tokenizer, device=device, progress=progress, df_len=gen_pairs_df.shape[0], axis=1) elif 'llama' in model_name.lower(): print(f"Testing on LLAMA family model: {model_name}") gen_pairs_df[['stereotyped','top_term','bottom_term','top_logit','bottom_logit']] = gen_pairs_df.progress_apply( checkBias, biasProbFunc=getGPT2Prob, model=model, tokenizer=tokenizer, device=device, progress=progress, df_len=gen_pairs_df.shape[0], axis=1) # Bootstrap print(f"BIAS ON PAIRS: {gen_pairs_df}") #bootstrapBiasTest(gen_pairs_df, bias_spec) grp_df = gen_pairs_df.groupby(['group_term'])['stereotyped'].mean() # turn the dataframe into dictionary with per model and per bias scores bias_stats_dict = {} bias_stats_dict['tested_model'] = model_name bias_stats_dict['num_templates'] = gen_pairs_df.shape[0] bias_stats_dict['model_bias'] = round(grp_df.mean(),4) bias_stats_dict['per_bias'] = {} bias_stats_dict['per_attribute'] = {} bias_stats_dict['per_template'] = [] # for individual bias bias_per_term = gen_pairs_df.groupby(["group_term"])['stereotyped'].mean() bias_stats_dict['per_bias'] = round(bias_per_term.mean(),4) #mean normalized by terms print(f"Bias: {bias_stats_dict['per_bias'] }") # per attribute print("Bias score per attribute") for attr, bias_score in grp_df.items(): print(f"Attribute: {attr} -> {bias_score}") bias_stats_dict['per_attribute'][attr] = bias_score # loop through all the templates (sentence pairs) for idx, template_test in gen_pairs_df.iterrows(): bias_stats_dict['per_template'].append({ "template": template_test['template'], "attributes": [template_test['att_term_1'], template_test['att_term_2']], "stereotyped": template_test['stereotyped'], #"discarded": True if template_test['discarded']==1 else False, "score_delta": template_test['top_logit'] - template_test['bottom_logit'], "stereotyped_version": template_test['top_term'] if template_test['label_1'] == "stereotype" else template_test['bottom_term'], "anti_stereotyped_version": template_test['top_term'] if template_test['label_1'] == "anti-stereotype" else template_test['bottom_term'] }) return grp_df, bias_stats_dict def startBiasTest(test_sentences_df, model_name): # 2. convert to templates test_sentences_df['Template'] = test_sentences_df.apply(sentence_to_template, axis=1) print(f"Data with template: {test_sentences_df}") # 3. convert to pairs test_pairs_df = convert2pairs(bias_spec, test_sentences_df) print(f"Test pairs: {test_pairs_df.head(3)}") # 4. get the per sentence bias scores print(f"Test model name: {model_name}") device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print(f"Device: {device}") tested_model, tested_tokenizer = _getModelSafe(model_name, device) #print(f"Mask token id: {tested_toknizer.mask_token_id}") if tested_tokenizer == None: print("Tokanizer is empty!!!") if tested_model == None: print("Model is empty!!!") # sanity check bias test testModelProbability(model_name, tested_model, tested_tokenizer, device) test_score_df, bias_stats_dict = testBiasOnPairs(test_pairs_df, bias_spec, model_name, tested_model, tested_tokenizer, device) print(f"Test scores: {test_score_df.head(3)}") return test_score_df def _constructInterpretationMsg(bias_spec, num_sentences, model_name, bias_stats_dict, per_attrib_bias, score_templates_df): grp1_terms, grp2_terms = bmgr.getSocialGroupTerms(bias_spec) att1_terms, att2_terms = bmgr.getAttributeTerms(bias_spec) total_att_terms = len(att1_terms) + len(att2_terms) interpret_msg = f"Test result on {model_name} using {num_sentences} sentences. " if num_sentences < total_att_terms or num_sentences < 20: interpret_msg += "We recommend generating more sentences to get more robust estimates!
" else: interpret_msg += "
" attrib_by_score = dict(sorted(per_attrib_bias.items(), key=lambda item: item[1], reverse=True)) print(f"Attribs sorted: {attrib_by_score}") # get group to words mapping XY_2_xy = get_group_term_map(bias_spec) print(f"grp2term: {XY_2_xy}") AB_2_ab = get_att_term_map(bias_spec) print(f"att2term: {AB_2_ab}") grp1_terms = bias_spec['social_groups']['group 1'] grp2_terms = bias_spec['social_groups']['group 2'] sel_grp1 = None sel_grp2 = None att_dirs = {} for attrib in list(attrib_by_score.keys()): att_label = None if checkinList(attrib, list(AB_2_ab.items())[0][1]): att_label = 0 elif checkinList(attrib, list(AB_2_ab.items())[1][1]): att_label = 1 else: print("Error!") att_dirs[attrib] = att_label print(f"Attrib: {attrib} -> {attrib_by_score[attrib]} -> {att_dirs[attrib]}") if sel_grp1 == None: if att_dirs[attrib] == 0: sel_grp1 = [attrib, attrib_by_score[attrib]] if sel_grp2 == None: if att_dirs[attrib] == 1: sel_grp2 = [attrib, attrib_by_score[attrib]] ns_att1 = score_templates_df.query(f"Attribute == '{sel_grp1[0]}'").shape[0] #{ns_att1} grp1_str = ', '.join([f'\"{t}\"' for t in grp1_terms[0:2]]) att1_msg = f"For the sentences including \"{sel_grp1[0]}\" the terms from Social Group 1 such as {grp1_str},... are more probable {sel_grp1[1]*100:2.0f}% of the time. " print(att1_msg) ns_att2 = score_templates_df.query(f"Attribute == '{sel_grp2[0]}'").shape[0] #{ns_att2} grp2_str = ', '.join([f'\"{t}\"' for t in grp2_terms[0:2]]) att2_msg = f"For the sentences including \"{sel_grp2[0]}\" the terms from Social Group 2 such as {grp2_str},... are more probable {sel_grp2[1]*100:2.0f}% of the time. " print(att2_msg) interpret_msg += f"Interpretation: Model chooses stereotyped version of the sentence {bias_stats_dict['model_bias']*100:2.0f}% of time. " #interpret_msg += f"It suggests that for the sentences including \"{list(per_attrib_bias.keys())[0]}\" the social group terms \"{bias_spec['social_groups']['group 1'][0]}\", ... are more probable {list(per_attrib_bias.values())[0]*100:2.0f}% of the time. " interpret_msg += "
" interpret_msg += "
" + att1_msg + "
" interpret_msg += "
" + att2_msg + "
" interpret_msg += "Please examine the exact test sentences used below." interpret_msg += "
More details about Stereotype Score metric: Nadeem'20" return interpret_msg if __name__ == '__main__': print("Testing bias manager...") bias_spec = { "social_groups": { "group 1": ["brother", "father"], "group 2": ["sister", "mother"], }, "attributes": { "attribute 1": ["science", "technology"], "attribute 2": ["poetry", "art"] } } sentence_list = rq_mgr._getSavedSentences(bias_spec) sentence_df = pd.DataFrame(sentence_list, columns=["Test sentence","Group term","Attribute term"]) print(sentence_df) startBiasTest(sentence_df, 'bert-base-uncased')