Source code for siege_utilities.distributed.hdfs_config

"""
Abstract HDFS Configuration
Configurable settings for HDFS operations - no hard-coded project dependencies
"""
import os
import pathlib
from dataclasses import dataclass
from typing import Optional, Callable


@dataclass
class HDFSConfig:
    """Configuration class for HDFS operations"""

    data_path: Optional[str] = None
    hdfs_base_directory: str = '/data/'
    cache_directory: Optional[str] = None
    app_name: str = 'SparkDistributedProcessing'
    spark_log_level: str = 'WARN'
    enable_sedona: bool = True
    num_executors: Optional[int] = None
    executor_cores: Optional[int] = None
    executor_memory: str = '2g'
    network_timeout: str = '800s'
    heartbeat_interval: str = '60s'
    hdfs_timeout: int = 10
    hdfs_copy_timeout: int = 300
    force_sync: bool = False
    log_info_func: Optional[Callable[[str], None]] = None
    log_error_func: Optional[Callable[[str], None]] = None
    hash_func: Optional[Callable[[str], str]] = None
    quick_signature_func: Optional[Callable[[str], str]] = None

    def __post_init__(self):
        """Set up defaults after initialization"""
        if self.cache_directory is None:
            self.cache_directory = str(pathlib.Path.home() / '.spark_hdfs_cache')
        if self.num_executors is None:
            self.num_executors = int(os.environ.get('SPARK_EXECUTOR_INSTANCES', '4'))
        if self.executor_cores is None:
            self.executor_cores = int(os.environ.get('SPARK_EXECUTOR_CORES', '2'))
        if 'SPARK_EXECUTOR_MEMORY' in os.environ:
            self.executor_memory = os.environ['SPARK_EXECUTOR_MEMORY']
        if self.log_info_func is None:
            self.log_info_func = lambda msg: print(f'INFO: {msg}')
        if self.log_error_func is None:
            self.log_error_func = lambda msg: print(f'ERROR: {msg}')
    def get_cache_path(self, filename: str) -> pathlib.Path:
        """Get path for cache files"""
        return pathlib.Path(self.cache_directory) / filename

    def get_optimal_partitions(self) -> int:
        """Calculate optimal partitions for I/O-heavy workloads"""
        return self.num_executors * self.executor_cores * 3

    def log_info(self, message: str):
        """Log info message using configured function"""
        if self.log_info_func:
            self.log_info_func(message)

    def log_error(self, message: str):
        """Log error message using configured function"""
        if self.log_error_func:
            self.log_error_func(message)
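For reference, a minimal usage sketch (not part of the module source) showing how __post_init__ fills in defaults and how the helper methods are used; the data path, cache file name, and executor counts are illustrative placeholders:

# Illustrative usage sketch, assuming the module is importable as shown in the page title.
from siege_utilities.distributed.hdfs_config import HDFSConfig

config = HDFSConfig(data_path='/tmp/input.parquet', num_executors=4, executor_cores=2)

# cache_directory defaults to ~/.spark_hdfs_cache when not supplied
print(config.get_cache_path('file_signatures.json'))

# 4 executors * 2 cores * 3 = 24 partitions for I/O-heavy jobs
print(config.get_optimal_partitions())

# Logging falls back to print() unless custom callables are injected
config.log_info('configuration ready')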
def create_hdfs_config(**kwargs) -> HDFSConfig:
    """Factory function to create an HDFS configuration"""
    return HDFSConfig(**kwargs)


def create_local_config(data_path: str, **kwargs) -> HDFSConfig:
    """Create config optimized for local development"""
    defaults = {'data_path': data_path, 'num_executors': 2, 'executor_cores': 1,
                'executor_memory': '1g', 'enable_sedona': False, 'spark_log_level': 'WARN'}
    defaults.update(kwargs)
    return HDFSConfig(**defaults)


def create_cluster_config(data_path: str, **kwargs) -> HDFSConfig:
    """Create config optimized for cluster deployment"""
    defaults = {'data_path': data_path, 'num_executors': 8, 'executor_cores': 4,
                'executor_memory': '4g', 'enable_sedona': True, 'spark_log_level': 'WARN'}
    defaults.update(kwargs)
    return HDFSConfig(**defaults)


def create_geocoding_config(data_path: str, **kwargs) -> HDFSConfig:
    """Create config optimized for geocoding workloads"""
    defaults = {'data_path': data_path, 'app_name': 'GeocodingPipeline', 'num_executors': 4,
                'executor_cores': 2, 'executor_memory': '2g', 'enable_sedona': True,
                'network_timeout': '1200s', 'spark_log_level': 'WARN'}
    defaults.update(kwargs)
    return HDFSConfig(**defaults)
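A short sketch of how the factory helpers might be called; the example paths and the '8g' override are illustrative, not defaults from the module:

# Illustrative sketch of the factory helpers; any keyword argument overrides the preset defaults.
from siege_utilities.distributed.hdfs_config import (
    create_local_config, create_cluster_config, create_geocoding_config)

# Small local run: 2 executors, 1 core each, Sedona disabled
local_cfg = create_local_config('/tmp/sample_data')

# Cluster run, overriding one default via kwargs
cluster_cfg = create_cluster_config('/data/warehouse', executor_memory='8g')

# Geocoding preset raises the network timeout to 1200s
geo_cfg = create_geocoding_config('/data/addresses')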