Multi-threaded Server - Mutexes - Semaphores Modify the multi-threaded server (f
ID: 3727920 • Letter: M
Question
Multi-threaded Server - Mutexes - Semaphores
Modify the multi-threaded server (file server.c) such that it is created with a pool of
N threads and a buffer of M ints. The server validates its arguments:
$ ./server
usage: ./server N-threads M-items
file server.c:
#include <stdbool.h>
#include <semaphore.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
// Size of the worker pool: main creates this many worker threads.
#define N 2 // number of threads
// Single-value hand-off slot: main stores each input here and a worker copies it out.
volatile int buffer;
// Held around every read and write of `buffer`.
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // for buffer access
// Initialized to 0 in main; main posts once per input and each worker waits on it
// before reading `buffer`.
sem_t semaphore; // controls the pool of threads
/*
 * Worker thread body: loops forever, handling one work item per iteration.
 *
 * Each iteration waits on `semaphore`, copies the current value of `buffer`
 * while holding `mtx`, then "processes" it (prints, sleeps 10 s, prints).
 * Any synchronization failure terminates the whole process.
 *
 * arg: unused.
 * Never returns.
 */
static void *worker(void *arg) {
    (void)arg;

    while (true) {
        // block until work is available (sem_wait reports failure through errno)
        if (sem_wait(&semaphore)) {
            fprintf(stderr, "sem_wait: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        // get exclusive access to buffer; pthread_* functions return the
        // error number directly and do not set errno
        int rc = pthread_mutex_lock(&mtx);
        if (rc) {
            fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }

        // take a private copy of the input so the lock can be dropped
        // before the slow processing step
        int j = buffer;

        // release exclusive access to buffer
        rc = pthread_mutex_unlock(&mtx);
        if (rc) {
            fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }

        // process data
        fprintf(stdout, "Processing: %d\n", j);
        sleep(10);
        fprintf(stdout, "Done with: %d\n", j);
    }
}
/*
 * Server entry point.
 *
 * Initializes `semaphore` to 0, starts N detached worker threads, then reads
 * whitespace-separated integers from stdin. Each integer is stored in
 * `buffer` under `mtx`, and the semaphore is posted once to release a worker.
 *
 * The loop ends at end-of-file or at the first token that is not an integer.
 * Returning from main ends the process, so workers that are still busy are
 * terminated with it.
 *
 * NOTE(review): `buffer` holds a single value; if a second input is stored
 * before a worker has copied the first, the first is overwritten.
 *
 * Returns 0 on normal termination; exits with EXIT_FAILURE on any
 * semaphore/thread/mutex error.
 */
int main(void) {
    // initialize semaphore (sem_* functions report failure through errno)
    if (sem_init(&semaphore, 0, 0)) {
        fprintf(stderr, "sem_init: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    // create pool of N detached threads; pthread_* functions return the
    // error number directly and do not set errno
    for (int i = 0; i < N; ++i) {
        pthread_t t;
        int rc = pthread_create(&t, NULL, worker, NULL);
        if (rc) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
        rc = pthread_detach(t);
        if (rc) {
            fprintf(stderr, "pthread_detach: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    // main loop: require exactly one successful conversion. Comparing against
    // EOF alone would spin forever on non-numeric input, because fscanf then
    // returns 0 without consuming the offending characters.
    int j;
    while (fscanf(stdin, "%d", &j) == 1) {
        // get exclusive access to buffer
        int rc = pthread_mutex_lock(&mtx);
        if (rc) {
            fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }

        // load input into buffer
        buffer = j;

        // release exclusive access to buffer
        rc = pthread_mutex_unlock(&mtx);
        if (rc) {
            fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }

        // unlock one thread
        if (sem_post(&semaphore)) {
            fprintf(stderr, "sem_post: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
    }
    return 0;
}
Example output:
When the buffer has reached its full capacity, inputs are ignored until room is available.
In the following scenario, inputs 5 and 6 are lost because the buffer is full:
$ ./server 2 2
*** 2 threads, 2 buffered items
1 2
Processing: 1
Processing: 2
3 4
5 6
buffer is full!
buffer is full!
Done with: 1
Processing: 3
Done with: 2
Processing: 4
7
Done with: 3
Processing: 7
Done with: 4
Done with: 7
Here is another normal execution:
./server 5 10
*** 5 threads, 10 buffered items
1 2 3 4 5
1
Processing: 1
Processing: 2
Processing: 3
Processing: 4
Processing: 5
6 7 8 9 10 11 12 13 14 15
Done with: 1
Done with: 2
Processing: 7
Done with: 3
Done with: 5
Processing: 8
Processing: 6
Processing: 9
Done with: 4
Processing: 10
Done with: 7
Done with: 9
Processing: 12
Processing: 11
Done with: 6
Processing: 13
Done with: 8
Processing: 14
Done with: 10
Processing: 15
Done with: 12
Done with: 11
Done with: 13
Done with: 14
Done with: 15
Explanation / Answer
// multiple producers & consumers
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#include <stdlib.h>
#include <iostream>
#include <unistd.h>
// Capacity of the circular buffer (number of int slots).
#define MAX 15
// Circular buffer storage shared by put() and get().
int buffer[MAX];
// Index of the next slot put() writes; wraps around modulo MAX.
int fill = 0;
// Index of the next slot get() reads; wraps around modulo MAX.
int use = 0;
// Store `value` in the slot at index `fill` and advance `fill`, wrapping at MAX.
// Performs no locking or fullness check of its own.
void put(int value) {
    const int slot = fill;
    fill = (slot + 1) % MAX;
    buffer[slot] = value;
}
// Return the value in the slot at index `use` and advance `use`, wrapping at MAX.
// Performs no locking or emptiness check of its own.
int get() {
    const int slot = use;
    use = (slot + 1) % MAX;
    return buffer[slot];
}
// Iteration count for both the producer and the consumer loop; set from argv[1] in main.
int loops = 0;
// Counts free buffer slots; the producer waits on it and the consumer posts it.
sem_t empty;
// Counts filled buffer slots; the consumer waits on it and the producer posts it.
sem_t full;
// Semaphore used as a lock around put()/get() and the accompanying printf.
sem_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(∅);
sem_wait(&mutex;);
put(i);
sem_post(&full;);
printf("put %d ", i); fflush(NULL);
sem_post(&mutex;);
sleep(1);
}
pthread_exit(NULL);
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&full;);
sem_wait(&mutex;);
int b = get();
sem_post(∅);
printf("get %d ", b); fflush(NULL);
sem_post(&mutex;);
}
pthread_exit(NULL);
}
int main(int argc, char *argv[])
{
if(argc < 2 ){
printf("Needs 2nd arg for loop count variable. ");
return 1;
}
loops = atoi(argv[1]);
if(loops > MAX){
printf("Max allowed arg is %d ", MAX);
return 1;
}
sem_init(∅, 0, MAX); // MAX buffers are empty to begin with...
sem_init(&full;, 0, 0); // ... and 0 are full
sem_init(&mutex;, 0, 1); // mutex = 1 since it a lock
pthread_t pThread, cThread;
pthread_create(&pThread;, 0, producer, 0);
pthread_create(&cThread;, 0, consumer, 0);
pthread_join(pThread, NULL);
pthread_join(cThread, NULL);
int emptyValue, fullValue, mutexValue;
sem_getvalue(∅, &emptyValue;);
sem_getvalue(&full;, &fullValue;);
sem_getvalue(&mutex;, &mutexValue;);
if(emptyValue == MAX) sem_destroy(∅);
else std::cout << "Error destroying empty. Value is " << emptyValue << std::endl;
if(fullValue == 0) sem_destroy(&full;);
else std::cout << "Error destroying full. Value is " << fullValue << std::endl;
if(mutexValue == 1) sem_destroy(&mutex;);
else std::cout << "Error destroying mutex. Value is " << mutexValue << std::endl;
return 0;
}
Related Questions
Navigate
Integrity-first tutoring: explanations and feedback only — we do not complete graded work. Learn more.