Distributed Computing

Distributed computing utilities for PySpark, HDFS, and cluster operations.

Distributed functions package initialization with enhanced auto-discovery. This package contains helper functions for distributed processing with Spark.

siege_utilities.distributed.check_hdfs_status()[source]

Check if HDFS is accessible
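
A minimal usage sketch (not taken from the package docs): it assumes the function returns a truthy value when HDFS is reachable, mirroring AbstractHDFSOperations.check_hdfs_status() → bool documented below.

    from siege_utilities.distributed import check_hdfs_status

    # Assumption: truthy return means the HDFS namenode responded.
    if check_hdfs_status():
        print("HDFS is accessible")
    else:
        print("HDFS is not reachable; falling back to local file paths")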

siege_utilities.distributed.get_quick_file_signature(file_path)[source]

Get a quick signature for the file at file_path.

Part of the Siege Utilities File Operations module. Auto-discovered and available at package level.

Returns:

A quick signature string for the file at file_path.

Example

>>> import siege_utilities
>>> signature = siege_utilities.get_quick_file_signature("/path/to/file.csv")
>>> print(signature)

Note

This function is auto-discovered and available without imports across all siege_utilities modules.

siege_utilities.distributed.log_error(msg)[source]

Log a message using the error level.

Part of the Siege Utilities Logging module. Auto-discovered and available at package level.

Returns:

None. The message is emitted through the configured error logger.

Example

>>> import siege_utilities
>>> siege_utilities.log_error("Failed to sync directory to HDFS")

Note

This function is auto-discovered and available without imports across all siege_utilities modules.

siege_utilities.distributed.log_info(msg)[source]

Log a message using the info level.

Part of the Siege Utilities Logging module. Auto-discovered and available at package level.

Returns:

None. The message is emitted through the configured info logger.

Example

>>> import siege_utilities
>>> siege_utilities.log_info("Starting distributed environment setup")

Note

This function is auto-discovered and available without imports across all siege_utilities modules.

siege_utilities.distributed.create_cluster_config(data_path: str, **kwargs) → HDFSConfig[source]

Create config optimized for cluster deployment

siege_utilities.distributed.create_geocoding_config(data_path: str, **kwargs) → HDFSConfig[source]

Create config optimized for geocoding workloads

siege_utilities.distributed.create_hdfs_config(**kwargs) → HDFSConfig[source]

Factory function to create HDFS configuration

siege_utilities.distributed.create_local_config(data_path: str, **kwargs) → HDFSConfig[source]

Create config optimized for local development
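
A hedged sketch of the configuration factories above. Only the signatures come from this page; the idea that **kwargs overrides HDFSConfig fields, and the specific values shown, are assumptions drawn from the HDFSConfig dataclass documented under HDFS Configuration below.

    from siege_utilities.distributed import create_local_config, create_cluster_config

    # Local development: point the config at a data directory on this machine.
    local_cfg = create_local_config("/data/projects/geocoding")

    # Cluster deployment: assumes extra keyword arguments map onto HDFSConfig
    # fields (executor_memory, num_executors, ...).
    cluster_cfg = create_cluster_config(
        "/data/projects/geocoding",
        executor_memory="8g",
        num_executors=16,
    )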

siege_utilities.distributed.dataclass(cls=None, /, *, init=True, repr=True, eq=True, order=False, unsafe_hash=False, frozen=False, match_args=True, kw_only=False, slots=False, weakref_slot=False)[source]

Add dunder methods based on the fields defined in the class.

Examines PEP 526 __annotations__ to determine fields.

If init is true, an __init__() method is added to the class. If repr is true, a __repr__() method is added. If order is true, rich comparison dunder methods are added. If unsafe_hash is true, a __hash__() method is added. If frozen is true, fields may not be assigned to after instance creation. If match_args is true, the __match_args__ tuple is added. If kw_only is true, then by default all fields are keyword-only. If slots is true, a new class with a __slots__ attribute is returned.
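
This entry appears to be the standard library dataclasses.dataclass decorator picked up by auto-discovery. A brief illustration of the documented parameters (the JobSpec class is hypothetical):

    from siege_utilities.distributed import dataclass

    @dataclass(frozen=True)
    class JobSpec:
        name: str
        partitions: int = 8

    spec = JobSpec("nightly-sync")   # __init__ generated from the annotated fields
    print(spec)                      # __repr__ generated as well
    # spec.partitions = 4            # frozen=True makes assignment raise FrozenInstanceError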

siege_utilities.distributed.create_hdfs_operations(config)[source]

Factory function to create HDFS operations instance

siege_utilities.distributed.setup_distributed_environment(config, data_path: str | None = None, dependency_paths: List[str] | None = None)[source]

Convenience function to set up distributed environment
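
A sketch that wires the factories and the convenience function together. The function names and signatures come from the entries above; passing data_path as a keyword to create_hdfs_config and the meaning of dependency_paths are assumptions, and the return value of setup_distributed_environment is not documented here.

    from siege_utilities.distributed import (
        create_hdfs_config,
        create_hdfs_operations,
        setup_distributed_environment,
    )

    # Assumption: **kwargs forwards to HDFSConfig fields such as data_path.
    config = create_hdfs_config(data_path="/data/projects/geocoding")

    # Object-oriented route: build an operations instance from the config.
    hdfs_ops = create_hdfs_operations(config)

    # Convenience route: one call that prepares the distributed environment.
    # dependency_paths is assumed to list local artifacts to ship to the cluster.
    setup_distributed_environment(
        config,
        data_path="/data/projects/geocoding",
        dependency_paths=["./dist/my_job-0.1.0-py3-none-any.whl"],
    )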

HDFS Configuration

Abstract HDFS configuration: configurable settings for HDFS operations, with no hard-coded project dependencies

class siege_utilities.distributed.hdfs_config.HDFSConfig(data_path: str | None = None, hdfs_base_directory: str = '/data/', cache_directory: str | None = None, app_name: str = 'SparkDistributedProcessing', spark_log_level: str = 'WARN', enable_sedona: bool = True, num_executors: int | None = None, executor_cores: int | None = None, executor_memory: str = '2g', network_timeout: str = '800s', heartbeat_interval: str = '60s', hdfs_timeout: int = 10, hdfs_copy_timeout: int = 300, force_sync: bool = False, log_info_func: Callable[[str], None] | None = None, log_error_func: Callable[[str], None] | None = None, hash_func: Callable[[str], str] | None = None, quick_signature_func: Callable[[str], str] | None = None)[source]

Bases: object

Configuration class for HDFS operations

data_path: str | None = None[source]
hdfs_base_directory: str = '/data/'[source]
cache_directory: str | None = None[source]
app_name: str = 'SparkDistributedProcessing'[source]
spark_log_level: str = 'WARN'[source]
enable_sedona: bool = True[source]
num_executors: int | None = None[source]
executor_cores: int | None = None[source]
executor_memory: str = '2g'[source]
network_timeout: str = '800s'[source]
heartbeat_interval: str = '60s'[source]
hdfs_timeout: int = 10[source]
hdfs_copy_timeout: int = 300[source]
force_sync: bool = False[source]
log_info_func: Callable[[str], None] | None = None[source]
log_error_func: Callable[[str], None] | None = None[source]
hash_func: Callable[[str], str] | None = None[source]
quick_signature_func: Callable[[str], str] | None = None[source]
get_cache_path(filename: str) → Path[source]

Get path for cache files

get_optimal_partitions() → int[source]

Calculate optimal partitions for I/O-heavy workloads

log_info(message: str)[source]

Log info message using configured function

log_error(message: str)[source]

Log error message using configured function
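
A sketch of constructing HDFSConfig directly and calling its helper methods. Field names and defaults come from the attribute list above; the concrete values are illustrative only.

    from siege_utilities.distributed.hdfs_config import HDFSConfig

    config = HDFSConfig(
        data_path="/data/projects/geocoding",
        app_name="GeocodingPipeline",
        executor_memory="4g",
        enable_sedona=True,
    )

    cache_file = config.get_cache_path("file_hashes.json")   # returns a pathlib.Path
    partitions = config.get_optimal_partitions()             # returns an int
    config.log_info(f"Using {partitions} partitions, cache at {cache_file}")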

siege_utilities.distributed.hdfs_config.create_hdfs_config(**kwargs) → HDFSConfig[source]

Factory function to create HDFS configuration

siege_utilities.distributed.hdfs_config.create_local_config(data_path: str, **kwargs) → HDFSConfig[source]

Create config optimized for local development

siege_utilities.distributed.hdfs_config.create_cluster_config(data_path: str, **kwargs) → HDFSConfig[source]

Create config optimized for cluster deployment

siege_utilities.distributed.hdfs_config.create_geocoding_config(data_path: str, **kwargs) → HDFSConfig[source]

Create config optimized for geocoding workloads

HDFS Operations

Abstract HDFS operations: fully configurable and reusable, with zero hard-coded project dependencies

class siege_utilities.distributed.hdfs_operations.AbstractHDFSOperations(config)[source]

Bases: object

Abstract HDFS Operations class that can be configured for any project

check_hdfs_status() → bool[source]

Check if HDFS is accessible

create_spark_session()[source]

Create Spark session using configuration

sync_directory_to_hdfs(local_path: str | None = None, hdfs_subdir: str = 'inputs') → Tuple[str | None, Dict | None][source]

Sync local directory/file to HDFS with proper verification

setup_distributed_environment(data_path: str | None = None, dependency_paths: List[str] | None = None)[source]

Main setup function with proper verification
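
A sketch of the class-based workflow. Method names and signatures come from the entries above; the tuple unpacking follows the Tuple[str | None, Dict | None] annotation on sync_directory_to_hdfs, and everything else is illustrative.

    from siege_utilities.distributed.hdfs_config import create_cluster_config
    from siege_utilities.distributed.hdfs_operations import AbstractHDFSOperations

    config = create_cluster_config("/data/projects/geocoding")
    ops = AbstractHDFSOperations(config)

    if ops.check_hdfs_status():
        spark = ops.create_spark_session()
        # Returns (hdfs_path, verification_info) per the annotated signature;
        # either element may be None if the sync is skipped or fails.
        hdfs_path, info = ops.sync_directory_to_hdfs(
            local_path="/data/projects/geocoding",
            hdfs_subdir="inputs",
        )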

siege_utilities.distributed.hdfs_operations.setup_distributed_environment(config, data_path: str | None = None, dependency_paths: List[str] | None = None)[source]

Convenience function to set up distributed environment

siege_utilities.distributed.hdfs_operations.create_hdfs_operations(config)[source]

Factory function to create HDFS operations instance

Spark Utils