Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
java-version: 1.8

- name: Cache Ivy packages
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.ivy2/cache/
key: ${{ runner.os }}-ant-precommit-${{ hashFiles('**/ivy-versions.properties') }}
Expand Down
4 changes: 3 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this r

Bug Fixes
---------------------
(No changes)

* SOLR-15705: A delete-by-id command is forwarded to all shards when using the CompositeId router with a router field
and the route is missing from the command. (Michael Kosten via Christine Poerschke, David Smiley, Eric Pugh)

================== 8.11.4 ==================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
private Set<String> skippedCoreNodeNames;
private final String collection;
private boolean readOnlyCollection = false;
private boolean broadcastDeleteById = false;

// The cached immutable clusterState for the update... usually refreshed for each individual update.
// Different parts of this class used to request current clusterState views, which led to subtle bugs and race conditions
Expand Down Expand Up @@ -317,6 +318,14 @@ public void processDelete(DeleteUpdateCommand cmd) throws IOException {
protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
setupRequest(cmd);

if (broadcastDeleteById && DistribPhase.NONE == DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM))) {
DocCollection coll = clusterState.getCollection(collection);
if (log.isDebugEnabled()) {
log.debug("The deleteById command for doc {} is missing the required route, broadcasting to leaders of other shards", cmd.getId());
}
forwardDelete(coll, cmd);
}

// check if client has requested minimum replication factor information. will set replicationTracker to null if
// we aren't the leader or subShardLeader
checkReplicationTracker(cmd);
Expand Down Expand Up @@ -388,52 +397,7 @@ protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
if (rollupReplicationTracker == null) {
rollupReplicationTracker = new RollupRequestReplicationTracker();
}
boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard

ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));

SolrParams params = req.getParams();
String route = params.get(ShardParams._ROUTE_);
Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);

List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
for (Slice slice : slices) {
String sliceName = slice.getName();
Replica leader;
try {
leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
}

// TODO: What if leaders changed in the meantime?
// should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?

// Am I the leader for this slice?
ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
String leaderCoreNodeName = leader.getName();
String coreNodeName = cloudDesc.getCoreNodeName();
isLeader = coreNodeName.equals(leaderCoreNodeName);

if (isLeader) {
// don't forward to ourself
leaderForAnyShard = true;
} else {
leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
}
}

outParams.remove("commit"); // this will be distributed from the local commit


if (params.get(UpdateRequest.MIN_REPFACT) != null) {
// TODO: Kept this for rolling upgrades. Remove in Solr 9
outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
}
cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
boolean leaderForAnyShard = forwardDelete(coll, cmd);

if (!leaderForAnyShard) {
return;
Expand All @@ -458,6 +422,53 @@ protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
super.doDeleteByQuery(cmd, replicas, coll);
}

/**
 * Forwards a delete command to the leaders of the slices the collection's router selects for the
 * current request.
 *
 * <p>The target slices come from {@code coll.getRouter().getSearchSlices(...)} using the request's
 * {@code ShardParams._ROUTE_} parameter. For every such slice the current leader is looked up; when
 * this core is that leader nothing is forwarded for the slice, otherwise a {@code ForwardNode} for
 * the leader is queued. The command is then handed to {@code cmdDistrib.distribDelete} with the
 * distrib phase set to {@code TOLEADER} and the {@code commit} parameter removed.
 *
 * <p>Side effect: the {@code isLeader} member (declared outside this method) is overwritten for
 * every slice examined, so on return it reflects only the last slice iterated.
 *
 * @param coll the collection whose router and slices are consulted
 * @param cmd the delete command to distribute
 * @return {@code true} if this core is the leader of at least one of the selected slices
 * @throws IOException declared by this method; presumably raised while distributing the command
 * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the thread is interrupted while looking
 *     up a slice leader
 */
private boolean forwardDelete(DocCollection coll, DeleteUpdateCommand cmd) throws IOException {

  boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard

  ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
  outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
  outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
      zkController.getBaseUrl(), req.getCore().getName()));

  SolrParams params = req.getParams();
  String route = params.get(ShardParams._ROUTE_);
  Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);

  List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
  for (Slice slice : slices) {
    String sliceName = slice.getName();
    Replica leader;
    try {
      leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
    } catch (InterruptedException e) {
      // Restore the interrupt status before translating the exception, so code further up the
      // stack can still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
    }

    // TODO: What if leaders changed in the meantime?
    // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?

    // Am I the leader for this slice?
    ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
    String leaderCoreNodeName = leader.getName();
    String coreNodeName = cloudDesc.getCoreNodeName();
    // NOTE(review): this writes the shared isLeader member, not a local; callers that read it
    // afterwards see the value for the last slice only - confirm that is intended.
    isLeader = coreNodeName.equals(leaderCoreNodeName);

    if (isLeader) {
      // don't forward to ourself
      leaderForAnyShard = true;
    } else {
      leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
    }
  }

  outParams.remove("commit"); // this will be distributed from the local commit

  cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);

  return leaderForAnyShard;
}

@Override
protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List<SolrCmdDistributor.Node> replicas,
DocCollection coll) throws IOException {
Expand Down Expand Up @@ -619,6 +630,11 @@ protected List<SolrCmdDistributor.Node> setupRequest(String id, SolrInputDocumen
}
}

// if doc == null, then this is a DeleteById request with missing route, flag for forwarding to all shard leaders
if (doc == null && coll.getRouter() instanceof CompositeIdRouter && coll.getActiveSlicesMap().size() > 1) {
broadcastDeleteById = true;
}

DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,19 +249,24 @@ public void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
try (SolrClient shard1 = getHttpSolrClient(docCol.getSlice("shard1").getLeader().getCoreUrl());
SolrClient shard2 = getHttpSolrClient(docCol.getSlice("shard2").getLeader().getCoreUrl())) {

// Add three documents w/diff routes (all sent to shard1 leader's core)
// Add six documents w/diff routes (all sent to shard1 leader's core)
shard1.add(sdoc("id", "1", "routefield_s", "europe"));
shard1.add(sdoc("id", "3", "routefield_s", "europe"));
shard1.add(sdoc("id", "5", "routefield_s", "africa"));
shard1.add(sdoc("id", "7", "routefield_s", "europe"));
shard1.add(sdoc("id", "9", "routefield_s", "europe"));
shard1.add(sdoc("id", "11", "routefield_s", "africa"));
shard1.commit();

// Add two documents w/diff routes (all sent to shard2 leader's core)
// Add four documents w/diff routes (all sent to shard2 leader's core)
shard2.add(sdoc("id", "8", "routefield_s", "africa"));
shard2.add(sdoc("id", "6", "routefield_s", "europe"));
shard2.add(sdoc("id", "4", "routefield_s", "africa"));
shard2.add(sdoc("id", "2", "routefield_s", "europe"));
shard2.commit();
final AtomicInteger docCountsEurope = new AtomicInteger(3);
final AtomicInteger docCountsAfrica = new AtomicInteger(2);

final AtomicInteger docCountsEurope = new AtomicInteger(6);
final AtomicInteger docCountsAfrica = new AtomicInteger(4);

// A re-usable helper to verify that the expected number of documents can be found based on _route_ key...
Runnable checkShardCounts = () -> {
Expand Down Expand Up @@ -298,6 +303,27 @@ public void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
docCountsAfrica.decrementAndGet();
}
checkShardCounts.run();

// Tests for distributing delete by id when route is missing from the request
{ // Send a delete request with no route to shard1 for document on shard2, should be distributed
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("8");
shard1.request(deleteRequest);
shard1.commit();
docCountsAfrica.decrementAndGet();
}
checkShardCounts.run();

{ // Multiple deleteById commands with missing route in a single request, should be distributed
final UpdateRequest deleteRequest = new UpdateRequest();
deleteRequest.deleteById("6");
deleteRequest.deleteById("11");
shard1.request(deleteRequest);
shard1.commit();
docCountsEurope.decrementAndGet();
docCountsAfrica.decrementAndGet();
}
checkShardCounts.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCol
return new KeyParser(id).getHash();
}

/**
 * Resolves the slice a document or delete-by-id command should be sent to.
 *
 * <p>When no document is supplied ({@code sdoc == null}, i.e. a delete-by-id) and no route is given,
 * and {@code getRouteField(collection)} reports a route field for the collection, the target cannot
 * be derived from the id alone and {@code null} is returned. In every other case the decision is
 * delegated to the superclass implementation.
 *
 * @return the target slice, or {@code null} for a route-less delete-by-id on a collection that has
 *     a route field
 */
@Override
public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) {
  final boolean deleteByIdWithoutRoute = sdoc == null && route == null;
  // Short-circuit keeps the route-field lookup limited to the route-less delete-by-id case.
  if (deleteByIdWithoutRoute && getRouteField(collection) != null) {
    return null;
  }
  return super.getTargetSlice(id, sdoc, route, params, collection);
}

/**
* Get Range for a given CompositeId based route key
Expand Down