Performance Optimization

This section covers best practices and techniques for optimizing datashard performance.

File Format Selection

Choosing the Right Format

datashard supports multiple file formats, each with different performance characteristics:

Parquet

  • Best for analytical workloads with complex queries

  • Excellent compression ratios

  • Columnar storage enables efficient scans of specific columns

  • Supports predicate pushdown for faster filtering

from datashard import FileFormat, DataFile

# Parquet is typically the best choice for most use cases
parquet_file = DataFile(
    file_path="data.parquet",
    file_format=FileFormat.PARQUET,
    partition_values={},
    record_count=1000,
    file_size_in_bytes=50000
)

Avro

  • Good for schema evolution scenarios

  • Efficient for append-heavy workloads

  • Good compression with complex nested types

  • Schema stored with data, self-describing

avro_file = DataFile(
    file_path="data.avro",
    file_format=FileFormat.AVRO,
    partition_values={},
    record_count=1000,
    file_size_in_bytes=45000
)

ORC

  • Optimized for read-heavy workloads

  • Good compression and predicate pushdown

  • Built-in ACID transaction support in Hadoop ecosystems

Partitioning Strategies

Vertical Partitioning

Organize data by columns frequently accessed together:

from datashard import create_table, Schema

# Define schema for hot data
hot_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "name", "type": "string", "required": True},
        {"id": 3, "name": "status", "type": "string", "required": True}
    ]
)

table = create_table("/path/to/partitioned_table", hot_schema)

# Good: Separate frequently accessed columns from rarely accessed ones
with table.new_transaction() as tx:
    # Hot data partition
    tx.append_data(
        records=[{"record_id": 1, "name": "Alice", "status": "active"}],
        schema=hot_schema,
        partition_values={"type": "hot"}
    )
    tx.commit()

Horizontal Partitioning

Organize data by frequently queried dimensions:

from datashard import create_table, Schema
import datetime

# Define schema for time-series data
event_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "event", "type": "string", "required": True},
        {"id": 2, "name": "user_id", "type": "long", "required": True}
    ]
)

table = create_table("/path/to/events", event_schema)

# Partition by time for time-series data
today = datetime.date.today()
with table.new_transaction() as tx:
    tx.append_data(
        records=[{"event": "login", "user_id": 123}],
        schema=event_schema,
        partition_values={
            "year": today.year,
            "month": today.month,
            "day": today.day
        }
    )
    tx.commit()

Transaction Optimization

Batch Operations

Minimize transaction overhead by batching operations:

from datashard import create_table, Schema

# Define schema
batch_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "string", "required": True}
    ]
)

table = create_table("/path/to/batch_table", batch_schema)

# Efficient: Single transaction for multiple records
batch_records = []
for i in range(1000):
    batch_records.append({"record_id": i, "value": f"value_{i}"})

with table.new_transaction() as tx:
    tx.append_data(records=batch_records, schema=batch_schema)
    tx.commit()

Transaction Size Guidelines

  • Keep individual transactions under 100MB when possible

  • For large operations, split into multiple smaller transactions

  • Monitor for timeout errors which may indicate transactions are too large

from datashard import create_table, Schema

# Define schema
large_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True}
    ]
)

table = create_table("/path/to/large_table", large_schema)

def safe_large_write(table, large_dataset, schema, batch_size=500):
    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        with table.new_transaction() as tx:
            tx.append_data(records=batch, schema=schema)
            success = tx.commit()
            if not success:
                # Handle failure and possibly retry
                break

# Example usage
large_data = [{"record_id": i, "data": f"data_{i}"} for i in range(10000)]
safe_large_write(table, large_data, large_schema, batch_size=500)

Memory Management

Data File Management

Control memory usage when working with large datasets:

from datashard import create_table, Schema
import gc

# Define schema
memory_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True}
    ]
)

table = create_table("/path/to/memory_table", memory_schema)

def memory_efficient_write(table, large_records, schema):
    # Process in chunks to control memory usage
    chunk_size = 100

    for i in range(0, len(large_records), chunk_size):
        chunk = large_records[i:i + chunk_size]

        with table.new_transaction() as tx:
            tx.append_data(records=chunk, schema=schema)
            tx.commit()

        # Explicitly clean up if needed
        if i % (chunk_size * 10) == 0:  # Every 10 chunks
            gc.collect()

# Example usage
large_dataset = [{"record_id": i, "data": f"data_{i}"} for i in range(5000)]
memory_efficient_write(table, large_dataset, memory_schema)

Metadata Cache Optimization

The metadata manager caches information to improve performance:

# Efficiently refresh metadata only when needed
current_metadata = table.metadata_manager.refresh()

# Reuse metadata for multiple operations in a short period
snapshot = table.current_snapshot()  # Uses cached metadata
history = table.metadata_manager.get_snapshot_history()  # Also uses cache

Concurrency Optimization

Reducing Contention

Minimize conflicts in high-concurrency scenarios:

from datashard import create_table, Schema
import time
import random

# Define schema
concurrency_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True}
    ]
)

table = create_table("/path/to/concurrent_table", concurrency_schema)

def low_contention_write(table, record, schema, base_delay=0.01):
    # Add jitter to reduce thundering herd problem
    jitter = random.uniform(0, 0.01)
    time.sleep(base_delay + jitter)

    with table.new_transaction() as tx:
        tx.append_data(records=[record], schema=schema)
        return tx.commit()

# Example usage
sample_record = {"record_id": 1, "data": "example"}
low_contention_write(table, sample_record, concurrency_schema)

Parallel Processing

Structure operations to allow parallel execution:

from datashard import create_table, Schema
import concurrent.futures

# Define schema
parallel_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "double", "required": True}
    ]
)

table = create_table("/path/to/parallel_table", parallel_schema)

def parallel_writes(table, datasets, schema, max_workers=4):
    def write_dataset(data_subset):
        with table.new_transaction() as tx:
            tx.append_data(records=data_subset, schema=schema)
            return tx.commit()

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(write_dataset, subset) for subset in datasets]
        results = [future.result() for future in futures]

    return all(results)

# Example usage
datasets = [
    [{"record_id": i, "value": float(i)} for i in range(0, 250)],
    [{"record_id": i, "value": float(i)} for i in range(250, 500)],
    [{"record_id": i, "value": float(i)} for i in range(500, 750)],
    [{"record_id": i, "value": float(i)} for i in range(750, 1000)]
]
parallel_writes(table, datasets, parallel_schema, max_workers=4)

Monitoring and Profiling

Performance Metrics

Track key performance indicators:

from datashard import create_table, Schema
import time
from collections import defaultdict

# Define schema
metrics_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "metric_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "double", "required": True}
    ]
)

table = create_table("/path/to/metrics_table", metrics_schema)

class PerformanceTracker:
    def __init__(self):
        self.metrics = defaultdict(list)

    def time_operation(self, name, operation, *args, **kwargs):
        start = time.time()
        result = operation(*args, **kwargs)
        duration = time.time() - start
        self.metrics[name].append(duration)
        return result

    def get_avg_time(self, name):
        times = self.metrics[name]
        return sum(times) / len(times) if times else 0

# Usage
tracker = PerformanceTracker()
sample_records = [{"metric_id": 1, "value": 42.0}]
success = tracker.time_operation("append_records", table.append_records,
                                 records=sample_records, schema=metrics_schema)
avg_time = tracker.get_avg_time("append_records")
print(f"Average append_records time: {avg_time:.4f}s")

Common Performance Issues

  1. Slow Transaction Commits: Often caused by high contention; implement retry logic with backoff

  2. High Memory Usage: Process large datasets in smaller chunks

  3. Slow Metadata Refresh: Cache metadata when performing multiple operations

  4. File I/O Bottlenecks: Ensure sufficient disk I/O capacity for your workload

Query Optimization (NEW in v0.3.3)

Predicate Pushdown

Filter at the parquet level to reduce I/O by 90%+:

# Without predicate pushdown (reads all data, filters in Python)
all_records = table.scan()
filtered = [r for r in all_records if r['status'] == 'failed']  # Slow!

# With predicate pushdown (filters at parquet level)
filtered = table.scan(filter={"status": "failed"})  # 90%+ faster!

# Multiple filters (AND)
results = table.scan(filter={
    "status": "active",
    "age": (">=", 30)
})

# Comparison operators
table.scan(filter={"value": (">", 100)})
table.scan(filter={"value": ("<=", 50)})

# Membership filters
table.scan(filter={"category": ("in", ["A", "B", "C"])})

# Range filters
table.scan(filter={"timestamp": ("between", (start_ts, end_ts))})

Partition Pruning

DataShard automatically skips files that cannot contain matching records:

# Column bounds (min/max) are computed during write
# and stored in manifest files

# When you filter, files are pruned based on bounds:
# - File with timestamp range [1000, 2000] is skipped
#   when filtering for timestamp > 3000

# This happens automatically - no configuration needed!
results = table.scan(filter={"timestamp": (">", 3000)})

Parallel Reading

Use multi-threaded I/O for 2-4x speedup:

# Sequential (default)
records = table.scan()

# Parallel with all CPU cores
records = table.scan(parallel=True)

# Parallel with specific thread count
records = table.scan(parallel=4)

# Combine with filter for maximum performance
records = table.scan(
    filter={"status": "active"},
    parallel=True
)

# Works with to_pandas too
df = table.to_pandas(parallel=True)

Column Projection

Only read the columns you need:

# Read all columns (slower)
records = table.scan()

# Read only needed columns (faster)
records = table.scan(columns=["id", "name"])

# Combine with filter and parallel
records = table.scan(
    columns=["id", "status"],
    filter={"status": "active"},
    parallel=True
)

Streaming for Large Tables

Process large tables with constant memory:

# Instead of loading entire table into memory:
all_records = table.scan()  # May OOM for large tables!

# Stream in batches:
for batch in table.scan_batches(batch_size=10000):
    process(batch)  # Only batch_size records in memory

# Stream record by record:
for record in table.iter_records():
    handle(record)  # Only 1 record in memory

# Stream as pandas chunks:
for chunk_df in table.iter_pandas(chunksize=50000):
    results.append(chunk_df.groupby('x').sum())

Performance Comparison

Query Optimization Impact

Feature

Without Optimization

With Optimization

Predicate Pushdown

Read 100% of data

Read only matching rows (90%+ reduction)

Partition Pruning

Scan all files

Skip non-matching files (99% reduction for time-range)

Parallel Reading

1 file at a time

N files concurrent (2-4x speedup)

Column Projection

Read all columns

Read only needed columns (proportional reduction)

Streaming

Entire table in memory

Constant memory (~100MB for any size)

Hardware Considerations

For optimal performance, consider:

  • SSD Storage: Faster I/O for metadata and manifest files

  • Sufficient RAM: For caching frequently accessed metadata

  • Multiple CPU Cores: For handling concurrent transactions and parallel reads

  • Network Bandwidth: If using network-attached storage or S3