iceberg Module API
This section documents the main API functions for creating and loading tables.
Main Functions
Main module for the Python Iceberg implementation. Provides the primary API for working with Iceberg tables.
- class datashard.iceberg.Table(table_path: str, create_if_not_exists: bool = True)
Bases: object
Main table interface with transaction support.
- new_transaction() → Transaction
Create a new transaction.
- time_travel(snapshot_id: int | None = None, timestamp: int | None = None) → Any
Time travel to a specific snapshot or timestamp.
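A note on the timestamp argument: its unit is not stated here. The sketch below assumes epoch milliseconds (matching the *_ms parameters elsewhere in this API) and assumes "as of" semantics like those in the Apache Iceberg spec, where the snapshot chosen is the latest one committed at or before the given time. The helpers to_epoch_ms and snapshot_as_of and the history list are hypothetical illustrations, not part of datashard.

```python
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer epoch milliseconds (assumed unit)."""
    return int(dt.timestamp() * 1000)


def snapshot_as_of(history: List[Tuple[int, int]], timestamp_ms: int) -> Optional[int]:
    """Return the id of the latest snapshot committed at or before timestamp_ms.

    history is a hypothetical list of (snapshot_id, committed_at_ms) pairs.
    """
    candidates = [(committed_at, sid) for sid, committed_at in history if committed_at <= timestamp_ms]
    if not candidates:
        return None  # the table had no snapshot yet at that time
    return max(candidates)[1]  # largest commit time wins


# Hypothetical snapshot history: (snapshot_id, commit time in epoch ms).
history = [(101, 1_700_000_000_000), (102, 1_700_000_060_000), (103, 1_700_000_120_000)]
ts = to_epoch_ms(datetime(2023, 11, 14, 22, 14, 30, tzinfo=timezone.utc))
print(snapshot_as_of(history, ts))
# With datashard, assuming millisecond timestamps: table.time_travel(timestamp=ts)
```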
- append_data(files: List[DataFile]) → bool
Append data files to the table (convenience method).
- append_pandas(df: Any, schema: Schema | None = None) → bool
Append a pandas DataFrame to the table (convenience method).
- append_records(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) → bool
Append records to the table by writing them to new data files (convenience method).
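append_records takes one partition_values dict per call. Assuming that dict applies to every record in the call (the docstring does not say so), a batch that spans several partitions can be grouped by partition column first and appended one group at a time. The group_by_partition helper and the region column are hypothetical; the append_records call is left as a comment because it needs a live table.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence, Tuple


def group_by_partition(
    records: List[Dict[str, Any]], keys: Sequence[str]
) -> Dict[Tuple[Any, ...], List[Dict[str, Any]]]:
    """Group records by the values of the partition columns named in keys."""
    groups: Dict[Tuple[Any, ...], List[Dict[str, Any]]] = defaultdict(list)
    for record in records:
        groups[tuple(record[k] for k in keys)].append(record)
    return dict(groups)  # insertion order follows first appearance


records = [
    {"id": 1, "region": "eu", "amount": 10},
    {"id": 2, "region": "us", "amount": 20},
    {"id": 3, "region": "eu", "amount": 30},
]
keys = ["region"]
for values, rows in group_by_partition(records, keys).items():
    partition_values = dict(zip(keys, values))
    # Assumed usage with datashard: table.append_records(rows, partition_values=partition_values)
    print(partition_values, [r["id"] for r in rows])
```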
- garbage_collect(grace_period_ms: int = 3600000) → Dict[str, int]
Delete orphaned files not referenced by any snapshot.
- Parameters:
grace_period_ms – Only delete orphaned files older than this age (default 1 hour). This protects against deleting files currently being written.
- Returns:
Dict with counts of deleted files by type.
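The grace period is what lets garbage_collect run safely next to writers: an unreferenced file is deleted only once it is older than grace_period_ms, so a file that is still being written is left alone. A minimal sketch of that selection rule, assuming file ages are judged by last-modified time in epoch milliseconds; select_orphans and its inputs are hypothetical and this is not datashard's implementation:

```python
import time
from typing import Dict, List, Optional, Set


def select_orphans(
    files_on_disk: Dict[str, int],
    referenced: Set[str],
    grace_period_ms: int = 3_600_000,
    now_ms: Optional[int] = None,
) -> List[str]:
    """Return unreferenced paths whose last-modified time is older than the grace period.

    files_on_disk maps path -> last-modified time in epoch milliseconds (assumed input).
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff = now_ms - grace_period_ms
    return sorted(
        path
        for path, mtime_ms in files_on_disk.items()
        if path not in referenced and mtime_ms < cutoff
    )


now = 10_000_000
files = {
    "data/a.parquet": now - 7_200_000,  # referenced by a snapshot: kept
    "data/b.parquet": now - 7_200_000,  # unreferenced for 2 h: selected
    "data/c.parquet": now - 60_000,     # unreferenced but 1 min old: kept (may be in flight)
}
print(select_orphans(files, referenced={"data/a.parquet"}, now_ms=now))
```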
- row_count() → int
Get the total row count from Parquet metadata without scanning the data.
This is a fast O(manifest_files) operation that reads only metadata, not the actual parquet data files. Use this for count-only queries instead of len(table.scan()).
- Returns:
Total number of rows across all data files in current snapshot.
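The reason row_count is cheap is that every data file already carries its record count in metadata (compare the record_count field of DataFile), so the total is a sum over the files in the current snapshot rather than a read of the data. A sketch of that idea with a hypothetical FileEntry stand-in:

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class FileEntry:
    """Hypothetical stand-in for a data file entry listed in a manifest."""
    file_path: str
    record_count: int


def total_rows(entries: Iterable[FileEntry]) -> int:
    # Sum per-file counts taken from metadata; no data file is opened.
    return sum(e.record_count for e in entries)


snapshot_files = [FileEntry("data/a.parquet", 1_000), FileEntry("data/b.parquet", 250)]
print(total_rows(snapshot_files))
```

When only a count is needed, table.row_count() avoids the full materialization that len(table.scan()) performs.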
- scan(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) → List[Dict[str, Any]]
Scan all data in the table and return it as a list of records.
This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.
- Parameters:
columns – Optional list of column names to read. If None, reads all columns.
filter – Optional filter dict for predicate pushdown. Examples:
{"status": "failed"}  # status == "failed"
{"age": (">", 18)}  # age > 18
{"id": ("in", [1, 2, 3])}  # id in [1, 2, 3]
{"ts": ("between", (t1, t2))}  # t1 <= ts <= t2
parallel – Enable parallel reading. False: sequential reading (default). True: use all CPU cores. int: use the specified number of threads.
- Returns:
List of dictionaries, each representing a record.
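The filter semantics can be pinned down with a plain-Python evaluator applied to one record at a time. This is an illustrative sketch, not the pushdown code datashard runs; it assumes that several keys in one dict are combined with AND and that the supported comparison operators are "==", "!=", ">", ">=", "<" and "<=" (only ">" appears in the docstring).

```python
import operator
from typing import Any, Dict

# Comparison operators other than ">" are assumed, not documented.
_COMPARISONS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def matches(record: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
    """Return True if record satisfies every condition (AND semantics assumed)."""
    for column, condition in conditions.items():
        value = record.get(column)
        if isinstance(condition, tuple):
            op, arg = condition
            if op == "in":
                ok = value in arg
            elif op == "between":
                low, high = arg
                ok = value is not None and low <= value <= high  # inclusive bounds
            elif op in _COMPARISONS:
                ok = value is not None and _COMPARISONS[op](value, arg)
            else:
                raise ValueError(f"unsupported operator: {op!r}")
        else:
            ok = value == condition  # a bare value means equality
        if not ok:
            return False
    return True


rows = [
    {"id": 1, "status": "failed", "age": 30, "ts": 5},
    {"id": 2, "status": "ok", "age": 12, "ts": 15},
    {"id": 3, "status": "failed", "age": 17, "ts": 10},
]
print([r["id"] for r in rows if matches(r, {"status": "failed", "age": (">", 18)})])
```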
- to_pandas(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) → Any
Read all data from the table as a pandas DataFrame.
This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.
- Parameters:
columns – Optional list of column names to read. If None, reads all columns.
filter – Optional filter dict for predicate pushdown. Examples:
{"status": "failed"}  # status == "failed"
{"age": (">", 18)}  # age > 18
{"id": ("in", [1, 2, 3])}  # id in [1, 2, 3]
{"ts": ("between", (t1, t2))}  # t1 <= ts <= t2
parallel – Enable parallel reading. False: sequential reading (default). True: use all CPU cores. int: use the specified number of threads.
- Returns:
pandas DataFrame with all records.
- Raises:
ImportError – If pandas is not installed.
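The parallel parameter (shared by scan and to_pandas) accepts either a bool or an int. One way to normalize it into a thread count, following the three cases listed above, is shown below. Note that bool must be checked before int, because True and False are instances of int in Python. resolve_workers and read_all are hypothetical, the rejection of values below 1 is an assumption, and the thread pool is a generic concurrent.futures pattern rather than datashard's reader.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar, Union

T = TypeVar("T")


def resolve_workers(parallel: Union[bool, int]) -> int:
    """Map the documented parallel values to a thread count."""
    if isinstance(parallel, bool):  # check bool first: bool is a subclass of int
        return (os.cpu_count() or 1) if parallel else 1
    if parallel < 1:  # assumption: non-positive counts are rejected
        raise ValueError("parallel must be True, False, or a positive int")
    return parallel


def read_all(paths: List[str], read_one: Callable[[str], T], parallel: Union[bool, int] = False) -> List[T]:
    """Apply read_one to every path, sequentially or on a thread pool; order is preserved."""
    workers = resolve_workers(parallel)
    if workers == 1:
        return [read_one(p) for p in paths]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(read_one, paths))


# len stands in for a real per-file reader here.
print(resolve_workers(False), read_all(["a.parquet", "bb.parquet"], len, parallel=2))
```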
- scan_batches(batch_size: int = 10000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[List[Dict[str, Any]]]
Scan data in batches for memory-efficient processing.
Yields batches of records, processing one parquet file at a time using PyArrow’s iter_batches for memory efficiency.
- Parameters:
batch_size – Approximate number of records per batch
columns – Optional column projection
filter – Optional predicate pushdown filter
- Yields:
List of records (dicts) per batch
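Because scan_batches yields plain lists of dicts, aggregations can be updated batch by batch so that only one batch is held in memory. The count_by helper below accepts any iterable of batches; fake_batches is a hypothetical in-memory stand-in for table.scan_batches(...).

```python
from collections import Counter
from typing import Any, Dict, Iterable, Iterator, List


def count_by(batches: Iterable[List[Dict[str, Any]]], column: str) -> Counter:
    """Count occurrences of each value of column, one batch at a time."""
    counts: Counter = Counter()
    for batch in batches:
        counts.update(record.get(column) for record in batch)
    return counts


def fake_batches(n: int, batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Hypothetical stand-in for table.scan_batches(batch_size=...)."""
    batch: List[Dict[str, Any]] = []
    for i in range(n):
        batch.append({"id": i, "status": "failed" if i % 4 == 0 else "ok"})
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch


print(count_by(fake_batches(10, batch_size=4), "status"))
# With datashard: count_by(table.scan_batches(batch_size=10_000, columns=["status"]), "status")
```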
- iter_records(columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[Dict[str, Any]]
Iterate over records one at a time.
Memory-efficient: only one record is held in memory at a time. Ideal for row-by-row processing of large tables.
- Parameters:
columns – Optional column projection
filter – Optional predicate pushdown filter
- Yields:
Individual records as dicts
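Since iter_records returns an iterator, a consumer can stop as soon as it has what it needs rather than materializing every record as scan() does. In the sketch below, fake_records is a hypothetical stand-in for table.iter_records(...) that logs which rows were pulled, showing that first_n_matching stops reading after the third match.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

produced: List[int] = []  # ids pulled from the source, to show early exit


def fake_records(total: int) -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for table.iter_records(): yields records lazily."""
    for i in range(total):
        produced.append(i)
        yield {"id": i, "status": "failed" if i % 3 == 0 else "ok"}


def first_n_matching(records: Iterable[Dict[str, Any]], column: str, value: Any, n: int) -> List[Dict[str, Any]]:
    """Collect up to n records with record[column] == value, then stop consuming."""
    matching = (r for r in records if r.get(column) == value)
    return list(islice(matching, n))


hits = first_n_matching(fake_records(1_000_000), "status", "failed", 3)
print([r["id"] for r in hits], len(produced))
```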
- iter_pandas(chunksize: int = 50000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[Any]
Iterate over data as pandas DataFrame chunks.
Memory-efficient: only one chunk is held in memory at a time. Ideal for processing large tables with pandas operations.
- Parameters:
chunksize – Approximate rows per chunk
columns – Optional column projection
filter – Optional predicate pushdown filter
- Yields:
pandas DataFrame chunks
- Raises:
ImportError – If pandas is not installed.
- class datashard.iceberg.Transaction(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)
Bases: object
Represents a database transaction with ACID properties.
- __init__(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)
- begin() → Transaction
Start a new transaction.
- append_files(files: List[DataFile]) → Transaction
Queue files to append to the table.
- append_pandas(df: Any, schema: Schema | None = None) → Transaction
Append a pandas DataFrame to the table.
- Parameters:
df – pandas DataFrame to append
schema – Optional Schema. If None, uses the table’s current schema.
- Returns:
Self for chaining
- append_data(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) → Transaction
Append records to the table by writing them to new data files.
The files written are tracked so that they can be cleaned up if the transaction is rolled back.
- delete_files(file_paths: List[str]) → Transaction
Queue files to delete from the table.
- overwrite_by_filter(filter_func: Callable[[Any], bool]) → Transaction
Queue an overwrite operation by filter.
- expire_snapshots(older_than_ms: int) → Transaction
Queue a snapshot expiration operation.
- datashard.iceberg.create_table(table_path: str, schema: Schema | None = None, partition_spec: PartitionSpec | None = None) → Table
Create a new Iceberg table.
- Parameters:
table_path – Path where the table should be stored
schema – Optional schema for the table
partition_spec – Optional partition spec for the table
- Returns:
Table instance
- datashard.iceberg.load_table(table_path: str) → Table
Load an existing Iceberg table.
- Parameters:
table_path – Path to the existing table
- Returns:
Table instance
- class datashard.iceberg.DataFile(file_path: str, file_format: FileFormat, partition_values: Dict[str, Any], record_count: int, file_size_in_bytes: int, column_sizes: Dict[int, int] | None = None, value_counts: Dict[int, int] | None = None, null_value_counts: Dict[int, int] | None = None, lower_bounds: Dict[int, Any] | None = None, upper_bounds: Dict[int, Any] | None = None, key_metadata: bytes | None = None, checksum: str | None = None, split_offsets: List[int] | None = None, split_compressed_offsets: List[int] | None = None, equality_ids: List[int] | None = None, sort_order_id: int | None = None)
Bases: object
Represents a data file in the table.
- file_format: FileFormat
- __init__(file_path: str, file_format: FileFormat, partition_values: Dict[str, Any], record_count: int, file_size_in_bytes: int, column_sizes: Dict[int, int] | None = None, value_counts: Dict[int, int] | None = None, null_value_counts: Dict[int, int] | None = None, lower_bounds: Dict[int, Any] | None = None, upper_bounds: Dict[int, Any] | None = None, key_metadata: bytes | None = None, checksum: str | None = None, split_offsets: List[int] | None = None, split_compressed_offsets: List[int] | None = None, equality_ids: List[int] | None = None, sort_order_id: int | None = None) → None
Classes
The classes above are also exposed at the package top level as datashard.Table, datashard.Transaction, and datashard.DataFile, with the same signatures and members as datashard.iceberg.Table, datashard.iceberg.Transaction, and datashard.iceberg.DataFile.