Source code for datashard.metadata_manager

"""
Metadata management for the Python Iceberg implementation
"""

import threading
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from .data_structures import HistoryEntry, Schema, Snapshot, TableMetadata

if TYPE_CHECKING:
    from .storage_backend import StorageBackend


[docs] class ConcurrentModificationException(Exception): """Exception thrown when concurrent modifications are detected""" pass
class MetadataManager: """Manages table metadata persistence and updates""" def __init__(self, table_path: str, storage: "StorageBackend"): self.table_path = table_path self.storage = storage self.metadata_path = "metadata" # Relative to table_path self.current_version = 0 self._lock = threading.RLock() # For thread safety # Distributed lock for multi-process/multi-host safety # PHASE 2: Added distributed locking (FileLock for local, S3 Lock for cloud) self.lock_provider = self.storage.create_lock(".locks/metadata.lock", timeout=30.0) # Ensure metadata directory exists self.storage.makedirs(self.metadata_path, exist_ok=True) def initialize_table(self, metadata: TableMetadata) -> TableMetadata: """Initialize a new table with the given metadata""" with self._lock: # Set initial values if metadata.current_snapshot_id is None: metadata.current_snapshot_id = -1 # No snapshot initially metadata.last_updated_ms = int(datetime.now().timestamp() * 1000) # Write the metadata file metadata_file = f"v{self.current_version}.metadata.json" metadata_path = f"{self.metadata_path}/{metadata_file}" self._write_metadata_file(metadata_path, metadata) # Create version hint file self._write_version_hint(self.current_version) return metadata def refresh(self) -> Optional[TableMetadata]: """Refresh metadata from the latest version""" with self._lock: version = self._read_version_hint() if version is None: return None metadata_file = f"v{version}.metadata.json" metadata_path = f"{self.metadata_path}/{metadata_file}" if not self.storage.exists(metadata_path): return None return self._read_metadata_file(metadata_path) def commit(self, base_metadata: TableMetadata, new_metadata: TableMetadata) -> TableMetadata: """Commit new metadata with Optimistic Concurrency Control following Iceberg pattern. CRITICAL RACE CONDITION FIX: - Hold lock for entire operation (check + increment + write) - PHASE 2: Use file-based locking for multi-process safety - Write metadata file BEFORE updating version hint (two-phase commit) """ # Acquire thread lock for thread safety within same process # This also protects the non-thread-safe FileLock instance from concurrent access with self._lock: # PHASE 2: Acquire distributed lock for multi-process safety self.lock_provider.acquire() try: # PHASE 1: Validation (inside lock to prevent races) current = self.refresh() # Check UUID consistency if current and current.table_uuid != base_metadata.table_uuid: raise ValueError("Table UUID mismatch - concurrent modification detected") # The key OCC check: verify that the metadata hasn't changed since the caller read it if current and current.current_snapshot_id != base_metadata.current_snapshot_id: raise ConcurrentModificationException( f"Cannot commit metadata: concurrent modification detected. " f"Expected current_snapshot_id: {base_metadata.current_snapshot_id}, " f"but found: {current.current_snapshot_id}" ) if current and current.last_updated_ms != base_metadata.last_updated_ms: raise ConcurrentModificationException( f"Cannot commit metadata: concurrent modification detected. " f"Expected last_updated_ms: {base_metadata.last_updated_ms}, " f"but found: {current.last_updated_ms}" ) # PHASE 2: Prepare new version new_metadata.last_updated_ms = int(datetime.now().timestamp() * 1000) # Read current version from filesystem for multi-process safety # CRITICAL: This must be inside the lock to prevent race on version increment filesystem_version = self._read_version_hint() if filesystem_version is None: filesystem_version = 0 next_version = filesystem_version + 1 # PHASE 3: Write new metadata file (but don't make it visible yet) metadata_file = f"v{next_version}.metadata.json" metadata_path = f"{self.metadata_path}/{metadata_file}" self._write_metadata_file(metadata_path, new_metadata) # PHASE 4: Atomically make new version visible # This is the commit point - after this, the new metadata is visible # If we crash before this, the new metadata file is orphaned but table is consistent self._write_version_hint(next_version) # Success - update in-memory version self.current_version = next_version return new_metadata finally: # PHASE 2: Always release lock self.lock_provider.release() def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]: """Get a specific snapshot by ID""" metadata = self.refresh() if not metadata: return None for snapshot in metadata.snapshots: if snapshot.snapshot_id == snapshot_id: return snapshot return None def get_current_snapshot(self) -> Optional[Snapshot]: """Get the current snapshot""" metadata = self.refresh() if not metadata or metadata.current_snapshot_id is None: return None for snapshot in metadata.snapshots: if snapshot.snapshot_id == metadata.current_snapshot_id: return snapshot return None def get_all_snapshots(self) -> List[Snapshot]: """Get all snapshots""" metadata = self.refresh() if not metadata: return [] return metadata.snapshots def get_snapshot_history(self) -> List[HistoryEntry]: """Get snapshot history""" metadata = self.refresh() if not metadata: return [] return metadata.snapshot_log def _write_metadata_file(self, path: str, metadata: TableMetadata) -> None: """Write metadata to a JSON file""" metadata_dict = self._metadata_to_dict(metadata) self.storage.write_json(path, metadata_dict) def _read_metadata_file(self, path: str) -> TableMetadata: """Read metadata from a JSON file""" metadata_dict = self.storage.read_json(path) return self._dict_to_metadata(metadata_dict) def _metadata_to_dict(self, metadata: TableMetadata) -> Dict[str, Any]: """Convert TableMetadata to dictionary for JSON serialization""" return { "location": metadata.location, "table_uuid": metadata.table_uuid, "format_version": metadata.format_version, "last_sequence_number": metadata.last_sequence_number, "last_updated_ms": metadata.last_updated_ms, "last_column_id": metadata.last_column_id, "schemas": [ { "schema_id": schema.schema_id, "fields": schema.fields, "schema_string": schema.schema_string, } for schema in metadata.schemas ], "current_schema_id": metadata.current_schema_id, "partition_specs": [ { "spec_id": spec.spec_id, "fields": [ { "source_id": field.source_id, "field_id": field.field_id, "name": field.name, "transform": field.transform, } for field in spec.fields ], } for spec in metadata.partition_specs ], "default_spec_id": metadata.default_spec_id, "sort_orders": [ { "order_id": order.order_id, "fields": [ { "source_id": field.source_id, "field_id": field.field_id, "transform": field.transform, "direction": field.direction, } for field in order.fields ], } for order in metadata.sort_orders ], "default_sort_order_id": metadata.default_sort_order_id, "properties": metadata.properties, "current_snapshot_id": metadata.current_snapshot_id, "snapshots": [ { "snapshot_id": snapshot.snapshot_id, "timestamp_ms": snapshot.timestamp_ms, "manifest_list": snapshot.manifest_list, "parent_snapshot_id": snapshot.parent_snapshot_id, "operation": snapshot.operation, "summary": snapshot.summary, "schema_id": snapshot.schema_id, } for snapshot in metadata.snapshots ], "snapshot_log": [ {"timestamp_ms": entry.timestamp_ms, "snapshot_id": entry.snapshot_id} for entry in metadata.snapshot_log ], "metadata_log": metadata.metadata_log, } def _dict_to_metadata(self, metadata_dict: Dict[str, Any]) -> TableMetadata: """Convert dictionary back to TableMetadata""" from .data_structures import ( HistoryEntry as HistoryEntryStruct, PartitionField, PartitionSpec, Snapshot as SnapshotStruct, SortField, SortOrder, ) # Reconstruct schemas schemas = [ Schema( schema_id=schema_dict["schema_id"], fields=schema_dict["fields"], schema_string=schema_dict.get("schema_string", ""), ) for schema_dict in metadata_dict["schemas"] ] # Reconstruct partition specs partition_specs = [] for spec_dict in metadata_dict["partition_specs"]: fields = [ PartitionField( source_id=field_dict["source_id"], field_id=field_dict["field_id"], name=field_dict["name"], transform=field_dict["transform"], ) for field_dict in spec_dict["fields"] ] partition_specs.append(PartitionSpec(spec_id=spec_dict["spec_id"], fields=fields)) # Reconstruct sort orders sort_orders = [] for order_dict in metadata_dict["sort_orders"]: sort_fields = [ SortField( source_id=field_dict["source_id"], field_id=field_dict["field_id"], transform=field_dict["transform"], direction=field_dict["direction"], ) for field_dict in order_dict["fields"] ] sort_orders.append(SortOrder(order_id=order_dict["order_id"], fields=sort_fields)) # Reconstruct snapshots snapshots = [ SnapshotStruct( snapshot_id=snapshot_dict["snapshot_id"], timestamp_ms=snapshot_dict["timestamp_ms"], manifest_list=snapshot_dict["manifest_list"], parent_snapshot_id=snapshot_dict.get("parent_snapshot_id"), operation=snapshot_dict.get("operation"), summary=snapshot_dict.get("summary", {}), schema_id=snapshot_dict.get("schema_id"), ) for snapshot_dict in metadata_dict["snapshots"] ] # Reconstruct history snapshot_log = [ HistoryEntryStruct( timestamp_ms=entry_dict["timestamp_ms"], snapshot_id=entry_dict["snapshot_id"] ) for entry_dict in metadata_dict["snapshot_log"] ] return TableMetadata( location=metadata_dict["location"], table_uuid=metadata_dict["table_uuid"], format_version=metadata_dict["format_version"], last_sequence_number=metadata_dict["last_sequence_number"], last_updated_ms=metadata_dict["last_updated_ms"], last_column_id=metadata_dict["last_column_id"], schemas=schemas, current_schema_id=metadata_dict["current_schema_id"], partition_specs=partition_specs, default_spec_id=metadata_dict["default_spec_id"], sort_orders=sort_orders, default_sort_order_id=metadata_dict["default_sort_order_id"], properties=metadata_dict["properties"], current_snapshot_id=metadata_dict["current_snapshot_id"], snapshots=snapshots, snapshot_log=snapshot_log, metadata_log=metadata_dict["metadata_log"], ) def _write_version_hint(self, version: int) -> None: """Write the current version number to a hint file""" hint_path = "metadata.version-hint.text" content = str(version).encode("utf-8") self.storage.write_file(hint_path, content) def _read_version_hint(self) -> Optional[int]: """Read the current version number from the hint file""" hint_path = "metadata.version-hint.text" if not self.storage.exists(hint_path): return None content = self.storage.read_file(hint_path).decode("utf-8").strip() return int(content) if content.isdigit() else None