"""
Abstract HDFS Configuration
Configurable settings for HDFS operations - no hard-coded project dependencies
"""
import os
import pathlib
from dataclasses import dataclass
from typing import Optional, Callable


@dataclass
class HDFSConfig:
    """Configuration class for HDFS operations"""

    # Data locations
    data_path: Optional[str] = None
    hdfs_base_directory: str = '/data/'
    cache_directory: Optional[str] = None

    # Spark session settings
    app_name: str = 'SparkDistributedProcessing'
    spark_log_level: str = 'WARN'
    enable_sedona: bool = True

    # Executor resources (unset values are resolved from the environment
    # in __post_init__)
    num_executors: Optional[int] = None
    executor_cores: Optional[int] = None
    executor_memory: str = '2g'

    # Timeouts and synchronization behaviour
    network_timeout: str = '800s'
    heartbeat_interval: str = '60s'
    hdfs_copy_timeout: int = 300
    force_sync: bool = False

    # Pluggable callbacks for logging and content hashing
    log_info_func: Optional[Callable[[str], None]] = None
    log_error_func: Optional[Callable[[str], None]] = None
    hash_func: Optional[Callable[[str], str]] = None
    quick_signature_func: Optional[Callable[[str], str]] = None

    def __post_init__(self):
        """Set up defaults after initialization"""
        if self.cache_directory is None:
            self.cache_directory = str(pathlib.Path.home() / '.spark_hdfs_cache')
        if self.num_executors is None:
            self.num_executors = int(os.environ.get('SPARK_EXECUTOR_INSTANCES', '4'))
        if self.executor_cores is None:
            self.executor_cores = int(os.environ.get('SPARK_EXECUTOR_CORES', '2'))
        if 'SPARK_EXECUTOR_MEMORY' in os.environ:
            self.executor_memory = os.environ['SPARK_EXECUTOR_MEMORY']
        if self.log_info_func is None:
            self.log_info_func = lambda msg: print(f'INFO: {msg}')
        if self.log_error_func is None:
            self.log_error_func = lambda msg: print(f'ERROR: {msg}')

    def get_cache_path(self, filename: str) -> pathlib.Path:
        """Get path for cache files"""
        return pathlib.Path(self.cache_directory) / filename

    def get_optimal_partitions(self) -> int:
        """Calculate optimal partitions for I/O-heavy workloads"""
        return self.num_executors * self.executor_cores * 3
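
    # Worked example for the heuristic above (three partitions per executor
    # core, assuming the 4-executor / 2-core environment defaults): 4 * 2 * 3 = 24.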

    def log_info(self, message: str):
        """Log info message using configured function"""
        if self.log_info_func:
            self.log_info_func(message)

    def log_error(self, message: str):
        """Log error message using configured function"""
        if self.log_error_func:
            self.log_error_func(message)
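

# Example (illustrative only; the logger name is hypothetical): the callback
# fields let callers route log output through their own logger instead of the
# print-based defaults installed by __post_init__.
#
#   import logging
#   logger = logging.getLogger('my_pipeline')
#   config = HDFSConfig(
#       data_path='/tmp/input.parquet',
#       log_info_func=logger.info,
#       log_error_func=logger.error,
#   )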


def create_hdfs_config(**kwargs) -> HDFSConfig:
    """Factory function to create HDFS configuration"""
    return HDFSConfig(**kwargs)


def create_local_config(data_path: str, **kwargs) -> HDFSConfig:
    """Create config optimized for local development"""
    defaults = {
        'data_path': data_path,
        'num_executors': 2,
        'executor_cores': 1,
        'executor_memory': '1g',
        'enable_sedona': False,
        'spark_log_level': 'WARN',
    }
    defaults.update(kwargs)
    return HDFSConfig(**defaults)
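
# Example (illustrative; the data path is hypothetical): a laptop-sized config
# for quick iteration.
#
#   cfg = create_local_config('/tmp/sample_data')
#   cfg.log_info(f'cache dir: {cfg.cache_directory}')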


def create_cluster_config(data_path: str, **kwargs) -> HDFSConfig:
    """Create config optimized for cluster deployment"""
    defaults = {
        'data_path': data_path,
        'num_executors': 8,
        'executor_cores': 4,
        'executor_memory': '4g',
        'enable_sedona': True,
        'spark_log_level': 'WARN',
    }
    defaults.update(kwargs)
    return HDFSConfig(**defaults)
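
# Because defaults.update(kwargs) runs last, caller-supplied keywords override
# the factory defaults. Example (illustrative values):
#
#   cfg = create_cluster_config('/data/raw', num_executors=16, executor_memory='8g')
#   assert cfg.num_executors == 16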


def create_geocoding_config(data_path: str, **kwargs) -> HDFSConfig:
    """Create config optimized for geocoding workloads"""
    defaults = {
        'data_path': data_path,
        'app_name': 'GeocodingPipeline',
        'num_executors': 4,
        'executor_cores': 2,
        'executor_memory': '2g',
        'enable_sedona': True,
        'network_timeout': '1200s',
        'spark_log_level': 'WARN',
    }
    defaults.update(kwargs)
    return HDFSConfig(**defaults)
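

if __name__ == '__main__':
    # Minimal smoke check (illustrative; the data path is hypothetical). This
    # only exercises the dataclass helpers and does not start a Spark session.
    demo = create_geocoding_config('/tmp/addresses.parquet')
    demo.log_info(f'app name: {demo.app_name}')
    demo.log_info(f'optimal partitions: {demo.get_optimal_partitions()}')
    demo.log_info(f'cache file: {demo.get_cache_path("signatures.json")}')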