Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/iocore/cache/CacheDir.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "tscore/hugepages.h"
#include "tscore/Random.h"
#include "ts/ats_probe.h"

#ifdef LOOP_CHECK_MODE
#define DIR_LOOP_THRESHOLD 1000
Expand Down Expand Up @@ -327,6 +328,9 @@ dir_delete_entry(Dir *e, Dir *p, int s, Directory *directory)
} else {
Dir *n = next_dir(e, seg);
if (n) {
// "Shuffle" here means that we're copying the second entry's data to the head entry's location, and removing the second entry
// - because the head entry can't be moved.
ATS_PROBE3(cache_dir_shuffle, s, dir_to_offset(e, seg), dir_to_offset(n, seg));
dir_assign(e, n);
dir_delete_entry(n, e, s, directory);
return e;
Expand All @@ -339,7 +343,7 @@ dir_delete_entry(Dir *e, Dir *p, int s, Directory *directory)
}

inline void
dir_clean_bucket(Dir *b, int s, Stripe *stripe)
dir_clean_bucket(Dir *b, int s, StripeSM *stripe)
{
Dir *e = b, *p = nullptr;
Dir *seg = stripe->directory.get_segment(s);
Expand All @@ -363,6 +367,8 @@ dir_clean_bucket(Dir *b, int s, Stripe *stripe)
ts::Metrics::Gauge::decrement(cache_rsb.direntries_used);
ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used);
}
// Match cache_dir_remove arguments
ATS_PROBE7(cache_dir_remove_clean_bucket, stripe->fd, s, dir_to_offset(e, seg), dir_offset(e), dir_approx_size(e), 0, 0);
e = dir_delete_entry(e, p, s, &stripe->directory);
continue;
}
Expand All @@ -372,7 +378,7 @@ dir_clean_bucket(Dir *b, int s, Stripe *stripe)
}

void
Directory::clean_segment(int s, Stripe *stripe)
Directory::clean_segment(int s, StripeSM *stripe)
{
Dir *seg = this->get_segment(s);
for (int64_t i = 0; i < this->buckets; i++) {
Expand All @@ -382,7 +388,7 @@ Directory::clean_segment(int s, Stripe *stripe)
}

void
Directory::cleanup(Stripe *stripe)
Directory::cleanup(StripeSM *stripe)
{
for (int64_t i = 0; i < this->segments; i++) {
this->clean_segment(i, stripe);
Expand All @@ -391,7 +397,7 @@ Directory::cleanup(Stripe *stripe)
}

void
Directory::clear_range(off_t start, off_t end, Stripe *stripe)
Directory::clear_range(off_t start, off_t end, StripeSM *stripe)
{
for (off_t i = 0; i < this->entries(); i++) {
Dir *e = dir_index(stripe, i);
Expand Down Expand Up @@ -522,6 +528,8 @@ Directory::probe(const CacheKey *key, StripeSM *stripe, Dir *result, Dir **last_
} else { // delete the invalid entry
ts::Metrics::Gauge::decrement(cache_rsb.direntries_used);
ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used);
ATS_PROBE7(cache_dir_remove_invalid, stripe->fd, s, dir_to_offset(e, seg), dir_offset(e), dir_approx_size(e),
key->slice64(0), key->slice64(1));
e = dir_delete_entry(e, p, s, this);
continue;
}
Expand Down Expand Up @@ -605,6 +613,8 @@ Directory::insert(const CacheKey *key, StripeSM *stripe, Dir *to_part)
ink_assert(stripe->vol_offset(e) < (stripe->skip + stripe->len));
DDbg(dbg_ctl_dir_insert, "insert %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), stripe->fd,
bi, e, key->slice32(1), dir_tag(e), dir_offset(e));
ATS_PROBE7(cache_dir_insert, stripe->fd, s, dir_to_offset(e, seg), dir_offset(e), dir_approx_size(e), key->slice64(0),
key->slice64(1));
CHECK_DIR(d);
stripe->directory.header->dirty = 1;
ts::Metrics::Gauge::increment(cache_rsb.direntries_used);
Expand Down Expand Up @@ -724,9 +734,12 @@ Directory::remove(const CacheKey *key, StripeSM *stripe, Dir *del)
return 0;
}
#endif
if (dir_compare_tag(e, key) && dir_offset(e) == dir_offset(del)) {
int64_t offset = dir_offset(e);
if (dir_compare_tag(e, key) && offset == dir_offset(del)) {
ts::Metrics::Gauge::decrement(cache_rsb.direntries_used);
ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used);
ATS_PROBE7(cache_dir_remove, stripe ? stripe->fd : -1, s, dir_to_offset(e, seg), offset, dir_approx_size(e),
key->slice64(0), key->slice64(1));
dir_delete_entry(e, p, s, this);
CHECK_DIR(d);
return 1;
Expand Down
6 changes: 3 additions & 3 deletions src/iocore/cache/P_CacheDir.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,12 @@ struct Directory {
int remove(const CacheKey *key, StripeSM *stripe, Dir *del);
void free_entry(Dir *e, int s);
int check();
void cleanup(Stripe *stripe);
void clear_range(off_t start, off_t end, Stripe *stripe);
void cleanup(StripeSM *stripe);
void clear_range(off_t start, off_t end, StripeSM *stripe);
uint64_t entries_used();
int bucket_length(Dir *b, int s);
int freelist_length(int s);
void clean_segment(int s, Stripe *stripe);
void clean_segment(int s, StripeSM *stripe);
};

inline int
Expand Down
44 changes: 44 additions & 0 deletions tests/gold_tests/ats_probe_cache_dir/cache_dir_probe.bt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env bpftrace
/** @file

Trace cache directory SystemTap probes.

@section license License

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

BEGIN
{
printf("cache_dir_probe: ready\n");
}

usdt:*:cache_dir_insert
{
if (@inserted == 0) {
printf("cache_dir_insert\n");
}
@inserted = 1;
}

usdt:*:cache_dir_remove
{
if (@removed == 0) {
printf("cache_dir_remove\n");
}
@removed = 1;
}
122 changes: 122 additions & 0 deletions tests/gold_tests/ats_probe_cache_dir/cache_dir_probe.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
'''Verify cache directory SystemTap probes fire on insert and remove.'''
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

Test.Summary = '''Verify cache directory SystemTap probes fire on cache fill and PURGE.'''

# Skipping this test generally because it requires privilege. Thus most CI systems will skip it.
Test.SkipUnless(
Condition(lambda: os.geteuid() == 0, "Test requires privilege", True),
Condition.HasProgram("bpftrace", "Need bpftrace to verify the probe."))


class CacheDirProbeTest:
'''Verify cache directory SystemTap probes.'''
bt_script: str = 'cache_dir_probe.bt'
_cache_path: str = '/cacheable'

def __init__(self):
tr = Test.AddTestRun('Cache directory probes should trigger on insert and purge.')
self._configure_origin(tr)
self._configure_traffic_server(tr)
self._configure_bpftrace(tr)
self._configure_client(tr)

def _configure_origin(self, tr: 'TestRun') -> 'Process':
'''Configure the origin microserver.'''
origin = Test.MakeOriginServer('origin')
self._origin = origin

cache_request = {
"headers": f"GET {self._cache_path} HTTP/1.1\r\nHost: cache-probe.test\r\n\r\n",
"timestamp": "1469733493.993",
"body": ""
}
cache_response = {
"headers": "HTTP/1.1 200 OK\r\nConnection: close\r\nCache-Control: max-age=120\r\nContent-Length: 5\r\n\r\n",
"timestamp": "1469733493.993",
"body": "hello"
}
origin.addResponse("sessionlog.json", cache_request, cache_response)
origin.addResponse("sessionlog.json", cache_request, cache_response)

purge_request = {
"headers": f"PURGE {self._cache_path} HTTP/1.1\r\nHost: cache-probe.test\r\n\r\n",
"timestamp": "1469733493.993",
"body": ""
}
purge_response = {
"headers": "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 0\r\n\r\n",
"timestamp": "1469733493.993",
"body": ""
}
origin.addResponse("sessionlog.json", purge_request, purge_response)
return origin

def _configure_traffic_server(self, tr: 'TestRun') -> 'Process':
'''Configure the Traffic Server process.'''
ts = tr.MakeATSProcess("ts_cache_dir_probe", enable_cache=True)
self._ts = ts
ts.Disk.records_config.update(
{
'proxy.config.diags.debug.enabled': 1,
'proxy.config.diags.debug.tags': 'http|cache',
'proxy.config.http.cache.required_headers': 0,
# Keep ATS running as the invoking user inside sudo (no privilege drop).
'proxy.config.admin.user_id': '#-1',
})
ts.Disk.remap_config.AddLine(f'map / http://127.0.0.1:{self._origin.Variables.Port}')
return ts

def _configure_bpftrace(self, tr: 'TestRun') -> 'Process':
'''Configure the bpftrace process for the cache directory probes.'''
bpftrace = tr.Processes.Process('bpftrace')
self._bpftrace = bpftrace

tr.Setup.Copy(self.bt_script)
tr_script = os.path.join(tr.RunDirectory, self.bt_script)

# fan out output so AuTest stream checks still work
tee_path = os.path.join(tr.RunDirectory, 'bpftrace.out')
bpftrace.Command = f"bpftrace {tr_script}"
bpftrace.ReturnCode = 0
bpftrace.Streams.All += Testers.ContainsExpression('cache_dir_insert', 'cache_dir_insert probe fired.')
bpftrace.Streams.All += Testers.ContainsExpression('cache_dir_remove', 'cache_dir_remove probe fired.')

return bpftrace

def _configure_client(self, tr: 'TestRun') -> 'Process':
'''Configure the client traffic to exercise cache insert and purge.'''
client = tr.Processes.Default
self._client = client
cache_url = f"http://127.0.0.1:{self._ts.Variables.port}{self._cache_path}"
# Ideally we don't need this "sleep 1", but I haven't been able to get it to work with the Ready = When.FileContains(...) approach.
client.Command = (
f"sleep 1 && curl -sSf -o /dev/null -H 'Host: cache-probe.test' {cache_url} && "
f"curl -sSf -o /dev/null -X PURGE -H 'Host: cache-probe.test' {cache_url} && "
f"curl -sSf -o /dev/null -H 'Host: cache-probe.test' {cache_url}")
client.ReturnCode = 0
client.Env = self._ts.Env

self._ts.StartBefore(self._origin) # origin before ts
self._bpftrace.StartBefore(self._ts) # ts before bpftrace
client.StartBefore(self._bpftrace) # bpftrace before client
return client


CacheDirProbeTest()