|
26 | 26 | use Metadata\MetadataFactoryInterface;
|
27 | 27 | use Nelmio\ApiDocBundle\Annotation\Model;
|
28 | 28 | use OpenApi\Attributes as OA;
|
| 29 | +use Psr\Log\LoggerInterface; |
29 | 30 | use Symfony\Component\DependencyInjection\Attribute\Autowire;
|
30 | 31 | use Symfony\Component\ExpressionLanguage\Expression;
|
31 | 32 | use Symfony\Component\HttpKernel\Attribute\MapQueryParameter;
|
@@ -63,6 +64,7 @@ public function __construct(
|
63 | 64 | ConfigurationService $config,
|
64 | 65 | EventLogService $eventLogService,
|
65 | 66 | protected readonly ImportExportService $importExportService,
|
| 67 | + protected readonly LoggerInterface $logger, |
66 | 68 | protected readonly AssetUpdateService $assetUpdater
|
67 | 69 | ) {
|
68 | 70 | parent::__construct($entityManager, $dj, $config, $eventLogService);
|
@@ -651,7 +653,7 @@ public function getEventFeedAction(
|
651 | 653 | $response->headers->set('Content-Type', 'application/x-ndjson');
|
652 | 654 | $response->setCallback(function () use ($format, $cid, $contest, $request, $since_id, $types, $strict, $stream, $metadataFactory, $kernel) {
|
653 | 655 | $lastUpdate = 0;
|
654 |
| - $lastIdSent = $since_id; |
| 656 | + $lastIdSent = max(0, $since_id); // Don't try to look for event_id=0 |
655 | 657 | $typeFilter = false;
|
656 | 658 | if ($types) {
|
657 | 659 | $typeFilter = explode(',', $types);
|
@@ -714,39 +716,83 @@ public function getEventFeedAction(
|
714 | 716 | // Reload the contest as the above method will clear the entity manager.
|
715 | 717 | $contest = $this->getContestWithId($request, $cid);
|
716 | 718 |
|
| 719 | + $missingEventRetries = 0; |
717 | 720 | while (true) {
|
718 | 721 | // Add missing state events that should have happened already.
|
719 | 722 | $this->eventLogService->addMissingStateEvents($contest);
|
720 | 723 |
|
721 |
| - $qb = $this->em->createQueryBuilder() |
| 724 | + // We fetch *all* events after the last seen to check that |
| 725 | + // we don't skip events that are committed out of order. |
| 726 | + $q = $this->em->createQueryBuilder() |
722 | 727 | ->from(Event::class, 'e')
|
723 | 728 | ->select('e')
|
724 | 729 | ->andWhere('e.eventid > :lastIdSent')
|
725 | 730 | ->setParameter('lastIdSent', $lastIdSent)
|
726 |
| - ->andWhere('e.contest = :cid') |
727 |
| - ->setParameter('cid', $contest->getCid()) |
728 |
| - ->orderBy('e.eventid', 'ASC'); |
729 |
| - |
730 |
| - if ($typeFilter !== false) { |
731 |
| - $qb = $qb |
732 |
| - ->andWhere('e.endpointtype IN (:types)') |
733 |
| - ->setParameter('types', $typeFilter); |
734 |
| - } |
735 |
| - if (!$canViewAll) { |
736 |
| - $restricted_types = ['judgements', 'runs', 'clarifications']; |
737 |
| - if ($contest->getStarttime() === null || Utils::now() < $contest->getStarttime()) { |
738 |
| - $restricted_types[] = 'problems'; |
| 731 | + ->orderBy('e.eventid', 'ASC') |
| 732 | + ->getQuery(); |
| 733 | + |
| 734 | + /** @var Event[] $events */ |
| 735 | + $events = $q->getResult(); |
| 736 | + |
| 737 | + // Look for any missing sequential events and wait for them to |
| 738 | + // be committed if so. |
| 739 | + $missingEvents = false; |
| 740 | + $expectedId = $lastIdSent + 1; |
| 741 | + $lastFoundId = null; |
| 742 | + foreach ($events as $event) { |
| 743 | + if ($event->getEventid() !== $expectedId) { |
| 744 | + $missingEvents = true; |
| 745 | + $lastFoundId = $event->getEventid(); |
| 746 | + break; |
739 | 747 | }
|
740 |
| - $qb = $qb |
741 |
| - ->andWhere('e.endpointtype NOT IN (:restricted_types)') |
742 |
| - ->setParameter('restricted_types', $restricted_types); |
| 748 | + $expectedId++; |
743 | 749 | }
|
| 750 | + if ($missingEvents) { |
| 751 | + if ($missingEventRetries == 0) { |
| 752 | + $this->logger->info( |
| 753 | + 'Detected missing events %d ... %d, waiting for these to appear', |
| 754 | + [$expectedId, $lastFoundId-1] |
| 755 | + ); |
| 756 | + } |
| 757 | + if (++$missingEventRetries < 10) { |
| 758 | + usleep(100 * 1000); |
| 759 | + continue; |
| 760 | + } |
744 | 761 |
|
745 |
| - $q = $qb->getQuery(); |
| 762 | + // We've decided to permanently ignore these non-existing |
| 763 | + // events for this connection. The wait for any |
| 764 | + // non-committed events was long enough. |
| 765 | + // |
| 766 | + // There might be multiple non-existing events. Log the |
| 767 | + // first consecutive gap of non-existing events. A consecutive |
| 768 | + // gap is guaranteed since the events are ordered. |
| 769 | + $this->logger->warning( |
| 770 | + 'Waited too long for missing events %d ... %d, skipping', |
| 771 | + [$expectedId, $lastFoundId-1] |
| 772 | + ); |
| 773 | + } |
| 774 | + $missingEventRetries = 0; |
746 | 775 |
|
747 |
| - /** @var Event[] $events */ |
748 |
| - $events = $q->getResult(); |
| 776 | + $numEventsSent = 0; |
749 | 777 | foreach ($events as $event) {
|
| 778 | + // Filter out unwanted events |
| 779 | + if ($event->getContest()->getCid() !== $contest->getCid()) { |
| 780 | + continue; |
| 781 | + } |
| 782 | + if ($typeFilter !== false && |
| 783 | + !in_array($event->getEndpointtype(), $typeFilter)) { |
| 784 | + continue; |
| 785 | + } |
| 786 | + if (!$canViewAll) { |
| 787 | + $restricted_types = ['judgements', 'runs', 'clarifications']; |
| 788 | + if ($contest->getStarttime() === null || Utils::now() < $contest->getStarttime()) { |
| 789 | + $restricted_types[] = 'problems'; |
| 790 | + } |
| 791 | + if (in_array($event->getEndpointtype(), $restricted_types)) { |
| 792 | + continue; |
| 793 | + } |
| 794 | + } |
| 795 | + |
750 | 796 | $data = $event->getContent();
|
751 | 797 | // Filter fields with specific access restrictions.
|
752 | 798 | if (!$canViewAll) {
|
@@ -814,9 +860,17 @@ public function getEventFeedAction(
|
814 | 860 | flush();
|
815 | 861 | $lastUpdate = Utils::now();
|
816 | 862 | $lastIdSent = $event->getEventid();
|
| 863 | + $numEventsSent++; |
| 864 | + |
| 865 | + if ($missingEvents && $event->getEventid() >= $lastFoundId) { |
| 866 | + // The first event after the first gap has been emitted. Stop |
| 867 | + // emitting events and restart the gap detection logic to find |
| 868 | + // any potential gaps after this last emitted event. |
| 869 | + break; |
| 870 | + } |
817 | 871 | }
|
818 | 872 |
|
819 |
| - if (count($events) == 0) { |
| 873 | + if ($numEventsSent == 0) { |
820 | 874 | if (!$stream) {
|
821 | 875 | break;
|
822 | 876 | }
|
|
0 commit comments