Commits
89 commits
aa38ee2
TIKA-4181 - grpc server and client
Feb 11, 2024
c59be22
latest updates - wip
Mar 2, 2024
f1f3d38
code formatting
Mar 29, 2024
c7cae5f
fix the issues with deps and such
Mar 29, 2024
d57a557
add bidi streaming
Mar 29, 2024
f80844e
clean up wording
Mar 29, 2024
ce8c887
make it as a stale connection expiring store
Mar 29, 2024
cc144e1
clean
Mar 29, 2024
35f803d
delete dead code
Mar 29, 2024
818ad66
add closeable
Mar 29, 2024
e35102f
add closeable
Mar 29, 2024
9ce7aaf
add closeable
Mar 29, 2024
6796079
fixed issues related to proto lint
Mar 31, 2024
03b6477
example docker file
Apr 4, 2024
e9db491
example docker file and install docker file
Apr 4, 2024
adf7964
fix the rm
Apr 4, 2024
d5f171e
more info
Apr 4, 2024
9e02b84
--platform linux/arm64
Apr 4, 2024
9027524
--platform linux/arm64
Apr 4, 2024
d1ddb3f
add work around comment for future
Apr 4, 2024
eaff6a7
add comment
Apr 4, 2024
ddac790
fix issues with files in wrong dir
Apr 4, 2024
1dcea29
add quotes
Apr 4, 2024
7de1c52
add a few more cleanups
Apr 4, 2024
8b65ea2
add logging
Apr 10, 2024
ddac083
fix issues and add tests for get, delete
Apr 10, 2024
4f928ac
push latest fixes
Apr 10, 2024
c24be46
TIKA-4229
Mar 28, 2024
786771a
TIKA-4229
Mar 28, 2024
b376b32
jwt fetcher initial commit
Apr 4, 2024
4b83d39
add jwt fetching
Apr 5, 2024
a3edd8e
jwt generation
Apr 5, 2024
4496249
jwt generation
Apr 5, 2024
60f2385
jwt config
Apr 10, 2024
568f3d8
remaining merges to get code up to date
Apr 12, 2024
7278437
add start of a test scenario
Apr 22, 2024
399172b
add a fully functional http test case
Apr 22, 2024
1d88c9e
add mtls as an option
Apr 23, 2024
cdba335
add mtls in the example
Apr 23, 2024
04d1f5e
fix merge issue.
Apr 23, 2024
4f0be91
add some robustness and add an exec
May 7, 2024
cf812e7
TIKA-4247 HttpFetcher - add ability to send request headers
Apr 29, 2024
3d0babd
Fix issues with the fetch metadata
May 8, 2024
590b650
TIKA-4252: fix metadata issue
May 9, 2024
0e5a4b6
TIKA-4252: fix metadata issue
May 9, 2024
377db2d
TIKA-4252: fix defaults. fix header parsing.
May 9, 2024
5ee0f0b
TIKA-4252: shorten huge line
May 9, 2024
1da95c4
TIKA-4252: make metadata optional
May 9, 2024
88e725e
TIKA-4252: fail when you should and get rid of empty catch
May 9, 2024
a7ede04
TIKA-4252: duplicate fetcher - don't fail for now.
May 10, 2024
f8fb719
TIKA-4252: add http request headers at fetcher config level
May 24, 2024
6c50dba
skip ossindex because i'm out of sync with master
May 26, 2024
829e506
TIKA-4252: fix issue with config param serialization.
May 26, 2024
6afdeca
TIKA-4252: fix metadata issue
May 9, 2024
fa1eb93
TIKA-4252: fix issue with config param serialization.
May 26, 2024
ee852b7
TIKA-4252: add error path
Jun 18, 2024
85dfbe6
TIKA-4252: add protection against null metadata
Jun 18, 2024
10cc9bd
TIKA-4252: fix merge conflicts from main
Jun 19, 2024
2c48c0c
TIKA-4252: add json schema methods
Jun 19, 2024
29b920b
TIKA-4252: fix broken tests, useless method
Jun 19, 2024
c30392d
TIKA-4252: remove stupid sleep from test
Jun 20, 2024
d115dba
TIKA-4252: fix checkstyle issue
Jun 20, 2024
296855f
TIKA-4252: log violations to console
Jun 20, 2024
bef26d8
TIKA-4252: log violations to console
Jun 20, 2024
9c63154
TIKA-4252: exclude generated sources
Jun 20, 2024
09d499d
TIKA-4252: if tika config is read-only, use a tmp file for tika serve…
Jun 23, 2024
9cab0ce
TIKA-4252: if tika config is read-only, use a tmp file for tika serve…
Jun 23, 2024
9204f7a
resolve conflicts
Jun 23, 2024
15ed7da
improve build script
Jun 24, 2024
d69976c
TIKA-4166: update commons-collections4
THausherr Jun 21, 2024
cf5bc75
add a health check
Jun 24, 2024
7121da6
fix wrongly named config class
Jul 11, 2024
a473bf6
add a go package
Jul 12, 2024
b1f4b70
TIKA-4272: fix issue with headers for all requests
Sep 6, 2024
7aaa3b9
TIKA-4272: back to additional metadata the old way
Sep 6, 2024
6a47348
TIKA-4272: fix an issue where the aad credential was not serializing …
Nov 1, 2024
17b17f1
TIKA-4272: fix an issue where the aad credential was not serializing …
Nov 1, 2024
b21f1d6
TIKA-4272: add another assertion
Nov 1, 2024
9566bc8
TIKA-4272: add microsoft graph to tika server
Nov 1, 2024
678f7b8
TIKA-4272: apply fixes to ms graph connector
Nov 1, 2024
165c21c
TIKA-4272: don't check in fetcher
Nov 1, 2024
b44d7c2
TIKA-4272: don't check in fetcher
Nov 1, 2024
cde6e8a
TIKA-4272: push config
Nov 7, 2024
79227d7
Add GoogleFetcher
bartek Nov 28, 2024
3822c9b
fixup! Add GoogleFetcher
bartek Dec 5, 2024
bff8774
fixup! Add GoogleFetcher
bartek Dec 5, 2024
5be2d80
fixup! Add GoogleFetcher
bartek Dec 5, 2024
1956f24
fixup! Add GoogleFetcher
bartek Dec 5, 2024
4c254df
Target tika-fetcher-google for 4.0.0-SNAPSHOT
bartek Dec 6, 2024
@@ -46,6 +46,12 @@ public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigEx
return pipesConfig;
}

public static PipesConfig load(InputStream tikaConfigInputStream) throws IOException, TikaConfigException {
PipesConfig pipesConfig = new PipesConfig();
pipesConfig.configure("pipes", tikaConfigInputStream);
return pipesConfig;
}

private PipesConfig() {

}
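The new overload mirrors the existing Path-based load and lets a caller that already holds a stream (for example a classpath resource or an in-memory config) configure pipes without touching the filesystem. A minimal sketch of the intended usage, assuming a hypothetical tika-config.xml location; imports and checked-exception handling are omitted:

Path tikaConfigXml = Paths.get("/path/to/tika-config.xml");   // hypothetical location
try (InputStream is = Files.newInputStream(tikaConfigXml)) {
    PipesConfig pipesConfig = PipesConfig.load(is);           // reads the <pipes/> settings from the stream
}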
@@ -54,7 +54,10 @@ public class PipesConfigBase extends ConfigBase {
private int numClients = DEFAULT_NUM_CLIENTS;

private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;

public static final int DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS = 600;
private int staleFetcherTimeoutSeconds = DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS;
public static final int DEFAULT_STALE_FETCHER_DELAY_SECONDS = 60;
private int staleFetcherDelaySeconds = DEFAULT_STALE_FETCHER_DELAY_SECONDS;
private List<String> forkedJvmArgs = new ArrayList<>();
private Path tikaConfig;
private String javaPath = "java";
@@ -171,4 +174,20 @@ public long getSleepOnStartupTimeoutMillis() {
public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
}

public int getStaleFetcherTimeoutSeconds() {
return staleFetcherTimeoutSeconds;
}

public void setStaleFetcherTimeoutSeconds(int staleFetcherTimeoutSeconds) {
this.staleFetcherTimeoutSeconds = staleFetcherTimeoutSeconds;
}

public int getStaleFetcherDelaySeconds() {
return staleFetcherDelaySeconds;
}

public void setStaleFetcherDelaySeconds(int staleFetcherDelaySeconds) {
this.staleFetcherDelaySeconds = staleFetcherDelaySeconds;
}
}
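The two stale-fetcher settings above are ordinary bean properties, so a caller that builds the pipes configuration programmatically can tune how aggressively idle fetchers are expired. A minimal sketch, assuming a hypothetical config path; checked-exception handling is omitted:

PipesConfig pipesConfig = PipesConfig.load(Paths.get("/path/to/tika-config.xml")); // hypothetical path
pipesConfig.setStaleFetcherTimeoutSeconds(300); // expire a fetcher after 5 minutes of inactivity
pipesConfig.setStaleFetcherDelaySeconds(30);    // sweep for stale fetchers every 30 seconds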
98 changes: 54 additions & 44 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -69,10 +69,11 @@
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.extractor.EmittingEmbeddedDocumentBytesHandler;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.pipes.fetcher.RangeFetcher;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
@@ -280,7 +281,7 @@ private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataL

private void emit(String taskId, EmitKey emitKey,
boolean isExtractEmbeddedBytes, MetadataListAndEmbeddedBytes parseData,
String parseExceptionStack, ParseContext parseContext) {
String parseExceptionStack) {
Emitter emitter = null;

try {
@@ -296,7 +297,7 @@ private void emit(String taskId, EmitKey emitKey,
parseData.toBePackagedForStreamEmitter()) {
emitContentsAndBytes(emitter, emitKey, parseData);
} else {
emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList(), parseContext);
emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList());
}
} catch (IOException | TikaEmitterException e) {
LOG.warn("emit exception", e);
@@ -377,7 +378,7 @@ private void actuallyParse(FetchEmitTuple t) {
LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
}

if (metadataIsEmpty(parseData.getMetadataList())) {
if (parseData == null || metadataIsEmpty(parseData.getMetadataList())) {
write(STATUS.EMPTY_OUTPUT);
return;
}
@@ -400,26 +401,23 @@ private void emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseD
String stack = getContainerStacktrace(t, parseData.getMetadataList());
//we need to apply this after we pull out the stacktrace
filterMetadata(parseData.getMetadataList());
ParseContext parseContext = t.getParseContext();
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = t.getOnParseException();
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = parseContext.get(EmbeddedDocumentBytesConfig.class);
if (StringUtils.isBlank(stack) ||
onParseException == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
injectUserMetadata(t.getMetadata(), parseData.getMetadataList());
EmitKey emitKey = t.getEmitKey();
if (StringUtils.isBlank(emitKey.getEmitKey())) {
emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
t.setEmitKey(emitKey);
}
EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack);
if (embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes() &&
if (t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes() &&
parseData.toBePackagedForStreamEmitter()) {
emit(t.getId(), emitKey, embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(),
parseData, stack, parseContext);
emit(t.getId(), emitKey, t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes(),
parseData, stack);
} else if (maxForEmitBatchBytes >= 0 &&
emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
emit(t.getId(), emitKey, embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(),
parseData, stack, parseContext);
emit(t.getId(), emitKey, t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes(),
parseData, stack);
} else {
//send back to the client
write(emitData);
@@ -458,18 +456,35 @@ private Fetcher getFetcher(FetchEmitTuple t) {
}

protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {

Metadata metadata = new Metadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata, t.getParseContext())) {
return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
FetchKey fetchKey = t.getFetchKey();
if (fetchKey.hasRange()) {
if (!(fetcher instanceof RangeFetcher)) {
throw new IllegalArgumentException(
"fetch key has a range, but the fetcher is not a range fetcher");
}
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(),
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
} else {
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
}

return null;
}

@@ -513,11 +528,10 @@ private void handleOOM(String taskId, OutOfMemoryError oom) {
private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTuple fetchEmitTuple,
InputStream stream, Metadata metadata)
throws TikaConfigException {

HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
List<Metadata> metadataList;
//this adds the EmbeddedDocumentByteStore to the parsecontext
ParseContext parseContext = setupParseContext(fetchEmitTuple);
HandlerConfig handlerConfig = parseContext.get(HandlerConfig.class);
ParseContext parseContext = createParseContext(fetchEmitTuple);
if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
metadataList =
parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
@@ -530,16 +544,10 @@ private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTup
parseContext.get(EmbeddedDocumentBytesHandler.class));
}

private ParseContext setupParseContext(FetchEmitTuple fetchEmitTuple)
private ParseContext createParseContext(FetchEmitTuple fetchEmitTuple)
throws TikaConfigException {
ParseContext parseContext = fetchEmitTuple.getParseContext();
if (parseContext.get(HandlerConfig.class) == null) {
parseContext.set(HandlerConfig.class, HandlerConfig.DEFAULT_HANDLER_CONFIG);
}
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = parseContext.get(EmbeddedDocumentBytesConfig.class);
if (embeddedDocumentBytesConfig == null) {
//make sure there's one here -- or do we make this default in fetchemit tuple?
parseContext.set(EmbeddedDocumentBytesConfig.class, EmbeddedDocumentBytesConfig.SKIP);
ParseContext parseContext = new ParseContext();
if (! fetchEmitTuple.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes()) {
return parseContext;
}
EmbeddedDocumentExtractorFactory factory = ((AutoDetectParser)autoDetectParser)
@@ -553,17 +561,18 @@ private ParseContext setupParseContext(FetchEmitTuple fetchEmitTuple)
"instance of EmbeddedDocumentByteStoreExtractorFactory if you want" +
"to extract embedded bytes! I see this embedded doc factory: " +
factory.getClass() + "and a request: " +
embeddedDocumentBytesConfig);
fetchEmitTuple.getEmbeddedDocumentBytesConfig());
}
}
//TODO: especially clean this up.
if (!StringUtils.isBlank(embeddedDocumentBytesConfig.getEmitter())) {
if (!StringUtils.isBlank(fetchEmitTuple.getEmbeddedDocumentBytesConfig().getEmitter())) {
parseContext.set(EmbeddedDocumentBytesHandler.class,
new EmittingEmbeddedDocumentBytesHandler(fetchEmitTuple, emitterManager));
new EmittingEmbeddedDocumentBytesHandler(fetchEmitTuple.getEmitKey(),
fetchEmitTuple.getEmbeddedDocumentBytesConfig(), emitterManager));
} else {
parseContext.set(EmbeddedDocumentBytesHandler.class,
new BasicEmbeddedDocumentBytesHandler(
embeddedDocumentBytesConfig));
fetchEmitTuple.getEmbeddedDocumentBytesConfig()));
}
return parseContext;
}
@@ -684,10 +693,11 @@ private void _preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata,
} catch (IOException e) {
LOG.warn("problem detecting: " + t.getId(), e);
}
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = parseContext.get(EmbeddedDocumentBytesConfig.class);
if (embeddedDocumentBytesConfig != null &&
embeddedDocumentBytesConfig.isIncludeOriginal()) {
EmbeddedDocumentBytesHandler embeddedDocumentByteStore = parseContext.get(EmbeddedDocumentBytesHandler.class);

if (t.getEmbeddedDocumentBytesConfig() != null &&
t.getEmbeddedDocumentBytesConfig().isIncludeOriginal()) {
EmbeddedDocumentBytesHandler embeddedDocumentByteStore =
parseContext.get(EmbeddedDocumentBytesHandler.class);
try (InputStream is = Files.newInputStream(tis.getPath())) {
embeddedDocumentByteStore.add(0, metadata, is);
} catch (IOException e) {
@@ -25,6 +25,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.config.ConfigBase;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
@@ -35,7 +38,7 @@
* This forbids multiple fetchers supporting the same name.
*/
public class FetcherManager extends ConfigBase {

private static final Logger LOG = LoggerFactory.getLogger(FetcherManager.class);
public static FetcherManager load(Path p) throws IOException, TikaConfigException {
try (InputStream is =
Files.newInputStream(p)) {
@@ -48,12 +51,12 @@ public static FetcherManager load(Path p) throws IOException, TikaConfigExceptio
public FetcherManager(List<Fetcher> fetchers) throws TikaConfigException {
for (Fetcher fetcher : fetchers) {
String name = fetcher.getName();
if (name == null || name.trim().length() == 0) {
throw new TikaConfigException("fetcher name must not be blank");
if (name == null || name.trim().isEmpty()) {
throw new TikaConfigException("Fetcher name must not be blank");
}
if (fetcherMap.containsKey(fetcher.getName())) {
throw new TikaConfigException(
"Multiple fetchers cannot support the same prefix: " + fetcher.getName());
LOG.warn("Duplicate fetcher saved in the tika-config xml: {}. Ignoring.", fetcher.getName());
continue;
}
fetcherMap.put(fetcher.getName(), fetcher);
}
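With this change a duplicate fetcher name no longer aborts configuration loading: the first registration wins and the duplicate is only logged. A minimal sketch of the new behavior, assuming the usual AbstractFetcher name accessor; the names are illustrative and checked-exception handling is omitted:

FileSystemFetcher first = new FileSystemFetcher();
first.setName("fs");
FileSystemFetcher duplicate = new FileSystemFetcher();
duplicate.setName("fs");
List<Fetcher> fetchers = List.of(first, duplicate);
FetcherManager fetcherManager = new FetcherManager(fetchers);
// logs "Duplicate fetcher saved in the tika-config xml: fs. Ignoring." and keeps the first one;
// a blank fetcher name still throws TikaConfigException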
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.fetcher.config;

public abstract class AbstractConfig {
// Nothing to do here yet.
}
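AbstractConfig is currently only a marker base class for per-fetcher configuration objects (see FileSystemFetcherConfig below for the pattern this PR establishes). A hypothetical custom fetcher config would extend it the same way:

public class MyFetcherConfig extends AbstractConfig {   // hypothetical example, not part of this PR
    private String endpoint;                            // hypothetical setting

    public String getEndpoint() {
        return endpoint;
    }

    public MyFetcherConfig setEndpoint(String endpoint) {
        this.endpoint = endpoint;
        return this;
    }
}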
@@ -43,8 +43,16 @@
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
import org.apache.tika.pipes.fetcher.fs.config.FileSystemFetcherConfig;

public class FileSystemFetcher extends AbstractFetcher implements Initializable {
public FileSystemFetcher() {
}

public FileSystemFetcher(FileSystemFetcherConfig fileSystemFetcherConfig) {
setBasePath(fileSystemFetcherConfig.getBasePath());
setExtractFileSystemMetadata(fileSystemFetcherConfig.isExtractFileSystemMetadata());
}

private static final Logger LOG = LoggerFactory.getLogger(FileSystemFetcher.class);

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.fetcher.fs.config;

import org.apache.tika.pipes.fetcher.config.AbstractConfig;

public class FileSystemFetcherConfig extends AbstractConfig {
private String basePath;
private boolean extractFileSystemMetadata;

public String getBasePath() {
return basePath;
}

public FileSystemFetcherConfig setBasePath(String basePath) {
this.basePath = basePath;
return this;
}

public boolean isExtractFileSystemMetadata() {
return extractFileSystemMetadata;
}

public FileSystemFetcherConfig setExtractFileSystemMetadata(boolean extractFileSystemMetadata) {
this.extractFileSystemMetadata = extractFileSystemMetadata;
return this;
}
}
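Together with the new FileSystemFetcher(FileSystemFetcherConfig) constructor above, the fluent setters let a fetcher be wired up in code rather than via XML. A minimal sketch; the base path is hypothetical:

FileSystemFetcherConfig config = new FileSystemFetcherConfig()
        .setBasePath("/data/inbound")        // hypothetical directory to fetch from
        .setExtractFileSystemMetadata(true); // also capture file-system attributes as metadata
FileSystemFetcher fetcher = new FileSystemFetcher(config);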