diff --git a/client/tracy_concurrentqueue.h b/client/tracy_concurrentqueue.h index f1f373d7..dfd4fc31 100644 --- a/client/tracy_concurrentqueue.h +++ b/client/tracy_concurrentqueue.h @@ -836,7 +836,55 @@ public: return count; } - + template + size_t try_dequeue_bulk_single(consumer_token_t& token, It itemFirst, size_t max, uint64_t& threadId ) + { + if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) { + if (!update_current_producer_after_rotation(token)) { + return 0; + } + } + + size_t count = static_cast(token.currentProducer)->dequeue_bulk(itemFirst, max); + if (count == max) { + if ((token.itemsConsumedFromCurrent += static_cast(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) { + globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed); + } + threadId = token.currentProducer->threadId; + return max; + } + token.itemsConsumedFromCurrent += static_cast(count); + + auto tail = producerListTail.load(std::memory_order_acquire); + auto ptr = static_cast(token.currentProducer)->next_prod(); + if (ptr == nullptr) { + ptr = tail; + } + if( count == 0 ) + { + while (ptr != static_cast(token.currentProducer)) { + auto dequeued = ptr->dequeue_bulk(itemFirst, max); + if (dequeued != 0) { + threadId = ptr->threadId; + token.currentProducer = ptr; + token.itemsConsumedFromCurrent = static_cast(dequeued); + return dequeued; + } + ptr = ptr->next_prod(); + if (ptr == nullptr) { + ptr = tail; + } + } + return 0; + } + else + { + threadId = token.currentProducer->threadId; + token.currentProducer = ptr; + token.itemsConsumedFromCurrent = 0; + return count; + } + } // Attempts to dequeue from a specific producer's inner queue. // If you happen to know which producer you want to dequeue from, this