Metric refresh no longer uses QtConcurrent::map()

.. since Qt5.15 QtConcurrent::map() will use all available
   worker threads in the global thread pool. And this causes
   a deadlock in the GUI since QGraphicsView uses threads to
   manage updates.

.. we now manage the metric refresh via RideCacheRefreshThread
   and use at most 50% of the overall threads available in the
   global thread pool.

.. Have tested obvious triggers such as metric schema updates
   and user metrics being changed, but more testing is needed.

Fixes #3611
This commit is contained in:
Mark Liversedge
2022-06-12 16:59:40 +01:00
parent dd5bc51604
commit 9d9ad753f3
2 changed files with 111 additions and 35 deletions

View File

@@ -15,7 +15,6 @@
* with this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "RideCache.h"
#include "Context.h"
@@ -162,13 +161,6 @@ RideCache::postLoad()
// do we have any stale items ?
connect(context, SIGNAL(configChanged(qint32)), this, SLOT(configChanged(qint32)));
// future watching
connect(&watcher, SIGNAL(finished()), this, SLOT(garbageCollect()));
connect(&watcher, SIGNAL(finished()), this, SLOT(save()));
connect(&watcher, SIGNAL(finished()), context, SLOT(notifyRefreshEnd()));
connect(&watcher, SIGNAL(started()), context, SLOT(notifyRefreshStart()));
connect(&watcher, SIGNAL(progressValueChanged(int)), this, SLOT(progressing(int)));
}
struct comparerideitem { bool operator()(const RideItem *p1, const RideItem *p2) { return p1->dateTime < p2->dateTime; } };
@@ -479,23 +471,35 @@ RideCache::writeAsCSV(QString filename)
file.close();
}
void
itemRefresh(RideItem *&item)
int
RideCache::nextRefresh()
{
// debugging below to watch refreshing take place
//fprintf(stderr, "%s %s refresh\n", item->context->athlete->cyclist.toStdString().c_str(), item->dateTime.toString().toStdString().c_str()); fflush(stderr);
int returning=-1;
updateMutex.lock();
// need parser to be reentrant !item->refresh();
if (item->isstale) {
item->refresh();
if (updates < 0) {
returning = -1; // force termination by returning -1
} else if (updates < reverse_.count()-1) {
returning = updates;
updates++;
progressing(returning);
}
updateMutex.unlock();
return(returning);
}
// and trap changes during refresh to current ride
if (item == item->context->currentRideItem())
item->context->notifyRideChanged(item);
void
RideCache::threadCompleted(RideCacheRefreshThread*thread)
{
updateMutex.lock();
refreshThreads.removeOne(thread);
updateMutex.unlock();
#ifdef SLOW_REFRESH
sleep(1);
#endif
if (refreshThreads.count() == 0) {
//fprintf(stderr,"refresh ended\n"); fflush(stderr);
context->notifyRefreshEnd();
garbageCollect();
save();
}
}
@@ -503,7 +507,7 @@ void
RideCache::progressing(int value)
{
// we're working away, notfy everyone where we got
progress_ = 100.0f * (double(value) / double(watcher.progressMaximum()));
progress_ = 100.0f * (double(value) / double(reverse_.count()));
if (value) {
QDate here = reverse_.at(value-1)->dateTime.date();
context->notifyRefreshUpdate(here);
@@ -514,9 +518,16 @@ RideCache::progressing(int value)
void
RideCache::cancel()
{
if (future.isRunning()) {
future.cancel();
future.waitForFinished();
updateMutex.lock();
QVector<RideCacheRefreshThread*>current = refreshThreads;
updates=-1;
updateMutex.unlock();
// wait till threads are empty, but use our copy as the master
// is going to be changing as threads terminate and we need to be
// sure all our threads have stopped before returning.
foreach(RideCacheRefreshThread *thread, current) {
thread->wait();
}
}
@@ -525,7 +536,7 @@ void
RideCache::refresh()
{
// already on it !
if (future.isRunning()) return;
if (refreshThreads.count()) return;
// how many need refreshing ?
int staleCount = 0;
@@ -540,15 +551,33 @@ RideCache::refresh()
// start if there is work to do
// and future watcher can notify of updates
if (staleCount) {
reverse_ = rides_;
std::sort(reverse_.begin(), reverse_.end(), rideCacheGreaterThan);
future = QtConcurrent::map(reverse_, itemRefresh);
watcher.setFuture(future);
//future = QtConcurrent::map(reverse_, itemRefresh);
//watcher.setFuture(future);
// calculate number of threads and work per thread
int maxthreads = QThreadPool::globalInstance()->maxThreadCount();
int threads = maxthreads / 2;
if (threads==0) threads=1; // need at least one!
int n=0;
// refresh happenning
updates = 0;
context->notifyRefreshStart();
while(n++ < threads) {
// if goes past last make it the last
RideCacheRefreshThread *thread = new RideCacheRefreshThread(this);
refreshThreads << thread;
thread->start();
}
} else {
// nothing to do, notify its started and done immediately
context->notifyRefreshStart();
// wait five seconds, so mainwindow can get up and running...
QTimer::singleShot(5000, context, SLOT(notifyRefreshEnd()));
@@ -828,3 +857,30 @@ RideCache::isMetricRelevantForRides(Specification specification,
return false;
}
// refresh metrics
void RideCacheRefreshThread::run()
{
//fprintf(stderr, "worker thread starts!\n"); fflush(stderr);
while (1) {
int n = cache->nextRefresh();
//fprintf(stderr, "refreshing %d of %d\n", n, cache->reverse_.count()); fflush(stderr);
if (n<0) {
//fprintf(stderr, "worker thread exits!\n"); fflush(stderr);
goto exitthread;
}
// we have one to do
RideItem *item = cache->reverse_[n];
if(item->isstale) {
item->refresh();
if (item == item->context->currentRideItem())
item->context->notifyRideChanged(item);
}
}
exitthread:
cache->threadCompleted(this);
return;
}