From cd4ffa3665d4f49f0982274472deff0ec060e91d Mon Sep 17 00:00:00 2001 From: boilerrat <128boilerrat@gmail.com> Date: Sun, 16 Mar 2025 19:25:24 -0400 Subject: [PATCH] Improve contact data quality and handling of duplicates. Add scripts for merging duplicate contacts, fixing generic names, and checking data quality. --- scripts/moloch_dao/check_and_fix_data.py | 478 ++++++++++++++++++ scripts/moloch_dao/import_raid_guild_csv.py | 199 ++++++-- .../moloch_dao/merge_duplicate_contacts.py | 318 ++++++++++++ scripts/moloch_dao/update_raid_guild_names.py | 127 +++++ 4 files changed, 1087 insertions(+), 35 deletions(-) create mode 100644 scripts/moloch_dao/check_and_fix_data.py create mode 100644 scripts/moloch_dao/merge_duplicate_contacts.py create mode 100644 scripts/moloch_dao/update_raid_guild_names.py diff --git a/scripts/moloch_dao/check_and_fix_data.py b/scripts/moloch_dao/check_and_fix_data.py new file mode 100644 index 0000000..109ea52 --- /dev/null +++ b/scripts/moloch_dao/check_and_fix_data.py @@ -0,0 +1,478 @@ +#!/usr/bin/env python3 +""" +Check and Fix Data Issues + +This script checks for various data issues in the database and fixes them: +1. Duplicate contacts by Ethereum address +2. Contacts with multiple DAO memberships +3. Contacts with missing or generic names +4. Other data quality issues + +Usage: + python check_and_fix_data.py +""" + +import os +import sys +import logging +from typing import Dict, Any, List, Optional, Tuple +from dotenv import load_dotenv + +# Add parent directory to path to import utils +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from utils.db_connector import DatabaseConnector +from utils.logger import setup_logger + +# Load environment variables +load_dotenv() + +# Setup logging +logger = setup_logger("data_checker_fixer") + +class DataCheckerFixer: + """Checker and fixer for data issues""" + + def __init__(self): + """Initialize the checker/fixer""" + # Initialize database + self.db = DatabaseConnector() + + def check_duplicate_addresses(self) -> List[Dict[str, Any]]: + """Check for duplicate Ethereum addresses in the Contact table""" + query = """ + SELECT "ethereumAddress", COUNT(*) as count, array_agg(id) as contact_ids + FROM "Contact" + WHERE "ethereumAddress" IS NOT NULL + GROUP BY "ethereumAddress" + HAVING COUNT(*) > 1 + ORDER BY COUNT(*) DESC + """ + + result = self.db.execute_query(query) + logger.info(f"Found {len(result)} duplicate Ethereum addresses") + return result + + def check_multi_dao_contacts(self) -> List[Dict[str, Any]]: + """Check for contacts that belong to multiple DAOs""" + query = """ + SELECT + c.id, + c."ethereumAddress", + c.name, + COUNT(DISTINCT dm."daoName") as dao_count, + array_agg(dm."daoName") as daos + FROM "Contact" c + JOIN "DaoMembership" dm ON c.id = dm."contactId" + GROUP BY c.id, c."ethereumAddress", c.name + HAVING COUNT(DISTINCT dm."daoName") > 1 + ORDER BY dao_count DESC + """ + + result = self.db.execute_query(query) + logger.info(f"Found {len(result)} contacts that belong to multiple DAOs") + + # Print details for each multi-DAO contact + for contact in result: + logger.info(f"Contact {contact['name']} ({contact['ethereumAddress']}) belongs to {contact['dao_count']} DAOs: {', '.join(contact['daos'])}") + + return result + + def check_generic_names(self) -> List[Dict[str, Any]]: + """Check for contacts with generic or missing names""" + query = """ + SELECT id, "ethereumAddress", name, "ensName" + FROM "Contact" + WHERE name IS NULL + OR name = '' + OR name = 'Raid Guild Member' + OR name LIKE 'RG\\_%%' ESCAPE '\\' + """ + + result = self.db.execute_query(query) + logger.info(f"Found {len(result)} contacts with generic or missing names") + return result + + def fix_generic_names(self) -> int: + """Fix contacts with generic or missing names""" + # Get contacts with generic names + contacts = self.check_generic_names() + + fixed_count = 0 + for contact in contacts: + contact_id = contact["id"] + eth_address = contact["ethereumAddress"] + ens_name = contact["ensName"] + + # Generate a better name + if ens_name: + # Use ENS name without .eth suffix + if ens_name.endswith('.eth'): + name = ens_name[:-4] + else: + name = ens_name + else: + # Get DAO memberships + dao_query = """ + SELECT "daoName" FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s + """ + + dao_result = self.db.execute_query(dao_query, {"contact_id": contact_id}) + dao_names = [dao["daoName"] for dao in dao_result] if dao_result else [] + + # Use first DAO as prefix for the shortened address + if dao_names: + dao_prefix = dao_names[0][:2] # First two letters of the DAO name + name = f"{dao_prefix}_{eth_address[:6]}...{eth_address[-4:]}" + else: + name = f"ETH_{eth_address[:6]}...{eth_address[-4:]}" + + # Update the contact + update_query = """ + UPDATE "Contact" + SET name = %(name)s, + "updatedAt" = NOW() + WHERE id = %(contact_id)s + """ + + rows_updated = self.db.execute_update(update_query, { + "contact_id": contact_id, + "name": name + }) + + if rows_updated > 0: + logger.info(f"Updated name for contact {contact_id} to '{name}'") + fixed_count += 1 + + logger.info(f"Fixed {fixed_count} contacts with generic or missing names") + return fixed_count + + def check_case_sensitivity_issues(self) -> List[Dict[str, Any]]: + """Check for case sensitivity issues in Ethereum addresses""" + query = """ + SELECT LOWER("ethereumAddress") as lower_address, + array_agg("ethereumAddress") as addresses, + array_agg(id) as contact_ids, + COUNT(*) as count + FROM "Contact" + GROUP BY LOWER("ethereumAddress") + HAVING COUNT(*) > 1 + """ + + result = self.db.execute_query(query) + logger.info(f"Found {len(result)} potential case sensitivity issues with Ethereum addresses") + return result + + def fix_case_sensitivity_issues(self) -> int: + """Fix case sensitivity issues in Ethereum addresses""" + # Get case sensitivity issues + issues = self.check_case_sensitivity_issues() + + fixed_count = 0 + for issue in issues: + lower_address = issue["lower_address"] + addresses = issue["addresses"] + contact_ids = issue["contact_ids"] + + logger.info(f"Fixing case sensitivity issue for address {lower_address}") + logger.info(f" Found variations: {', '.join(addresses)}") + + try: + # Start a transaction + self.db.execute_query("BEGIN") + + # First, check if any contact already has the lowercase address + existing_query = """ + SELECT id FROM "Contact" + WHERE "ethereumAddress" = %(lower_address)s + """ + + existing = self.db.execute_query(existing_query, {"lower_address": lower_address}) + + if existing: + # We have a contact with the lowercase address already + # This is the primary contact we'll keep + primary_id = existing[0]["id"] + secondary_ids = [cid for cid in contact_ids if cid != primary_id] + + logger.info(f" Found existing contact {primary_id} with lowercase address") + logger.info(f" Will merge {len(secondary_ids)} other contacts into it") + + # For each secondary contact, transfer its DAO memberships to the primary + for secondary_id in secondary_ids: + # Get DAO memberships for this secondary contact + secondary_daos_query = """ + SELECT id, "daoName", "daoType", "joinedAt" + FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s + """ + secondary_daos = self.db.execute_query(secondary_daos_query, {"contact_id": secondary_id}) + + # Get existing DAO memberships for the primary contact + primary_daos_query = """ + SELECT "daoName" FROM "DaoMembership" WHERE "contactId" = %(contact_id)s + """ + primary_daos_result = self.db.execute_query(primary_daos_query, {"contact_id": primary_id}) + primary_daos = [row["daoName"] for row in primary_daos_result] if primary_daos_result else [] + + # For each DAO membership of the secondary contact + for dao in secondary_daos: + dao_name = dao["daoName"] + + if dao_name in primary_daos: + # Primary already has this membership, skip + logger.info(f" Primary already has membership in {dao_name}, skipping") + continue + else: + # Primary doesn't have this membership, transfer it + update_query = """ + UPDATE "DaoMembership" + SET "contactId" = %(primary_id)s, + "updatedAt" = NOW() + WHERE id = %(membership_id)s + """ + + self.db.execute_update(update_query, { + "primary_id": primary_id, + "membership_id": dao["id"] + }) + + # Add to primary's DAO list to avoid duplicates in future iterations + primary_daos.append(dao_name) + logger.info(f" Transferred {dao_name} membership from secondary to primary contact") + + # Transfer notes + notes_query = """ + UPDATE "Note" + SET "contactId" = %(primary_id)s, + "updatedAt" = NOW() + WHERE "contactId" = %(secondary_id)s + """ + + self.db.execute_update(notes_query, { + "primary_id": primary_id, + "secondary_id": secondary_id + }) + + # Delete the secondary contact + delete_query = """ + DELETE FROM "Contact" + WHERE id = %(secondary_id)s + """ + + self.db.execute_update(delete_query, {"secondary_id": secondary_id}) + logger.info(f" Deleted secondary contact {secondary_id}") + fixed_count += 1 + else: + # No contact has the lowercase address yet + # Choose the first contact as primary and update its address to lowercase + primary_id = contact_ids[0] + secondary_ids = contact_ids[1:] + + logger.info(f" No contact with lowercase address found") + logger.info(f" Will update contact {primary_id} to lowercase and merge {len(secondary_ids)} others into it") + + # Update primary contact to lowercase + update_query = """ + UPDATE "Contact" + SET "ethereumAddress" = %(lower_address)s, + "updatedAt" = NOW() + WHERE id = %(primary_id)s + """ + + self.db.execute_update(update_query, { + "primary_id": primary_id, + "lower_address": lower_address + }) + + logger.info(f" Updated address for contact {primary_id} to lowercase") + fixed_count += 1 + + # Now handle secondary contacts same as above + # Get existing DAO memberships for the primary contact + primary_daos_query = """ + SELECT "daoName" FROM "DaoMembership" WHERE "contactId" = %(contact_id)s + """ + primary_daos_result = self.db.execute_query(primary_daos_query, {"contact_id": primary_id}) + primary_daos = [row["daoName"] for row in primary_daos_result] if primary_daos_result else [] + + for secondary_id in secondary_ids: + # Get DAO memberships for this secondary contact + secondary_daos_query = """ + SELECT id, "daoName", "daoType", "joinedAt" + FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s + """ + secondary_daos = self.db.execute_query(secondary_daos_query, {"contact_id": secondary_id}) + + # For each DAO membership of the secondary contact + for dao in secondary_daos: + dao_name = dao["daoName"] + + if dao_name in primary_daos: + # Primary already has this membership, skip + logger.info(f" Primary already has membership in {dao_name}, skipping") + continue + else: + # Primary doesn't have this membership, transfer it + update_query = """ + UPDATE "DaoMembership" + SET "contactId" = %(primary_id)s, + "updatedAt" = NOW() + WHERE id = %(membership_id)s + """ + + self.db.execute_update(update_query, { + "primary_id": primary_id, + "membership_id": dao["id"] + }) + + # Add to primary's DAO list to avoid duplicates in future iterations + primary_daos.append(dao_name) + logger.info(f" Transferred {dao_name} membership from secondary to primary contact") + + # Transfer notes + notes_query = """ + UPDATE "Note" + SET "contactId" = %(primary_id)s, + "updatedAt" = NOW() + WHERE "contactId" = %(secondary_id)s + """ + + self.db.execute_update(notes_query, { + "primary_id": primary_id, + "secondary_id": secondary_id + }) + + # Delete the secondary contact + delete_query = """ + DELETE FROM "Contact" + WHERE id = %(secondary_id)s + """ + + self.db.execute_update(delete_query, {"secondary_id": secondary_id}) + logger.info(f" Deleted secondary contact {secondary_id}") + fixed_count += 1 + + # Commit the transaction + self.db.execute_query("COMMIT") + logger.info(f" Successfully fixed case sensitivity issue for {lower_address}") + + except Exception as e: + # Rollback on error + self.db.execute_query("ROLLBACK") + logger.error(f"Error fixing case sensitivity issue for {lower_address}: {e}") + continue + + logger.info(f"Fixed {fixed_count} contacts with case sensitivity issues") + return fixed_count + + def check_dao_membership_issues(self) -> List[Dict[str, Any]]: + """Check for issues with DAO memberships""" + query = """ + SELECT dm."contactId", dm."daoName", COUNT(*) as count + FROM "DaoMembership" dm + GROUP BY dm."contactId", dm."daoName" + HAVING COUNT(*) > 1 + """ + + result = self.db.execute_query(query) + logger.info(f"Found {len(result)} duplicate DAO memberships") + return result + + def fix_dao_membership_issues(self) -> int: + """Fix issues with DAO memberships""" + # Get duplicate DAO memberships + issues = self.check_dao_membership_issues() + + fixed_count = 0 + for issue in issues: + contact_id = issue["contactId"] + dao_name = issue["daoName"] + count = issue["count"] + + logger.info(f"Fixing duplicate DAO membership for contact {contact_id} in {dao_name} (found {count})") + + # Get all memberships for this contact and DAO + membership_query = """ + SELECT id, "joinedAt" + FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s AND "daoName" = %(dao_name)s + ORDER BY "joinedAt" ASC NULLS LAST + """ + + memberships = self.db.execute_query(membership_query, { + "contact_id": contact_id, + "dao_name": dao_name + }) + + # Keep the one with the earliest join date + keep_id = memberships[0]["id"] + delete_ids = [m["id"] for m in memberships[1:]] + + logger.info(f" Keeping membership {keep_id}, deleting {len(delete_ids)} duplicates") + + # Delete duplicate memberships + delete_query = """ + DELETE FROM "DaoMembership" + WHERE id IN %(delete_ids)s + """ + + rows_deleted = self.db.execute_update(delete_query, { + "delete_ids": tuple(delete_ids) + }) + + if rows_deleted > 0: + logger.info(f" Deleted {rows_deleted} duplicate memberships") + fixed_count += rows_deleted + + logger.info(f"Fixed {fixed_count} duplicate DAO memberships") + return fixed_count + + def run(self): + """Run all checks and fixes""" + logger.info("Starting data check and fix") + + # Check for various issues + duplicate_addresses = self.check_duplicate_addresses() + multi_dao_contacts = self.check_multi_dao_contacts() + generic_names = self.check_generic_names() + case_issues = self.check_case_sensitivity_issues() + dao_issues = self.check_dao_membership_issues() + + # Fix issues if found + fixed_count = 0 + + if case_issues: + fixed_count += self.fix_case_sensitivity_issues() + + if duplicate_addresses: + logger.warning("Found duplicate Ethereum addresses, but not fixing automatically") + logger.warning("Please run the merge_duplicate_contacts.py script to fix these") + + if generic_names: + fixed_count += self.fix_generic_names() + + if dao_issues: + fixed_count += self.fix_dao_membership_issues() + + logger.info(f"Data check and fix completed. Fixed {fixed_count} issues.") + + # Final check for multi-DAO contacts (this is not an issue, just informational) + final_multi_dao = self.check_multi_dao_contacts() + + return fixed_count + +def main(): + """Main function""" + try: + checker = DataCheckerFixer() + fixed_count = checker.run() + logger.info(f"Data check and fix completed successfully. Fixed {fixed_count} issues.") + return 0 + except Exception as e: + logger.exception(f"Error checking and fixing data: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/scripts/moloch_dao/import_raid_guild_csv.py b/scripts/moloch_dao/import_raid_guild_csv.py index 4e6ec0d..2b265d1 100644 --- a/scripts/moloch_dao/import_raid_guild_csv.py +++ b/scripts/moloch_dao/import_raid_guild_csv.py @@ -91,32 +91,110 @@ class RaidGuildImporter: logger.info(f"Read {len(members)} members from CSV file") return members + def get_ens_name_for_address(self, address: str) -> Optional[str]: + """Check if we already have an ENS name for this address in the database""" + query = 'SELECT "ensName" FROM "Contact" WHERE "ethereumAddress" = %(address)s AND "ensName" IS NOT NULL' + result = self.db.execute_query(query, {"address": address}) + + if result and result[0]["ensName"]: + return result[0]["ensName"] + return None + + def check_existing_dao_membership(self, contact_id: str, dao_name: str) -> Optional[Dict[str, Any]]: + """Check if a contact already has a membership in the specified DAO""" + query = """ + SELECT id, "daoType", "joinedAt" + FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s AND "daoName" = %(dao_name)s + """ + + result = self.db.execute_query(query, { + "contact_id": contact_id, + "dao_name": dao_name + }) + + if result: + return result[0] + return None + + def generate_name_for_contact(self, address: str, ens_name: Optional[str] = None) -> str: + """Generate a better name for the contact based on ENS or address""" + if ens_name: + # Use ENS name without .eth suffix as the name + if ens_name.endswith('.eth'): + return ens_name[:-4] # Remove .eth suffix + return ens_name + + # Use shortened address if no ENS name + return f"RG_{address[:6]}...{address[-4:]}" + def process_member(self, member: Dict[str, Any]) -> Optional[str]: """Process a member and add to the database""" address = member["address"] - # Check if contact already exists - query = 'SELECT id FROM "Contact" WHERE "ethereumAddress" = %(address)s' + # Check if contact already exists and get ENS name if available + query = 'SELECT id, "ensName", name FROM "Contact" WHERE "ethereumAddress" = %(address)s' result = self.db.execute_query(query, {"address": address}) + contact_id = None + ens_name = None + if result: + # Contact exists, update if needed contact_id = result[0]["id"] + ens_name = result[0].get("ensName") + current_name = result[0].get("name") + logger.info(f"Contact already exists for {address} with ID {contact_id}") + + # Update the contact if needed (e.g., if name is generic or missing ENS) + if current_name == "Raid Guild Member" or (not ens_name and self.get_ens_name_for_address(address)): + # Get ENS name if we don't have one + if not ens_name: + ens_name = self.get_ens_name_for_address(address) + + # Generate a better name + name = self.generate_name_for_contact(address, ens_name) + + # Update the contact + update_query = """ + UPDATE "Contact" + SET "ensName" = COALESCE(%(ens_name)s, "ensName"), + name = %(name)s, + "updatedAt" = NOW() + WHERE id = %(contact_id)s + """ + + self.db.execute_update(update_query, { + "contact_id": contact_id, + "ens_name": ens_name, + "name": name + }) + + logger.info(f"Updated contact {contact_id} with name '{name}' and ENS '{ens_name}'") else: + # Create new contact + # Check if we have an ENS name for this address + ens_name = self.get_ens_name_for_address(address) + + # Generate a better name for the contact + name = self.generate_name_for_contact(address, ens_name) + # Create new contact query = """ INSERT INTO "Contact" ( - id, "ethereumAddress", name, "createdAt", "updatedAt" + id, "ethereumAddress", "ensName", name, "createdAt", "updatedAt" ) VALUES ( - gen_random_uuid(), %(address)s, %(name)s, NOW(), NOW() + gen_random_uuid(), %(address)s, %(ens_name)s, %(name)s, NOW(), NOW() ) RETURNING id """ result = self.db.execute_query(query, { "address": address, - "name": f"Raid Guild Member" + "ens_name": ens_name, + "name": name }) if not result: @@ -126,21 +204,10 @@ class RaidGuildImporter: contact_id = result[0]["id"] logger.info(f"Added new contact: {address} with ID {contact_id}") - # Add DAO membership - query = """ - INSERT INTO "DaoMembership" ( - id, "contactId", "daoName", "daoType", "joinedAt", "createdAt", "updatedAt" - ) - VALUES ( - gen_random_uuid(), %(contact_id)s, %(dao_name)s, %(dao_type)s, - %(joined_at)s, NOW(), NOW() - ) - ON CONFLICT ("contactId", "daoName") DO UPDATE - SET "daoType" = EXCLUDED."daoType", - "joinedAt" = EXCLUDED."joinedAt", - "updatedAt" = NOW() - """ + # Check if DAO membership already exists + existing_membership = self.check_existing_dao_membership(contact_id, "Raid Guild") + # Parse joined_at timestamp joined_at = None if member.get("joined_at"): try: @@ -150,28 +217,90 @@ class RaidGuildImporter: except (ValueError, TypeError): joined_at = None - self.db.execute_update(query, { - "contact_id": contact_id, - "dao_name": "Raid Guild", - "dao_type": "Moloch DAO", - "joined_at": joined_at - }) + if existing_membership: + # Only update if we have better information + if joined_at and (not existing_membership.get("joinedAt") or existing_membership.get("joinedAt") > joined_at): + # Update with better join date + update_query = """ + UPDATE "DaoMembership" + SET "joinedAt" = %(joined_at)s, + "updatedAt" = NOW() + WHERE id = %(membership_id)s + """ + + self.db.execute_update(update_query, { + "membership_id": existing_membership["id"], + "joined_at": joined_at + }) + + logger.info(f"Updated existing DAO membership for contact {contact_id} with better join date") + else: + # Add new DAO membership + query = """ + INSERT INTO "DaoMembership" ( + id, "contactId", "daoName", "daoType", "joinedAt", "createdAt", "updatedAt" + ) + VALUES ( + gen_random_uuid(), %(contact_id)s, %(dao_name)s, %(dao_type)s, + %(joined_at)s, NOW(), NOW() + ) + """ + + self.db.execute_update(query, { + "contact_id": contact_id, + "dao_name": "Raid Guild", + "dao_type": "Moloch DAO", + "joined_at": joined_at + }) + + logger.info(f"Added new DAO membership for contact {contact_id}") - # Add a note about the member's shares and loot - query = """ - INSERT INTO "Note" ( - id, "contactId", content, "createdAt", "updatedAt" - ) - VALUES ( - gen_random_uuid(), %(contact_id)s, %(content)s, NOW(), NOW() - ) + # Check if a similar note already exists + note_content = f"Member of Raid Guild DAO (0xfe1084bc16427e5eb7f13fc19bcd4e641f7d571f) with {member['shares']} shares and {member['loot']} loot" + check_note_query = """ + SELECT id FROM "Note" + WHERE "contactId" = %(contact_id)s + AND content LIKE %(content_pattern)s """ - self.db.execute_update(query, { + existing_note = self.db.execute_query(check_note_query, { "contact_id": contact_id, - "content": f"Member of Raid Guild DAO (0xfe1084bc16427e5eb7f13fc19bcd4e641f7d571f) with {member['shares']} shares and {member['loot']} loot" + "content_pattern": "%Member of Raid Guild DAO%shares%loot%" }) + if existing_note: + # Update the existing note + update_note_query = """ + UPDATE "Note" + SET content = %(content)s, + "updatedAt" = NOW() + WHERE id = %(note_id)s + """ + + self.db.execute_update(update_note_query, { + "note_id": existing_note[0]["id"], + "content": note_content + }) + + logger.info(f"Updated existing note for contact {contact_id}") + else: + # Add a note about the member's shares and loot + query = """ + INSERT INTO "Note" ( + id, "contactId", content, "createdAt", "updatedAt" + ) + VALUES ( + gen_random_uuid(), %(contact_id)s, %(content)s, NOW(), NOW() + ) + """ + + self.db.execute_update(query, { + "contact_id": contact_id, + "content": note_content + }) + + logger.info(f"Added new note for contact {contact_id}") + return contact_id def run(self): diff --git a/scripts/moloch_dao/merge_duplicate_contacts.py b/scripts/moloch_dao/merge_duplicate_contacts.py new file mode 100644 index 0000000..b028cff --- /dev/null +++ b/scripts/moloch_dao/merge_duplicate_contacts.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +""" +Merge Duplicate Contacts + +This script finds and merges duplicate contacts in the database based on Ethereum addresses. +It keeps the most complete record and updates all relationships to point to the primary contact. + +Usage: + python merge_duplicate_contacts.py +""" + +import os +import sys +import logging +from typing import Dict, Any, List, Optional, Tuple +from dotenv import load_dotenv + +# Add parent directory to path to import utils +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from utils.db_connector import DatabaseConnector +from utils.logger import setup_logger + +# Load environment variables +load_dotenv() + +# Setup logging +logger = setup_logger("duplicate_contact_merger") + +class DuplicateContactMerger: + """Merger for duplicate contacts""" + + def __init__(self): + """Initialize the merger""" + # Initialize database + self.db = DatabaseConnector() + + def find_duplicate_contacts(self) -> List[Dict[str, Any]]: + """Find duplicate contacts based on Ethereum address""" + query = """ + SELECT "ethereumAddress", COUNT(*) as count, array_agg(id) as contact_ids + FROM "Contact" + WHERE "ethereumAddress" IS NOT NULL + GROUP BY "ethereumAddress" + HAVING COUNT(*) > 1 + ORDER BY COUNT(*) DESC + """ + + return self.db.execute_query(query) + + def get_contact_details(self, contact_id: str) -> Dict[str, Any]: + """Get details for a specific contact""" + query = """ + SELECT id, "ethereumAddress", "ensName", name, "createdAt", "updatedAt" + FROM "Contact" + WHERE id = %(contact_id)s + """ + + result = self.db.execute_query(query, {"contact_id": contact_id}) + if result: + return result[0] + return {} + + def determine_primary_contact(self, contact_ids: List[str]) -> Tuple[str, List[str]]: + """ + Determine which contact should be the primary one to keep. + + Strategy: + 1. Prefer contacts with ENS names + 2. Prefer contacts with non-generic names + 3. Prefer older contacts (earlier creation date) + + Returns: + Tuple of (primary_contact_id, secondary_contact_ids) + """ + contacts = [self.get_contact_details(cid) for cid in contact_ids] + + # Sort contacts by our preference criteria + sorted_contacts = sorted( + contacts, + key=lambda c: ( + # Prefer contacts with ENS names (None sorts last) + c.get("ensName") is None, + # Prefer contacts with non-generic names + c.get("name", "").startswith("RG_"), + # Prefer older contacts + c.get("createdAt") + ) + ) + + primary = sorted_contacts[0]["id"] + secondaries = [c["id"] for c in sorted_contacts[1:]] + + return primary, secondaries + + def merge_contacts(self, primary_id: str, secondary_ids: List[str]) -> bool: + """ + Merge secondary contacts into the primary contact. + + Steps: + 1. Update all relationships to point to the primary contact + 2. Merge any missing data from secondaries into primary + 3. Delete the secondary contacts + + Returns: + True if successful, False otherwise + """ + try: + # Start a transaction + self.db.execute_query("BEGIN") + + # Get primary contact details + primary = self.get_contact_details(primary_id) + logger.info(f"Merging contacts into primary: {primary_id} ({primary.get('name')})") + + # Get existing DAO memberships for the primary contact + primary_daos_query = """ + SELECT "daoName" FROM "DaoMembership" WHERE "contactId" = %(contact_id)s + """ + primary_daos_result = self.db.execute_query(primary_daos_query, {"contact_id": primary_id}) + primary_daos = [row["daoName"] for row in primary_daos_result] if primary_daos_result else [] + + logger.info(f"Primary contact is already a member of: {', '.join(primary_daos) if primary_daos else 'No DAOs'}") + + # For each secondary contact, transfer its DAO memberships to the primary + for secondary_id in secondary_ids: + # Get DAO memberships for this secondary contact + secondary_daos_query = """ + SELECT id, "daoName", "daoType", "joinedAt" + FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s + """ + secondary_daos = self.db.execute_query(secondary_daos_query, {"contact_id": secondary_id}) + + if not secondary_daos: + logger.info(f"Secondary contact {secondary_id} has no DAO memberships") + continue + + logger.info(f"Secondary contact {secondary_id} is a member of: {', '.join([dao['daoName'] for dao in secondary_daos])}") + + # For each DAO membership of the secondary contact + for dao in secondary_daos: + dao_name = dao["daoName"] + + if dao_name in primary_daos: + # Primary already has this membership, check if we need to update join date + primary_dao_query = """ + SELECT id, "joinedAt" FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s AND "daoName" = %(dao_name)s + """ + primary_dao = self.db.execute_query(primary_dao_query, { + "contact_id": primary_id, + "dao_name": dao_name + })[0] + + # If secondary has an earlier join date, update the primary's join date + if dao["joinedAt"] and (not primary_dao["joinedAt"] or dao["joinedAt"] < primary_dao["joinedAt"]): + update_query = """ + UPDATE "DaoMembership" + SET "joinedAt" = %(joined_at)s, + "updatedAt" = NOW() + WHERE id = %(membership_id)s + """ + + self.db.execute_update(update_query, { + "membership_id": primary_dao["id"], + "joined_at": dao["joinedAt"] + }) + + logger.info(f"Updated join date for {dao_name} membership of primary contact") + else: + # Primary doesn't have this membership, transfer it + update_query = """ + UPDATE "DaoMembership" + SET "contactId" = %(primary_id)s, + "updatedAt" = NOW() + WHERE id = %(membership_id)s + """ + + self.db.execute_update(update_query, { + "primary_id": primary_id, + "membership_id": dao["id"] + }) + + # Add to primary's DAO list to avoid duplicates in future iterations + primary_daos.append(dao_name) + logger.info(f"Transferred {dao_name} membership from secondary to primary contact") + + # Update notes + self.db.execute_update(""" + UPDATE "Note" + SET "contactId" = %(primary_id)s, + "updatedAt" = NOW() + WHERE "contactId" IN %(secondary_ids)s + """, { + "primary_id": primary_id, + "secondary_ids": tuple(secondary_ids) + }) + + logger.info(f"Transferred notes from secondary contacts to primary") + + # Update ENS name if primary doesn't have one but a secondary does + if not primary.get("ensName"): + ens_query = """ + SELECT "ensName" FROM "Contact" + WHERE id IN %(secondary_ids)s + AND "ensName" IS NOT NULL + LIMIT 1 + """ + + ens_result = self.db.execute_query(ens_query, {"secondary_ids": tuple(secondary_ids)}) + if ens_result and ens_result[0]["ensName"]: + self.db.execute_update(""" + UPDATE "Contact" + SET "ensName" = %(ens_name)s, + "updatedAt" = NOW() + WHERE id = %(primary_id)s + """, { + "primary_id": primary_id, + "ens_name": ens_result[0]["ensName"] + }) + + logger.info(f"Updated primary contact with ENS name: {ens_result[0]['ensName']}") + + # Transfer any other social media info that primary might be missing + for field in ["twitter", "discord", "telegram", "email", "farcaster", "otherSocial"]: + social_query = f""" + SELECT "{field}" FROM "Contact" + WHERE id IN %(secondary_ids)s + AND "{field}" IS NOT NULL + AND "{field}" != '' + LIMIT 1 + """ + + social_result = self.db.execute_query(social_query, {"secondary_ids": tuple(secondary_ids)}) + if social_result and social_result[0][field]: + # Check if primary has this field + primary_social_query = f""" + SELECT "{field}" FROM "Contact" + WHERE id = %(primary_id)s + """ + + primary_social = self.db.execute_query(primary_social_query, {"primary_id": primary_id})[0] + + # Only update if primary doesn't have this field + if not primary_social[field]: + update_query = f""" + UPDATE "Contact" + SET "{field}" = %(value)s, + "updatedAt" = NOW() + WHERE id = %(primary_id)s + """ + + self.db.execute_update(update_query, { + "primary_id": primary_id, + "value": social_result[0][field] + }) + + logger.info(f"Updated primary contact with {field}: {social_result[0][field]}") + + # Delete secondary contacts + self.db.execute_update(""" + DELETE FROM "Contact" + WHERE id IN %(secondary_ids)s + """, { + "secondary_ids": tuple(secondary_ids) + }) + + # Commit the transaction + self.db.execute_query("COMMIT") + + logger.info(f"Successfully merged {len(secondary_ids)} contacts into {primary_id}") + return True + + except Exception as e: + # Rollback on error + self.db.execute_query("ROLLBACK") + logger.error(f"Error merging contacts: {e}") + return False + + def run(self): + """Run the merger""" + logger.info("Starting duplicate contact merger") + + # Find duplicate contacts + duplicates = self.find_duplicate_contacts() + logger.info(f"Found {len(duplicates)} Ethereum addresses with duplicate contacts") + + total_merged = 0 + for dup in duplicates: + eth_address = dup["ethereumAddress"] + count = dup["count"] + contact_ids = dup["contact_ids"] + + logger.info(f"Processing {count} duplicates for address {eth_address}") + + # Determine primary and secondary contacts + primary_id, secondary_ids = self.determine_primary_contact(contact_ids) + + # Merge the contacts + if self.merge_contacts(primary_id, secondary_ids): + total_merged += len(secondary_ids) + + logger.info(f"Merged a total of {total_merged} duplicate contacts") + return total_merged + +def main(): + """Main function""" + try: + merger = DuplicateContactMerger() + merged_count = merger.run() + logger.info(f"Duplicate contact merging completed successfully. Merged {merged_count} contacts.") + return 0 + except Exception as e: + logger.exception(f"Error merging duplicate contacts: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/scripts/moloch_dao/update_raid_guild_names.py b/scripts/moloch_dao/update_raid_guild_names.py new file mode 100644 index 0000000..e17392a --- /dev/null +++ b/scripts/moloch_dao/update_raid_guild_names.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Update Raid Guild Member Names + +This script updates the names of Raid Guild members that were previously imported with +the generic name "Raid Guild Member". It sets better names based on ENS names or addresses. + +Usage: + python update_raid_guild_names.py +""" + +import os +import sys +import logging +from typing import Dict, Any, List, Optional +from dotenv import load_dotenv + +# Add parent directory to path to import utils +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from utils.db_connector import DatabaseConnector +from utils.logger import setup_logger + +# Load environment variables +load_dotenv() + +# Setup logging +logger = setup_logger("raid_guild_name_updater") + +class RaidGuildNameUpdater: + """Updater for Raid Guild member names""" + + def __init__(self): + """Initialize the updater""" + # Initialize database + self.db = DatabaseConnector() + + def generate_name_for_contact(self, address: str, ens_name: Optional[str] = None) -> str: + """Generate a better name for the contact based on ENS or address""" + if ens_name: + # Use ENS name without .eth suffix as the name + if ens_name.endswith('.eth'): + return ens_name[:-4] # Remove .eth suffix + return ens_name + + # Use shortened address if no ENS name + return f"RG_{address[:6]}...{address[-4:]}" + + def get_raid_guild_members_with_generic_names(self) -> List[Dict[str, Any]]: + """Get all Raid Guild members with generic names""" + query = """ + SELECT c.id, c."ethereumAddress", c."ensName", c.name + FROM "Contact" c + JOIN "DaoMembership" dm ON c.id = dm."contactId" + WHERE dm."daoName" = 'Raid Guild' + AND c.name = 'Raid Guild Member' + """ + return self.db.execute_query(query) + + def update_member_name(self, contact_id: str, ethereum_address: str, ens_name: Optional[str] = None) -> bool: + """ + Update the name of a member. + + Args: + contact_id: ID of the contact + ethereum_address: Ethereum address of the member + ens_name: ENS name of the member, if any + + Returns: + True if the name was updated, False otherwise + """ + # Generate a better name + name = self.generate_name_for_contact(ethereum_address, ens_name) + + # Update the contact + query = """ + UPDATE "Contact" + SET name = %(name)s, + "updatedAt" = NOW() + WHERE id = %(contact_id)s + """ + + rows_updated = self.db.execute_update(query, { + "contact_id": contact_id, + "name": name + }) + + if rows_updated > 0: + logger.info(f"Updated name for contact {contact_id} to '{name}'") + return True + else: + logger.warning(f"Failed to update name for contact {contact_id}") + return False + + def run(self): + """Run the updater""" + logger.info("Starting Raid Guild member name update") + + # Get all Raid Guild members with generic names + members = self.get_raid_guild_members_with_generic_names() + logger.info(f"Found {len(members)} Raid Guild members with generic names") + + # Update names + updated_count = 0 + for member in members: + if self.update_member_name( + member["id"], + member["ethereumAddress"], + member.get("ensName") + ): + updated_count += 1 + + logger.info(f"Updated names for {updated_count} out of {len(members)} members") + return updated_count + +def main(): + """Main function""" + try: + updater = RaidGuildNameUpdater() + updated_count = updater.run() + logger.info(f"Name update completed successfully. Updated {updated_count} members.") + return 0 + except Exception as e: + logger.exception(f"Error updating names: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file