Lab (2): Thread Synchronization
Lab Overview
This lab builds upon Lab 1's introduction to concurrent programming with POSIX Threads (Pthreads). While Lab 1 demonstrated the power of parallelism, you may have noticed that when multiple threads access shared resources simultaneously, unpredictable behavior can occur. This lab addresses the critical challenge of thread synchronization and explores how to coordinate multiple threads safely when they share data.
You'll learn about race conditions - one of the most common and dangerous problems in parallel programming - and discover how to solve them using mutexes and semaphores. By the end of this lab, you'll understand why synchronization is essential and how to implement it correctly.
Lab Objectives
- Understand the concept of race conditions and why they occur
- Learn how to identify and reproduce race conditions in code
- Master the use of mutexes for thread synchronization
- Implement thread-safe shared data structures
- Understand semaphores and their applications
- Solve the classic Producer-Consumer problem
- Apply synchronization concepts to real-world scenarios
Prerequisites
- Completion of Lab 1 (Pthread basics)
- Understanding of thread creation and joining
- Basic knowledge of shared memory concepts
- Familiarity with pointers and dynamic memory allocation
Lab Submission
- Complete all exercises (1, 2, and 3)
- Write your name and index in the first line of each program
- Take screenshots of your code including your name and index
- Take screenshots of all output screens showing different runs
- Put all your work in a Word document, convert to PDF, and upload to LMS
1. The Race Condition Problem
What is a Race Condition?
A race condition occurs when multiple threads access shared data concurrently, and the final result depends on the unpredictable timing of thread execution. The name comes from the fact that threads are "racing" to access the same resource, and the winner determines the outcome.
Race conditions are particularly dangerous because:
- They may not always manifest (the program might work correctly sometimes)
- They're difficult to debug due to their non-deterministic nature
- They can cause data corruption, crashes, or incorrect results
- They become more likely on faster machines or under heavy load
A Simple Example: Shared Counter
Let's start with a seemingly innocent example that demonstrates the race condition problem:
#include <iostream>
#include <pthread.h>
int shared_counter = 0; // Global variable shared by all threads
const int NUM_THREADS = 4;
const int INCREMENTS_PER_THREAD = 1000000;
void* increment_counter(void* arg) {
int thread_id = *static_cast<int*>(arg);
// Each thread increments the counter many times
for (int i = 0; i < INCREMENTS_PER_THREAD; i++) {
shared_counter++; // This is the problematic line!
}
std::cout << "Thread " << thread_id << " finished incrementing" << std::endl;
return nullptr;
}
int main() {
pthread_t threads[NUM_THREADS];
int thread_ids[NUM_THREADS];
std::cout << "Starting with counter = " << shared_counter << std::endl;
std::cout << "Expected final value = " << NUM_THREADS * INCREMENTS_PER_THREAD << std::endl;
// Create threads
for (int i = 0; i < NUM_THREADS; i++) {
thread_ids[i] = i;
pthread_create(&threads[i], nullptr, increment_counter, &thread_ids[i]);
}
// Wait for all threads to complete
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], nullptr);
}
std::cout << "Final counter value = " << shared_counter << std::endl;
std::cout << "Expected: " << NUM_THREADS * INCREMENTS_PER_THREAD << std::endl;
std::cout << "Difference: " << (NUM_THREADS * INCREMENTS_PER_THREAD) - shared_counter << std::endl;
return 0;
}
Why Does This Happen?
The statement `shared_counter++` appears simple, but it actually involves three separate operations at the machine code level:
- Load: Read the current value from memory into a CPU register
- Increment: Add 1 to the value in the register
- Store: Write the new value back to memory
Here's what can happen when two threads execute this "simultaneously":
Initial shared_counter = 100
Thread A Thread B
-------- --------
Load (gets 100)
Load (gets 100)
Increment (101)
Increment (101)
Store (writes 101)
Store (writes 101)
Final shared_counter = 101 (should be 102!)
This is called a lost update - Thread B's increment overwrites Thread A's work.
Sample Output (Results Will Vary!)
Starting with counter = 0
Expected final value = 4000000
Thread 0 finished incrementing
Thread 1 finished incrementing
Thread 2 finished incrementing
Thread 3 finished incrementing
Final counter value = 2847193
Expected: 4000000
Difference: 1152807
Notice how we lost over 1 million increments! The exact number will vary between runs because race conditions are non-deterministic.
2. Mutexes: The Solution to Race Conditions
What is a Mutex?
A mutex (short for "mutual exclusion") is a synchronization primitive that ensures only one thread can access a shared resource at a time. Think of it as a digital lock:
- Only one thread can "hold" the mutex at any given time
- Other threads must wait until the mutex is released
- This creates a critical section - code that only one thread can execute at a time
Mutex Operations
Pthreads provides several mutex functions:
pthread_mutex_t mutex; // Declare a mutex
pthread_mutex_init(&mutex, nullptr); // Initialize mutex
pthread_mutex_lock(&mutex); // Acquire mutex (blocks if unavailable)
pthread_mutex_unlock(&mutex); // Release mutex
pthread_mutex_destroy(&mutex); // Clean up mutex
Fixing Our Counter Example
Here's the corrected version using a mutex:
#include <iostream>
#include <pthread.h>
int shared_counter = 0;
pthread_mutex_t counter_mutex; // Mutex to protect shared_counter
const int NUM_THREADS = 4;
const int INCREMENTS_PER_THREAD = 1000000;
void* increment_counter_safe(void* arg) {
int thread_id = *static_cast<int*>(arg);
for (int i = 0; i < INCREMENTS_PER_THREAD; i++) {
// Critical Section: Only one thread can execute this at a time
pthread_mutex_lock(&counter_mutex); // Acquire lock
shared_counter++; // Safe to modify now
pthread_mutex_unlock(&counter_mutex); // Release lock
}
std::cout << "Thread " << thread_id << " finished incrementing" << std::endl;
return nullptr;
}
int main() {
pthread_t threads[NUM_THREADS];
int thread_ids[NUM_THREADS];
// Initialize the mutex
pthread_mutex_init(&counter_mutex, nullptr);
std::cout << "Starting with counter = " << shared_counter << std::endl;
std::cout << "Expected final value = " << NUM_THREADS * INCREMENTS_PER_THREAD << std::endl;
// Create threads
for (int i = 0; i < NUM_THREADS; i++) {
thread_ids[i] = i;
pthread_create(&threads[i], nullptr, increment_counter_safe, &thread_ids[i]);
}
// Wait for all threads to complete
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], nullptr);
}
std::cout << "Final counter value = " << shared_counter << std::endl;
std::cout << "Expected: " << NUM_THREADS * INCREMENTS_PER_THREAD << std::endl;
std::cout << "Difference: " << (NUM_THREADS * INCREMENTS_PER_THREAD) - shared_counter << std::endl;
// Clean up the mutex
pthread_mutex_destroy(&counter_mutex);
return 0;
}
Sample Output (Now Consistent!)
Starting with counter = 0
Expected final value = 4000000
Thread 0 finished incrementing
Thread 1 finished incrementing
Thread 2 finished incrementing
Thread 3 finished incrementing
Final counter value = 4000000
Expected: 4000000
Difference: 0
Perfect! The mutex ensures that our counter is always correct, no matter how many times we run the program.
Critical Section Guidelines
When using mutexes, follow these important guidelines:
- Keep critical sections small: Only protect the minimum code necessary
- Always unlock: Every `lock()` must have a corresponding `unlock()`
- Avoid nested locks: Can lead to deadlock
- Initialize and destroy: Always initialize mutexes before use and destroy them when done
3. Understanding Semaphores
What is a Semaphore?
A semaphore is a more general synchronization primitive than a mutex. While a mutex is binary (locked or unlocked), a semaphore maintains a counter that represents the number of available resources.
Semaphores are fundamental synchronization tools in concurrent programming, used to manage access to shared resources and coordinate thread execution. They are widely utilized in POSIX-compliant systems, such as Unix-like operating systems, to handle multi-threaded applications.
A semaphore is an integer variable manipulated through two atomic operations:
- Wait (P): Decrements the semaphore; if the value is zero, the thread blocks until it becomes positive.
- Signal (V): Increments the semaphore, potentially unblocking a waiting thread.
Types of Semaphores
- Binary Semaphore: Limited to 0 or 1, often used for locking (mutual exclusion).
- Counting Semaphore: Can take any non-negative value, ideal for managing multiple resource instances.
POSIX Semaphores in C/C++
This lab uses POSIX semaphores from `<semaphore.h>`, focusing on unnamed semaphores for thread synchronization within a single process. These are paired with POSIX threads (pthreads) from `<pthread.h>`.
Key Functions
- `sem_init(sem_t *sem, int pshared, unsigned int value)`: Initializes the semaphore; use `pshared = 0` for threads.
- `sem_wait(sem_t *sem)`: Decrements the semaphore, or blocks if its value is zero.
- `sem_post(sem_t *sem)`: Increments the semaphore, waking a blocked thread if any.
- `sem_destroy(sem_t *sem)`: Cleans up the semaphore.
Semaphores enable mutual exclusion and synchronization, which we’ll explore through practical examples.
Compilation
POSIX semaphores (the `sem_*` functions) are part of the pthread library in modern glibc. Therefore, compile each program with:
g++ program.cpp -o program -pthread
Mutual Exclusion with Semaphores
Concept
Mutual exclusion prevents multiple threads from simultaneously accessing a shared resource, avoiding race conditions. A binary semaphore, initialized to 1, acts like a lock: a thread must acquire it (via sem_wait) before entering the critical section and release it (via sem_post) afterward.
Example: Protecting a Shared Counter
Consider multiple threads incrementing a shared integer. Without synchronization, concurrent updates lead to unpredictable results due to race conditions. A semaphore ensures only one thread modifies the counter at a time.
Note
- Semaphore Definition: `sem_t sem;`
- Semaphore Initialization: `sem_init(&sem, 0, 1);` The third argument is the initial value of the semaphore `sem`, which is 1 here. For this lab, the second parameter (`pshared`) is always 0.
- Critical Section: the increment of the shared counter, wrapped between `sem_wait` and `sem_post`.
How Semaphore Works
Before the critical section you want to protect, place a call to `sem_wait`. A thread that executes `sem_wait` blocks if the semaphore is 0; if the semaphore is nonzero, the thread decrements the semaphore and proceeds. After executing the code in the critical section, the thread calls `sem_post`, which increments the semaphore, allowing a thread waiting in `sem_wait` to proceed.
Code Example
#include <semaphore.h>
#include <pthread.h>
#include <iostream>
const int NUM_THREADS = 4;
sem_t sem;
int shared_counter = 0;
void* increment_counter(void* arg) {
for (int i = 0; i < 10000; i++) {
sem_wait(&sem); // Lock
shared_counter++; // Critical section
sem_post(&sem); // Unlock
}
return nullptr;
}
int main() {
sem_init(&sem, 0, 1); // Binary semaphore
pthread_t threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], nullptr, increment_counter, nullptr);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], nullptr);
}
sem_destroy(&sem);
std::cout << "Final counter value: " << shared_counter << std::endl; // Should be 40000 (4 threads x 10000)
return 0;
}
Enforcing Execution Order with Semaphores
Concept
Semaphores can coordinate threads to execute in a specific order. A semaphore initialized to 0 acts as a gate: a thread waits until another thread signals completion.
Example: Ordered Printing
Two threads: one thread prints "Hello" and the other thread prints "World". We wish to print "Hello" then "World" using a semaphore.
Note
- Semaphore Definition: `sem_t sem;`
- Semaphore Initialization: `sem_init(&sem, 0, 0);` The initial value must be zero.
- After printing "Hello", send a signal on `sem` with `sem_post(&sem);`
- Before printing "World", wait for the signal on `sem`:
```cpp
sem_wait(&sem);
// print "World"
```
Code Example
#include <semaphore.h>
#include <pthread.h>
#include <iostream>
sem_t sem;
void* print_hello(void* arg) {
std::cout << "Hello" << std::endl;
sem_post(&sem); // Send signal
return nullptr;
}
void* print_world(void* arg) {
sem_wait(&sem); // Wait for signal
std::cout << "World" << std::endl;
return nullptr;
}
int main() {
sem_init(&sem, 0, 0); // Start locked
pthread_t thread1, thread2;
pthread_create(&thread1, nullptr, print_hello, nullptr);
pthread_create(&thread2, nullptr, print_world, nullptr);
pthread_join(thread1, nullptr);
pthread_join(thread2, nullptr);
sem_destroy(&sem);
return 0;
}
Explanation
- Semaphore: Initialized to 0, blocking the "World" thread until "Hello" signals.
- Order: Ensures "Hello" precedes "World" every time.
Simulation of a Smart Parking Garage
Concept
A counting semaphore manages a limited number of resources, like parking spots.
Scenario Setup
- There is a parking garage with a maximum of 3 parking spots.
- A digital sign shows how many spots are available.
- A smart barrier gate allows cars in only if there’s space.
To simulate the above scenario, cars represent threads or processes trying to access a resource. The smart control system is the POSIX semaphore.
| POSIX Function | What It Does in the Garage |
|---|---|
| `sem_init(&sem, 0, 3)` | Sets the initial count to 3 (3 parking spots). |
| `sem_wait(&sem)` | A car arrives. If a spot is available, it enters and the count is decremented. If no spots are free, the car waits (blocked). |
| `sem_post(&sem)` | A car leaves the garage, freeing a spot. The count is incremented. |
| `sem_destroy(&sem)` | Shuts down the system, removing the garage logic. |
Code
#include <semaphore.h>
#include <pthread.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#define NUM_CARS 5
#define PARKING_SPOTS 3
sem_t sem;
void* car_thread(void* arg) {
int id = *(int*)arg;
std::cout << "Car " << id << " trying to enter" << std::endl;
sem_wait(&sem); // Get a spot
std::cout << "Car " << id << " entered" << std::endl;
sleep(rand() % 5 + 1); // Park for random time
std::cout << "Car " << id << " leaving" << std::endl;
sem_post(&sem); // Free the spot
return nullptr;
}
int main() {
sem_init(&sem, 0, PARKING_SPOTS); // 3 spots
pthread_t threads[NUM_CARS];
int ids[NUM_CARS];
for (int i = 0; i < NUM_CARS; i++) {
ids[i] = i;
pthread_create(&threads[i], nullptr, car_thread, &ids[i]);
sleep(1); // Stagger arrivals
}
for (int i = 0; i < NUM_CARS; i++) {
pthread_join(threads[i], nullptr);
}
sem_destroy(&sem);
return 0;
}
Sample Output
Car 0 trying to enter
Car 0 entered
Car 1 trying to enter
Car 1 entered
Car 2 trying to enter
Car 2 entered
Car 3 trying to enter
Car 2 leaving
Car 3 entered
Car 4 trying to enter
Car 0 leaving
Car 4 entered
Car 1 leaving
Car 3 leaving
Car 4 leaving
Explanation
- Semaphore: Starts at 3; cars wait when it reaches 0.
- Simulation: Demonstrates resource allocation and release.
4. The Producer-Consumer Problem
Concept
Producers add items to a finite buffer, and consumers remove them, requiring synchronization to handle full or empty states.
Problem Description
The Producer-Consumer problem is a classic synchronization challenge where:
- Producers create data items and place them in a shared buffer
- Consumers remove data items from the buffer and process them
- The buffer has limited capacity
- Multiple producers and consumers may work simultaneously
Challenges to Solve
- Buffer overflow: Producers must wait when buffer is full
- Buffer underflow: Consumers must wait when buffer is empty
- Race conditions: Multiple threads accessing buffer simultaneously
- Starvation: Ensuring fair access for all threads
Example: Multi-Threaded Buffer
Two producers and three consumers share a buffer of size 5, using a `std::queue`.
#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
const int BUFFER_SIZE = 5;
const int NUM_PRODUCERS = 2;
const int NUM_CONSUMERS = 3;
const int ITEMS_PER_PRODUCER = 10;
// Shared buffer
std::queue<int> buffer;
// Synchronization primitives
sem_t empty_slots; // Counts empty slots (initially BUFFER_SIZE)
sem_t filled_slots; // Counts filled slots (initially 0)
pthread_mutex_t buffer_mutex; // Protects buffer access
void* producer(void* arg) {
int producer_id = *static_cast<int*>(arg);
for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
// Produce an item
int item = producer_id * ITEMS_PER_PRODUCER + i + 1; // Producer 0: items 1-10, producer 1: items 11-20
// Wait for empty slot
sem_wait(&empty_slots);
// Critical section: add item to buffer
pthread_mutex_lock(&buffer_mutex);
buffer.push(item);
int buf_size = buffer.size();
pthread_mutex_unlock(&buffer_mutex);
printf("Producer %d produced item %d (buffer size: %d)\n", producer_id, item, buf_size);
// Signal that we added an item
sem_post(&filled_slots);
// Simulate production time
usleep(100000); // 0.1 seconds
}
printf("Producer %d finished!\n", producer_id);
return nullptr;
}
void* consumer(void* arg) {
int consumer_id = *static_cast<int*>(arg);
while (true) {
// Wait for filled slot
sem_wait(&filled_slots);
// Critical section: remove item from buffer
pthread_mutex_lock(&buffer_mutex);
int item = buffer.front();
buffer.pop();
int buf_size = buffer.size();
// End of critical section
pthread_mutex_unlock(&buffer_mutex);
// Signal that we freed a slot
sem_post(&empty_slots);
if (item == -1)
break;
printf("Consumer %d consumed item %d (buffer size: %d)\n", consumer_id, item, buf_size);
// Simulate consumption time
usleep(150000); // 0.15 seconds
}
printf("Consumer %d finished!\n", consumer_id);
return nullptr;
}
int main() {
pthread_t producers[NUM_PRODUCERS];
pthread_t consumers[NUM_CONSUMERS];
int producer_ids[NUM_PRODUCERS];
int consumer_ids[NUM_CONSUMERS];
// Initialize synchronization primitives
sem_init(&empty_slots, 0, BUFFER_SIZE); // Initially all slots empty
sem_init(&filled_slots, 0, 0); // Initially no items
pthread_mutex_init(&buffer_mutex, nullptr);
std::cout << "Starting Producer-Consumer simulation..." << std::endl;
std::cout << "Buffer size: " << BUFFER_SIZE << std::endl;
std::cout << "Producers: " << NUM_PRODUCERS << ", Consumers: " << NUM_CONSUMERS << std::endl;
// Create producer threads
for (int i = 0; i < NUM_PRODUCERS; i++) {
producer_ids[i] = i;
pthread_create(&producers[i], nullptr, producer, &producer_ids[i]);
}
// Create consumer threads
for (int i = 0; i < NUM_CONSUMERS; i++) {
consumer_ids[i] = i;
pthread_create(&consumers[i], nullptr, consumer, &consumer_ids[i]);
}
// Wait for all producers to finish
for (int i = 0; i < NUM_PRODUCERS; i++) {
pthread_join(producers[i], nullptr);
}
// Send sentinel values (-1) to terminate consumers
for (int c=0; c<NUM_CONSUMERS; c++) {
sem_wait(&empty_slots);
pthread_mutex_lock(&buffer_mutex);
buffer.push(-1);
pthread_mutex_unlock(&buffer_mutex);
sem_post(&filled_slots);
}
// Wait for all consumers to finish
for (int i = 0; i < NUM_CONSUMERS; i++) {
pthread_join(consumers[i], nullptr);
}
// Cleanup
sem_destroy(&empty_slots);
sem_destroy(&filled_slots);
pthread_mutex_destroy(&buffer_mutex);
std::cout << "Simulation completed!" << std::endl;
return 0;
}
Sample Output
Starting Producer-Consumer simulation...
Buffer size: 5
Producers: 2, Consumers: 3
Producer 0 produced item 1 (buffer size: 1)
Producer 1 produced item 11 (buffer size: 2)
Consumer 1 consumed item 11 (buffer size: 0)
Consumer 0 consumed item 1 (buffer size: 1)
Producer 0 produced item 2 (buffer size: 1)
Producer 1 produced item 12 (buffer size: 2)
Consumer 2 consumed item 2 (buffer size: 1)
Consumer 0 consumed item 12 (buffer size: 0)
Producer 0 produced item 3 (buffer size: 1)
Producer 1 produced item 13 (buffer size: 2)
Consumer 1 consumed item 3 (buffer size: 1)
Consumer 2 consumed item 13 (buffer size: 0)
Producer 0 produced item 4 (buffer size: 1)
Producer 1 produced item 14 (buffer size: 2)
Consumer 0 consumed item 4 (buffer size: 1)
Consumer 1 consumed item 14 (buffer size: 0)
Producer 0 produced item 5 (buffer size: 1)
Producer 1 produced item 15 (buffer size: 2)
Consumer 2 consumed item 5 (buffer size: 1)
Consumer 0 consumed item 15 (buffer size: 0)
Producer 0 produced item 6 (buffer size: 1)
Producer 1 produced item 16 (buffer size: 2)
Consumer 1 consumed item 6 (buffer size: 1)
Consumer 2 consumed item 16 (buffer size: 0)
Producer 0 produced item 7 (buffer size: 1)
Producer 1 produced item 17 (buffer size: 2)
Consumer 0 consumed item 7 (buffer size: 1)
Consumer 1 consumed item 17 (buffer size: 0)
Producer 0 produced item 8 (buffer size: 1)
Producer 1 produced item 18 (buffer size: 2)
Consumer 2 consumed item 8 (buffer size: 1)
Consumer 0 consumed item 18 (buffer size: 0)
Producer 0 produced item 9 (buffer size: 1)
Producer 1 produced item 19 (buffer size: 2)
Consumer 1 consumed item 9 (buffer size: 1)
Consumer 2 consumed item 19 (buffer size: 0)
Producer 0 produced item 10 (buffer size: 1)
Producer 1 produced item 20 (buffer size: 2)
Consumer 0 consumed item 10 (buffer size: 1)
Consumer 1 consumed item 20 (buffer size: 0)
Producer 0 finished!
Producer 1 finished!
Consumer 2 finished!
Consumer 0 finished!
Consumer 1 finished!
Simulation completed!
How the Solution Works
- Initialization:
- The `empty_slots` semaphore starts at 5 (the buffer capacity), and `filled_slots` starts at 0 (no items initially).
- The mutex is initialized to protect the shared buffer.
- Producer Logic:
- Each producer (ID 0 and 1) generates 10 items (e.g., 1-10 for producer 0, 11-20 for producer 1).
- It waits for an empty slot, locks the buffer, adds an item, unlocks, and signals a full slot.
- Prints its ID, the item, and the buffer size after each production.
- Consumer Logic:
- Each consumer (ID 0, 1, 2) waits for a full slot, locks the buffer, removes an item, unlocks, and signals an empty slot.
- Prints its ID, the item, and the buffer size unless the item is -1, which causes it to exit.
- Main Thread:
- Creates 2 producer and 3 consumer threads.
- Waits for producers to finish (total of 20 items produced).
- Adds 3 sentinel values (-1) to the buffer to terminate the consumers.
- Waits for consumers to finish and cleans up.
Thread Synchronization
- empty_slots semaphore: Tracks available buffer space (starts at BUFFER_SIZE)
- filled_slots semaphore: Tracks items in buffer (starts at 0)
- buffer_mutex: Ensures atomic access to the shared buffer
Exercises
Exercise 1 [10 points]: Enforcing Ordering for 3 Threads
Write a C++ program that creates 3 threads. The first thread prints out "Power", the second thread prints out " is ", and the third thread prints out "Knowledge". Use POSIX semaphores such that the final output is always "Knowledge is Power". [Hint: Use two semaphores]
Exercise 2 [20 points]: Thread-Safe Bank Account
Write a C++ program that simulates a bank account with multiple threads performing deposits and withdrawals concurrently.
Requirements:
- Create a `BankAccount` class with:
  - Private balance member
  - `deposit(amount)` method
  - `withdraw(amount)` method (should not allow negative balance)
  - `getBalance()` method
- Create 5 threads: 3 performing deposits, 2 performing withdrawals
- Each thread should perform 1000 operations
- Use appropriate synchronization to prevent race conditions
- Print the final balance and verify it matches expected calculations
Expected Structure:
class BankAccount {
private:
double balance;
pthread_mutex_t account_mutex;
public:
BankAccount(double initial_balance);
void deposit(double amount);
bool withdraw(double amount); // returns false if insufficient funds
double getBalance();
~BankAccount();
};
Exercise 3 [20 points]: Multi-threaded File Processing
Create a program that processes multiple text files concurrently while maintaining thread safety.
Requirements:
- Create 5 text files with random numbers (one number per line)
- Use 3 worker threads to process files concurrently
- Each thread should:
- Read numbers from assigned files
- Calculate sum, count, and average
- Add results to shared data structure
- Use mutex to protect shared results
- Main thread should print final combined statistics
- Demonstrate that the multi-threaded version produces the same results as sequential processing
Shared Data Structure: