Performance Optimization
This section covers best practices and techniques for optimizing datashard performance.
File Format Selection
Choosing the Right Format
datashard supports multiple file formats, each with different performance characteristics:
Parquet
Best for analytical workloads with complex queries
Excellent compression ratios
Columnar storage enables efficient scans of specific columns
Supports predicate pushdown for faster filtering
from datashard import FileFormat, DataFile
# Parquet is typically the best choice for most use cases
parquet_file = DataFile(
file_path="data.parquet",
file_format=FileFormat.PARQUET,
partition_values={},
record_count=1000,
file_size_in_bytes=50000
)
Avro
Good for schema evolution scenarios
Efficient for append-heavy workloads
Good compression with complex nested types
Schema stored with data, self-describing
avro_file = DataFile(
file_path="data.avro",
file_format=FileFormat.AVRO,
partition_values={},
record_count=1000,
file_size_in_bytes=45000
)
ORC
Optimized for read-heavy workloads
Good compression and predicate pushdown
Built-in ACID transaction support in Hadoop ecosystems
Partitioning Strategies
Vertical Partitioning
Organize data by columns frequently accessed together:
from datashard import create_table, Schema
# Define schema for hot data
hot_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "record_id", "type": "long", "required": True},
{"id": 2, "name": "name", "type": "string", "required": True},
{"id": 3, "name": "status", "type": "string", "required": True}
]
)
table = create_table("/path/to/partitioned_table", hot_schema)
# Good: Separate frequently accessed columns from rarely accessed ones
with table.new_transaction() as tx:
# Hot data partition
tx.append_data(
records=[{"record_id": 1, "name": "Alice", "status": "active"}],
schema=hot_schema,
partition_values={"type": "hot"}
)
tx.commit()
Horizontal Partitioning
Organize data by frequently queried dimensions:
from datashard import create_table, Schema
import datetime
# Define schema for time-series data
event_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "event", "type": "string", "required": True},
{"id": 2, "name": "user_id", "type": "long", "required": True}
]
)
table = create_table("/path/to/events", event_schema)
# Partition by time for time-series data
today = datetime.date.today()
with table.new_transaction() as tx:
tx.append_data(
records=[{"event": "login", "user_id": 123}],
schema=event_schema,
partition_values={
"year": today.year,
"month": today.month,
"day": today.day
}
)
tx.commit()
Transaction Optimization
Batch Operations
Minimize transaction overhead by batching operations:
from datashard import create_table, Schema
# Define schema
batch_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "record_id", "type": "long", "required": True},
{"id": 2, "name": "value", "type": "string", "required": True}
]
)
table = create_table("/path/to/batch_table", batch_schema)
# Efficient: Single transaction for multiple records
batch_records = []
for i in range(1000):
batch_records.append({"record_id": i, "value": f"value_{i}"})
with table.new_transaction() as tx:
tx.append_data(records=batch_records, schema=batch_schema)
tx.commit()
Transaction Size Guidelines
Keep individual transactions under 100MB when possible
For large operations, split into multiple smaller transactions
Monitor for timeout errors which may indicate transactions are too large
from datashard import create_table, Schema
# Define schema
large_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "record_id", "type": "long", "required": True},
{"id": 2, "name": "data", "type": "string", "required": True}
]
)
table = create_table("/path/to/large_table", large_schema)
def safe_large_write(table, large_dataset, schema, batch_size=500):
for i in range(0, len(large_dataset), batch_size):
batch = large_dataset[i:i + batch_size]
with table.new_transaction() as tx:
tx.append_data(records=batch, schema=schema)
success = tx.commit()
if not success:
# Handle failure and possibly retry
break
# Example usage
large_data = [{"record_id": i, "data": f"data_{i}"} for i in range(10000)]
safe_large_write(table, large_data, large_schema, batch_size=500)
Memory Management
Data File Management
Control memory usage when working with large datasets:
from datashard import create_table, Schema
import gc
# Define schema
memory_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "record_id", "type": "long", "required": True},
{"id": 2, "name": "data", "type": "string", "required": True}
]
)
table = create_table("/path/to/memory_table", memory_schema)
def memory_efficient_write(table, large_records, schema):
# Process in chunks to control memory usage
chunk_size = 100
for i in range(0, len(large_records), chunk_size):
chunk = large_records[i:i + chunk_size]
with table.new_transaction() as tx:
tx.append_data(records=chunk, schema=schema)
tx.commit()
# Explicitly clean up if needed
if i % (chunk_size * 10) == 0: # Every 10 chunks
gc.collect()
# Example usage
large_dataset = [{"record_id": i, "data": f"data_{i}"} for i in range(5000)]
memory_efficient_write(table, large_dataset, memory_schema)
Metadata Cache Optimization
The metadata manager caches information to improve performance:
# Efficiently refresh metadata only when needed
current_metadata = table.metadata_manager.refresh()
# Reuse metadata for multiple operations in a short period
snapshot = table.current_snapshot() # Uses cached metadata
history = table.metadata_manager.get_snapshot_history() # Also uses cache
Concurrency Optimization
Reducing Contention
Minimize conflicts in high-concurrency scenarios:
from datashard import create_table, Schema
import time
import random
# Define schema
concurrency_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "record_id", "type": "long", "required": True},
{"id": 2, "name": "data", "type": "string", "required": True}
]
)
table = create_table("/path/to/concurrent_table", concurrency_schema)
def low_contention_write(table, record, schema, base_delay=0.01):
# Add jitter to reduce thundering herd problem
jitter = random.uniform(0, 0.01)
time.sleep(base_delay + jitter)
with table.new_transaction() as tx:
tx.append_data(records=[record], schema=schema)
return tx.commit()
# Example usage
sample_record = {"record_id": 1, "data": "example"}
low_contention_write(table, sample_record, concurrency_schema)
Parallel Processing
Structure operations to allow parallel execution:
from datashard import create_table, Schema
import concurrent.futures
# Define schema
parallel_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "record_id", "type": "long", "required": True},
{"id": 2, "name": "value", "type": "double", "required": True}
]
)
table = create_table("/path/to/parallel_table", parallel_schema)
def parallel_writes(table, datasets, schema, max_workers=4):
def write_dataset(data_subset):
with table.new_transaction() as tx:
tx.append_data(records=data_subset, schema=schema)
return tx.commit()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(write_dataset, subset) for subset in datasets]
results = [future.result() for future in futures]
return all(results)
# Example usage
datasets = [
[{"record_id": i, "value": float(i)} for i in range(0, 250)],
[{"record_id": i, "value": float(i)} for i in range(250, 500)],
[{"record_id": i, "value": float(i)} for i in range(500, 750)],
[{"record_id": i, "value": float(i)} for i in range(750, 1000)]
]
parallel_writes(table, datasets, parallel_schema, max_workers=4)
Monitoring and Profiling
Performance Metrics
Track key performance indicators:
from datashard import create_table, Schema
import time
from collections import defaultdict
# Define schema
metrics_schema = Schema(
schema_id=1,
fields=[
{"id": 1, "name": "metric_id", "type": "long", "required": True},
{"id": 2, "name": "value", "type": "double", "required": True}
]
)
table = create_table("/path/to/metrics_table", metrics_schema)
class PerformanceTracker:
def __init__(self):
self.metrics = defaultdict(list)
def time_operation(self, name, operation, *args, **kwargs):
start = time.time()
result = operation(*args, **kwargs)
duration = time.time() - start
self.metrics[name].append(duration)
return result
def get_avg_time(self, name):
times = self.metrics[name]
return sum(times) / len(times) if times else 0
# Usage
tracker = PerformanceTracker()
sample_records = [{"metric_id": 1, "value": 42.0}]
success = tracker.time_operation("append_records", table.append_records,
records=sample_records, schema=metrics_schema)
avg_time = tracker.get_avg_time("append_records")
print(f"Average append_records time: {avg_time:.4f}s")
Common Performance Issues
Slow Transaction Commits: Often caused by high contention; implement retry logic with backoff
High Memory Usage: Process large datasets in smaller chunks
Slow Metadata Refresh: Cache metadata when performing multiple operations
File I/O Bottlenecks: Ensure sufficient disk I/O capacity for your workload
Query Optimization (NEW in v0.3.3)
Predicate Pushdown
Filter at the parquet level to reduce I/O by 90%+:
# Without predicate pushdown (reads all data, filters in Python)
all_records = table.scan()
filtered = [r for r in all_records if r['status'] == 'failed'] # Slow!
# With predicate pushdown (filters at parquet level)
filtered = table.scan(filter={"status": "failed"}) # 90%+ faster!
# Multiple filters (AND)
results = table.scan(filter={
"status": "active",
"age": (">=", 30)
})
# Comparison operators
table.scan(filter={"value": (">", 100)})
table.scan(filter={"value": ("<=", 50)})
# Membership filters
table.scan(filter={"category": ("in", ["A", "B", "C"])})
# Range filters
table.scan(filter={"timestamp": ("between", (start_ts, end_ts))})
Partition Pruning
DataShard automatically skips files that cannot contain matching records:
# Column bounds (min/max) are computed during write
# and stored in manifest files
# When you filter, files are pruned based on bounds:
# - File with timestamp range [1000, 2000] is skipped
# when filtering for timestamp > 3000
# This happens automatically - no configuration needed!
results = table.scan(filter={"timestamp": (">", 3000)})
Parallel Reading
Use multi-threaded I/O for 2-4x speedup:
# Sequential (default)
records = table.scan()
# Parallel with all CPU cores
records = table.scan(parallel=True)
# Parallel with specific thread count
records = table.scan(parallel=4)
# Combine with filter for maximum performance
records = table.scan(
filter={"status": "active"},
parallel=True
)
# Works with to_pandas too
df = table.to_pandas(parallel=True)
Column Projection
Only read the columns you need:
# Read all columns (slower)
records = table.scan()
# Read only needed columns (faster)
records = table.scan(columns=["id", "name"])
# Combine with filter and parallel
records = table.scan(
columns=["id", "status"],
filter={"status": "active"},
parallel=True
)
Streaming for Large Tables
Process large tables with constant memory:
# Instead of loading entire table into memory:
all_records = table.scan() # May OOM for large tables!
# Stream in batches:
for batch in table.scan_batches(batch_size=10000):
process(batch) # Only batch_size records in memory
# Stream record by record:
for record in table.iter_records():
handle(record) # Only 1 record in memory
# Stream as pandas chunks:
for chunk_df in table.iter_pandas(chunksize=50000):
results.append(chunk_df.groupby('x').sum())
Performance Comparison
Feature |
Without Optimization |
With Optimization |
|---|---|---|
Predicate Pushdown |
Read 100% of data |
Read only matching rows (90%+ reduction) |
Partition Pruning |
Scan all files |
Skip non-matching files (99% reduction for time-range) |
Parallel Reading |
1 file at a time |
N files concurrent (2-4x speedup) |
Column Projection |
Read all columns |
Read only needed columns (proportional reduction) |
Streaming |
Entire table in memory |
Constant memory (~100MB for any size) |
Hardware Considerations
For optimal performance, consider:
SSD Storage: Faster I/O for metadata and manifest files
Sufficient RAM: For caching frequently accessed metadata
Multiple CPU Cores: For handling concurrent transactions and parallel reads
Network Bandwidth: If using network-attached storage or S3