diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-smb-fs/pom.xml
new file mode 100644
index 00000000..662b21dd
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/pom.xml
@@ -0,0 +1,69 @@
+
+
+
+
+ io.streamthoughts
+ kafka-connect-filepulse-filesystems
+ 2.17.0-SNAPSHOT
+
+ 4.0.0
+
+ Kafka Connect Source File Pulse SMB FS
+ kafka-connect-filepulse-smb-fs
+
+
+ ${project.parent.basedir}/..
+ ${project.parent.basedir}/../header
+ 2.1.40
+ 5.5.0
+ 3.26.3
+ 1.18.34
+
+
+
+
+ org.codelibs
+ jcifs
+ ${jcifs.version}
+
+
+ io.streamthoughts
+ kafka-connect-filepulse-commons-fs
+ ${project.version}
+
+
+ org.apache.commons
+ commons-compress
+
+
+ org.apache.avro
+ avro
+
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito-junit-jupiter.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+ test
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ test
+
+
+
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileStorage.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileStorage.java
new file mode 100644
index 00000000..59d92171
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileStorage.java
@@ -0,0 +1,103 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import java.io.InputStream;
+import java.net.URI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link Storage} for SMB/CIFS file systems.
+ */
+public class SmbFileStorage implements Storage {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmbFileStorage.class);
+
+ private final SmbClient smbClient;
+
+ public SmbFileStorage(SmbFileSystemListingConfig config) {
+ this.smbClient = new SmbClient(config);
+ }
+
+ SmbFileStorage(SmbClient smbClient) {
+ this.smbClient = smbClient;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FileObjectMeta getObjectMetadata(URI uri) {
+ LOG.debug("Getting object metadata for '{}'", uri);
+ try {
+ return smbClient.getObjectMetadata(uri);
+ } catch (Exception e) {
+ throw new ConnectFilePulseException(String.format("Cannot stat file with uri: %s", uri), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean exists(URI uri) {
+ LOG.debug("Checking if '{}' exists", uri);
+ try {
+ return smbClient.exists(uri);
+ } catch (Exception e) {
+ throw new ConnectFilePulseException(
+ String.format("Failed to check if SMB file exists: %s", uri), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean delete(URI uri) {
+ LOG.info("Deleting '{}'", uri);
+ try {
+ return smbClient.delete(uri);
+ } catch (Exception e) {
+ LOG.error("Failed to delete SMB file: {}", uri, e);
+ throw new ConnectFilePulseException("Failed to delete SMB file: " + uri, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean move(URI source, URI dest) {
+ LOG.info("Moving '{}' to '{}'", source, dest);
+ try {
+ return smbClient.move(source, dest);
+ } catch (Exception e) {
+ LOG.error("Failed to move SMB file from {} to {}", source, dest, e);
+ throw new ConnectFilePulseException(
+ String.format("Failed to move SMB file from %s to %s", source, dest), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public InputStream getInputStream(URI uri) {
+ LOG.debug("Getting input stream for '{}'", uri);
+ try {
+ return smbClient.getInputStream(uri);
+ } catch (Exception e) {
+ LOG.error("Failed to get input stream for SMB file: {}", uri, e);
+ throw new ConnectFilePulseException("Failed to get input stream for SMB file: " + uri, e);
+ }
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListing.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListing.java
new file mode 100644
index 00000000..314da0bf
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListing.java
@@ -0,0 +1,99 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link FileSystemListing} for SMB/CIFS file systems.
+ */
+public class SmbFileSystemListing implements FileSystemListing {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmbFileSystemListing.class);
+
+ private FileListFilter filter;
+ private SmbFileSystemListingConfig config;
+ private SmbClient smbClient;
+
+ public SmbFileSystemListing(final List filters) {
+ Objects.requireNonNull(filters, "filters can't be null");
+ this.filter = new CompositeFileListFilter(filters);
+ }
+
+ @SuppressWarnings("unused")
+ public SmbFileSystemListing() {
+ this(Collections.emptyList());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void configure(final Map configs) {
+ LOG.debug("Configuring SmbFilesystemListing");
+ config = new SmbFileSystemListingConfig(configs);
+ smbClient = new SmbClient(config);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Collection listObjects() {
+ String listingDirectoryPath = getConfig().getSmbDirectoryPath();
+
+ LOG.info("Listing SMB files in directory: smb://{}/{}{}",
+ getConfig().getSmbHost(),
+ getConfig().getSmbShare(),
+ listingDirectoryPath);
+
+ List filesMetadata = getSmbClient()
+ .listFiles(listingDirectoryPath)
+ .collect(Collectors.toList());
+
+ LOG.info("Found {} files in SMB directory before filtering", filesMetadata.size());
+
+ Collection filteredFiles = filter.filterFiles(filesMetadata);
+
+ LOG.info("Returning {} files after filtering", filteredFiles.size());
+
+ return filteredFiles;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setFilter(FileListFilter filter) {
+ this.filter = filter;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SmbFileStorage storage() {
+ return new SmbFileStorage(config);
+ }
+
+ SmbClient getSmbClient() {
+ return smbClient;
+ }
+
+ SmbFileSystemListingConfig getConfig() {
+ return config;
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingConfig.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingConfig.java
new file mode 100644
index 00000000..c4e013ba
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingConfig.java
@@ -0,0 +1,270 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import java.util.Map;
+import jcifs.SmbConstants;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * Configuration for SMB filesystem listing.
+ */
+public class SmbFileSystemListingConfig extends AbstractConfig {
+
+ public static final String SMB_LISTING_HOST_CONFIG = "smb.listing.host";
+ private static final String SMB_LISTING_HOST_DOC = "SMB server hostname or IP address";
+
+ public static final String SMB_LISTING_PORT_CONFIG = "smb.listing.port";
+ private static final String SMB_LISTING_PORT_DOC = "SMB server port";
+ private static final int SMB_LISTING_PORT_DEFAULT = 445;
+
+ public static final String SMB_LISTING_SHARE_CONFIG = "smb.listing.share";
+ private static final String SMB_LISTING_SHARE_DOC = "SMB share name";
+
+ public static final String SMB_LISTING_DOMAIN_CONFIG = "smb.listing.domain";
+ private static final String SMB_LISTING_DOMAIN_DOC = "SMB domain/workgroup";
+ private static final String SMB_LISTING_DOMAIN_DEFAULT = "WORKGROUP";
+
+ public static final String SMB_LISTING_USER_CONFIG = "smb.listing.user";
+ private static final String SMB_LISTING_USER_DOC = "SMB username";
+
+ public static final String SMB_LISTING_PASSWORD_CONFIG = "smb.listing.password";
+ private static final String SMB_LISTING_PASSWORD_DOC = "SMB password";
+
+ public static final String SMB_LISTING_DIRECTORY_PATH_CONFIG = "smb.listing.directory.path";
+ private static final String SMB_LISTING_DIRECTORY_PATH_DOC = "The directory path on the SMB share to scan";
+ private static final String SMB_LISTING_DIRECTORY_PATH_DEFAULT = "/";
+
+ public static final String SMB_CONNECTION_TIMEOUT_CONFIG = "smb.connection.timeout";
+ private static final String SMB_CONNECTION_TIMEOUT_DOC = "SMB connection timeout in milliseconds (jcifs.smb.client.connTimeout)";
+ private static final int SMB_CONNECTION_TIMEOUT_DEFAULT = SmbConstants.DEFAULT_CONN_TIMEOUT;
+
+ public static final String SMB_RESPONSE_TIMEOUT_CONFIG = "smb.response.timeout";
+ private static final String SMB_RESPONSE_TIMEOUT_DOC = "SMB response timeout in milliseconds (jcifs.smb.client.responseTimeout)";
+ private static final int SMB_RESPONSE_TIMEOUT_DEFAULT = SmbConstants.DEFAULT_RESPONSE_TIMEOUT;
+
+ public static final String SMB_SO_TIMEOUT_CONFIG = "smb.so.timeout";
+ private static final String SMB_SO_TIMEOUT_DOC = "SMB socket timeout in milliseconds (jcifs.smb.client.soTimeout)";
+ private static final int SMB_SO_TIMEOUT_DEFAULT = SmbConstants.DEFAULT_SO_TIMEOUT;
+
+ public static final String SMB_CONNECTION_RETRIES_CONFIG = "smb.connection.retries";
+ private static final String SMB_CONNECTION_RETRIES_DOC = "Number of retries for SMB connection failures";
+ private static final int SMB_CONNECTION_RETRIES_DEFAULT = 5;
+
+ public static final String SMB_CONNECTION_RETRIES_DELAY_CONFIG = "smb.connection.retries.delay";
+ private static final String SMB_CONNECTION_RETRIES_DELAY_DOC = "Delay between connection retries in milliseconds";
+ private static final int SMB_CONNECTION_RETRIES_DELAY_DEFAULT = 5000;
+
+ public static final String SMB_MAX_VERSION_CONFIG = "smb.protocol.max.version";
+ private static final String SMB_MAX_VERSION_DOC = "Maximum SMB protocol version to use (SMB300, SMB302, SMB311)";
+ private static final String SMB_MAX_VERSION_DEFAULT = "SMB311";
+
+ public SmbFileSystemListingConfig(Map originals) {
+ super(configDef(), originals);
+ }
+
+ public String getSmbHost() {
+ return getString(SMB_LISTING_HOST_CONFIG);
+ }
+
+ public int getSmbPort() {
+ return getInt(SMB_LISTING_PORT_CONFIG);
+ }
+
+ public String getSmbShare() {
+ return getString(SMB_LISTING_SHARE_CONFIG);
+ }
+
+ public String getSmbDomain() {
+ return getString(SMB_LISTING_DOMAIN_CONFIG);
+ }
+
+ public String getSmbUser() {
+ return getString(SMB_LISTING_USER_CONFIG);
+ }
+
+ public String getSmbPassword() {
+ return getPassword(SMB_LISTING_PASSWORD_CONFIG).value();
+ }
+
+ public String getSmbDirectoryPath() {
+ return getString(SMB_LISTING_DIRECTORY_PATH_CONFIG);
+ }
+
+ public int getConnectionTimeout() {
+ return getInt(SMB_CONNECTION_TIMEOUT_CONFIG);
+ }
+
+ public int getResponseTimeout() {
+ return getInt(SMB_RESPONSE_TIMEOUT_CONFIG);
+ }
+
+ public int getSoTimeout() {
+ return getInt(SMB_SO_TIMEOUT_CONFIG);
+ }
+
+ public int getConnectionRetries() {
+ return getInt(SMB_CONNECTION_RETRIES_CONFIG);
+ }
+
+ public int getConnectionRetriesDelay() {
+ return getInt(SMB_CONNECTION_RETRIES_DELAY_CONFIG);
+ }
+
+ public String getSmbMaxVersion() {
+ return getString(SMB_MAX_VERSION_CONFIG);
+ }
+
+ public static ConfigDef configDef() {
+ int configGroupCounter = 0;
+ return new ConfigDef()
+ .define(
+ SMB_LISTING_HOST_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.Importance.HIGH,
+ SMB_LISTING_HOST_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_HOST_CONFIG
+ )
+ .define(
+ SMB_LISTING_PORT_CONFIG,
+ ConfigDef.Type.INT,
+ SMB_LISTING_PORT_DEFAULT,
+ ConfigDef.Importance.HIGH,
+ SMB_LISTING_PORT_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_PORT_CONFIG
+ )
+ .define(
+ SMB_LISTING_SHARE_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.Importance.HIGH,
+ SMB_LISTING_SHARE_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_SHARE_CONFIG
+ )
+ .define(
+ SMB_LISTING_DOMAIN_CONFIG,
+ ConfigDef.Type.STRING,
+ SMB_LISTING_DOMAIN_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_LISTING_DOMAIN_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_DOMAIN_CONFIG
+ )
+ .define(
+ SMB_LISTING_USER_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.Importance.HIGH,
+ SMB_LISTING_USER_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_USER_CONFIG
+ )
+ .define(
+ SMB_LISTING_PASSWORD_CONFIG,
+ ConfigDef.Type.PASSWORD,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.Importance.HIGH,
+ SMB_LISTING_PASSWORD_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_PASSWORD_CONFIG
+ )
+ .define(
+ SMB_LISTING_DIRECTORY_PATH_CONFIG,
+ ConfigDef.Type.STRING,
+ SMB_LISTING_DIRECTORY_PATH_DEFAULT,
+ ConfigDef.Importance.HIGH,
+ SMB_LISTING_DIRECTORY_PATH_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_LISTING_DIRECTORY_PATH_CONFIG
+ )
+ .define(
+ SMB_CONNECTION_TIMEOUT_CONFIG,
+ ConfigDef.Type.INT,
+ SMB_CONNECTION_TIMEOUT_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_CONNECTION_TIMEOUT_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_CONNECTION_TIMEOUT_CONFIG
+ )
+ .define(
+ SMB_RESPONSE_TIMEOUT_CONFIG,
+ ConfigDef.Type.INT,
+ SMB_RESPONSE_TIMEOUT_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_RESPONSE_TIMEOUT_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_RESPONSE_TIMEOUT_CONFIG
+ )
+ .define(
+ SMB_SO_TIMEOUT_CONFIG,
+ ConfigDef.Type.INT,
+ SMB_SO_TIMEOUT_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_SO_TIMEOUT_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_SO_TIMEOUT_CONFIG
+ )
+ .define(
+ SMB_CONNECTION_RETRIES_CONFIG,
+ ConfigDef.Type.INT,
+ SMB_CONNECTION_RETRIES_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_CONNECTION_RETRIES_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_CONNECTION_RETRIES_CONFIG
+ )
+ .define(
+ SMB_CONNECTION_RETRIES_DELAY_CONFIG,
+ ConfigDef.Type.INT,
+ SMB_CONNECTION_RETRIES_DELAY_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_CONNECTION_RETRIES_DELAY_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_CONNECTION_RETRIES_DELAY_CONFIG
+ )
+ .define(
+ SMB_MAX_VERSION_CONFIG,
+ ConfigDef.Type.STRING,
+ SMB_MAX_VERSION_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SMB_MAX_VERSION_DOC,
+ "SMB",
+ configGroupCounter++,
+ ConfigDef.Width.NONE,
+ SMB_MAX_VERSION_CONFIG
+ );
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SmbClient.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SmbClient.java
new file mode 100644
index 00000000..0fec27de
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SmbClient.java
@@ -0,0 +1,306 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.client;
+
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.SmbFileSystemListingConfig;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+import jcifs.CIFSContext;
+import jcifs.CIFSException;
+import jcifs.config.PropertyConfiguration;
+import jcifs.context.BaseContext;
+import jcifs.smb.NtlmPasswordAuthenticator;
+import jcifs.smb.SmbFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SMB client wrapper for handling SMB/CIFS connections and operations.
+ */
+public class SmbClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmbClient.class);
+
+ private final SmbFileSystemListingConfig config;
+ private final CIFSContext cifsContext;
+ private final String baseUrl;
+
+ public SmbClient(SmbFileSystemListingConfig config) {
+ this.config = config;
+ this.cifsContext = createCifsContext();
+ this.baseUrl = buildBaseUrl();
+ }
+
+ /**
+ * Create CIFS context with authentication and configuration.
+ */
+ private CIFSContext createCifsContext() {
+ try {
+ Properties props = new Properties();
+
+ // Configure SMB protocol versions
+ props.setProperty("jcifs.smb.client.maxVersion", config.getSmbMaxVersion());
+ props.setProperty("jcifs.smb.client.minVersion", "SMB300");
+
+ // Configure timeouts
+ props.setProperty("jcifs.smb.client.responseTimeout", String.valueOf(config.getResponseTimeout()));
+ props.setProperty("jcifs.smb.client.connTimeout", String.valueOf(config.getConnectionTimeout()));
+ props.setProperty("jcifs.smb.client.soTimeout", String.valueOf(config.getSoTimeout()));
+
+ PropertyConfiguration propConfig = new PropertyConfiguration(props);
+ BaseContext baseContext = new BaseContext(propConfig);
+
+ // Create authentication credentials
+ NtlmPasswordAuthenticator auth = new NtlmPasswordAuthenticator(
+ config.getSmbDomain(),
+ config.getSmbUser(),
+ config.getSmbPassword()
+ );
+
+ return baseContext.withCredentials(auth);
+ } catch (CIFSException e) {
+ throw new ConnectFilePulseException("Failed to create CIFS context", e);
+ }
+ }
+
+ /**
+ * Build base SMB URL from configuration.
+ */
+ private String buildBaseUrl() {
+ String host = config.getSmbHost();
+ String share = config.getSmbShare();
+
+ // Ensure share doesn't start with /
+ if (share.startsWith("/")) {
+ share = share.substring(1);
+ }
+
+ return String.format("smb://%s/%s/", host, share);
+ }
+
+ /**
+ * List files in the specified directory.
+ */
+ public Stream listFiles(String directoryPath) {
+ String smbUrl = baseUrl + (directoryPath.startsWith("/") ? directoryPath.substring(1) : directoryPath);
+
+ return executeWithRetry(() -> {
+ LOG.debug("Listing files in SMB directory: {}", smbUrl);
+
+ try (SmbFile directory = new SmbFile(smbUrl, cifsContext)) {
+ if (!directory.exists()) {
+ throw new ConnectFilePulseException("SMB directory does not exist: " + smbUrl);
+ }
+
+ if (!directory.isDirectory()) {
+ throw new ConnectFilePulseException("SMB path is not a directory: " + smbUrl);
+ }
+
+ SmbFile[] files = directory.listFiles();
+ if (files == null) {
+ return Stream.empty();
+ }
+
+ return Stream.of(files)
+ .filter(this::isRegularFile)
+ .map(this::buildFileMetadata);
+ }
+ });
+ }
+
+ /**
+ * Check if SMB file is a regular file (not directory).
+ */
+ private boolean isRegularFile(SmbFile file) {
+ try {
+ return file.isFile();
+ } catch (Exception e) {
+ LOG.warn("Failed to check if SMB file is regular: {}", file.getPath(), e);
+ return false;
+ }
+ }
+
+ /**
+ * Build file metadata from SMB file.
+ */
+ private FileObjectMeta buildFileMetadata(SmbFile file) {
+ try {
+ Map metadata = new HashMap<>();
+ metadata.put("smb.server", config.getSmbHost());
+ metadata.put("smb.share", config.getSmbShare());
+
+ return new GenericFileObjectMeta.Builder()
+ .withUri(file.getURL().toURI())
+ .withName(file.getName())
+ .withContentLength(file.length())
+ .withLastModified(file.getLastModified())
+ .withUserDefinedMetadata(metadata)
+ .build();
+ } catch (Exception e) {
+ throw new ConnectFilePulseException("Failed to build file metadata for: " + file.getPath(), e);
+ }
+ }
+
+ /**
+ * Get metadata for a specific file.
+ */
+ public FileObjectMeta getObjectMetadata(URI uri) {
+ return executeWithRetry(() -> {
+ String smbUrl = uri.toString();
+ LOG.debug("Getting metadata for SMB file: {}", smbUrl);
+
+ try (SmbFile file = new SmbFile(smbUrl, cifsContext)) {
+ if (!file.exists()) {
+ throw new ConnectFilePulseException("SMB file does not exist: " + smbUrl);
+ }
+
+ return buildFileMetadata(file);
+ }
+ });
+ }
+
+ /**
+ * Check if file exists.
+ */
+ public boolean exists(URI uri) {
+ return executeWithRetry(() -> {
+ String smbUrl = uri.toString();
+ LOG.debug("Checking if SMB file exists: {}", smbUrl);
+
+ try (SmbFile file = new SmbFile(smbUrl, cifsContext)) {
+ return file.exists() && file.isFile();
+ }
+ });
+ }
+
+ /**
+ * Delete file.
+ */
+ public boolean delete(URI uri) {
+ return executeWithRetry(() -> {
+ String smbUrl = uri.toString();
+ LOG.info("Deleting SMB file: {}", smbUrl);
+
+ try (SmbFile file = new SmbFile(smbUrl, cifsContext)) {
+ if (!file.exists()) {
+ LOG.warn("Cannot delete SMB file - does not exist: {}", smbUrl);
+ return false;
+ }
+
+ file.delete();
+ return true;
+ }
+ });
+ }
+
+ /**
+ * Move/rename file.
+ */
+ public boolean move(URI source, URI dest) {
+ return executeWithRetry(() -> {
+ String sourceSmbUrl = source.toString();
+ String destSmbUrl = dest.toString();
+ LOG.info("Moving SMB file from {} to {}", sourceSmbUrl, destSmbUrl);
+
+ try (SmbFile sourceFile = new SmbFile(sourceSmbUrl, cifsContext);
+ SmbFile destFile = new SmbFile(destSmbUrl, cifsContext)) {
+
+ if (!sourceFile.exists()) {
+ throw new ConnectFilePulseException("Source SMB file does not exist: " + sourceSmbUrl);
+ }
+
+ sourceFile.renameTo(destFile);
+ return true;
+ }
+ });
+ }
+
+ /**
+ * Get input stream for file.
+ */
+ public InputStream getInputStream(URI uri) {
+ return executeWithRetry(() -> {
+ String smbUrl = uri.toString();
+ LOG.debug("Opening input stream for SMB file: {}", smbUrl);
+
+ SmbFile file = new SmbFile(smbUrl, cifsContext);
+
+ if (!file.exists()) {
+ throw new ConnectFilePulseException("SMB file does not exist: " + smbUrl);
+ }
+
+ return file.getInputStream();
+ });
+ }
+
+ /**
+ * Execute operation with retry logic.
+ */
+ private T executeWithRetry(SmbOperation operation) {
+ int retries = config.getConnectionRetries();
+ int delay = config.getConnectionRetriesDelay();
+
+ Exception lastException = null;
+
+ for (int attempt = 0; attempt <= retries; attempt++) {
+ try {
+ return operation.execute();
+ } catch (Exception e) {
+ lastException = e;
+
+ if (!isRetryable(e) || attempt == retries) {
+ LOG.error("SMB operation failed after {} attempts", attempt + 1, e);
+ throw new ConnectFilePulseException(
+ "SMB operation failed after " + (attempt + 1) + " attempts", e);
+ }
+
+ LOG.warn("SMB operation failed (attempt {}/{}), retrying in {}ms: {}",
+ attempt + 1, retries + 1, delay, e.getMessage());
+
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ConnectFilePulseException("SMB operation interrupted", ie);
+ }
+ }
+ }
+
+ throw new ConnectFilePulseException("SMB operation failed", lastException);
+ }
+
+ private boolean isRetryable(Throwable t) {
+ if (t instanceof ConnectFilePulseException && t.getCause() != null) {
+ return isRetryable(t.getCause());
+ }
+ if (t instanceof IOException && !(t instanceof CIFSException)) {
+ return true;
+ }
+ if (t instanceof CIFSException) {
+ // For now, be conservative: consider network-like CIFS issues retryable,
+ // but leave room to refine based on status codes if needed.
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Functional interface for SMB operations.
+ */
+ @FunctionalInterface
+ private interface SmbOperation {
+ T execute() throws IOException;
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SmbBytesArrayInputReader.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SmbBytesArrayInputReader.java
new file mode 100644
index 00000000..e3986bde
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SmbBytesArrayInputReader.java
@@ -0,0 +1,76 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.SmbFileStorage;
+import io.streamthoughts.kafka.connect.filepulse.fs.SmbFileSystemListingConfig;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.BytesArrayInputIteratorFactory;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.StorageAwareFileInputReader;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.net.URI;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SMB file input reader for reading entire files as byte arrays.
+ * The {@code SmbBytesArrayInputReader} creates one record per input file.
+ * Each record has single field {@code message} containing the content of the file as a byte array.
+ */
+public class SmbBytesArrayInputReader extends AbstractFileInputReader
+ implements StorageAwareFileInputReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmbBytesArrayInputReader.class);
+
+ private SmbFileStorage storage;
+ private BytesArrayInputIteratorFactory factory;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void configure(Map configs) {
+ super.configure(configs);
+
+ LOG.debug("Configuring SmbBytesArrayInputReader");
+
+ if (storage == null) {
+ storage = initStorage(configs);
+ LOG.debug("SMB storage instantiated successfully");
+ }
+
+ this.factory = initIteratorFactory();
+ }
+
+ BytesArrayInputIteratorFactory initIteratorFactory() {
+ return new BytesArrayInputIteratorFactory(storage, iteratorManager());
+ }
+
+ SmbFileStorage initStorage(Map configs) {
+ final SmbFileSystemListingConfig config = new SmbFileSystemListingConfig(configs);
+ return new SmbFileStorage(config);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SmbFileStorage storage() {
+ return storage;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected FileInputIterator> newIterator(URI objectURI, IteratorManager iteratorManager) {
+ LOG.info("Getting new iterator for SMB object '{}'", objectURI);
+ return factory.newIterator(objectURI);
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SmbRowFileInputReader.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SmbRowFileInputReader.java
new file mode 100644
index 00000000..c25f5157
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SmbRowFileInputReader.java
@@ -0,0 +1,74 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.SmbFileStorage;
+import io.streamthoughts.kafka.connect.filepulse.fs.SmbFileSystemListingConfig;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorConfig;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorFactory;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.StorageAwareFileInputReader;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.net.URI;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SMB file input reader for row-based file formats.
+ */
+public class SmbRowFileInputReader extends AbstractFileInputReader
+ implements StorageAwareFileInputReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmbRowFileInputReader.class);
+
+ private SmbFileStorage storage;
+ private RowFileInputIteratorFactory factory;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void configure(Map configs) {
+ super.configure(configs);
+
+ LOG.debug("Configuring SmbRowFileInputReader");
+
+ if (storage == null) {
+ storage = initStorage(configs);
+ LOG.debug("SMB storage instantiated successfully");
+ }
+
+ this.factory = new RowFileInputIteratorFactory(
+ new RowFileInputIteratorConfig(configs),
+ storage,
+ iteratorManager());
+ }
+
+ SmbFileStorage initStorage(Map configs) {
+ final SmbFileSystemListingConfig config = new SmbFileSystemListingConfig(configs);
+ return new SmbFileStorage(config);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SmbFileStorage storage() {
+ return storage;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected FileInputIterator> newIterator(URI objectURI, IteratorManager iteratorManager) {
+ LOG.info("Getting new iterator for SMB object '{}'", objectURI);
+ return factory.newIterator(objectURI);
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileStorageTest.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileStorageTest.java
new file mode 100644
index 00000000..39d73cb2
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileStorageTest.java
@@ -0,0 +1,127 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SmbFileStorageTest {
+
+ @Test
+ @SneakyThrows
+ void when_exists_delegates_to_client_and_returns_result() {
+ SmbClient client = mock(SmbClient.class);
+ when(client.exists(Fixture.FILE_URI)).thenReturn(true);
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThat(storage.exists(Fixture.FILE_URI)).isTrue();
+ verify(client).exists(eq(Fixture.FILE_URI));
+ }
+
+ @Test
+ void when_exists_throws_exception_it_should_be_wrapped_in_ConnectFilePulseException() {
+ SmbClient client = mock(SmbClient.class);
+ when(client.exists(Fixture.FILE_URI)).thenThrow(new RuntimeException("boom"));
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThatThrownBy(() -> storage.exists(Fixture.FILE_URI))
+ .isInstanceOf(ConnectFilePulseException.class);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_getObjectMetadata_delegates_to_client_and_returns_metadata() {
+ SmbClient client = mock(SmbClient.class);
+ FileObjectMeta meta = mock(FileObjectMeta.class);
+ when(client.getObjectMetadata(Fixture.FILE_URI)).thenReturn(meta);
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThat(storage.getObjectMetadata(Fixture.FILE_URI)).isEqualTo(meta);
+ verify(client).getObjectMetadata(eq(Fixture.FILE_URI));
+ }
+
+ @Test
+ void when_getObjectMetadata_throws_exception_it_should_be_wrapped_in_ConnectFilePulseException() {
+ SmbClient client = mock(SmbClient.class);
+ when(client.getObjectMetadata(Fixture.FILE_URI)).thenThrow(new RuntimeException("boom"));
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThatThrownBy(() -> storage.getObjectMetadata(Fixture.FILE_URI))
+ .isInstanceOf(ConnectFilePulseException.class);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_getInputStream_delegates_to_client_and_returns_stream() {
+ SmbClient client = mock(SmbClient.class);
+ InputStream is = mock(InputStream.class);
+ when(client.getInputStream(Fixture.FILE_URI)).thenReturn(is);
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThat(storage.getInputStream(Fixture.FILE_URI)).isEqualTo(is);
+ verify(client).getInputStream(eq(Fixture.FILE_URI));
+ }
+
+ @Test
+ void when_getInputStream_throws_exception_it_should_be_wrapped_in_ConnectFilePulseException() {
+ SmbClient client = mock(SmbClient.class);
+ when(client.getInputStream(Fixture.FILE_URI)).thenThrow(new RuntimeException("boom"));
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThatThrownBy(() -> storage.getInputStream(Fixture.FILE_URI))
+ .isInstanceOf(ConnectFilePulseException.class);
+ }
+
+ @Test
+ void when_delete_is_called_it_should_delegate_to_client_delete() {
+ SmbClient client = mock(SmbClient.class);
+ when(client.delete(Fixture.FILE_URI)).thenReturn(true);
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThat(storage.delete(Fixture.FILE_URI)).isTrue();
+ verify(client).delete(eq(Fixture.FILE_URI));
+ }
+
+ @Test
+ void when_move_is_called_it_should_delegate_to_client_move() {
+ SmbClient client = mock(SmbClient.class);
+ when(client.move(Fixture.FILE_URI, Fixture.DEST_URI)).thenReturn(true);
+
+ SmbFileStorage storage = new SmbFileStorage(client);
+
+ assertThat(storage.move(Fixture.FILE_URI, Fixture.DEST_URI)).isTrue();
+ verify(client).move(eq(Fixture.FILE_URI), eq(Fixture.DEST_URI));
+ }
+
+ interface Fixture {
+ URI FILE_URI = URI.create("smb://server/share/" + UUID.randomUUID());
+ URI DEST_URI = URI.create("smb://server/share/dest/" + UUID.randomUUID());
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingConfigTest.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingConfigTest.java
new file mode 100644
index 00000000..65ae3314
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingConfigTest.java
@@ -0,0 +1,50 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link SmbFileSystemListingConfig}.
+ *
+ */
+public class SmbFileSystemListingConfigTest {
+
+ @Test
+ void should_build_valid_config_from_properties() {
+ Map props = new HashMap<>();
+ props.put(SmbFileSystemListingConfig.SMB_LISTING_HOST_CONFIG, "server");
+ props.put(SmbFileSystemListingConfig.SMB_LISTING_SHARE_CONFIG, "share");
+ props.put(SmbFileSystemListingConfig.SMB_LISTING_USER_CONFIG, "user");
+ props.put(SmbFileSystemListingConfig.SMB_LISTING_PASSWORD_CONFIG, "pass");
+
+ SmbFileSystemListingConfig config = new SmbFileSystemListingConfig(props);
+
+ assertThat(config.getSmbHost()).isEqualTo("server");
+ assertThat(config.getSmbShare()).isEqualTo("share");
+ assertThat(config.getSmbUser()).isEqualTo("user");
+ assertThat(config.getSmbPassword()).isEqualTo("pass");
+ }
+
+ @Test
+ void configDef_should_define_required_fields() {
+ ConfigDef def = SmbFileSystemListingConfig.configDef();
+
+ assertThat(def.configKeys())
+ .containsKeys(
+ SmbFileSystemListingConfig.SMB_LISTING_HOST_CONFIG,
+ SmbFileSystemListingConfig.SMB_LISTING_SHARE_CONFIG,
+ SmbFileSystemListingConfig.SMB_LISTING_USER_CONFIG,
+ SmbFileSystemListingConfig.SMB_LISTING_PASSWORD_CONFIG
+ );
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingTest.java b/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingTest.java
new file mode 100644
index 00000000..7fa6dfe2
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-smb-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SmbFileSystemListingTest.java
@@ -0,0 +1,114 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SmbFileSystemListingTest {
+
+ @Test
+ @Order(1)
+ void when_no_filter_specified_listObjects_should_return_all_files_metadata() {
+ Stream entries = Stream.of(Fixture.VISITOR_META, Fixture.REFERRER_META);
+
+ SmbFileSystemListing listing = buildSmbFilesystemListingMock(entries, Collections.emptyList());
+
+ Collection result = listing.listObjects();
+
+ assertThat(result).hasSize(2);
+ assertThat(result).containsExactlyInAnyOrder(Fixture.VISITOR_META, Fixture.REFERRER_META);
+ }
+
+ @Test
+ @Order(2)
+ void when_filter_specified_listObjects_should_return_only_matching_files_metadata() {
+ Stream entries = Stream.of(Fixture.VISITOR_META, Fixture.REFERRER_META);
+
+ SmbFileSystemListing listing = buildSmbFilesystemListingMock(entries, Collections.singletonList(files ->
+ files.stream()
+ .filter(f -> f.name().matches(Fixture.VISITOR_REGEX))
+ .collect(Collectors.toList())));
+
+ Collection result = listing.listObjects();
+
+ assertThat(result).hasSize(1);
+ assertThat(result).containsExactlyInAnyOrder(Fixture.VISITOR_META);
+ }
+
+ @SneakyThrows
+ private SmbFileSystemListing buildSmbFilesystemListingMock(Stream entries,
+ List filters) {
+ SmbFileSystemListingConfig config = mock(SmbFileSystemListingConfig.class);
+ when(config.getSmbDirectoryPath()).thenReturn(Fixture.PATH);
+ when(config.getSmbHost()).thenReturn("smb-host");
+ when(config.getSmbShare()).thenReturn("share");
+
+ SmbClient client = mock(SmbClient.class);
+ doReturn(entries).when(client).listFiles(anyString());
+
+ SmbFileSystemListing listing = spy(new SmbFileSystemListing(filters));
+ doReturn(client).when(listing).getSmbClient();
+ doReturn(config).when(listing).getConfig();
+
+ return listing;
+ }
+
+ private static URI buildFileURI(String fileName) {
+ return URI.create(String.format("%s/%s", Fixture.PATH, fileName));
+ }
+
+ interface Fixture {
+ String PATH = "/userdata";
+ String VISITOR_REGEX = "^visitors_[a-zA-Z0-9_-]+.csv";
+
+ String VISITOR_NAME = "visitors_2025-01-01.csv";
+ String REFERRER_NAME = "referrer_2025-01-01.csv";
+
+ long VISITOR_MTIME = Instant.parse("2025-01-01T00:00:00Z").toEpochMilli();
+ long REFERRER_MTIME = Instant.parse("2025-01-02T00:00:00Z").toEpochMilli();
+
+ long VISITOR_SIZE = 1024L;
+ long REFERRER_SIZE = 2048L;
+
+ FileObjectMeta VISITOR_META = new GenericFileObjectMeta.Builder()
+ .withName(VISITOR_NAME)
+ .withUri(buildFileURI(VISITOR_NAME))
+ .withLastModified(ofEpochMilli(VISITOR_MTIME))
+ .withContentLength(VISITOR_SIZE)
+ .build();
+
+ FileObjectMeta REFERRER_META = new GenericFileObjectMeta.Builder()
+ .withName(REFERRER_NAME)
+ .withUri(buildFileURI(REFERRER_NAME))
+ .withLastModified(ofEpochMilli(REFERRER_MTIME))
+ .withContentLength(REFERRER_SIZE)
+ .build();
+ }
+}
diff --git a/connect-file-pulse-filesystems/pom.xml b/connect-file-pulse-filesystems/pom.xml
index d6491b02..beecd468 100644
--- a/connect-file-pulse-filesystems/pom.xml
+++ b/connect-file-pulse-filesystems/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
filepulse-azure-storage-fs
filepulse-google-cloud-storage-fs
filepulse-sftp-fs
+ filepulse-smb-fs
filepulse-aliyunoss-fs
diff --git a/docs/content/en/docs/Developer Guide/file-readers.md b/docs/content/en/docs/Developer Guide/file-readers.md
index 2594e261..e1d0887b 100644
--- a/docs/content/en/docs/Developer Guide/file-readers.md
+++ b/docs/content/en/docs/Developer Guide/file-readers.md
@@ -42,6 +42,13 @@ package: `io.streamthoughts.kafka.connect.filepulse.fs.reader`
* `GcsXMLFileInputReader`
* `GcsMetadataFileInputReader`
+**SMB**
+
+package: `io.streamthoughts.kafka.connect.filepulse.fs.reader`
+
+* `SmbBytesArrayInputReader`
+* `SmbRowFileInputReader`
+
**Local Filesystem**
package: `io.streamthoughts.kafka.connect.filepulse.fs.reader`
diff --git a/docs/content/en/docs/Developer Guide/file-system-listing/smb-filesystem.md b/docs/content/en/docs/Developer Guide/file-system-listing/smb-filesystem.md
new file mode 100644
index 00000000..29166fe1
--- /dev/null
+++ b/docs/content/en/docs/Developer Guide/file-system-listing/smb-filesystem.md
@@ -0,0 +1,163 @@
+---
+date: 2025-11-17
+title: "SMB/CIFS"
+linkTitle: "SMB/CIFS"
+weight: 20
+description: >
+ Learn how to configure the `SmbFilesystemListing` to read files from SMB 3.x network shares.
+---
+
+The `SmbFilesystemListing` class can be used for listing files on SMB 3.x network shares (Windows file shares or Samba 4.x).
+
+{{% alert title="Security Notice" color="warning" %}}
+This implementation supports **SMB 3.0, 3.0.2, and 3.1.1 only**. SMB 1.0 and 2.x are intentionally disabled for security reasons. Ensure your file servers support SMB 3.0 or later.
+{{% /alert %}}
+
+## How to use it ?
+
+Use the following property in your Connector's configuration:
+
+`fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.SmbFileSystemListing`
+
+## Configuration
+
+The following table describes the properties that can be used to configure the `SmbFilesystemListing`:
+
+| Configuration | Description | Type | Default | Importance |
+|----------------------------------|------------------------------------------------------------------|-----------|---------------------------------------|------------|
+| `smb.listing.host` | SMB server hostname or IP address | `string` | - | HIGH |
+| `smb.listing.port` | SMB server port | `int` | `445` | HIGH |
+| `smb.listing.share` | SMB share name | `string` | - | HIGH |
+| `smb.listing.domain` | SMB domain/workgroup | `string` | `WORKGROUP` | MEDIUM |
+| `smb.listing.user` | SMB username | `string` | - | HIGH |
+| `smb.listing.password` | SMB password | `string` | - | HIGH |
+| `smb.listing.directory.path` | The directory path on the SMB share to scan | `string` | `/` | HIGH |
+| `smb.connection.timeout` | SMB connection timeout in milliseconds (jcifs.smb.client.connTimeout) | `int` | `30000` | MEDIUM |
+| `smb.response.timeout` | SMB response timeout in milliseconds (jcifs.smb.client.responseTimeout) | `int` | `30000` | MEDIUM |
+| `smb.so.timeout` | SMB socket timeout in milliseconds (jcifs.smb.client.soTimeout) | `int` | `30000` | MEDIUM |
+| `smb.connection.retries` | Number of retries for SMB connection failures | `int` | `5` | MEDIUM |
+| `smb.connection.retries.delay` | Delay between connection retries in milliseconds | `int` | `5000` | MEDIUM |
+| `smb.protocol.max.version` | Maximum SMB protocol version (SMB300, SMB302, SMB311) | `string` | `SMB311` | MEDIUM |
+
+{{% alert title="Note" color="info" %}}
+**SMB Protocol Version**: The minimum version is hardcoded to SMB 3.0 for security. The maximum version is configurable and defaults to SMB 3.1.1 (SMB311). You can set it to SMB300 or SMB302 if needed for compatibility with older servers.
+
+**Timeout Configuration**: Three separate timeout values can be configured:
+- `smb.connection.timeout` - Timeout for establishing the initial connection
+- `smb.response.timeout` - Timeout for receiving responses from the server
+- `smb.so.timeout` - Socket timeout for read operations
+{{% /alert %}}
+
+## Example Configuration
+
+Here's a complete example configuration for reading CSV files from a Windows SMB share:
+
+```properties
+# Connector configuration
+name=smb-file-pulse-connector
+connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
+fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.SmbFileSystemListing
+
+# SMB server configuration
+smb.listing.host=192.168.1.100
+smb.listing.port=445
+smb.listing.share=shared_files
+smb.listing.domain=MYDOMAIN
+smb.listing.user=kafkauser
+smb.listing.password=secretpassword
+smb.listing.directory.path=/data/csv
+
+# Connection and protocol configuration
+smb.connection.timeout=30000
+smb.response.timeout=30000
+smb.so.timeout=30000
+smb.connection.retries=5
+smb.connection.retries.delay=5000
+smb.protocol.max.version=SMB311
+
+# File reader configuration
+file.filter.regex.pattern=.*\\.csv$
+tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.SmbRowFileInputReader
+
+# Kafka topic
+kafka.topic=smb-data-topic
+```
+
+## Supported Features
+
+- ✅ **SMB 3.0/3.0.2/3.1.1** protocol support only (SMB 1.0 and 2.x disabled)
+- ✅ **Windows Server 2012+** and **Samba 4.x** compatibility
+- ✅ **Domain authentication** (Active Directory)
+- ✅ **NTLMv2 authentication** (automatically negotiated by jCIFS library)
+- ✅ **SMB 3.x encryption** support for secure file transfers (automatically negotiated)
+- ✅ **Automatic retry** on connection failures
+- ✅ **File operations**: list, read, move, delete
+- ✅ **Large file support** with streaming
+- ✅ **Proper resource management** to prevent file lock issues
+
+## Security Considerations
+
+### Protocol Security
+
+The connector enforces the following security measures:
+- **SMB 3.0** minimum version (hardcoded for security)
+- **SMB 3.1.1** maximum version (configurable, default)
+- **SMB signing** is enabled by default for message integrity
+- **SMB encryption** is automatically negotiated when supported by the server
+
+### Network
+
+- Ensure SMB port (445) is accessible from Kafka Connect workers
+
+### Permissions
+
+Grant the SMB user only the necessary permissions:
+- **Read** permission for file listing and ingestion
+- **Write** and **Delete** permissions only if using file move/cleanup strategies
+
+## Troubleshooting
+
+### Connection Issues
+
+If you encounter connection problems:
+
+1. **Verify network connectivity**: `ping `
+2. **Check SMB port**: `telnet 445` or `nc -zv 445`
+3. **Test credentials**: Try connecting manually with `smbclient` (Linux) or Windows Explorer
+4. **Check firewall rules**: Ensure port 445 is not blocked
+5. **Verify SMB 3.x support**: Check server configuration supports SMB 3.0 or later
+
+Example smbclient test:
+```bash
+smbclient /// -U / -m SMB3
+```
+
+## Compatibility
+
+### Supported Platforms
+
+- **Windows Server**: 2012 and later (SMB 3.0 introduced)
+- **Windows Client**: Windows 8/10/11
+- **Samba**: 4.0 and later (SMB 3.0 support)
+- **NAS devices**: Modern NAS with SMB 3.0+ support
+
+### Unsupported (Security Reasons)
+
+- ❌ **Windows Server 2008 R2** and earlier (SMB 2.1 and older)
+- ❌ **Samba 3.x** (no SMB 3.0 support)
+- ❌ **Legacy NAS** devices without SMB 3.0
+- ❌ **SMB 1.0 clients/servers** (deprecated, security risk)
+
+## Known Limitations
+
+- Kerberos authentication is not supported (only NTLMv2)
+
+## Internal Implementation Details
+
+The connector uses the [jCIFS library (version 2.1.40)](https://github.com/codelibs/jcifs) for SMB protocol implementation.
+
+**Configurable jCIFS properties:**
+- **Connection timeout**: Configurable via `smb.connection.timeout` (maps to `jcifs.smb.client.connTimeout`)
+- **Response timeout**: Configurable via `smb.response.timeout` (maps to `jcifs.smb.client.responseTimeout`)
+- **Socket timeout**: Configurable via `smb.so.timeout` (maps to `jcifs.smb.client.soTimeout`)
+- **SMB protocol versions**: Min=SMB300 (hardcoded), Max=configurable (default SMB311)