transaction Module API

This section documents the transaction and table management functionality.

Classes

Transaction

class datashard.Transaction(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)

Bases: object

Represents a database transaction with ACID properties

__init__(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)

begin() → Transaction

Start a new transaction

is_active() → bool

Check if transaction is active

append_files(files: List[DataFile]) → Transaction

Queue files to append to the table

append_pandas(df: Any, schema: Schema | None = None) → Transaction

Append a pandas DataFrame to the table.

Parameters:
  • df – pandas DataFrame to append

  • schema – Optional Schema. If None, uses the table’s current schema.

Returns:

Self for chaining

append_data(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) → Transaction

Append actual data records to the table by creating new data files.

Every file written by this call is tracked so that it can be deleted if the transaction is rolled back.
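The cleanup-on-rollback behavior can be illustrated with a toy model. This is a minimal sketch, not datashard's Transaction class: `FileTrackingTxn`, `write_file`, and `demo` are hypothetical names, and real data files would be Parquet rather than raw bytes. The key point is that each path is recorded as soon as it is written, so a failure later in the transaction still leaves a complete list of files to remove.

```python
import os
import tempfile
from typing import List


class FileTrackingTxn:
    """Toy transaction that remembers every data file it writes (hypothetical)."""

    def __init__(self, data_dir: str) -> None:
        self.data_dir = data_dir
        self.written_files: List[str] = []  # paths created by this transaction
        self.active = True

    def write_file(self, name: str, payload: bytes) -> str:
        path = os.path.join(self.data_dir, name)
        with open(path, "wb") as f:
            f.write(payload)
        # Record the path immediately so a later failure can still clean it up
        self.written_files.append(path)
        return path

    def rollback(self) -> bool:
        # Remove every file this transaction wrote; tolerate files already gone
        for path in self.written_files:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
        self.written_files.clear()
        self.active = False
        return True


def demo() -> bool:
    """Write one file, roll back, and report whether the directory is empty again."""
    with tempfile.TemporaryDirectory() as d:
        txn = FileTrackingTxn(d)
        txn.write_file("part-00000.parquet", b"placeholder bytes")
        txn.rollback()
        return os.listdir(d) == []
```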

delete_files(file_paths: List[str]) → Transaction

Queue files to delete from the table

overwrite_by_filter(filter_func: Callable[[Any], bool]) → Transaction

Queue an overwrite operation by filter

expire_snapshots(older_than_ms: int) → Transaction

Queue snapshot expiration operation

commit() → bool

Commit the transaction with ACID properties using Optimistic Concurrency Control
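Optimistic concurrency control means a transaction does not lock the table while it works; instead it remembers the snapshot it started from and, at commit time, atomically swaps in its new snapshot only if the table still points at that base. A minimal sketch of the technique under stated assumptions: `TableState`, `OptimisticTxn`, and `CommitConflict` are hypothetical, and the in-process lock stands in for whatever atomic metadata swap datashard actually uses.

```python
import threading
from dataclasses import dataclass, field
from typing import List, Optional


class CommitConflict(Exception):
    """Raised when another writer committed after this transaction began."""


@dataclass
class TableState:
    """Hypothetical stand-in for the table's current-snapshot pointer."""
    current_snapshot_id: Optional[int] = None
    history: List[int] = field(default_factory=list)
    lock: threading.Lock = field(default_factory=threading.Lock)

    def compare_and_swap(self, expected: Optional[int], new: int) -> bool:
        # Move the pointer only if nobody else has moved it since `expected`
        with self.lock:
            if self.current_snapshot_id != expected:
                return False
            self.current_snapshot_id = new
            self.history.append(new)
            return True


class OptimisticTxn:
    def __init__(self, table: TableState) -> None:
        self.table = table
        # Remember which snapshot this transaction's changes are based on
        self.base_snapshot_id = table.current_snapshot_id

    def commit(self, new_snapshot_id: int) -> bool:
        if not self.table.compare_and_swap(self.base_snapshot_id, new_snapshot_id):
            raise CommitConflict(
                f"table moved from {self.base_snapshot_id} to "
                f"{self.table.current_snapshot_id}"
            )
        return True
```

This sketch raises on conflict; since datashard's commit() is documented as returning bool, the real method may instead report a conflict by returning False. A writer that loses the race would typically refresh, re-apply its queued changes on the new base, and try again.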

rollback() → bool

Rollback the transaction

TransactionManager

Table

class datashard.Table(table_path: str, create_if_not_exists: bool = True)

Bases: object

Main table interface with transaction support

__init__(table_path: str, create_if_not_exists: bool = True)

new_transaction() → Transaction

Create a new transaction

current_snapshot() → Snapshot | None

Get the current snapshot

snapshot_by_id(snapshot_id: int) → Snapshot | None

Get a specific snapshot by ID

snapshots() → List[Dict[str, Any]]

Get all snapshots

time_travel(snapshot_id: int | None = None, timestamp: int | None = None) → Any

Time travel to a specific snapshot or timestamp
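This reference does not specify how a `timestamp` is mapped to a snapshot. A common convention (Apache Iceberg uses it for as-of-timestamp reads) is to select the latest snapshot committed at or before the requested time. A minimal sketch of that lookup, assuming timestamps are epoch milliseconds like the other `*_ms` parameters here; `snapshot_as_of` and its `(timestamp_ms, snapshot_id)` input are hypothetical:

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


def snapshot_as_of(snapshots: List[Tuple[int, int]], timestamp_ms: int) -> Optional[int]:
    """Return the id of the latest snapshot committed at or before timestamp_ms.

    `snapshots` is a list of (timestamp_ms, snapshot_id) pairs sorted by time.
    Returns None if the table had no snapshot yet at that time.
    """
    times = [ts for ts, _ in snapshots]
    # bisect_right gives the index of the first snapshot strictly newer than timestamp_ms
    idx = bisect_right(times, timestamp_ms)
    if idx == 0:
        return None
    return snapshots[idx - 1][1]
```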

append_data(files: List[DataFile]) → bool

Append data files to the table (convenience method)

append_pandas(df: Any, schema: Schema | None = None) → bool

Append pandas DataFrame to table (convenience method)

append_records(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) → bool

Append actual data records to the table by creating new data files (convenience method)

refresh() → bool

Refresh the table metadata from storage

garbage_collect(grace_period_ms: int = 3600000) → Dict[str, int]

Delete orphaned files not referenced by any snapshot.

Parameters:

grace_period_ms – Only delete orphaned files older than this age (default 1 hour). This protects against deleting files currently being written.

Returns:

Dict with counts of deleted files by type.
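The selection rule combines two conditions: a file must be unreferenced by every snapshot, and it must be older than the grace period so that files still being written by an in-flight transaction are left alone. A minimal sketch of that rule, assuming the caller has already listed the files in storage with their modification times and collected the set of paths referenced by all snapshots; `find_orphans` is a hypothetical helper:

```python
import time
from typing import Dict, Optional, Set


def find_orphans(
    files_mtime_ms: Dict[str, int],
    referenced: Set[str],
    grace_period_ms: int = 3_600_000,
    now_ms: Optional[int] = None,
) -> Set[str]:
    """Select files that no snapshot references and that are older than the grace period.

    files_mtime_ms maps each file path found in storage to its modification
    time in epoch milliseconds.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff = now_ms - grace_period_ms
    return {
        path
        for path, mtime in files_mtime_ms.items()
        # Skip referenced files and recent files that may still be mid-write
        if path not in referenced and mtime < cutoff
    }
```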

row_count() → int

Get total row count from parquet metadata without scanning data.

This is a fast O(manifest_files) operation that reads only metadata, not the actual parquet data files. Use this for count-only queries instead of len(table.scan()).

Returns:

Total number of rows across all data files in current snapshot.

scan(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) → List[Dict[str, Any]]

Scan all data in the table and return as list of records.

This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.

Parameters:
  • columns – Optional list of column names to read. If None, reads all columns.

  • filter – Optional filter dict for predicate pushdown. Examples:

      {"status": "failed"}            # status == "failed"
      {"age": (">", 18)}              # age > 18
      {"id": ("in", [1, 2, 3])}       # id in [1, 2, 3]
      {"ts": ("between", (t1, t2))}   # t1 <= ts <= t2

  • parallel – Enable parallel reading:
      - False: sequential reading (default)
      - True: use all CPU cores
      - int: use the specified number of threads

Returns:

List of dictionaries, each representing a record.
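The `parallel` argument selects between sequential and threaded reading of the data files. A minimal sketch of that dispatch, assuming a hypothetical per-file reader `read_file(path)` that returns a list of records; `resolve_workers` and `read_all` are illustrative names, not datashard functions:

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Union

Record = Dict[str, object]


def resolve_workers(parallel: Union[bool, int]) -> int:
    """Map the documented `parallel` argument to a thread count."""
    # Check the bool cases first: bool is a subclass of int
    if parallel is True:
        return os.cpu_count() or 1
    if parallel is False:
        return 1
    if parallel < 1:
        raise ValueError("parallel must be True, False, or a positive int")
    return parallel


def read_all(
    paths: List[str],
    read_file: Callable[[str], List[Record]],
    parallel: Union[bool, int] = False,
) -> List[Record]:
    workers = resolve_workers(parallel)
    if workers == 1:
        chunks = [read_file(p) for p in paths]  # sequential path
    else:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # map() yields results in input order, regardless of finish order
            chunks = list(pool.map(read_file, paths))
    return [rec for chunk in chunks for rec in chunk]
```

Because `ThreadPoolExecutor.map` preserves input order, the combined result in this sketch is the same whether it was read sequentially or in parallel.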

to_pandas(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) → Any

Read all data from the table as a pandas DataFrame.

This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.

Parameters:
  • columns – Optional list of column names to read. If None, reads all columns.

  • filter – Optional filter dict for predicate pushdown. Examples:

      {"status": "failed"}            # status == "failed"
      {"age": (">", 18)}              # age > 18
      {"id": ("in", [1, 2, 3])}       # id in [1, 2, 3]
      {"ts": ("between", (t1, t2))}   # t1 <= ts <= t2

  • parallel – Enable parallel reading:
      - False: sequential reading (default)
      - True: use all CPU cores
      - int: use the specified number of threads

Returns:

pandas DataFrame with all records.

Raises:

ImportError – If pandas is not installed.

scan_batches(batch_size: int = 10000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[List[Dict[str, Any]]]

Scan data in batches for memory-efficient processing.

Yields batches of records, processing one parquet file at a time using PyArrow’s iter_batches for memory efficiency.

Parameters:
  • batch_size – Approximate number of records per batch

  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

List of records (dicts) per batch

iter_records(columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[Dict[str, Any]]

Iterate over records one at a time.

Memory-efficient: only one record is held in memory at a time. Ideal for row-by-row processing of large tables.

Parameters:
  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

Individual records as dicts
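A record iterator can be layered on top of a batch iterator with a generator, which also makes early exit cheap: stopping after the first few records never pulls later batches. A minimal sketch with hypothetical helpers `iter_records` and `first_n`. In this layering the memory peak is one batch rather than one record; how datashard itself buffers reads is not stated here.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_records(batches: Iterable[List[Dict[str, Any]]]) -> Iterator[Dict[str, Any]]:
    """Flatten an iterator of batches into an iterator of single records."""
    for batch in batches:
        yield from batch


def first_n(batches: Iterable[List[Dict[str, Any]]], n: int) -> List[Dict[str, Any]]:
    # islice stops requesting records once n have been produced
    return list(islice(iter_records(batches), n))
```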

iter_pandas(chunksize: int = 50000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[Any]

Iterate over data as pandas DataFrame chunks.

Memory-efficient: only one chunk is held in memory at a time. Ideal for processing large tables with pandas operations.

Parameters:
  • chunksize – Approximate rows per chunk

  • columns – Optional column projection

  • filter – Optional predicate pushdown filter

Yields:

pandas DataFrame chunks

Raises:

ImportError – If pandas is not installed.

Table.scan() Method

Read data from the table with optional filtering and parallel processing.

Parameters:

  • columns (List[str], optional): Column names to read. If None, reads all columns.

  • filter (Dict[str, Any], optional): Filter dict for predicate pushdown.

  • parallel (bool or int, optional): Enable parallel reading. False (the default) reads sequentially, True uses all CPU cores, and an int specifies the thread count.

Filter Syntax:

# Equality
{"column": value}
{"column": ("==", value)}

# Comparison
{"column": (">", value)}
{"column": (">=", value)}
{"column": ("<", value)}
{"column": ("<=", value)}

# Membership
{"column": ("in", [v1, v2, v3])}
{"column": ("not_in", [v1, v2])}

# Range
{"column": ("between", (low, high))}
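datashard applies these filters through predicate pushdown while reading Parquet. The per-record meaning of the syntax above can still be shown as a plain Python predicate. A minimal sketch: `matches` is a hypothetical helper, and treating multiple keys as an AND is an assumption this reference does not state. The inclusive `between` follows the `t1 <= ts <= t2` note in the scan() documentation.

```python
import operator
from typing import Any, Callable, Dict

# Binary comparison operators from the filter syntax
_CMP: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def matches(record: Dict[str, Any], filt: Dict[str, Any]) -> bool:
    """Return True if `record` satisfies every condition in `filt` (ANDed together)."""
    for column, cond in filt.items():
        # A bare value is shorthand for ("==", value)
        if not (isinstance(cond, tuple) and len(cond) == 2 and isinstance(cond[0], str)):
            cond = ("==", cond)
        op, operand = cond
        value = record.get(column)
        if op in _CMP:
            # Missing/None values fail ordered comparisons instead of raising
            if value is None and op != "==":
                return False
            ok = _CMP[op](value, operand)
        elif op == "in":
            ok = value in operand
        elif op == "not_in":
            ok = value not in operand
        elif op == "between":
            low, high = operand
            ok = value is not None and low <= value <= high  # inclusive on both ends
        else:
            raise ValueError(f"unsupported filter operator: {op!r}")
        if not ok:
            return False
    return True
```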

Example:

# Basic scan
records = table.scan()

# With filter
records = table.scan(filter={"status": "active"})

# With columns and parallel
records = table.scan(columns=["id", "name"], parallel=True)

Table.to_pandas() Method

Read data from the table as a pandas DataFrame.

Parameters:

Same as scan(); the only difference is that the result is a pandas DataFrame instead of a list of dicts.

Example:

df = table.to_pandas(filter={"age": (">", 30)}, parallel=True)

Table.scan_batches() Method

Iterate over data in batches for memory-efficient processing.

Parameters:

  • batch_size (int): Approximate number of records per batch. Default: 10000.

  • columns (List[str], optional): Column names to read.

  • filter (Dict[str, Any], optional): Filter dict for predicate pushdown.

Example:

for batch in table.scan_batches(batch_size=5000):
    process(batch)  # batch is List[Dict]

Table.iter_records() Method

Iterate over records one at a time.

Parameters:

  • columns (List[str], optional): Column names to read.

  • filter (Dict[str, Any], optional): Filter dict for predicate pushdown.

Example:

for record in table.iter_records(filter={"status": "failed"}):
    handle_failure(record)

Table.iter_pandas() Method

Iterate over data as pandas DataFrame chunks.

Parameters:

  • chunksize (int): Approximate rows per chunk. Default: 50000.

  • columns (List[str], optional): Column names to read.

  • filter (Dict[str, Any], optional): Filter dict for predicate pushdown.

Example:

results = []
for chunk_df in table.iter_pandas(chunksize=10000):
    results.append(chunk_df.groupby('x').sum())  # per-chunk partial sums
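The loop above produces one partial result per chunk, and a group key that appears in several chunks shows up once in each partial. A minimal sketch, using a hypothetical helper `combine_partial_sums`, of folding those partials into final per-group totals with pandas:

```python
from typing import Iterable

import pandas as pd


def combine_partial_sums(partials: Iterable[pd.DataFrame]) -> pd.DataFrame:
    """Merge per-chunk groupby(...).sum() results into one total per group.

    The partials are concatenated and summed again by index (the group key),
    so keys split across chunks are added together.
    """
    return pd.concat(list(partials)).groupby(level=0).sum()
```

Re-aggregating like this is only correct for decomposable aggregates such as sum, count, min, and max. A mean has to be rebuilt from combined sums and counts rather than by averaging per-chunk means.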