#!/usr/bin/env python3 """ Import Public Haus Members by Querying Token Holders This script fetches members of Public Haus DAO by querying holders of the shares token, imports them into the database, and links them to the Public Haus DAO. Usage: python import_public_haus_token_holders.py """ import os import sys import logging import json import time from typing import Dict, Any, List, Optional from web3 import Web3 from dotenv import load_dotenv # Add parent directory to path to import utils sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.db_connector import DatabaseConnector from utils.logger import setup_logger # Load environment variables load_dotenv() # Setup logging logger = setup_logger("public_haus_token_importer") # Constants PUBLIC_HAUS_DAO_ID = "0xf5d6b637a9185707f52d40d452956ca49018247a" # Public Haus DAO ID on Optimism SHARES_TOKEN_ADDRESS = "0x4950c436F69c8b4F68ed814A70a5E1D94495c4a7" # From the image, sharesToken address # ERC20 ABI (minimal for balance checking) ERC20_ABI = [ { "constant": True, "inputs": [{"name": "_owner", "type": "address"}], "name": "balanceOf", "outputs": [{"name": "balance", "type": "uint256"}], "type": "function" }, { "constant": True, "inputs": [], "name": "totalSupply", "outputs": [{"name": "", "type": "uint256"}], "type": "function" }, { "constant": True, "inputs": [], "name": "name", "outputs": [{"name": "", "type": "string"}], "type": "function" }, { "constant": True, "inputs": [], "name": "symbol", "outputs": [{"name": "", "type": "string"}], "type": "function" }, { "constant": True, "inputs": [], "name": "decimals", "outputs": [{"name": "", "type": "uint8"}], "type": "function" }, { "constant": True, "inputs": [{"name": "_owner", "type": "address"}, {"name": "_spender", "type": "address"}], "name": "allowance", "outputs": [{"name": "", "type": "uint256"}], "type": "function" }, { "constant": False, "inputs": [{"name": "_to", "type": "address"}, {"name": "_value", "type": "uint256"}], "name": "transfer", "outputs": [{"name": "", "type": "bool"}], "type": "function" } ] # Transfer event ABI for querying token transfers TRANSFER_EVENT_ABI = [ { "anonymous": False, "inputs": [ {"indexed": True, "name": "from", "type": "address"}, {"indexed": True, "name": "to", "type": "address"}, {"indexed": False, "name": "value", "type": "uint256"} ], "name": "Transfer", "type": "event" } ] class PublicHausTokenImporter: """Importer for Public Haus members by querying token holders""" def __init__(self): """Initialize the importer""" # Initialize database self.db = DatabaseConnector() # Initialize Web3 optimism_rpc_url = os.getenv("OPTIMISM_RPC_URL") if not optimism_rpc_url: raise ValueError("OPTIMISM_RPC_URL environment variable not set") self.web3 = Web3(Web3.HTTPProvider(optimism_rpc_url)) if not self.web3.is_connected(): raise ValueError("Failed to connect to Optimism RPC") logger.info(f"Connected to Optimism: {self.web3.is_connected()}") # Initialize token contract self.shares_token = self.web3.eth.contract( address=self.web3.to_checksum_address(SHARES_TOKEN_ADDRESS), abi=ERC20_ABI ) # Register data source self.data_source_id = self.register_data_source() # Initialize scraping job self.job_id = self.db.create_scraping_job( source_name="Public Haus Token Holders", status="running" ) logger.info(f"Created scraping job with ID: {self.job_id}") def register_data_source(self) -> str: """Register the Public Haus data source in the database""" return self.db.upsert_data_source( name="Public Haus DAO Token Holders", source_type="blockchain", description="Public Haus DAO members identified by token holdings" ) def get_token_info(self) -> Dict[str, Any]: """ Get information about the shares token Returns: Token information """ try: name = self.shares_token.functions.name().call() symbol = self.shares_token.functions.symbol().call() decimals = self.shares_token.functions.decimals().call() total_supply = self.shares_token.functions.totalSupply().call() token_info = { "address": SHARES_TOKEN_ADDRESS, "name": name, "symbol": symbol, "decimals": decimals, "totalSupply": total_supply } logger.info(f"Token info: {name} ({symbol})") logger.info(f"Total supply: {total_supply / (10 ** decimals):.2f} {symbol}") return token_info except Exception as e: logger.error(f"Error getting token info: {e}") raise def fetch_token_holders(self) -> List[Dict[str, Any]]: """ Fetch holders of the shares token by analyzing transfer events Returns: List of token holders with their balances """ try: # Get token info token_info = self.get_token_info() decimals = token_info["decimals"] # Get the latest block number latest_block = self.web3.eth.block_number # Calculate the starting block (approximately 6 months ago) # Optimism has ~1 block every 2 seconds blocks_per_day = 43200 # 86400 seconds / 2 seconds per block start_block = max(0, latest_block - (blocks_per_day * 180)) # 180 days logger.info(f"Fetching Transfer events from block {start_block} to {latest_block}") # Create a contract instance with the Transfer event ABI token_events = self.web3.eth.contract( address=self.web3.to_checksum_address(SHARES_TOKEN_ADDRESS), abi=TRANSFER_EVENT_ABI ) # Get Transfer events transfer_filter = token_events.events.Transfer.create_filter( fromBlock=start_block, toBlock=latest_block ) transfers = transfer_filter.get_all_entries() logger.info(f"Found {len(transfers)} Transfer events") # Track addresses that have received tokens holder_addresses = set() for transfer in transfers: from_address = transfer.args.get('from') to_address = transfer.args.get('to') # Skip zero address (minting/burning) if to_address != '0x0000000000000000000000000000000000000000': holder_addresses.add(to_address) # Check current balances for all potential holders holders = [] for address in holder_addresses: try: balance = self.shares_token.functions.balanceOf(address).call() # Only include addresses with non-zero balance if balance > 0: holders.append({ "address": address, "balance": balance, "balanceFormatted": balance / (10 ** decimals), "dao": "Public Haus" }) except Exception as e: logger.error(f"Error checking balance for {address}: {e}") # Sort holders by balance (descending) holders.sort(key=lambda x: x["balance"], reverse=True) logger.info(f"Found {len(holders)} token holders with non-zero balance") return holders except Exception as e: logger.error(f"Error fetching token holders: {e}") raise def process_holder(self, holder: Dict[str, Any]) -> Optional[str]: """ Process a token holder and import into the database Args: holder: Token holder information Returns: Contact ID if successful, None otherwise """ try: # Extract holder information address = holder["address"] balance = holder["balance"] balance_formatted = holder["balanceFormatted"] dao_name = holder["dao"] # Check if contact exists query = 'SELECT id, name, "ensName" FROM "Contact" WHERE "ethereumAddress" ILIKE %(address)s' existing_contacts = self.db.execute_query(query, {"address": address}) contact_id = None if existing_contacts: # Use existing contact contact_id = existing_contacts[0]["id"] logger.info(f"Found existing contact {contact_id} for address {address}") else: # Create new contact contact_data = { "ethereumAddress": address, "name": f"Public Haus Member {address[:8]}", # Default name } contact_id = self.db.upsert_contact(contact_data) logger.info(f"Created new contact {contact_id} for address {address}") # Add DAO membership self.db.execute_update( """ INSERT INTO "DaoMembership" ("contactId", "daoName", "shares", "loot", "delegatingTo") VALUES (%(contact_id)s, %(dao_name)s, %(shares)s, %(loot)s, %(delegating_to)s) ON CONFLICT ("contactId", "daoName") DO UPDATE SET "shares" = %(shares)s, "loot" = %(loot)s, "updatedAt" = NOW() """, { "contact_id": contact_id, "dao_name": dao_name, "shares": balance, # Use token balance as shares "loot": 0, # We don't have loot information "delegating_to": None } ) # Add note about membership note_content = f"Public Haus DAO Member\nShares Token Balance: {balance_formatted}" self.db.add_note_to_contact( contact_id=contact_id, content=note_content, source="Public Haus DAO Token Holders" ) # Add tag for the DAO self.db.add_tag_to_contact( contact_id=contact_id, tag_name=dao_name ) # Link to data source self.db.link_contact_to_data_source(contact_id, self.data_source_id) return contact_id except Exception as e: logger.error(f"Error processing holder {holder.get('address')}: {e}") return None def run(self) -> int: """ Run the importer Returns: Number of holders imported """ try: # Fetch token holders holders = self.fetch_token_holders() if not holders: logger.info("No token holders found") self.db.update_scraping_job(self.job_id, "completed") return 0 # Process holders imported_count = 0 existing_count = 0 for holder in holders: try: contact_id = self.process_holder(holder) if contact_id: imported_count += 1 except Exception as e: logger.exception(f"Error processing holder {holder.get('address')}: {e}") # Add a small delay to avoid overwhelming the database time.sleep(0.1) # Complete the scraping job self.db.update_scraping_job( self.job_id, "completed", records_processed=len(holders), records_added=imported_count, records_updated=existing_count ) logger.info(f"Imported {imported_count} holders out of {len(holders)} processed") return imported_count except Exception as e: # Update the scraping job with error self.db.update_scraping_job(self.job_id, "failed", error_message=str(e)) logger.exception(f"Error importing holders: {e}") raise def main(): """Main function""" try: importer = PublicHausTokenImporter() imported_count = importer.run() logger.info(f"Import completed successfully. Imported {imported_count} token holders.") return 0 except Exception as e: logger.exception(f"Error importing token holders: {e}") return 1 if __name__ == "__main__": sys.exit(main())