From b46da10abf7648069a75873b1781c53f30d84d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20R=C3=BCtter?= Date: Fri, 5 Sep 2025 15:22:10 +0200 Subject: [PATCH 1/2] Based on https://github.com/apache/solr/commit/f589607e700a94544fc546f5d641b5bfd695f32b, backport changes to latest solr 8.11.4 --- solr/CHANGES.txt | 4 +- .../DistributedZkUpdateProcessor.java | 108 ++++++++++-------- .../cloud/FullSolrCloudDistribCmdsTest.java | 36 +++++- .../solr/common/cloud/CompositeIdRouter.java | 12 ++ 4 files changed, 108 insertions(+), 52 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1a93b050c183..fbb0c93a6225 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -10,7 +10,9 @@ Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this r Bug Fixes --------------------- -(No changes) + +* SOLR-15705: A delete-by-id command is forwarded to all shards when using the CompositeId router with a router field + and the route is missing from the command. (Michael Kosten via Christine Poerschke, David Smiley, Eric Pugh) ================== 8.11.4 ================== diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index fca6b77bbfc8..4adca281812c 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -82,6 +82,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { private Set skippedCoreNodeNames; private final String collection; private boolean readOnlyCollection = false; + private boolean broadcastDeleteById = false; // The cached immutable clusterState for the update... usually refreshed for each individual update. // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions @@ -317,6 +318,14 @@ public void processDelete(DeleteUpdateCommand cmd) throws IOException { protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException { setupRequest(cmd); + if (broadcastDeleteById && DistribPhase.NONE == DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM))) { + DocCollection coll = clusterState.getCollection(collection); + if (log.isDebugEnabled()) { + log.debug("The deleteById command for doc {} is missing the required route, broadcasting to leaders of other shards", cmd.getId()); + } + forwardDelete(coll, cmd); + } + // check if client has requested minimum replication factor information. will set replicationTracker to null if // we aren't the leader or subShardLeader checkReplicationTracker(cmd); @@ -388,52 +397,7 @@ protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { if (rollupReplicationTracker == null) { rollupReplicationTracker = new RollupRequestReplicationTracker(); } - boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard - - ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams())); - outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); - outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - - SolrParams params = req.getParams(); - String route = params.get(ShardParams._ROUTE_); - Collection slices = coll.getRouter().getSearchSlices(route, params, coll); - - List leaders = new ArrayList<>(slices.size()); - for (Slice slice : slices) { - String sliceName = slice.getName(); - Replica leader; - try { - leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName); - } catch (InterruptedException e) { - throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e); - } - - // TODO: What if leaders changed in the meantime? - // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader? - - // Am I the leader for this slice? - ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader); - String leaderCoreNodeName = leader.getName(); - String coreNodeName = cloudDesc.getCoreNodeName(); - isLeader = coreNodeName.equals(leaderCoreNodeName); - - if (isLeader) { - // don't forward to ourself - leaderForAnyShard = true; - } else { - leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward)); - } - } - - outParams.remove("commit"); // this will be distributed from the local commit - - - if (params.get(UpdateRequest.MIN_REPFACT) != null) { - // TODO: Kept this for rolling upgrades. Remove in Solr 9 - outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); - } - cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null); + boolean leaderForAnyShard = forwardDelete(coll, cmd); if (!leaderForAnyShard) { return; @@ -458,6 +422,53 @@ protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { super.doDeleteByQuery(cmd, replicas, coll); } + private boolean forwardDelete(DocCollection coll, DeleteUpdateCommand cmd) throws IOException { + + boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard + + ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams())); + outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); + outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + + SolrParams params = req.getParams(); + String route = params.get(ShardParams._ROUTE_); + Collection slices = coll.getRouter().getSearchSlices(route, params, coll); + + List leaders = new ArrayList<>(slices.size()); + for (Slice slice : slices) { + String sliceName = slice.getName(); + Replica leader; + try { + leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName); + } catch (InterruptedException e) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e); + } + + // TODO: What if leaders changed in the meantime? + // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader? + + // Am I the leader for this slice? + ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader); + String leaderCoreNodeName = leader.getName(); + String coreNodeName = cloudDesc.getCoreNodeName(); + isLeader = coreNodeName.equals(leaderCoreNodeName); + + if (isLeader) { + // don't forward to ourself + leaderForAnyShard = true; + } else { + leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward)); + } + } + + outParams.remove("commit"); // this will be distributed from the local commit + + cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null); + + return leaderForAnyShard; + } + @Override protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List replicas, DocCollection coll) throws IOException { @@ -619,6 +630,11 @@ protected List setupRequest(String id, SolrInputDocumen } } + // if doc == null, then this is a DeleteById request with missing route, flag for forwarding to all shard leaders + if (doc == null && coll.getRouter() instanceof CompositeIdRouter && coll.getActiveSlicesMap().size() > 1) { + broadcastDeleteById = true; + } + DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java index 5238704be422..e426c1204f27 100644 --- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java @@ -249,19 +249,24 @@ public void testDeleteByIdCompositeRouterWithRouterField() throws Exception { try (SolrClient shard1 = getHttpSolrClient(docCol.getSlice("shard1").getLeader().getCoreUrl()); SolrClient shard2 = getHttpSolrClient(docCol.getSlice("shard2").getLeader().getCoreUrl())) { - // Add three documents w/diff routes (all sent to shard1 leader's core) + // Add six documents w/diff routes (all sent to shard1 leader's core) shard1.add(sdoc("id", "1", "routefield_s", "europe")); shard1.add(sdoc("id", "3", "routefield_s", "europe")); shard1.add(sdoc("id", "5", "routefield_s", "africa")); + shard1.add(sdoc("id", "7", "routefield_s", "europe")); + shard1.add(sdoc("id", "9", "routefield_s", "europe")); + shard1.add(sdoc("id", "11", "routefield_s", "africa")); shard1.commit(); - // Add two documents w/diff routes (all sent to shard2 leader's core) + // Add four documents w/diff routes (all sent to shard2 leader's core) + shard2.add(sdoc("id", "8", "routefield_s", "africa")); + shard2.add(sdoc("id", "6", "routefield_s", "europe")); shard2.add(sdoc("id", "4", "routefield_s", "africa")); shard2.add(sdoc("id", "2", "routefield_s", "europe")); shard2.commit(); - - final AtomicInteger docCountsEurope = new AtomicInteger(3); - final AtomicInteger docCountsAfrica = new AtomicInteger(2); + + final AtomicInteger docCountsEurope = new AtomicInteger(6); + final AtomicInteger docCountsAfrica = new AtomicInteger(4); // A re-usable helper to verify that the expected number of documents can be found based on _route_ key... Runnable checkShardCounts = () -> { @@ -298,6 +303,27 @@ public void testDeleteByIdCompositeRouterWithRouterField() throws Exception { docCountsAfrica.decrementAndGet(); } checkShardCounts.run(); + + // Tests for distributing delete by id when route is missing from the request + { // Send a delete request with no route to shard1 for document on shard2, should be distributed + final UpdateRequest deleteRequest = new UpdateRequest(); + deleteRequest.deleteById("8"); + shard1.request(deleteRequest); + shard1.commit(); + docCountsAfrica.decrementAndGet(); + } + checkShardCounts.run(); + + { // Multiple deleteById commands with missing route in a single request, should be distributed + final UpdateRequest deleteRequest = new UpdateRequest(); + deleteRequest.deleteById("6"); + deleteRequest.deleteById("11"); + shard1.request(deleteRequest); + shard1.commit(); + docCountsEurope.decrementAndGet(); + docCountsAfrica.decrementAndGet(); + } + checkShardCounts.run(); } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java index 4b7acc1a3624..17501d4c8ef6 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java @@ -101,6 +101,18 @@ public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCol return new KeyParser(id).getHash(); } + @Override + public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) { + // if this is a delete-by-id (sdoc==null), then return null if the route is missing and there is a route field defined. + // otherwise, we will return the slice using the hash on the id + if (sdoc == null && route == null) { + String shardFieldName = getRouteField(collection); + if (shardFieldName != null) { + return null; + } + } + return super.getTargetSlice(id, sdoc, route, params, collection); + } /** * Get Range for a given CompositeId based route key From 38d2b905e2bb19944029a9601f9891820f721d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20R=C3=BCtter?= Date: Mon, 8 Sep 2025 11:45:20 +0200 Subject: [PATCH 2/2] Update actions --- .github/workflows/ant.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ant.yml b/.github/workflows/ant.yml index 189a6f697238..7fec777dbc9f 100644 --- a/.github/workflows/ant.yml +++ b/.github/workflows/ant.yml @@ -17,7 +17,7 @@ jobs: java-version: 1.8 - name: Cache Ivy packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.ivy2/cache/ key: ${{ runner.os }}-ant-precommit-${{ hashFiles('**/ivy-versions.properties') }}