|
86 | 86 | import com.here.xyz.models.geojson.implementation.FeatureCollection; |
87 | 87 | import com.here.xyz.models.geojson.implementation.FeatureCollection.ModificationFailure; |
88 | 88 | import com.here.xyz.models.geojson.implementation.XyzNamespace; |
| 89 | +import com.here.xyz.models.hub.Space.Extension; |
89 | 90 | import com.here.xyz.responses.BinaryResponse; |
90 | 91 | import com.here.xyz.responses.CountResponse; |
91 | 92 | import com.here.xyz.responses.ErrorResponse; |
|
98 | 99 | import com.here.xyz.responses.SuccessResponse; |
99 | 100 | import com.here.xyz.responses.XyzResponse; |
100 | 101 | import io.vertx.core.AsyncResult; |
| 102 | +import io.vertx.core.CompositeFuture; |
101 | 103 | import io.vertx.core.Future; |
102 | 104 | import io.vertx.core.Handler; |
103 | 105 | import io.vertx.core.MultiMap; |
| 106 | +import io.vertx.core.Promise; |
104 | 107 | import io.vertx.core.buffer.Buffer; |
105 | 108 | import io.vertx.core.http.Cookie; |
106 | 109 | import io.vertx.core.json.DecodeException; |
@@ -746,65 +749,103 @@ private static EventNotification createNotification(NotificationContext nc, Payl |
746 | 749 | * Resolves the space, its storage and its listeners. |
747 | 750 | */ |
748 | 751 | static <X extends FeatureTask> void resolveSpace(final X task, final Callback<X> callback) { |
| 752 | + try { |
| 753 | + resolveSpace(task) |
| 754 | + .compose(space -> CompositeFuture.all( |
| 755 | + resolveStorageConnector(task), |
| 756 | + resolveListenersAndProcessors(task), |
| 757 | + resolveExtendedSpaces(task, space) |
| 758 | + )) |
| 759 | + .onFailure(t -> callback.exception(t)) |
| 760 | + .onSuccess(connector -> callback.call(task)); |
| 761 | + } |
| 762 | + catch (Exception e) { |
| 763 | + callback.exception(new HttpException(INTERNAL_SERVER_ERROR, "Unable to load the resource definition.", e)); |
| 764 | + } |
| 765 | + } |
| 766 | + |
| 767 | + private static <X extends FeatureTask> Future<Space> resolveSpace(final X task) { |
749 | 768 | try { |
750 | 769 | //FIXME: Can be removed once the Space events are handled by the SpaceTaskHandler (refactoring pending ...) |
751 | | - if (task.space != null) { //If the space is already given we don't need to retrieve it |
752 | | - onSpaceResolved(task, callback); |
753 | | - return; |
754 | | - } |
| 770 | + if (task.space != null) //If the space is already given we don't need to retrieve it |
| 771 | + return Future.succeededFuture(task.space); |
755 | 772 |
|
756 | 773 | //Load the space definition. |
757 | | - Service.spaceConfigClient.get(task.getMarker(), task.getEvent().getSpace()) |
758 | | - .onFailure(t -> { |
759 | | - logger.warn(task.getMarker(), "Unable to load the space definition for space '{}' {}", task.getEvent().getSpace(), t); |
760 | | - callback.exception(new HttpException(INTERNAL_SERVER_ERROR, "Unable to load the resource definition", t)); |
761 | | - }) |
762 | | - .onSuccess(space -> { |
763 | | - task.space = space; |
764 | | - if (task.space != null) |
765 | | - task.getEvent().setParams(task.space.getStorage().getParams()); |
766 | | - onSpaceResolved(task, callback); |
767 | | - }); |
| 774 | + return Space.resolveSpace(task.getMarker(), task.getEvent().getSpace()) |
| 775 | + .compose( |
| 776 | + space -> { |
| 777 | + task.space = space; |
| 778 | + if (space != null) |
| 779 | + task.getEvent().setParams(space.getStorage().getParams()); |
| 780 | + return Future.succeededFuture(space); |
| 781 | + }, |
| 782 | + t -> { |
| 783 | + logger.warn(task.getMarker(), "Unable to load the space definition for space '{}' {}", task.getEvent().getSpace(), t); |
| 784 | + return Future.failedFuture(new HttpException(INTERNAL_SERVER_ERROR, "Unable to load the resource definition", t)); |
| 785 | + } |
| 786 | + ); |
768 | 787 | } |
769 | 788 | catch (Exception e) { |
770 | | - callback.exception(new HttpException(INTERNAL_SERVER_ERROR, "Unable to load the resource definition.", e)); |
| 789 | + return Future.failedFuture(new HttpException(INTERNAL_SERVER_ERROR, "Unable to load the resource definition.", e)); |
771 | 790 | } |
772 | 791 | } |
773 | 792 |
|
774 | | - private static <X extends FeatureTask> void onSpaceResolved(final X task, final Callback<X> callback) { |
775 | | - if (task.space == null) { |
776 | | - callback.exception(new HttpException(NOT_FOUND, "The resource with this ID does not exist.")); |
777 | | - return; |
778 | | - } |
779 | | - logger.debug(task.getMarker(), "Given space configuration is: {}", Json.encode(task.space)); |
| 793 | + private static <X extends FeatureTask> Future<Space> resolveExtendedSpaces(X task, Space extendingSpace) { |
| 794 | + if (extendingSpace == null) |
| 795 | + return Future.succeededFuture(); |
| 796 | + return resolveExtendedSpace(task, extendingSpace.getExtension()); |
| 797 | + } |
780 | 798 |
|
781 | | - final String storageId = task.space.getStorage().getId(); |
782 | | - AbstractHttpServerVerticle.addStreamInfo(task.context, "SID", storageId); |
783 | | - Space.resolveConnector(task.getMarker(), storageId, (arStorage) -> { |
784 | | - if (arStorage.failed()) { |
785 | | - callback.exception(new InvalidStorageException("Unable to load the definition for this storage.")); |
786 | | - return; |
787 | | - } |
788 | | - task.storage = arStorage.result(); |
789 | | - onStorageResolved(task, callback); |
790 | | - }); |
| 799 | + private static <X extends FeatureTask> Future<Space> resolveExtendedSpace(X task, Extension spaceExtension) { |
| 800 | + if (spaceExtension == null) |
| 801 | + return Future.succeededFuture(); |
| 802 | + return Space.resolveSpace(task.getMarker(), spaceExtension.getSpaceId()) |
| 803 | + .compose( |
| 804 | + extendedSpace -> { |
| 805 | + if (task.extendedSpaces == null) |
| 806 | + task.extendedSpaces = new ArrayList(); |
| 807 | + task.extendedSpaces.add(extendedSpace); |
| 808 | + return resolveExtendedSpace(task, extendedSpace.getExtension()); //Go to next extension level |
| 809 | + }, |
| 810 | + t -> Future.failedFuture(t) |
| 811 | + ); |
791 | 812 | } |
792 | 813 |
|
793 | | - private static <X extends FeatureTask> void onStorageResolved(final X task, final Callback<X> callback) { |
| 814 | + private static <X extends FeatureTask> Future<Connector> resolveStorageConnector(final X task) { |
| 815 | + if (task.space == null) |
| 816 | + return Future.failedFuture(new HttpException(NOT_FOUND, "The resource with this ID does not exist.")); |
| 817 | + |
| 818 | + logger.debug(task.getMarker(), "Given space configuration is: {}", task.space); |
| 819 | + |
| 820 | + final String storageId = task.space.getStorage().getId(); |
| 821 | + AbstractHttpServerVerticle.addStreamInfo(task.context, "SID", storageId); |
| 822 | + return Space.resolveConnector(task.getMarker(), storageId) |
| 823 | + .compose( |
| 824 | + connector -> { |
| 825 | + task.storage = connector; |
| 826 | + return Future.succeededFuture(connector); |
| 827 | + }, |
| 828 | + t -> Future.failedFuture(new InvalidStorageException("Unable to load the definition for this storage.")) |
| 829 | + ); |
| 830 | + } |
| 831 | + |
| 832 | + private static <X extends FeatureTask> Future<Void> resolveListenersAndProcessors(final X task) { |
| 833 | + Promise<Void> p = Promise.promise(); |
794 | 834 | try { |
795 | 835 | //Also resolve all listeners & processors |
796 | 836 | CompletableFuture.allOf( |
797 | 837 | resolveConnectors(task.getMarker(), task.space, ConnectorType.LISTENER), |
798 | 838 | resolveConnectors(task.getMarker(), task.space, ConnectorType.PROCESSOR) |
799 | 839 | ).thenRun(() -> { |
800 | 840 | //All listener & processor refs have been resolved now |
801 | | - callback.call(task); |
| 841 | + p.complete(); |
802 | 842 | }); |
803 | 843 | } |
804 | 844 | catch (Exception e) { |
805 | 845 | logger.error(task.getMarker(), "The listeners for this space cannot be initialized", e); |
806 | | - callback.exception(new HttpException(INTERNAL_SERVER_ERROR, "The listeners for this space cannot be initialized")); |
| 846 | + p.fail(new HttpException(INTERNAL_SERVER_ERROR, "The listeners for this space cannot be initialized")); |
807 | 847 | } |
| 848 | + return p.future(); |
808 | 849 | } |
809 | 850 |
|
810 | 851 | private static CompletableFuture<Void> resolveConnectors(Marker marker, final Space space, final ConnectorType connectorType) { |
|
0 commit comments