Time Travel and Historical Data Access
One of datashard’s most powerful features is time travel, which lets you query a table as it existed at any earlier point in its snapshot history.
Understanding Time Travel
Time travel in datashard allows you to access historical states of your data by navigating between snapshots. Each snapshot represents the table’s state at a specific point in time.
Snapshot Basics
What is a Snapshot?
A snapshot in datashard captures the complete state of a table at a specific moment. Each snapshot records:
- Snapshot ID: A unique identifier for the snapshot
- Timestamp: When the snapshot was created
- Manifest List: References to all data files at that point in time
- Operation: Which operation created the snapshot (append, replace, delete)
- Parent ID: A link to the previous snapshot, used for lineage tracking
Creating Snapshots
Snapshots are created automatically when you commit a transaction; in the example below, each append creates a new snapshot:
from datashard import create_table, Schema

# Column definitions for the event history table
history_fields = [
    {"id": 1, "name": "event_id", "type": "long", "required": True},
    {"id": 2, "name": "event_name", "type": "string", "required": True},
    {"id": 3, "name": "timestamp", "type": "long", "required": True},
]
history_schema = Schema(schema_id=1, fields=history_fields)

# Create the table that will accumulate history
table = create_table("/path/to/historical_table", history_schema)

# Commit two single-record batches, in order; each append creates a new snapshot
login_event = {"event_id": 1, "event_name": "user_login", "timestamp": 1700000000000}
logout_event = {"event_id": 2, "event_name": "user_logout", "timestamp": 1700000100000}
for batch in ([login_event], [logout_event]):
    table.append_records(records=batch, schema=history_schema)
Accessing Snapshots
Listing Snapshots
# Fetch every snapshot recorded for the table
snapshots = table.snapshots()
snapshot_count = len(snapshots)
print(f"Table has {snapshot_count} snapshots")

# Entries are read by key here: ID, timestamp, operation and parent ID
for entry in snapshots:
    print(f"Snapshot {entry['snapshot_id']} at {entry['timestamp']}")
    print(f" Operation: {entry['operation']}")
    print(f" Parent ID: {entry['parent_id']}")
Getting Current Snapshot
# Look up the table's most recent snapshot; the guard skips the output
# when the call returns nothing
latest = table.current_snapshot()
if latest:
    print(f"Current snapshot ID: {latest.snapshot_id}")
    print(f"Created at: {latest.timestamp_ms}")
Accessing Specific Snapshots
# Look a snapshot up by its ID (example ID shown)
wanted_id = 1234567890
match = table.snapshot_by_id(wanted_id)
if match:
    print(f"Found snapshot: {match.snapshot_id}")
Time Travel Operations
Travel to Specific Snapshot
# Time travel to one particular snapshot (example ID shown)
target_snapshot_id = 1234567890
historical_data = table.time_travel(snapshot_id=target_snapshot_id)

# A truthy result represents the table state at that snapshot
if historical_data:
    print(f"Traveled to snapshot: {historical_data.snapshot_id}")
Travel to Specific Time
from datashard import create_table, Schema
import time

# Define schema
event_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "event_id", "type": "long", "required": True},
        {"id": 2, "name": "event_type", "type": "string", "required": True},
        {"id": 3, "name": "value", "type": "long", "required": True}
    ]
)
table = create_table("/path/to/table", event_schema)

# Commit some baseline data first, so that a snapshot already exists at the
# moment we later travel back to. Without it, the recorded timestamp would
# precede every snapshot in the table.
baseline = [
    {"event_id": 1, "event_type": "create", "value": 10},
    {"event_id": 2, "event_type": "create", "value": 20},
]
table.append_records(records=baseline, schema=event_schema)

# Record a timestamp (milliseconds since the epoch) before making changes
before_time_ms = int(time.time() * 1000)
time.sleep(0.1)  # keep the next commit clearly later than the recorded time

# Make some changes
changes = [{"event_id": 3, "event_type": "update", "value": 100}]
table.append_records(records=changes, schema=event_schema)

# Later, travel back to the state before the changes
historical_data = table.time_travel(timestamp=before_time_ms)
if historical_data:
    print(f"Traveled to state at timestamp: {before_time_ms}")
else:
    # Make the "nothing to travel to" case visible instead of silent
    print(f"No snapshot available for timestamp: {before_time_ms}")
Practical Time Travel Example
from datashard import create_table, Schema
from datetime import datetime, timedelta
import time

# Two-column schema for the demo events
demo_fields = [
    {"id": 1, "name": "event", "type": "string", "required": True},
    {"id": 2, "name": "value", "type": "long", "required": True},
]
demo_schema = Schema(schema_id=1, fields=demo_fields)

# Create a table; data is added to it in two stages below
historical_table = create_table("/tmp/time_travel_demo", demo_schema)

# Stage 1: initial data
historical_table.append_records(
    records=[{"event": "start", "value": 100}],
    schema=demo_schema,
)

# Note the wall-clock time (ms) right after the initial commit, then wait a
# moment before the next write
initial_time = int(time.time() * 1000)
time.sleep(1)

# Stage 2: more data
historical_table.append_records(
    records=[{"event": "update", "value": 200}],
    schema=demo_schema,
)

# Travel back to the initial state
initial_state = historical_table.time_travel(timestamp=initial_time)
initial_label = initial_state.snapshot_id if initial_state else 'None'
print(f"State at initial_time: {initial_label}")

# Travel to the current state (time_travel() called with no arguments)
current_state = historical_table.time_travel()
current_label = current_state.snapshot_id if current_state else 'None'
print(f"Current state: {current_label}")
Managing Snapshots
Snapshot History
# Walk the complete snapshot history exposed by the table's metadata manager
for record in table.metadata_manager.get_snapshot_history():
    print(f"Snapshot {record.snapshot_id} at {record.timestamp_ms}")
Snapshot Retention
# Old snapshots can be expired to save space.
# One week, expressed in milliseconds.
MS_PER_DAY = 24 * 60 * 60 * 1000
one_week_ms = 7 * MS_PER_DAY

# NOTE(review): confirm whether older_than_ms expects an age (as passed here)
# or an absolute epoch-millisecond cutoff.
with table.new_transaction() as tx:
    tx.expire_snapshots(older_than_ms=one_week_ms)
    tx.commit()
Use Cases for Time Travel
Data Recovery
Time travel is invaluable for data recovery scenarios:
from datashard import create_table, Schema

# Define schema for user data
user_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "user_id", "type": "long", "required": True},
        {"id": 2, "name": "username", "type": "string", "required": True},
        {"id": 3, "name": "status", "type": "string", "required": True}
    ]
)
table = create_table("/path/to/users", user_schema)

# Add good data
good_data = [{"user_id": 1, "username": "alice", "status": "active"}]
table.append_records(records=good_data, schema=user_schema)

# Remember the ID of the known-good snapshot (None if the table has no snapshot)
good_snapshot = table.current_snapshot()
good_snapshot_id = good_snapshot.snapshot_id if good_snapshot else None

# If bad data was accidentally written, recover from a previous snapshot.
# Compare against None explicitly so that a falsy-but-valid ID is not skipped.
if good_snapshot_id is not None:
    recovered_data = table.time_travel(snapshot_id=good_snapshot_id)
    # Check the result before dereferencing it, in case the lookup returns nothing
    if recovered_data:
        print(f"Recovered to known good state: {recovered_data.snapshot_id}")
    else:
        print(f"Snapshot {good_snapshot_id} could not be loaded")
Reproducible Analysis
from datashard import create_table, Schema
import time

# Schema for ML training data: a sample ID, two features and a label
training_fields = [
    {"id": 1, "name": "sample_id", "type": "long", "required": True},
    {"id": 2, "name": "feature_1", "type": "double", "required": True},
    {"id": 3, "name": "feature_2", "type": "double", "required": True},
    {"id": 4, "name": "label", "type": "long", "required": True},
]
training_schema = Schema(schema_id=1, fields=training_fields)
table = create_table("/path/to/ml_data", training_schema)

# Add training data
positive_sample = {"sample_id": 1, "feature_1": 0.5, "feature_2": 0.8, "label": 1}
negative_sample = {"sample_id": 2, "feature_1": 0.3, "feature_2": 0.6, "label": 0}
table.append_records(
    records=[positive_sample, negative_sample],
    schema=training_schema,
)

# For ML/AI workflows, pin the analysis to a fixed moment (ms since the epoch)
analysis_timestamp = int(time.time() * 1000)

# Run the analysis against the table as of that moment; keeping the
# timestamp lets the same analysis be reproduced later
historical_analysis_data = table.time_travel(timestamp=analysis_timestamp)
Auditing and Compliance
from datashard import create_table, Schema

# Schema for audit log rows
audit_fields = [
    {"id": 1, "name": "audit_id", "type": "long", "required": True},
    {"id": 2, "name": "operation", "type": "string", "required": True},
    {"id": 3, "name": "user", "type": "string", "required": True},
    {"id": 4, "name": "timestamp", "type": "long", "required": True},
]
audit_schema = Schema(schema_id=1, fields=audit_fields)
table = create_table("/path/to/audit_log", audit_schema)

# Add audit records in a single append
insert_by_admin = {"audit_id": 1, "operation": "INSERT", "user": "admin", "timestamp": 1700000000000}
update_by_alice = {"audit_id": 2, "operation": "UPDATE", "user": "alice", "timestamp": 1700000100000}
table.append_records(
    records=[insert_by_admin, update_by_alice],
    schema=audit_schema,
)

# Track how data has changed over time for compliance:
# report which operation created each snapshot
for snap in table.snapshots():
    print(f"Audit log - Snapshot {snap['snapshot_id']} created by {snap['operation']} operation")
Advanced Time Travel
Working with Multiple Tables
from datashard import create_table, Schema

# Define schemas for related tables
orders_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "order_id", "type": "long", "required": True},
        {"id": 2, "name": "customer_id", "type": "long", "required": True},
        {"id": 3, "name": "amount", "type": "double", "required": True}
    ]
)
customers_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "customer_id", "type": "long", "required": True},
        {"id": 2, "name": "name", "type": "string", "required": True}
    ]
)

# Synchronize time travel across multiple related tables.
# Each snapshot describes a single table, so a consistent view is pinned by
# recording one snapshot ID per table rather than reusing one table's ID on
# the other.
table1 = create_table("/tmp/orders", orders_schema)
table2 = create_table("/tmp/customers", customers_schema)

# Add data to both tables
table1.append_records(
    records=[{"order_id": 1, "customer_id": 100, "amount": 99.99}],
    schema=orders_schema
)
table2.append_records(
    records=[{"customer_id": 100, "name": "Alice"}],
    schema=customers_schema
)

# Capture each table's own current snapshot at the same logical moment
orders_ref = table1.current_snapshot()
customers_ref = table2.current_snapshot()

# Travel both tables to consistent points in time, each using its own snapshot ID
if orders_ref and customers_ref:
    table1_data = table1.time_travel(snapshot_id=orders_ref.snapshot_id)
    table2_data = table2.time_travel(snapshot_id=customers_ref.snapshot_id)
Performance Considerations
- Time travel queries may be slower than access to current data.
- Keeping many snapshots requires additional storage.
- Consider a snapshot retention policy for large datasets.
- Accessing very old snapshots may require reading more historical metadata files.
Complete Time Travel Workflow
from datashard import create_table, Schema
import time
import shutil

# 1. Define schema
workflow_fields = [
    {"id": 1, "name": "step_id", "type": "long", "required": True},
    {"id": 2, "name": "step_name", "type": "string", "required": True},
    {"id": 3, "name": "status", "type": "string", "required": True},
    {"id": 4, "name": "timestamp", "type": "long", "required": True},
]
workflow_schema = Schema(schema_id=1, fields=workflow_fields)

# 2. Create table
table = create_table("/tmp/workflow_history", workflow_schema)


def record_step(step_id, step_name):
    """Append one completed step; return the table's current snapshot ID afterwards (or None)."""
    row = {
        "step_id": step_id,
        "step_name": step_name,
        "status": "completed",
        "timestamp": int(time.time() * 1000),
    }
    table.append_records(records=[row], schema=workflow_schema)
    snapshot = table.current_snapshot()
    return snapshot.snapshot_id if snapshot else None


# 3. Add initial data
snapshot1_id = record_step(1, "init")
print(f"Created snapshot 1: {snapshot1_id}")
time.sleep(0.1)

# 4. Add more data
snapshot2_id = record_step(2, "process")
print(f"Created snapshot 2: {snapshot2_id}")

# 5. Time travel to snapshot 1
if snapshot1_id:
    historical = table.time_travel(snapshot_id=snapshot1_id)
    traveled_to = historical.snapshot_id if historical else 'None'
    print(f"Time traveled to snapshot: {traveled_to}")

# 6. List all snapshots
all_snapshots = table.snapshots()
print(f"Total snapshots: {len(all_snapshots)}")

# 7. Clean up
shutil.rmtree("/tmp/workflow_history")
print("Time travel workflow completed successfully")