diff --git a/aslam_offline_calibration/kalibr/python/kalibr_common/TargetExtractor.py b/aslam_offline_calibration/kalibr/python/kalibr_common/TargetExtractor.py index 40f49c070..d6bc57cdd 100644 --- a/aslam_offline_calibration/kalibr/python/kalibr_common/TargetExtractor.py +++ b/aslam_offline_calibration/kalibr/python/kalibr_common/TargetExtractor.py @@ -1,7 +1,6 @@ import sm - +import os import numpy as np -import sys import multiprocessing try: import queue @@ -11,80 +10,65 @@ import copy import cv2 -def multicoreExtractionWrapper(detector, taskq, resultq, clearImages, noTransformation): - while 1: - try: - task = taskq.get_nowait() - except queue.Empty: - return - idx = task[0] - stamp = task[1] - image = task[2] - - if noTransformation: - success, obs = detector.findTargetNoTransformation(stamp, np.array(image)) - else: - success, obs = detector.findTarget(stamp, np.array(image)) - - if clearImages: - obs.clearImage() - if success: - resultq.put( (obs, idx) ) +def multicoreExtractionWrapper(args): + # Unpack arguments + detector, timestamp, image, clearImages, noTransformation = args + + # Perform detection + np_image = np.array(image) + if noTransformation: + success, obs = detector.findTargetNoTransformation(timestamp, np_image) + else: + success, obs = detector.findTarget(timestamp, np_image) + + if clearImages: + obs.clearImage() + + return obs if success else None def extractCornersFromDataset(dataset, detector, multithreading=False, numProcesses=None, clearImages=True, noTransformation=False): - print("Extracting calibration target corners") + print("Extracting calibration target corners") targetObservations = [] numImages = dataset.numImages() - + # prepare progess bar iProgress = sm.Progress2(numImages) iProgress.sample() - - if multithreading: + + if multithreading: if not numProcesses: - numProcesses = max(1,multiprocessing.cpu_count()-1) - try: - manager = multiprocessing.Manager() - resultq = manager.Queue() - manager2 = multiprocessing.Manager() - taskq = manager2.Queue() - - for idx, (timestamp, image) in enumerate(dataset.readDataset()): - taskq.put( (idx, timestamp, image) ) - - plist=list() - for pidx in range(0, numProcesses): - detector_copy = copy.copy(detector) - p = multiprocessing.Process(target=multicoreExtractionWrapper, args=(detector_copy, taskq, resultq, clearImages, noTransformation, )) - p.start() - plist.append(p) - - #wait for results - last_done=0 - while 1: - if all([not p.is_alive() for p in plist]): - time.sleep(0.1) - break - done = numImages-taskq.qsize() - sys.stdout.flush() - if (done-last_done) > 0: - iProgress.sample(done-last_done) - last_done = done - time.sleep(0.5) - resultq.put('STOP') - except Exception as e: - raise RuntimeError("Exception during multithreaded extraction: {0}".format(e)) - - #get result sorted by time (=idx) - if resultq.qsize() > 1: - targetObservations = [[]]*(resultq.qsize()-1) - for lidx, data in enumerate(iter(resultq.get, 'STOP')): - obs=data[0]; time_idx = data[1] - targetObservations[lidx] = (time_idx, obs) - targetObservations = list(zip(*sorted(targetObservations, key=lambda tup: tup[0])))[1] - else: - targetObservations=[] - + # Get available CPU count. Prefer `os.sched_getaffinity` if it's available as that + # works when the CPUs avaialble to the process are restricted: + # https://stackoverflow.com/a/55423170 + if "sched_getaffinity" in dir(os): + numProcesses = len(os.sched_getaffinity(0)) + else: + numProcesses = multiprocessing.cpu_count() + + # Leave one CPU core free + numProcesses = max(1, numProcesses - 1) + + # Create a pool of worker processes + with multiprocessing.Pool(processes=numProcesses) as pool: + # Build a generator of tasks to avoid loading everything in memory at once + tasks = ( + (copy.copy(detector), ts, img, clearImages, noTransformation) + for ts, img in dataset.readDataset() + ) + + # Use imap to lazily (one by one) exccute the tasks in the process pool + results_iter = pool.imap(multicoreExtractionWrapper, tasks) + + # Get results as they finish. imap returns results in the order the tasks + # are submitted. That's the same order as the timestamp + targetObservations = [] + done_count = 0 + for obs in results_iter: + if obs is not None: + targetObservations.append(obs) + done_count += 1 + iProgress.sample() + #single threaded implementation else: for timestamp, image in dataset.readDataset(): @@ -101,10 +85,10 @@ def extractCornersFromDataset(dataset, detector, multithreading=False, numProces if len(targetObservations) == 0: print("\r") sm.logFatal("No corners could be extracted for camera {0}! Check the calibration target configuration and dataset.".format(dataset.topic)) - else: + else: print("\r Extracted corners for %d images (of %d images) " % (len(targetObservations), numImages)) #close all opencv windows that might be open cv2.destroyAllWindows() - + return targetObservations