Commit 0dc9bd0

Merge pull request ceph#62982 from samarahu/wip-rgw-d4n-next-test
rgw: D4N Test Updates

Reviewed-by: Pritha Srivastava <[email protected]>

2 parents: c2bee4c + f08f709

File tree: 7 files changed, +201 -127 lines

qa/workunits/rgw/test_rgw_d4n.py

Lines changed: 141 additions & 99 deletions
@@ -1,19 +1,29 @@
 #!/usr/bin/python3

 '''
-This workunits tests the functionality of the D4N read workflow on a small object of size 4.
+This workunit tests the functionality of the D4N read workflow on a small object of size 4 and
+a multipart object of a randomly generated size. Each test runs the following workflow:
+
+1. Upload the object
+2. Perform a GET call (object should be retrieved from backend)
+3. Compare the cached object's contents to the original object
+4. Check the directory contents
+5. Perform another GET call (object should be retrieved from datacache)
+6. Compare the cached object's contents to the original object
+7. Check the directory contents once more
 '''

 import logging as log
 from configobj import ConfigObj
+import botocore
 import boto3
 import redis
 import subprocess
-import json
 import os
 import hashlib
 import string
 import random
+import time

 log.basicConfig(level=log.DEBUG)
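
For orientation, here is a minimal sketch (not part of this commit) of the read workflow listed in the new docstring above. It assumes a locally running RGW with the D4N filter enabled; the endpoint URL and credentials are placeholders, not values taken from the test:

    import boto3

    # hypothetical local RGW endpoint and demo credentials (placeholders)
    s3 = boto3.resource('s3',
                        endpoint_url='http://localhost:8000',
                        aws_access_key_id='test',
                        aws_secret_access_key='test')

    bucket = s3.Bucket('bkt')
    bucket.create()
    obj = s3.Object(bucket_name='bkt', key='test.txt')

    obj.put(Body='test')                              # 1. upload the object
    first = obj.get()                                 # 2. first GET: served from the backend store
    assert first['Body'].read().decode() == 'test'    # 3. contents match the original
    second = obj.get()                                # 5. second GET: expected to come from the datacache
    assert second['Body'].read().decode() == 'test'   # 6. contents still match
    # steps 4 and 7 (directory checks) are done against Redis, as in the test below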

@@ -138,7 +148,8 @@ def get_body(response):
     got = got.decode()
     return got

-def test_small_object(r, client, obj):
+def test_small_object(r, client, s3):
+    obj = s3.Object(bucket_name='bkt', key='test.txt')
     test_txt = 'test'

     response_put = obj.put(Body=test_txt)
@@ -156,65 +167,65 @@ def test_small_object(r, client, obj):
     body = get_body(response_get)
     assert(body == "test")

-    data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
-    data = data.decode('latin-1').strip()
-    output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+    bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+    datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/test.txt/'
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').strip().splitlines()
+    if '#' in datacache[3]: # datablock key
+        datacache = datacache[3]
+    else:
+        datacache = datacache[2]
+    output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
     assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())

-    data = r.hgetall('bkt_test.txt_0_4')
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
-    attrs = json.loads(output.decode('latin-1'))
-
-    # directory entry comparisons
-    assert(data.get('blockID') == '0')
-    assert(data.get('version') == attrs.get('tag'))
-    assert(data.get('size') == '4')
-    assert(data.get('globalWeight') == '0')
-    assert(data.get('blockHosts') == '127.0.0.1:6379')
-    assert(data.get('objName') == 'test.txt')
-    assert(data.get('bucketName') == 'bkt')
-    assert(data.get('creationTime') == attrs.get('mtime'))
-    assert(data.get('dirty') == '0')
-    assert(data.get('objHosts') == '')
-
-    # repopulate cache
-    response_put = obj.put(Body=test_txt)
-    assert(response_put.get('ResponseMetadata').get('HTTPStatusCode') == 200)
+    data = {}
+    for entry in r.scan_iter("*_test.txt_0_4"):
+        data = r.hgetall(entry)
+
+        # directory entry comparisons
+        assert(data.get('blockID') == '0')
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == '4')
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('objName') == 'test.txt')
+        assert(data.get('bucketName') == bucketID)
+        assert(data.get('dirty') == '0')
+        assert(data.get('hosts') == '127.0.0.1:6379')

     # second get call
     response_get = obj.get()
     assert(response_get.get('ResponseMetadata').get('HTTPStatusCode') == 200)

     # check logs to ensure object was retrieved from cache
-    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "test.txt#0" + data.get('size')
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid="' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)

     # retrieve and compare cache contents
     body = get_body(response_get)
     assert(body == "test")

-    data = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/'])
-    data = data.decode('latin-1').strip()
-    output = subprocess.check_output(['md5sum', '/tmp/rgw_d4n_datacache/' + data]).decode('latin-1')
-
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').strip().splitlines()
+    if '#' in datacache[3]: # datablock key
+        datacache = datacache[3]
+    else:
+        datacache = datacache[2]
+    output = subprocess.check_output(['md5sum', datacache_path + datacache]).decode('latin-1')
     assert(output.splitlines()[0].split()[0] == hashlib.md5("test".encode('utf-8')).hexdigest())

-    data = r.hgetall('bkt_test.txt_0_4')
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=test.txt'])
-    attrs = json.loads(output.decode('latin-1'))
-
-    # directory entries should remain consistent
-    assert(data.get('blockID') == '0')
-    assert(data.get('version') == attrs.get('tag'))
-    assert(data.get('size') == '4')
-    assert(data.get('globalWeight') == '0')
-    assert(data.get('blockHosts') == '127.0.0.1:6379')
-    assert(data.get('objName') == 'test.txt')
-    assert(data.get('bucketName') == 'bkt')
-    assert(data.get('creationTime') == attrs.get('mtime'))
-    assert(data.get('dirty') == '0')
-    assert(data.get('objHosts') == '')
+    for entry in r.scan_iter("*_test.txt_0_4"):
+        data = r.hgetall(entry)
+
+        # directory entries should remain consistent
+        assert(data.get('blockID') == '0')
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == '4')
+        assert(data.get('globalWeight') == '0')
+        assert(data.get('objName') == 'test.txt')
+        assert(data.get('bucketName') == bucketID)
+        assert(data.get('dirty') == '0')
+        assert(data.get('hosts') == '127.0.0.1:6379')

     r.flushall()
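
The checks above now locate the directory entry by scanning Redis instead of hard-coding the key 'bkt_test.txt_0_4', since keys are prefixed with the bucket ID rather than the bucket name. A rough sketch (not part of the test) of inspecting such an entry by hand; the key layout, <bucketID>_<objName>_<offset>_<length>, is inferred from the scan pattern used in the test:

    import redis

    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    # scan for block-directory entries belonging to test.txt (offset 0, length 4)
    for key in r.scan_iter("*_test.txt_0_4"):
        entry = r.hgetall(key)   # each entry is a Redis hash of block metadata
        print(key)
        for field in ('blockID', 'version', 'size', 'dirty', 'hosts', 'bucketName', 'objName'):
            print('  ', field, '=', entry.get(field))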

@@ -225,94 +236,121 @@ def test_large_object(r, client, s3):
     objlen = 30 * 1024 * 1024
     metadata = {'foo': 'bar'}

-    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
+    (upload_id, multipart_data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
     client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})

     file_path = os.path.dirname(__file__)+'mymultipart'

     # first get
-    s3.Object(bucket_name, key).download_file(file_path)
+    try:
+        s3.Object(bucket_name, key).download_file(file_path)
+    except botocore.exceptions.ClientError as e:
+        log.error("ERROR: " + e)
+        raise

     # check logs to ensure object was retrieved from storage backend
     res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): Fetching object from backend store"', '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)

     # retrieve and compare cache contents
     with open(file_path, 'r') as body:
-        assert(body.read() == data)
+        assert(body.read() == multipart_data)

-    datacache_path = '/tmp/rgw_d4n_datacache/'
-    datacache = subprocess.check_output(['ls', datacache_path])
-    datacache = datacache.decode('latin-1').splitlines()
+    time.sleep(0.1)
+    bucketID = subprocess.check_output(['ls', '/tmp/rgw_d4n_datacache/']).decode('latin-1').strip()
+    datacache_path = '/tmp/rgw_d4n_datacache/' + bucketID + '/mymultipart/'
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:]

     for file in datacache:
-        ofs = int(file.split("_")[3])
-        size = int(file.split("_")[4])
-        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
-        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
+        if '#' in file: # data blocks
+            ofs = int(file.split("#")[1])
+            size = file.split("#")[2]
+            if '_' in file: # account for temp files
+                size = size.split("_")[0]

-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
-    attrs = json.loads(output.decode('latin-1'))
+            output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+            assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())

-    for entry in r.scan_iter("bkt_mymultipart_*"):
+    data = {}
+    for entry in r.scan_iter("*_mymultipart_*"):
         data = r.hgetall(entry)
-        name = entry.split("_")
+        entry_name = entry.split("_")

         # directory entry comparisons
-        assert(data.get('blockID') == name[2])
-        assert(data.get('version') == attrs.get('tag'))
-        assert(data.get('size') == name[3])
+        if len(entry_name) == 6: # versioned block
+            assert(data.get('blockID') == entry_name[4])
+            assert(data.get('deleteMarker') == '0')
+            assert(data.get('size') == entry_name[5])
+            assert(data.get('globalWeight') == '0')
+            assert(data.get('objName') == '_:null_mymultipart')
+            assert(data.get('bucketName') == bucketID)
+            assert(data.get('dirty') == '0')
+            assert(data.get('hosts') == '127.0.0.1:6379')
+            continue
+
+        assert(data.get('blockID') == entry_name[2])
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == entry_name[3])
         assert(data.get('globalWeight') == '0')
-        assert(data.get('blockHosts') == '127.0.0.1:6379')
         assert(data.get('objName') == 'mymultipart')
-        assert(data.get('bucketName') == 'bkt')
-        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('bucketName') == bucketID)
         assert(data.get('dirty') == '0')
-        assert(data.get('objHosts') == '')
-
-    # repopulate cache
-    (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client, content_type=content_type, metadata=metadata)
-    client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+        assert(data.get('hosts') == '127.0.0.1:6379')

-    #second get
-    s3.Object(bucket_name, key).download_file(file_path)
+    # second get
+    try:
+        s3.Object(bucket_name, key).download_file(file_path)
+    except botocore.exceptions.ClientError as e:
+        log.error("ERROR: " + e)
+        raise

     # check logs to ensure object was retrieved from cache
-    res = subprocess.call(['grep', '"SSDCache: get_async(): ::aio_read(), ret=0"', '/var/log/ceph/rgw.ceph.client.0.log'])
+    oid_in_cache = bucketID + "#" + data.get('version') + "mymultipart#0" + data.get('size')
+    res = subprocess.call(['grep', '"D4NFilterObject::iterate:: iterate(): READ FROM CACHE: oid="' + oid_in_cache, '/var/log/ceph/rgw.ceph.client.0.log'])
     assert(res >= 1)

     # retrieve and compare cache contents
     with open(file_path, 'r') as body:
-        assert(body.read() == data)
+        assert(body.read() == multipart_data)

-    datacache_path = '/tmp/rgw_d4n_datacache/'
-    datacache = subprocess.check_output(['ls', datacache_path])
-    datacache = datacache.decode('latin-1').splitlines()
+    datacache = subprocess.check_output(['ls', '-a', datacache_path])
+    datacache = datacache.decode('latin-1').splitlines()[2:]

     for file in datacache:
-        ofs = int(file.split("_")[3])
-        size = int(file.split("_")[4])
-        output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
-        assert(output.splitlines()[0].split()[0] == hashlib.md5(data[ofs:ofs+size].encode('utf-8')).hexdigest())
-
-    output = subprocess.check_output(['radosgw-admin', 'object', 'stat', '--bucket=bkt', '--object=mymultipart'])
-    attrs = json.loads(output.decode('latin-1'))
+        if '#' in file: # data blocks
+            ofs = int(file.split("#")[1])
+            size = file.split("#")[2]
+            if '_' in file: # account for temp files
+                size = size.split("_")[0]

-    for key in r.scan_iter("bkt_mymultipart_*"):
-        data = r.hgetall(key)
-        name = key.split("_")
+            output = subprocess.check_output(['md5sum', datacache_path + file]).decode('latin-1')
+            assert(output.splitlines()[0].split()[0] == hashlib.md5(multipart_data[ofs:ofs+int(size)].encode('utf-8')).hexdigest())

-        # directory entry comparisons
-        assert(data.get('blockID') == name[2])
-        assert(data.get('version') == attrs.get('tag'))
-        assert(data.get('size') == name[3])
+    for entry in r.scan_iter("*_mymultipart_*"):
+        data = r.hgetall(entry)
+        entry_name = entry.split("_")
+
+        # directory entries should remain consistent
+        if len(entry_name) == 6: # versioned block
+            assert(data.get('blockID') == entry_name[4])
+            assert(data.get('deleteMarker') == '0')
+            assert(data.get('size') == entry_name[5])
+            assert(data.get('globalWeight') == '0')
+            assert(data.get('objName') == '_:null_mymultipart')
+            assert(data.get('bucketName') == bucketID)
+            assert(data.get('dirty') == '0')
+            assert(data.get('hosts') == '127.0.0.1:6379')
+            continue
+
+        assert(data.get('blockID') == entry_name[2])
+        assert(data.get('deleteMarker') == '0')
+        assert(data.get('size') == entry_name[3])
         assert(data.get('globalWeight') == '0')
-        assert(data.get('blockHosts') == '127.0.0.1:6379')
         assert(data.get('objName') == 'mymultipart')
-        assert(data.get('bucketName') == 'bkt')
-        assert(data.get('creationTime') == attrs.get('mtime'))
+        assert(data.get('bucketName') == bucketID)
         assert(data.get('dirty') == '0')
-        assert(data.get('objHosts') == '')
+        assert(data.get('hosts') == '127.0.0.1:6379')

     r.flushall()
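
The loops above parse cache-block filenames whose fields are separated by '#', with a possible '_' suffix on temporary files. A small helper sketch (not part of the test) mirroring that logic; the filename layout is inferred from the parsing code above rather than from separate documentation:

    import hashlib

    def check_block(datacache_path, filename, original_data):
        if '#' not in filename:              # skip non-datablock entries
            return
        ofs = int(filename.split("#")[1])    # block offset within the object, per the test's parsing
        size = filename.split("#")[2]
        if '_' in filename:                  # account for temp files, as the test does
            size = size.split("_")[0]
        with open(datacache_path + filename, 'rb') as f:
            cached = f.read()
        expected = original_data[ofs:ofs + int(size)].encode('utf-8')
        assert hashlib.md5(cached).hexdigest() == hashlib.md5(expected).hexdigest()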

@@ -346,14 +384,13 @@ def main():

     bucket = s3.Bucket('bkt')
     bucket.create()
-    obj = s3.Object(bucket_name='bkt', key='test.txt')

     # Check for Redis instance
     try:
         connection = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
         connection.ping()
     except:
-        log.debug("ERROR: Redis instance not running.")
+        log.error("ERROR: Redis instance not running.")
         raise

     # Create s3cmd config
@@ -363,10 +400,15 @@ def main():
     r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

     # Run small object test
-    test_small_object(r, client, obj)
+    test_small_object(r, client, s3)

     # Run large object test
     test_large_object(r, client, s3)
+
+    # close filter client
+    filter_client = [client for client in r.client_list()
+                     if client.get('name') in ['D4N.Filter']]
+    r.client_kill_filter(_id=filter_client[0].get('id'))

     log.info("D4NFilterTest completed.")
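
main() now also closes the RGW filter's own Redis connection, registered under the client name 'D4N.Filter', before finishing. A small sanity-check sketch (not part of the test) that the connection is gone, using the same redis-py calls:

    import redis

    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    remaining = [c for c in r.client_list() if c.get('name') == 'D4N.Filter']
    assert remaining == [], "a D4N.Filter connection is still registered with Redis"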

src/rgw/driver/d4n/d4n_policy.h

Lines changed: 7 additions & 6 deletions
@@ -182,9 +182,10 @@ class LFUDAPolicy : public CachePolicy {
     int delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y);

   public:
-    LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(),
-        conn(conn),
-        cacheDriver(cacheDriver)
+    LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, optional_yield y) : CachePolicy(),
+        y(y),
+        conn(conn),
+        cacheDriver(cacheDriver)
     {
       blockDir = new BlockDirectory{conn};
       objDir = new ObjectDirectory{conn};
@@ -207,7 +208,6 @@ class LFUDAPolicy : public CachePolicy {
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     virtual bool _erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
-    void save_y(optional_yield y) { this->y = y; }
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
                                      double creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                                      const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
@@ -221,6 +221,7 @@ class LFUDAPolicy : public CachePolicy {
       }
       return it->second.first;
     }
+    void save_y(optional_yield y) { this->y = y; }
 };

 class LRUPolicy : public CachePolicy {
@@ -258,10 +259,10 @@ class PolicyDriver {
     CachePolicy* cachePolicy;

   public:
-    PolicyDriver(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, const std::string& _policyName) : policyName(_policyName)
+    PolicyDriver(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, const std::string& _policyName, optional_yield y) : policyName(_policyName)
     {
       if (policyName == "lfuda") {
-        cachePolicy = new LFUDAPolicy(conn, cacheDriver);
+        cachePolicy = new LFUDAPolicy(conn, cacheDriver, y);
       } else if (policyName == "lru") {
         cachePolicy = new LRUPolicy(cacheDriver);
       }
