Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cluster-it-1c1d1a.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: cluster-log-ainode-${{ matrix.os }}
path: integration-test/target/ainode-logs
path: integration-test/target/*-logs
retention-days: 30
2 changes: 1 addition & 1 deletion integration-test/src/assembly/mpp-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
</fileSet>
<fileSet>
<outputDirectory>lib</outputDirectory>
<directory>${project.basedir}/../iotdb-core/ainode/dist/</directory>
<directory>${project.basedir}/../iotdb-core/ainode/dist/ainode/</directory>
<fileMode>0755</fileMode>
</fileSet>
</fileSets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.tsfile.external.commons.io.file.PathUtils;
import org.slf4j.Logger;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
Expand All @@ -37,6 +37,7 @@
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;

import static org.apache.iotdb.it.env.cluster.ClusterConstant.AI_NODE_NAME;
Expand All @@ -62,15 +63,19 @@ public class AINodeWrapper extends AbstractNodeWrapper {
public static final String CACHE_BUILT_IN_MODEL_PATH = "/data/ainode/models/weights";

private void replaceAttribute(String[] keys, String[] values, String filePath) {
try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath, true))) {
for (int i = 0; i < keys.length; i++) {
String line = keys[i] + "=" + values[i];
writer.newLine();
writer.write(line);
}
Properties props = new Properties();
try (FileInputStream in = new FileInputStream(filePath)) {
props.load(in);
} catch (IOException e) {
logger.warn("Failed to load existing AINode properties from {}, because: ", filePath, e);
}
for (int i = 0; i < keys.length; i++) {
props.setProperty(keys[i], values[i]);
}
try (FileOutputStream out = new FileOutputStream(filePath)) {
props.store(out, "Updated by AINode integration-test env");
} catch (IOException e) {
logger.error(
"Failed to set attribute for AINode in file: {} because {}", filePath, e.getMessage());
logger.error("Failed to save properties to {}, because:", filePath, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,33 +90,6 @@ private static void prepareDataForTableModel() throws SQLException {
}
}

// @Test
public void concurrentCPUCallInferenceTest() throws SQLException, InterruptedException {
  // Exercise the CPU CALL INFERENCE path for every built-in model under test.
  for (String modelId : new String[] {"timer_xl", "sundial"}) {
    concurrentCPUCallInferenceTest(modelId);
  }
}

/**
 * Runs concurrent CALL INFERENCE requests against {@code modelId} loaded on CPU.
 *
 * <p>Loads the model, waits until it is reported on the CPU device, fires
 * {@code threadCnt * loop} inference requests concurrently, and finally unloads the model. The
 * unload now runs in a {@code finally} block so a failing inference cannot leave the model
 * loaded and poison subsequent tests.
 *
 * @param modelId id of the model to load and query
 */
private void concurrentCPUCallInferenceTest(String modelId)
    throws SQLException, InterruptedException {
  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {
    final int threadCnt = 4;
    final int loop = 10;
    final int predictLength = 96;
    statement.execute(String.format("LOAD MODEL %s TO DEVICES 'cpu'", modelId));
    try {
      checkModelOnSpecifiedDevice(statement, modelId, "cpu");
      concurrentInference(
          statement,
          String.format(
              "CALL INFERENCE(%s, 'SELECT s FROM root.AI', predict_length=%d)",
              modelId, predictLength),
          threadCnt,
          loop,
          predictLength);
    } finally {
      // Always unload, even on failure, so later tests start from a clean state.
      statement.execute(String.format("UNLOAD MODEL %s FROM DEVICES 'cpu'", modelId));
    }
  }
}

// @Test
public void concurrentGPUCallInferenceTest() throws SQLException, InterruptedException {
concurrentGPUCallInferenceTest("timer_xl");
Expand Down Expand Up @@ -150,39 +123,6 @@ private void concurrentGPUCallInferenceTest(String modelId)
String forecastUDTFSql =
"SELECT forecast(s, 'MODEL_ID'='%s', 'PREDICT_LENGTH'='%d') FROM root.AI";

@Test
public void concurrentCPUForecastTest() throws SQLException, InterruptedException {
  // Cover both forecast syntaxes (UDTF first, then table function) for every model,
  // preserving the original execution order.
  for (String sqlTemplate : new String[] {forecastUDTFSql, forecastTableFunctionSql}) {
    for (String modelId : new String[] {"timer_xl", "sundial"}) {
      concurrentCPUForecastTest(modelId, sqlTemplate);
    }
  }
}

/**
 * Runs concurrent forecast queries against {@code modelId} loaded on CPU and logs the elapsed
 * wall-clock time for the whole batch.
 *
 * <p>The unload now runs in a {@code finally} block so a failing inference cannot leave the
 * model loaded and poison subsequent tests.
 *
 * @param modelId id of the model to load and query
 * @param selectSQL query template with two format slots: model id and predict length
 */
private void concurrentCPUForecastTest(String modelId, String selectSQL)
    throws SQLException, InterruptedException {
  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {
    final int threadCnt = 4;
    final int loop = 10;
    final int predictLength = 96;
    statement.execute(String.format("LOAD MODEL %s TO DEVICES 'cpu'", modelId));
    try {
      checkModelOnSpecifiedDevice(statement, modelId, "cpu");
      long startTime = System.currentTimeMillis();
      concurrentInference(
          statement,
          String.format(selectSQL, modelId, predictLength),
          threadCnt,
          loop,
          predictLength);
      long endTime = System.currentTimeMillis();
      // Parameterized SLF4J logging avoids the eager String.format cost when INFO is disabled.
      LOGGER.info(
          "Model {} concurrent inference {} reqs ({} threads, {} loops) in CPU takes time: {}ms",
          modelId,
          threadCnt * loop,
          threadCnt,
          loop,
          endTime - startTime);
    } finally {
      // Always unload, even on failure, so later tests start from a clean state.
      statement.execute(String.format("UNLOAD MODEL %s FROM DEVICES 'cpu'", modelId));
    }
  }
}

@Test
public void concurrentGPUForecastTest() throws SQLException, InterruptedException {
concurrentGPUForecastTest("timer_xl", forecastUDTFSql);
Expand Down Expand Up @@ -221,7 +161,7 @@ private void checkModelOnSpecifiedDevice(Statement statement, String modelId, St
throws SQLException, InterruptedException {
Set<String> targetDevices = ImmutableSet.copyOf(device.split(","));
LOGGER.info("Checking model: {} on target devices: {}", modelId, targetDevices);
for (int retry = 0; retry < 20; retry++) {
for (int retry = 0; retry < 200; retry++) {
Set<String> foundDevices = new HashSet<>();
try (final ResultSet resultSet =
statement.executeQuery(String.format("SHOW LOADED MODELS '%s'", device))) {
Expand Down
6 changes: 2 additions & 4 deletions iotdb-core/ainode/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# generated by maven
/iotdb/ainode/conf/

# .whl of ainode, generated by Poetry
# generated by pyinstaller
/dist/

# the config to build ainode, it will be generated automatically
pyproject.toml
/build/
199 changes: 199 additions & 0 deletions iotdb-core/ainode/ainode.spec
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# -*- mode: python ; coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under this License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from pathlib import Path

# Repository root: this spec file lives in iotdb-core/ainode/, one level below it.
project_root = Path(SPECPATH).parent

block_cipher = None

# PyInstaller hook helpers for discovering submodules, data files and binaries.
from PyInstaller.utils.hooks import collect_all, collect_submodules, collect_data_files

# Accumulators fed into the Analysis() call further down in this spec.
all_datas = []
all_binaries = []
all_hiddenimports = []

# Libraries that resolve many modules dynamically at runtime: everything
# collect_all() finds must be bundled, otherwise imports performed at runtime
# would be missing from the frozen application.
essential_libraries = {
    'torch': True,  # Keep collect_all for torch as it has many dynamic imports
    'transformers': True,  # Keep collect_all for transformers
    'safetensors': True,  # Keep collect_all for safetensors
}

# Remaining scientific stack: hidden imports come from collect_submodules, while
# data files and binaries come from collect_all (with a data-files-only fallback).
other_libraries = ['sktime', 'scipy', 'pandas', 'sklearn', 'statsmodels', 'optuna']

for lib in list(essential_libraries):
    try:
        lib_datas, lib_binaries, lib_hiddenimports = collect_all(lib)
    except Exception:
        continue  # Library is not importable in this build environment; skip it.
    all_datas.extend(lib_datas)
    all_binaries.extend(lib_binaries)
    all_hiddenimports.extend(lib_hiddenimports)

for lib in other_libraries:
    try:
        # Register every submodule of the library as a hidden import first.
        all_hiddenimports.extend(collect_submodules(lib))
        try:
            # collect_all is used here only for its data files and binaries;
            # its hidden-import list is deliberately discarded.
            lib_datas, lib_binaries, _ignored = collect_all(lib)
            all_datas.extend(lib_datas)
            all_binaries.extend(lib_binaries)
        except Exception:
            try:
                # Fall back to data files only when collect_all is unavailable.
                all_datas.extend(collect_data_files(lib))
            except Exception:
                pass  # No data files either; ship just the submodules.
    except Exception:
        pass  # Submodule scan failed; leave this library out entirely.

# Project packages whose whole subtree must be importable from the bundle.
# Listing only top-level packages lets collect_submodules recurse once per tree
# and avoids duplicate collection.
TOP_LEVEL_PACKAGES = [
    'iotdb.ainode.core',  # This will include all sub-packages: manager, model, inference, etc.
    'iotdb.thrift',  # This will include all thrift sub-packages
]

for package in TOP_LEVEL_PACKAGES:
    try:
        all_hiddenimports.extend(collect_submodules(package))
    except Exception:
        # Collection failed (e.g. package missing): at least register the name.
        all_hiddenimports.append(package)

# Parent namespace packages must be present as well.
all_hiddenimports.extend(['iotdb', 'iotdb.ainode'])

# Multiprocessing support for PyInstaller: frozen applications need the
# platform-specific spawn/fork helper modules to be bundled explicitly.
multiprocessing_modules = [
    'multiprocessing',
    'multiprocessing.spawn',
    'multiprocessing.popen_spawn_posix',
    'multiprocessing.popen_spawn_win32',
    'multiprocessing.popen_fork',
    'multiprocessing.popen_forkserver',
    'multiprocessing.context',
    'multiprocessing.reduction',
    'multiprocessing.util',
    'torch.multiprocessing',
    'torch.multiprocessing.spawn',
]

# External libraries that may be imported dynamically and therefore escape
# PyInstaller's static import analysis.
external_dependencies = [
    'huggingface_hub',
    'tokenizers',
    'hf_xet',
    'einops',
    'dynaconf',
    'tzlocal',
    'thrift',
    'psutil',
    'requests',
]

all_hiddenimports.extend(multiprocessing_modules)
all_hiddenimports.extend(external_dependencies)

# Analyze main entry file
# Note: Do NOT add virtual environment site-packages to pathex manually.
# When PyInstaller is run from the virtual environment's Python, it automatically
# detects and uses the virtual environment's site-packages.
# Analysis walks the import graph starting from script.py and merges in the
# datas/binaries/hiddenimports accumulated earlier in this spec.
a = Analysis(
['iotdb/ainode/core/script.py'],
pathex=[str(project_root)],
binaries=all_binaries,
datas=all_datas,
hiddenimports=all_hiddenimports,
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
# Exclude unnecessary modules to reduce size and improve startup time
# Note: Do not exclude unittest, as torch and other libraries require it
# Only exclude modules that are definitely not used and not required by dependencies
'matplotlib',
'IPython',
'jupyter',
'notebook',
'pytest',
'test',
'tests'
],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=True, # Set to True to speed up startup - files are not archived into PYZ
)

# Package all PYZ files
# The PYZ archive holds the pure-Python modules discovered by Analysis.
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

# Create executable (onedir mode for faster startup)
# exclude_binaries=True defers shared libraries to the COLLECT step below,
# producing a directory bundle instead of a single self-extracting file.
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='ainode',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

# Collect all files into a directory (onedir mode)
# The final dist/ainode directory contains the executable plus every binary,
# zipfile and data file gathered by Analysis above.
coll = COLLECT(
exe,
a.binaries,
a.zipfiles,
a.datas,
strip=False,
upx=True,
upx_exclude=[],
name='ainode',
)
17 changes: 5 additions & 12 deletions iotdb-core/ainode/ainode.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
</file>
</files>
<fileSets>
<fileSet>
<directory>iotdb/ainode/conf</directory>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>resources/conf</directory>
<outputDirectory>conf</outputDirectory>
Expand All @@ -52,19 +56,8 @@
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>dist</directory>
<directory>dist/ainode</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>*.whl</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../../scripts/conf</directory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>ainode-env.*</include>
<include>**/ainode-env.*</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
Expand Down
Loading
Loading