# Source code for siege_utilities.geo.providers.census_geocoder

"""
Census Bureau Geocoder integration with historical vintage support.

Wraps the Census Bureau's geocoding API for US address geocoding. Returns
FIPS codes directly (state, county, tract, block) so most addresses don't
need a spatial join. Supports historical vintages for temporal accuracy.

Usage:
    from siege_utilities.geo import (
        CensusVintage, select_vintage_for_cycle,
        geocode_single, geocode_batch, CensusGeocodeResult,
    )

    # Single address
    result = geocode_single("1600 Pennsylvania Ave", "Washington", "DC", "20500")

    # Batch (up to 10,000)
    results = geocode_batch(addresses, vintage=CensusVintage.CENSUS_2020)

    # Auto-select vintage for an FEC cycle year
    vintage = select_vintage_for_cycle(2018)  # -> CensusVintage.CENSUS_2020
"""

import csv
import io
import logging
import os
import tempfile
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


def log_warning(message: str) -> None:
    """Emit *message* on this module's logger at WARNING level."""
    logger.log(logging.WARNING, message)


def log_info(message: str) -> None:
    """Emit *message* on this module's logger at INFO level."""
    logger.log(logging.INFO, message)


def log_debug(message: str) -> None:
    """Emit *message* on this module's logger at DEBUG level."""
    logger.log(logging.DEBUG, message)


def log_error(message: str) -> None:
    """Emit *message* on this module's logger at ERROR level."""
    logger.log(logging.ERROR, message)


class CensusGeocodeError(RuntimeError):
    """Raised when the Census geocoder API call fails unexpectedly.

    Distinct from "no match" results (which return a CensusGeocodeResult with
    matched=False). This exception indicates an API / network / parse failure
    where the geocoder could not even attempt to match.

    Use `__cause__` to inspect the underlying exception.
    """
class CensusVintage(str, Enum):
    """Census geocoder benchmark/vintage pairs.

    Each member value has the form ``"<benchmark>_<vintage>"``. The
    ``benchmark`` and ``vintage`` properties split that string into the two
    arguments handed to ``censusgeocode.CensusGeocode``. The vintage
    determines which geographic boundaries are used for matching.
    """

    # NOTE(review): the public Census Geocoder documentation lists benchmark
    # names such as "Public_AR_Current" and vintage names such as
    # "Current_Current" -- confirm that the split halves of the values below
    # are accepted by the API before relying on non-default vintages.
    CENSUS_2010 = "Census2010_Census2010"
    CENSUS_2020 = "Census2020_Census2020"
    CURRENT = "Public_Current"
    ACS_2022 = "Public_ACS2022"
    ACS_2023 = "Public_ACS2023"

    @property
    def benchmark(self) -> str:
        """Extract the benchmark portion (before the first underscore)."""
        return self.value.split("_")[0]

    @property
    def vintage(self) -> str:
        """Extract the vintage portion (after the first underscore)."""
        parts = self.value.split("_", 1)
        return parts[1] if len(parts) > 1 else parts[0]


# Year boundaries for vintage selection, checked in order (newest first):
# the first entry whose threshold is <= the requested year wins.
_VINTAGE_THRESHOLDS = [
    (2020, CensusVintage.CURRENT),
    (2012, CensusVintage.CENSUS_2020),
    (1980, CensusVintage.CENSUS_2010),
]


def select_vintage_for_cycle(year: int) -> CensusVintage:
    """Select the appropriate Census vintage for an FEC cycle year.

    Args:
        year: The FEC election cycle year.

    Returns:
        The vintage of the first ``_VINTAGE_THRESHOLDS`` entry whose threshold
        is less than or equal to ``year``; ``CensusVintage.CENSUS_2010`` when
        ``year`` is older than every threshold.

    Examples:
        >>> select_vintage_for_cycle(2024)
        <CensusVintage.CURRENT: 'Public_Current'>
        >>> select_vintage_for_cycle(2016)
        <CensusVintage.CENSUS_2020: 'Census2020_Census2020'>
        >>> select_vintage_for_cycle(2008)
        <CensusVintage.CENSUS_2010: 'Census2010_Census2010'>
    """
    for threshold, vintage in _VINTAGE_THRESHOLDS:
        if year >= threshold:
            return vintage
    return CensusVintage.CENSUS_2010


@dataclass
class CensusGeocodeResult:
    """Result from Census Bureau geocoding.

    Attributes:
        matched: Whether the address was matched.
        input_address: The original input address string.
        matched_address: The standardized matched address (if matched).
        lat: Latitude (if matched).
        lon: Longitude (if matched).
        state_fips: 2-digit state FIPS code.
        county_fips: 3-digit county FIPS code.
        tract: 6-digit tract code.
        block: 4-digit block code.
        match_type: "Exact", "Non_Exact", or "No_Match".
        side: Street side ("L" or "R", from TIGER).
        tiger_line_id: TIGER/Line feature ID.
        input_id: The caller-supplied id (populated for batch results).
    """

    matched: bool = False
    input_address: str = ""
    matched_address: str = ""
    lat: Optional[float] = None
    lon: Optional[float] = None
    state_fips: str = ""
    county_fips: str = ""
    tract: str = ""
    block: str = ""
    match_type: str = "No_Match"
    side: str = ""
    tiger_line_id: str = ""
    # Populated by id if provided in batch input
    input_id: str = ""

    @property
    def state_geoid(self) -> str:
        """2-digit state GEOID, or "" when the state FIPS is unknown."""
        return self.state_fips if self.state_fips else ""

    @property
    def county_geoid(self) -> str:
        """5-digit county GEOID (state + county FIPS), or "" if incomplete."""
        if self.state_fips and self.county_fips:
            return f"{self.state_fips}{self.county_fips}"
        return ""

    @property
    def tract_geoid(self) -> str:
        """11-digit tract GEOID (state + county + tract), or "" if incomplete."""
        if self.state_fips and self.county_fips and self.tract:
            return f"{self.state_fips}{self.county_fips}{self.tract}"
        return ""

    @property
    def block_geoid(self) -> str:
        """15-digit block GEOID (state + county + tract + block), or ""."""
        if self.state_fips and self.county_fips and self.tract and self.block:
            return f"{self.state_fips}{self.county_fips}{self.tract}{self.block}"
        return ""

    @property
    def block_group_geoid(self) -> str:
        """12-digit block group GEOID (block GEOID truncated to 12 chars)."""
        bg = self.block_geoid
        return bg[:12] if len(bg) >= 12 else ""


def _get_geocoder(vintage: CensusVintage = CensusVintage.CURRENT):
    """Get a censusgeocode.CensusGeocode instance with the specified vintage.

    The third-party package is imported lazily so this module can be imported
    without it.

    Raises:
        ImportError: If ``censusgeocode`` is not installed.
    """
    try:
        import censusgeocode
    except ImportError as e:
        raise ImportError(
            "censusgeocode is required for Census geocoding. "
            "Install it with: pip install 'siege-utilities[geo]' or pip install censusgeocode"
        ) from e
    return censusgeocode.CensusGeocode(
        benchmark=vintage.benchmark,
        vintage=vintage.vintage,
    )


def _select_block_records(geographies: dict) -> list:
    """Pick the most detailed non-empty geography layer from a match.

    Well-known block layer names are tried first, then any layer whose name
    mentions "block", then any layer whose name mentions "tract". Empty layers
    are skipped so a later, populated layer can still be used.
    """
    for key in ("Census Blocks", "2020 Census Blocks"):
        records = geographies.get(key)
        if records:
            return records
    for needle in ("block", "tract"):
        for key, records in geographies.items():
            if needle in key.lower() and records:
                return records
    return []


def _parse_single_result(result: dict) -> CensusGeocodeResult:
    """Parse a single address-match dict into a CensusGeocodeResult.

    Args:
        result: One entry of the geocoder's address matches.

    Returns:
        An unmatched default ``CensusGeocodeResult`` when ``result`` is empty;
        otherwise a matched result. ``match_type`` is always reported as
        "Exact" because this code reads no match-type field from the entry.
    """
    if not result:
        return CensusGeocodeResult()

    coords = result.get("coordinates") or {}
    geographies = result.get("geographies") or {}
    # The Census response carries side/tigerLineId under "tigerLine";
    # "addressComponents" is kept as a fallback for payloads that nest them
    # there instead.
    tiger_line = result.get("tigerLine") or {}
    components = result.get("addressComponents") or {}

    # Census Blocks is the most detailed geography; extract FIPS from there.
    blocks = _select_block_records(geographies)
    geo_data = blocks[0] if blocks else {}

    return CensusGeocodeResult(
        matched=True,
        matched_address=result.get("matchedAddress", ""),
        lat=coords.get("y"),
        lon=coords.get("x"),
        state_fips=geo_data.get("STATE", ""),
        county_fips=geo_data.get("COUNTY", ""),
        tract=geo_data.get("TRACT", ""),
        block=geo_data.get("BLOCK", ""),
        match_type="Exact",
        side=tiger_line.get("side") or components.get("side", ""),
        tiger_line_id=tiger_line.get("tigerLineId") or components.get("tigerLineId", ""),
    )


def _extract_address_matches(result) -> list:
    """Return the list of address matches from a geocoder response.

    Accepts either the raw API payload (``{"result": {"addressMatches": [...]}}``)
    or an already-unwrapped list-like of matches.
    """
    if isinstance(result, dict):
        return result.get("result", {}).get("addressMatches", [])
    return list(result)


def geocode_single(
    street: str,
    city: str,
    state: str,
    zipcode: str,
    vintage: CensusVintage = CensusVintage.CURRENT,
) -> CensusGeocodeResult:
    """Geocode a single address via the Census Bureau API.

    Args:
        street: Street address (e.g., "1600 Pennsylvania Ave NW").
        city: City name.
        state: State abbreviation or name.
        zipcode: ZIP code.
        vintage: Census vintage for boundary matching.

    Returns:
        CensusGeocodeResult with lat/lon and FIPS codes if matched; an
        unmatched result carrying only ``input_address`` otherwise.

    Raises:
        ImportError: If ``censusgeocode`` is not installed.
        CensusGeocodeError: If the API call or response parsing fails.
    """
    cg = _get_geocoder(vintage)
    input_addr = f"{street}, {city}, {state} {zipcode}"

    try:
        result = cg.onelineaddress(input_addr, returntype="geographies")
        matches = _extract_address_matches(result)

        if not matches:
            log_info(f"No match for: {input_addr}")
            return CensusGeocodeResult(input_address=input_addr)

        # Only the first (best) candidate is used.
        parsed = _parse_single_result(matches[0])
        parsed.input_address = input_addr
        log_debug(f"Matched: {input_addr} -> {parsed.matched_address}")
        return parsed
    except Exception as e:
        raise CensusGeocodeError(
            f"Census geocode failed for {input_addr}: {e}"
        ) from e


def _write_batch_csv(addresses: list[dict]) -> str:
    """Write the batch input CSV to a temp file and return its path.

    The caller owns the file and must delete it. ``newline=""`` lets the csv
    module control line endings so they are not translated a second time.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, newline="", encoding="utf-8"
    ) as handle:
        csv.writer(handle).writerows(
            [
                addr.get("id", ""),
                addr.get("street", ""),
                addr.get("city", ""),
                addr.get("state", ""),
                addr.get("zipcode", ""),
            ]
            for addr in addresses
        )
        return handle.name


def _first_present(row: dict, *keys: str, default=""):
    """Return the first non-None value found in ``row`` under ``keys``."""
    for key in keys:
        value = row.get(key)
        if value is not None:
            return value
    return default


def _batch_row_matched(row: dict) -> bool:
    """Tell whether a batch result row represents a match.

    Accepts a boolean flag or the string "Match" (any case, surrounding
    whitespace ignored) under either the ``is_match`` or ``match`` key.
    """
    flag = _first_present(row, "is_match", "match")
    if isinstance(flag, bool):
        return flag
    return str(flag).strip().lower() == "match"


def _parse_batch_row(row: dict) -> CensusGeocodeResult:
    """Convert one batch result row into a CensusGeocodeResult."""
    row_id = str(row.get("id", ""))
    input_address = row.get("address", "")
    if not _batch_row_matched(row):
        return CensusGeocodeResult(input_id=row_id, input_address=input_address)
    return CensusGeocodeResult(
        matched=True,
        input_id=row_id,
        input_address=input_address,
        matched_address=_first_present(row, "match_address", "parsed"),
        lat=_safe_float(row.get("lat")),
        lon=_safe_float(row.get("lon")),
        state_fips=row.get("statefp", ""),
        county_fips=row.get("countyfp", ""),
        tract=row.get("tract", ""),
        block=row.get("block", ""),
        match_type=_first_present(row, "match_type", "matchtype"),
        tiger_line_id=row.get("tigerlineid", ""),
        side=row.get("side", ""),
    )


def geocode_batch(
    addresses: list[dict],
    vintage: CensusVintage = CensusVintage.CURRENT,
) -> list[CensusGeocodeResult]:
    """Geocode a batch of addresses via the Census Bureau batch API.

    The Census batch API accepts up to 10,000 addresses per request.
    Each address dict should have keys: id, street, city, state, zipcode.
    Results are joined back to the inputs by ``str(id)``, so ids should be
    unique within a batch; inputs sharing an id (or omitting it) all receive
    the same result.

    Args:
        addresses: List of dicts with keys {id, street, city, state, zipcode}.
        vintage: Census vintage for boundary matching.

    Returns:
        List of CensusGeocodeResult, one per input address (order preserved).
        Inputs with no corresponding result row come back unmatched.

    Raises:
        ValueError: If batch exceeds 10,000 addresses.
        ImportError: If ``censusgeocode`` is not installed.
        CensusGeocodeError: If the batch API call fails.
    """
    if len(addresses) > 10_000:
        raise ValueError(
            f"Census batch API accepts max 10,000 addresses, got {len(addresses)}. "
            "Use geocode_batch_chunked() for larger sets."
        )

    if not addresses:
        return []

    cg = _get_geocoder(vintage)

    # censusgeocode is given a file path, so the CSV goes to a temp file.
    csv_path = _write_batch_csv(addresses)
    try:
        result = cg.addressbatch(csv_path, returntype="geographies")
    except Exception as e:
        raise CensusGeocodeError(
            f"Census batch geocode failed for {len(addresses)} addresses: {e}"
        ) from e
    finally:
        # The temp file was created with delete=False; always clean it up.
        try:
            os.unlink(csv_path)
        except OSError as cleanup_error:
            log_debug(f"Could not remove temp file {csv_path}: {cleanup_error}")

    # Parse batch results, keyed by the id echoed back in each row.
    results_by_id = {}
    if isinstance(result, list):
        for row in result:
            parsed = _parse_batch_row(row)
            results_by_id[parsed.input_id] = parsed

    # Return in input order, filling gaps with unmatched placeholders.
    output = []
    for addr in addresses:
        addr_id = str(addr.get("id", ""))
        found = results_by_id.get(addr_id)
        if found is None:
            found = CensusGeocodeResult(
                input_id=addr_id,
                input_address=f"{addr.get('street', '')}, {addr.get('city', '')}, {addr.get('state', '')} {addr.get('zipcode', '')}",
            )
        output.append(found)

    matched_count = sum(1 for r in output if r.matched)
    log_info(f"Census batch: {matched_count}/{len(output)} matched")
    return output


def geocode_batch_chunked(
    addresses: list[dict],
    vintage: CensusVintage = CensusVintage.CURRENT,
    chunk_size: int = 10_000,
) -> list[CensusGeocodeResult]:
    """Geocode addresses in chunks of up to chunk_size (default 10,000).

    Convenience wrapper around geocode_batch() for larger datasets.

    Args:
        addresses: List of dicts with keys {id, street, city, state, zipcode}.
        vintage: Census vintage for boundary matching.
        chunk_size: Max addresses per API call; values above 10,000 are
            clamped to 10,000.

    Returns:
        List of CensusGeocodeResult, one per input address.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    # A non-positive step would either crash range() or silently skip every
    # address, so reject it up front.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    chunk_size = min(chunk_size, 10_000)
    total = len(addresses)
    total_chunks = (total + chunk_size - 1) // chunk_size
    results = []

    for chunk_num, start in enumerate(range(0, total, chunk_size), start=1):
        chunk = addresses[start:start + chunk_size]
        log_info(f"Census batch chunk {chunk_num}/{total_chunks} ({len(chunk)} addresses)")
        results.extend(geocode_batch(chunk, vintage=vintage))

    return results


def _safe_float(val) -> Optional[float]:
    """Safely convert a value to float, returning None on failure."""
    if val is None:
        return None
    try:
        return float(val)
    except (ValueError, TypeError):
        return None