transaction Module API
This section documents the transaction and table management functionality.
Classes
Transaction
- class datashard.Transaction(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)
Bases: object
Represents a database transaction with ACID properties.
- __init__(metadata_manager: MetadataManager, snapshot_manager: SnapshotManager, file_manager: FileManager)
- begin() → Transaction
Start a new transaction
- append_files(files: List[DataFile]) → Transaction
Queue files to append to the table
- append_pandas(df: Any, schema: Schema | None = None) → Transaction
Append a pandas DataFrame to the table.
- Parameters:
df – pandas DataFrame to append
schema – Optional Schema. If None, uses the table’s current schema.
- Returns:
Self for chaining
- append_data(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) → Transaction
Append data records to the table by creating new data files.
Written files are tracked so that they can be cleaned up on rollback.
- delete_files(file_paths: List[str]) → Transaction
Queue files to delete from the table
- overwrite_by_filter(filter_func: Callable[[Any], bool]) → Transaction
Queue an overwrite operation by filter
- expire_snapshots(older_than_ms: int) → Transaction
Queue snapshot expiration operation
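The methods above queue an operation and return the transaction itself, which is what allows calls to be chained. The following is a minimal sketch of that pattern only. `QueuedOperations` is a hypothetical stand-in, not datashard code: it takes plain path strings rather than DataFile objects, and it does not show how queued operations are eventually applied, since that step is not covered in this section.

```python
from typing import Any, List, Tuple


class QueuedOperations:
    """Hypothetical stand-in for a transaction that queues work (not datashard code)."""

    def __init__(self) -> None:
        # Each entry is (operation name, payload), kept in call order.
        self.pending: List[Tuple[str, Any]] = []

    def append_files(self, files: List[str]) -> "QueuedOperations":
        self.pending.append(("append_files", list(files)))
        return self  # returning self is what makes chaining possible

    def delete_files(self, file_paths: List[str]) -> "QueuedOperations":
        self.pending.append(("delete_files", list(file_paths)))
        return self

    def expire_snapshots(self, older_than_ms: int) -> "QueuedOperations":
        self.pending.append(("expire_snapshots", older_than_ms))
        return self


ops = QueuedOperations()
chained = (
    ops.append_files(["a.parquet", "b.parquet"])
    .delete_files(["old.parquet"])
    .expire_snapshots(older_than_ms=86_400_000)
)
```

Nothing is executed while the chain is built; the calls only record intent in order, so the whole set can later be applied or discarded together.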
TransactionManager
Table
- class datashard.Table(table_path: str, create_if_not_exists: bool = True)
Bases: object
Main table interface with transaction support.
- new_transaction() → Transaction
Create a new transaction
- time_travel(snapshot_id: int | None = None, timestamp: int | None = None) → Any
Time travel to a specific snapshot or timestamp
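This section does not say how a timestamp is mapped to a snapshot. One common convention, assumed here purely for illustration, is to pick the latest snapshot created at or before the requested time. In the sketch below, `SNAPSHOTS`, `resolve_snapshot`, the millisecond unit, the precedence of `snapshot_id` over `timestamp`, and the fallback to the latest snapshot are all assumptions rather than documented datashard behavior.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

# Hypothetical snapshot log: (timestamp_ms, snapshot_id), sorted by timestamp.
SNAPSHOTS: List[Tuple[int, int]] = [(1_000, 1), (2_000, 2), (3_500, 3)]


def resolve_snapshot(
    snapshot_id: Optional[int] = None,
    timestamp: Optional[int] = None,
) -> Optional[int]:
    """Pick a snapshot id; an explicit id wins, otherwise use the timestamp."""
    if snapshot_id is not None:
        known = {sid for _, sid in SNAPSHOTS}
        return snapshot_id if snapshot_id in known else None
    if timestamp is not None:
        times = [ts for ts, _ in SNAPSHOTS]
        # Index of the first snapshot strictly newer than the timestamp.
        idx = bisect_right(times, timestamp)
        return SNAPSHOTS[idx - 1][1] if idx > 0 else None
    # Neither argument given: fall back to the latest snapshot (assumption).
    return SNAPSHOTS[-1][1] if SNAPSHOTS else None
```

Under these assumptions, a timestamp that falls between two snapshots resolves to the earlier one, and a timestamp older than every snapshot resolves to nothing.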
- append_data(files: List[DataFile]) → bool
Append data files to the table (convenience method)
- append_pandas(df: Any, schema: Schema | None = None) → bool
Append pandas DataFrame to table (convenience method)
- append_records(records: List[Dict[str, Any]], schema: Schema | None = None, partition_values: Dict[str, Any] | None = None) → bool
Append actual data records to the table by creating new data files (convenience method)
- garbage_collect(grace_period_ms: int = 3600000) → Dict[str, int]
Delete orphaned files not referenced by any snapshot.
- Parameters:
grace_period_ms – Only delete orphaned files older than this age (default 1 hour). This protects against deleting files currently being written.
- Returns:
Dict with counts of deleted files by type.
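The grace-period rule can be sketched as a simple selection step: a file qualifies for deletion only if no snapshot references it and it is older than the grace period. The helper `select_deletable` and its inputs (a mapping of path to last-modified time in milliseconds, and a set of referenced paths) are hypothetical and only illustrate the rule, not how datashard discovers or deletes files.

```python
from typing import Dict, List, Set


def select_deletable(
    all_files: Dict[str, int],
    referenced: Set[str],
    now_ms: int,
    grace_period_ms: int = 3_600_000,
) -> List[str]:
    """Return orphaned paths whose age exceeds the grace period.

    all_files maps path -> last-modified time in ms (hypothetical layout).
    """
    deletable = []
    for path, modified_ms in all_files.items():
        if path in referenced:
            continue  # still used by some snapshot
        if now_ms - modified_ms > grace_period_ms:
            deletable.append(path)
    return sorted(deletable)


now = 10_000_000
files = {
    "data/a.parquet": 1_000_000,           # referenced, always kept
    "data/orphan_old.parquet": 2_000_000,  # orphaned, older than 1 hour
    "data/orphan_new.parquet": 9_000_000,  # orphaned, may still be mid-write
}
to_delete = select_deletable(files, referenced={"data/a.parquet"}, now_ms=now)
```

The recent orphan survives because a writer may have created it but not yet committed the snapshot that references it.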
- row_count() → int
Get total row count from parquet metadata without scanning data.
This is a fast O(manifest_files) operation that reads only metadata, not the actual parquet data files. Use this for count-only queries instead of len(table.scan()).
- Returns:
Total number of rows across all data files in current snapshot.
- scan(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) → List[Dict[str, Any]]
Scan all data in the table and return as list of records.
This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.
- Parameters:
columns – Optional list of column names to read. If None, reads all columns.
filter – Optional filter dict for predicate pushdown. Examples:
{"status": "failed"}  # status == "failed"
{"age": (">", 18)}  # age > 18
{"id": ("in", [1, 2, 3])}  # id in [1, 2, 3]
{"ts": ("between", (t1, t2))}  # t1 <= ts <= t2
parallel – Enable parallel reading.
- False: Sequential reading (default)
- True: Use all CPU cores
- int: Use specified number of threads
- Returns:
List of dictionaries, each representing a record.
- to_pandas(columns: List[str] | None = None, filter: Dict[str, Any] | None = None, parallel: bool | int = False) → Any
Read all data from the table as a pandas DataFrame.
This method reads all parquet files in the data directory, providing a complete view of all data across all snapshots.
- Parameters:
columns – Optional list of column names to read. If None, reads all columns.
filter – Optional filter dict for predicate pushdown. Examples:
{"status": "failed"}  # status == "failed"
{"age": (">", 18)}  # age > 18
{"id": ("in", [1, 2, 3])}  # id in [1, 2, 3]
{"ts": ("between", (t1, t2))}  # t1 <= ts <= t2
parallel – Enable parallel reading.
- False: Sequential reading (default)
- True: Use all CPU cores
- int: Use specified number of threads
- Returns:
pandas DataFrame with all records.
- Raises:
ImportError – If pandas is not installed.
- scan_batches(batch_size: int = 10000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[List[Dict[str, Any]]]
Scan data in batches for memory-efficient processing.
Yields batches of records, processing one parquet file at a time using PyArrow’s iter_batches for memory efficiency.
- Parameters:
batch_size – Approximate number of records per batch
columns – Optional column projection
filter – Optional predicate pushdown filter
- Yields:
List of records (dicts) per batch
- iter_records(columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[Dict[str, Any]]
Iterate over records one at a time.
Memory efficient - only one record in memory at a time. Ideal for row-by-row processing of large tables.
- Parameters:
columns – Optional column projection
filter – Optional predicate pushdown filter
- Yields:
Individual records as dicts
- iter_pandas(chunksize: int = 50000, columns: List[str] | None = None, filter: Dict[str, Any] | None = None) → Iterator[Any]
Iterate over data as pandas DataFrame chunks.
Memory efficient - only one chunk in memory at a time. Ideal for processing large tables with pandas operations.
- Parameters:
chunksize – Approximate rows per chunk
columns – Optional column projection
filter – Optional predicate pushdown filter
- Yields:
pandas DataFrame chunks
- Raises:
ImportError – If pandas is not installed.
Table.scan() Method
Read data from the table with optional filtering and parallel processing.
Parameters:
columns (List[str], optional): Column names to read. If None, reads all columns.
filter (Dict[str, Any], optional): Filter dict for predicate pushdown.
parallel (bool or int, optional): Enable parallel reading. True uses all cores; an int specifies the thread count.
Filter Syntax:
# Equality
{"column": value}
{"column": ("==", value)}
# Comparison
{"column": (">", value)}
{"column": (">=", value)}
{"column": ("<", value)}
{"column": ("<=", value)}
# Membership
{"column": ("in", [v1, v2, v3])}
{"column": ("not_in", [v1, v2])}
# Range
{"column": ("between", (low, high))}
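To make the meaning of each form concrete, here is a small sketch that evaluates such a filter dict against a single in-memory record. It is not how datashard applies filters (predicate pushdown happens at the file-reading level); `matches` is a hypothetical helper. Two behaviors are assumptions not stated in this section: conditions on several columns are combined with AND, and a record missing a filtered column does not match. The inclusive bounds for `between` follow the `t1 <= ts <= t2` comment in the parameter documentation.

```python
import operator
from typing import Any, Callable, Dict

# Operator names taken from the filter syntax above.
_OPS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "in": lambda value, options: value in options,
    "not_in": lambda value, options: value not in options,
    "between": lambda value, bounds: bounds[0] <= value <= bounds[1],
}


def matches(record: Dict[str, Any], filter: Dict[str, Any]) -> bool:
    """Return True if the record satisfies every condition in the filter."""
    for column, condition in filter.items():
        is_op_tuple = (
            isinstance(condition, tuple)
            and len(condition) == 2
            and isinstance(condition[0], str)
            and condition[0] in _OPS
        )
        # A bare value is shorthand for equality.
        op, operand = condition if is_op_tuple else ("==", condition)
        if column not in record or not _OPS[op](record[column], operand):
            return False
    return True


rows = [
    {"id": 1, "status": "failed", "age": 17},
    {"id": 2, "status": "active", "age": 30},
    {"id": 3, "status": "failed", "age": 45},
]
adults_failed = [r["id"] for r in rows if matches(r, {"status": "failed", "age": (">=", 18)})]
```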
Example:
# Basic scan
records = table.scan()
# With filter
records = table.scan(filter={"status": "active"})
# With columns and parallel
records = table.scan(columns=["id", "name"], parallel=True)
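Because `parallel` accepts either a bool or an int, any code that interprets it has to check for bool first: in Python, `bool` is a subclass of `int`, so `True` would otherwise be read as a thread count of 1. The sketch below shows one way to resolve the value and fan out reads over threads. `resolve_workers` and `read_all` are hypothetical names, and the use of `ThreadPoolExecutor` is an assumption about a reasonable implementation, not a description of datashard internals.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence, Union


def resolve_workers(parallel: Union[bool, int]) -> int:
    """Map the documented parallel values to a worker count."""
    # bool is a subclass of int, so test for bool first.
    if isinstance(parallel, bool):
        return (os.cpu_count() or 1) if parallel else 1
    if parallel < 1:
        raise ValueError("thread count must be >= 1")
    return parallel


def read_all(
    paths: Sequence[str],
    read_one: Callable[[str], List[Any]],
    parallel: Union[bool, int] = False,
) -> List[Any]:
    """Read every path with read_one and concatenate the rows in path order."""
    workers = resolve_workers(parallel)
    if workers == 1:
        per_file = [read_one(p) for p in paths]
    else:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            per_file = list(pool.map(read_one, paths))  # map preserves input order
    return [row for rows in per_file for row in rows]


# Stand-in for real file reads: a dict lookup keyed by path.
fake_files = {"a.parquet": [1, 2], "b.parquet": [3]}
rows = read_all(["a.parquet", "b.parquet"], fake_files.__getitem__, parallel=2)
```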
Table.to_pandas() Method
Read data from the table as a pandas DataFrame.
Parameters:
Same as scan(). Returns a pandas DataFrame instead of a list of dicts.
Example:
df = table.to_pandas(filter={"age": (">", 30)}, parallel=True)
Table.scan_batches() Method
Iterate over data in batches for memory-efficient processing.
Parameters:
batch_size (int): Approximate number of records per batch. Default: 10000.
columns (List[str], optional): Column names to read.
filter (Dict[str, Any], optional): Filter dict for predicate pushdown.
Example:
for batch in table.scan_batches(batch_size=5000):
    process(batch)  # batch is List[Dict]
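The batch size is described as approximate. One plausible reason, assumed here rather than confirmed by this section, is that batches are cut within each file and never span two files, so the last batch of every file can be smaller. The sketch below reproduces that behavior on in-memory lists. `batches_per_file` is a hypothetical helper, and each inner list stands in for the records of one parquet file.

```python
from typing import Any, Dict, Iterable, Iterator, List

Record = Dict[str, Any]


def batches_per_file(
    files: Iterable[List[Record]],
    batch_size: int = 10_000,
) -> Iterator[List[Record]]:
    """Yield batches of at most batch_size records, one file at a time."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for records in files:
        # Batches never cross a file boundary, so a file's tail may be short.
        for start in range(0, len(records), batch_size):
            yield records[start:start + batch_size]


files = [
    [{"n": i} for i in range(5)],     # first "file": 5 records
    [{"n": i} for i in range(5, 8)],  # second "file": 3 records
]
sizes = [len(batch) for batch in batches_per_file(files, batch_size=2)]
```

Since the function is a generator, only the batch currently being processed needs to be held by the consumer.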
Table.iter_records() Method
Iterate over records one at a time.
Parameters:
columns (List[str], optional): Column names to read.
filter (Dict[str, Any], optional): Filter dict for predicate pushdown.
Example:
for record in table.iter_records(filter={"status": "failed"}):
    handle_failure(record)
Table.iter_pandas() Method
Iterate over data as pandas DataFrame chunks.
Parameters:
chunksize (int): Approximate rows per chunk. Default: 50000.
columns (List[str], optional): Column names to read.
filter (Dict[str, Any], optional): Filter dict for predicate pushdown.
Example:
results = []  # collects one partial aggregate per chunk
for chunk_df in table.iter_pandas(chunksize=10000):
    results.append(chunk_df.groupby('x').sum())