Concurrency and Safe Multi-Process Access

This section explains how datashard enables safe concurrent access to data from multiple processes.

Concurrency Challenges

In traditional file-based data storage, concurrent access can lead to:

  • Data corruption when multiple processes write simultaneously

  • Inconsistent reads when processes read during write operations

  • Lost updates when processes overwrite each other’s changes

  • Race conditions that can cause unpredictable behavior

datashard solves these challenges through ACID transactions and Optimistic Concurrency Control (OCC).

Optimistic Concurrency Control (OCC)

How OCC Works

datashard uses Optimistic Concurrency Control to handle concurrent access:

  1. Read Phase: Each transaction reads the current state of the table

  2. Validation Phase: Before committing, the system checks whether another transaction has committed changes to the table since the read phase

  3. Commit Phase: If there are no conflicts, the transaction commits; otherwise, it fails and the application can retry it

from datashard import create_table, Schema

# Define schema: two required columns, record_id (long) and value (string)
data_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "string", "required": True}
    ]
)

# Create the table on disk with the schema above
table = create_table("/path/to/table", data_schema)

# OCC in action - automatic conflict detection
with table.new_transaction() as tx:
    # Read phase: the transaction works against the table state it started from
    tx.append_data(records=[{"record_id": 1, "value": "A"}], schema=data_schema)
    # Validation + commit phase: commit() checks for conflicting commits
    success = tx.commit()
    # If another process committed to the table since this transaction started,
    # the commit does not succeed (see "Handling Conflicts" below)

Handling Conflicts

from datashard import ConcurrentModificationException, create_table, Schema
import time

# Define schema
conflict_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "string", "required": True}
    ]
)

# Use a dedicated path, like the other examples, so this example does not
# re-create the "/path/to/table" table from the OCC example above.
table = create_table("/path/to/conflict_table", conflict_schema)

max_retries = 5

# Handle both failure signals: commit() raising ConcurrentModificationException
# and commit() returning a falsy result. Both are retried with backoff.
for attempt in range(max_retries):
    try:
        with table.new_transaction() as tx:
            # Perform operations
            tx.append_data(records=[{"record_id": 2, "value": "B"}], schema=conflict_schema)
            result = tx.commit()
    except ConcurrentModificationException:
        print(f"Conflict detected, attempt {attempt + 1}")
        if attempt == max_retries - 1:
            print("Max retries exceeded, transaction failed")
            raise
    else:
        if result:
            print("Transaction succeeded")
            break
        print(f"Commit failed, attempt {attempt + 1}")

    if attempt < max_retries - 1:
        # Wait before retrying (exponential backoff)
        time.sleep(0.01 * (2 ** attempt))
else:
    # Runs only when no attempt hit `break`: every attempt failed, and the
    # last one failed by returning a falsy result (a final exception re-raises)
    print("Max retries exceeded, transaction failed")

Multiple Process Scenarios

Reading While Writing

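datashard safely allows concurrent reads and writes. A reader works from a snapshot, so it sees a consistent view of the table even while writers commit new data: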

from datashard import create_table, Schema
import time

# Define schema: two required columns, record_id (long) and value (string)
concurrent_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "string", "required": True}
    ]
)

# Module-level table handle used by both read_process and write_process
table = create_table("/path/to/concurrent_table", concurrent_schema)

# Process A: a reader that repeatedly pins the latest snapshot
def read_process():
    """Loop forever, capturing the table's current snapshot once per second.

    Each captured snapshot is a consistent view of the table as of the moment
    it was taken; commits made by writers afterwards do not change it.
    """
    poll_seconds = 1
    while True:
        snapshot = table.current_snapshot()  # consistent point-in-time view
        # Concurrent writers never alter what `snapshot` shows
        time.sleep(poll_seconds)

# Process B: a writer that appends one record every two seconds
def write_process():
    """Loop forever, appending a single ``record_id=3`` row every two seconds."""
    pause_seconds = 2
    while True:
        new_rows = [{"record_id": 3, "value": "C"}]
        table.append_records(records=new_rows, schema=concurrent_schema)
        time.sleep(pause_seconds)

Multiple Writers

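Multiple processes can safely write to the same table. Each write is validated against concurrent commits, so an individual write can still fail on conflict; check its result (as below) and retry if needed: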

from datashard import create_table, Schema
import time

# Define schema: record_id (long) plus the name of the writer that produced it
writer_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "source", "type": "string", "required": True}
    ]
)

# Module-level table handle shared by writer_1 and writer_2
table = create_table("/path/to/multiwriter_table", writer_schema)

def _run_writer(table, schema, source, label, iterations=100, delay=0.1):
    """Append one record per iteration, tagging each with ``source``.

    Args:
        table: Table to append to (anything with ``append_records``).
        schema: Schema passed through to ``append_records``.
        source: Value stored in the ``source`` column of every record.
        label: Writer name used in failure messages (e.g. "Writer 1").
        iterations: Number of records to write (record_id 0..iterations-1).
        delay: Seconds to sleep after each append.
    """
    for i in range(iterations):
        record_data = [{"record_id": i, "source": source}]
        success = table.append_records(records=record_data, schema=schema)
        if not success:
            # A falsy result is treated as a failed write for this iteration
            print(f"{label} failed at iteration {i}")
        time.sleep(delay)


# Writer Process 1
def writer_1():
    """Write 100 records tagged ``writer1`` to the shared table."""
    _run_writer(table, writer_schema, "writer1", "Writer 1")


# Writer Process 2
def writer_2():
    """Write 100 records tagged ``writer2`` to the shared table."""
    _run_writer(table, writer_schema, "writer2", "Writer 2")

Best Practices for Concurrency

Retry Logic Implementation

import random
import time
from datashard import ConcurrentModificationException, create_table, Schema

# Define schema: two required columns, record_id (long) and data (string)
retry_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True}
    ]
)

# Table used by the retry example below
table = create_table("/path/to/retry_table", retry_schema)

def safe_write_with_retry(table, records, schema, max_retries=5, *, base_delay=0.01, jitter=0.01):
    """Append ``records`` to ``table``, retrying on concurrent-modification conflicts.

    Retries use exponential backoff (``base_delay * 2**attempt``) plus a random
    jitter in ``[0, jitter]`` so competing writers do not retry in lockstep.

    Args:
        table: Table to write to (anything with ``append_records``).
        records: Records to append.
        schema: Schema passed through to ``append_records``.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Backoff delay in seconds before the second attempt.
        jitter: Upper bound in seconds of the random delay added to each backoff.

    Returns:
        The result of the first ``append_records`` call that does not raise
        ``ConcurrentModificationException``.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (no attempt would be made).
        ConcurrentModificationException: If every attempt conflicts.
    """
    # Previously max_retries < 1 silently skipped the write and returned False
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            return table.append_records(records=records, schema=schema)
        except ConcurrentModificationException:
            if attempt == max_retries - 1:
                raise
            # Jitter to avoid thundering herd problem
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, jitter))

# Usage: append one record, retrying if another writer commits concurrently
sample_records = [{"record_id": 1, "data": "example"}]
safe_write_with_retry(table, sample_records, retry_schema)

Batch Operations

For high-throughput scenarios, grouping records into batches reduces the number of commits, and therefore the number of chances to conflict with other writers:

from datashard import create_table, Schema
import time

# Define schema: two required columns, record_id (long) and batch_data (string)
batch_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "batch_data", "type": "string", "required": True}
    ]
)

# Table used by the batch example below
table = create_table("/path/to/batch_table", batch_schema)

def batch_write_process(table, all_records, schema, batch_size=100, max_retries=3, pause=0.01):
    """Append ``all_records`` to ``table`` in fixed-size batches.

    Each batch is written with a single ``append_records`` call, so fewer,
    larger writes compete with other writers. A batch whose append returns a
    falsy result is retried up to ``max_retries`` more times with exponential
    backoff (``pause * 2**attempt``).

    Args:
        table: Table to write to (anything with ``append_records``).
        all_records: Sequence of records; sliced into batches.
        schema: Schema passed through to ``append_records``.
        batch_size: Maximum records per batch; must be at least 1.
        max_retries: Extra attempts per batch after its first failure (>= 0).
        pause: Seconds to sleep between batches; also the base backoff delay.

    Returns:
        List of 0-based batch numbers that still failed after all retries;
        empty when every batch was written.

    Raises:
        ValueError: If ``batch_size`` < 1 or ``max_retries`` < 0.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    if max_retries < 0:
        raise ValueError(f"max_retries must be non-negative, got {max_retries}")

    failed_batches = []
    for batch_number, start in enumerate(range(0, len(all_records), batch_size)):
        batch = all_records[start:start + batch_size]

        for attempt in range(max_retries + 1):
            if table.append_records(records=batch, schema=schema):
                break
            if attempt < max_retries:
                print(f"Batch {batch_number} failed, retrying...")
                time.sleep(pause * (2 ** attempt))
        else:
            # No attempt succeeded: record the batch instead of dropping it silently
            print(f"Batch {batch_number} failed after {max_retries + 1} attempts")
            failed_batches.append(batch_number)

        time.sleep(pause)  # Brief pause between batches

    return failed_batches

# Example usage: 1000 records written as 10 batches of 100
all_records = [{"record_id": i, "batch_data": f"data_{i}"} for i in range(1000)]
batch_write_process(table, all_records, batch_schema, batch_size=100)

Transaction Scope

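Keep transactions as short as possible. The longer a transaction stays open, the more likely it is that another writer commits first and causes this transaction's commit to conflict: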

from datashard import create_table, Schema

# Define schema: two required columns, record_id (long) and value (string)
scope_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "value", "type": "string", "required": True}
    ]
)

# Table used by the good/bad transaction-scope examples below
table = create_table("/path/to/scope_table", scope_schema)

# Good: prepare everything first, then write in one short step
def good_example(table, records, schema):
    """Process ``records`` up front, then append the results in a single call.

    All per-record work finishes before the write begins, so the write itself
    is as short as possible. ``process_record`` stands in for application
    logic and is not defined in this example.
    """
    prepared = [process_record(item) for item in records]
    # Only the append touches the table
    return table.append_records(records=prepared, schema=schema)

# Avoid: slow work while a transaction is open
def bad_example(table, records, schema):
    """Anti-pattern: run per-record processing inside an open transaction.

    The transaction stays open for the whole processing loop, widening the
    window in which another writer can commit first and make this commit
    conflict. ``expensive_processing`` stands in for slow application logic
    and is not defined in this example.
    """
    with table.new_transaction() as tx:
        # The costly part happens while the transaction is held open
        transformed = [expensive_processing(item) for item in records]
        tx.append_data(records=transformed, schema=schema)
        return tx.commit()

Monitoring Concurrency

Detecting High Contention

Monitor for frequent conflicts that may indicate high contention:

import logging
import time
from datashard import ConcurrentModificationException, create_table, Schema

# Define schema: two required columns, record_id (long) and data (string)
monitor_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True}
    ]
)

# Table used by the monitoring example below
table = create_table("/path/to/monitor_table", monitor_schema)

def monitored_write(table, records, schema, max_retries=5):
    """Append ``records`` with conflict retries, logging duration and retry count.

    Args:
        table: Table to write to (anything with ``append_records``).
        records: Records to append.
        schema: Schema passed through to ``append_records``.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The result of the first ``append_records`` call that does not raise
        ``ConcurrentModificationException``.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (no attempt would be made).
        ConcurrentModificationException: If every attempt conflicts.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    # perf_counter is monotonic, so the measured duration ignores clock changes
    start_time = time.perf_counter()

    for attempt in range(max_retries):
        try:
            result = table.append_records(records=records, schema=schema)
        except ConcurrentModificationException:
            conflicts = attempt + 1
            logging.warning("Transaction conflict on attempt %d/%d", conflicts, max_retries)
            if conflicts == max_retries:
                logging.error("Transaction failed after max retries")
                raise
            time.sleep(0.01 * (2 ** attempt))
        else:
            elapsed = time.perf_counter() - start_time
            # Every earlier attempt conflicted, so the retry count is `attempt`
            logging.info("Transaction completed in %.3fs, with %d retries", elapsed, attempt)
            return result

# Example usage: one monitored append; timing and retries go to the logging module
sample_records = [{"record_id": 1, "data": "monitored"}]
monitored_write(table, sample_records, monitor_schema)

Performance Considerations

  • OCC works best with low-to-moderate contention

  • High contention scenarios may require application-level coordination

  • Consider partitioning strategies to reduce contention on hot data

  • Monitor conflict rates to optimize write patterns

Advanced Concurrency Patterns

Leader/Follower Pattern

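For scenarios requiring strict ordering, serialize writers with an external lock so that only one process writes at a time. The `try_acquire_lock` and `release_lock` helpers used below are application-provided: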

import os
import tempfile
import time
from datashard import create_table, Schema

# Define schema: two required columns, record_id (long) and data (string)
leader_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True}
    ]
)

# Table whose writes are serialized by the file lock in coordinated_write
table = create_table("/path/to/leader_table", leader_schema)

def _lock_path_for(table_path):
    """Return a lock-file path in the system temp dir that is unique per table.

    The table path is hashed because it may contain path separators (e.g.
    "/path/to/leader_table"). Embedding it directly in the file name would
    point into non-existent subdirectories of the temp dir.
    """
    import hashlib

    digest = hashlib.sha256(str(table_path).encode("utf-8")).hexdigest()
    return os.path.join(tempfile.gettempdir(), f"datashard_lock_{digest}")


def coordinated_write(table, records, schema, process_id, timeout=None, poll_interval=0.01):
    """Append ``records`` while holding a cross-process file lock for ``table``.

    ``try_acquire_lock(lock_file, process_id)`` and
    ``release_lock(lock_file, process_id)`` are application-provided helpers
    that are not defined here. A truthy result from ``try_acquire_lock`` is
    treated as "lock acquired".

    Args:
        table: Table to write to; its ``table_path`` identifies the lock.
        records: Records to append.
        schema: Schema passed through to ``append_records``.
        process_id: Identifier of the calling process, passed to the lock helpers.
        timeout: Maximum seconds to wait for the lock; ``None`` waits forever.
        poll_interval: Seconds to sleep between acquisition attempts.

    Returns:
        The result of ``table.append_records``.

    Raises:
        TimeoutError: If the lock is not acquired within ``timeout`` seconds.
    """
    # Use file-based coordination
    lock_file = _lock_path_for(table.table_path)
    deadline = None if timeout is None else time.monotonic() + timeout

    while not try_acquire_lock(lock_file, process_id):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Could not acquire {lock_file} within {timeout}s")
        time.sleep(poll_interval)  # Wait and retry

    try:
        return table.append_records(records=records, schema=schema)
    finally:
        # Always release, even if the append raises
        release_lock(lock_file, process_id)

Partition-Aware Writing

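Reduce conflicts by organizing writes around partitions. Each partition is committed in its own transaction, so a failed commit for one partition does not undo partitions that already committed: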

from datashard import create_table, Schema

# Define schema with partition support: the "partition" column holds the
# value each record is grouped by (e.g. a month such as "2024-01")
partition_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "record_id", "type": "long", "required": True},
        {"id": 2, "name": "data", "type": "string", "required": True},
        {"id": 3, "name": "partition", "type": "string", "required": True}
    ]
)

# Table used by the partition-aware example below
table = create_table("/path/to/partitioned_table", partition_schema)

def partitioned_write(table, record_groups_by_partition, schema):
    """Write each partition's records in its own transaction.

    Every partition is attempted, even after an earlier commit fails, and
    each commit is independent: a failed partition does not undo partitions
    that already committed.

    Returns:
        True only if every partition's commit reported success (also True
        when there are no partitions).
    """
    def commit_partition(partition_value, rows):
        with table.new_transaction() as tx:
            tx.append_data(
                records=rows,
                schema=schema,
                partition_values={"partition": partition_value},
            )
            return tx.commit()

    # A list (not a generator) so every partition commits before all() runs
    outcomes = [
        commit_partition(value, rows)
        for value, rows in record_groups_by_partition.items()
    ]
    return all(outcomes)

# Example usage: two partitions keyed by month; each record's "partition"
# field matches the key it is grouped under
record_groups = {
    "2024-01": [{"record_id": 1, "data": "jan_data", "partition": "2024-01"}],
    "2024-02": [{"record_id": 2, "data": "feb_data", "partition": "2024-02"}]
}
partitioned_write(table, record_groups, partition_schema)

These patterns help optimize concurrent access based on your specific workload requirements.