diff options
28 files changed, 1731 insertions, 206 deletions
@@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -o +CFLAGS = -g -I. -std=gnu99 -Wall -pthread all: server client @@ -10,4 +10,4 @@ client: $(CC) $(CFLAGS) snowcast_control client.c clean: - rm -f server client
\ No newline at end of file + rm -f server client diff --git a/c.dSYM/Contents/Info.plist b/c.dSYM/Contents/Info.plist new file mode 100644 index 0000000..ceced96 --- /dev/null +++ b/c.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.c</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/c.dSYM/Contents/Resources/DWARF/c b/c.dSYM/Contents/Resources/DWARF/c Binary files differnew file mode 100644 index 0000000..4a8f904 --- /dev/null +++ b/c.dSYM/Contents/Resources/DWARF/c @@ -11,13 +11,20 @@ #include <sys/types.h> #include <netinet/in.h> #include <sys/socket.h> +#include <ctype.h> +#include <pthread.h> #include <arpa/inet.h> -#include "protocol.h" +#include "protocol.c" #define MAXDATASIZE 100 // max number of bytes we can get at once +#define MAX_READ_SIZE 1024 +#define LINE_MAX 1024 + +void *reply_thread_routine(void *args); + // get sockaddr, IPv4 or IPv6: void *get_in_addr(struct sockaddr *sa) { @@ -77,30 +84,124 @@ int main(int argc, char *argv[]) inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s); - // printf("client: connecting to %s\n", s); freeaddrinfo(servinfo); // all done with this structure + + pthread_t reply_thread; + pthread_create(&reply_thread, NULL, reply_thread_routine, (void*)sockfd); + + usleep(1000); + + // sleep(1); + struct Hello hello; hello.commandType = 0; // convert updPort to an int int udpPortInt = atoi(udpPort); hello.udpPort = htons(udpPortInt); - if ((numbytessent = send(sockfd, &hello, sizeof(struct Hello), 0)) == -1) { perror("send"); exit(1); } - struct Welcome msg; - // recv the message, check for errors too - if ((recvbytes = recv(sockfd, (char*)&msg, sizeof(struct snowcast_message), 0)) == -1) { - perror("recv"); - exit(1); - } - msg.numStations = ntohs(msg.numStations); - printf("Welcome to Snowcast! The server has %d stations.\n", msg.numStations); + // CONSIDER: could recieve the welcome message here + + char input[LINE_MAX]; + printf("Enter a number to change to it's station. Click q to end stream.\n"); + while (1) { + char *line = fgets(input, LINE_MAX, stdin); - close(sockfd); + if (line == NULL) { + continue; + } else if (strncmp("q\n", input, LINE_MAX) == 0) { + // end code if type in q + printf("Exiting.\n"); + break; + } else { + // convert input to an int + int inputInt = atoi(input); + // printf("Changing to station %d.\n", inputInt); + + // send the command to change the station + struct SetStation setStation; + setStation.commandType = 1; + setStation.stationNumber = htons(inputInt); + int bytes_to_send = sizeof(struct SetStation); + if (send_all(sockfd, &setStation, &bytes_to_send) == -1) { + perror("send_all"); + exit(1); + } + } + } return 0; +} + +void *reply_thread_routine(void* args) { + int sockfd = (int)args; + // int recvbytes; + while (1) { + // recv the first byte of the message to get it's type + uint8_t reply_type = -1; + // print size of utin8 + if (recv(sockfd, &reply_type, 1, 0) == -1) { + perror("recv"); + exit(1); + } + + if (reply_type == 2) { // we have a welcome message + // recv the message, check for errors too + int16_t num_stations = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(sockfd, &num_stations, &bytes_to_read) == -1) { + perror("recv_all"); + exit(1); + } + num_stations = ntohs(num_stations); + printf("Welcome to Snowcast! The server has %d stations.\n", num_stations); + continue; + } + + if (reply_type == 3) { // we have an announce message + // get the string size + u_int8_t string_size = -1; + if (recv(sockfd, &string_size, 1, 0) == -1) { + perror("recv"); + exit(1); + } + char *song_name = malloc(string_size); + if(song_name == NULL) { perror("malloc in song name"); } + + int bytes_to_read = string_size; + if (recv_all(sockfd, song_name, &bytes_to_read) == -1) { + perror("recv_all"); + exit(1); + } + printf("New song announced: %s\n", song_name); + free(song_name); + continue; + } else if (reply_type == 4) { // we have an invalid command message + // get the string size + u_int8_t string_size = -1; + if (recv(sockfd, &string_size, 1, 0) == -1) { + perror("recv"); + exit(1); + } + char *message = malloc(string_size); + if(message == NULL) { perror("malloc in message"); } + int bytes_to_read = string_size; + if (recv_all(sockfd, message, &bytes_to_read) == -1) { + perror("recv_all"); + exit(1); + } + printf("Invalid protocol: %s. Exiting.\n", message); + free(message); + close(sockfd); + exit(1); + } + + printf("Lost connection to server. Exiting.\n"); + close(sockfd); + exit(1); + } }
\ No newline at end of file diff --git a/l.dSYM/Contents/Info.plist b/l.dSYM/Contents/Info.plist new file mode 100644 index 0000000..7025c85 --- /dev/null +++ b/l.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.l</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/l.dSYM/Contents/Resources/DWARF/l b/l.dSYM/Contents/Resources/DWARF/l Binary files differnew file mode 100644 index 0000000..283ecc9 --- /dev/null +++ b/l.dSYM/Contents/Resources/DWARF/l @@ -13,9 +13,9 @@ #include <arpa/inet.h> #include <netdb.h> -#define MYPORT "4950" // the port users will be connecting to +// #define MYPORT "4950" // the port users will be connecting to -#define MAXBUFLEN 100 +#define MAXBUFLEN 16384 // get sockaddr, IPv4 or IPv6: void *get_in_addr(struct sockaddr *sa) @@ -27,23 +27,30 @@ void *get_in_addr(struct sockaddr *sa) return &(((struct sockaddr_in6*)sa)->sin6_addr); } -int main(void) +int main(int argc, char *argv[]) { int sockfd; struct addrinfo hints, *servinfo, *p; int rv; int numbytes; struct sockaddr_storage their_addr; - char buf[MAXBUFLEN]; socklen_t addr_len; char s[INET6_ADDRSTRLEN]; + if (argc != 2) { + fprintf(stderr,"<udp port>\n"); + exit(1); + } + + const char* udp_port = argv[1]; + + memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET6; // set to AF_INET to use IPv4 + hints.ai_family = AF_INET; // set to AF_INET to use IPv4 hints.ai_socktype = SOCK_DGRAM; hints.ai_flags = AI_PASSIVE; // use my IP - if ((rv = getaddrinfo(NULL, MYPORT, &hints, &servinfo)) != 0) { + if ((rv = getaddrinfo(NULL, udp_port, &hints, &servinfo)) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); return 1; } @@ -72,22 +79,21 @@ int main(void) freeaddrinfo(servinfo); - printf("listener: waiting to recvfrom...\n"); + int count = 0; - addr_len = sizeof their_addr; - if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN-1 , 0, - (struct sockaddr *)&their_addr, &addr_len)) == -1) { - perror("recvfrom"); - exit(1); - } + char buf[MAXBUFLEN]; + while(1) { + addr_len = sizeof their_addr; + if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN , 0, + (struct sockaddr *)&their_addr, &addr_len)) == -1) { + perror("recvfrom"); + exit(1); + } + // print the buffer + write(STDOUT_FILENO, buf, numbytes); - printf("listener: got packet from %s\n", - inet_ntop(their_addr.ss_family, - get_in_addr((struct sockaddr *)&their_addr), - s, sizeof s)); - printf("listener: packet is %d bytes long\n", numbytes); - buf[numbytes] = '\0'; - printf("listener: packet contains \"%s\"\n", buf); + memset(buf, 0, MAXBUFLEN); + } close(sockfd); diff --git a/mp3/test.mp3 b/mp3/test.mp3 Binary files differBinary files differnew file mode 100644 index 0000000..9f4f996 --- /dev/null +++ b/mp3/test.mp3 @@ -0,0 +1,497 @@ +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/syscall.h> +#include <pthread.h> +#include <stdio.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> + +#include "protocol.c" + + + +#define LINE_MAX 1024 +#define MAX_RATE_PER_SECOND 16*1024 +#define TCP_TIMEOUT 100000 // 100ms in microseconds + +typedef struct station { + pthread_t thread; + int fd; + char *path; +} station_t; + +int num_stations; +station_t *stations; +setup_stations(int argc, char *argv[]); +*stream_routine(void *arg); + +int setup_listener(int port); +*accept_routine(void *arg); + +// user data structure +typedef struct user { + int tcpfd; + int udpfd; + pthread_t command_thread; + int station; +} user_t; +int num_users = 0; +user_t *users; +int tcpfd_max = 0; +int *fd_to_user; +pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER; +*init_user_routine(void *arg); +int init_user(int tcpfd, int udpfd); +void destroy_user(int fd); +*command_routine(void *arg); + +send_invalid_reply(int fd, char *message) { + printf("sending INVALID reply to socket %d\n", fd); + size_t message_size = strlen(message); + char buf[message_size + 2]; + // type and payload size + buf[0] = 4; + buf[1] = message_size; + memcpy(buf + 2, message, message_size); + + int size_buf = message_size + 2; + if (send_all(fd, &buf, &size_buf) == -1) { + perror("send_all (in init_user_routine)"); + return -1; + } + + return 1; +} + +print_users(); + +main(int argc, char *argv[]) { + if (argc < 3) { + printf("usage: ./snowcast_server <port> <file0> [file 1] [file 2] ...\n"); + exit(1); + } + + setup_stations(argc, argv); + + users = malloc(0); + fd_to_user = malloc(0); + + int port = atoi(argv[1]); + int listenerfd = setup_listener(port); + pthread_t accept_thread; + pthread_create(&accept_thread, NULL, accept_routine, listenerfd); + + + while(1) + ; +} + +setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; + + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + // printf("file: %s\n", argv[i]); + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } + + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); exit(1); } + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].path = argv[i+2]; + stations[i].fd = open(argv[i+2], O_RDONLY); + printf(stations[i].path); + if (stations[i].fd < 0) { perror("read (from station file)"); exit(1); } + pthread_create(&stations[i].thread, NULL, stream_routine, &stations[i].fd); + } + + printf("successfully created %d stations\n", num_stations); +} + +int setup_listener(int port) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { perror("socket (listener)"); return -1; } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("bind (listener)"); + return -1; + } + + if (listen(sock, 0) < 0) { perror("listen (listener)"); return -1; } + + return sock; +} + +// THREAD FUNCTIONS + +// helper +int read_file(int fd, char buffer[MAX_RATE_PER_SECOND]) { + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + if(lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; } + bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } + } + + return bytes_read; +} + +*stream_cleanup(void *arg) { + int fd = *(int*)arg; + printf("cleanup/delete station\n"); + return (NULL); +} + +*stream_routine(void *arg) { + int fd = *(int*)arg; + + pthread_cleanup_push(stream_cleanup, fd); + + // make buffer which will be used to stream to children + char buffer[MAX_RATE_PER_SECOND]; + memset(buffer, 0, MAX_RATE_PER_SECOND); + // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } + + for (;;) + { + // load bytes into buffer + int bytes_read = read_file(fd, buffer); + if (bytes_read == -1) { exit(1); } + + // TODO: send buffer to children + + sleep(1); + memset(buffer, 0, MAX_RATE_PER_SECOND); + } + + pthread_cleanup_pop(1); + + return (NULL); +} + +*accept_cleanup(void *arg) { + int fd = (int) arg; + printf("cleanup/delete accept\n"); + close(fd); + return (NULL); +} + +*accept_routine(void *arg) { + int listener = (int) arg; + + // pthread_cleanup_push(accept_cleanup, listener); + + while(1) { + printf("accepting %d\n", listener); + int userfd = accept(listener, NULL, NULL); + if (userfd < 0) { perror("accept (in accept thread)"); return(NULL); } + + printf("accepted socket %d\n", userfd); + + pthread_t init_user_thread; + pthread_create(&init_user_thread, NULL, init_user_routine, userfd); + } + + // pthread_cleanup_pop(1); +} + +apply_timeout(int fd) { + // handle handshake + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = TCP_TIMEOUT; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt (in apply_timeout)"); + return -1; + } + + return 1; +} + +remove_timeout(int fd) +{ + // handle handshake + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt (in remove_timeout)"); + return -1; + } + + return 1; +} + +int handle_handshake(int userfd) { + if (apply_timeout(userfd) == -1) { return -1; } + + // get the command type + uint8_t command_type = -1; + if ((recv(userfd, &command_type, 1, 0)) < 0) + { + perror("recv (in init_user_routine)"); + return -1; + } + + // check + if (command_type != 0) { + printf("user on socket %d must send a Hello message first", userfd); + return -1; + } + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(userfd, &udp_port, &bytes_to_read) == -1) { + perror("recv_all (in init_user_routine)"); + return -1; + } + // remove timeout + if (remove_timeout(userfd) == -1) { return -1; } + + printf("recieved HELLO command from socket %d\n", userfd); + + + // make udp socket + int udpfd = socket(AF_INET, SOCK_DGRAM, AI_PASSIVE); + if (udpfd < 0) + { + perror("socket (in init_user_routine UDP)"); + return -1; + } + + return udpfd; +} + +send_welcome_reply(int fd) { + printf("sending WELCOME reply to socket %d\n", fd); + + struct Welcome welcome; + welcome.replyType = 2; + printf("num_stations: %d\n", num_stations); + welcome.numStations = htons(num_stations); + int bytes_to_send = sizeof(struct Welcome); + if (send_all(fd, &welcome, &bytes_to_send) == -1) { + perror("send_all (in init_user_routine)"); + return -1; + } + + return 1; +} + +*init_user_cleanup(void *arg) { + int fd = (int) arg; + // printf("cleanup/delete user_maybe?\n"); + close(fd); + return (NULL); +} + +*init_user_routine(void *arg) { + int userfd = (int) arg; + // pthread_cleanup_push(init_user_cleanup, userfd); + + printf("new user on socket %d, waiting for HELLO\n", userfd); + int udpfd = handle_handshake(userfd); + if (udpfd == -1) + { + perror("handle_handshake (in init_user_routine)"); + close(userfd); + return (NULL); + } + + if(send_welcome_reply(userfd) == -1) { + perror("send_welcome_reply (in init_user_routine)"); + close(userfd); + return (NULL); + } + + if (init_user(userfd, udpfd) != 0) { + perror("init_user (in init_user_routine)"); + destroy_user(userfd); + return (NULL); + } + + return (NULL); + + // pthread_cleanup_pop(0); +} + +*command_cleanup(void *arg) { + int fd = (int) arg; + printf("cleanup/delete command\n"); + close(fd); + return (NULL); +} + +handle_setstation_command(int fd, uint16_t station_number) { + printf("received SETSTATION command from socket %d\n", fd); + // check if station number is valid + int station_num = ntohs(station_number); + if (station_num < 0 || station_num >= num_stations) { + printf("station number %d is invalid\n", station_num); + send_invalid_reply(fd, "station number is invalid"); + return -1; + } + + // set the station number + pthread_mutex_lock(&mutex_users); + printf("setting station number of user on socket %d to %d, user! %d\n", fd, station_num, fd_to_user[fd]); + users[fd_to_user[fd]].station = station_num; + pthread_mutex_unlock(&mutex_users); + + print_users(); + + return 1; +} + +*command_routine(void *arg) { + int fd = (int) arg; + + printf("waiting for SETSTATION from socket %d\n", fd); + + while(1) { + // get the command type + uint8_t command_type = -1; + if ((recv(fd, &command_type, 1, 0)) < 0) + { + perror("recv (in command_routine)"); + destroy_user(fd); + return (NULL); + } + + // check if have sent hello before + if (command_type == 0) { + printf("user on socket %d has already sent a HELLO command\n", fd); + send_invalid_reply(fd, "already sent a HELLO command"); + destroy_user(fd); + return (NULL); + } + + else if (command_type == 1) { + // get the station number + uint16_t station_number = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(fd, &station_number, &bytes_to_read) == -1) { + perror("recv_all (in command_routine)"); + destroy_user(fd); + return (NULL); + } + if (handle_setstation_command(fd, station_number) == -1) { + destroy_user(fd); + return (NULL); + } + } + + else if (command_type == 5) { + printf("user on socket %d has requested a LIST\n", fd); + } + + else { + printf("user on socket %d has sent an INVALID command type of %d\n", fd, command_type); + send_invalid_reply(fd, "invalid command type"); + destroy_user(fd); + return (NULL); + } + + + } +} + + +// HELPER FUNCTIONS + +int init_user(int sockfd, int udpfd) { + pthread_mutex_lock(&mutex_users); + printf("initializing user on sockets %d (tcp), %d (udp)\n", sockfd, udpfd); + // update map + if(sockfd > tcpfd_max) { + tcpfd_max = sockfd; + int * more_sockfd_to_user = realloc(fd_to_user, sizeof(int) * (tcpfd_max + 1)); + if (!more_sockfd_to_user) { perror("realloc"); exit(1); } + fd_to_user = more_sockfd_to_user; + } + + int running_index = 0; + while(running_index < num_users) { + if (users[running_index].tcpfd == -1) { + break; + } + running_index++; + } + if (running_index == num_users) { + // printf("reached max active users\n"); + // printf("making new memory\n"); + num_users++; + user_t *more_users = realloc(users, sizeof(user_t) * num_users); + if (!more_users) { perror("realloc"); exit(1); } + users = more_users; + } + + // map TCP sockfd to this user index + users[running_index] = (user_t){sockfd, udpfd, -1, -1}; + pthread_create(&users[running_index].command_thread, NULL, command_routine, sockfd); + fd_to_user[sockfd] = running_index; + // free(user_stream_threads); + + print_users(); + pthread_mutex_unlock(&mutex_users); + + return 0; +} + +void destroy_user(int fd) { + pthread_mutex_lock(&mutex_users); + + printf("destroying user on sockets %d (tcp), %d (udp)\n", users[fd_to_user[fd]].tcpfd, users[fd_to_user[fd]].udpfd); + + // close the sockets + + close(fd); + close(users[fd_to_user[fd]].udpfd); + // stop the thread taking commands to the user + pthread_cancel(&users[fd_to_user[fd]].command_thread); + // "remove" the user from the list of user data + users[fd_to_user[fd]] = (user_t) {-1, -1, -1, -1}; + // map sockfd to -1 + fd_to_user[fd] = -1; + pthread_mutex_unlock(&mutex_users); +} + + +print_users() { + printf("num users: %d\n", num_users); + for (int i = 0; i < num_users; i++) { + printf("tcpfd %d , udpfd %d , station %d |\n", users[i].tcpfd, users[i].udpfd, users[i].station); + } + printf("\n"); +} + +// Parses a buffer into tokens, from cs33 :) +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); + } + + return 1; +}
\ No newline at end of file diff --git a/new.dSYM/Contents/Info.plist b/new.dSYM/Contents/Info.plist new file mode 100644 index 0000000..ba6d460 --- /dev/null +++ b/new.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.new</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/new.dSYM/Contents/Resources/DWARF/new b/new.dSYM/Contents/Resources/DWARF/new Binary files differnew file mode 100644 index 0000000..3200cf1 --- /dev/null +++ b/new.dSYM/Contents/Resources/DWARF/new @@ -0,0 +1,2 @@ +0,mp3/ManchurianCandidates-Breakin.mp3,127.0.0.1:12345,127.0.0.1:12346 +1,mp3/VanillaIce-IceIceBaby.mp3 diff --git a/protocol.c b/protocol.c new file mode 100644 index 0000000..d350221 --- /dev/null +++ b/protocol.c @@ -0,0 +1,66 @@ +#include <sys/types.h> +#include <sys/socket.h> + +#include "protocol.h" + +#define TIMEOUT 100000 // 100ms in microseconds + +int send_all(int sock, char *buf, int *len) +{ + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 100000; + // if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, + // sizeof timeout) < 0) + // perror("setsockopt failed\n"); + + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; + + while(total < *len) { + n = send(sock, buf+total, bytesleft, 0); + if (n == -1) { break; } + total += n; + bytesleft -= n; + } + + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} + +int recv_all(int sock, char *buf, int *len) +{ + // setup the timeout on the socket + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = TIMEOUT; + if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, + sizeof timeout) < 0) + perror("setsockopt failed\n"); + + // printf("start: %ld\n", start); + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; + + while(total < *len) { + n = recv(sock, buf+total, bytesleft, 0); + // start = time(NULL); + if (n == -1) { break; } + total += n; + bytesleft -= n; + } + + timeout.tv_sec = 0; + timeout.tv_usec = 0; + if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, + sizeof timeout) < 0) + perror("setsockopt failed\n"); + + + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} @@ -1,16 +1,41 @@ #include <stdint.h> // Provides uint8_t, int8_t, etc. -struct snowcast_message { - uint8_t type; +// client to server messages (commands) + +struct Command { + uint8_t commandType; uint16_t number; } __attribute__((packed)); +struct Hello { + uint8_t commandType; + uint16_t udpPort; +} __attribute__((packed)); +struct SetStation { + uint8_t commandType; + uint16_t stationNumber; +} __attribute__((packed)); + +// server to client message (replies) struct Welcome { uint8_t replyType; uint16_t numStations; } __attribute__((packed)); -struct Hello { - uint8_t commandType; - uint16_t udpPort; +struct Reply { + uint8_t replyType; + uint8_t stringSize; +} reply_t __attribute__((packed)); +struct Announce { + uint8_t replyType; + uint8_t songnameSize; + char *songname; +} __attribute__((packed)); +struct InvalidCommand { + uint8_t replyType; + uint8_t replyStringSize; + char *replyString; } __attribute__((packed)); + +int send_all(int sock, char *buf, int *len); +int recv_all(int sock, char *buf, int *len); diff --git a/s.dSYM/Contents/Info.plist b/s.dSYM/Contents/Info.plist new file mode 100644 index 0000000..9ce500c --- /dev/null +++ b/s.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.s</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/s.dSYM/Contents/Resources/DWARF/s b/s.dSYM/Contents/Resources/DWARF/s Binary files differnew file mode 100644 index 0000000..489e060 --- /dev/null +++ b/s.dSYM/Contents/Resources/DWARF/s @@ -1,154 +1,909 @@ -/* -** server.c -- a stream socket server demo -*/ - -#include <stdio.h> #include <stdlib.h> -#include <stdint.h> +#include <pthread.h> +#include <stdio.h> #include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> -#include <netdb.h> #include <arpa/inet.h> -#include <sys/wait.h> -#include <signal.h> +#include <netdb.h> +#include <string.h> + +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/types.h> + +#include "protocol.c" + +#define LINE_MAX 1024 +#define MAX_USERS 1000 +#define MAX_PATH 50 +#define MAX_RATE_PER_SECOND 16*1024 / 2 + +// typedef struct station { +// int streamFd; +// char* filePath; +// int fileBufferSize; +// char fileBuffer[MAX_STREAM_RATE]; +// } station_t; +typedef struct station { + pthread_t streamThread; + int readfd; + char *filePath; +} station_t; +int num_stations; +station_t *stations; +int setup_stations(int argc, char *argv[]); +void *stream_routine(void *arg); + + +typedef struct user { + int udpPort; + int stationNum; + int sockfd; +} user_t; + + +/* For safe condition variable usage, must use a boolean predicate and */ +/* a mutex with the condition. */ +int count = 0; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; + +const char *port; +// int num_stations; + +int start_threads = 0; +int max_active_users = 0; +int max_sockfd = 0; + +pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; +// array from index to user_data +user_t *user_data; +int *sockfd_to_user; + +// stations array pointer +// station_t *station_data; + +struct udp_packet_routine_args { + int user_index; + int buffer_size; + char *file_buffer; +}; + +void *send_udp_packet_routine(void* arg); +void *select_routine(void* arg); +void *sync_routine(void* arg); +void *send_announce_routine(void* arg); + +void init_station(int station_num, const char *station_name); + +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); +void *print_info_routine(void *arg); -#include "protocol.h" +void *get_in_addr(struct sockaddr *sa); -#define BACKLOG 10 // how many pending connections queue will hold +void *init_user(int sockfd); +void *update_user_udpPort(int sockfd, int udpPort); +void *update_user_station(int sockfd, int stationNum); +void *print_user_data(int sockfd); +void destroy_user(int sockfd); -#define MAXDATASIZE 100 // max number of bytes we can get at once +void send_announce_reply(int fd, int station_num); +void send_invalid_command_reply(int fd, size_t message_size, char* message); +// void *load_file(void* arg); -void sigchld_handler(int s) +main(int argc, char *argv[]) { - // waitpid() might overwrite errno, so we save and restore it: - int saved_errno = errno; + // check and assign arguments + if (argc < 3) { + fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n"); + exit(1); + } + + // initizlize the port + port = argv[1]; + + // initialize the stations & their threads + if (setup_stations(argc, argv) == -1) { + perror("setup_stations"); + exit(1); + } + + // make array of user data + printf("max active users: %d\n", sizeof(user_t) * max_active_users); + user_data = malloc(sizeof(user_t) * max_active_users); + if (!user_data) { perror("malloc userdata"); return 1; } + sockfd_to_user = malloc(sizeof(int) * max_active_users); + if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } + + // make and start "select" thread that manages: + // 1) new connections, 2) requests from current connections, 3) closing connections + pthread_t select_thread; + pthread_create(&select_thread, NULL, select_routine, NULL); + + // start syncchronization thread to broadcast stations + // pthread_t sync_thread; + // pthread_create(&sync_thread, NULL, sync_routine, NULL); + + // command line interface + char input[LINE_MAX]; + memset(input, 0, LINE_MAX); + + char *tokens[LINE_MAX / 2]; + while (read(STDIN_FILENO, input, LINE_MAX) > 0) { + // init tokens + memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *)); + + // if 0, all whitespace + if (!parse(input, tokens)) + continue; - while(waitpid(-1, NULL, WNOHANG) > 0); + char *command = tokens[0]; + // if q, shutdown! + if (!strcmp(command, "q")) { + printf("Exiting.\n"); + // TODO: exit better than break + break; + } + + // if p, print info + else if (!strcmp(command, "p")) { + // get the file descriptor + int print_fd = 0; + // see if there is a file path + char *output_file_path = tokens[1]; + if (output_file_path != NULL) + { + if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) + { + perror("open"); + continue; + } + } else { + print_fd = STDOUT_FILENO; + } + // printf("print_fd: %d\n", print_fd); + pthread_t print_info_thread; + pthread_create(&print_info_thread, NULL, print_info_routine, print_fd); + // note - this file descriptor is closed in the thread + } + else if (!strcmp(command, "u")) + { + // print all user data + for (int i = 0; i < max_active_users; i++) + { + print_user_data(i); + } + } + } - errno = saved_errno; + return 0; } +int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + pthread_t send_announce_thread; + pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num); -// get sockaddr, IPv4 or IPv6: -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); + if (lseek(fd, 0, SEEK_SET) == -1) + { + perror("lseek (in resarting file)"); + return -1; + } + bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } } - return &(((struct sockaddr_in6*)sa)->sin6_addr); + return bytes_read; +} + +void *stream_routine_cleanup(void *arg) { + int read_fd = (int) arg; + close(read_fd); +} + +void *stream_routine(void *arg) { + int station_num = (int) arg; + printf("stream routine %d\n", station_num); + int read_fd = stations[station_num].readfd; + + pthread_cleanup_push(stream_routine_cleanup, read_fd); + + // make buffer which will be used to stream to children + char buffer[MAX_RATE_PER_SECOND]; + memset(buffer, 0, MAX_RATE_PER_SECOND); + // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } + + for (;;) + { + // load bytes into buffer + int bytes_read = read_file(read_fd, buffer, station_num); + if (bytes_read == -1) { exit(1); } + + // TODO: send buffer to children + char *send_buffer = malloc(2 + bytes_read); + for (int i = 0; i < max_active_users; i++) + { + if (!user_data[i].sockfd || user_data[i].sockfd == -1) + continue; + if (user_data[i].stationNum == station_num) + { + // send the udp packet + int *send_buffer = malloc(2 + bytes_read); + memset(send_buffer, 0, 2 + bytes_read); + send_buffer[0] = i; + send_buffer[1] = bytes_read; + memcpy(send_buffer+2, buffer, bytes_read); + // printf("sending udp packet to user %d\n", i); + pthread_t t; + pthread_create(&t, NULL, send_udp_packet_routine, send_buffer); + } + } + free(send_buffer); + usleep(1000000 / 2 - 5000); + start_threads = 1; + pthread_cond_broadcast(&cond); + + usleep(5000); + start_threads = 0; + + memset(buffer, 0, MAX_RATE_PER_SECOND); + } + + return (NULL); + + pthread_cleanup_pop(1); } -int main(int argc, char *argv[]) +int setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; + + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } + + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); return -1; } + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].filePath = argv[i+2]; + stations[i].readfd = open(argv[i+2], O_RDONLY); + if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } + pthread_create(&stations[i].streamThread, NULL, stream_routine, i); + } + + printf("successfully created %d stations\n", num_stations); + return 1; +} + +void write_int_to_fd(int fd, int n) { + int l = snprintf(NULL, 0, "%d", n); + char *num = malloc(l + 1); + if (!num) { perror("malloc write to fd"); return; } + + snprintf(num, l + 1, "%d", n); + if (write(fd, num, strlen(num)) == -1) { + perror("write"); + } + + + free(num); +} + +void *print_info_routine(void *arg) { + int print_fd = (int) arg; + // printf("thread print_fd: %d\n", print_fd); + // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) { + write_int_to_fd(print_fd, i); + char *comma = ","; + write(print_fd, comma, strlen(comma)); + + // write file path + char* file_path = stations[i].filePath; + write(print_fd, file_path, strlen(file_path)); + + for (int j = 0; j < max_active_users; j++) { + if (!user_data[j].sockfd || user_data[j].sockfd == -1) + continue; + if (user_data[j].stationNum == i) { + char *localhost_ip = ",127.0.0.1:"; + write(print_fd, localhost_ip, strlen(localhost_ip)); + // write udpPort + write_int_to_fd(print_fd, user_data[j].udpPort); + } + } + // wrtie new line + char *newline = "\n"; + write(print_fd, newline, strlen(newline)); + } + + if (print_fd != STDOUT_FILENO) close(print_fd); + return (NULL); +} + +int send_all_udp(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res) { - int sockfd, new_fd, numbytes, b; // listen on sock_fd, new connection on new_fd - char buf[MAXDATASIZE]; - struct addrinfo hints, *servinfo, *p; - struct sockaddr_storage their_addr; // connector's address information - socklen_t sin_size; - struct sigaction sa; - int yes=1; - char s[INET6_ADDRSTRLEN]; - int rv; + int MAX_PACKET_SIZE = 512; + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; - if (argc < 3) { - fprintf(stderr,"usage: <listen port> <file0> [file 1] [file 2] ... \n"); - exit(1); + while(total < *len) { + n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen); + // thread_res->ai_addr, thread_res->ai_addrlen)) == -1; + if (n == -1) { break; } + total += n; + bytesleft -= n; } - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; // only IPv4 - hints.ai_socktype = SOCK_STREAM; // TCP connection - hints.ai_flags = AI_PASSIVE; // use my IP + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} + +void udp_port_cleanup_handler(void *arg) +{ + int sockfd = (int) arg; + close(sockfd); +} + +/* Make the manager routine */ +void *send_udp_packet_routine(void *arg) { + // printf("send udp packet routine\n"); + int *buf = arg; + // unpack args + int user_index = buf[0]; + int buffer_size = buf[1]; + char *file_buffer = malloc(buffer_size); + memcpy(file_buffer, buf+2, buffer_size); + + // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size); + + // declare vairables to be used + int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + int s; + int udp_sockfd; + struct addrinfo thread_hints, *thread_res, *thread_servinfo; + int error_code; + + // setup hints + memset(&thread_hints, 0, sizeof thread_hints); + thread_hints.ai_family = AF_INET; // use IPv4 only + thread_hints.ai_socktype = SOCK_DGRAM; + thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me + + int int_port = user_data[user_index].udpPort; + int length = snprintf( NULL, 0, "%d", int_port ); + char* port = malloc( length + 1 ); + if (!port) { perror("malloc on port"); return (NULL); } + snprintf(port, length + 1, "%d", int_port); + sprintf(port, "%d", int_port); + + if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); + return (NULL); + } + free(port); + + // loop through all the results and make a socket + for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { + if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, + thread_res->ai_protocol)) == -1) { + perror("talker: socket"); + continue; + } + break; + } + if (udp_sockfd == NULL) { + fprintf(stderr, "talker: failed to create socket\n"); + return (NULL); + } - const char* port = argv[1]; // the port users will be connecting to + pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); + // wait for + pthread_mutex_lock(&m); + did_work = 0; + while (!start_threads) + { + pthread_cond_wait(&cond, &m); + } - if ((rv = getaddrinfo(NULL, port, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return 1; + int station_num = user_data[user_index].stationNum; + if (station_num == -1) { + did_work = 1; } + // potential error here! + // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); + + if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1) + { + perror("send_all_udp"); + printf("We only sent %d bytes because of the error!\n", buffer_size); + } + + free(file_buffer); + + pthread_mutex_unlock(&m); + + pthread_cleanup_pop(1); + + return (NULL); +} - // loop through all the results and bind to the first we can - for(p = servinfo; p != NULL; p = p->ai_next) { - if ((sockfd = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - perror("server: socket"); +void *send_announce_routine(void *arg) { + // unpack args + int station_num = (int) arg; + // send the announce messages + for (int i = 0; i < max_active_users; i++) + { + if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { continue; } + // update the station of each user + if (user_data[i].stationNum == station_num) + { + send_announce_reply(user_data[i].sockfd, station_num); + } + } +} + +// void *sync_routine(void *arg) { +// int c = 0; +// while (1) +// { +// start_threads = 1; +// pthread_cond_broadcast(&cond); +// usleep(2000); + +// start_threads = 0; + +// usleep(1000000-2000); +// } +// } + +void *select_routine(void *arg) { + fd_set master; // master file descriptor list + fd_set read_fds; // temp file descriptor list for select() + int fdmax; // maximum file descriptor number + + int listener; // listening socket descriptor + int newfd; // newly accept()ed socket descriptor + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen; + + char buf[256]; // buffer for client data + int nbytes; + + + char remoteIP[INET6_ADDRSTRLEN]; + + int yes=1; // for setsockopt() SO_REUSEADDR, below + int i, j, rv; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, - sizeof(int)) == -1) { - perror("setsockopt"); - exit(1); + struct addrinfo hints, *ai, *p; + + // const char* port = argv[1]; + + FD_ZERO(&master); // clear the master and temp sets + FD_ZERO(&read_fds); + + // LISTENER: get us a socket and bind it + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { + fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); + exit(1); + } + + for(p = ai; p != NULL; p = p->ai_next) { + listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (listener < 0) { + continue; } + + // lose the pesky "address already in use" error message + setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { - close(sockfd); - perror("server: bind"); + if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { + close(listener); continue; } break; } - freeaddrinfo(servinfo); // all done with this structure - - if (p == NULL) { - fprintf(stderr, "server: failed to bind\n"); - exit(1); + // if we got here, it means we didn't get bound + if (p == NULL) { + fprintf(stderr, "snowcast_server: failed to bind\n"); + exit(2); } - if (listen(sockfd, BACKLOG) == -1) { + freeaddrinfo(ai); // all done with this + + // listen + if (listen(listener, 10) == -1) { perror("listen"); - exit(1); + exit(3); } - sa.sa_handler = sigchld_handler; // reap all dead processes - sigemptyset(&sa.sa_mask); - sa.sa_flags = SA_RESTART; - if (sigaction(SIGCHLD, &sa, NULL) == -1) { - perror("sigaction"); - exit(1); - } + // add the listener to the master set + FD_SET(listener, &master); - printf("server: waiting for connections...\n"); + // keep track of the biggest file descriptor + fdmax = listener; // so far, it's this one - while(1) { // main accept() loop - sin_size = sizeof their_addr; - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); - if (new_fd == -1) { - perror("accept"); - continue; + while(1) { + read_fds = master; // copy it + if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { + perror("select"); + exit(4); } - inet_ntop(their_addr.ss_family, - get_in_addr((struct sockaddr *)&their_addr), - s, sizeof s); - printf("server: got connection from %s\n", s); + // run through the existing connections looking for data to read + for(i = 0; i <= fdmax; i++) { + if (FD_ISSET(i, &read_fds)) { // we got one!! + if (i == listener) { + // handle new connections + addrlen = sizeof remoteaddr; + newfd = accept(listener, + (struct sockaddr *)&remoteaddr, + &addrlen); - // make a struct for the message, number is the number of stations - struct Hello hello; - if ((recv(new_fd, &hello, sizeof(struct Hello), 0)) == -1) - { - perror("send"); - close(new_fd); - exit(0); + if (newfd == -1) { + perror("accept"); + } else { + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = TIMEOUT; + if (setsockopt(newfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt"); + } + + uint8_t command_type = -1; + if ((nbytes = recv(newfd, &command_type, 1, 0)) <= 0) + { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + // printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + // remove user from data structures + close(newfd); + // destroy_user(i); + continue; + } + + if (command_type != 0) { + char * message = "must send a Hello message first"; + send_invalid_command_reply(newfd, strlen(message), message); + close(newfd); + continue; + } + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(newfd, &udp_port, &bytes_to_read) == -1) { + perror("recv_all"); + close(newfd); + // destroy_user(i); + continue; + } + udp_port = ntohs(udp_port); + + FD_SET(newfd, &master); // add to master set + if (newfd > fdmax) { // keep track of the max + fdmax = newfd; + } + init_user(newfd); + update_user_udpPort(newfd, udp_port); + + // send the welcome message to client + struct Welcome welcome; + welcome.replyType = 2; + welcome.numStations = htons(num_stations); + int bytes_to_send = sizeof(struct Welcome); + if (send_all(newfd, &welcome, &bytes_to_send) == -1) + perror("send_all"); + } + } else { + // handle data from a client + uint8_t command_type = -1; + if ((nbytes = recv(i, &command_type, 1, 0)) <= 0) + { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + // printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + close(i); // bye! + FD_CLR(i, &master); // remove from master set + // remove user from data structures + destroy_user(i); + } + else + { + // we got some data from a client + if (command_type == 0) { // we got a Hello commmand + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(i, &udp_port, &bytes_to_read) == -1) { + perror("recv_all"); + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + if (user_data[sockfd_to_user[i]].udpPort != -1) { + // send back in invalid command + char * message = "must not sent more than one Hello message"; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + } + else if (command_type == 1) { // we got a SetStation command + // get the station number + + uint16_t station_number = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(i, &station_number, &bytes_to_read) == -1) { + perror("recv_all"); + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + station_number = ntohs(station_number); + + // check if user has a udpPort to stream to + if (user_data[sockfd_to_user[i]].udpPort == -1) { + // send back in invalid command + char * message = "must send Hello message first"; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + // check if station num is in range + if (station_number >= num_stations || station_number < 0) { + // send back in invalid command + char * message = "station number out of range"; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + // printf("setting station to %d\n", ntohs(command.number)); + // update station of user + update_user_station(i, station_number); + send_announce_reply(i, station_number); + } + else { + // send back in invalid command + char * message = "invalid command"; + send_invalid_command_reply(i, strlen(message), message); + + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + } + } + } // END handle data from client + } // END got new incoming connection + } // END looping through file descriptors + + } // END for(;;)--and you thought it would never end! +} + +void *init_user(int sockfd) { + // add the user to the list of user data + pthread_mutex_lock(&mutex_user_data); + + if(sockfd > max_sockfd) { + max_sockfd = sockfd; + int * more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); + if (!more_sockfd_to_user) { perror("realloc"); exit(1); } + sockfd_to_user = more_sockfd_to_user; + } + + int running_index = 0; + while(running_index < max_active_users) { + if (user_data[running_index].sockfd == -1) { + break; } + running_index++; + } + if (running_index == max_active_users) { + // printf("reached max active users\n"); + // printf("making new memory\n"); + max_active_users++; + user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); + if (!more_users) { perror("realloc"); exit(1); } + user_data = more_users; + } + + // map TCP sockfd to this user index + user_data[running_index] = (user_t){-1, -1, sockfd}; + sockfd_to_user[sockfd] = running_index; + // free(user_stream_threads); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_udpPort(int sockfd, int udpPort) { + pthread_mutex_lock(&mutex_user_data); + // get the user + user_t *user = &user_data[sockfd_to_user[sockfd]]; + // set the udpPort + user->udpPort = udpPort; + // start the stream thread, now that we have the udpPort + // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_station(int sockfd, int stationNum) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].stationNum = stationNum; + pthread_mutex_unlock(&mutex_user_data); +} +void *print_user_data(int index) { + printf("udpPort: %d, stationNum: %d, sockfd: %d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); +} + +void destroy_user(int sockfd) { + pthread_mutex_lock(&mutex_user_data); + // stop the thread streaming to the user + // pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread; + // if (thread != -1) { + // pthread_cancel(thread); + // } + // "remove" the user from the list of user data + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; + // map sockfd to -1 + sockfd_to_user[sockfd] = -1; - // make a struct for the message, number is the number of stations - struct Welcome welcome; - welcome.replyType = 2; - welcome.numStations = htons(argc - 2); - if ((send(new_fd, &welcome, sizeof(struct Welcome), 0)) == -1) - perror("send"); - close(new_fd); - exit(0); + pthread_mutex_unlock(&mutex_user_data); +} + + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); } - return 0; + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +void send_announce_reply(int fd, int station_num) { + char* file_path = stations[station_num].filePath; + int len_file_path = strlen(file_path); + + char *send_buffer = malloc(len_file_path+2); + if (!send_buffer) { + perror("malloc in send announce"); + return; + } + send_buffer[0] = 3; + send_buffer[1] = len_file_path; + + memcpy(send_buffer + 2, file_path, len_file_path); + + size_t bytes_to_send = len_file_path + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send_all"); + + free(send_buffer); +} + +void send_invalid_command_reply(int fd, size_t message_size, char* message) { + char *send_buffer = malloc(message_size+2); + if (!send_buffer) { + perror("malloc in send invalid command"); + return; + } + + // type and payload size + send_buffer[0] = 4; + send_buffer[1] = message_size; + + memcpy(send_buffer + 2, message, message_size); + + int bytes_to_send = message_size + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send"); + + free(send_buffer); +} + +// void init_station(int station_num, const char* station_name) { +// station_t *station = &station_data[station_num]; + +// // open the file +// int stream_fd = open(station_name, O_RDONLY); +// if (stream_fd == -1) { +// perror("open"); +// return; +// } +// station->streamFd = stream_fd; +// station->filePath = station_name; + +// // setup file buffer +// char stream_buffer[MAX_STREAM_RATE]; +// memset(stream_buffer, 0, MAX_STREAM_RATE); + +// station->fileBufferSize = MAX_STREAM_RATE; +// memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE); + + +// // load the first buffer into the stations +// seek_stations(station_num); +// } + +// void seek_stations(int station_num) { +// station_t *station_info = &station_data[station_num]; +// memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE); +// int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); +// lseek(station_info->streamFd, -16, SEEK_SET); +// // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); + +// // time to restart the file +// if (bytes_read == 0) { +// if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) { +// perror("fseek"); +// } +// pthread_t send_announce_thread; +// pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num); + +// // load first chunk +// bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); +// } +// station_info->fileBufferSize = bytes_read; +// } + + +// Parses a buffer into tokens, from cs33 :) +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); + } + + return 1; }
\ No newline at end of file diff --git a/snowcast-server.c b/snowcast-server.c deleted file mode 100644 index e69de29..0000000 --- a/snowcast-server.c +++ /dev/null diff --git a/snowcast_control.dSYM/Contents/Info.plist b/snowcast_control.dSYM/Contents/Info.plist new file mode 100644 index 0000000..afe5b44 --- /dev/null +++ b/snowcast_control.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.snowcast_control</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control b/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control Binary files differnew file mode 100644 index 0000000..1b435c8 --- /dev/null +++ b/snowcast_control.dSYM/Contents/Resources/DWARF/snowcast_control diff --git a/snowcast_listener b/snowcast_listener Binary files differnew file mode 100755 index 0000000..3e47e13 --- /dev/null +++ b/snowcast_listener diff --git a/snowcast_listener.dSYM/Contents/Info.plist b/snowcast_listener.dSYM/Contents/Info.plist new file mode 100644 index 0000000..0af79e1 --- /dev/null +++ b/snowcast_listener.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.snowcast_listener</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/snowcast_listener.dSYM/Contents/Resources/DWARF/snowcast_listener b/snowcast_listener.dSYM/Contents/Resources/DWARF/snowcast_listener Binary files differnew file mode 100644 index 0000000..7be4031 --- /dev/null +++ b/snowcast_listener.dSYM/Contents/Resources/DWARF/snowcast_listener diff --git a/snowcast_server b/snowcast_server Binary files differdeleted file mode 100755 index b5d0885..0000000 --- a/snowcast_server +++ /dev/null diff --git a/snowcast_server.dSYM/Contents/Info.plist b/snowcast_server.dSYM/Contents/Info.plist new file mode 100644 index 0000000..789e734 --- /dev/null +++ b/snowcast_server.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.snowcast_server</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server Binary files differnew file mode 100644 index 0000000..787e279 --- /dev/null +++ b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server diff --git a/talker.c b/talker.c deleted file mode 100644 index bb801e5..0000000 --- a/talker.c +++ /dev/null @@ -1,67 +0,0 @@ -/* -** talker.c -- a datagram "client" demo -*/ - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netdb.h> - -#define SERVERPORT "4950" // the port users will be connecting to - -int main(int argc, char *argv[]) -{ - int sockfd; - struct addrinfo hints, *servinfo, *p; - int rv; - int numbytes; - - if (argc != 3) { - fprintf(stderr,"usage: talker hostname message\n"); - exit(1); - } - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET6; // set to AF_INET to use IPv4 - hints.ai_socktype = SOCK_DGRAM; - - if ((rv = getaddrinfo(argv[1], SERVERPORT, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return 1; - } - - // loop through all the results and make a socket - for(p = servinfo; p != NULL; p = p->ai_next) { - if ((sockfd = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - perror("talker: socket"); - continue; - } - - break; - } - - if (p == NULL) { - fprintf(stderr, "talker: failed to create socket\n"); - return 2; - } - - if ((numbytes = sendto(sockfd, argv[2], strlen(argv[2]), 0, - p->ai_addr, p->ai_addrlen)) == -1) { - perror("talker: sendto"); - exit(1); - } - - freeaddrinfo(servinfo); - - printf("talker: sent %d bytes to %s\n", numbytes, argv[1]); - close(sockfd); - - return 0; -}
\ No newline at end of file |