stones/scripts/nft_holders/nft_holders_scraper.py

216 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
NFT Holders Scraper

This script fetches all holders of a specific NFT contract and stores their
Ethereum addresses in the database. It also attempts to resolve ENS names
for the addresses.

Usage:
    python nft_holders_scraper.py --contract 0x1234... --name "CryptoPunks"
"""
import os
import sys
import argparse
import json
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
import requests
from web3 import Web3
from dotenv import load_dotenv
# Make the sibling ``utils`` package importable when this file is run as a
# script: add the parent directory to sys.path before the local imports.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.db_connector import DatabaseConnector
from utils.ens_resolver import ENSResolver
from utils.logger import setup_logger
# Load environment variables from a .env file, if one is present
load_dotenv()
# Module-level logger shared by the whole script
logger = setup_logger("nft_holders_scraper")
class NFTHoldersScraper:
    """Scraper for NFT holders.

    Fetches every owner of a given NFT contract via the Alchemy NFT API,
    resolves ENS names where possible, and persists the results through
    ``DatabaseConnector``.
    """

    # Seconds to wait for HTTP responses. ``requests`` has no default
    # timeout, so without this a hung connection stalls the whole scrape.
    REQUEST_TIMEOUT = 30

    def __init__(self, contract_address: str, collection_name: str):
        """
        Initialize the NFT holders scraper.

        Args:
            contract_address: Ethereum address of the NFT contract
            collection_name: Name of the NFT collection
        """
        self.etherscan_api_key = os.getenv("ETHERSCAN_API_KEY")
        self.alchemy_api_key = os.getenv("ALCHEMY_API_KEY")

        # Validate API keys BEFORE they are interpolated into provider URLs;
        # validating afterwards would build a URL containing the literal
        # string "None" when a key is missing.
        if not self.etherscan_api_key:
            logger.error("ETHERSCAN_API_KEY not found in environment variables")
            sys.exit(1)
        if not self.alchemy_api_key:
            logger.error("ALCHEMY_API_KEY not found in environment variables")
            sys.exit(1)

        self.contract_address = Web3.to_checksum_address(contract_address)
        self.collection_name = collection_name
        self.web3 = Web3(Web3.HTTPProvider(
            f"https://eth-mainnet.g.alchemy.com/v2/{self.alchemy_api_key}"
        ))
        self.db = DatabaseConnector()
        self.ens_resolver = ENSResolver(self.web3)

        # Register data source
        self.register_data_source()

    def register_data_source(self) -> None:
        """Register this NFT collection as a data source in the database."""
        self.db.upsert_data_source(
            name=f"NFT:{self.collection_name}",
            source_type="NFT",
            description=f"Holders of {self.collection_name} NFT ({self.contract_address})"
        )

    def get_token_holders(self) -> List[Dict[str, Any]]:
        """
        Fetch all token holders for the NFT contract.

        Returns:
            List of dictionaries with keys ``address``, ``token_id`` and
            ``collection_name``; an empty list on any failure.
        """
        logger.info(f"Fetching token holders for {self.collection_name} ({self.contract_address})")

        # Record the scrape as a job so progress and failures are auditable.
        job_id = self.db.create_scraping_job(
            source_name=f"NFT:{self.collection_name}",
            status="running"
        )

        holders: List[Dict[str, Any]] = []
        try:
            # Ask Alchemy for every owner of the collection, then query the
            # token IDs each owner holds.
            # NOTE(review): getOwnersForCollection may paginate via a
            # "pageKey" field on large collections — confirm against the
            # Alchemy NFT API docs; this code reads only the first page.
            url = f"https://eth-mainnet.g.alchemy.com/nft/v2/{self.alchemy_api_key}/getOwnersForCollection"
            params = {"contractAddress": self.contract_address}
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                logger.error(f"Failed to fetch owners: {response.text}")
                self.db.update_scraping_job(job_id, "failed", error_message=f"API error: {response.text}")
                return []
            data = response.json()

            # Process owners
            records_processed = 0
            for owner_data in data.get("ownerAddresses", []):
                records_processed += 1
                # Get token IDs owned by this address
                owner_tokens_url = f"https://eth-mainnet.g.alchemy.com/nft/v2/{self.alchemy_api_key}/getNFTs"
                owner_tokens_params = {
                    "owner": owner_data,
                    "contractAddresses": [self.contract_address],
                    "withMetadata": "true"
                }
                owner_response = requests.get(
                    owner_tokens_url,
                    params=owner_tokens_params,
                    timeout=self.REQUEST_TIMEOUT
                )
                if owner_response.status_code != 200:
                    # Best-effort: skip this owner but keep scraping others.
                    logger.warning(f"Failed to fetch tokens for owner {owner_data}: {owner_response.text}")
                    continue
                owner_tokens = owner_response.json()
                for token in owner_tokens.get("ownedNfts", []):
                    token_id = token.get("id", {}).get("tokenId")
                    if token_id:
                        holders.append({
                            "address": owner_data,
                            "token_id": token_id,
                            "collection_name": self.collection_name
                        })

            # Update job with success
            self.db.update_scraping_job(
                job_id=job_id,
                status="completed",
                records_processed=records_processed,
                records_added=len(holders)
            )
        except Exception as e:
            # Boundary handler: record the failure on the job and return an
            # empty result rather than crashing the whole script.
            logger.error(f"Error fetching token holders: {str(e)}")
            self.db.update_scraping_job(job_id, "failed", error_message=str(e))
            return []

        logger.info(f"Found {len(holders)} token holders")
        return holders

    def process_holders(self, holders: List[Dict[str, Any]]) -> None:
        """
        Process the list of holders and store in database.

        Args:
            holders: List of dictionaries containing token ID and holder address
        """
        logger.info(f"Processing {len(holders)} holders")
        for holder in holders:
            address = Web3.to_checksum_address(holder["address"])
            token_id = holder["token_id"]

            # Try to resolve ENS name
            ens_name = self.ens_resolver.get_ens_name(address)

            # Placeholder: Warpcast address lookup is not implemented yet.
            warpcast_address = None

            # Store in database
            contact_id = self.db.upsert_contact(
                ethereum_address=address,
                ens_name=ens_name,
                warpcast_address=warpcast_address
            )

            # Add NFT holding
            self.db.add_nft_holding(
                contact_id=contact_id,
                contract_address=self.contract_address,
                token_id=token_id,
                collection_name=self.collection_name
            )

            # If we have an ENS name, pull additional profile information
            if ens_name:
                self.ens_resolver.update_contact_from_ens(contact_id, ens_name)

            # Rate limiting to avoid API throttling
            time.sleep(0.1)

    def run(self) -> None:
        """Run the scraper to fetch and process NFT holders."""
        holders = self.get_token_holders()
        if holders:
            self.process_holders(holders)
            logger.info("NFT holders scraping completed successfully")
        else:
            logger.warning("No holders found or error occurred")
def main() -> None:
    """Command-line entry point: parse arguments and run the scraper."""
    arg_parser = argparse.ArgumentParser(description="Scrape NFT holders")
    arg_parser.add_argument("--contract", required=True, help="NFT contract address")
    arg_parser.add_argument("--name", required=True, help="NFT collection name")
    parsed = arg_parser.parse_args()
    # Build the scraper from the CLI arguments and kick off the full run.
    NFTHoldersScraper(parsed.contract, parsed.name).run()


if __name__ == "__main__":
    main()