From 8050622b0f305286599ec0717d05769fbd3ed4fd Mon Sep 17 00:00:00 2001 From: Bartosz Taudul Date: Mon, 28 Oct 2019 23:22:50 +0100 Subject: [PATCH] Read and decompress network data on a separate thread. --- server/TracyWorker.cpp | 80 +++++++++++++++++++++++++++++++----------- server/TracyWorker.hpp | 19 ++++++++++ 2 files changed, 79 insertions(+), 20 deletions(-) diff --git a/server/TracyWorker.cpp b/server/TracyWorker.cpp index 7488b9c2..7a239e61 100644 --- a/server/TracyWorker.cpp +++ b/server/TracyWorker.cpp @@ -2282,6 +2282,42 @@ const Worker::SourceLocationZones& Worker::GetZonesForSourceLocation( int16_t sr void Worker::Network() { + auto ShouldExit = [this] { return m_shutdown.load( std::memory_order_relaxed ); }; + auto lz4buf = std::make_unique( LZ4Size ); + + for(;;) + { + { + std::unique_lock lock( m_netWriteLock ); + m_netWriteCv.wait( lock, [this] { return m_netWriteCnt > 0 || m_shutdown.load( std::memory_order_relaxed ); } ); + if( m_shutdown.load( std::memory_order_relaxed ) ) goto close; + m_netWriteCnt--; + } + + auto buf = m_buffer + m_bufferOffset; + lz4sz_t lz4sz; + if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), 10, ShouldExit ) ) goto close; + if( !m_sock.Read( lz4buf.get(), lz4sz, 10, ShouldExit ) ) goto close; + m_bytes.fetch_add( sizeof( lz4sz ) + lz4sz, std::memory_order_relaxed ); + + auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize ); + assert( sz >= 0 ); + m_decBytes.fetch_add( sz, std::memory_order_relaxed ); + + { + std::lock_guard lock( m_netReadLock ); + m_netRead.push_back( NetBuffer { m_bufferOffset, sz } ); + m_netReadCv.notify_one(); + } + + m_bufferOffset += sz; + if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0; + } + +close: + std::lock_guard lock( m_netReadLock ); + m_netRead.push_back( NetBuffer { -1 } ); + m_netReadCv.notify_one(); } void Worker::Exec() @@ -2294,13 +2330,8 @@ void Worker::Exec() if( m_sock.Connect( m_addr.c_str(), m_port ) ) break; } - auto lz4buf = std::make_unique( LZ4Size ); - std::chrono::time_point t0; - uint64_t bytes = 0; - uint64_t decBytes = 0; - m_sock.Send( HandshakeShibboleth, HandshakeShibbolethSize ); uint32_t protocolVersion = ProtocolVersion; m_sock.Send( &protocolVersion, sizeof( protocolVersion ) ); @@ -2381,6 +2412,11 @@ void Worker::Exec() LZ4_setStreamDecode( m_stream, nullptr, 0 ); m_connected.store( true, std::memory_order_relaxed ); + { + std::lock_guard lock( m_netWriteLock ); + m_netWriteCnt = 2; + m_netWriteCv.notify_one(); + } t0 = std::chrono::high_resolution_clock::now(); @@ -2392,18 +2428,17 @@ void Worker::Exec() goto close; } - auto buf = m_buffer + m_bufferOffset; - lz4sz_t lz4sz; - if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), 10, ShouldExit ) ) goto close; - if( !m_sock.Read( lz4buf.get(), lz4sz, 10, ShouldExit ) ) goto close; - bytes += sizeof( lz4sz ) + lz4sz; + NetBuffer netbuf; + { + std::unique_lock lock( m_netReadLock ); + m_netReadCv.wait( lock, [this] { return !m_netRead.empty(); } ); + netbuf = m_netRead.front(); + m_netRead.erase( m_netRead.begin() ); + } + if( netbuf.bufferOffset < 0 ) goto close; - auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize ); - assert( sz >= 0 ); - decBytes += sz; - - char* ptr = buf; - const char* end = buf + sz; + char* ptr = m_buffer + netbuf.bufferOffset; + const char* end = ptr + netbuf.size; { std::lock_guard lock( m_data.lock ); @@ -2417,8 +2452,11 @@ void Worker::Exec() } } - m_bufferOffset += sz; - if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0; + { + std::lock_guard lock( m_netWriteLock ); + m_netWriteCnt++; + m_netWriteCv.notify_one(); + } HandlePostponedPlots(); @@ -2436,6 +2474,8 @@ void Worker::Exec() enum { MbpsUpdateTime = 200 }; if( td > MbpsUpdateTime ) { + const auto bytes = m_bytes.exchange( 0, std::memory_order_relaxed ); + const auto decBytes = m_decBytes.exchange( 0, std::memory_order_relaxed ); std::lock_guard lock( m_mbpsData.lock ); m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() ); m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) ); @@ -2443,8 +2483,6 @@ void Worker::Exec() m_mbpsData.queue = m_serverQueryQueue.size(); m_mbpsData.transferred += bytes; t0 = t1; - bytes = 0; - decBytes = 0; } if( m_terminate ) @@ -2474,6 +2512,8 @@ void Worker::Exec() } close: + Shutdown(); + m_netWriteCv.notify_one(); m_sock.Close(); m_connected.store( false, std::memory_order_relaxed ); } diff --git a/server/TracyWorker.hpp b/server/TracyWorker.hpp index 3ba4ca67..e3545e85 100644 --- a/server/TracyWorker.hpp +++ b/server/TracyWorker.hpp @@ -2,7 +2,9 @@ #define __TRACYWORKER_HPP__ #include +#include #include +#include #include #include #include @@ -653,6 +655,23 @@ private: int64_t m_refTimeSerial = 0; int64_t m_refTimeCtx = 0; int64_t m_refTimeGpu = 0; + + std::atomic m_bytes { 0 }; + std::atomic m_decBytes { 0 }; + + struct NetBuffer + { + int bufferOffset; + int size; + }; + + std::vector m_netRead; + std::mutex m_netReadLock; + std::condition_variable m_netReadCv; + + int m_netWriteCnt = 0; + std::mutex m_netWriteLock; + std::condition_variable m_netWriteCv; }; }