Question

Thread Synchronization

Objective: The objective of this assignment is to use semaphores to protect the critical section between two competing threads.

Assignment: Using Threads and Mutex/Counting Semaphores

The idea is to write a C program that creates two threads. The first thread is the consumer thread, which consumes the data written to a shared memory buffer. The second thread is the producer thread, which “produces” the data for the shared memory buffer. To prevent a race condition (e.g. the consumer reading before the producer has written), use a mutex semaphore and counting semaphores to coordinate when each thread can safely write to or read from the common shared memory region.

Prodcon Program Implementation

The producer/consumer program (prodcon.c) takes three arguments from the command line (no prompting the user from within the program).

1. To start the prodcon program: ./prodcon <memsize> <ntimes>, where <memsize> determines the overall size (a multiple of 32) and the total number of blocks (memsize/32) of the shared memory region. The <ntimes> argument indicates the number of times the producer writes to and the consumer reads from the shared memory region.

2. The main process is to create the shared memory region from the heap based upon memsize, initialize the mutex and counting semaphores, create both the producer and consumer threads, and then wait for both threads to finish.

3. The producer thread is to create 30 bytes of random data (0-255) then store the checksum (use the Internet checksum) in the last 2 bytes of the shared memory block. The producer is to do this ntimes synchronizing with the consumer.

4. The consumer thread is to read the shared memory buffer of 30 bytes, calculate the checksum based upon the 30 bytes and compare that with the value stored in the shared memory buffer to ensure that the data did not get corrupted. The consumer is to do this ntimes synchronizing each read with the producer.
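Steps 3 and 4 hinge on the Internet checksum, so a small sketch may help. This is one possible reading of the requirement, not the assignment's reference solution. It assumes the checksum is the RFC 1071 style one's-complement sum of 16-bit words and that it is stored big-endian in bytes 30 and 31 of a 32-byte block. The names inet_checksum, fill_block, and verify_block are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#define BLOCK_SIZE 32                /* block size given in the assignment */
#define DATA_SIZE  (BLOCK_SIZE - 2)  /* 30 data bytes followed by 2 checksum bytes */

/* One's-complement sum of big-endian 16-bit words, then inverted (RFC 1071 style). */
uint16_t inet_checksum(const uint8_t *data, size_t len)
{
    uint32_t sum = 0;
    size_t i;

    for (i = 0; i + 1 < len; i += 2)
        sum += (uint32_t)((data[i] << 8) | data[i + 1]);
    if (i < len)                     /* odd trailing byte is padded with a zero byte */
        sum += (uint32_t)(data[i] << 8);
    while (sum >> 16)                /* fold the carries back into the low 16 bits */
        sum = (sum & 0xFFFFu) + (sum >> 16);
    return (uint16_t)~sum;
}

/* Producer side: 30 random bytes, checksum stored in the last 2 bytes (assumed big-endian). */
void fill_block(uint8_t *block)
{
    uint16_t sum;
    int i;

    for (i = 0; i < DATA_SIZE; i++)
        block[i] = (uint8_t)(rand() % 256);
    sum = inet_checksum(block, DATA_SIZE);
    block[DATA_SIZE] = (uint8_t)(sum >> 8);
    block[DATA_SIZE + 1] = (uint8_t)(sum & 0xFF);
}

/* Consumer side: recompute the checksum and compare it with the stored one.
   Returns 1 on a match, 0 on a mismatch; both values are handed back for reporting. */
int verify_block(const uint8_t *block, uint16_t *expected, uint16_t *calculated)
{
    *expected = (uint16_t)((block[DATA_SIZE] << 8) | block[DATA_SIZE + 1]);
    *calculated = inet_checksum(block, DATA_SIZE);
    return *expected == *calculated;
}
```

A consumer could call verify_block on each block it reads and, on a 0 result, print the expected and calculated values before exiting, as the error-handling rules require.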

Error Handling: Perform the necessary error checking to ensure the correct number of command-line parameters. Limit memsize to 64K and ensure ntimes is a positive integer. If the consumer detects a mismatched checksum it is to report the error along with the expected checksum and the calculated checksum and exit the program.
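The command-line checks described above could be sketched as follows. This is a hedged illustration, not the graded solution. It assumes two positional arguments named memsize and ntimes (so argc is 3 once the program name is counted), that "64K" means 65536 bytes, and that memsize must be a positive multiple of 32. The helper names parse_positive and parse_args are hypothetical.

```c
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#define BLOCK_SIZE  32
#define MAX_MEMSIZE (64L * 1024L)    /* assumption: "64K" means 65536 bytes */

/* Parses a strictly positive decimal integer. Returns 0 on success, -1 otherwise. */
static int parse_positive(const char *text, long *out)
{
    char *end;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0' || value <= 0)
        return -1;
    *out = value;
    return 0;
}

/* Validates argc/argv and fills memsize and ntimes. Returns 0 on success, -1 on error. */
int parse_args(int argc, char *argv[], long *memsize, long *ntimes)
{
    if (argc != 3) {
        fprintf(stderr, "usage: %s memsize ntimes\n", argc > 0 ? argv[0] : "prodcon");
        return -1;
    }
    if (parse_positive(argv[1], memsize) != 0 ||
        *memsize > MAX_MEMSIZE || *memsize % BLOCK_SIZE != 0) {
        fprintf(stderr, "memsize must be a positive multiple of %d, at most %ld\n",
                BLOCK_SIZE, MAX_MEMSIZE);
        return -1;
    }
    if (parse_positive(argv[2], ntimes) != 0) {
        fprintf(stderr, "ntimes must be a positive integer\n");
        return -1;
    }
    return 0;
}
```

Using strtol rather than atoi lets the check reject trailing garbage such as "10abc" and out-of-range values, which atoi would silently accept or mishandle.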

Grading: The program will be graded on the basic functionality, error handling, and how well the implementation description was followed. Be sure to name your program prodcon.c (no extra characters, no capitals). Note that documentation and style are worth 10% of the assignment's grade!

Explanation / Answer

// sleep() comes from <unistd.h> on POSIX systems; on Windows, map it to Sleep().
#ifdef _WIN32
#include <windows.h>
#define sleep(x) Sleep(1000 * (x))
#else
#include <unistd.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

#define SIZE 30
#define NUMBTHREADS 6
#define PRODUCERLOOPS 2

typedef int bufferType;
bufferType bufferData[SIZE];
int bufferIndex;

pthread_mutex_t buffer_mutex;

// Counts the free slots in the buffer. When 0, the buffer is full.
sem_t fullSem;
// Counts the filled slots in the buffer. When 0, the buffer is empty.
sem_t emptySem;

//Function to insert a value into the buffer. The caller must hold buffer_mutex.
void insertBuffer(bufferType value)
{
    //Checks if buffer index is less than size
    if (bufferIndex < SIZE)
    {
        //Adds the value to the buffer
        bufferData[bufferIndex++] = value;
    }
    //Otherwise display error message
    else
    {
        printf("Error: Buffer overflow\n");
    }
}//End of function

//Function to remove and return the most recently added item (LIFO order).
//The caller must hold buffer_mutex.
bufferType deleteBuffer(void)
{
    //Checks if the buffer index is greater than zero
    if (bufferIndex > 0)
    {
        // Pre-decrement: bufferIndex points one past the last stored value,
        // so bufferIndex-- would read an unused slot.
        return bufferData[--bufferIndex];
    }
    //Otherwise display error message
    printf("Error: Buffer underflow\n");
    return 0;
}//End of function

//Function for producer
void *producerFunction(void *threadN)
{
    int threadNumber = *(int *)threadN;
    bufferType value;
    int c = 0;
    //Loops PRODUCERLOOPS times
    while (c++ < PRODUCERLOOPS)
    {
        sleep(rand() % 10);
        //Generates a random value
        value = rand() % 100;
        // Wait for a free slot: blocks while fullSem is 0, otherwise decrements it
        sem_wait(&fullSem);
        /* sem_wait has reserved a slot for this thread, so the insert cannot overflow
           even if another producer acquires the mutex first. The mutex only serializes
           access to bufferIndex and bufferData. */
        pthread_mutex_lock(&buffer_mutex); /* protecting critical section */
        insertBuffer(value);
        pthread_mutex_unlock(&buffer_mutex);
        // post (increment) emptySem: one more item is available to consumers
        sem_post(&emptySem);
        printf("Producer %d added %d to buffer\n", threadNumber, value);
    }
    return NULL;
}//End of function

//Function for consumer
void *consumerFunction(void *threadN)
{
    int threadNumber = *(int *)threadN;
    bufferType value;
    int c = 0;
    //Loops PRODUCERLOOPS times, so every produced item is consumed
    while (c++ < PRODUCERLOOPS)
    {
        // Wait for an available item: blocks while emptySem is 0
        sem_wait(&emptySem);
        /* sem_wait has reserved one item for this thread, so the removal cannot
           underflow. The mutex only serializes access to the shared buffer. */
        pthread_mutex_lock(&buffer_mutex);
        value = deleteBuffer();
        pthread_mutex_unlock(&buffer_mutex);
        // post (increment) fullSem: one more slot is free for producers
        sem_post(&fullSem);
        printf("Consumer %d dequeue %d from buffer\n", threadNumber, value);
    }
    return NULL;
}//End of function

//Main function definition
int main(void)
{
    pthread_t thread[NUMBTHREADS];
    int threadNumber[NUMBTHREADS];
    int c;

    bufferIndex = 0;

    pthread_mutex_init(&buffer_mutex, NULL);
    // sem_init(sem, pshared, value): pshared = 0 shares the semaphore between the
    // threads of this process (nonzero shares it between processes); value is the
    // initial count.
    /* fullSem starts at SIZE because the buffer has SIZE free slots; each producer
       waits on it before inserting, which decrements the count. emptySem starts at 0
       because the buffer starts empty, so consumers block until a producer posts to it. */
    sem_init(&fullSem, 0, SIZE);
    sem_init(&emptySem, 0, 0);

    // Create producer/consumer pairs: even-numbered threads produce, odd-numbered consume.
    // pthread_create(thread, attr, start_routine, arg)
    for (c = 0; c < NUMBTHREADS; c += 2)
    {
        threadNumber[c] = c;
        pthread_create(&thread[c], NULL, producerFunction, &threadNumber[c]);
        threadNumber[c + 1] = c + 1;
        pthread_create(&thread[c + 1], NULL, consumerFunction, &threadNumber[c + 1]);
    }//End of for loop

    // Wait for every thread to finish
    for (c = 0; c < NUMBTHREADS; c++)
        pthread_join(thread[c], NULL);

    pthread_mutex_destroy(&buffer_mutex);
    sem_destroy(&fullSem);
    sem_destroy(&emptySem);

    return 0;
}//End of main function
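
The listing above passes single int values through a fixed array, whereas the assignment asks for one producer and one consumer exchanging 32-byte blocks in a heap region of memsize bytes. A minimal sketch of how the same semaphore pattern might be applied to that layout follows. It assumes POSIX unnamed semaphores (as on Linux), fills each block with a simple test pattern instead of random data plus checksum, and uses hypothetical names (struct region, run_prodcon).

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE 32

struct region {
    uint8_t *mem;            /* heap buffer holding nblocks blocks of BLOCK_SIZE bytes */
    long nblocks;
    long ntimes;
    long mismatches;         /* written only by the consumer */
    sem_t free_blocks;       /* counts blocks the producer may fill */
    sem_t used_blocks;       /* counts blocks the consumer may read */
    pthread_mutex_t lock;    /* mutex around each access to the region */
};

static void *producer(void *arg)
{
    struct region *r = arg;
    long i;

    for (i = 0; i < r->ntimes; i++) {
        sem_wait(&r->free_blocks);               /* wait for a free block */
        pthread_mutex_lock(&r->lock);
        /* Test pattern: every byte of block i holds the low 8 bits of i. */
        memset(r->mem + (i % r->nblocks) * BLOCK_SIZE, (int)(i & 0xFF), BLOCK_SIZE);
        pthread_mutex_unlock(&r->lock);
        sem_post(&r->used_blocks);               /* publish the block */
    }
    return NULL;
}

static void *consumer(void *arg)
{
    struct region *r = arg;
    long i;
    int j;

    for (i = 0; i < r->ntimes; i++) {
        const uint8_t *block;

        sem_wait(&r->used_blocks);               /* wait for a filled block */
        pthread_mutex_lock(&r->lock);
        block = r->mem + (i % r->nblocks) * BLOCK_SIZE;
        for (j = 0; j < BLOCK_SIZE; j++) {
            if (block[j] != (uint8_t)(i & 0xFF)) {
                r->mismatches++;
                break;
            }
        }
        pthread_mutex_unlock(&r->lock);
        sem_post(&r->free_blocks);               /* hand the block back */
    }
    return NULL;
}

/* Runs one producer and one consumer. Returns the number of corrupted blocks, or -1 on setup failure. */
long run_prodcon(size_t memsize, long ntimes)
{
    struct region r;
    pthread_t prod, cons;

    r.nblocks = (long)(memsize / BLOCK_SIZE);
    r.ntimes = ntimes;
    r.mismatches = 0;
    if (r.nblocks == 0 || (r.mem = malloc(memsize)) == NULL)
        return -1;
    pthread_mutex_init(&r.lock, NULL);
    sem_init(&r.free_blocks, 0, (unsigned int)r.nblocks);
    sem_init(&r.used_blocks, 0, 0);
    pthread_create(&prod, NULL, producer, &r);
    pthread_create(&cons, NULL, consumer, &r);
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);
    sem_destroy(&r.free_blocks);
    sem_destroy(&r.used_blocks);
    pthread_mutex_destroy(&r.lock);
    free(r.mem);
    return r.mismatches;
}
```

Because both threads walk the blocks in the same order, the region behaves as a FIFO ring: free_blocks keeps the producer from overwriting a block the consumer has not read yet, and used_blocks keeps the consumer from reading a block that has not been written.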

Sample Run:

Producer 0 added 67 to buffer
Consumer 1 dequeue 67 from buffer
Producer 2 added 67 to buffer
Producer 2 added 0 to buffer
Consumer 3 dequeue 0 from buffer
Consumer 3 dequeue 0 from buffer
Consumer 5 dequeue 67 from buffer
Consumer 1 dequeue 67 from buffer
Producer 0 added 0 to buffer
Producer 4 added 67 to buffer
Producer 4 added 0 to buffer
Consumer 5 dequeue 0 from buffer
