/*
* counter.c
* Adam Carpenter - 2017 - acarpent - acarpenter@email.wm.edu
* Utilizes code from P. Kearns
* Utilizes code from linuxthreads demonstration "prodcons.c"
*/
// **** Library inclusions ****
#include<errno.h>
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<time.h>
#include<unistd.h>
// **** Definitions for counter ****
#define FALSE 0
#define LEN_LINE 2048
#define LEN_FILENAME 4096
#define MAX_THREADS 26
#define TRUE 1
#define USAGE "counter: usage: counter -b num_lines -t max_counters -d file_delay -D thread_delay [file]...\n"
// **** Structure declarations ****
struct prodConBuff {
/*
* struct prodConBuff
*/
pthread_mutex_t bufferLock;// Mutex ensures exclusive access to buffer.
int readPos; // Position for reading.
int writePos; // Position for writing.
pthread_cond_t notEmpty; // Signaled when buffer isn't empty.
pthread_cond_t notFull; // Signaled when buffer isn't full.
// char *lines[];
char **lines; // Array of strings serves as data container.
};
struct NODE {
/*
* struct NODE
*/
char *word; // char *word or word[LEN_LINE]
long count;
struct NODE *next;
};
struct linkedList {
/*
* struct linkedList
*/
pthread_mutex_t listLock; // Mutex ensures exclusive access to linked list.
struct NODE *head; // Pointer to first node in list.
};
// **** Global variables ****
static char nextThread;
static int fileDelay;
static int maxCounters;
static int moreStuff;
static int numLines;
static int threadDelay;
static pthread_t threadPool[MAX_THREADS]; // Bad Threadpool.
struct prodConBuff buffer;
struct linkedList evenList;
struct linkedList oddList;
// **** Structure initializations ****
static void initBuffer(struct prodConBuff *buffer) {
/*
* initBuffer() initializes the buffer structure to an empty array ready for
* producer-consumer actions.
* The buffer of lines is initialized to NULL. As the program runs, if
* slots are empty they will be malloc'd to fit given lines being inserted
* into the buffer, and freed as they are taken out.
*/
int i;
pthread_mutex_init(&buffer->bufferLock, NULL);
pthread_cond_init(&buffer->notEmpty, NULL);
pthread_cond_init(&buffer->notFull, NULL);
buffer->readPos = 0;
buffer->writePos = 0;
buffer->lines = (char **) calloc(numLines, LEN_LINE);
for (i = 0; i < numLines; i++) {
buffer->lines[i] = (char *) calloc(LEN_LINE, sizeof(char));
}
if (numLines == 1) {
buffer->lines[0][0] = '\0';
}
}
static void initLinkedList(struct linkedList *list) {
/*
* initLinkedList() initializes the linked list structure (its lock and its
* head node.)
*/
pthread_mutex_init(&list->listLock, NULL);
list->head = NULL;
}
// **** Modifier and thread functions ****
static void sleeper(const struct timespec *req, struct timespec *rem) {
// Researched methods of maintaining nanosleep times online and
// found an interesting method using recursion that was similar to this.
struct timespec newRem;
if (nanosleep(req, rem) < 0) {
sleeper(rem, &newRem);
}
}
static void quitThread() {
/*
* quitThread()
*/
#ifdef atcdebug
fprintf(stderr, "counter: thread quit because no more stuff.\n");
#endif
nextThread--;
pthread_exit(NULL);
}
static int putInBuffer(struct prodConBuff *buffer, char *line) {
/*
* putInBuffer()
*/
int newThread;
newThread = FALSE;
pthread_mutex_lock(&buffer->bufferLock);
// If buffer is full, spawn a new counter thread.
if ((buffer->writePos + 1) % numLines == buffer->readPos) {
newThread = TRUE;
}
// Wait until buffer isn't full anymore.
while ((buffer->writePos + 1) % numLines == buffer->readPos) {
pthread_cond_wait(&buffer->notFull, &buffer->bufferLock);
#ifdef atcdebug
fprintf(stderr, "reader: waiting for space...\n");
#endif
}
#ifdef atcdebug
fprintf(stderr, "reader: gonna insert in slot: %d\n", buffer->writePos);
#endif
// Copy the text into the next slot in the buffer.
strncpy(buffer->lines[buffer->writePos], line, LEN_LINE);
buffer->writePos++;
if (buffer->writePos >= numLines) {
buffer->writePos = 0;
}
#ifdef atcdebug
fprintf(stderr, "reader: buffer is now: { ");
int i;
for (i = 0; i < numLines; i++) {
if (i == buffer->writePos) {
fprintf(stderr, "*");
}
fprintf(stderr, "%s ", buffer->lines[i]);
}
fprintf(stderr, "}\n\n");
#endif
// Signal buffer isn't empty anymore and unlock mutex.
pthread_cond_signal(&buffer->notEmpty);
pthread_mutex_unlock(&buffer->bufferLock);
return newThread;
}
static int putInSingleBuffer(struct prodConBuff *buffer, char *line) {
/*
* putInSingleBuffer(0)
*/
int newThread;
pthread_mutex_lock(&buffer->bufferLock);
newThread = FALSE;
// If slot is taken, spawn a new counter thread
if (buffer->lines[0][0] != '\0') {
newThread = TRUE;
}
// Wait until slot isn't full anymore.
while (buffer->lines[0][0] != '\0') {
pthread_cond_wait(&buffer->notFull, &buffer->bufferLock);
}
// Insert line into buffer.
strncpy(buffer->lines[buffer->writePos], line, LEN_LINE);
// Signal buffer slot isn't empty anymore and unlock mutex.
pthread_cond_signal(&buffer->notEmpty);
pthread_mutex_unlock(&buffer->bufferLock);
return newThread;
}
static void getFromBuffer(struct prodConBuff *buffer, char *line) {
/*
* getFromBuffer()
*/
pthread_mutex_lock(&buffer->bufferLock);
// If buffer is 'empty' and nothing more will be written, quit.
if (!moreStuff && buffer->readPos == buffer->writePos) {
quitThread();
}
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
// Wait until buffer isn't empty anymore.
while (buffer->readPos == buffer->writePos) {
#ifdef atcdebug
fprintf(stderr, "counter: waiting for stuff...\n");
#endif
pthread_cond_wait(&buffer->notEmpty, &buffer->bufferLock);
}
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
#ifdef atcdebug
fprintf(stderr, "counter: getting from slot: %d\n", buffer->readPos);
#endif
// Get line from buffer and advance read position.
strncpy(line, buffer->lines[buffer->readPos], LEN_LINE);
buffer->readPos++;
if (buffer->readPos >= numLines) buffer->readPos = 0;
#ifdef atcdebug
fprintf(stderr, "counter: buffer is now: { ");
int i;
for (i = 0; i < numLines; i++) {
if (i == buffer->readPos) {
fprintf(stderr, "*");
}
fprintf(stderr, "%s ", buffer->lines[i]);
}
fprintf(stderr, "}\n\n");
#endif
// Signal buffer isn't full anymore and unlock mutex.
pthread_cond_signal(&buffer->notFull);
pthread_mutex_unlock(&buffer->bufferLock);
}
static void getFromSingleBuffer(struct prodConBuff *buffer, char *line) {
/*
* getFromSingleBuffer();
*/
pthread_mutex_lock(&buffer->bufferLock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
// Wait until buffer isn't empty anymore.
while (buffer->lines[0][0] == '\0') {
// If buffer slot is empty and nothing more will be written to it, quit.
if ((buffer->lines[0][0] == 0) && !moreStuff) {
quitThread();
}
pthread_cond_wait(&buffer->notEmpty, &buffer->bufferLock);
}
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
// Get line from buffer slot.
strncpy(line, buffer->lines[0], LEN_LINE);
buffer->lines[0][0] = '\0';
// Signal buffer slot isn't full anmore and unlock mutex.
pthread_cond_signal(&buffer->notFull);
pthread_mutex_unlock(&buffer->bufferLock);
}
static void putInList(struct linkedList *list, char *word) {
/*
* putInList()
*/
struct NODE *current;
struct NODE *new;
pthread_mutex_lock(&list->listLock);
#ifdef atcdebug
fprintf(stderr, "counter: gonna insert into list\n");
#endif
if (list->head == NULL) {
new = (struct NODE *) malloc(sizeof(struct NODE));
new->word = (char *) malloc(strlen(word));
strcpy(new->word, word);
new->count = 1;
list->head = new;
}
else {
for (current = list->head; current != NULL; current = current->next) {
if (strcmp(current->word, word) == 0) {
// Word already in list; update count.
current->count++;
break;
}
else if (current->next != NULL
&& strcmp(word, current->next->word) < 0) {
// Insert in front of next-greatest lexiographical word.
new = (struct NODE *) malloc(sizeof(struct NODE));
new->word = (char *) malloc(strlen(word));
strcpy(new->word, word);
new->count = 1;
new->next = current->next;
current->next = new;
break;
}
else if (current->next == NULL) {
// Reached the end; Insert at back of list.
new = (struct NODE *) malloc(sizeof(struct NODE));
new->word = (char *) malloc(strlen(word));
strcpy(new->word, word);
new->count = 1;
new->next = NULL;
current->next = new;
break;
}
}
}
#ifdef atcdebug
fprintf(stderr, "counter: list is now: {");
current = list->head;
while (current != NULL) {
fprintf(stderr, "%s->", current->word);
current = current->next;
}
fprintf(stderr, "}\n\n");
#endif
pthread_mutex_unlock(&list->listLock);
}
static void printList(struct linkedList *list) {
/*
* printList()
*/
struct NODE *current;
// For debugging purposes and future implementations, lock the list.
pthread_mutex_lock(&list->listLock);
// Print the word of every node in the list.
for (current = list->head; current != NULL; current = current->next) {
printf("Count: %ld Word: %s\n", current->count, current->word);
}
pthread_mutex_unlock(&list->listLock);
}
static void freeList(struct linkedList *list) {
/*
* freeList()
*/
struct NODE *current;
struct NODE *temp;
// For debugging purposes and future implementations, lock the list.
pthread_mutex_lock(&list->listLock);
// Free every node in the list.
while (current != NULL) {
temp = current;
current = current->next;
free(temp);
}
pthread_mutex_unlock(&list->listLock);
}
static void * counterThread(void * arg) {
/*
* counterThread()
*/
char name;
char line[LEN_LINE];
char *token;
struct timespec requestedTime;
struct timespec remainingTime;
name = (char) arg;
requestedTime.tv_sec = threadDelay / 1000;
requestedTime.tv_nsec = threadDelay % 1000 * 1000000;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
#ifdef atcdebug
fprintf(stderr, "counter: name == %c, numLines == %d, threadDelay == %d\n", name, numLines, threadDelay);
#endif
while (TRUE) {
// Sleep for allotted time.
sleeper(&requestedTime, &remainingTime);
if (numLines == 1) {
getFromSingleBuffer(&buffer, line);
}
else {
getFromBuffer(&buffer, line);
}
// Get the first word in the line.
token = strtok(line, " ");
// With each word, see if there are an even or odd number of characters
// and insert the word into the designated list. Then print the name
// of the thread on stdout.
while (token != NULL) {
if (strlen(token) % 2 == 0) {
putInList(&evenList, token);
#ifndef atcdebug
printf("%c ", name);
#endif
}
else {
putInList(&oddList, token);
#ifndef atcdebug
printf("%c ", name);
#endif
}
token = strtok(NULL, " ");
}
}
return NULL;
}
static void createThread() {
/*
* createThread()
*/
// Would a new thread put us over the maximum allowed?
// If not, create another counter thread as requested.
if (((nextThread - 'a') < maxCounters)) {
#ifdef atcdebug
fprintf(stderr, "reader: creating thread, name == %c\n", nextThread);
#endif
pthread_create(&threadPool[nextThread - 'a'], NULL, counterThread, (void *) nextThread);
nextThread++;
}
}
// **** Main ****
int main(int argc, char *argv[]) {
/*
* main() acts as the reader thread, and therefore the producer.
*/
char line[LEN_LINE];
int i;
FILE *textFile;
struct timespec requestedTime;
struct timespec remainingTime;
// Get command line args.
if (argc < 10) {
fprintf(stderr, USAGE);
exit(1);
}
numLines = maxCounters = fileDelay = threadDelay = -1;
i = 1;
errno = 0;
while (argv[i] && i < 9) {
// Get numLines.
if (strcmp(argv[i], "-b") == 0) {
i++;
if (argv[i]) {
numLines = strtol(argv[i], NULL, 10);
if (errno != 0 || numLines <= 0) {
fprintf(stderr, "counter: num_lines must be a positive, non-zero integer.\n");
exit(1);
}
}
else {
fprintf(stderr, USAGE);
exit(1);
}
}
// Get maxCounters.
else if (strcmp(argv[i], "-t") == 0) {
i++;
if (argv[i]) {
maxCounters = strtol(argv[i], NULL, 10);
if (errno != 0 || maxCounters <= 0 || maxCounters > 26) {
fprintf(stderr, "counter: max_counters must be a positive, non-zero integer no greater than 26.\n");
exit(1);
}
}
else {
fprintf(stderr, USAGE);
exit(1);
}
}
// Get fileDelay.
else if (strcmp(argv[i], "-d") == 0) {
i++;
if (argv[i]) {
fileDelay = strtol(argv[i], NULL, 10);
if (errno != 0 || fileDelay < 0) {
fprintf(stderr, "counter: file_delay must be a positive integer.\n");
exit(1);
}
}
else {
fprintf(stderr, USAGE);
exit(1);
}
}
// Get threadDelay.
else if (strcmp(argv[i], "-D") == 0) {
i++;
if (argv[i]) {
threadDelay = strtol(argv[i], NULL, 10);
if (errno != 0 || threadDelay < 0) {
fprintf(stderr, "counter: thread_delay must be a positive integer.\n");
exit(1);
}
}
else {
fprintf(stderr, USAGE);
exit(1);
}
}
else {
if (numLines < 0 || maxCounters < 0
|| fileDelay < 0 || threadDelay < 0) {
fprintf(stderr, USAGE);
exit(1);
}
}
i++;
}
#ifdef atcdebug
fprintf(stderr, "reader: numLines = %d, maxCounters = %d, fileDelay = %d, "
"threadDelay = %d\n", numLines, maxCounters, fileDelay, threadDelay);
#endif
// Set up thread environment.
initBuffer(&buffer);
initLinkedList(&evenList);
initLinkedList(&oddList);
nextThread = 'a';
moreStuff = TRUE;
// Start up first counter (consumer) thread.
createThread();
// **** Reader (producer) thread ****
textFile = NULL;
requestedTime.tv_sec = threadDelay / 1000;
requestedTime.tv_nsec = threadDelay % 1000 * 1000000;
while (argv[i]) {
// Open each file given in argv[].
if (!(textFile = fopen(argv[i], "r"))) {
fprintf(stderr, "counter: could not open %s\n", argv[i]);
}
else {
// Read each line in the file and insert it into the buffer.
while (fgets(line, LEN_LINE, textFile)) {
// Strip newline.
line[strcspn(line, "\n")] = '\0';
if (numLines == 1) {
// Attempt to create a new thread if function says
// buffer was full.
if (putInSingleBuffer(&buffer, line) == TRUE) {
createThread();
}
}
else {
// Attempt to create a new thread if put function says
// buffer was full.
if (putInBuffer(&buffer, line) == TRUE) {
createThread();
}
}
// Sleep for allotted time.
while (nanosleep(&requestedTime, &remainingTime) == -1) {
requestedTime = remainingTime;
}
}
fclose(textFile);
}
i++;
}
// Let counter threads know that there won't be any more input and wait
// for threads to finish before printing and terminating.
moreStuff = FALSE;
sleep(2);
for (i = 0; i < MAX_THREADS; i++) {
if (threadPool[i]) {
pthread_cancel(threadPool[i]);
}
}
printf("\n==== Words with an even number of letters ====\n");
printList(&evenList);
printf("\n==== Words with an odd number of letters ====\n");
printList(&oddList);
freeList(&evenList);
freeList(&oddList);
exit(0);
}