Question
OBJECTIVE
The purpose of this programming project is to explore process synchronization. This will be accomplished by writing a simulation of the Producer / Consumer problem described below. Your simulation will be implemented using Pthreads. This simulation is a modification of the programming project found at the end of Chapter 6, and most of the text of this project specification comes directly from that project description. Name the program osproj3.cpp or osproj3.c and the executable osproj3.
THE PRODUCER / CONSUMER PROBLEM
In Chapter 3, we developed a model of a system consisting of cooperating sequential processes or threads, all running asynchronously and possibly sharing data. We illustrated this model with the producer - consumer problem, which is representative of operating systems. Specifically, in Section 3.4.1 we discussed how a "bounded buffer" could be used to enable processes to share memory.
In Chapter 3, we described a technique using a circular buffer that can hold BUFFER_SIZE-1 items. By using a shared memory location count, the buffer can hold all BUFFER_SIZE items. This count is initialized to 0 and is incremented every time an item is placed into the buffer and decremented every time an item is removed from the buffer. The count data item can also be implemented as a counting semaphore.
The producer can place items into the buffer only if the buffer has a free memory location to store the item. The producer cannot add items to a full buffer. The consumer can remove items from the buffer if the buffer is not empty. The consumer must wait to consume items if the buffer is empty.
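The count-based circular buffer described above can be sketched without any threading, just to show the index arithmetic. This is a minimal illustration, not the project's required interface: the RingBuffer type and its member names are hypothetical, and insert() and remove() return false where the real producer and consumer would wait.

```cpp
#include <cstddef>

// Hypothetical type for illustration; only BUFFER_SIZE comes from the project text.
const std::size_t BUFFER_SIZE = 5;

struct RingBuffer {
    int items[BUFFER_SIZE];
    std::size_t in = 0;     // next slot to write
    std::size_t out = 0;    // next slot to read
    std::size_t count = 0;  // number of occupied slots

    // Returns false (instead of waiting) when the buffer is full.
    bool insert(int item) {
        if (count == BUFFER_SIZE) return false;
        items[in] = item;
        in = (in + 1) % BUFFER_SIZE;   // wrap around to slot 0
        ++count;
        return true;
    }

    // Returns false (instead of waiting) when the buffer is empty.
    bool remove(int *item) {
        if (count == 0) return false;
        *item = items[out];
        out = (out + 1) % BUFFER_SIZE;
        --count;
        return true;
    }
};
```

Because count distinguishes "full" from "empty" even when in equals out, all BUFFER_SIZE slots are usable.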
The "items" stored in this buffer will be integers. Your producer process will have to insert random numbers into the buffer. The consumer process will consume a number and detect if the number is prime.
PROJECT SPECIFICATIONS
The buffer used between producer and consumer processes will consist of a fixed-size array of type buffer_item. The queue of buffer_item objects will be manipulated using a circular array. The buffer will be manipulated with two functions, buffer_insert_item() and buffer_remove_item(), which are called by the producer and consumer threads, respectively. A skeleton outlining these functions can be found in buffer.h.
The buffer_insert_item() and buffer_remove_item() functions will synchronize the producer and consumer using algorithms similar to those in Figures 6.10 and 6.11. The buffer will also require an initialization function (not supplied in buffer.h) that initializes the mutual exclusion object "mutex" along with the "empty" and "full" semaphores.
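One way the initialization function and the two synchronized buffer functions could fit together is sketched below. It follows the usual empty/full/mutex pattern rather than reproducing the textbook figures; the names buffer_init, empty_slots, full_slots and buffer_mutex are assumptions made for this sketch (they stand for the "empty", "full" and "mutex" objects named above).

```cpp
#include <pthread.h>
#include <semaphore.h>

typedef int buffer_item;
#define BUFFER_SIZE 5

// Hypothetical globals for this sketch.
static buffer_item buffer[BUFFER_SIZE];
static int in_index = 0, out_index = 0;
static sem_t empty_slots;            // counts free slots, starts at BUFFER_SIZE
static sem_t full_slots;             // counts occupied slots, starts at 0
static pthread_mutex_t buffer_mutex; // protects buffer and both indexes

// Hypothetical initialization function; returns false if any init call fails.
bool buffer_init() {
    in_index = out_index = 0;
    if (sem_init(&empty_slots, 0, BUFFER_SIZE) != 0) return false;
    if (sem_init(&full_slots, 0, 0) != 0) return false;
    return pthread_mutex_init(&buffer_mutex, NULL) == 0;
}

bool buffer_insert_item(buffer_item item) {
    sem_wait(&empty_slots);              // block until a slot is free
    pthread_mutex_lock(&buffer_mutex);
    buffer[in_index] = item;
    in_index = (in_index + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buffer_mutex);
    sem_post(&full_slots);               // one more item is available
    return true;
}

bool buffer_remove_item(buffer_item *item) {
    sem_wait(&full_slots);               // block until an item is available
    pthread_mutex_lock(&buffer_mutex);
    *item = buffer[out_index];
    out_index = (out_index + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buffer_mutex);
    sem_post(&empty_slots);              // one more slot is free
    return true;
}
```

Note the pairing: the producer waits on the free-slot semaphore and posts the occupied-slot semaphore, and the consumer does the reverse. Waiting and posting on the same semaphore would never let the other side make progress.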
The producer thread will alternate between sleeping for a random period of time and generating an integer and trying to insert it into the buffer. Random numbers will be generated using the rand_r() function. The sleep function used must be a "thread safe" sleep function. See the text on page 276 for an overview of the producer algorithm.
The consumer thread will alternate between sleeping for a random period of time (using a thread-safe sleep, of course) and trying to remove a number from the buffer. The number removed will then be tested to see whether it is prime. See the text on page 276 for an overview of the consumer algorithm.
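The random-number and sleep requirements above might be met along the following lines. This is a sketch under assumptions: the helper names random_below and sleep_seconds are hypothetical, and nanosleep() is one possible choice of sleep call that suspends only the calling thread.

```cpp
#include <errno.h>
#include <stdlib.h>   // rand_r (POSIX)
#include <time.h>     // nanosleep, struct timespec

// Returns a pseudo-random value in [0, limit); limit must be positive.
// rand_r keeps its state in *seed, so each thread can own a private seed
// instead of sharing the hidden state used by rand().
int random_below(unsigned int *seed, int limit) {
    return rand_r(seed) % limit;
}

// Suspends only the calling thread for the given number of seconds.
// If a signal interrupts the sleep, it resumes with the time remaining.
void sleep_seconds(int seconds) {
    struct timespec remaining = { seconds, 0 };
    while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) {
        // interrupted: loop and sleep for what is left
    }
}
```

A thread would typically declare its own seed once (for example, derived from the clock and something unique to the thread) and then call sleep_seconds(random_below(&seed, maxSleep)) on each pass through its loop.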
The main function will initialize the buffer and create the separate producer and consumer threads. Once it has created the producer and consumer threads, the main() function will sleep (thread safe) for the duration of the simulation. Upon awakening, the main thread will signal the other threads to quit by setting a simulation flag, which is a global variable. The main thread will join with the other threads and then display the simulation statistics. The main() function will be passed five parameters on the command line:
1. The length of time the main thread is to sleep before terminating (simulation length in seconds)
2. The maximum length of time the producer and consumer threads will sleep prior to producing or consuming a buffer_item
3. The number of producer threads
4. The number of consumer threads
5. A "yes" or "no" to output the individual buffer snapshots for each item produced and consumed
A skeleton for the main function appears as:
Creating Pthreads using the Pthreads API is discussed in Chapter 4 and in supplemental notes provided online. Please refer to those references for specific instructions regarding creation of the producer and consumer Pthreads.
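The create / sleep / signal / join sequence described for main() can be sketched as follows. Everything here is illustrative rather than the project's skeleton: worker, keep_running and run_workers are hypothetical names, std::atomic<bool> is used for the stop flag as one way to avoid a data race, and each worker simply counts loop iterations so there is something to return through pthread_join().

```cpp
#include <atomic>
#include <cstddef>
#include <pthread.h>
#include <unistd.h>
#include <vector>

// Hypothetical global stop flag, cleared by the main thread.
static std::atomic<bool> keep_running(true);

// Loops until the flag is cleared, then returns its iteration count
// to whichever thread joins it.
void *worker(void *) {
    long *iterations = new long(0);
    while (keep_running) {
        ++*iterations;
        usleep(1000);   // 1 ms, stands in for real work
    }
    return iterations;
}

// Starts n threads, lets them run for run_ms milliseconds, signals them
// to stop, joins them and returns the sum of their counters.
long run_workers(int n, int run_ms) {
    keep_running = true;
    std::vector<pthread_t> tids;
    for (int i = 0; i < n; ++i) {
        pthread_t tid;
        if (pthread_create(&tid, NULL, worker, NULL) == 0)
            tids.push_back(tid);          // remember only threads that started
    }
    usleep(run_ms * 1000);
    keep_running = false;                 // signal every worker to quit
    long total = 0;
    for (std::size_t i = 0; i < tids.size(); ++i) {
        void *result = NULL;
        pthread_join(tids[i], &result);   // result receives the worker's return value
        long *count = static_cast<long *>(result);
        total += *count;
        delete count;
    }
    return total;
}
```

The key detail is that pthread_join() returns an error code; the thread's return value arrives through its second argument.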
The following code sample illustrates how mutex locks available in the Pthread API can be used to protect a critical section:
Pthreads uses the pthread_mutex_t data type for mutex locks. A mutex is created with the pthread_mutex_init() function, with the first parameter being a pointer to the mutex. By passing NULL as a second parameter, we initialize the mutex to its default attributes. The mutex is acquired and released with the pthread_mutex_lock() and pthread_mutex_unlock() functions. If the mutex lock is unavailable when pthread_mutex_lock() is invoked, the calling thread is blocked until the owner invokes pthread_mutex_unlock(). All mutex functions return a value of 0 on success; if an error occurs, these functions return a nonzero error code.
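The mutex calls just described can be exercised with a small sketch in which two threads increment a shared counter. The names counter_mutex, counter, add_many and count_with_two_threads are hypothetical and exist only for this illustration.

```cpp
#include <pthread.h>

static pthread_mutex_t counter_mutex;  // hypothetical lock for this sketch
static long counter = 0;               // shared data protected by the lock

// Adds 1 to the shared counter *arg times, locking around each update.
void *add_many(void *arg) {
    long n = *static_cast<long *>(arg);
    for (long i = 0; i < n; ++i) {
        pthread_mutex_lock(&counter_mutex);    // acquire the mutex lock
        ++counter;                             // critical section
        pthread_mutex_unlock(&counter_mutex);  // release the mutex lock
    }
    return NULL;
}

// Runs two threads that each add per_thread to the counter.
// Returns the final count, or -1 if the mutex could not be created.
long count_with_two_threads(long per_thread) {
    counter = 0;
    if (pthread_mutex_init(&counter_mutex, NULL) != 0) return -1;
    pthread_t a, b;
    pthread_create(&a, NULL, add_many, &per_thread);
    pthread_create(&b, NULL, add_many, &per_thread);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    pthread_mutex_destroy(&counter_mutex);
    return counter;
}
```

Without the lock, the two increments could interleave and lose updates; with it, the total is exactly twice per_thread.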
Pthreads provides two types of semaphores: named and unnamed. For this project, we will use unnamed semaphores. The code below illustrates how a semaphore is created:
The sem_init() function creates and initializes a semaphore. This function is passed three parameters: A pointer to the semaphore, a flag indicating the level of sharing, and the semaphore's initial value. In this example, by passing the flag 0, we are indicating that this semaphore can only be shared by threads belonging to the same process that created the semaphore. A nonzero value would allow other processes to access the semaphore as well. In this example, we initialize the semaphore to the value 5.
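As a minimal sketch of the three sem_init() parameters described above, the helper below creates an unnamed semaphore and reads its value back. The function name create_and_read is hypothetical; sem_getvalue() and sem_destroy() are the standard POSIX calls for inspecting and releasing a semaphore.

```cpp
#include <semaphore.h>

// Creates an unnamed semaphore shared only among threads of this process
// (second argument 0) and returns its starting value, or -1 on failure.
int create_and_read(unsigned int initial) {
    sem_t sem;
    if (sem_init(&sem, 0, initial) != 0) return -1;
    int value = -1;
    sem_getvalue(&sem, &value);   // read the current count
    sem_destroy(&sem);            // release the semaphore's resources
    return value;
}
```

Calling create_and_read(5) mirrors the example in the text, where the semaphore starts at the value 5.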
In Section 6.5, we described the classical wait() and signal() semaphore operations. Pthreads names the wait() and signal() operations sem_wait() and sem_post(), respectively. The code example below creates a binary semaphore mutex with an initial value of 1 and illustrates its use in protecting a critical section:
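A binary semaphore used as a lock might look like the sketch below. The names sem_mutex, shared_total, guard_init and add_to_total are assumptions made for this illustration.

```cpp
#include <semaphore.h>

static sem_t sem_mutex;        // hypothetical binary semaphore used as a lock
static int shared_total = 0;   // data protected by the semaphore

// Creates the semaphore with an initial value of 1 (unlocked).
bool guard_init() {
    return sem_init(&sem_mutex, 0, 1) == 0;
}

void add_to_total(int amount) {
    sem_wait(&sem_mutex);      // acquire: the value drops from 1 to 0
    shared_total += amount;    // critical section
    sem_post(&sem_mutex);      // release: the value returns to 1
}
```

While one thread is between sem_wait() and sem_post(), any other thread calling sem_wait() on the same semaphore blocks until the value is posted back to 1.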
PROGRAM OUTPUT
Output for this simulation is critical to verify that your simulation program is working correctly. Use this sample to determine what your simulation should output when various conditions occur (buffer empty/full, location of next producer/consumer, etc.). Your program output format should be identical to the following:
The following is buffer.h:
#ifndef _BUFFER_H_DEFINED_
#define _BUFFER_H_DEFINED_
typedef int buffer_item;
#define BUFFER_SIZE 5
bool buffer_insert_item( buffer_item item );
bool buffer_remove_item( buffer_item *item );
#endif // _BUFFER_H_DEFINED_
I'm really having trouble getting started on this. If I could get some help, I would really appreciate it. Thanks!
Explanation / Answer
main.cpp
#include <iostream>
#include <cstring>
#include <string>
#include <cstdlib>
#include "buffer.h"
#include "stats.h"
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;
//Structs
struct ARGS //struct to hold args from command line
{
int simLength; //simulation length
int maxThreadSleep; //maximum sleep time for thread
int numProducers; //number of producers
int numConsumers; //number of consumers
bool outputBufferSnapshots; //true/false output buffer snapshots
};
//Global args
ARGS global_args; //create an instance of arguments struct
bool runSim = true; //flag to indicate that the simulation should stop when false
//function prototype
void arguments(int, char **);
bool checkPrime(buffer_item);
void *producer(void * param);
void *consumer(void * param);
int main( int argc, char *argv[] )
{
//Get command line arguments
arguments(argc, argv);
//Store simulation statistics
Stats simulationStats(global_args.simLength, global_args.maxThreadSleep,
global_args.numProducers, global_args.numConsumers,
BUFFER_SIZE);
//Initialize the buffer
buffer_initialize();
//========= Threads =========
pthread_t tidP[global_args.numProducers]; //producer thread identifier array
pthread_t tidC[global_args.numConsumers]; //consumer thread identifier array
pthread_attr_t attr; //set of thread attributes
pthread_attr_init( &attr ); //get the default attributes
cout << "Starting Threads...\n";
//Create producer thread(s)
for(int i=0; i < global_args.numProducers; i++)
{
pthread_create( &tidP[i], &attr, producer, NULL);
}
//Create consumer thread(s)
for(int i=0; i < global_args.numConsumers; i++)
{
pthread_create( &tidC[i], &attr, consumer, NULL);
}
//Sleep
sleep(global_args.simLength);
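//Signal Threads to quit
//cout << "simulation stopping ";
runSim = false;
//Wake any thread still blocked on a semaphore so it can see the flag and exit;
//otherwise the joins below could wait forever
for(int i=0; i < global_args.numProducers; i++)
sem_post( &bufferEmpty );
for(int i=0; i < global_args.numConsumers; i++)
sem_post( &bufferFull );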
//Used to accept array from threads
int *tempStats;
//Join Threads
//cout << "Seg fault here may be caused by no mutex/semaphore "; //debug statement here
//cout << "join producers ";
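for(int i=0; i < global_args.numProducers; i++)
{
//cout << "join producer #" << i << " ";
//Store stats for each thread. pthread_join() returns an error code;
//the pointer passed to pthread_exit() comes back through the second argument.
void *producerResult = NULL;
pthread_join( tidP[i], &producerResult );
tempStats = (int*) producerResult;
simulationStats.totalThreadNumProduced[i] = tempStats[0];
simulationStats.numTimesBufferFull += tempStats[1];
delete [] tempStats; //free the array allocated by the thread
}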
//cout << "done join producers ";
//cout << "join consumers ";
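for(int i=0; i < global_args.numConsumers; i++)
{
//Store stats for each thread. pthread_join() returns an error code;
//the pointer passed to pthread_exit() comes back through the second argument.
void *consumerResult = NULL;
pthread_join( tidC[i], &consumerResult );
tempStats = (int*) consumerResult;
simulationStats.totalThreadNumConsumed[i] = tempStats[0];
simulationStats.numTimesBufferEmpty += tempStats[1];
delete [] tempStats; //free the array allocated by the thread
}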
//cout << "done join consumers ";
//Grab number of items remaining in buffer
simulationStats.numItemsRemaining = bufferCount;
//Display Statistics
simulationStats.print();
//Exit
return 0;
}
/********************************************************************
//
// Arguments function
//
// This function takes in the command line arguments and
// places them into a struct.
//
// Return Value
// ------------
// void
//
// Value Parameters
// ----------------
// argc int Number of arguments in argv
//
// Reference Parameters
// --------------------
// argv char Array of char arrays. Provides command line args
//
*******************************************************************/
void arguments(int argc, char *argv[])
{
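if(argc < 6)
{
cerr << "5 arguments are needed. The arguments should be ";
cerr << "in the following order:\n simulation length\n";
cerr << " max thread sleep\n number of producers\n number ";
cerr << "of consumers\n yes or no for outputting buffer snapshots\n";
exit(-1);
}
global_args.simLength = atoi(argv[1]);
global_args.maxThreadSleep = atoi(argv[2]);
global_args.numProducers = atoi(argv[3]);
global_args.numConsumers = atoi(argv[4]);
//Reject values the rest of the program cannot handle
//(maxThreadSleep is used as a modulus, so it must not be 0)
if(global_args.simLength < 0 || global_args.maxThreadSleep < 1 ||
global_args.numProducers < 1 || global_args.numConsumers < 1)
{
cerr << "Simulation length must not be negative; max thread sleep ";
cerr << "and both thread counts must be at least 1\n";
exit(-1);
}
if(strcmp(argv[5],"yes") == 0)
global_args.outputBufferSnapshots = true;
else if(strcmp(argv[5],"no") == 0)
global_args.outputBufferSnapshots = false;
else
{
cerr << "The last argument must be 'yes' or 'no'\n";
exit(-1);
}
}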
/********************************************************************
//
// Check Prime Function
//
// This function determines if a number is prime
//
// Return Value
// ------------
// bool True/False if number is prime
//
// Value Parameters
// ----------------
// in buffer_item The value
//
// Local Variables
// ---------------
// lim int Limit to go to checking for a prime number
// result int Temporary result of mod operation
//
*******************************************************************/
bool checkPrime(buffer_item in)
{
if(in < 2) //0, 1 and negative numbers are not prime
return false;
int lim = in/2;
int result;
for(int i=2; i<=lim; i++)
{
result = in % i;
if(result == 0)
return false;
}
return true;
}
/********************************************************************
//
// Producer function
//
// Producer function to produce numbers and put them into the buffer
//
// Return Value
// ------------
// void
//
// Reference Parameters
// --------------------
// void
// thread returns stats value
//
// Local Variables
// ---------------
// stats int holds stats for # of times buffer is full
// & number of items produced.
//
*******************************************************************/
void *producer(void * param)
{
//Variables
int * stats = new int[2]; //used to return stats
int sleepTime; //holds random number for time to sleep
//per-thread seed for rand_r(); the heap address makes it differ between threads
unsigned int seed = (unsigned int) time(NULL) ^ (unsigned int)(size_t) stats;
stats[0] = 0; //holds # of items produced
stats[1] = 0; //holds # of times buffer is full
buffer_item bItem; // this is my item
bool inserted; //true when the item actually went into the buffer
while(runSim)
{
//Sleep for a random amount of time.
sleepTime = rand_r(&seed) % global_args.maxThreadSleep;
sleep(sleepTime);
//Generate a random number
bItem = rand_r(&seed) % 9999;
//bItem = bItem % 99;
//cout << "Producer item: " << bItem << endl << endl;
//Check to see if buffer is full: sem_trywait() fails when no slot is free
if(sem_trywait( &bufferEmpty ) != 0)
{
cout << "All buffers full. Producer " << pthread_self();
cout << " waits.\n";
stats[1]=(stats[1] + 1);
//Wait
sem_wait( &bufferEmpty );
}
//Lock
pthread_mutex_lock( &mutex );
//Produce - CRITICAL SECTION
inserted = runSim && buffer_insert_item(bItem);
if(inserted)
{
cout << "Producer " << pthread_self() << " writes ";
cout << bItem << endl;
stats[0]= (stats[0] + 1); //add # of items produced
if(global_args.outputBufferSnapshots) //if snapshots enabled
buffer_print(); //print snapshots
}
//unlock
pthread_mutex_unlock( &mutex );
//signal: a new item fills a slot for a consumer; otherwise hand the free slot back
sem_post( inserted ? &bufferFull : &bufferEmpty );
}
pthread_exit( (void*)stats ); //return stats array
}
/********************************************************************
//
// Consumer function
//
// Consumer function to consume numbers and remove them from the buffer
//
// Return Value
// ------------
// void
//
// Reference Parameters
// --------------------
// void
// thread returns stats value
//
// Local Variables
// ---------------
// stats int holds stats for # of times buffer is empty
// & number of items consumed.
//
*******************************************************************/
void *consumer(void * param)
{
//Variables
buffer_item bItem; //bItem to store received item
int sleepTime; //int to hold random time to sleep
int * stats = new int[2]; //array explained on next two lines
//per-thread seed for rand_r(); the heap address makes it differ between threads
unsigned int seed = (unsigned int) time(NULL) ^ (unsigned int)(size_t) stats;
stats[0] = 0; //holds # of items consumed
stats[1] = 0; //holds # of times buffer was empty
bool removed; //true when an item was actually taken from the buffer
while(runSim)
{
//Sleep for a random amount of time
sleepTime = rand_r(&seed) % global_args.maxThreadSleep;
sleep(sleepTime);
//Check if buffers are empty: sem_trywait() fails when no item is available
if(sem_trywait( &bufferFull ) != 0)
{
cout << "All buffers empty. Consumer ";
cout << pthread_self();
cout << " waits.\n";
stats[1]= (stats[1] + 1); //add one time buffer was empty
//Wait
sem_wait( &bufferFull );
}
//Lock
pthread_mutex_lock( &mutex );
//Consume
removed = runSim && buffer_remove_item( &bItem);
if(removed)
{
//line 1
cout << "Consumer " << pthread_self() << " reads ";
cout << bItem;
stats[0] = (stats[0]+1); //increase # of items consumed
if(checkPrime(bItem)) //find if # was prime
cout << " * * * PRIME * * *";
cout << endl;
//if snapshots are enabled
if(global_args.outputBufferSnapshots)
buffer_print();
}
//Unlock
pthread_mutex_unlock( &mutex );
//Signal: a removed item frees a slot for a producer; otherwise hand the item back
sem_post( removed ? &bufferEmpty : &bufferFull );
}
pthread_exit( (void*)stats ); //return stats for this thread
}
buffer.h
/********************************************************************
//
// Christopher Carlisle
// Operating Systems
// Programming Project #3: Process Synchronization Using Pthreads:
// The Producer / Consumer Problem With
// Prime Number Detector
// November 23, 2009
// Instructor: Dr. Ajay K. Katangur
//
********************************************************************/
#ifndef _BUFFER_H_DEFINED_
#define _BUFFER_H_DEFINED_
#include <iostream>
#include <iomanip>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
typedef int buffer_item;
#define BUFFER_SIZE 5
buffer_item global_buffer[BUFFER_SIZE];
unsigned int bufferCount=0; //keeps track of number of items in buffer
unsigned int buffer_In_index=0; //index position for putting stuff in buffer
unsigned int buffer_Out_index=0; //index position for pulling stuff out of buffer
//function prototypes
bool buffer_insert_item( buffer_item item );
bool buffer_remove_item( buffer_item *item );
void buffer_print();
void buffer_initialize();
//Semaphores and mutexes
sem_t bufferEmpty;
sem_t bufferFull;
pthread_mutex_t mutex;
/********************************************************************
//
// Buffer Initialize
//
// This function initializes the semaphores and the mutex, and sets
// every buffer position to -1
//
// Return Value
// ------------
// void
//
*******************************************************************/
void buffer_initialize()
{
sem_init(&bufferEmpty, 0, BUFFER_SIZE );
sem_init( &bufferFull, 0, 0 );
pthread_mutex_init( &mutex, NULL );
for(int i=0 ; i < BUFFER_SIZE; i++)
{
global_buffer[i] = -1; //sets all buffer positions to -1
}
}
/********************************************************************
//
// Buffer Insert Item Function
//
// This function inserts an item into the buffer, moves
// the buffer index, and adds one to the count of the buffer
//
// Return Value
// ------------
// bool True that function succeeded
//
// Value Parameters
// ----------------
// item buffer_item Value to place into buffer
//
*******************************************************************/
bool buffer_insert_item( buffer_item item )
{
cout << "Item: " << (int) item << endl; //debug print statement
//put item into buffer
global_buffer[buffer_In_index] = item;
//move buffer in index plus one in circular fashion
buffer_In_index = (buffer_In_index + 1) % BUFFER_SIZE;
//cout << bufferCount << endl; //debug output buffercount
//increase buffer count since item was inserted
bufferCount++;
return true;
}
/********************************************************************
//
// Buffer Remove Item Function
//
// This function removes an item from the buffer, moves
// the buffer index, and subtracts one from the count of the buffer
//
// Return Value
// ------------
// bool True that function succeeded
//
// Reference Parameters
// ----------------
// item buffer_item Value to remove from buffer
//
*******************************************************************/
bool buffer_remove_item( buffer_item *item )
{
//Grab item from buffer
*item = global_buffer[buffer_Out_index];
//Move out to next index position in circular fashion
buffer_Out_index = ( buffer_Out_index + 1) % BUFFER_SIZE;
//Decrease number of items in buffer
bufferCount--;
return true;
}
/********************************************************************
//
// Buffer Print Function
//
// This function prints out the number of buffers occupied and
// the buffer's contents. It also prints the position of
// the in and out indexes.
//
// Return Value
// ------------
// void
//
*******************************************************************/
void buffer_print()
{
//line 1
cout << "(buffers occupied: " << bufferCount << ")\n";
//line 2
cout << "buffers: ";
for(int i=0; i < BUFFER_SIZE; i++)
cout << " " << global_buffer[i] << " "; //print #'s in buffer; empty slots show as -1
cout << endl; //start a new line
// Third line of output. puts dashes under numbers
cout << " ";
for(int i=0; i < BUFFER_SIZE; i++)
cout << " ----";
cout << endl;
// Fourth line of output. Shows position of in & out indexes
cout << " ";
for(int i=0; i < BUFFER_SIZE; i++)
{
cout << " ";
if(buffer_In_index == i || buffer_Out_index == i)
{
if( bufferCount == BUFFER_SIZE )
cout << "WR";
else if( bufferCount == 0 )
cout << "RW";
else if( buffer_In_index == i )
cout << "W ";
else
cout << "R ";
cout << " ";
}
else
cout << " ";
}
cout << endl << endl;
}
#endif // _BUFFER_H_DEFINED_
stats.h
#ifndef STATS_H
#define STATS_H
#include <iostream>
#include <iomanip> //setw() used in Stats::print()
using namespace std;
class Stats
{
private:
//vars
int simulationTime; //holds simulation time
int maxThreadSleepTime; //holds max sleep time for threads
int numProducerThreads; //holds # of producer threads
int numConsumerThreads; //holds # of consumer threads
int sizeOfBuffer; //holds size of the buffer
int totalNumConsumed; //gets # of items consumed
int totalNumProduced; //gets # of items produced
public:
//vars
int *totalThreadNumProduced; //holds # of items produced in array for each thread
int *totalThreadNumConsumed; //holds # of items consumed in array for each thread
int numTimesBufferFull; //total # of times the buffer was full
int numTimesBufferEmpty; //total # of times the buffer was empty
int numItemsRemaining; //total # of items that were left in the buffer
//functions
void print(); //does the output at the end
Stats(int, int, int, int, int); //Constructor to set all the stuff we get from cli
~Stats(); //destructor that gets rid of dynamic elements
};
/********************************************************************
//
// Stats Constructor Function
//
// This function initializes all of the values passed in by the
// command line arguments
//
// Return Value
// ------------
// none
//
// Value Parameters
// ----------------
// _simTime int simulation time
// _maxSleep int maximum sleep time
// _numProducerThreads int number of producer threads
// _numConsumerThreads int number of consumer threads
// _bufferSize int size of buffer
//
*******************************************************************/
Stats::Stats(int _simTime, int _maxSleep, int _numProducerThreads, int _numConsumerThreads, int _bufferSize)
{
//Vars
simulationTime = _simTime;
maxThreadSleepTime = _maxSleep;
numProducerThreads = _numProducerThreads;
numConsumerThreads = _numConsumerThreads;
sizeOfBuffer = _bufferSize;
//Counters start at zero; main() accumulates into them with +=
totalNumConsumed = 0;
totalNumProduced = 0;
numTimesBufferFull = 0;
numTimesBufferEmpty = 0;
numItemsRemaining = 0;
//Allocate memory
totalThreadNumProduced = new int[_numProducerThreads];
totalThreadNumConsumed = new int[_numConsumerThreads];
}
/********************************************************************
//
// ~Stats Function
//
// This function deletes pointers
//
// Return Value
// ------------
// none
//
*******************************************************************/
Stats::~Stats()
{
delete [] totalThreadNumProduced;
delete [] totalThreadNumConsumed;
}
/********************************************************************
//
// Print Function
//
// This function prints out a summary of stats at the end of
// the program
//
// Return Value
// ------------
// void
//
*******************************************************************/
void Stats::print()
{
cout << "PRODUCER / CONSUMER SIMULATION COMPLETE\n";
cout << "=======================================\n";
cout << "Simulation Time:" << setw (24) << simulationTime << endl;
cout << "Maximum Thread Sleep Time:" << setw(16) << maxThreadSleepTime << endl;
cout << "Number of Producer Threads:" << setw(16) << numProducerThreads << endl;
cout << "Number of Consumer Threads:" << setw(16) << numConsumerThreads << endl;
cout << "Size of Buffer:" << setw(24) << sizeOfBuffer << "\n\n";
//Producer threads
int totalProd = 0;
//-----sum up the amount of items produced by all threads
for(int i=0; i < numProducerThreads; i++)
totalProd += totalThreadNumProduced[i];
//-----display total found above
cout << "Total Number of Items Produced:" << setw(11) << totalProd << endl;
//-----for each producer thread show # of items produced
for(int i=0; i < numProducerThreads; i++)
{
cout << " Thread " << setw(2);
cout << (i+1) << ":" << setw(30);
cout << totalThreadNumProduced[i] << endl;
}
cout << endl;
//Consumer threads
int totalCons = 0;
//------sum up the amount of items consumed by all threads
for(int i=0; i < numConsumerThreads; i++)
totalCons += totalThreadNumConsumed[i];
//------display total # of items consumed
cout << "Total Number of Items Consumed:" << setw(11) << totalCons << endl;
//-----for each consumer thread show # of items consumed
for(int i=0; i < numConsumerThreads; i++)
{
cout << " Thread " << setw(2);
cout << (i+numProducerThreads+1) << ":" << setw(30);
cout << totalThreadNumConsumed[i] << endl;
}
cout << endl;
cout << "Number Of Items Remaining in Buffer:" << setw(6) << numItemsRemaining << endl;
cout << "Number Of Times Buffer Was Full:" << setw(10) << numTimesBufferFull << endl;
cout << "Number Of Times Buffer Was Empty:" << setw(9) << numTimesBufferEmpty << endl;
}
/*
=======================================
Simulation Time: 30
Maximum Thread Sleep Time: 3
Number of Producer Threads: 2
Number of Consumer Threads: 2
Size of Buffer: 5
Total Number of Items Produced: 50000
Thread 1: 30000
Thread 2: 20000
Total Number of Items Consumed: 60000
Thread 3: 40000
Thread 4: 20000
Number Of Items Remaining in Buffer: 0
Number Of Times Buffer Was Full: 3
Number Of Times Buffer Was Empty: 4
*/
#endif