#!/usr/bin/env python3
"""
Import Public Haus Members from DAOhaus v3 Subgraph

This script fetches members of Public Haus DAO from the DAOhaus v3 subgraph on Optimism mainnet,
imports them into the database, and links them to the Public Haus DAO.

Usage:
    python import_public_haus_members.py
"""

import os
import sys
import logging
import requests
import json
from typing import Dict, Any, List, Optional, Tuple
from dotenv import load_dotenv

# Add parent directory to path to import utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.db_connector import DatabaseConnector
from utils.logger import setup_logger

# Load environment variables
load_dotenv()

# Setup logging
logger = setup_logger("public_haus_importer")

# Constants
# The endpoint can be overridden with the PUBLIC_HAUS_SUBGRAPH_URL environment
# variable; when unset, the original hosted-service URL is used.
# NOTE(review): confirm this default endpoint is still being served.
SUBGRAPH_URL = os.getenv(
    "PUBLIC_HAUS_SUBGRAPH_URL",
    "https://api.thegraph.com/subgraphs/name/hausdao/daohaus-v3-optimism",
)
PUBLIC_HAUS_DAO_ID = "0xf5d6b637a9185707f52d40d452956ca49018247a"  # Public Haus DAO ID on Optimism

# Number of members requested per subgraph call. Without an explicit `first`
# argument the subgraph only returns its default first page of results.
MEMBERS_PAGE_SIZE = 1000

# Seconds to wait for the subgraph before giving up, so a stalled endpoint
# cannot hang the import indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# GraphQL query for one page of members. The DAO id and paging window are
# passed as variables rather than interpolated into the query text.
MEMBERS_QUERY = """
query ($daoId: ID!, $first: Int!, $skip: Int!) {
  dao(id: $daoId) {
    id
    name
    members(first: $first, skip: $skip, orderBy: id) {
      id
      memberAddress
      shares
      loot
      createdAt
      delegatingTo
      delegateOfCount
      delegateOf {
        memberAddress
      }
    }
  }
}
"""


class PublicHausImporter:
    """Importer for Public Haus members from DAOhaus v3 subgraph"""

    def __init__(self):
        """Initialize the importer: open the database and register the data source."""
        # Initialize database
        self.db = DatabaseConnector()

        # Register data source
        self.data_source_id = self.register_data_source()

    def register_data_source(self) -> str:
        """
        Register the Public Haus data source in the database

        Returns:
            The value returned by ``DatabaseConnector.upsert_data_source``
            (used later as the data source id when linking contacts).
        """
        return self.db.upsert_data_source(
            name="Public Haus DAO Subgraph",
            source_type="subgraph",
            description="Public Haus DAO members from DAOhaus v3 subgraph on Optimism mainnet"
        )

    def _fetch_members_page(self, skip: int) -> List[Dict[str, Any]]:
        """
        Fetch a single page of members, starting at offset ``skip``

        Args:
            skip: Number of members to skip before this page

        Returns:
            The members on this page (at most ``MEMBERS_PAGE_SIZE`` entries)

        Raises:
            Exception: On a non-200 response, on GraphQL-level errors, or when
                the DAO is missing from the response. ``requests`` may also
                raise its own exceptions (e.g. on timeout).
        """
        response = requests.post(
            SUBGRAPH_URL,
            json={
                "query": MEMBERS_QUERY,
                "variables": {
                    "daoId": PUBLIC_HAUS_DAO_ID.lower(),
                    "first": MEMBERS_PAGE_SIZE,
                    "skip": skip,
                },
            },
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

        # Check for transport-level errors
        if response.status_code != 200:
            logger.error(f"Error fetching members: {response.text}")
            raise Exception(f"Error fetching members: {response.status_code}")

        data = response.json()

        # GraphQL reports query failures in an "errors" array, typically with
        # HTTP 200; surface them instead of mislabelling them "DAO not found".
        if data.get("errors"):
            logger.error(f"Subgraph returned errors: {data['errors']}")
            raise Exception(f"Subgraph returned errors: {data['errors']}")

        # Check if DAO exists
        if not data.get("data") or not data["data"].get("dao"):
            logger.error(f"DAO not found: {PUBLIC_HAUS_DAO_ID}")
            raise Exception(f"DAO not found: {PUBLIC_HAUS_DAO_ID}")

        return data["data"]["dao"].get("members") or []

    def fetch_members_from_subgraph(self) -> List[Dict[str, Any]]:
        """
        Fetch Public Haus members from the DAOhaus v3 subgraph

        Pages through the DAO's members ``MEMBERS_PAGE_SIZE`` at a time until a
        short page signals the end of the list.

        Returns:
            List of member data from the subgraph

        Raises:
            Exception: If any page request fails (see ``_fetch_members_page``)
        """
        members: List[Dict[str, Any]] = []
        skip = 0

        while True:
            page = self._fetch_members_page(skip)
            members.extend(page)

            # A page shorter than the requested size is the last one.
            if len(page) < MEMBERS_PAGE_SIZE:
                break
            skip += MEMBERS_PAGE_SIZE

        logger.info(f"Fetched {len(members)} members from subgraph")

        return members

    def _import_member(self, member: Dict[str, Any]) -> Optional[Tuple[str, bool]]:
        """
        Import one member and report whether a new contact was created

        Args:
            member: Member data from the subgraph

        Returns:
            ``(contact_id, created)`` where ``created`` is True when a new
            contact row was inserted and False when an existing contact was
            reused; None when the member has no address and was skipped.
        """
        # Skip if no address. This is checked before anything else so a
        # record without an address is skipped rather than raising.
        address = member.get("memberAddress")
        if not address:
            logger.warning(f"Member has no address: {member}")
            return None

        # Extract member data
        shares = int(member["shares"])
        loot = int(member["loot"])
        created_at = member["createdAt"]
        delegating_to = member.get("delegatingTo")

        # Check if contact already exists
        query = 'SELECT id, name, "ensName" FROM "Contact" WHERE "ethereumAddress" ILIKE %(address)s'
        existing_contacts = self.db.execute_query(query, {"address": address})

        if existing_contacts:
            # Use existing contact
            contact_id = existing_contacts[0]["id"]
            created = False
            logger.info(f"Found existing contact {contact_id} for address {address}")
        else:
            # Create new contact
            contact_data = {
                "ethereumAddress": address,
                "name": f"Public Haus Member {address[:8]}",  # Default name
            }

            contact_id = self.db.upsert_contact(contact_data)
            created = True
            logger.info(f"Created new contact {contact_id} for address {address}")

        # Add DAO membership (insert, or refresh the figures on conflict)
        self.db.execute_update(
            """
            INSERT INTO "DaoMembership" ("contactId", "daoName", "shares", "loot", "delegatingTo")
            VALUES (%(contact_id)s, %(dao_name)s, %(shares)s, %(loot)s, %(delegating_to)s)
            ON CONFLICT ("contactId", "daoName")
            DO UPDATE SET
                "shares" = %(shares)s,
                "loot" = %(loot)s,
                "delegatingTo" = %(delegating_to)s,
                "updatedAt" = NOW()
            """,
            {
                "contact_id": contact_id,
                "dao_name": "Public Haus",
                "shares": shares,
                "loot": loot,
                "delegating_to": delegating_to
            }
        )

        # Add note about membership
        # NOTE(review): a note is added on every run; confirm that
        # add_note_to_contact de-duplicates, otherwise re-runs accumulate notes.
        note_content = f"Public Haus DAO Member\nShares: {shares}\nLoot: {loot}\nJoined: {created_at}"
        if delegating_to:
            note_content += f"\nDelegating to: {delegating_to}"

        self.db.add_note_to_contact(
            contact_id=contact_id,
            content=note_content,
            source="Public Haus DAO Subgraph"
        )

        # Link to data source
        self.db.link_contact_to_data_source(contact_id, self.data_source_id)

        return contact_id, created

    def process_member(self, member: Dict[str, Any]) -> Optional[str]:
        """
        Process a single member and import into database

        Args:
            member: Member data from the subgraph

        Returns:
            Contact ID if successful, None otherwise
        """
        outcome = self._import_member(member)
        if outcome is None:
            return None
        return outcome[0]

    def run(self) -> int:
        """
        Run the importer

        Creates a scraping job, fetches all members, imports each one (a
        failure on one member is logged and does not stop the others), then
        marks the job completed with added/updated counts.

        Returns:
            Number of members imported (newly added plus updated)

        Raises:
            Exception: Re-raised after the job is marked "failed" when the
                fetch or the job bookkeeping itself fails.
        """
        # Create a scraping job
        job_id = self.db.create_scraping_job("Public Haus DAO Importer", "running")
        logger.info(f"Created scraping job with ID: {job_id}")

        try:
            # Fetch members
            members = self.fetch_members_from_subgraph()

            if not members:
                logger.info("No members found")
                self.db.update_scraping_job(job_id, "completed")
                return 0

            # Process members, tracking new contacts separately from existing ones
            added_count = 0
            updated_count = 0

            for member in members:
                try:
                    outcome = self._import_member(member)
                except Exception as e:
                    logger.exception(f"Error processing member {member.get('memberAddress')}: {e}")
                    continue

                if outcome is None:
                    continue

                _, created = outcome
                if created:
                    added_count += 1
                else:
                    updated_count += 1

            imported_count = added_count + updated_count

            # Complete the scraping job
            self.db.update_scraping_job(
                job_id,
                "completed",
                records_processed=len(members),
                records_added=added_count,
                records_updated=updated_count
            )

            logger.info(f"Imported {imported_count} members out of {len(members)} processed")
            return imported_count

        except Exception as e:
            # Log first so the original error is recorded even if the job
            # update below fails as well.
            logger.exception(f"Error importing members: {e}")
            self.db.update_scraping_job(job_id, "failed", error_message=str(e))
            raise


def main():
    """
    Main function

    Returns:
        Process exit code: 0 on success, 1 if the import raised.
    """
    try:
        importer = PublicHausImporter()
        imported_count = importer.run()
        logger.info(f"Import completed successfully. Imported {imported_count} members.")
        return 0
    except Exception as e:
        logger.exception(f"Error importing members: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())