Skip to content

Commit bb203bd

Browse files
committed
connect to storage server failover with multi IPs
1 parent 020857d commit bb203bd

File tree

12 files changed

+408
-60
lines changed

12 files changed

+408
-60
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ target
1515
*.conf
1616
*.PNG
1717
*.class
18+
*.swp

HISTORY

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11

2-
Version 1.31 2023-12-07
2+
Version 1.31 2023-12-13
33
* adapt to FastDFS server V6.11 for IPv6
44
you must upgrade your FastDFS server to V6.11 or higher version
5+
* connect to storage server failover with multi IPs
56

67
Version 1.30 2023-01-29
78
* support tracker server fail over

fdfs_client.conf

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@ tracker_server = 10.0.11.247:22122
1616
tracker_server = 10.0.11.248:22122
1717
tracker_server = 10.0.11.249:22122
1818

19+
# connect which ip address first for multi IPs of a storage server, value list:
20+
## tracker: connect to the ip address return by tracker server first
21+
## last-connected: connect to the ip address last connected first
22+
# default value is tracker
23+
connect_first_by = tracker
24+
1925
connection_pool.enabled = true
2026
connection_pool.max_count_per_entry = 500
2127
connection_pool.max_idle_time = 3600
2228
connection_pool.max_wait_time_in_ms = 1000
23-
24-
server_ipv6.enabled = false

src/main/java/org/csource/fastdfs/ClientGlobal.java

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class ClientGlobal {
4242
public static final String PROP_KEY_HTTP_SECRET_KEY = "fastdfs.http_secret_key";
4343
public static final String PROP_KEY_HTTP_TRACKER_HTTP_PORT = "fastdfs.http_tracker_http_port";
4444
public static final String PROP_KEY_TRACKER_SERVERS = "fastdfs.tracker_servers";
45-
45+
public static final String PROP_KEY_CONNECT_FIRST_BY = "fastdfs.connect_first_by";
4646

4747
public static final String PROP_KEY_CONNECTION_POOL_ENABLED = "fastdfs.connection_pool.enabled";
4848
public static final String PROP_KEY_CONNECTION_POOL_MAX_COUNT_PER_ENTRY = "fastdfs.connection_pool.max_count_per_entry";
@@ -56,6 +56,9 @@ public class ClientGlobal {
5656
public static final String DEFAULT_HTTP_SECRET_KEY = "FastDFS1234567890";
5757
public static final int DEFAULT_HTTP_TRACKER_HTTP_PORT = 80;
5858

59+
public static final int CONNECT_FIRST_BY_TRACKER = 0;
60+
public static final int CONNECT_FIRST_BY_LAST_CONNECTED = 1;
61+
5962
public static final boolean DEFAULT_CONNECTION_POOL_ENABLED = true;
6063
public static final int DEFAULT_CONNECTION_POOL_MAX_COUNT_PER_ENTRY = 100;
6164
public static final int DEFAULT_CONNECTION_POOL_MAX_IDLE_TIME = 3600 ;//second
@@ -67,6 +70,9 @@ public class ClientGlobal {
6770
public static boolean g_anti_steal_token = DEFAULT_HTTP_ANTI_STEAL_TOKEN; //if anti-steal token
6871
public static String g_secret_key = DEFAULT_HTTP_SECRET_KEY; //generage token secret key
6972
public static int g_tracker_http_port = DEFAULT_HTTP_TRACKER_HTTP_PORT;
73+
public static int g_connect_first_by = CONNECT_FIRST_BY_TRACKER;
74+
public static boolean g_multi_storage_ips = false;
75+
public static StorageAddressMap g_storages_address_map;
7076

7177
public static boolean g_connection_pool_enabled = DEFAULT_CONNECTION_POOL_ENABLED;
7278
public static int g_connection_pool_max_count_per_entry = DEFAULT_CONNECTION_POOL_MAX_COUNT_PER_ENTRY;
@@ -78,6 +84,76 @@ public class ClientGlobal {
7884
private ClientGlobal() {
7985
}
8086

87+
private static void loadStoragsFromTracker() throws IOException, MyException {
88+
TrackerClient tracker = new TrackerClient();
89+
StringBuilder builder = tracker.fetchStorageIds();
90+
if (builder.length() == 0) {
91+
return;
92+
}
93+
94+
System.out.println(builder.toString());
95+
96+
boolean without_port = true;
97+
int count = 0;
98+
String[] lines = builder.toString().split("\n");
99+
String[] ipAddresses = new String[lines.length];
100+
for (String line : lines) {
101+
String[] cols = line.split(" ");
102+
if (cols.length != 3) {
103+
throw new MyException("invalid line: " + line);
104+
}
105+
106+
String ipAddrs = cols[2];
107+
if (ipAddrs.indexOf(',') > 0) {
108+
ipAddresses[count++] = ipAddrs;
109+
}
110+
}
111+
112+
if (count == 0) {
113+
return;
114+
}
115+
116+
int startIndex;
117+
if (ipAddresses[0].charAt(0) == '[') { //IPv6
118+
if ((startIndex=ipAddresses[0].indexOf(']')) < 0) {
119+
throw new MyException("invalid IPv6 address: " + ipAddresses[0]);
120+
}
121+
} else {
122+
startIndex = 0;
123+
}
124+
if (ipAddresses[0].indexOf(':', startIndex) > 0) {
125+
without_port = false;
126+
}
127+
128+
g_multi_storage_ips = true;
129+
g_storages_address_map = new StorageAddressMap(without_port);
130+
if (without_port) {
131+
for (String ipAddr: ipAddresses) {
132+
if (ipAddr.charAt(0) == '[') { //IPv6
133+
ipAddr = ipAddr.substring(1, ipAddr.length() - 1);
134+
}
135+
String[] cols = ipAddr.split(",");
136+
g_storages_address_map.puts(cols[0], cols[1]);
137+
}
138+
} else {
139+
for (String ipPort: ipAddresses) {
140+
int colonIndex = ipPort.lastIndexOf(':');
141+
if (colonIndex < 0) {
142+
throw new MyException("invalid ip and port: " + ipPort);
143+
}
144+
145+
String ipAddr = ipPort.substring(0, colonIndex);
146+
int port = Integer.parseInt(ipPort.substring(colonIndex + 1));
147+
148+
if (ipAddr.charAt(0) == '[') { //IPv6
149+
ipAddr = ipAddr.substring(1, ipAddr.length() - 1);
150+
}
151+
String[] cols = ipAddr.split(",");
152+
g_storages_address_map.puts(cols[0], cols[1], port);
153+
}
154+
}
155+
}
156+
81157
/**
82158
* load global variables
83159
*
@@ -114,11 +190,11 @@ public static void init(String conf_filename) throws IOException, MyException {
114190

115191
InetSocketAddress[] tracker_servers = new InetSocketAddress[szTrackerServers.length];
116192
for (int i = 0; i < szTrackerServers.length; i++) {
117-
if(szTrackerServers[i].contains("[")){
193+
if (szTrackerServers[i].contains("[")) {
118194
parts = new String[2];
119195
parts[0] = szTrackerServers[i].substring(1, szTrackerServers[i].indexOf("]"));
120196
parts[1] = szTrackerServers[i].substring(szTrackerServers[i].lastIndexOf(":") + 1);
121-
}else {
197+
} else {
122198
parts = szTrackerServers[i].split("\\:", 2);
123199
}
124200

@@ -130,6 +206,11 @@ public static void init(String conf_filename) throws IOException, MyException {
130206
}
131207
g_tracker_group = new TrackerGroup(tracker_servers);
132208

209+
String connect_first_by = iniReader.getStrValue("connect_first_by");
210+
if (connect_first_by != null && connect_first_by.equalsIgnoreCase("last-connected")) {
211+
g_connect_first_by = CONNECT_FIRST_BY_LAST_CONNECTED;
212+
}
213+
133214
g_tracker_http_port = iniReader.getIntValue("http.tracker_http_port", 80);
134215
g_anti_steal_token = iniReader.getBoolValue("http.anti_steal_token", false);
135216
if (g_anti_steal_token) {
@@ -146,6 +227,8 @@ public static void init(String conf_filename) throws IOException, MyException {
146227
if (g_connection_pool_max_wait_time_in_ms < 0) {
147228
g_connection_pool_max_wait_time_in_ms = DEFAULT_CONNECTION_POOL_MAX_WAIT_TIME_IN_MS;
148229
}
230+
231+
loadStoragsFromTracker();
149232
}
150233

151234
/**
@@ -183,6 +266,8 @@ public static void initByProperties(Properties props) throws IOException, MyExce
183266
String httpAntiStealTokenConf = props.getProperty(PROP_KEY_HTTP_ANTI_STEAL_TOKEN);
184267
String httpSecretKeyConf = props.getProperty(PROP_KEY_HTTP_SECRET_KEY);
185268
String httpTrackerHttpPortConf = props.getProperty(PROP_KEY_HTTP_TRACKER_HTTP_PORT);
269+
270+
String connectFirstBy = props.getProperty(PROP_KEY_CONNECT_FIRST_BY);
186271
String poolEnabled = props.getProperty(PROP_KEY_CONNECTION_POOL_ENABLED);
187272
String poolMaxCountPerEntry = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_COUNT_PER_ENTRY);
188273
String poolMaxIdleTime = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME);
@@ -205,6 +290,10 @@ public static void initByProperties(Properties props) throws IOException, MyExce
205290
if (httpTrackerHttpPortConf != null && httpTrackerHttpPortConf.trim().length() != 0) {
206291
g_tracker_http_port = Integer.parseInt(httpTrackerHttpPortConf);
207292
}
293+
294+
if (connectFirstBy != null && connectFirstBy.equalsIgnoreCase("last-connected")) {
295+
g_connect_first_by = CONNECT_FIRST_BY_LAST_CONNECTED;
296+
}
208297
if (poolEnabled != null && poolEnabled.trim().length() != 0) {
209298
g_connection_pool_enabled = Boolean.parseBoolean(poolEnabled);
210299
}
@@ -217,6 +306,8 @@ public static void initByProperties(Properties props) throws IOException, MyExce
217306
if (poolMaxWaitTimeInMS != null && poolMaxWaitTimeInMS.trim().length() != 0) {
218307
g_connection_pool_max_wait_time_in_ms = Integer.parseInt(poolMaxWaitTimeInMS);
219308
}
309+
310+
loadStoragsFromTracker();
220311
}
221312

222313
/**
@@ -360,6 +451,8 @@ public static String configInfo() {
360451
+ "\n g_anti_steal_token = " + g_anti_steal_token
361452
+ "\n g_secret_key = " + g_secret_key
362453
+ "\n g_tracker_http_port = " + g_tracker_http_port
454+
+ "\n g_multi_storage_ips = " + g_multi_storage_ips
455+
+ "\n g_connect_first_by = " + (g_connect_first_by == CONNECT_FIRST_BY_TRACKER ? "tracker" : "last-connected")
363456
+ "\n g_connection_pool_enabled = " + g_connection_pool_enabled
364457
+ "\n g_connection_pool_max_count_per_entry = " + g_connection_pool_max_count_per_entry
365458
+ "\n g_connection_pool_max_idle_time(ms) = " + g_connection_pool_max_idle_time

src/main/java/org/csource/fastdfs/ProtoCommon.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727
public class ProtoCommon {
2828
public static final byte FDFS_PROTO_CMD_QUIT = 82;
29+
public static final byte TRACKER_PROTO_CMD_FETCH_STORAGE_IDS = 69;
2930
public static final byte TRACKER_PROTO_CMD_SERVER_LIST_GROUP = 91;
3031
public static final byte TRACKER_PROTO_CMD_SERVER_LIST_STORAGE = 92;
3132
public static final byte TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE = 93;
@@ -310,26 +311,42 @@ public static boolean activeTest(Socket sock) throws IOException {
310311
return headerInfo.errno == 0 ? true : false;
311312
}
312313

314+
/**
315+
* int convert to buff (big-endian)
316+
*
317+
* @param n int number
318+
* @return 4 bytes buff
319+
*/
320+
public static byte[] int2buff(int n) {
321+
byte[] bs;
322+
323+
bs = new byte[4];
324+
bs[0] = (byte) ((n >> 24) & 0xFF);
325+
bs[1] = (byte) ((n >> 16) & 0xFF);
326+
bs[2] = (byte) ((n >> 8) & 0xFF);
327+
bs[3] = (byte) (n & 0xFF);
328+
return bs;
329+
}
330+
313331
/**
314332
* long convert to buff (big-endian)
315333
*
316334
* @param n long number
317335
* @return 8 bytes buff
318336
*/
319337
public static byte[] long2buff(long n) {
320-
byte[] bs;
321-
322-
bs = new byte[8];
323-
bs[0] = (byte) ((n >> 56) & 0xFF);
324-
bs[1] = (byte) ((n >> 48) & 0xFF);
325-
bs[2] = (byte) ((n >> 40) & 0xFF);
326-
bs[3] = (byte) ((n >> 32) & 0xFF);
327-
bs[4] = (byte) ((n >> 24) & 0xFF);
328-
bs[5] = (byte) ((n >> 16) & 0xFF);
329-
bs[6] = (byte) ((n >> 8) & 0xFF);
330-
bs[7] = (byte) (n & 0xFF);
331-
332-
return bs;
338+
byte[] bs;
339+
340+
bs = new byte[8];
341+
bs[0] = (byte) ((n >> 56) & 0xFF);
342+
bs[1] = (byte) ((n >> 48) & 0xFF);
343+
bs[2] = (byte) ((n >> 40) & 0xFF);
344+
bs[3] = (byte) ((n >> 32) & 0xFF);
345+
bs[4] = (byte) ((n >> 24) & 0xFF);
346+
bs[5] = (byte) ((n >> 16) & 0xFF);
347+
bs[6] = (byte) ((n >> 8) & 0xFF);
348+
bs[7] = (byte) (n & 0xFF);
349+
return bs;
333350
}
334351

335352
/**
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright (C) 2023 Happy Fish / YuQing
3+
* <p>
4+
* FastDFS Java Client may be copied only under the terms of the GNU Lesser
5+
* General Public License (LGPL).
6+
* Please visit the FastDFS Home Page https://github.com/happyfish100/fastdfs for more detail.
7+
*/
8+
9+
package org.csource.fastdfs;
10+
11+
import java.util.HashMap;
12+
import java.net.InetSocketAddress;
13+
14+
/**
15+
* Storage Server Address Map
16+
*
17+
* @author Happy Fish / YuQing
18+
* @version Version 1.31
19+
*/
20+
public class StorageAddressMap {
21+
22+
protected boolean without_port;
23+
protected HashMap<String, InetSocketAddress> storages;
24+
25+
public StorageAddressMap(boolean without_port) {
26+
this.without_port = without_port;
27+
this.storages = new HashMap<String, InetSocketAddress>();
28+
}
29+
30+
protected String getKey(String ipAddr, int port) {
31+
return ipAddr + "@" + port;
32+
}
33+
34+
public void puts(String srcIpAddr, String destIpAddr, int port) {
35+
storages.put(this.getKey(srcIpAddr, port),
36+
new InetSocketAddress(destIpAddr, port));
37+
storages.put(this.getKey(destIpAddr, port),
38+
new InetSocketAddress(srcIpAddr, port));
39+
}
40+
41+
public void puts(String srcIpAddr, String destIpAddr) {
42+
storages.put(srcIpAddr, new InetSocketAddress(destIpAddr, 0));
43+
storages.put(destIpAddr, new InetSocketAddress(srcIpAddr, 0));
44+
}
45+
46+
public InetSocketAddress get(String ipAddr, int port) {
47+
if (this.without_port) {
48+
InetSocketAddress sockAddr;
49+
if ((sockAddr=storages.get(ipAddr)) == null) {
50+
return null;
51+
}
52+
53+
return new InetSocketAddress(sockAddr.getAddress(), port);
54+
} else {
55+
return storages.get(this.getKey(ipAddr, port));
56+
}
57+
}
58+
59+
public InetSocketAddress get(InetSocketAddress sockAddr) {
60+
return this.get(sockAddr.getAddress().getHostAddress(), sockAddr.getPort());
61+
}
62+
}

src/main/java/org/csource/fastdfs/StorageClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,6 @@ protected String[] do_upload_file(byte cmd, String group_name, String master_fil
847847
throw ex;
848848
} finally {
849849
releaseConnection(connection, bNewStorageServer);
850-
851850
}
852851
}
853852

0 commit comments

Comments
 (0)