Skip to content

Commit 061c3b0

Browse files
authored
Merge pull request #2 from vicweeks/milestone2-patch0
Milestone2 patch0
2 parents 72d1679 + 009d5fa commit 061c3b0

File tree

8 files changed

+262
-7
lines changed

8 files changed

+262
-7
lines changed

cs455/scaling/client/Client.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
package cs455.scaling.client;
22

3-
public class Client {
3+
import cs455.scaling.tasks.ClientTask;
4+
import java.nio.channels.SocketChannel;
5+
import java.net.InetSocketAddress;
6+
import java.io.IOException;
47

8+
public class Client {
9+
10+
private static SocketChannel socketChannel;
11+
512
public static void main(String[] args) {
613

7-
Client s = new Client();
14+
Client c = new Client();
815

916
if (args.length < 3) {
1017
System.err.println(
@@ -19,7 +26,26 @@ public static void main(String[] args) {
1926

2027
if (args.length == 4)
2128
debug = true;
29+
30+
try {
31+
c.setUpChannel(serverHost, serverPort);
32+
ClientTask clientTask = new ClientTask(socketChannel, messageRate);
33+
clientTask.run();
34+
} catch (IOException ioe) {
35+
System.out.println(ioe.getMessage());
36+
} catch (InterruptedException ie) {
37+
System.out.println(ie.getMessage());
38+
}
2239

2340
}
41+
42+
private void setUpChannel(String serverHost, int serverPort)
43+
throws IOException, InterruptedException{
44+
System.out.println("Setting up client...");
45+
socketChannel = SocketChannel.open();
46+
socketChannel.connect(new InetSocketAddress(serverHost, serverPort));
47+
while(!socketChannel.finishConnect())
48+
Thread.sleep(100);
49+
}
2450

2551
}

cs455/scaling/server/Server.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
11
package cs455.scaling.server;
22

3+
import cs455.scaling.threadpool.ThreadPoolManager;
4+
import cs455.scaling.tasks.ServerTask;
5+
import java.nio.channels.Selector;
6+
import java.nio.channels.SelectionKey;
7+
import java.nio.channels.ServerSocketChannel;
8+
import java.nio.channels.SocketChannel;
9+
import java.nio.channels.ClosedChannelException;
10+
import java.nio.ByteBuffer;
11+
import java.net.InetSocketAddress;
12+
import java.io.IOException;
13+
import java.util.Set;
14+
315
public class Server {
16+
17+
private static boolean debug;
18+
private static ThreadPoolManager tpm;
19+
private static ServerSocketChannel ssChannel;
20+
private static Selector serverSelector;
21+
//private final int clientInterestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
422

523
public static void main(String[] args) {
624

@@ -14,11 +32,68 @@ public static void main(String[] args) {
1432

1533
int portNumber = Integer.parseInt(args[0]);
1634
int threadPoolSize = Integer.parseInt(args[1]);
17-
boolean debug = false;
35+
debug = false;
1836

1937
if (args.length == 3)
2038
debug = true;
39+
40+
tpm = new ThreadPoolManager(threadPoolSize, debug);
2141

42+
try {
43+
s.setupServerSocket(portNumber);
44+
serverSelector = Selector.open();
45+
s.executeServerLoop();
46+
} catch (IOException ioe) {
47+
System.out.println(ioe.getMessage());
48+
}
49+
50+
}
51+
52+
private void setupServerSocket(int portNumber) throws IOException {
53+
System.out.println("Setting up server...");
54+
ssChannel = ServerSocketChannel.open();
55+
ssChannel.socket().bind(new InetSocketAddress(portNumber));
56+
ssChannel.configureBlocking(false);
57+
}
58+
59+
private void executeServerLoop() throws IOException {
60+
while (true) {
61+
// Get new sockets
62+
SocketChannel socketChannel = ssChannel.accept();
63+
// Register new sockets
64+
if (socketChannel != null)
65+
registerIncomingKey(socketChannel);
66+
// Find read ready channels
67+
if (serverSelector.selectNow() > 0) {
68+
Set<SelectionKey> selectedKeys = serverSelector.selectedKeys();
69+
70+
for (SelectionKey key : selectedKeys) {
71+
if (key.isReadable()) {
72+
// create and enqueue a task to read message from client
73+
createTask(key);
74+
tpm.assignTask();
75+
}
76+
selectedKeys.remove(key);
77+
}
78+
79+
}
80+
}
81+
}
82+
83+
private void createTask(SelectionKey key) throws IOException {
84+
ServerTask task = new ServerTask(key);
85+
tpm.addTaskToQueue(task);
86+
}
87+
88+
private void registerIncomingKey(SocketChannel socketChannel) throws IOException {
89+
if (debug)
90+
System.out.println("Registering new client...");
91+
try {
92+
socketChannel.configureBlocking(false);
93+
SelectionKey key = socketChannel.register(serverSelector, SelectionKey.OP_READ);
94+
} catch (ClosedChannelException cce) {
95+
System.out.println(cce.getMessage());
96+
}
2297
}
2398

2499
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package cs455.scaling.tasks;
2+
3+
import cs455.scaling.util.HashGenerator;
4+
import java.nio.channels.SocketChannel;
5+
import java.nio.ByteBuffer;
6+
import java.io.IOException;
7+
import java.util.Random;
8+
import java.util.LinkedList;
9+
10+
public class ClientTask extends Thread {
11+
12+
private SocketChannel socketChannel;
13+
private ByteBuffer buf;
14+
private int messageRate;
15+
private LinkedList<String> packetHashcodes;
16+
private final HashGenerator hashGen;
17+
18+
public ClientTask(SocketChannel socketChannel, int messageRate) {
19+
this.socketChannel = socketChannel;
20+
buf = ByteBuffer.allocate(8000);
21+
this.messageRate = messageRate;
22+
packetHashcodes = new LinkedList<String>();
23+
hashGen = new HashGenerator();
24+
}
25+
26+
public void run() {
27+
while(!isInterrupted()) {
28+
// test
29+
try {
30+
sendMessage();
31+
Thread.sleep(1000 / messageRate);
32+
} catch (IOException ioe) {
33+
System.out.println(ioe.getMessage());
34+
} catch (InterruptedException ie) {
35+
System.out.println(ie.getMessage());
36+
}
37+
}
38+
}
39+
40+
private byte[] generateRandomBytes() {
41+
byte[] rBytes = new byte[8000];
42+
new Random().nextBytes(rBytes);
43+
return rBytes;
44+
}
45+
46+
private void sendMessage() throws IOException {
47+
byte[] message = generateRandomBytes();
48+
String messageHash = hashGen.SHA1FromBytes(message);
49+
packetHashcodes.add(messageHash);
50+
buf.clear();
51+
buf.put(message);
52+
buf.flip();
53+
54+
while(buf.hasRemaining())
55+
socketChannel.write(buf);
56+
}
57+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package cs455.scaling.tasks;
2+
3+
import cs455.scaling.util.HashGenerator;
4+
import java.nio.channels.SelectionKey;
5+
import java.nio.channels.SocketChannel;
6+
import java.nio.ByteBuffer;
7+
import java.io.IOException;
8+
9+
public class ServerTask implements Runnable {
10+
11+
private final HashGenerator hashGen;
12+
private SocketChannel socketChannel;
13+
private ByteBuffer buf;
14+
15+
public ServerTask(SelectionKey key) {
16+
hashGen = new HashGenerator();
17+
socketChannel = (SocketChannel) key.channel();
18+
buf = ByteBuffer.allocate(8000);
19+
}
20+
21+
public void run() {
22+
try {
23+
int bytesRead = socketChannel.read(buf);
24+
byte[] message = new byte[bytesRead];
25+
buf.flip();
26+
buf.get(message);
27+
String messageHash = getHash(message);
28+
System.out.println(messageHash);
29+
buf.clear();
30+
//replyWithHash(messageHash);
31+
} catch (IOException ioe) {
32+
System.out.println(ioe.getMessage());
33+
}
34+
}
35+
36+
private String getHash(byte[] message) {
37+
return hashGen.SHA1FromBytes(message);
38+
}
39+
40+
private void replyWithHash(String messageHash) throws IOException {
41+
byte[] replyMessage = messageHash.getBytes();
42+
buf.put(replyMessage);
43+
buf.flip();
44+
45+
while(buf.hasRemaining())
46+
socketChannel.write(buf);
47+
}
48+
49+
}

cs455/scaling/threadpool/ThreadPoolManager.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,32 @@ public ThreadPoolManager(int threadPoolSize, boolean debug) {
1616
this.taskQueue = new LinkedList<Runnable>();
1717
this.threadPool = new FixedThreadPool(threadPoolSize, debug);
1818
}
19+
20+
public void addTaskToQueue(Runnable task) {
21+
taskQueue.add(task);
22+
}
23+
24+
public void assignTask() {
25+
synchronized(taskQueue) {
26+
WorkerThread thread = threadPool.retrieveSpareWorker();
27+
Runnable task = taskQueue.pollFirst();
28+
if (task == null) {
29+
if (debug)
30+
System.out.println("Task Queue is empty, waiting for tasks.");
31+
try {
32+
taskQueue.wait();
33+
task = taskQueue.removeFirst();
34+
} catch (InterruptedException ie) {
35+
System.out.println(ie.getMessage());
36+
}
37+
}
38+
thread.assignTask(task);
39+
}
40+
}
1941

2042
public static void main(String[] args) {
2143

22-
//TODO: partial implementation
44+
// partial implementation
2345
// Note: this component can be created and tested in isolation from the rest of the system
2446
// should allocate a given number of threads;
2547
// maintain a queue of pending tasks;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package cs455.scaling.util;
2+
3+
import java.security.MessageDigest;
4+
import java.math.BigInteger;
5+
import java.security.NoSuchAlgorithmException;
6+
7+
public class HashGenerator {
8+
9+
private MessageDigest digest = null;
10+
11+
public HashGenerator() {
12+
try {
13+
digest = MessageDigest.getInstance("SHA1");
14+
} catch (NoSuchAlgorithmException nsae) {
15+
System.out.println(nsae.getMessage());
16+
}
17+
}
18+
19+
public String SHA1FromBytes(byte[] data) {
20+
byte[] hash = digest.digest(data);
21+
BigInteger hashInt = new BigInteger(1, hash);
22+
return hashInt.toString(16);
23+
}
24+
25+
}

makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ PACKAGES = \
44
cs455/scaling/server \
55
cs455/scaling/client \
66
cs455/scaling/threadpool \
7-
cs455/scaling/tasks
7+
cs455/scaling/tasks \
8+
cs455/scaling/util
89

910
JC = javac
1011
JVM = java

setup

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ test_home=Classes/Spring_18/CS455/HW2-PC/CS455-HW2-PC
66

77
for i in `cat machine_list`
88
do
9-
for value in {1..7}
9+
for value in {1..2}
1010
do
1111
echo 'logging into '${i}
12-
gnome-terminal -- bash -c "ssh -t ${i} 'cd ${test_home};java cs455.scaling.client.Client richmond 4444 4;bash;'" &
12+
gnome-terminal -- bash -c "ssh -t ${i} 'cd ${test_home};java cs455.scaling.client.Client richmond 4444 1 d;bash;'" &
1313
done
1414
done

0 commit comments

Comments
 (0)