#!/usr/bin/env python3
"""
Import Raid Guild Members from CSV

This script imports Raid Guild DAO members from a CSV file exported from DAOhaus.
It adds the members to the database with proper DAO membership records and notes.

Usage:
    python import_raid_guild_csv.py [path/to/members.csv]

When no path is given, the default export file located next to this script
is used.
"""

import os
import sys
import csv
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from dotenv import load_dotenv

# Add parent directory to path to import utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.db_connector import DatabaseConnector
from utils.logger import setup_logger

# Load environment variables
load_dotenv()

# Setup logging
logger = setup_logger("raid_guild_importer")


class RaidGuildImporter:
    """Importer for Raid Guild members from CSV file.

    For every active member row in the CSV the importer makes sure that a
    "Contact" row, a "DaoMembership" row and a "Note" row (shares/loot summary)
    exist, updating existing rows instead of duplicating them.

    NOTE(review): ``self.db.execute_query`` is assumed to return a sequence of
    dict-like rows (or an empty/None value when nothing matched); that is how
    every call site below indexes its result -- confirm against
    ``utils.db_connector``.
    """

    # Values written to / matched against the database.
    DAO_NAME = "Raid Guild"
    DAO_TYPE = "Moloch DAO"
    DAO_ADDRESS = "0xfe1084bc16427e5eb7f13fc19bcd4e641f7d571f"
    # Placeholder contact name that is replaced by a generated name on import.
    GENERIC_CONTACT_NAME = "Raid Guild Member"
    # LIKE pattern that recognises a note previously written by this importer.
    NOTE_LIKE_PATTERN = "%Member of Raid Guild DAO%shares%loot%"

    def __init__(self, csv_path: str):
        """Initialize the importer.

        Args:
            csv_path: Path of the DAOhaus member export to import.

        Creating an instance opens the database connector and registers the
        data source, so it already talks to the database.
        """
        self.csv_path = csv_path

        # Initialize database
        self.db = DatabaseConnector()

        # Register data source
        self.data_source_id = self.register_data_source()

    def register_data_source(self) -> str:
        """Register (or refresh) the data source row and return its id.

        Raises:
            RuntimeError: If the upsert did not return a row.
        """
        query = """
            INSERT INTO "DataSource" (
                id, name, type, description, "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(name)s, %(type)s, %(description)s, NOW(), NOW()
            )
            ON CONFLICT (name) DO UPDATE
            SET type = EXCLUDED.type,
                description = EXCLUDED.description,
                "updatedAt" = NOW()
            RETURNING id
        """

        result = self.db.execute_query(query, {
            "name": "Raid Guild DAO CSV",
            "description": "Raid Guild is a Moloch DAO on Gnosis Chain with 151 members. Imported from CSV export.",
            "type": "blockchain"
        })

        # Fail with a clear message instead of an IndexError/TypeError on result[0].
        if not result:
            raise RuntimeError("Failed to register data source 'Raid Guild DAO CSV'")

        data_source_id = result[0]["id"]
        logger.info(f"Registered data source with ID: {data_source_id}")
        return data_source_id

    @staticmethod
    def _parse_count(raw: Any) -> int:
        """Parse a shares/loot cell; a missing or blank cell counts as 0.

        Non-numeric text still raises ValueError so corrupt exports are not
        silently imported.
        """
        if raw is None:
            return 0
        text = str(raw).strip()
        return int(text) if text else 0

    def read_csv(self) -> List[Dict[str, Any]]:
        """Read the CSV file and return a list of members.

        Only rows with ``exists == true`` and ``didRagequit == false`` are
        returned. Rows without a member address are skipped with a warning.

        Returns:
            One dict per member with the keys ``address``, ``delegateKey``,
            ``shares``, ``loot`` and ``joined_at`` (raw ``createdAt`` cell).

        Raises:
            Exception: Any error while opening or parsing the file is logged
                and re-raised unchanged.
        """
        members = []

        try:
            # newline='' is what the csv module requires for correct handling of
            # embedded newlines; utf-8-sig also accepts files that start with a
            # BOM, which would otherwise corrupt the first header name.
            with open(self.csv_path, 'r', newline='', encoding='utf-8-sig') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    # DictReader fills short rows with None, hence the `or ''`.
                    exists = (row.get('exists') or '').strip().lower()
                    did_ragequit = (row.get('didRagequit') or '').strip().lower()

                    # Only include members that exist and haven't ragequit
                    if exists != 'true' or did_ragequit != 'false':
                        continue

                    address = (row.get('memberAddress') or '').strip().lower()
                    if not address:
                        logger.warning(
                            f"Skipping CSV line {reader.line_num}: missing memberAddress"
                        )
                        continue

                    members.append({
                        "address": address,
                        "delegateKey": (row.get('delegateKey') or '').strip().lower(),
                        "shares": self._parse_count(row.get('shares')),
                        "loot": self._parse_count(row.get('loot')),
                        "joined_at": row.get('createdAt')
                    })
        except Exception as e:
            logger.error(f"Error reading CSV file: {e}")
            raise

        logger.info(f"Read {len(members)} members from CSV file")
        return members

    def get_ens_name_for_address(self, address: str) -> Optional[str]:
        """Check if we already have an ENS name for this address in the database.

        Returns:
            The ``ensName`` of the first matching contact, or None.
        """
        query = 'SELECT "ensName" FROM "Contact" WHERE "ethereumAddress" = %(address)s AND "ensName" IS NOT NULL'
        result = self.db.execute_query(query, {"address": address})

        if result and result[0]["ensName"]:
            return result[0]["ensName"]
        return None

    def check_existing_dao_membership(self, contact_id: str, dao_name: str) -> Optional[Dict[str, Any]]:
        """Check if a contact already has a membership in the specified DAO.

        Returns:
            The first matching row (``id``, ``daoType``, ``joinedAt``) or None.
        """
        query = """
            SELECT id, "daoType", "joinedAt"
            FROM "DaoMembership"
            WHERE "contactId" = %(contact_id)s AND "daoName" = %(dao_name)s
        """

        result = self.db.execute_query(query, {
            "contact_id": contact_id,
            "dao_name": dao_name
        })

        if result:
            return result[0]
        return None

    def generate_name_for_contact(self, address: str, ens_name: Optional[str] = None) -> str:
        """Generate a better name for the contact based on ENS or address.

        Returns:
            The ENS name without its ``.eth`` suffix when one is known,
            otherwise ``RG_<first 6 chars>...<last 4 chars>`` of the address.
        """
        if ens_name:
            # Use ENS name without .eth suffix as the name
            if ens_name.endswith('.eth'):
                return ens_name[:-4]  # Remove .eth suffix
            return ens_name

        # Use shortened address if no ENS name
        return f"RG_{address[:6]}...{address[-4:]}"

    def _ensure_contact(self, address: str) -> Optional[str]:
        """Return the id of the contact for ``address``, creating it if needed.

        An existing contact is renamed when it still carries the generic
        placeholder name or when an ENS name was found for a contact that had
        none. Returns None when inserting a new contact returned no row.
        """
        query = 'SELECT id, "ensName", name FROM "Contact" WHERE "ethereumAddress" = %(address)s'
        result = self.db.execute_query(query, {"address": address})

        if result:
            # Contact exists, update if needed
            contact_id = result[0]["id"]
            ens_name = result[0].get("ensName")
            current_name = result[0].get("name")

            logger.info(f"Contact already exists for {address} with ID {contact_id}")

            # Look the ENS name up once (only when this row has none) and reuse
            # the answer for both the decision and the update.
            had_ens = bool(ens_name)
            if not had_ens:
                ens_name = self.get_ens_name_for_address(address)

            # Update the contact if needed (e.g., if name is generic or missing ENS)
            if current_name == self.GENERIC_CONTACT_NAME or (not had_ens and ens_name):
                # Generate a better name
                name = self.generate_name_for_contact(address, ens_name)

                # COALESCE keeps the stored ENS name when we have nothing better.
                update_query = """
                    UPDATE "Contact"
                    SET "ensName" = COALESCE(%(ens_name)s, "ensName"),
                        name = %(name)s,
                        "updatedAt" = NOW()
                    WHERE id = %(contact_id)s
                """

                self.db.execute_update(update_query, {
                    "contact_id": contact_id,
                    "ens_name": ens_name,
                    "name": name
                })

                logger.info(f"Updated contact {contact_id} with name '{name}' and ENS '{ens_name}'")

            return contact_id

        # Create new contact
        # Check if we have an ENS name for this address
        ens_name = self.get_ens_name_for_address(address)

        # Generate a better name for the contact
        name = self.generate_name_for_contact(address, ens_name)

        query = """
            INSERT INTO "Contact" (
                id, "ethereumAddress", "ensName", name, "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(address)s, %(ens_name)s, %(name)s, NOW(), NOW()
            )
            RETURNING id
        """

        result = self.db.execute_query(query, {
            "address": address,
            "ens_name": ens_name,
            "name": name
        })

        if not result:
            logger.error(f"Failed to add contact for {address}")
            return None

        contact_id = result[0]["id"]
        logger.info(f"Added new contact: {address} with ID {contact_id}")
        return contact_id

    @staticmethod
    def _parse_joined_at(raw: Any) -> Optional[datetime]:
        """Convert a Unix-timestamp cell to a datetime, or None if unusable.

        NOTE(review): the result is a naive datetime in the machine's local
        timezone (``datetime.fromtimestamp`` without ``tz``). Confirm that this
        matches how "joinedAt" is stored; the comparison against the stored
        value also assumes that value is naive.
        """
        if not raw:
            return None
        try:
            # Convert Unix timestamp to datetime
            return datetime.fromtimestamp(int(raw))
        except (ValueError, TypeError, OverflowError, OSError):
            # OverflowError/OSError: timestamp outside the platform's range,
            # e.g. a value given in milliseconds.
            return None

    def _upsert_membership(self, contact_id: str, joined_at: Optional[datetime]) -> None:
        """Create the Raid Guild membership or improve its join date."""
        existing_membership = self.check_existing_dao_membership(contact_id, self.DAO_NAME)

        if not existing_membership:
            # Add new DAO membership
            query = """
                INSERT INTO "DaoMembership" (
                    id, "contactId", "daoName", "daoType", "joinedAt", "createdAt", "updatedAt"
                )
                VALUES (
                    gen_random_uuid(), %(contact_id)s, %(dao_name)s, %(dao_type)s, %(joined_at)s, NOW(), NOW()
                )
            """

            self.db.execute_update(query, {
                "contact_id": contact_id,
                "dao_name": self.DAO_NAME,
                "dao_type": self.DAO_TYPE,
                "joined_at": joined_at
            })

            logger.info(f"Added new DAO membership for contact {contact_id}")
            return

        # Only update if we have better information: a join date where none
        # was stored, or an earlier one than the stored date.
        stored_joined_at = existing_membership.get("joinedAt")
        if joined_at and (not stored_joined_at or stored_joined_at > joined_at):
            update_query = """
                UPDATE "DaoMembership"
                SET "joinedAt" = %(joined_at)s,
                    "updatedAt" = NOW()
                WHERE id = %(membership_id)s
            """

            self.db.execute_update(update_query, {
                "membership_id": existing_membership["id"],
                "joined_at": joined_at
            })

            logger.info(f"Updated existing DAO membership for contact {contact_id} with better join date")

    def _upsert_note(self, contact_id: str, member: Dict[str, Any]) -> None:
        """Create or refresh the shares/loot note for the contact."""
        note_content = (
            f"Member of Raid Guild DAO ({self.DAO_ADDRESS}) "
            f"with {member['shares']} shares and {member['loot']} loot"
        )

        # Check if a similar note already exists
        check_note_query = """
            SELECT id FROM "Note"
            WHERE "contactId" = %(contact_id)s AND content LIKE %(content_pattern)s
        """

        existing_note = self.db.execute_query(check_note_query, {
            "contact_id": contact_id,
            "content_pattern": self.NOTE_LIKE_PATTERN
        })

        if existing_note:
            # Update the existing note
            update_note_query = """
                UPDATE "Note"
                SET content = %(content)s,
                    "updatedAt" = NOW()
                WHERE id = %(note_id)s
            """

            self.db.execute_update(update_note_query, {
                "note_id": existing_note[0]["id"],
                "content": note_content
            })

            logger.info(f"Updated existing note for contact {contact_id}")
        else:
            # Add a note about the member's shares and loot
            query = """
                INSERT INTO "Note" (
                    id, "contactId", content, "createdAt", "updatedAt"
                )
                VALUES (
                    gen_random_uuid(), %(contact_id)s, %(content)s, NOW(), NOW()
                )
            """

            self.db.execute_update(query, {
                "contact_id": contact_id,
                "content": note_content
            })

            logger.info(f"Added new note for contact {contact_id}")

    def process_member(self, member: Dict[str, Any]) -> Optional[str]:
        """Process a member and add to the database.

        Args:
            member: A dict as produced by ``read_csv`` (``address``, ``shares``,
                ``loot`` and optionally ``joined_at``).

        Returns:
            The contact id, or None when the contact could not be created.
        """
        address = member["address"]

        contact_id = self._ensure_contact(address)
        if contact_id is None:
            return None

        joined_at = self._parse_joined_at(member.get("joined_at"))
        self._upsert_membership(contact_id, joined_at)
        self._upsert_note(contact_id, member)

        return contact_id

    def run(self):
        """Run the importer and return the number of members processed."""
        logger.info(f"Starting Raid Guild member import from {self.csv_path}")

        # Read members from CSV
        members = self.read_csv()

        # Process members
        processed_count = 0
        for member in members:
            if self.process_member(member):
                processed_count += 1

        logger.info(f"Processed {processed_count} members out of {len(members)} found")
        return processed_count


def main(csv_path: Optional[str] = None) -> int:
    """Main function.

    Args:
        csv_path: CSV export to import. Defaults to
            ``raid-guild-hkr_Members_1742163047.csv`` in this script's directory.

    Returns:
        0 on success, 1 when any exception occurred (it is logged).
    """
    try:
        if csv_path is None:
            csv_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                "raid-guild-hkr_Members_1742163047.csv"
            )
        importer = RaidGuildImporter(csv_path)
        processed_count = importer.run()
        logger.info(f"Import completed successfully. Processed {processed_count} members.")
        return 0
    except Exception as e:
        logger.exception(f"Error running importer: {e}")
        return 1


if __name__ == "__main__":
    # An optional first CLI argument overrides the default CSV path.
    sys.exit(main(sys.argv[1] if len(sys.argv) > 1 else None))