iceberg Module API

This section documents the main API functions for creating and loading tables.

Main Functions

Main module for the Python Iceberg implementation. Provides the primary API for working with Iceberg tables.

class datashard.iceberg.Table(table_path: str, create_if_not_exists: bool = True)[source]

Bases: object

Main table interface with transaction support

__init__(table_path: str, create_if_not_exists: bool = True)[source]
new_transaction() Transaction[source]

Create a new transaction

current_snapshot() Snapshot | None[source]

Get the current snapshot

snapshot_by_id(snapshot_id: int) Snapshot | None[source]

Get a specific snapshot by ID

snapshots() List[Dict[str, Any]][source]

Get all snapshots

time_travel(snapshot_id: int | None = None, timestamp: int | None = None) Any[source]

Time travel to a specific snapshot or timestamp
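
The signature accepts either a snapshot ID or a timestamp. One common reading of timestamp-based time travel is "the latest snapshot committed at or before the given time". The sketch below illustrates that reading in plain Python and is not the library's implementation. The helper name `resolve_snapshot`, the dict keys `snapshot-id` and `timestamp-ms` (borrowed from the Iceberg table metadata naming), and the millisecond unit are all assumptions.

```python
from typing import Any, Dict, List, Optional


def resolve_snapshot(
    snapshots: List[Dict[str, Any]],
    snapshot_id: Optional[int] = None,
    timestamp: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Pick a snapshot by exact ID, or the newest one at or before `timestamp`."""
    if snapshot_id is not None:
        for snap in snapshots:
            if snap["snapshot-id"] == snapshot_id:  # assumed key name
                return snap
        return None
    if timestamp is not None:
        # Keep only snapshots committed at or before the requested time.
        candidates = [s for s in snapshots if s["timestamp-ms"] <= timestamp]
        return max(candidates, key=lambda s: s["timestamp-ms"], default=None)
    return None


history = [
    {"snapshot-id": 1, "timestamp-ms": 1_000},
    {"snapshot-id": 2, "timestamp-ms": 2_000},
    {"snapshot-id": 3, "timestamp-ms": 3_000},
]
as_of = resolve_snapshot(history, timestamp=2_500)
```

Under these assumptions, a timestamp that falls between two commits resolves to the earlier one, and a timestamp before the first commit resolves to nothing.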

append_data(files: List[DataFile]) bool[source]

Append data files to the table (convenience method)

append_pandas(df: Any, schema: Schema | None = None) bool[source]

Append pandas DataFrame to table (convenience method)

append_records(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) bool[source]

Append actual data records to the table by creating new data files (convenience method)

refresh() bool[source]

Refresh the table metadata from storage

garbage_collect(grace_period_ms: int = 3600000) Dict[str, int][source]

Delete orphaned files not referenced by any snapshot.

Parameters:

grace_period_ms – Only delete orphaned files older than this age (default 1 hour). This protects against deleting files currently being written.

Returns:

Dict with counts of deleted files by type.
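
The grace period rule can be illustrated with a small selection function. This is a sketch of the idea only, not the library's code: the helper `select_deletable`, the use of file modification times as the measure of age, and the sample paths are assumptions.

```python
import time
from typing import Dict, Iterable, List, Optional, Set


def select_deletable(
    file_mtimes_ms: Dict[str, int],
    referenced: Iterable[str],
    grace_period_ms: int = 3_600_000,
    now_ms: Optional[int] = None,
) -> List[str]:
    """Return orphaned files whose age exceeds the grace period."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    live: Set[str] = set(referenced)
    cutoff = now_ms - grace_period_ms
    return sorted(
        path
        for path, mtime in file_mtimes_ms.items()
        if path not in live and mtime < cutoff
    )


now = 10_000_000
files = {
    "data/a.parquet": now - 7_200_000,  # referenced by a snapshot, 2 hours old
    "data/b.parquet": now - 7_200_000,  # orphaned, 2 hours old
    "data/c.parquet": now - 60_000,     # orphaned, 1 minute old (may be mid-write)
}
deletable = select_deletable(files, referenced=["data/a.parquet"], now_ms=now)
```

Only the old orphan is selected. The recent orphan is left alone because a writer may not have committed it yet.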

row_count() int[source]

Get total row count from parquet metadata without scanning data.

This is a fast O(manifest_files) operation that reads only metadata, not the actual parquet data files. Use this for count-only queries instead of len(table.scan()).

Returns:

Total number of rows across all data files in current snapshot.

scan(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) List[Dict[str, Any]][source]

Scan all data in the table and return as list of records.

This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.

Parameters:
  • columns – Optional list of column names to read. If None, reads all columns.

  • filter – Optional filter dict for predicate pushdown. Examples:

      {"status": "failed"}            # status == "failed"
      {"age": (">", 18)}              # age > 18
      {"id": ("in", [1, 2, 3])}       # id in [1, 2, 3]
      {"ts": ("between", (t1, t2))}   # t1 <= ts <= t2

  • parallel – Enable parallel reading:

      - False: Sequential reading (default)
      - True: Use all CPU cores
      - int: Use specified number of threads

Returns:

List of dictionaries, each representing a record.
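
The filter dict forms accepted by scan can be read as per-column predicates. The following is a minimal sketch of that interpretation in plain Python, not the library's pushdown logic. The helper name `matches`, the comparison operators other than ">", and the choice to combine multiple columns with AND are assumptions.

```python
import operator
from typing import Any, Dict

# Only ">" appears in the documented examples; the rest are assumed by analogy.
_COMPARISONS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def matches(record: Dict[str, Any], filter: Dict[str, Any]) -> bool:
    """Return True if the record satisfies every condition in the filter dict."""
    for column, condition in filter.items():
        value = record.get(column)
        if isinstance(condition, tuple):
            op, operand = condition
            if op == "in":
                ok = value in operand
            elif op == "between":
                low, high = operand
                ok = value is not None and low <= value <= high
            else:
                ok = value is not None and _COMPARISONS[op](value, operand)
        else:
            # A bare value means equality.
            ok = value == condition
        if not ok:
            return False
    return True


records = [
    {"id": 1, "status": "failed", "age": 17},
    {"id": 2, "status": "ok", "age": 30},
    {"id": 3, "status": "failed", "age": 42},
]
failed_adults = [
    r for r in records if matches(r, {"status": "failed", "age": (">", 18)})
]
```

With a real table, the same dict would be passed as the filter argument so that non-matching data can be skipped at read time rather than filtered afterwards.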

to_pandas(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) Any[source]

Read all data from the table as a pandas DataFrame.

This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.

Parameters:
  • columns – Optional list of column names to read. If None, reads all columns.

  • filter – Optional filter dict for predicate pushdown. Examples:

      {"status": "failed"}            # status == "failed"
      {"age": (">", 18)}              # age > 18
      {"id": ("in", [1, 2, 3])}       # id in [1, 2, 3]
      {"ts": ("between", (t1, t2))}   # t1 <= ts <= t2

  • parallel – Enable parallel reading:

      - False: Sequential reading (default)
      - True: Use all CPU cores
      - int: Use specified number of threads

Returns:

pandas DataFrame with all records.

Raises:

ImportError – If pandas is not installed.

scan_batches(batch_size: int = 10000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) Iterator[List[Dict[str, Any]]][source]

Scan data in batches for memory-efficient processing.

Yields batches of records, processing one parquet file at a time using PyArrow’s iter_batches for memory efficiency.

Parameters:
  • batch_size – Approximate number of records per batch

  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

List of records (dicts) per batch
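
A typical use of batch scanning is an aggregate that never holds the whole table in memory. The sketch below shows the loop shape. `BatchDemoTable` is a hypothetical in-memory stand-in that mimics only the documented scan_batches signature so the example runs without the library, and the `amount` column is invented for illustration.

```python
from typing import Any, Dict, Iterator, List, Optional


class BatchDemoTable:
    """Hypothetical stand-in; not the library's Table."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = rows

    def scan_batches(
        self,
        batch_size: int = 10000,
        columns: Optional[List[str]] = None,
        filter: Optional[Dict[str, Any]] = None,
    ) -> Iterator[List[Dict[str, Any]]]:
        # The stand-in ignores `filter` and slices an in-memory list.
        for start in range(0, len(self._rows), batch_size):
            batch = self._rows[start:start + batch_size]
            if columns is not None:
                batch = [{c: row[c] for c in columns} for row in batch]
            yield batch


def total_amount(table: Any, batch_size: int = 10000) -> float:
    """Sum one column while holding at most one batch in memory."""
    total = 0.0
    for batch in table.scan_batches(batch_size=batch_size, columns=["amount"]):
        total += sum(row["amount"] for row in batch)
    return total


table = BatchDemoTable([{"id": i, "amount": float(i)} for i in range(10)])
result = total_amount(table, batch_size=4)
```

Projecting only the needed column keeps each batch small, which is the main reason to combine the columns argument with batching.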

iter_records(columns: List[str] | None = None, filter: Dict[str, Any] | None = None) Iterator[Dict[str, Any]][source]

Iterate over records one at a time.

Memory efficient - only one record in memory at a time. Ideal for row-by-row processing of large tables.

Parameters:
  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

Individual records as dicts

iter_pandas(chunksize: int = 50000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) Iterator[Any][source]

Iterate over data as pandas DataFrame chunks.

Memory efficient - only one chunk in memory at a time. Ideal for processing large tables with pandas operations.

Parameters:
  • chunksize – Approximate rows per chunk

  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

pandas DataFrame chunks

Raises:

ImportError – If pandas is not installed.

class datashard.iceberg.Transaction(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)[source]

Bases: object

Represents a database transaction with ACID properties

__init__(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)[source]
begin() Transaction[source]

Start a new transaction

is_active() bool[source]

Check if transaction is active

append_files(files: List[DataFile]) Transaction[source]

Queue files to append to the table

append_pandas(df: Any, schema: Schema | None = None) Transaction[source]

Append a pandas DataFrame to the table.

Parameters:
  • df – pandas DataFrame to append

  • schema – Optional Schema. If None, uses the table’s current schema.

Returns:

Self for chaining

append_data(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) Transaction[source]

Append actual data records to the table by creating new data files.

Files written by this call are tracked so that they can be cleaned up on rollback.

delete_files(file_paths: List[str]) Transaction[source]

Queue files to delete from the table

overwrite_by_filter(filter_func: Callable[[Any], bool]) Transaction[source]

Queue an overwrite operation by filter

expire_snapshots(older_than_ms: int) Transaction[source]

Queue snapshot expiration operation

commit() bool[source]

Commit the transaction with ACID properties using Optimistic Concurrency Control

rollback() bool[source]

Rollback the transaction
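
The transaction methods suggest a begin, queue, commit flow with rollback on failure. The sketch below shows one way to wrap that flow. It assumes that commit() signals a lost optimistic-concurrency race by returning False and that calling rollback() afterwards is safe, neither of which is confirmed by the reference above. `append_atomically`, `DemoTransaction`, and `TxnDemoTable` are hypothetical names; the two classes are stand-ins that only record calls so the example runs without the library.

```python
from typing import Any, Dict, List


def append_atomically(table: Any, records: List[Dict[str, Any]]) -> bool:
    """Append records in one transaction, rolling back if anything fails."""
    txn = table.new_transaction()
    if not txn.is_active():
        txn.begin()
    try:
        txn.append_data(records)
        committed = txn.commit()
    except Exception:
        txn.rollback()
        raise
    if not committed:
        # Assumption: a False result means the commit did not take effect.
        txn.rollback()
    return committed


class DemoTransaction:
    """Hypothetical stand-in that records the calls made on it."""

    def __init__(self, fail_commit: bool = False) -> None:
        self.active = False
        self.calls: List[str] = []
        self._fail_commit = fail_commit

    def is_active(self) -> bool:
        return self.active

    def begin(self) -> "DemoTransaction":
        self.active = True
        self.calls.append("begin")
        return self

    def append_data(self, records: List[Dict[str, Any]]) -> "DemoTransaction":
        self.calls.append("append_data")
        return self

    def commit(self) -> bool:
        self.calls.append("commit")
        return not self._fail_commit

    def rollback(self) -> bool:
        self.calls.append("rollback")
        self.active = False
        return True


class TxnDemoTable:
    """Hypothetical stand-in exposing only new_transaction()."""

    def __init__(self, fail_commit: bool = False) -> None:
        self.txn = DemoTransaction(fail_commit)

    def new_transaction(self) -> DemoTransaction:
        return self.txn


ok_table = TxnDemoTable()
ok = append_atomically(ok_table, [{"id": 1}])

conflict_table = TxnDemoTable(fail_commit=True)
conflicted = append_atomically(conflict_table, [{"id": 2}])
```

A caller that sees a False result could refresh the table and retry, which is the usual companion to optimistic concurrency control.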

datashard.iceberg.create_table(table_path: str, schema: Schema | None = None, partition_spec: PartitionSpec | None = None) Table[source]

Create a new Iceberg table

Parameters:
  • table_path – Path where the table should be stored

  • schema – Optional schema for the table

  • partition_spec – Optional partition spec for the table

Returns:

Table instance

datashard.iceberg.load_table(table_path: str) Table[source]

Load an existing Iceberg table

Parameters:

table_path – Path to the existing table

Returns:

Table instance

class datashard.iceberg.DataFile(file_path: str, file_format: FileFormat, partition_values: Dict[str, Any], record_count: int, file_size_in_bytes: int, column_sizes: Dict[int, int] | None = None, value_counts: Dict[int, int] | None = None, null_value_counts: Dict[int, int] | None = None, lower_bounds: Dict[int, Any] | None = None, upper_bounds: Dict[int, Any] | None = None, key_metadata: bytes | None = None, checksum: str | None = None, split_offsets: List[int] | None = None, split_compressed_offsets: List[int] | None = None, equality_ids: List[int] | None = None, sort_order_id: int | None = None)[source]

Bases: object

Represents a data file in the table

file_path: str
file_format: FileFormat
partition_values: Dict[str, Any]
record_count: int
file_size_in_bytes: int
column_sizes: Dict[int, int] | None = None
value_counts: Dict[int, int] | None = None
null_value_counts: Dict[int, int] | None = None
lower_bounds: Dict[int, Any] | None = None
upper_bounds: Dict[int, Any] | None = None
key_metadata: bytes | None = None
checksum: str | None = None
split_offsets: List[int] | None = None
split_compressed_offsets: List[int] | None = None
equality_ids: List[int] | None = None
sort_order_id: int | None = None
__init__(file_path: str, file_format: FileFormat, partition_values: Dict[str, Any], record_count: int, file_size_in_bytes: int, column_sizes: Dict[int, int] | None = None, value_counts: Dict[int, int] | None = None, null_value_counts: Dict[int, int] | None = None, lower_bounds: Dict[int, Any] | None = None, upper_bounds: Dict[int, Any] | None = None, key_metadata: bytes | None = None, checksum: str | None = None, split_offsets: List[int] | None = None, split_compressed_offsets: List[int] | None = None, equality_ids: List[int] | None = None, sort_order_id: int | None = None) None
class datashard.iceberg.FileFormat(value)[source]

Bases: Enum

Supported file formats

PARQUET = 'parquet'
AVRO = 'avro'
ORC = 'orc'
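
Because the members carry their lowercase format name as a value, standard Enum lookup by value can map a file extension to a member. The sketch below uses a local mirror of the three documented members so that it runs without the library; `format_from_path` is a hypothetical helper, not part of the API.

```python
from enum import Enum
from pathlib import Path


class FileFormat(Enum):
    """Local mirror of the documented members, for illustration only."""

    PARQUET = "parquet"
    AVRO = "avro"
    ORC = "orc"


def format_from_path(file_path: str) -> FileFormat:
    """Map a file extension to a FileFormat member."""
    suffix = Path(file_path).suffix.lstrip(".").lower()
    return FileFormat(suffix)  # raises ValueError for unknown extensions


fmt = format_from_path("data/part-0001.PARQUET")
```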

Classes

Table

class datashard.Table(table_path: str, create_if_not_exists: bool = True)[source]

Bases: object

Main table interface with transaction support

__init__(table_path: str, create_if_not_exists: bool = True)[source]
new_transaction() Transaction[source]

Create a new transaction

current_snapshot() Snapshot | None[source]

Get the current snapshot

snapshot_by_id(snapshot_id: int) Snapshot | None[source]

Get a specific snapshot by ID

snapshots() List[Dict[str, Any]][source]

Get all snapshots

time_travel(snapshot_id: int | None = None, timestamp: int | None = None) Any[source]

Time travel to a specific snapshot or timestamp

append_data(files: List[DataFile]) bool[source]

Append data files to the table (convenience method)

append_pandas(df: Any, schema: Schema | None = None) bool[source]

Append pandas DataFrame to table (convenience method)

append_records(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) bool[source]

Append actual data records to the table by creating new data files (convenience method)

refresh() bool[source]

Refresh the table metadata from storage

garbage_collect(grace_period_ms: int = 3600000) Dict[str, int][source]

Delete orphaned files not referenced by any snapshot.

Parameters:

grace_period_ms – Only delete orphaned files older than this age (default 1 hour). This protects against deleting files currently being written.

Returns:

Dict with counts of deleted files by type.

row_count() int[source]

Get total row count from parquet metadata without scanning data.

This is a fast O(manifest_files) operation that reads only metadata, not the actual parquet data files. Use this for count-only queries instead of len(table.scan()).

Returns:

Total number of rows across all data files in current snapshot.

scan(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) List[Dict[str, Any]][source]

Scan all data in the table and return as list of records.

This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.

Parameters:
  • columns – Optional list of column names to read. If None, reads all columns.

  • filter – Optional filter dict for predicate pushdown. Examples:

      {"status": "failed"}            # status == "failed"
      {"age": (">", 18)}              # age > 18
      {"id": ("in", [1, 2, 3])}       # id in [1, 2, 3]
      {"ts": ("between", (t1, t2))}   # t1 <= ts <= t2

  • parallel – Enable parallel reading:

      - False: Sequential reading (default)
      - True: Use all CPU cores
      - int: Use specified number of threads

Returns:

List of dictionaries, each representing a record.

to_pandas(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) Any[source]

Read all data from the table as a pandas DataFrame.

This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.

Parameters:
  • columns – Optional list of column names to read. If None, reads all columns.

  • filter – Optional filter dict for predicate pushdown. Examples:

      {"status": "failed"}            # status == "failed"
      {"age": (">", 18)}              # age > 18
      {"id": ("in", [1, 2, 3])}       # id in [1, 2, 3]
      {"ts": ("between", (t1, t2))}   # t1 <= ts <= t2

  • parallel – Enable parallel reading:

      - False: Sequential reading (default)
      - True: Use all CPU cores
      - int: Use specified number of threads

Returns:

pandas DataFrame with all records.

Raises:

ImportError – If pandas is not installed.

scan_batches(batch_size: int = 10000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) Iterator[List[Dict[str, Any]]][source]

Scan data in batches for memory-efficient processing.

Yields batches of records, processing one parquet file at a time using PyArrow’s iter_batches for memory efficiency.

Parameters:
  • batch_size – Approximate number of records per batch

  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

List of records (dicts) per batch

iter_records(columns: List[str] | None = None, filter: Dict[str, Any] | None = None) Iterator[Dict[str, Any]][source]

Iterate over records one at a time.

Memory efficient - only one record in memory at a time. Ideal for row-by-row processing of large tables.

Parameters:
  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

Individual records as dicts

iter_pandas(chunksize: int = 50000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) Iterator[Any][source]

Iterate over data as pandas DataFrame chunks.

Memory efficient - only one chunk in memory at a time. Ideal for processing large tables with pandas operations.

Parameters:
  • chunksize – Approximate rows per chunk

  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

pandas DataFrame chunks

Raises:

ImportError – If pandas is not installed.

Transaction

class datashard.Transaction(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)[source]

Bases: object

Represents a database transaction with ACID properties

__init__(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)[source]
begin() Transaction[source]

Start a new transaction

is_active() bool[source]

Check if transaction is active

append_files(files: List[DataFile]) Transaction[source]

Queue files to append to the table

append_pandas(df: Any, schema: Schema | None = None) Transaction[source]

Append a pandas DataFrame to the table.

Parameters:
  • df – pandas DataFrame to append

  • schema – Optional Schema. If None, uses the table’s current schema.

Returns:

Self for chaining

append_data(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) Transaction[source]

Append actual data records to the table by creating new data files.

Files written by this call are tracked so that they can be cleaned up on rollback.

delete_files(file_paths: List[str]) Transaction[source]

Queue files to delete from the table

overwrite_by_filter(filter_func: Callable[[Any], bool]) Transaction[source]

Queue an overwrite operation by filter

expire_snapshots(older_than_ms: int) Transaction[source]

Queue snapshot expiration operation

commit() bool[source]

Commit the transaction with ACID properties using Optimistic Concurrency Control

rollback() bool[source]

Rollback the transaction

DataFile and FileFormat

class datashard.DataFile(file_path: str, file_format: FileFormat, partition_values: Dict[str, Any], record_count: int, file_size_in_bytes: int, column_sizes: Dict[int, int] | None = None, value_counts: Dict[int, int] | None = None, null_value_counts: Dict[int, int] | None = None, lower_bounds: Dict[int, Any] | None = None, upper_bounds: Dict[int, Any] | None = None, key_metadata: bytes | None = None, checksum: str | None = None, split_offsets: List[int] | None = None, split_compressed_offsets: List[int] | None = None, equality_ids: List[int] | None = None, sort_order_id: int | None = None)[source]

Bases: object

Represents a data file in the table

file_path: str
file_format: FileFormat
partition_values: Dict[str, Any]
record_count: int
file_size_in_bytes: int
column_sizes: Dict[int, int] | None = None
value_counts: Dict[int, int] | None = None
null_value_counts: Dict[int, int] | None = None
lower_bounds: Dict[int, Any] | None = None
upper_bounds: Dict[int, Any] | None = None
key_metadata: bytes | None = None
checksum: str | None = None
split_offsets: List[int] | None = None
split_compressed_offsets: List[int] | None = None
equality_ids: List[int] | None = None
sort_order_id: int | None = None
__init__(file_path: str, file_format: FileFormat, partition_values: Dict[str, Any], record_count: int, file_size_in_bytes: int, column_sizes: Dict[int, int] | None = None, value_counts: Dict[int, int] | None = None, null_value_counts: Dict[int, int] | None = None, lower_bounds: Dict[int, Any] | None = None, upper_bounds: Dict[int, Any] | None = None, key_metadata: bytes | None = None, checksum: str | None = None, split_offsets: List[int] | None = None, split_compressed_offsets: List[int] | None = None, equality_ids: List[int] | None = None, sort_order_id: int | None = None) None
class datashard.FileFormat(value)[source]

Bases: Enum

Supported file formats

PARQUET = 'parquet'
AVRO = 'avro'
ORC = 'orc'