aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-23 07:08:28 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-23 07:08:28 -0400
commitd93da5af53d6beb9a2339839aa47fbbbbeafc208 (patch)
tree7f9d2013845137979e0805b96585ae54f9cdb47f
parent2707112ffc0b0eed6af8271d32f94e1622203a80 (diff)
bitrate still low, but pass the no read from file test
-rw-r--r--Makefile5
-rwxr-xr-xnewbin0 -> 55440 bytes
-rw-r--r--new.c497
-rw-r--r--new.dSYM/Contents/Info.plist20
-rw-r--r--new.dSYM/Contents/Resources/DWARF/newbin0 -> 19866 bytes
-rw-r--r--server.c407
-rwxr-xr-xsnowcast_serverbin44944 -> 45200 bytes
-rw-r--r--snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_serverbin27442 -> 27865 bytes
8 files changed, 759 insertions, 170 deletions
diff --git a/Makefile b/Makefile
index 3aae1ce..7a0a8e1 100644
--- a/Makefile
+++ b/Makefile
@@ -17,4 +17,7 @@ client:
# $(CC) $(CFLAGS) -o c client.c
clean:
- rm -fv snowcast_server snowcast_control snowcast_listener \ No newline at end of file
+ rm -fv snowcast_server snowcast_control snowcast_listener
+
+new:
+ $(CC) $(CFLAGS) -o snowcast_server new.c
diff --git a/new b/new
new file mode 100755
index 0000000..1116fa8
--- /dev/null
+++ b/new
Binary files differ
diff --git a/new.c b/new.c
new file mode 100644
index 0000000..e180200
--- /dev/null
+++ b/new.c
@@ -0,0 +1,497 @@
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+
+#include "protocol.c"
+
+
+
+#define LINE_MAX 1024
+#define MAX_RATE_PER_SECOND 16*1024
+#define TCP_TIMEOUT 100000 // 100ms in microseconds
+
+typedef struct station {
+ pthread_t thread;
+ int fd;
+ char *path;
+} station_t;
+
+int num_stations;
+station_t *stations;
+setup_stations(int argc, char *argv[]);
+*stream_routine(void *arg);
+
+int setup_listener(int port);
+*accept_routine(void *arg);
+
+// user data structure
+typedef struct user {
+ int tcpfd;
+ int udpfd;
+ pthread_t command_thread;
+ int station;
+} user_t;
+int num_users = 0;
+user_t *users;
+int tcpfd_max = 0;
+int *fd_to_user;
+pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER;
+*init_user_routine(void *arg);
+int init_user(int tcpfd, int udpfd);
+void destroy_user(int fd);
+*command_routine(void *arg);
+
+send_invalid_reply(int fd, char *message) {
+ printf("sending INVALID reply to socket %d\n", fd);
+ size_t message_size = strlen(message);
+ char buf[message_size + 2];
+ // type and payload size
+ buf[0] = 4;
+ buf[1] = message_size;
+ memcpy(buf + 2, message, message_size);
+
+ int size_buf = message_size + 2;
+ if (send_all(fd, &buf, &size_buf) == -1) {
+ perror("send_all (in init_user_routine)");
+ return -1;
+ }
+
+ return 1;
+}
+
+print_users();
+
+main(int argc, char *argv[]) {
+ if (argc < 3) {
+ printf("usage: ./snowcast_server <port> <file0> [file 1] [file 2] ...\n");
+ exit(1);
+ }
+
+ setup_stations(argc, argv);
+
+ users = malloc(0);
+ fd_to_user = malloc(0);
+
+ int port = atoi(argv[1]);
+ int listenerfd = setup_listener(port);
+ pthread_t accept_thread;
+ pthread_create(&accept_thread, NULL, accept_routine, listenerfd);
+
+
+ while(1)
+ ;
+}
+
+setup_stations(int argc, char *argv[]) {
+ num_stations = argc - 2;
+
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 2; i < argc; i++)
+ {
+ // printf("file: %s\n", argv[i]);
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]);
+ }
+
+ // malloc the stations array
+ stations = malloc(totalSize);
+ if (!stations) { perror("malloc (stations pointer)"); exit(1); }
+ // assign the stations, and start the threads
+ for (int i = 0; i < num_stations; i++) {
+ stations[i].path = argv[i+2];
+ stations[i].fd = open(argv[i+2], O_RDONLY);
+ printf(stations[i].path);
+ if (stations[i].fd < 0) { perror("read (from station file)"); exit(1); }
+ pthread_create(&stations[i].thread, NULL, stream_routine, &stations[i].fd);
+ }
+
+ printf("successfully created %d stations\n", num_stations);
+}
+
+int setup_listener(int port) {
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) { perror("socket (listener)"); return -1; }
+
+ struct sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ perror("bind (listener)");
+ return -1;
+ }
+
+ if (listen(sock, 0) < 0) { perror("listen (listener)"); return -1; }
+
+ return sock;
+}
+
+// THREAD FUNCTIONS
+
+// helper
+int read_file(int fd, char buffer[MAX_RATE_PER_SECOND]) {
+ int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file)"); return -1; }
+ // printf("bytes read: %d\n", bytes_read);
+ if (bytes_read == 0) {
+ // printf("end of file, restarting\n");
+ if(lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; }
+ bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; }
+ }
+
+ return bytes_read;
+}
+
+*stream_cleanup(void *arg) {
+ int fd = *(int*)arg;
+ printf("cleanup/delete station\n");
+ return (NULL);
+}
+
+*stream_routine(void *arg) {
+ int fd = *(int*)arg;
+
+ pthread_cleanup_push(stream_cleanup, fd);
+
+ // make buffer which will be used to stream to children
+ char buffer[MAX_RATE_PER_SECOND];
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); }
+
+ for (;;)
+ {
+ // load bytes into buffer
+ int bytes_read = read_file(fd, buffer);
+ if (bytes_read == -1) { exit(1); }
+
+ // TODO: send buffer to children
+
+ sleep(1);
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ }
+
+ pthread_cleanup_pop(1);
+
+ return (NULL);
+}
+
+*accept_cleanup(void *arg) {
+ int fd = (int) arg;
+ printf("cleanup/delete accept\n");
+ close(fd);
+ return (NULL);
+}
+
+*accept_routine(void *arg) {
+ int listener = (int) arg;
+
+ // pthread_cleanup_push(accept_cleanup, listener);
+
+ while(1) {
+ printf("accepting %d\n", listener);
+ int userfd = accept(listener, NULL, NULL);
+ if (userfd < 0) { perror("accept (in accept thread)"); return(NULL); }
+
+ printf("accepted socket %d\n", userfd);
+
+ pthread_t init_user_thread;
+ pthread_create(&init_user_thread, NULL, init_user_routine, userfd);
+ }
+
+ // pthread_cleanup_pop(1);
+}
+
+apply_timeout(int fd) {
+ // handle handshake
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = TCP_TIMEOUT;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
+ perror("setsockopt (in apply_timeout)");
+ return -1;
+ }
+
+ return 1;
+}
+
+remove_timeout(int fd)
+{
+ // handle handshake
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
+ perror("setsockopt (in remove_timeout)");
+ return -1;
+ }
+
+ return 1;
+}
+
+int handle_handshake(int userfd) {
+ if (apply_timeout(userfd) == -1) { return -1; }
+
+ // get the command type
+ uint8_t command_type = -1;
+ if ((recv(userfd, &command_type, 1, 0)) < 0)
+ {
+ perror("recv (in init_user_routine)");
+ return -1;
+ }
+
+ // check
+ if (command_type != 0) {
+ printf("user on socket %d must send a Hello message first", userfd);
+ return -1;
+ }
+
+ // get the udp port
+ uint16_t udp_port = -1;
+ int bytes_to_read = sizeof(uint16_t);
+ if (recv_all(userfd, &udp_port, &bytes_to_read) == -1) {
+ perror("recv_all (in init_user_routine)");
+ return -1;
+ }
+ // remove timeout
+ if (remove_timeout(userfd) == -1) { return -1; }
+
+ printf("recieved HELLO command from socket %d\n", userfd);
+
+
+ // make udp socket
+ int udpfd = socket(AF_INET, SOCK_DGRAM, AI_PASSIVE);
+ if (udpfd < 0)
+ {
+ perror("socket (in init_user_routine UDP)");
+ return -1;
+ }
+
+ return udpfd;
+}
+
+send_welcome_reply(int fd) {
+ printf("sending WELCOME reply to socket %d\n", fd);
+
+ struct Welcome welcome;
+ welcome.replyType = 2;
+ printf("num_stations: %d\n", num_stations);
+ welcome.numStations = htons(num_stations);
+ int bytes_to_send = sizeof(struct Welcome);
+ if (send_all(fd, &welcome, &bytes_to_send) == -1) {
+ perror("send_all (in init_user_routine)");
+ return -1;
+ }
+
+ return 1;
+}
+
+*init_user_cleanup(void *arg) {
+ int fd = (int) arg;
+ // printf("cleanup/delete user_maybe?\n");
+ close(fd);
+ return (NULL);
+}
+
+*init_user_routine(void *arg) {
+ int userfd = (int) arg;
+ // pthread_cleanup_push(init_user_cleanup, userfd);
+
+ printf("new user on socket %d, waiting for HELLO\n", userfd);
+ int udpfd = handle_handshake(userfd);
+ if (udpfd == -1)
+ {
+ perror("handle_handshake (in init_user_routine)");
+ close(userfd);
+ return (NULL);
+ }
+
+ if(send_welcome_reply(userfd) == -1) {
+ perror("send_welcome_reply (in init_user_routine)");
+ close(userfd);
+ return (NULL);
+ }
+
+ if (init_user(userfd, udpfd) != 0) {
+ perror("init_user (in init_user_routine)");
+ destroy_user(userfd);
+ return (NULL);
+ }
+
+ return (NULL);
+
+ // pthread_cleanup_pop(0);
+}
+
+*command_cleanup(void *arg) {
+ int fd = (int) arg;
+ printf("cleanup/delete command\n");
+ close(fd);
+ return (NULL);
+}
+
+handle_setstation_command(int fd, uint16_t station_number) {
+ printf("received SETSTATION command from socket %d\n", fd);
+ // check if station number is valid
+ int station_num = ntohs(station_number);
+ if (station_num < 0 || station_num >= num_stations) {
+ printf("station number %d is invalid\n", station_num);
+ send_invalid_reply(fd, "station number is invalid");
+ return -1;
+ }
+
+ // set the station number
+ pthread_mutex_lock(&mutex_users);
+ printf("setting station number of user on socket %d to %d, user! %d\n", fd, station_num, fd_to_user[fd]);
+ users[fd_to_user[fd]].station = station_num;
+ pthread_mutex_unlock(&mutex_users);
+
+ print_users();
+
+ return 1;
+}
+
+*command_routine(void *arg) {
+ int fd = (int) arg;
+
+ printf("waiting for SETSTATION from socket %d\n", fd);
+
+ while(1) {
+ // get the command type
+ uint8_t command_type = -1;
+ if ((recv(fd, &command_type, 1, 0)) < 0)
+ {
+ perror("recv (in command_routine)");
+ destroy_user(fd);
+ return (NULL);
+ }
+
+ // check if have sent hello before
+ if (command_type == 0) {
+ printf("user on socket %d has already sent a HELLO command\n", fd);
+ send_invalid_reply(fd, "already sent a HELLO command");
+ destroy_user(fd);
+ return (NULL);
+ }
+
+ else if (command_type == 1) {
+ // get the station number
+ uint16_t station_number = -1;
+ int bytes_to_read = sizeof(uint16_t);
+ if (recv_all(fd, &station_number, &bytes_to_read) == -1) {
+ perror("recv_all (in command_routine)");
+ destroy_user(fd);
+ return (NULL);
+ }
+ if (handle_setstation_command(fd, station_number) == -1) {
+ destroy_user(fd);
+ return (NULL);
+ }
+ }
+
+ else if (command_type == 5) {
+ printf("user on socket %d has requested a LIST\n", fd);
+ }
+
+ else {
+ printf("user on socket %d has sent an INVALID command type of %d\n", fd, command_type);
+ send_invalid_reply(fd, "invalid command type");
+ destroy_user(fd);
+ return (NULL);
+ }
+
+
+ }
+}
+
+
+// HELPER FUNCTIONS
+
+int init_user(int sockfd, int udpfd) {
+ pthread_mutex_lock(&mutex_users);
+ printf("initializing user on sockets %d (tcp), %d (udp)\n", sockfd, udpfd);
+ // update map
+ if(sockfd > tcpfd_max) {
+ tcpfd_max = sockfd;
+ int * more_sockfd_to_user = realloc(fd_to_user, sizeof(int) * (tcpfd_max + 1));
+ if (!more_sockfd_to_user) { perror("realloc"); exit(1); }
+ fd_to_user = more_sockfd_to_user;
+ }
+
+ int running_index = 0;
+ while(running_index < num_users) {
+ if (users[running_index].tcpfd == -1) {
+ break;
+ }
+ running_index++;
+ }
+ if (running_index == num_users) {
+ // printf("reached max active users\n");
+ // printf("making new memory\n");
+ num_users++;
+ user_t *more_users = realloc(users, sizeof(user_t) * num_users);
+ if (!more_users) { perror("realloc"); exit(1); }
+ users = more_users;
+ }
+
+ // map TCP sockfd to this user index
+ users[running_index] = (user_t){sockfd, udpfd, -1, -1};
+ pthread_create(&users[running_index].command_thread, NULL, command_routine, sockfd);
+ fd_to_user[sockfd] = running_index;
+ // free(user_stream_threads);
+
+ print_users();
+ pthread_mutex_unlock(&mutex_users);
+
+ return 0;
+}
+
+void destroy_user(int fd) {
+ pthread_mutex_lock(&mutex_users);
+
+ printf("destroying user on sockets %d (tcp), %d (udp)\n", users[fd_to_user[fd]].tcpfd, users[fd_to_user[fd]].udpfd);
+
+ // close the sockets
+
+ close(fd);
+ close(users[fd_to_user[fd]].udpfd);
+ // stop the thread taking commands to the user
+ pthread_cancel(&users[fd_to_user[fd]].command_thread);
+ // "remove" the user from the list of user data
+ users[fd_to_user[fd]] = (user_t) {-1, -1, -1, -1};
+ // map sockfd to -1
+ fd_to_user[fd] = -1;
+ pthread_mutex_unlock(&mutex_users);
+}
+
+
+print_users() {
+ printf("num users: %d\n", num_users);
+ for (int i = 0; i < num_users; i++) {
+ printf("tcpfd %d , udpfd %d , station %d |\n", users[i].tcpfd, users[i].udpfd, users[i].station);
+ }
+ printf("\n");
+}
+
+// Parses a buffer into tokens, from cs33 :)
+int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) {
+ const char *regex = " \n\t\f\r";
+ char *current_token = strtok(buffer, regex);
+ if (current_token == NULL) return 0;
+
+ for (int i = 0; current_token != NULL; i++) {
+ tokens[i] = current_token;
+ current_token = strtok(NULL, regex);
+ }
+
+ return 1;
+} \ No newline at end of file
diff --git a/new.dSYM/Contents/Info.plist b/new.dSYM/Contents/Info.plist
new file mode 100644
index 0000000..ba6d460
--- /dev/null
+++ b/new.dSYM/Contents/Info.plist
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+ <dict>
+ <key>CFBundleDevelopmentRegion</key>
+ <string>English</string>
+ <key>CFBundleIdentifier</key>
+ <string>com.apple.xcode.dsym.new</string>
+ <key>CFBundleInfoDictionaryVersion</key>
+ <string>6.0</string>
+ <key>CFBundlePackageType</key>
+ <string>dSYM</string>
+ <key>CFBundleSignature</key>
+ <string>????</string>
+ <key>CFBundleShortVersionString</key>
+ <string>1.0</string>
+ <key>CFBundleVersion</key>
+ <string>1</string>
+ </dict>
+</plist>
diff --git a/new.dSYM/Contents/Resources/DWARF/new b/new.dSYM/Contents/Resources/DWARF/new
new file mode 100644
index 0000000..3200cf1
--- /dev/null
+++ b/new.dSYM/Contents/Resources/DWARF/new
Binary files differ
diff --git a/server.c b/server.c
index 7df1f12..324527e 100644
--- a/server.c
+++ b/server.c
@@ -17,20 +17,29 @@
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
-#define MAX_STREAM_RATE 16*1024
-
+#define MAX_RATE_PER_SECOND 16*1024
+
+// typedef struct station {
+// int streamFd;
+// char* filePath;
+// int fileBufferSize;
+// char fileBuffer[MAX_STREAM_RATE];
+// } station_t;
typedef struct station {
- int streamFd;
- char* filePath;
- int fileBufferSize;
- char fileBuffer[MAX_STREAM_RATE];
+ pthread_t streamThread;
+ int readfd;
+ char *filePath;
} station_t;
+int num_stations;
+station_t *stations;
+int setup_stations(int argc, char *argv[]);
+void *stream_routine(void *arg);
+
typedef struct user {
int udpPort;
int stationNum;
int sockfd;
- pthread_t streamThread;
} user_t;
@@ -43,7 +52,7 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER;
const char *port;
-int num_stations;
+// int num_stations;
int start_threads = 0;
int max_active_users = 0;
@@ -55,14 +64,20 @@ user_t *user_data;
int *sockfd_to_user;
// stations array pointer
-station_t *station_data;
+// station_t *station_data;
+
+struct udp_packet_routine_args {
+ int user_index;
+ int buffer_size;
+ char *file_buffer;
+};
void *send_udp_packet_routine(void* arg);
-void *select_thread(void* arg);
-void *synchronization_thread(void* arg);
+void *select_routine(void* arg);
+void *sync_routine(void* arg);
+void *send_announce_routine(void* arg);
void init_station(int station_num, const char *station_name);
-void seek_stations(int station_num);
int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]);
void *print_info_routine(void *arg);
@@ -88,41 +103,30 @@ main(int argc, char *argv[])
exit(1);
}
+ // initizlize the port
port = argv[1];
- num_stations = argc - 2;
-
- // init stations
- size_t totalSize = 0;
- // get size to malloc
- for (int i = 2; i < argc; i++)
- {
- // printf("file: %s\n", argv[i]);
- // each "station" has a fd (int), filePath (char*), file_buffer_size (int), buffer_size (MAX_STREAM_RATE)
- totalSize += sizeof(int) + strlen(argv[i]) + sizeof(int) + MAX_STREAM_RATE;
- }
- station_data = malloc(totalSize);
- if (!station_data) { perror("malloc station data"); return 1; }
- // assign the stations
- for (int i = 2; i < argc; i++)
- {
- init_station(i - 2, argv[i]);
+ // initialize the stations & their threads
+ if (setup_stations(argc, argv) == -1) {
+ perror("setup_stations");
+ exit(1);
}
// make array of user data
+ printf("max active users: %d\n", sizeof(user_t) * max_active_users);
user_data = malloc(sizeof(user_t) * max_active_users);
if (!user_data) { perror("malloc userdata"); return 1; }
sockfd_to_user = malloc(sizeof(int) * max_active_users);
if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; }
// make and start "select" thread that manages:
- // 1) new connections, 2) requests from current connections, 3)cloing connections
- pthread_t s_thread;
- pthread_create(&s_thread, NULL, select_thread, NULL);
+ // 1) new connections, 2) requests from current connections, 3) closing connections
+ pthread_t select_thread;
+ pthread_create(&select_thread, NULL, select_routine, NULL);
// start syncchronization thread to broadcast stations
- pthread_t sync_thread;
- pthread_create(&sync_thread, NULL, synchronization_thread, NULL);
+ // pthread_t sync_thread;
+ // pthread_create(&sync_thread, NULL, sync_routine, NULL);
// command line interface
char input[LINE_MAX];
@@ -179,6 +183,110 @@ main(int argc, char *argv[])
return 0;
}
+int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
+ int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file)"); return -1; }
+ // printf("bytes read: %d\n", bytes_read);
+ if (bytes_read == 0) {
+ // printf("end of file, restarting\n");
+ pthread_t send_announce_thread;
+ pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num);
+
+ if (lseek(fd, 0, SEEK_SET) == -1)
+ {
+ perror("lseek (in resarting file)");
+ return -1;
+ }
+ bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; }
+ }
+
+ return bytes_read;
+}
+
+void *stream_routine_cleanup(void *arg) {
+ int read_fd = (int) arg;
+ close(read_fd);
+}
+
+void *stream_routine(void *arg) {
+ int station_num = (int) arg;
+ printf("stream routine %d\n", station_num);
+ int read_fd = stations[station_num].readfd;
+
+ pthread_cleanup_push(stream_routine_cleanup, read_fd);
+
+ // make buffer which will be used to stream to children
+ char buffer[MAX_RATE_PER_SECOND];
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); }
+
+ for (;;)
+ {
+ // load bytes into buffer
+ int bytes_read = read_file(read_fd, buffer, station_num);
+ if (bytes_read == -1) { exit(1); }
+
+ // TODO: send buffer to children
+ char *send_buffer = malloc(2 + bytes_read);
+ for (int i = 0; i < max_active_users; i++)
+ {
+ if (!user_data[i].sockfd || user_data[i].sockfd == -1)
+ continue;
+ if (user_data[i].stationNum == station_num)
+ {
+ // send the udp packet
+ int *send_buffer = malloc(2 + bytes_read);
+ memset(send_buffer, 0, 2 + bytes_read);
+ send_buffer[0] = i;
+ send_buffer[1] = bytes_read;
+ memcpy(send_buffer+2, buffer, bytes_read);
+ // printf("sending udp packet to user %d\n", i);
+ pthread_t t;
+ pthread_create(&t, NULL, send_udp_packet_routine, send_buffer);
+ }
+ }
+ free(send_buffer);
+ usleep(1000000-5000);
+ start_threads = 1;
+ pthread_cond_broadcast(&cond);
+
+ usleep(5000);
+ start_threads = 0;
+
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ }
+
+ return (NULL);
+
+ pthread_cleanup_pop(1);
+}
+
+int setup_stations(int argc, char *argv[]) {
+ num_stations = argc - 2;
+
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 2; i < argc; i++)
+ {
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]);
+ }
+
+ // malloc the stations array
+ stations = malloc(totalSize);
+ if (!stations) { perror("malloc (stations pointer)"); return -1; }
+ // assign the stations, and start the threads
+ for (int i = 0; i < num_stations; i++) {
+ stations[i].filePath = argv[i+2];
+ stations[i].readfd = open(argv[i+2], O_RDONLY);
+ if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; }
+ pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
+ }
+
+ printf("successfully created %d stations\n", num_stations);
+ return 1;
+}
+
void write_int_to_fd(int fd, int n) {
int l = snprintf(NULL, 0, "%d", n);
char *num = malloc(l + 1);
@@ -203,7 +311,7 @@ void *print_info_routine(void *arg) {
write(print_fd, comma, strlen(comma));
// write file path
- char* file_path = station_data[i].filePath;
+ char* file_path = stations[i].filePath;
write(print_fd, file_path, strlen(file_path));
for (int j = 0; j < max_active_users; j++) {
@@ -253,8 +361,15 @@ void udp_port_cleanup_handler(void *arg)
/* Make the manager routine */
void *send_udp_packet_routine(void *arg) {
+ printf("send udp packet routine\n");
+ int *buf = arg;
// unpack args
- int user_index = (int) arg;
+ int user_index = buf[0];
+ int buffer_size = buf[1];
+ char *file_buffer = malloc(buffer_size);
+ memcpy(file_buffer, buf+2, buffer_size);
+
+ // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size);
// declare vairables to be used
int did_work = 1;
@@ -299,73 +414,34 @@ void *send_udp_packet_routine(void *arg) {
}
pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd);
- while (1)
- {
// wait for
- pthread_mutex_lock(&m);
- did_work = 0;
- while (!start_threads)
- {
- pthread_cond_wait(&cond, &m);
- }
-
- int station_num = user_data[user_index].stationNum;
- if (station_num == -1) {
- did_work = 1;
- }
-
- if (!did_work) {
- // sendto a random string of data to the user
- // int station_num = user_data[user_index].stationNum;
- // char *data = station_data[station_num].filePath;
- // printf("load data: thread %d \n", user_index);
-
- // get file path
- // char* file_path = station_data[station_num].filePath;
- // // get current seek chunk
- // int stream_fd = open(file_path, O_RDONLY);
- // if (stream_fd == -1) {
- // perror("open");
- // return (NULL);
- // }
- // int current_chunk = station_data[station_num].seekIndex;
- // if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) {
- // perror("fseek");
- // return (NULL);
- // }
- // read 1000 bytes of the file
-
- // char file_buffer[BYTES_PER_SECOND];
- // int bytes_read = 0;
- // if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) {
- // perror("fread");
- // return (NULL);
- // }
- // close(stream_fd);
-
- station_t *station_info = &station_data[station_num];
- int bytes_read = station_info->fileBufferSize;
- // potential error here!
- // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
-
- if (send_all_udp(udp_sockfd, station_info->fileBuffer, &bytes_read, thread_res) == -1)
- {
- perror("send_all_udp");
- printf("We only sent %d bytes because of the error!\n", bytes_read);
- }
- did_work = 1;
+ pthread_mutex_lock(&m);
+ did_work = 0;
+ while (!start_threads)
+ {
+ pthread_cond_wait(&cond, &m);
+ }
+ int station_num = user_data[user_index].stationNum;
+ if (station_num == -1) {
+ did_work = 1;
+ }
+ // potential error here!
+ // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
- usleep(400000);
- }
+ if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1)
+ {
+ perror("send_all_udp");
+ printf("We only sent %d bytes because of the error!\n", buffer_size);
+ }
- pthread_mutex_unlock(&m);
+ free(file_buffer);
- usleep(100000);
- }
+ pthread_mutex_unlock(&m);
pthread_cleanup_pop(1);
- return NULL;
+
+ return (NULL);
}
void *send_announce_routine(void *arg) {
@@ -385,28 +461,21 @@ void *send_announce_routine(void *arg) {
}
}
-void *synchronization_thread(void *arg) {
- int c = 0;
- while (1)
- {
- start_threads = 1;
- pthread_cond_broadcast(&cond);
- usleep(2000);
-
- start_threads = 0;
- size_t BYTES_PER_SECOND = 16*1024;
+// void *sync_routine(void *arg) {
+// int c = 0;
+// while (1)
+// {
+// start_threads = 1;
+// pthread_cond_broadcast(&cond);
+// usleep(2000);
- // seek each file and read
- for (int i = 0; i < num_stations; i++)
- {
- seek_stations(i);
- }
+// start_threads = 0;
- usleep(1000000-2000);
- }
-}
+// usleep(1000000-2000);
+// }
+// }
-void *select_thread(void *arg) {
+void *select_routine(void *arg) {
fd_set master; // master file descriptor list
fd_set read_fds; // temp file descriptor list for select()
int fdmax; // maximum file descriptor number
@@ -688,7 +757,7 @@ void *init_user(int sockfd) {
}
// map TCP sockfd to this user index
- user_data[running_index] = (user_t){-1, -1, sockfd, -1};
+ user_data[running_index] = (user_t){-1, -1, sockfd};
sockfd_to_user[sockfd] = running_index;
// free(user_stream_threads);
pthread_mutex_unlock(&mutex_user_data);
@@ -700,7 +769,7 @@ void *update_user_udpPort(int sockfd, int udpPort) {
// set the udpPort
user->udpPort = udpPort;
// start the stream thread, now that we have the udpPort
- pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]);
+ // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]);
pthread_mutex_unlock(&mutex_user_data);
}
void *update_user_station(int sockfd, int stationNum) {
@@ -709,19 +778,19 @@ void *update_user_station(int sockfd, int stationNum) {
pthread_mutex_unlock(&mutex_user_data);
}
void *print_user_data(int index) {
- printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n",
- user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread);
+ printf("udpPort: %d, stationNum: %d, sockfd: %d\n",
+ user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd);
}
void destroy_user(int sockfd) {
pthread_mutex_lock(&mutex_user_data);
// stop the thread streaming to the user
- pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread;
- if (thread != -1) {
- pthread_cancel(thread);
- }
+ // pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread;
+ // if (thread != -1) {
+ // pthread_cancel(thread);
+ // }
// "remove" the user from the list of user data
- user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1};
+ user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1};
// map sockfd to -1
sockfd_to_user[sockfd] = -1;
@@ -739,7 +808,7 @@ void *get_in_addr(struct sockaddr *sa)
}
void send_announce_reply(int fd, int station_num) {
- char* file_path = station_data[station_num].filePath;
+ char* file_path = stations[station_num].filePath;
int len_file_path = strlen(file_path);
char *send_buffer = malloc(len_file_path+2);
@@ -779,50 +848,50 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message) {
free(send_buffer);
}
-void init_station(int station_num, const char* station_name) {
- station_t *station = &station_data[station_num];
-
- // open the file
- int stream_fd = open(station_name, O_RDONLY);
- if (stream_fd == -1) {
- perror("open");
- return;
- }
- station->streamFd = stream_fd;
- station->filePath = station_name;
-
- // setup file buffer
- char stream_buffer[MAX_STREAM_RATE];
- memset(stream_buffer, 0, MAX_STREAM_RATE);
-
- station->fileBufferSize = MAX_STREAM_RATE;
- memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE);
-
-
- // load the first buffer into the stations
- seek_stations(station_num);
-}
-
-void seek_stations(int station_num) {
- station_t *station_info = &station_data[station_num];
- memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE);
- int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
- lseek(station_info->streamFd, -16, SEEK_SET);
- // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
-
- // time to restart the file
- if (bytes_read == 0) {
- if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) {
- perror("fseek");
- }
- pthread_t send_announce_thread;
- pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num);
-
- // load first chunk
- bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
- }
- station_info->fileBufferSize = bytes_read;
-}
+// void init_station(int station_num, const char* station_name) {
+// station_t *station = &station_data[station_num];
+
+// // open the file
+// int stream_fd = open(station_name, O_RDONLY);
+// if (stream_fd == -1) {
+// perror("open");
+// return;
+// }
+// station->streamFd = stream_fd;
+// station->filePath = station_name;
+
+// // setup file buffer
+// char stream_buffer[MAX_STREAM_RATE];
+// memset(stream_buffer, 0, MAX_STREAM_RATE);
+
+// station->fileBufferSize = MAX_STREAM_RATE;
+// memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE);
+
+
+// // load the first buffer into the stations
+// seek_stations(station_num);
+// }
+
+// void seek_stations(int station_num) {
+// station_t *station_info = &station_data[station_num];
+// memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE);
+// int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
+// lseek(station_info->streamFd, -16, SEEK_SET);
+// // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
+
+// // time to restart the file
+// if (bytes_read == 0) {
+// if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) {
+// perror("fseek");
+// }
+// pthread_t send_announce_thread;
+// pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num);
+
+// // load first chunk
+// bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
+// }
+// station_info->fileBufferSize = bytes_read;
+// }
// Parses a buffer into tokens, from cs33 :)
diff --git a/snowcast_server b/snowcast_server
index cb074ef..89e4cae 100755
--- a/snowcast_server
+++ b/snowcast_server
Binary files differ
diff --git a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server
index 948984a..667c8c1 100644
--- a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server
+++ b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server
Binary files differ