Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions connect-file-pulse-filesystems/filepulse-smb-fs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
SPDX-License-Identifier: Apache-2.0
Copyright (c) StreamThoughts

Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.streamthoughts</groupId>
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
<version>2.17.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<name>Kafka Connect Source File Pulse SMB FS</name>
<artifactId>kafka-connect-filepulse-smb-fs</artifactId>

<properties>
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
<license.header.file>${project.parent.basedir}/../header</license.header.file>
<jcifs.version>2.1.40</jcifs.version>
<mockito-junit-jupiter.version>5.5.0</mockito-junit-jupiter.version>
<assertj-core.version>3.26.3</assertj-core.version>
<lombok.version>1.18.34</lombok.version>
</properties>

<dependencies>
<dependency>
<groupId>org.codelibs</groupId>
<artifactId>jcifs</artifactId>
<version>${jcifs.version}</version>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<!-- Test Dependencies-->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito-junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) StreamThoughts
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import java.io.InputStream;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link Storage} for SMB/CIFS file systems.
*/
public class SmbFileStorage implements Storage {

private static final Logger LOG = LoggerFactory.getLogger(SmbFileStorage.class);

private final SmbClient smbClient;

public SmbFileStorage(SmbFileSystemListingConfig config) {
this.smbClient = new SmbClient(config);
}

SmbFileStorage(SmbClient smbClient) {
this.smbClient = smbClient;
}

/**
* {@inheritDoc}
*/
@Override
public FileObjectMeta getObjectMetadata(URI uri) {
LOG.debug("Getting object metadata for '{}'", uri);
try {
return smbClient.getObjectMetadata(uri);
} catch (Exception e) {
throw new ConnectFilePulseException(String.format("Cannot stat file with uri: %s", uri), e);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean exists(URI uri) {
LOG.debug("Checking if '{}' exists", uri);
try {
return smbClient.exists(uri);
} catch (Exception e) {
throw new ConnectFilePulseException(
String.format("Failed to check if SMB file exists: %s", uri), e);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean delete(URI uri) {
LOG.info("Deleting '{}'", uri);
try {
return smbClient.delete(uri);
} catch (Exception e) {
LOG.error("Failed to delete SMB file: {}", uri, e);
throw new ConnectFilePulseException("Failed to delete SMB file: " + uri, e);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean move(URI source, URI dest) {
LOG.info("Moving '{}' to '{}'", source, dest);
try {
return smbClient.move(source, dest);
} catch (Exception e) {
LOG.error("Failed to move SMB file from {} to {}", source, dest, e);
throw new ConnectFilePulseException(
String.format("Failed to move SMB file from %s to %s", source, dest), e);
}
}

/**
* {@inheritDoc}
*/
@Override
public InputStream getInputStream(URI uri) {
LOG.debug("Getting input stream for '{}'", uri);
try {
return smbClient.getInputStream(uri);
} catch (Exception e) {
LOG.error("Failed to get input stream for SMB file: {}", uri, e);
throw new ConnectFilePulseException("Failed to get input stream for SMB file: " + uri, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) StreamThoughts
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link FileSystemListing} for SMB/CIFS file systems.
*/
public class SmbFileSystemListing implements FileSystemListing<SmbFileStorage> {

private static final Logger LOG = LoggerFactory.getLogger(SmbFileSystemListing.class);

private FileListFilter filter;
private SmbFileSystemListingConfig config;
private SmbClient smbClient;

public SmbFileSystemListing(final List<FileListFilter> filters) {
Objects.requireNonNull(filters, "filters can't be null");
this.filter = new CompositeFileListFilter(filters);
}

@SuppressWarnings("unused")
public SmbFileSystemListing() {
this(Collections.emptyList());
}

/**
* {@inheritDoc}
*/
@Override
public void configure(final Map<String, ?> configs) {
LOG.debug("Configuring SmbFilesystemListing");
config = new SmbFileSystemListingConfig(configs);
smbClient = new SmbClient(config);
}

/**
* {@inheritDoc}
*/
@Override
public Collection<FileObjectMeta> listObjects() {
String listingDirectoryPath = getConfig().getSmbDirectoryPath();

LOG.info("Listing SMB files in directory: smb://{}/{}{}",
getConfig().getSmbHost(),
getConfig().getSmbShare(),
listingDirectoryPath);

List<FileObjectMeta> filesMetadata = getSmbClient()
.listFiles(listingDirectoryPath)
.collect(Collectors.toList());

LOG.info("Found {} files in SMB directory before filtering", filesMetadata.size());

Collection<FileObjectMeta> filteredFiles = filter.filterFiles(filesMetadata);

LOG.info("Returning {} files after filtering", filteredFiles.size());

return filteredFiles;
}

/**
* {@inheritDoc}
*/
@Override
public void setFilter(FileListFilter filter) {
this.filter = filter;
}

/**
* {@inheritDoc}
*/
@Override
public SmbFileStorage storage() {
return new SmbFileStorage(config);
}

SmbClient getSmbClient() {
return smbClient;
}

SmbFileSystemListingConfig getConfig() {
return config;
}
}
Loading
Loading