diff --git a/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/DataSourceBasedMultiTenantConnectionProviderImpl.java b/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/DataSourceBasedMultiTenantConnectionProviderImpl.java
index 5a5d715bbd72..ca33bdd1912e 100644
--- a/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/DataSourceBasedMultiTenantConnectionProviderImpl.java
+++ b/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/DataSourceBasedMultiTenantConnectionProviderImpl.java
@@ -24,7 +24,7 @@
 import static org.hibernate.cfg.MultiTenancySettings.TENANT_IDENTIFIER_TO_USE_FOR_ANY_KEY;
 
 /**
- * A concrete implementation of the {@link MultiTenantConnectionProvider} contract bases on
+ * A concrete implementation of the {@link MultiTenantConnectionProvider} contract based on
  * a number of reasonable assumptions. We assume that:
  *     - 
  *         The {@link DataSource} instances are all available from JNDI named by the tenant
@@ -70,8 +70,8 @@ private Map dataSourceMap() {
 
 	@Override
 	public void injectServices(ServiceRegistryImplementor serviceRegistry) {
-		final ConfigurationService configurationService = serviceRegistry.requireService( ConfigurationService.class );
-		final Object dataSourceConfigValue = configurationService.getSettings().get( DATASOURCE );
+		final var settings = serviceRegistry.requireService( ConfigurationService.class ).getSettings();
+		final Object dataSourceConfigValue = settings.get( DATASOURCE );
 		if ( !(dataSourceConfigValue instanceof String configuredJndiName) ) {
 			throw new HibernateException( "illegal value for configuration setting '" + DATASOURCE + "'" );
 		}
@@ -82,21 +82,21 @@ public void injectServices(ServiceRegistryImplementor serviceRegistry) {
 			throw new HibernateException( "Could not locate JndiService from DataSourceBasedMultiTenantConnectionProviderImpl" );
 		}
 
-		final Object namedObject = jndiService.locate( this.jndiName );
+		final Object namedObject = jndiService.locate( jndiName );
 		if ( namedObject == null ) {
-			throw new HibernateException( "JNDI name [" + this.jndiName + "] could not be resolved" );
+			throw new HibernateException( "JNDI name [" + jndiName + "] could not be resolved" );
 		}
 		else if ( namedObject instanceof DataSource datasource ) {
-			final int loc = this.jndiName.lastIndexOf( '/' );
-			baseJndiNamespace = this.jndiName.substring( 0, loc );
-			final String prefix = this.jndiName.substring( loc + 1);
+			final int loc = jndiName.lastIndexOf( '/' );
+			baseJndiNamespace = jndiName.substring( 0, loc );
+			final String prefix = jndiName.substring( loc + 1);
 			tenantIdentifierForAny = (T) prefix;
 			dataSourceMap().put( tenantIdentifierForAny, datasource );
 		}
 		else if ( namedObject instanceof Context ) {
-			baseJndiNamespace = this.jndiName;
+			baseJndiNamespace = jndiName;
 			final Object configuredTenantId =
-					configurationService.getSettings().get( TENANT_IDENTIFIER_TO_USE_FOR_ANY_KEY );
+					settings.get( TENANT_IDENTIFIER_TO_USE_FOR_ANY_KEY );
 			tenantIdentifierForAny = (T) configuredTenantId;
 			if ( tenantIdentifierForAny == null ) {
 				throw new HibernateException( "JNDI name named a Context, but tenant identifier to use for ANY was not specified" );
@@ -105,7 +105,7 @@ else if ( namedObject instanceof Context ) {
 		else {
 			throw new HibernateException(
 					"Unknown object type [" + namedObject.getClass().getName() +
-					"] found in JNDI location [" + this.jndiName + "]"
+					"] found in JNDI location [" + jndiName + "]"
 			);
 		}
 	}
diff --git a/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/SchemaBasedMultiTenantConnectionProviderImpl.java b/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/SchemaBasedMultiTenantConnectionProviderImpl.java
new file mode 100644
index 000000000000..efede1a91887
--- /dev/null
+++ b/hibernate-core/src/main/java/org/hibernate/engine/jdbc/connections/spi/SchemaBasedMultiTenantConnectionProviderImpl.java
@@ -0,0 +1,101 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright Red Hat Inc. and Hibernate Authors
+ */
+package org.hibernate.engine.jdbc.connections.spi;
+
+import org.hibernate.HibernateException;
+import org.hibernate.engine.config.spi.ConfigurationService;
+import org.hibernate.engine.jndi.spi.JndiService;
+import org.hibernate.service.spi.ServiceRegistryAwareService;
+import org.hibernate.service.spi.ServiceRegistryImplementor;
+import org.hibernate.service.spi.Stoppable;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hibernate.cfg.JdbcSettings.DATASOURCE;
+import static org.hibernate.cfg.MultiTenancySettings.TENANT_IDENTIFIER_TO_USE_FOR_ANY_KEY;
+
+/**
+ * A {@link MultiTenantConnectionProvider} backed by a single {@link DataSource},
+ * where each tenant has a dedicated schema, and {@link Connection#setSchema(String)}
+ * is used to select the appropriate schema. The name of the database schema must
+ * match the {@linkplain org.hibernate.context.spi.CurrentTenantIdentifierResolver
+ * tenant id}, which must be a string.
+ *
+ * @author Gavin King
+ * @since 7.1
+ */
+public class SchemaBasedMultiTenantConnectionProviderImpl
+		extends AbstractDataSourceBasedMultiTenantConnectionProviderImpl
+		implements ServiceRegistryAwareService, Stoppable {
+
+	private DataSource dataSource;
+	private String tenantIdentifierForAny;
+
+	@Override
+	protected DataSource selectAnyDataSource() {
+		return dataSource;
+	}
+
+	@Override
+	protected DataSource selectDataSource(String tenantIdentifier) {
+		return selectAnyDataSource();
+	}
+
+	@Override
+	public Connection getAnyConnection() throws SQLException {
+		final Connection connection = super.getAnyConnection();
+		if ( tenantIdentifierForAny != null ) {
+			connection.setSchema( tenantIdentifierForAny );
+		}
+		return connection;
+	}
+
+	@Override
+	public Connection getConnection(String tenantIdentifier) throws SQLException {
+		final Connection connection = super.getConnection( tenantIdentifier );
+		connection.setSchema( tenantIdentifier );
+		return connection;
+	}
+
+	@Override
+	public void injectServices(ServiceRegistryImplementor serviceRegistry) {
+		final var settings = serviceRegistry.requireService( ConfigurationService.class ).getSettings();
+		final Object dataSourceConfigValue = settings.get( DATASOURCE );
+		if ( !(dataSourceConfigValue instanceof String configuredJndiName) ) {
+			throw new HibernateException( "illegal value for configuration setting '" + DATASOURCE + "'" );
+		}
+
+		final JndiService jndiService = serviceRegistry.getService( JndiService.class );
+		if ( jndiService == null ) {
+			throw new HibernateException( "Could not locate JndiService from DataSourceBasedMultiTenantConnectionProviderImpl" );
+		}
+
+		final Object namedObject = jndiService.locate( configuredJndiName );
+		if ( namedObject == null ) {
+			throw new HibernateException( "JNDI name [" + configuredJndiName + "] could not be resolved" );
+		}
+		else if ( namedObject instanceof DataSource datasource ) {
+			dataSource = datasource;
+		}
+		else {
+			throw new HibernateException(
+					"Unknown object type [" + namedObject.getClass().getName() +
+					"] found in JNDI location [" + configuredJndiName + "]"
+			);
+		}
+
+		final Object configuredTenantId =
+				settings.get( TENANT_IDENTIFIER_TO_USE_FOR_ANY_KEY );
+		tenantIdentifierForAny = (String) configuredTenantId;
+	}
+
+	@Override
+	public void stop() {
+		dataSource = null;
+	}
+}