"""
Abstract HDFS Configuration
Configurable settings for HDFS operations - no hard-coded project dependencies
"""
import os
import pathlib
from dataclasses import dataclass
from typing import Optional, Callable
from siege_utilities.core.logging import get_logger, log_info, log_warning, log_error, log_debug
[docs]
@dataclass
class HDFSConfig:
"""Configuration class for HDFS and Spark operations.
Supports local, standalone cluster, and YARN deployments.
Attributes:
master: Spark master URL ('local[*]', 'yarn', 'spark://host:7077')
deploy_mode: Deployment mode for YARN ('client' or 'cluster')
yarn_queue: YARN queue name for resource allocation
driver_memory: Memory allocated to driver process
"""
data_path: Optional[str] = None
hdfs_base_directory: str = '/data/'
cache_directory: Optional[str] = None
app_name: str = 'SparkDistributedProcessing'
spark_log_level: str = 'WARN'
# Spark master configuration
master: str = 'local[*]'
deploy_mode: str = 'client' # 'client' or 'cluster'
yarn_queue: Optional[str] = None
# Sedona configuration
enable_sedona: bool = True
sedona_global_index_type: str = 'rtree' # 'rtree' or 'quadtree'
sedona_join_broadcast_threshold: int = 10 * 1024 * 1024 # 10MB
# Resource allocation
num_executors: Optional[int] = None
executor_cores: Optional[int] = None
executor_memory: str = '2g'
driver_memory: str = '2g'
driver_cores: int = 1
# Timeouts and networking
network_timeout: str = '800s'
heartbeat_interval: str = '60s'
hdfs_timeout: int = 10
hdfs_copy_timeout: int = 300
# Sync behavior
force_sync: bool = False
# Logging and hashing functions
log_info_func: Optional[Callable[[str], None]] = None
log_error_func: Optional[Callable[[str], None]] = None
hash_func: Optional[Callable[[str], str]] = None
quick_signature_func: Optional[Callable[[str], str]] = None
[docs]
def __post_init__(self):
"""Set up defaults after initialization"""
if self.cache_directory is None:
self.cache_directory = str(pathlib.Path.home() /
'.spark_hdfs_cache')
if self.num_executors is None:
self.num_executors = int(os.environ.get(
'SPARK_EXECUTOR_INSTANCES', '4'))
if self.executor_cores is None:
self.executor_cores = int(os.environ.get('SPARK_EXECUTOR_CORES',
'2'))
if 'SPARK_EXECUTOR_MEMORY' in os.environ:
self.executor_memory = os.environ['SPARK_EXECUTOR_MEMORY']
if 'SPARK_DRIVER_MEMORY' in os.environ:
self.driver_memory = os.environ['SPARK_DRIVER_MEMORY']
if 'SPARK_MASTER' in os.environ:
self.master = os.environ['SPARK_MASTER']
if 'YARN_QUEUE' in os.environ:
self.yarn_queue = os.environ['YARN_QUEUE']
if self.log_info_func is None:
self.log_info_func = lambda msg: log_info(msg)
if self.log_error_func is None:
self.log_error_func = lambda msg: log_error(msg)
[docs]
def get_cache_path(self, filename: str) ->pathlib.Path:
"""Get path for cache files"""
return pathlib.Path(self.cache_directory) / filename
[docs]
def get_optimal_partitions(self) ->int:
"""Calculate optimal partitions for I/O heavy workloads"""
return self.num_executors * self.executor_cores * 3
@property
def is_yarn(self) -> bool:
"""Check if this config targets a YARN cluster."""
return self.master.lower() == 'yarn'
@property
def is_local(self) -> bool:
"""Check if this config targets local mode."""
return self.master.lower().startswith('local')
[docs]
def log_info(self, message: str):
"""Log info message using configured function"""
if self.log_info_func:
self.log_info_func(message)
[docs]
def log_error(self, message: str):
"""Log error message using configured function"""
if self.log_error_func:
self.log_error_func(message)
[docs]
def create_hdfs_config(**kwargs) ->HDFSConfig:
"""Factory function to create HDFS configuration"""
return HDFSConfig(**kwargs)
[docs]
def create_local_config(data_path: str, **kwargs) ->HDFSConfig:
"""Create config optimized for local development"""
defaults = {'data_path': data_path, 'num_executors': 2,
'executor_cores': 1, 'executor_memory': '1g', 'enable_sedona':
False, 'spark_log_level': 'WARN'}
defaults.update(kwargs)
return HDFSConfig(**defaults)
[docs]
def create_cluster_config(data_path: str, **kwargs) ->HDFSConfig:
"""Create config optimized for cluster deployment"""
defaults = {'data_path': data_path, 'num_executors': 8,
'executor_cores': 4, 'executor_memory': '4g', 'enable_sedona': True,
'spark_log_level': 'WARN'}
defaults.update(kwargs)
return HDFSConfig(**defaults)
[docs]
def create_geocoding_config(data_path: str, **kwargs) ->HDFSConfig:
"""Create config optimized for geocoding workloads"""
defaults = {'data_path': data_path, 'app_name': 'GeocodingPipeline',
'num_executors': 4, 'executor_cores': 2, 'executor_memory': '2g',
'enable_sedona': True, 'network_timeout': '1200s',
'spark_log_level': 'WARN'}
defaults.update(kwargs)
return HDFSConfig(**defaults)
def create_yarn_config(data_path: str, **kwargs) -> HDFSConfig:
"""Create config optimized for YARN cluster deployment.
This configuration is designed for production YARN clusters with:
- YARN as the resource manager
- Client deploy mode (driver runs locally)
- Higher resource allocation for executors
- Sedona enabled for spatial operations
Args:
data_path: Path to data (local or HDFS)
**kwargs: Override any default settings
Returns:
HDFSConfig configured for YARN deployment
Example:
>>> config = create_yarn_config('/data/census', yarn_queue='analytics')
>>> spark, path, _ = setup_distributed_environment(config)
"""
defaults = {
'data_path': data_path,
'app_name': 'SiegeAnalyticsYARN',
'master': 'yarn',
'deploy_mode': 'client',
'num_executors': 10,
'executor_cores': 4,
'executor_memory': '8g',
'driver_memory': '4g',
'driver_cores': 2,
'enable_sedona': True,
'sedona_global_index_type': 'rtree',
'network_timeout': '600s',
'heartbeat_interval': '60s',
'spark_log_level': 'WARN',
}
defaults.update(kwargs)
return HDFSConfig(**defaults)
def create_census_analysis_config(data_path: str, **kwargs) -> HDFSConfig:
"""Create config optimized for Census longitudinal analysis.
Designed for processing large Census datasets with:
- Sedona for spatial operations (boundary crosswalks)
- Optimized partitioning for Census tract-level data
- Memory settings for time-series joins
Args:
data_path: Path to Census data
**kwargs: Override any default settings
Returns:
HDFSConfig optimized for Census analysis
Example:
>>> config = create_census_analysis_config('/data/census/acs')
>>> spark, path, _ = setup_distributed_environment(config)
"""
defaults = {
'data_path': data_path,
'app_name': 'CensusLongitudinalAnalysis',
'num_executors': 6,
'executor_cores': 4,
'executor_memory': '4g',
'driver_memory': '4g',
'enable_sedona': True,
'sedona_global_index_type': 'rtree',
'sedona_join_broadcast_threshold': 50 * 1024 * 1024, # 50MB for Census shapes
'network_timeout': '900s',
'spark_log_level': 'WARN',
}
defaults.update(kwargs)
return HDFSConfig(**defaults)