#!/usr/bin/env python3
"""
Fix Contact Issues

This script addresses two main issues with the contacts in the database:
1. Removes prefixed names like "RG_0x..." and "MC_0x..." and replaces them
   with NULL if they don't have ENS names
2. Merges duplicate contacts that have the same Ethereum address but
   different records

Usage:
    python fix_contact_issues.py
    python fix_contact_issues.py --names-only
    python fix_contact_issues.py --duplicates-only
"""

import os
import sys
import argparse
from typing import Dict, List, Any, Optional
from dotenv import load_dotenv

# Add parent directory to path to import utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.db_connector import DatabaseConnector
from utils.logger import setup_logger

# Load environment variables
load_dotenv()

# Setup logging
logger = setup_logger("fix_contact_issues")


class ContactFixer:
    """Fixes issues with contacts in the database."""

    # Columns that may be copied from a duplicate onto the surviving contact
    # when the surviving contact has no value for them.
    _MERGEABLE_FIELDS = (
        "ensName",
        "name",
        "email",
        "twitter",
        "discord",
        "telegram",
        "farcaster",
        "otherSocial",
        "warpcastAddress",
        "ethereumAddress2",
    )

    def __init__(self):
        """Initialize the contact fixer."""
        self.db = DatabaseConnector()

    def fix_prefixed_names(self) -> int:
        """
        Replace prefixed names like "RG_0x..." and "MC_0x..." with NULL.

        Only contacts whose "ensName" is NULL are touched.

        Returns:
            Number of contacts fixed
        """
        logger.info("Fixing prefixed names...")

        # Find contacts with prefixed names.
        # Every underscore is backslash-escaped: an unescaped "_" in LIKE is a
        # single-character wildcard, which would make '%_0x%' match any name
        # that merely contains "0x" after some character and wipe it.
        # NOTE(review): if DatabaseConnector hands the query to a driver that
        # applies %-style interpolation even for an empty parameter dict, the
        # literal "%" wildcards below would need doubling - confirm.
        query = """
            SELECT id, name, "ethereumAddress", "ensName"
            FROM "Contact"
            WHERE (name LIKE 'RG\\_%' OR name LIKE 'MC\\_%' OR name LIKE 'ETH\\_%' OR name LIKE '%\\_0x%')
            AND "ensName" IS NULL
        """
        contacts = self.db.execute_query(query, {})
        logger.info(f"Found {len(contacts)} contacts with prefixed names")

        # Update contacts one by one so each cleared name is logged.
        update_query = """
            UPDATE "Contact"
            SET name = NULL,
                "updatedAt" = NOW()
            WHERE id = %(contact_id)s
        """
        fixed_count = 0
        for contact in contacts:
            rows_updated = self.db.execute_update(
                update_query, {"contact_id": contact["id"]}
            )
            if rows_updated > 0:
                logger.info(f"Cleared name for contact {contact['id']} (was '{contact['name']}')")
                fixed_count += 1

        logger.info(f"Fixed {fixed_count} contacts with prefixed names")
        return fixed_count

    def find_duplicate_contacts(self) -> List[Dict[str, Any]]:
        """
        Find contacts with duplicate Ethereum addresses.

        Rows with a NULL address are excluded: GROUP BY would otherwise lump
        them into one "duplicate" group that can never be merged (an equality
        lookup on NULL matches nothing) and would inflate the reported count.

        Returns:
            List of rows with "ethereumAddress" and "count", most duplicated
            addresses first
        """
        query = """
            SELECT "ethereumAddress", COUNT(*) as count
            FROM "Contact"
            WHERE "ethereumAddress" IS NOT NULL
            GROUP BY "ethereumAddress"
            HAVING COUNT(*) > 1
            ORDER BY COUNT(*) DESC
        """
        duplicates = self.db.execute_query(query, {})
        logger.info(f"Found {len(duplicates)} Ethereum addresses with duplicate contacts")
        return duplicates

    def merge_duplicate_contacts(self) -> int:
        """
        Merge duplicate contacts by keeping the most complete record.

        For every duplicated address the oldest contact (by "createdAt") is
        kept. Each newer duplicate fills in fields the kept contact is still
        missing, has its related rows copied over, and is then deleted.

        Returns:
            Number of contacts merged
        """
        logger.info("Merging duplicate contacts...")

        # Find duplicate contacts
        duplicates = self.find_duplicate_contacts()

        select_query = """
            SELECT id, "ethereumAddress", "ensName", name, email, twitter, discord,
                   telegram, farcaster, "otherSocial", "warpcastAddress",
                   "ethereumAddress2", "createdAt"
            FROM "Contact"
            WHERE "ethereumAddress" = %(eth_address)s
            ORDER BY "createdAt" ASC
        """
        delete_query = """
            DELETE FROM "Contact"
            WHERE id = %(contact_id)s
        """

        total_merged = 0
        for duplicate in duplicates:
            eth_address = duplicate["ethereumAddress"]

            # Get all contacts with this address
            contacts = self.db.execute_query(select_query, {"eth_address": eth_address})
            if len(contacts) <= 1:
                continue

            # Keep the oldest one (first created) as the primary. Work on a
            # local copy so merged values can be tracked across iterations.
            primary_contact = dict(contacts[0])
            primary_id = primary_contact["id"]

            # Merge data from other contacts into the primary
            for contact in contacts[1:]:
                # Collect fields this duplicate has and the primary still lacks
                update_data = {
                    field: contact[field]
                    for field in self._MERGEABLE_FIELDS
                    if contact[field] is not None and primary_contact[field] is None
                }

                if update_data:
                    self.db.update_contact(primary_id, update_data)
                    # Remember what was filled so a later duplicate does not
                    # overwrite a value taken from an earlier one.
                    primary_contact.update(update_data)
                    logger.info(f"Updated primary contact {primary_id} with data from {contact['id']}")

                # Move all related data to the primary contact
                self.move_related_data(contact["id"], primary_id)

                # Delete the duplicate contact
                self.db.execute_update(delete_query, {"contact_id": contact["id"]})
                logger.info(f"Deleted duplicate contact {contact['id']}")
                total_merged += 1

        logger.info(f"Merged {total_merged} duplicate contacts")
        return total_merged

    def move_related_data(self, from_id: str, to_id: str) -> None:
        """
        Move all related data from one contact to another.

        The helpers called here copy rows to the target contact; none of them
        deletes the source rows.
        NOTE(review): the source rows are presumably removed by ON DELETE
        CASCADE when the duplicate contact is deleted - confirm in the schema.

        Args:
            from_id: ID of the contact to move data from
            to_id: ID of the contact to move data to
        """
        # Move NFT holdings
        self.move_nft_holdings(from_id, to_id)

        # Move token holdings
        self.move_token_holdings(from_id, to_id)

        # Move DAO memberships
        self.move_dao_memberships(from_id, to_id)

        # Move notes
        self.move_notes(from_id, to_id)

        # Move tags
        self.move_tags(from_id, to_id)

        # Move contact sources
        self.move_contact_sources(from_id, to_id)

    def move_nft_holdings(self, from_id: str, to_id: str) -> None:
        """
        Move NFT holdings from one contact to another.

        Rows are re-inserted under the target contact with a new id; a holding
        the target already has (same contract and token) is skipped.

        Args:
            from_id: ID of the contact to move holdings from
            to_id: ID of the contact to move holdings to
        """
        query = """
            INSERT INTO "NftHolding" (
                id, "contactId", "contractAddress", "tokenId", "collectionName",
                "acquiredAt", "createdAt", "updatedAt"
            )
            SELECT
                gen_random_uuid(), %(to_id)s, "contractAddress", "tokenId", "collectionName",
                "acquiredAt", "createdAt", NOW()
            FROM "NftHolding"
            WHERE "contactId" = %(from_id)s
            ON CONFLICT ("contactId", "contractAddress", "tokenId") DO NOTHING
        """
        self.db.execute_update(query, {"from_id": from_id, "to_id": to_id})

    def move_token_holdings(self, from_id: str, to_id: str) -> None:
        """
        Move token holdings from one contact to another.

        Rows are re-inserted under the target contact with a new id; a holding
        the target already has for the same contract is skipped.

        Args:
            from_id: ID of the contact to move holdings from
            to_id: ID of the contact to move holdings to
        """
        query = """
            INSERT INTO "TokenHolding" (
                id, "contactId", "contractAddress", "tokenSymbol", balance,
                "lastUpdated", "createdAt", "updatedAt"
            )
            SELECT
                gen_random_uuid(), %(to_id)s, "contractAddress", "tokenSymbol", balance,
                "lastUpdated", "createdAt", NOW()
            FROM "TokenHolding"
            WHERE "contactId" = %(from_id)s
            ON CONFLICT ("contactId", "contractAddress") DO NOTHING
        """
        self.db.execute_update(query, {"from_id": from_id, "to_id": to_id})

    def move_dao_memberships(self, from_id: str, to_id: str) -> None:
        """
        Move DAO memberships from one contact to another.

        Rows are re-inserted under the target contact with a new id; a
        membership the target already has for the same DAO name is skipped.

        Args:
            from_id: ID of the contact to move memberships from
            to_id: ID of the contact to move memberships to
        """
        query = """
            INSERT INTO "DaoMembership" (
                id, "contactId", "daoName", "daoType", "joinedAt", "createdAt", "updatedAt"
            )
            SELECT
                gen_random_uuid(), %(to_id)s, "daoName", "daoType", "joinedAt", "createdAt", NOW()
            FROM "DaoMembership"
            WHERE "contactId" = %(from_id)s
            ON CONFLICT ("contactId", "daoName") DO NOTHING
        """
        self.db.execute_update(query, {"from_id": from_id, "to_id": to_id})

    def move_notes(self, from_id: str, to_id: str) -> None:
        """
        Move notes from one contact to another.

        Every note is re-inserted under the target contact with a new id.
        There is no conflict clause, so all notes are carried over.

        Args:
            from_id: ID of the contact to move notes from
            to_id: ID of the contact to move notes to
        """
        query = """
            INSERT INTO "Note" (
                id, "contactId", content, "createdAt", "updatedAt"
            )
            SELECT
                gen_random_uuid(), %(to_id)s, content, "createdAt", NOW()
            FROM "Note"
            WHERE "contactId" = %(from_id)s
        """
        self.db.execute_update(query, {"from_id": from_id, "to_id": to_id})

    def move_tags(self, from_id: str, to_id: str) -> None:
        """
        Move tags from one contact to another.

        Tag assignments are re-inserted for the target contact; a tag the
        target already carries is skipped.

        Args:
            from_id: ID of the contact to move tags from
            to_id: ID of the contact to move tags to
        """
        query = """
            INSERT INTO "TagsOnContacts" (
                "contactId", "tagId", "assignedAt"
            )
            SELECT
                %(to_id)s, "tagId", "assignedAt"
            FROM "TagsOnContacts"
            WHERE "contactId" = %(from_id)s
            ON CONFLICT ("contactId", "tagId") DO NOTHING
        """
        self.db.execute_update(query, {"from_id": from_id, "to_id": to_id})

    def move_contact_sources(self, from_id: str, to_id: str) -> None:
        """
        Move contact sources from one contact to another.

        Does nothing (apart from logging) when no "ContactSource" table is
        listed in information_schema.tables.

        Args:
            from_id: ID of the contact to move sources from
            to_id: ID of the contact to move sources to
        """
        # Check if the ContactSource table exists
        query = """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'ContactSource'
            ) as exists
        """
        result = self.db.execute_query(query, {})
        if not result or not result[0]["exists"]:
            logger.info("ContactSource table does not exist, skipping contact sources migration")
            return

        query = """
            INSERT INTO "ContactSource" (
                id, "contactId", "dataSourceId", "createdAt", "updatedAt"
            )
            SELECT
                gen_random_uuid(), %(to_id)s, "dataSourceId", "createdAt", NOW()
            FROM "ContactSource"
            WHERE "contactId" = %(from_id)s
            ON CONFLICT ("contactId", "dataSourceId") DO NOTHING
        """
        self.db.execute_update(query, {"from_id": from_id, "to_id": to_id})

    def run(self) -> None:
        """Run all fixes."""
        logger.info("Starting contact fixes...")

        # Fix prefixed names
        fixed_names = self.fix_prefixed_names()

        # Merge duplicate contacts
        merged_contacts = self.merge_duplicate_contacts()

        logger.info(f"Completed fixes: {fixed_names} name prefixes removed, {merged_contacts} duplicate contacts merged")


def main():
    """Main entry point for the script."""
    parser = argparse.ArgumentParser(description="Fix contact issues")
    # The two flags contradict each other, so argparse rejects the combination
    # instead of silently honouring only the first one.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--names-only", action="store_true",
                      help="Only fix prefixed names, don't merge duplicates")
    mode.add_argument("--duplicates-only", action="store_true",
                      help="Only merge duplicate contacts, don't fix names")
    args = parser.parse_args()

    fixer = ContactFixer()

    if args.names_only:
        fixer.fix_prefixed_names()
    elif args.duplicates_only:
        fixer.merge_duplicate_contacts()
    else:
        fixer.run()


if __name__ == "__main__":
    main()