Source code for sadrive.helpers.utils

"""
This module provides helper functions and utilities for the CLI application.
It uses service account storage for Google Drive operations and offers:

- Configuration directory handling
- Path construction for accounts and database
- Rclone configuration initialization
- Human-readable byte and time formatting
- List partitioning
- Generator wrapper
- Service account selection by free space
- Directory tree mapping
- File size measurement
- Filename shortening

Constants:
- CONFIG_POINTER: Path to the file storing the config directory pointer
- MAGIC_SIZE: Total capacity threshold for service accounts (in bytes)
- BUFFER: Buffer size threshold (in bytes)
- MAX_THREADS: Maximum number of threads permitted
"""
from pathlib import Path
import time
import os
from math import ceil
from typing import List, TypeVar,Any,Union, Dict,TypedDict
import json

T = TypeVar('T')

DirTree = Dict[str, Union['DirTree', int]]

class PartInfo(TypedDict):
    """
    Describes one chunk produced when a file is split into parts.

    Attributes:
        filename: Name of the file part.
        size: Size of the file part in bytes.
    """

    filename: str
    size: int
class Manifest(TypedDict):
    """
    Records how an original file was split.

    Attributes:
        original_filename: The filename before splitting.
        parts: One PartInfo dictionary per part.
    """

    original_filename: str
    parts: List[PartInfo]
class FF:
    """
    A single file or folder entry in Google Drive.

    Attributes:
        name: The display name of the file or folder.
        file_id: The unique identifier in Drive.
        parent_id: The identifier of the parent folder.
        type: The entry type, e.g. 'folder' or 'file'.
    """

    def __init__(self, name: str, file_id: str, parent_id: str, type: str) -> None:
        """
        Stores the entry's fields on the instance.

        Args:
            name: Display name of the file/folder.
            file_id: Unique identifier in Drive.
            parent_id: Identifier of the parent folder.
            type: 'folder' or 'file'.
        """
        self.name = name
        self.file_id = file_id
        self.parent_id = parent_id
        self.type = type

    def __repr__(self):
        """
        Builds the display string for the entry.

        Returns:
            '[<type right-aligned to 6>] [<file_id>] <name>', or just the
            name when the type is an empty string.
        """
        if self.type == "":
            return self.name
        padded_type = self.type.rjust(6)
        return f"[{padded_type}] [{self.file_id}] {self.name}"
# File in the user's home directory whose text content is the path of the
# configuration directory.
CONFIG_POINTER = Path.home() / ".sadrive_config_dir"
# Capacity assumed for a single service account, in bytes.
MAGIC_SIZE = 15784004812
# Buffer size threshold, in bytes (250 MiB).
BUFFER = 250 * 1024 * 1024
# Maximum number of threads permitted.
MAX_THREADS = 10


def get_config_dir() -> Path:
    """
    Retrieves the configuration directory path from CONFIG_POINTER.

    The pointer must be a regular file containing a non-empty path to an
    existing directory.

    Raises:
        RuntimeError: If the pointer file is missing or empty, or the stored
            path is not an existing directory.

    Returns:
        Path: The directory used for storing application config files.
    """
    not_set_message = (
        "No config directory set. Run `sadrive config set-dir <path>` first."
    )

    # is_file() rather than exists(): a directory at the pointer location
    # cannot be read and is treated the same as a missing pointer.
    if not CONFIG_POINTER.is_file():
        raise RuntimeError(not_set_message)

    stored = CONFIG_POINTER.read_text().strip()
    if not stored:
        # Path("") is Path("."), so an empty pointer would otherwise resolve
        # to the current working directory and pass the is_dir() check.
        raise RuntimeError(not_set_message)

    config_dir = Path(stored)
    if not config_dir.is_dir():
        raise RuntimeError(
            f"Stored config dir {config_dir} does not exist. Run `sadrive config set-dir <path>`"
        )
    return config_dir
def get_accounts_path() -> Path:
    """
    Locates the service-account directory inside the config directory.

    Returns:
        Path: '<config dir>/accounts'.
    """
    config_dir = get_config_dir()
    return config_dir / "accounts"
def get_database_path() -> Path:
    """
    Locates the SQLite database file inside the config directory.

    Returns:
        Path: '<config dir>/database.db'.
    """
    config_dir = get_config_dir()
    return config_dir / "database.db"
def get_parent_id() -> str:
    """
    Loads config.json from the config directory and returns its parent ID.

    Returns:
        str: The value stored under the 'parent_id' key.
    """
    config_file = get_config_dir().joinpath('config.json')
    with open(config_file) as handle:
        settings = json.load(handle)
    parent_id: str = settings['parent_id']
    return parent_id
def get_gclone_exe() -> Path:
    """
    Loads config.json from the config directory and returns the gclone path.

    Returns:
        Path: The value stored under the 'path_to_gclone.exe' key, as a Path.
    """
    config_file = get_config_dir().joinpath('config.json')
    with open(config_file) as handle:
        settings = json.load(handle)
    path: Path = Path(settings['path_to_gclone.exe'])
    return path
def set_rclone_conf() -> None:
    """
    Creates rclone.conf next to the gclone executable.

    The file defines a single '[sadrive]' remote of type 'drive' that uses
    the first service account ('0.json') from the accounts directory and the
    configured parent folder as its root. Any existing rclone.conf at that
    location is overwritten.
    """
    # Resolve every value before opening the target so that a failing config
    # lookup does not leave behind a truncated rclone.conf.
    accounts_dir = get_accounts_path()
    parent_id = get_parent_id()
    conf_path = get_gclone_exe().parent.joinpath('rclone.conf')

    # One "key = value" entry per line, as the INI-style config requires.
    conf_lines = [
        '[sadrive]',
        'type = drive',
        'scope = drive',
        f"service_account_file = {accounts_dir.joinpath('0.json')}",
        f'service_account_file_path = {accounts_dir}',
        f'root_folder_id = {parent_id}',
    ]
    with open(conf_path, 'w') as f:
        f.write('\n'.join(conf_lines))
def humanbytes(size: float) -> str:
    """
    Converts a size in bytes to a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    supported unit (PiB) is reached, then rounded to three decimals. Values
    below 1024 keep a blank unit prefix (e.g. '500   iB' minus one space:
    '500  iB'), so the suffix has the same width as 'KiB', 'MiB', etc.

    Args:
        size: Size in bytes.

    Returns:
        str: Formatted size (e.g. '1.234 MiB'), or an empty string when
        size is falsy (0 or None).
    """
    if not size:
        return ""

    step = 2**10
    prefixes = (" ", "K", "M", "G", "T", "P")
    index = 0
    # ">=" promotes exactly 1024 to the next unit; the index bound keeps
    # sizes beyond the PiB range from running past the prefix table.
    while size >= step and index < len(prefixes) - 1:
        size /= step
        index += 1
    return str(round(size, 3)) + " " + prefixes[index] + "iB"
def humantime(seconds: int) -> str:
    """
    Formats a duration in seconds as 'HHhMMmSSs' or 'MMmSSs'.

    The hour field is shown once the duration reaches one hour and is not
    wrapped at 24, so 90000 seconds renders as '25h00m00s'. Fractional
    seconds are truncated and negative durations are treated as zero.

    Args:
        seconds: Duration in seconds.

    Returns:
        str: Formatted time string.
    """
    total = max(int(seconds), 0)
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours:02d}h{minutes:02d}m{secs:02d}s"
    return f"{minutes:02d}m{secs:02d}s"
def list_into_n_parts(lst: List[T], n: int) -> List[List[T]]:
    """
    Splits a list into n contiguous parts whose lengths differ by at most one.

    Items keep their original order; concatenating the parts reproduces the
    input. When len(lst) is not divisible by n, the leading parts receive
    the extra items. Parts are empty only when the list has fewer than n
    items.

    Args:
        lst: List of items to split.
        n: Number of parts; must be at least 1.

    Raises:
        ValueError: If n is less than 1.

    Returns:
        List[List[T]]: A list containing exactly n sublists.
    """
    if n <= 0:
        raise ValueError("n must be a positive integer")

    base, extra = divmod(len(lst), n)
    parts: List[List[T]] = []
    start = 0
    for index in range(n):
        # The first `extra` parts take one additional item each.
        stop = start + base + (1 if index < extra else 0)
        parts.append(lst[start:stop])
        start = stop
    return parts
class Generator:
    """
    Iterable wrapper that captures a generator's return value.

    Attributes:
        gen: The underlying generator.
        value: Set once iteration finishes; holds whatever the underlying
            generator returned.
    """

    def __init__(self, gen: Any):
        """
        Stores the generator to be wrapped.

        Args:
            gen: A generator object.
        """
        self.gen = gen

    def __iter__(self):  # type: ignore
        """
        Delegates iteration to the underlying generator.

        Yields:
            Any: Values produced by the generator.
        """
        returned = yield from self.gen
        self.value: Any = returned
def get_free_sa(sa_map: List[dict[str, Any]], file_size: int) -> List[int]:
    """
    Picks the service accounts that still have room for a file.

    An account qualifies when MAGIC_SIZE minus its used 'size' is at least
    file_size. Qualifying accounts are ordered by used size, smallest first;
    ties keep their order from sa_map.

    Args:
        sa_map: List of dicts containing '_id' and 'size' keys.
        file_size: Required file size in bytes.

    Returns:
        List[int]: Account IDs that can accommodate the file.
    """
    candidates: List[tuple[int, int]] = []
    for account in sa_map:
        used = int(account['size'])
        if MAGIC_SIZE - used < file_size:
            continue
        candidates.append((used, int(account['_id'])))

    ordered = sorted(candidates, key=lambda pair: pair[0])
    return [sa_id for _, sa_id in ordered]
def get_dir_structure(path: Path) -> DirTree:
    """
    Maps a directory and everything beneath it into a nested dict.

    Each directory becomes a dict keyed by entry name; each non-directory
    entry maps to its size in bytes. The result has a single top-level key,
    the base name of the given path.

    Args:
        path: Root directory path.

    Returns:
        DirTree: Nested dict mapping names to file sizes or further DirTrees.
    """
    def walk(directory: Path) -> DirTree:
        tree: DirTree = {}
        for name in os.listdir(directory):
            child = directory.joinpath(name)
            if not os.path.isdir(child):
                tree[name] = os.path.getsize(child)
                continue
            tree[name] = walk(child)
        return tree

    root_name = os.path.basename(path)
    return {root_name: walk(path)}
def get_file_size(file_path: Path):
    """
    Measures a file by opening it in binary mode and seeking to its end.

    Args:
        file_path: Path to the target file.

    Returns:
        int: File size in bytes.
    """
    with open(file_path, 'rb') as handle:
        # seek() returns the new absolute position, i.e. the end offset.
        end_offset = handle.seek(0, os.SEEK_END)
    return end_offset
def shorten_fn(name: str, max_len: int = 75) -> str:
    """
    Truncates a filename to a maximum length with an ellipsis in the middle.

    The three-character ellipsis counts toward max_len, so the result is
    never longer than max_len. When max_len is below 5 there is no room for
    a meaningful head/ellipsis/tail split and the name is simply cut.

    Args:
        name: Original filename.
        max_len: Maximum allowed length of the result.

    Returns:
        str: Shortened filename if needed, else the original.
    """
    if len(name) <= max_len:
        return name
    if max_len < 5:
        # Clamp so a negative max_len yields "" rather than slicing from
        # the end of the name.
        return name[:max(max_len, 0)]

    ellipsis = "..."
    keep = max_len - len(ellipsis)
    tail_len = keep // 2
    # The head takes the extra character when `keep` is odd.
    head_len = keep - tail_len
    return f"{name[:head_len]}{ellipsis}{name[len(name) - tail_len:]}"