From 3d9c7d4c5ae135ace068ba50f6b3ae971d8e276b Mon Sep 17 00:00:00 2001 From: sotech117 Date: Wed, 20 Sep 2023 23:24:05 -0400 Subject: implement collecting all bits, if they don't come in one message --- Makefile | 2 +- client.c | 90 ++- protocol.c | 40 + protocol.h | 8 +- server.c | 831 +++++++++++++++++++++ snowcast_control | Bin 35357 -> 35565 bytes .../Contents/Resources/DWARF/snowcast_control | Bin 13757 -> 14770 bytes snowcast_listener | Bin 34654 -> 34654 bytes snowcast_server | Bin 55868 -> 56124 bytes .../Contents/Resources/DWARF/snowcast_server | Bin 26569 -> 27185 bytes snowcast_server_concurrent.c | 791 -------------------- 11 files changed, 927 insertions(+), 835 deletions(-) create mode 100644 protocol.c create mode 100644 server.c delete mode 100644 snowcast_server_concurrent.c diff --git a/Makefile b/Makefile index 277de9a..3aae1ce 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ CFLAGS = -g -I. -std=gnu99 -Wall -pthread all: server client server: - $(CC) $(CFLAGS) -o snowcast_server snowcast_server_concurrent.c + $(CC) $(CFLAGS) -o snowcast_server server.c client: $(CC) $(CFLAGS) -o snowcast_control client.c diff --git a/client.c b/client.c index 735330c..852e7ff 100644 --- a/client.c +++ b/client.c @@ -16,7 +16,7 @@ #include -#include "protocol.h" +#include "protocol.c" #define MAXDATASIZE 100 // max number of bytes we can get at once @@ -103,6 +103,9 @@ int main(int argc, char *argv[]) exit(1); } + // CONSIDER: could recieve the welcome message here + + char input[LINE_MAX]; printf("Enter a number to change to it's station. Click q to end stream.\n"); while (1) { @@ -123,8 +126,9 @@ int main(int argc, char *argv[]) struct SetStation setStation; setStation.commandType = 1; setStation.stationNumber = htons(inputInt); - if ((numbytessent = send(sockfd, &setStation, sizeof(struct SetStation), 0)) == -1) { - perror("send"); + int bytes_to_send = sizeof(struct SetStation); + if (send_all(sockfd, &setStation, &bytes_to_send) == -1) { + perror("send_all"); exit(1); } } @@ -135,59 +139,65 @@ int main(int argc, char *argv[]) void *reply_thread_routine(void* args) { int sockfd = (int)args; - int recvbytes; - char buf[MAX_READ_SIZE]; + // int recvbytes; while (1) { - // recv the message, check for errors too - memset(buf, 0, MAX_READ_SIZE); - if ((recvbytes = recv(sockfd, &buf, MAX_READ_SIZE, 0)) == -1) { + // recv the first byte of the message to get it's type + uint8_t reply_type = -1; + // print size of utin8 + if (recv(sockfd, &reply_type, 1, 0) == -1) { perror("recv"); exit(1); } - buf[recvbytes] = '\0'; - // printf("client: received %d bytes on a reply call \n", recvbytes); - // print the two first field of the call - // printf("client: replyType: %d, stringSize: %d\n", buf[0], buf[1]); - // print the while buffer by char - // for (int i = 0; i < recvbytes; i++) { - // printf("%c ", buf[i]); - // } - struct Reply reply; - memcpy(&reply, buf, 2); - - // print out the fields of reply on one line - // printf("\nclient: replyType: %d, stringSize: %d\n", reply.replyType, reply.stringSize); - if (reply.replyType == 2) { - struct Welcome msg; + if (reply_type == 2) { // we have a welcome message // recv the message, check for errors too - memcpy(&msg, buf, sizeof(struct Welcome)); - msg.numStations = ntohs(msg.numStations); - printf("Welcome to Snowcast! The server has %d stations.\n", msg.numStations); + int16_t num_stations = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(sockfd, &num_stations, &bytes_to_read) == -1) { + perror("recv_all"); + exit(1); + } + num_stations = ntohs(num_stations); + printf("Welcome to Snowcast! The server has %d stations.\n", num_stations); continue; } - // print the size of reply - if (reply.replyType == 3) { - // printf("client: received an announce message\n"); - - char *song_name = malloc(reply.stringSize); - // printf(sizeof(struct Reply)); - memcpy(song_name, buf + 2, reply.stringSize); + if (reply_type == 3) { // we have an announce message + // get the string size + u_int8_t string_size = -1; + if (recv(sockfd, &string_size, 1, 0) == -1) { + perror("recv"); + exit(1); + } + char *song_name = malloc(string_size); + int bytes_to_read = string_size; + if (recv_all(sockfd, song_name, &bytes_to_read) == -1) { + perror("recv_all"); + exit(1); + } printf("New song announced: %s\n", song_name); free(song_name); continue; - } else if (reply.replyType == 4) { - // print sockfd - char *message = malloc(reply.stringSize); - // printf(sizeof(struct Reply)); - memcpy(message, buf + 2, reply.stringSize); - printf("Exiting. %s\n", message); + } else if (reply_type == 4) { // we have an invalid command message + // get the string size + u_int8_t string_size = -1; + if (recv(sockfd, &string_size, 1, 0) == -1) { + perror("recv"); + exit(1); + } + char *message = malloc(string_size); + int bytes_to_read = string_size; + if (recv_all(sockfd, message, &bytes_to_read) == -1) { + perror("recv_all"); + exit(1); + } + printf("Invalid protocol: %s. Exiting.\n", message); + free(message); close(sockfd); exit(1); } - printf("Exiting. Lost connection to server."); + printf("Lost connection to server. Exiting."); close(sockfd); exit(1); } diff --git a/protocol.c b/protocol.c new file mode 100644 index 0000000..864afd8 --- /dev/null +++ b/protocol.c @@ -0,0 +1,40 @@ +#include +#include + +#include "protocol.h" + +int send_all(int sock, char *buf, int *len) +{ + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; + + while(total < *len) { + n = send(sock, buf+total, bytesleft, 0); + if (n == -1) { break; } + total += n; + bytesleft -= n; + } + + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} + +int recv_all(int sock, char *buf, int *len) +{ + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; + + while(total < *len) { + n = recv(sock, buf+total, bytesleft, 0); + if (n == -1) { break; } + total += n; + bytesleft -= n; + } + + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} diff --git a/protocol.h b/protocol.h index 39f26e6..aeeaa54 100644 --- a/protocol.h +++ b/protocol.h @@ -25,8 +25,7 @@ struct Welcome { struct Reply { uint8_t replyType; uint8_t stringSize; - char *string; -} __attribute__((packed)); +} reply_t __attribute__((packed)); struct Announce { uint8_t replyType; uint8_t songnameSize; @@ -36,4 +35,7 @@ struct InvalidCommand { uint8_t replyType; uint8_t replyStringSize; char *replyString; -} __attribute__((packed)); \ No newline at end of file +} __attribute__((packed)); + +int send_all(int sock, char *buf, int *len); +int recv_all(int sock, char *buf, int *len); diff --git a/server.c b/server.c new file mode 100644 index 0000000..837432a --- /dev/null +++ b/server.c @@ -0,0 +1,831 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "protocol.c" + +#define LINE_MAX 1024 +#define MAX_USERS 1000 +#define MAX_PATH 50 + +typedef struct station { + int seekIndex; + char* filePath; +} station_t; + +typedef struct user { + int udpPort; + int stationNum; + int sockfd; + pthread_t streamThread; +} user_t; + + +/* For safe condition variable usage, must use a boolean predicate and */ +/* a mutex with the condition. */ +int count = 0; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; + +const char *port; +int num_stations; + +int start_threads = 0; +int max_active_users = 0; + +pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; +// array from index to user_data +user_t *user_data; +int sockfd_to_user[MAX_USERS]; + +// stations array pointer +station_t *station_data; + +void *send_udp_packet_routine(void* arg); +void *select_thread(void* arg); +void *synchronization_thread(void* arg); + +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); +void *print_info_routine(void *arg); + +void *get_in_addr(struct sockaddr *sa); + +void *init_user(int sockfd); +void *update_user_udpPort(int sockfd, int udpPort); +void *update_user_station(int sockfd, int stationNum); +void *print_user_data(int sockfd); +void destroy_user(int sockfd); + +void send_announce_reply(int fd, int station_num); +void send_invalid_command_reply(int fd, size_t message_size, char* message); + +// void *load_file(void* arg); + +int main(int argc, char *argv[]) +{ + // threads to control reading files at chunks while the other threads sleep + // station_data = malloc(sizeof(station_t) * NUM_STATIONS); + // check and assign arguments + if (argc < 3) { + fprintf(stderr,"usage: ./snowcast_server [file 1] [file 2] ... \n"); + exit(1); + } + + port = argv[1]; + num_stations = argc - 2; + + // printf("port: %s\n", port); + // printf("num_stations: %d\n", num_stations); + + // init stations + size_t totalSize = 0; + // get size to malloc + for (int i = 2; i < argc; i++) + { + // printf("file: %s\n", argv[i]); + totalSize += sizeof(int) + strlen(argv[i]); + } + station_data = malloc(totalSize); + // assign the stations + for (int i = 2; i < argc; i++) + { + station_data[i - 2] = (station_t) { 0, argv[i]}; + } + + // print all indexes in station data + // for (int i = 0; i < num_stations; i++) + // { + // printf("station %d: %s\n", i, station_data[i].filePath); + // } + + // make array of user data + user_data = malloc(sizeof(user_t) * max_active_users); + if (!user_data) { perror("malloc"); return 1; } + + // make and start "select" thread that manages: + // 1) new connections, 2) requests from current connections, 3)cloing connections + pthread_t s_thread; + pthread_create(&s_thread, NULL, select_thread, NULL); + + // start syncchronization thread to broadcast stations + pthread_t sync_thread; + pthread_create(&sync_thread, NULL, synchronization_thread, NULL); + + // command line interface + char input[LINE_MAX]; + memset(input, 0, LINE_MAX); + + char *tokens[LINE_MAX / 2]; + while (read(STDIN_FILENO, input, LINE_MAX) > 0) { + // init tokens + memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *)); + + // if 0, all whitespace + if (!parse(input, tokens)) + continue; + + char *command = tokens[0]; + // if q, shutdown! + if (!strcmp(command, "q")) { + printf("Exiting.\n"); + // TODO: exit better than break + break; + } + + // if p, print info + else if (!strcmp(command, "p")) { + // get the file descriptor + int print_fd = 0; + // see if there is a file path + char *output_file_path = tokens[1]; + if (output_file_path != NULL) + { + if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) + { + perror("open"); + continue; + } + } else { + print_fd = STDOUT_FILENO; + } + // printf("print_fd: %d\n", print_fd); + pthread_t print_info_thread; + pthread_create(&print_info_thread, NULL, print_info_routine, print_fd); + // note - this file descriptor is closed in the thread + } + else if (!strcmp(command, "u")) + { + // print all user data + for (int i = 0; i < max_active_users; i++) + { + print_user_data(i); + } + } + } + + return 0; +} + +void write_int_to_fd(int fd, int n) { + int l = snprintf(NULL, 0, "%d", n); + char *num = malloc(l + 1); + if (!num) { perror("malloc"); return; } + + snprintf(num, l + 1, "%d", n); + if (write(fd, num, strlen(num)) == -1) { + perror("write"); + } + + + free(num); +} + +void *print_info_routine(void *arg) { + int print_fd = (int) arg; + // printf("thread print_fd: %d\n", print_fd); + // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) { + write_int_to_fd(print_fd, i); + char *comma = ","; + write(print_fd, comma, strlen(comma)); + + // write file path + char* file_path = station_data[i].filePath; + write(print_fd, file_path, strlen(file_path)); + + for (int j = 0; j < max_active_users; j++) { + if (!user_data[j].sockfd || user_data[j].sockfd == -1) + continue; + if (user_data[j].stationNum == i) { + char *localhost_ip = ",127.0.0.1:"; + write(print_fd, localhost_ip, strlen(localhost_ip)); + // write udpPort + write_int_to_fd(print_fd, user_data[j].udpPort); + } + } + // wrtie new line + char *newline = "\n"; + write(print_fd, newline, strlen(newline)); + } + + if (print_fd != STDOUT_FILENO) close(print_fd); + return (NULL); +} + +int send_all_udp(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res) +{ + int MAX_PACKET_SIZE = 512; + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; + + while(total < *len) { + n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen); + // thread_res->ai_addr, thread_res->ai_addrlen)) == -1; + if (n == -1) { break; } + total += n; + bytesleft -= n; + } + + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} + + +/* Make the manager routine */ +void *send_udp_packet_routine(void *arg) { + // unpack args + int user_index = (int) arg; + // printf("thread : user_index: %d\n", user_index); + // print user data + // print_user_data(user_index); + + // declare vairables to be used + int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + int s; + int udp_sockfd; + struct addrinfo thread_hints, *thread_res, *thread_servinfo; + int error_code; + + // TODO: add error checking on these calls*** + + // setup hints + memset(&thread_hints, 0, sizeof thread_hints); + thread_hints.ai_family = AF_INET; // use IPv4 only + thread_hints.ai_socktype = SOCK_DGRAM; + thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me + + // setup the socket for client listener DATAGRAM (udp) + // cover the port integer to a string + int int_port = user_data[user_index].udpPort; + int length = snprintf( NULL, 0, "%d", int_port ); + char* port = malloc( length + 1 ); + if (!port) { perror("malloc"); return (NULL); } + snprintf( port, length + 1, "%d", int_port ); + sprintf(port, "%d", int_port); + + if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); + return (NULL); + } + free(port); + + // loop through all the results and make a socket + for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { + if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, + thread_res->ai_protocol)) == -1) { + perror("talker: socket"); + continue; + } + break; + } + if (udp_sockfd == NULL) { + fprintf(stderr, "talker: failed to create socket\n"); + return (NULL); + } + + // bind(udp_sockfd, thread_res->ai_addr, thread_res->ai_addrlen); + + + // freeaddrinfo(thread_servinfo); + + while (1) { + // wait for + pthread_mutex_lock(&m); + did_work = 0; + while (!start_threads) + { + pthread_cond_wait(&cond, &m); + } + + int station_num = user_data[user_index].stationNum; + if (station_num == -1) { + did_work = 1; + } + + if (!did_work) { + // sendto a random string of data to the user + int station_num = user_data[user_index].stationNum; + char *data = station_data[station_num].filePath; + // printf("load data: thread %d \n", user_index); + + // get file path + char* file_path = station_data[station_num].filePath; + // get current seek chunk + int stream_fd = open(file_path, O_RDONLY); + if (stream_fd == -1) { + perror("open"); + return (NULL); + } + int current_chunk = station_data[station_num].seekIndex; + if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) { + perror("fseek"); + return (NULL); + } + size_t BYTES_PER_SECOND = 16*1024; + // read 1000 bytes of the file + char file_buffer[BYTES_PER_SECOND]; + if (read(stream_fd, file_buffer, BYTES_PER_SECOND) == -1) { + perror("fread"); + return (NULL); + } + close(stream_fd); + // printf("send data: thread %d \n", user_index); + // int numbytes; + // if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, + // thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { + // perror("talker: sendto"); + // return (NULL); + // } + // print the size of the file_buffer + // printf("size of file_buffer: %lu\n", sizeof(file_buffer)); + + int bytes_sent = sizeof(file_buffer); + if (send_all_udp(udp_sockfd, file_buffer, &bytes_sent, thread_res) == -1) + { + perror("send_all_udp"); + printf("We only sent %d bytes because of the error!\n", bytes_sent); + } + // printf("We sent all %d bytes!\n", bytes_sent); + + did_work = 1; + + + usleep(400000); + } + + pthread_mutex_unlock(&m); + + usleep(100000); + } + return NULL; +} + +void *send_announce_routine(void *arg) { + // unpack args + int station_num = (int) arg; + // send the announce messages + for (int i = 0; i < max_active_users; i++) + { + // if (user_data[i].streamThread == NULL) { + // break; + // } + if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { + continue; + } + // print_user_data(i); + // update the station of each user + if (user_data[i].stationNum == station_num) + { + // printf("sending announce to user %d\n", i); + send_announce_reply(user_data[i].sockfd, station_num); + } + } +} + +void *synchronization_thread(void *arg) { + int c = 0; + while (1) + { + start_threads = 1; + // printf("\nbroadcast %d\n", c++); + pthread_cond_broadcast(&cond); + usleep(2000); + start_threads = 0; + // printf("before loop"); + // update file seek index for each station + size_t BYTES_PER_SECOND = 16*1024; + // print num_stations + // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) + { + // printf("checking station %d\n", i); + // get size of file + struct stat f_info; + // int file_fd = open(station_data[i].filePath, O_RDONLY); + // if (file_fd == -1) { + // perror("open"); + // return (NULL); + // } + if (stat(station_data[i].filePath, &f_info) == -1) { + perror("fstat"); + return (NULL); + } + + size_t size = f_info.st_size; + + // fclose(file_fd); + // if (size == -1) { + // perror("ftell"); + // return (NULL); + // } + station_data[i].seekIndex += BYTES_PER_SECOND; + // if the seek index is greater than the size of the file, reset it + if (station_data[i].seekIndex >= size) + { + station_data[i].seekIndex = 0; + // printf("resetting station %d\n", i); + + pthread_t send_announce_thread; + pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)i); + } + } + + + usleep(2000); + usleep(1000000-4000); + } +} + +void *select_thread(void *arg) { + fd_set master; // master file descriptor list + fd_set read_fds; // temp file descriptor list for select() + int fdmax; // maximum file descriptor number + + int listener; // listening socket descriptor + int newfd; // newly accept()ed socket descriptor + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen; + + char buf[256]; // buffer for client data + int nbytes; + + + char remoteIP[INET6_ADDRSTRLEN]; + + int yes=1; // for setsockopt() SO_REUSEADDR, below + int i, j, rv; + + struct addrinfo hints, *ai, *p; + + // const char* port = argv[1]; + + FD_ZERO(&master); // clear the master and temp sets + FD_ZERO(&read_fds); + + // LISTENER: get us a socket and bind it + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { + fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); + exit(1); + } + + for(p = ai; p != NULL; p = p->ai_next) { + listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (listener < 0) { + continue; + } + + // lose the pesky "address already in use" error message + setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + + if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { + close(listener); + continue; + } + + break; + } + + // if we got here, it means we didn't get bound + if (p == NULL) { + fprintf(stderr, "snowcast_server: failed to bind\n"); + exit(2); + } + + freeaddrinfo(ai); // all done with this + + // listen + if (listen(listener, 10) == -1) { + perror("listen"); + exit(3); + } + + // add the listener to the master set + FD_SET(listener, &master); + + // keep track of the biggest file descriptor + fdmax = listener; // so far, it's this one + + while(1) { + read_fds = master; // copy it + if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { + perror("select"); + exit(4); + } + + // run through the existing connections looking for data to read + for(i = 0; i <= fdmax; i++) { + if (FD_ISSET(i, &read_fds)) { // we got one!! + if (i == listener) { + // handle new connections + addrlen = sizeof remoteaddr; + newfd = accept(listener, + (struct sockaddr *)&remoteaddr, + &addrlen); + + if (newfd == -1) { + perror("accept"); + } else { + FD_SET(newfd, &master); // add to master set + if (newfd > fdmax) { // keep track of the max + fdmax = newfd; + } + // printf("selectserver: new connection from %s on " + // "socket %d\n.", + // inet_ntop(remoteaddr.ss_family, + // get_in_addr((struct sockaddr*)&remoteaddr), + // remoteIP, INET6_ADDRSTRLEN), + // newfd); + // init user with this newfd + init_user(newfd); + } + } else { + // handle data from a client + struct Command command; + if ((nbytes = recv(i, (char*)&command, sizeof(struct Command), 0)) <= 0) { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + // printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + close(i); // bye! + FD_CLR(i, &master); // remove from master set + // remove user from data structures + destroy_user(i); + } else { + // we got some data from a client + if (command.commandType == 0) { + // hello message with udpPort + // printf("udpPort (from Hello) for new connection is %d.\n", ntohs(command.number)); + // update udp port of user + update_user_udpPort(i, ntohs(command.number)); + + // send the welcome message to client + struct Welcome welcome; + welcome.replyType = 2; + welcome.numStations = htons(num_stations); + int bytes_to_send = sizeof(struct Welcome); + if (send_all(i, &welcome, &bytes_to_send) == -1) + perror("send_all"); + } + else if (command.commandType == 1) { + // check if user has a udpPort + if (user_data[sockfd_to_user[i]].udpPort == -1) { + // send back in invalid command + char * message = "Must send Hello message first."; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + int station_num = ntohs(command.number); + // printf("station_num: %d\n", station_num); + if (station_num >= num_stations || station_num < 0) { + // send back in invalid command + char * message = "Station number out of range."; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + // printf("setting station to %d\n", ntohs(command.number)); + // update station of user + update_user_station(i, ntohs(command.number)); + send_announce_reply(i, station_num); + } + else { + // send back in invalid command + char * message = "Invalid command"; + send_invalid_command_reply(i, strlen(message), message); + + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + } + } + } // END handle data from client + } // END got new incoming connection + } // END looping through file descriptors + + } // END for(;;)--and you thought it would never end! +} + +void *init_user(int sockfd) { + // add the user to the list of user data + pthread_mutex_lock(&mutex_user_data); + // this is to save memory space. + // in general, the displacement of 4 is where a user "used to be" + // int user_index = max_active_users++; + // int running_index = 0; + // while(running_index < max_active_users) + // { + // if (user_data[running_index++].sockfd == -1) + // { + // user_index = running_index; + // break; + // } + // // printf("reusing memory\n"); + // } + int running_index = 0; + while(running_index < max_active_users) { + if (user_data[running_index].sockfd == -1) { + break; + } + running_index++; + } + if (running_index == max_active_users) { + // printf("reached max active users\n"); + // printf("making new memory\n"); + max_active_users++; + // TODO: FIX SO THAT IT USES CURRENT USERS, NOT MAX_ACTIVE_USERS FOT THE RESIZE + user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); + if (!more_users) { perror("realloc"); exit(1); } + user_data = more_users; + } + + + // map TCP sockfd to this user index + user_data[running_index] = (user_t){-1, -1, sockfd, -1}; + sockfd_to_user[sockfd] = running_index; + // free(user_stream_threads); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_udpPort(int sockfd, int udpPort) { + pthread_mutex_lock(&mutex_user_data); + // get the user + user_t *user = &user_data[sockfd_to_user[sockfd]]; + // set the udpPort + user->udpPort = udpPort; + // start the stream thread, now that we have the udpPort + pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_station(int sockfd, int stationNum) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].stationNum = stationNum; + pthread_mutex_unlock(&mutex_user_data); +} +void *print_user_data(int index) { + printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread); +} + +void destroy_user(int sockfd) { + pthread_mutex_lock(&mutex_user_data); + // stop the thread streaming to the user + // TODO: close the FD in the stream thread + pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread); + // close(user_data[sockfd_to_user[sockfd]].udpPort); + // pthread_kill(user_data[sockfd_to_user[sockfd]].streamThread, SIGINT); + // "remove" the user from the list of user data + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; + // map sockfd to -1 + sockfd_to_user[sockfd] = -1; + // close(sockfd); + + pthread_mutex_unlock(&mutex_user_data); +} + + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +void send_announce_reply(int fd, int station_num) { + char* file_path = station_data[station_num].filePath; + int len_file_path = strlen(file_path); + + char *send_buffer = malloc(len_file_path+2); + if (!send_buffer) { + perror("malloc"); + return; + } + send_buffer[0] = 3; + send_buffer[1] = len_file_path; + + memcpy(send_buffer + 2, file_path, len_file_path); + + // printf("buffer: %s\n", send_buffer); + + size_t bytes_to_send = len_file_path + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send_all"); + // print the number of bytes sent + // printf("sent %d bytes\n", bytes_to_send); + + free(send_buffer); +} + +void send_invalid_command_reply(int fd, size_t message_size, char* message) { + char *send_buffer = malloc(message_size+2); + if (!send_buffer) { + perror("malloc"); + return; + } + + // type and payload size + send_buffer[0] = 4; + send_buffer[1] = message_size; + + memcpy(send_buffer + 2, message, message_size); + + // printf("buffer: %s\n", send_buffer); + + int bytes_to_send = message_size + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send"); + // print the number of bytes sent + // printf("sent %d bytes\n", bytessent); + + free(send_buffer); +} + + +// Parses a buffer into tokens, from cs33 :) +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); + } + + return 1; +} + +// int send_all(int s, char *buf, int *len) +// { +// int total = 0; // how many bytes we've sent +// int bytesleft = *len; // how many we have left to send +// int n; + +// while(total < *len) { +// n = send(s, buf+total, bytesleft, 0); +// if (n == -1) { break; } +// total += n; +// bytesleft -= n; +// } + +// *len = total; // return number actually sent here + +// return n==-1?-1:0; // return -1 on failure, 0 on success +// } + +// int recv_all(int sock, char *buffer, int total_size) +// { +// int total_bytes_read = 0; +// int to_read = total_size; + +// char *ptr = buffer; + +// while (to_read > 0) { +// int bytes_read = recv(sock, ptr, to_read, 0); +// if (bytes_read <= 0) { +// if (bytes_read != 0) { +// perror("recv"); +// } +// return -1; +// } + +// to_read -= bytes_read; +// ptr += bytes_read; +// total_bytes_read += bytes_read; +// } + +// return total_bytes_read; +// } diff --git a/snowcast_control b/snowcast_control index d4a3ba1..df78cdb 100755 Binary files a/snowcast_control and b/snowcast_control differ diff --git a/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control b/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control index d16a61e..c9e1f06 100644 Binary files a/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control and b/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control differ diff --git a/snowcast_listener b/snowcast_listener index 414117f..7f0e6a4 100755 Binary files a/snowcast_listener and b/snowcast_listener differ diff --git a/snowcast_server b/snowcast_server index 3434c81..918f71e 100755 Binary files a/snowcast_server and b/snowcast_server differ diff --git a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server index d18b1e8..3f0a8c4 100644 Binary files a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server and b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server differ diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c deleted file mode 100644 index e0552cf..0000000 --- a/snowcast_server_concurrent.c +++ /dev/null @@ -1,791 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - - -#include "protocol.h" - -#define LINE_MAX 1024 -#define MAX_USERS 1000 -#define MAX_PATH 50 - -typedef struct station { - int seekIndex; - char* filePath; -} station_t; - -typedef struct user { - int udpPort; - int stationNum; - int sockfd; - pthread_t streamThread; -} user_t; - - -/* For safe condition variable usage, must use a boolean predicate and */ -/* a mutex with the condition. */ -int count = 0; -pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - -pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; - -const char *port; -int num_stations; - -int start_threads = 0; -int max_active_users = 0; - -pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; -// array from index to user_data -user_t *user_data; -int sockfd_to_user[MAX_USERS]; - -// stations array pointer -station_t *station_data; - -void *send_udp_packet_routine(void* arg); -void *select_thread(void* arg); -void *synchronization_thread(void* arg); - -int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); -void *print_info_routine(void *arg); - -void *get_in_addr(struct sockaddr *sa); - -void *init_user(int sockfd); -void *update_user_udpPort(int sockfd, int udpPort); -void *update_user_station(int sockfd, int stationNum); -void *print_user_data(int sockfd); -void destroy_user(int sockfd); - -void send_announce_reply(int fd, int station_num); -void send_invalid_command_reply(int fd, size_t message_size, char* message); - - -// void *load_file(void* arg); - -int main(int argc, char *argv[]) -{ - // threads to control reading files at chunks while the other threads sleep - // station_data = malloc(sizeof(station_t) * NUM_STATIONS); - // check and assign arguments - if (argc < 3) { - fprintf(stderr,"usage: ./snowcast_server [file 1] [file 2] ... \n"); - exit(1); - } - - port = argv[1]; - num_stations = argc - 2; - - // printf("port: %s\n", port); - // printf("num_stations: %d\n", num_stations); - - // init stations - size_t totalSize = 0; - // get size to malloc - for (int i = 2; i < argc; i++) - { - // printf("file: %s\n", argv[i]); - totalSize += sizeof(int) + strlen(argv[i]); - } - station_data = malloc(totalSize); - // assign the stations - for (int i = 2; i < argc; i++) - { - station_data[i - 2] = (station_t) { 0, argv[i]}; - } - - // print all indexes in station data - // for (int i = 0; i < num_stations; i++) - // { - // printf("station %d: %s\n", i, station_data[i].filePath); - // } - - // make array of user data - user_data = malloc(sizeof(user_t) * max_active_users); - if (!user_data) { perror("malloc"); return 1; } - - // make and start "select" thread that manages: - // 1) new connections, 2) requests from current connections, 3)cloing connections - pthread_t s_thread; - pthread_create(&s_thread, NULL, select_thread, NULL); - - // start syncchronization thread to broadcast stations - pthread_t sync_thread; - pthread_create(&sync_thread, NULL, synchronization_thread, NULL); - - // command line interface - char input[LINE_MAX]; - memset(input, 0, LINE_MAX); - - char *tokens[LINE_MAX / 2]; - while (read(STDIN_FILENO, input, LINE_MAX) > 0) { - // init tokens - memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *)); - - // if 0, all whitespace - if (!parse(input, tokens)) - continue; - - char *command = tokens[0]; - // if q, shutdown! - if (!strcmp(command, "q")) { - printf("Exiting.\n"); - // TODO: exit better than break - break; - } - - // if p, print info - else if (!strcmp(command, "p")) { - // get the file descriptor - int print_fd = 0; - // see if there is a file path - char *output_file_path = tokens[1]; - if (output_file_path != NULL) - { - if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) - { - perror("open"); - continue; - } - } else { - print_fd = STDOUT_FILENO; - } - // printf("print_fd: %d\n", print_fd); - pthread_t print_info_thread; - pthread_create(&print_info_thread, NULL, print_info_routine, print_fd); - // note - this file descriptor is closed in the thread - } - else if (!strcmp(command, "u")) - { - // print all user data - for (int i = 0; i < max_active_users; i++) - { - print_user_data(i); - } - } - } - - return 0; -} - -void write_int_to_fd(int fd, int n) { - int l = snprintf(NULL, 0, "%d", n); - char *num = malloc(l + 1); - if (!num) { perror("malloc"); return; } - - snprintf(num, l + 1, "%d", n); - if (write(fd, num, strlen(num)) == -1) { - perror("write"); - } - - - free(num); -} - -void *print_info_routine(void *arg) { - int print_fd = (int) arg; - // printf("thread print_fd: %d\n", print_fd); - // printf("num_stations: %d\n", num_stations); - for (int i = 0; i < num_stations; i++) { - write_int_to_fd(print_fd, i); - char *comma = ","; - write(print_fd, comma, strlen(comma)); - - // write file path - char* file_path = station_data[i].filePath; - write(print_fd, file_path, strlen(file_path)); - - for (int j = 0; j < max_active_users; j++) { - if (!user_data[j].sockfd || user_data[j].sockfd == -1) - continue; - if (user_data[j].stationNum == i) { - char *localhost_ip = ",127.0.0.1:"; - write(print_fd, localhost_ip, strlen(localhost_ip)); - // write udpPort - write_int_to_fd(print_fd, user_data[j].udpPort); - } - } - // wrtie new line - char *newline = "\n"; - write(print_fd, newline, strlen(newline)); - } - - if (print_fd != STDOUT_FILENO) close(print_fd); - return (NULL); -} - -int sendall(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res) -{ - int MAX_PACKET_SIZE = 512; - int total = 0; // how many bytes we've sent - int bytesleft = *len; // how many we have left to send - int n; - - while(total < *len) { - n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen); - // thread_res->ai_addr, thread_res->ai_addrlen)) == -1; - if (n == -1) { break; } - total += n; - bytesleft -= n; - } - - *len = total; // return number actually sent here - - return n==-1?-1:0; // return -1 on failure, 0 on success -} - - -/* Make the manager routine */ -void *send_udp_packet_routine(void *arg) { - // unpack args - int user_index = (int) arg; - // printf("thread : user_index: %d\n", user_index); - // print user data - // print_user_data(user_index); - - // declare vairables to be used - int did_work = 1; - pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; - int s; - int udp_sockfd; - struct addrinfo thread_hints, *thread_res, *thread_servinfo; - int error_code; - - // TODO: add error checking on these calls*** - - // setup hints - memset(&thread_hints, 0, sizeof thread_hints); - thread_hints.ai_family = AF_INET; // use IPv4 only - thread_hints.ai_socktype = SOCK_DGRAM; - thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me - - // setup the socket for client listener DATAGRAM (udp) - // cover the port integer to a string - int int_port = user_data[user_index].udpPort; - int length = snprintf( NULL, 0, "%d", int_port ); - char* port = malloc( length + 1 ); - if (!port) { perror("malloc"); return (NULL); } - snprintf( port, length + 1, "%d", int_port ); - sprintf(port, "%d", int_port); - - if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) - { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); - return (NULL); - } - free(port); - - // loop through all the results and make a socket - for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { - if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, - thread_res->ai_protocol)) == -1) { - perror("talker: socket"); - continue; - } - break; - } - if (udp_sockfd == NULL) { - fprintf(stderr, "talker: failed to create socket\n"); - return (NULL); - } - - // bind(udp_sockfd, thread_res->ai_addr, thread_res->ai_addrlen); - - - // freeaddrinfo(thread_servinfo); - - while (1) { - // wait for - pthread_mutex_lock(&m); - did_work = 0; - while (!start_threads) - { - pthread_cond_wait(&cond, &m); - } - - int station_num = user_data[user_index].stationNum; - if (station_num == -1) { - did_work = 1; - } - - if (!did_work) { - // sendto a random string of data to the user - int station_num = user_data[user_index].stationNum; - char *data = station_data[station_num].filePath; - // printf("load data: thread %d \n", user_index); - - // get file path - char* file_path = station_data[station_num].filePath; - // get current seek chunk - int stream_fd = open(file_path, O_RDONLY); - if (stream_fd == -1) { - perror("open"); - return (NULL); - } - int current_chunk = station_data[station_num].seekIndex; - if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) { - perror("fseek"); - return (NULL); - } - size_t BYTES_PER_SECOND = 16*1024; - // read 1000 bytes of the file - char file_buffer[BYTES_PER_SECOND]; - if (read(stream_fd, file_buffer, BYTES_PER_SECOND) == -1) { - perror("fread"); - return (NULL); - } - close(stream_fd); - // printf("send data: thread %d \n", user_index); - // int numbytes; - // if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, - // thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { - // perror("talker: sendto"); - // return (NULL); - // } - // print the size of the file_buffer - // printf("size of file_buffer: %lu\n", sizeof(file_buffer)); - - int bytes_sent = sizeof(file_buffer); - if (sendall(udp_sockfd, file_buffer, &bytes_sent, thread_res) == -1) - { - perror("sendall"); - printf("We only sent %d bytes because of the error!\n", bytes_sent); - } - // printf("We sent all %d bytes!\n", bytes_sent); - - did_work = 1; - - - usleep(400000); - } - - pthread_mutex_unlock(&m); - - usleep(100000); - } - return NULL; -} - -void *send_announce_routine(void *arg) { - // unpack args - int station_num = (int) arg; - // send the announce messages - for (int i = 0; i < max_active_users; i++) - { - // if (user_data[i].streamThread == NULL) { - // break; - // } - if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { - continue; - } - // print_user_data(i); - // update the station of each user - if (user_data[i].stationNum == station_num) - { - // printf("sending announce to user %d\n", i); - send_announce_reply(user_data[i].sockfd, station_num); - } - } -} - -void *synchronization_thread(void *arg) { - int c = 0; - while (1) - { - start_threads = 1; - // printf("\nbroadcast %d\n", c++); - pthread_cond_broadcast(&cond); - usleep(2000); - start_threads = 0; - // printf("before loop"); - // update file seek index for each station - size_t BYTES_PER_SECOND = 16*1024; - // print num_stations - // printf("num_stations: %d\n", num_stations); - for (int i = 0; i < num_stations; i++) - { - // printf("checking station %d\n", i); - // get size of file - struct stat f_info; - // int file_fd = open(station_data[i].filePath, O_RDONLY); - // if (file_fd == -1) { - // perror("open"); - // return (NULL); - // } - if (stat(station_data[i].filePath, &f_info) == -1) { - perror("fstat"); - return (NULL); - } - - size_t size = f_info.st_size; - - // fclose(file_fd); - // if (size == -1) { - // perror("ftell"); - // return (NULL); - // } - station_data[i].seekIndex += BYTES_PER_SECOND; - // if the seek index is greater than the size of the file, reset it - if (station_data[i].seekIndex >= size) - { - station_data[i].seekIndex = 0; - // printf("resetting station %d\n", i); - - pthread_t send_announce_thread; - pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)i); - } - } - - - usleep(2000); - usleep(1000000-4000); - } -} - -void *select_thread(void *arg) { - fd_set master; // master file descriptor list - fd_set read_fds; // temp file descriptor list for select() - int fdmax; // maximum file descriptor number - - int listener; // listening socket descriptor - int newfd; // newly accept()ed socket descriptor - struct sockaddr_storage remoteaddr; // client address - socklen_t addrlen; - - char buf[256]; // buffer for client data - int nbytes; - - - char remoteIP[INET6_ADDRSTRLEN]; - - int yes=1; // for setsockopt() SO_REUSEADDR, below - int i, j, rv; - - struct addrinfo hints, *ai, *p; - - // const char* port = argv[1]; - - FD_ZERO(&master); // clear the master and temp sets - FD_ZERO(&read_fds); - - // LISTENER: get us a socket and bind it - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { - fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); - exit(1); - } - - for(p = ai; p != NULL; p = p->ai_next) { - listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (listener < 0) { - continue; - } - - // lose the pesky "address already in use" error message - setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - - if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { - close(listener); - continue; - } - - break; - } - - // if we got here, it means we didn't get bound - if (p == NULL) { - fprintf(stderr, "snowcast_server: failed to bind\n"); - exit(2); - } - - freeaddrinfo(ai); // all done with this - - // listen - if (listen(listener, 10) == -1) { - perror("listen"); - exit(3); - } - - // add the listener to the master set - FD_SET(listener, &master); - - // keep track of the biggest file descriptor - fdmax = listener; // so far, it's this one - - while(1) { - read_fds = master; // copy it - if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { - perror("select"); - exit(4); - } - - // run through the existing connections looking for data to read - for(i = 0; i <= fdmax; i++) { - if (FD_ISSET(i, &read_fds)) { // we got one!! - if (i == listener) { - // handle new connections - addrlen = sizeof remoteaddr; - newfd = accept(listener, - (struct sockaddr *)&remoteaddr, - &addrlen); - - if (newfd == -1) { - perror("accept"); - } else { - FD_SET(newfd, &master); // add to master set - if (newfd > fdmax) { // keep track of the max - fdmax = newfd; - } - // printf("selectserver: new connection from %s on " - // "socket %d\n.", - // inet_ntop(remoteaddr.ss_family, - // get_in_addr((struct sockaddr*)&remoteaddr), - // remoteIP, INET6_ADDRSTRLEN), - // newfd); - // init user with this newfd - init_user(newfd); - } - } else { - // handle data from a client - struct Command command; - if ((nbytes = recv(i, (char*)&command, sizeof(struct Command), 0)) <= 0) { - // got error or connection closed by client - if (nbytes == 0) { - // connection closed - // printf("selectserver: socket %d hung up\n", i); - } else { - perror("recv"); - } - close(i); // bye! - FD_CLR(i, &master); // remove from master set - // remove user from data structures - destroy_user(i); - } else { - // we got some data from a client - if (command.commandType == 0) { - // hello message with udpPort - // printf("udpPort (from Hello) for new connection is %d.\n", ntohs(command.number)); - // update udp port of user - update_user_udpPort(i, ntohs(command.number)); - - // send the welcome message to client - struct Welcome welcome; - welcome.replyType = 2; - welcome.numStations = htons(num_stations); - int numbytes; - if ((numbytes=send(i, &welcome, sizeof(struct Welcome), 0)) == -1) - perror("send"); - } - else if (command.commandType == 1) { - // check if user has a udpPort - if (user_data[sockfd_to_user[i]].udpPort == -1) { - // send back in invalid command - char * message = "Must send Hello message first."; - send_invalid_command_reply(i, strlen(message), message); - // drop connection upon invalid command - close(i); - FD_CLR(i, &master); - destroy_user(i); - continue; - } - - int station_num = ntohs(command.number); - // printf("station_num: %d\n", station_num); - if (station_num >= num_stations || station_num < 0) { - // send back in invalid command - char * message = "Station number out of range."; - send_invalid_command_reply(i, strlen(message), message); - // drop connection upon invalid command - close(i); - FD_CLR(i, &master); - destroy_user(i); - continue; - } - - // printf("setting station to %d\n", ntohs(command.number)); - // update station of user - update_user_station(i, ntohs(command.number)); - send_announce_reply(i, station_num); - } - else { - // send back in invalid command - char * message = "Invalid command"; - send_invalid_command_reply(i, strlen(message), message); - - // drop connection upon invalid command - close(i); - FD_CLR(i, &master); - destroy_user(i); - } - } - } // END handle data from client - } // END got new incoming connection - } // END looping through file descriptors - - } // END for(;;)--and you thought it would never end! -} - -void *init_user(int sockfd) { - // add the user to the list of user data - pthread_mutex_lock(&mutex_user_data); - // this is to save memory space. - // in general, the displacement of 4 is where a user "used to be" - // int user_index = max_active_users++; - // int running_index = 0; - // while(running_index < max_active_users) - // { - // if (user_data[running_index++].sockfd == -1) - // { - // user_index = running_index; - // break; - // } - // // printf("reusing memory\n"); - // } - int running_index = 0; - while(running_index < max_active_users) { - if (user_data[running_index].sockfd == -1) { - break; - } - running_index++; - } - if (running_index == max_active_users) { - // printf("reached max active users\n"); - // printf("making new memory\n"); - max_active_users++; - // TODO: FIX SO THAT IT USES CURRENT USERS, NOT MAX_ACTIVE_USERS FOT THE RESIZE - user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); - if (!more_users) { perror("realloc"); exit(1); } - user_data = more_users; - } - - - // map TCP sockfd to this user index - user_data[running_index] = (user_t){-1, -1, sockfd, -1}; - sockfd_to_user[sockfd] = running_index; - // free(user_stream_threads); - pthread_mutex_unlock(&mutex_user_data); -} -void *update_user_udpPort(int sockfd, int udpPort) { - pthread_mutex_lock(&mutex_user_data); - // get the user - user_t *user = &user_data[sockfd_to_user[sockfd]]; - // set the udpPort - user->udpPort = udpPort; - // start the stream thread, now that we have the udpPort - pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); - pthread_mutex_unlock(&mutex_user_data); -} -void *update_user_station(int sockfd, int stationNum) { - pthread_mutex_lock(&mutex_user_data); - user_data[sockfd_to_user[sockfd]].stationNum = stationNum; - pthread_mutex_unlock(&mutex_user_data); -} -void *print_user_data(int index) { - printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n", - user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread); -} - -void destroy_user(int sockfd) { - pthread_mutex_lock(&mutex_user_data); - // stop the thread streaming to the user - // TODO: close the FD in the stream thread - pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread); - // close(user_data[sockfd_to_user[sockfd]].udpPort); - // pthread_kill(user_data[sockfd_to_user[sockfd]].streamThread, SIGINT); - // "remove" the user from the list of user data - user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; - // map sockfd to -1 - sockfd_to_user[sockfd] = -1; - // close(sockfd); - - pthread_mutex_unlock(&mutex_user_data); -} - - -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); - } - - return &(((struct sockaddr_in6*)sa)->sin6_addr); -} - -void send_announce_reply(int fd, int station_num) { - char* file_path = station_data[station_num].filePath; - int len_file_path = strlen(file_path); - - char *send_buffer = malloc(len_file_path+2); - if (!send_buffer) { - perror("malloc"); - return; - } - send_buffer[0] = 3; - send_buffer[1] = len_file_path; - - memcpy(send_buffer + 2, file_path, len_file_path); - - // printf("buffer: %s\n", send_buffer); - - int bytessent; - if ((bytessent = send(fd, send_buffer, len_file_path + 2, 0)) == -1) - perror("send"); - // print the number of bytes sent - // printf("sent %d bytes\n", bytessent); - - free(send_buffer); -} - -void send_invalid_command_reply(int fd, size_t message_size, char* message) { - char *send_buffer = malloc(message_size+2); - if (!send_buffer) { - perror("malloc"); - return; - } - - // type and payload size - send_buffer[0] = 4; - send_buffer[1] = message_size; - - memcpy(send_buffer + 2, message, message_size); - - // printf("buffer: %s\n", send_buffer); - - int bytessent; - if ((bytessent = send(fd, send_buffer, message_size + 2, 0)) == -1) - perror("send"); - // print the number of bytes sent - // printf("sent %d bytes\n", bytessent); - - free(send_buffer); -} - - -// Parses a buffer into tokens, from cs33 :) -int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { - const char *regex = " \n\t\f\r"; - char *current_token = strtok(buffer, regex); - if (current_token == NULL) return 0; - - for (int i = 0; current_token != NULL; i++) { - tokens[i] = current_token; - current_token = strtok(NULL, regex); - } - - return 1; -} -- cgit v1.2.3-70-g09d2