1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181 | /* ============================================================
*
* This file is a part of digiKam project
* https://www.digikam.org
*
* Date : 2024-11-10
* Description : Foundation for all ML pipelines
*
* SPDX-FileCopyrightText: 2024 by Gilles Caulier <caulier dot gilles at gmail dot com>
* SPDX-FileCopyrightText: 2024 by Michael Miller <michael underscore miller at msn dot com>
*
* SPDX-License-Identifier: GPL-2.0-or-later
*
* ============================================================ */
#pragma once
// Qt includes
#include <QImage>
#include <QIcon>
#include <QAtomicInteger>
#include <QMutex>
#include <QMutexLocker>
#include <QThreadPool>
#include <QFuture>
#include <QFutureWatcher>
// local includes
#include "digikam_export.h"
#include "digikam_opencv.h"
#include "mlpipelinepackagenotify.h"
#include "mlpipelinepackagefoundation.h"
namespace Digikam
{
// class DImg;
class MLPipelinePackage;
template <typename T>
class DIGIKAM_EXPORT SharedQueue;
class DIGIKAM_EXPORT MLPipelineFoundation : public QObject
{
Q_OBJECT
public:
enum MLPipelineStage
{
/// Finder stage finds the data for the pipeline
Finder,
/// Loader stage loads and prepares the data for extraction
Loader,
/// Extractor stage pulls the features from the data
Extractor,
/// Classifier stage adds a label (face, autotag, etc) to an extracted object
Classifier,
/// Classifier stage adds a label (face, autotag, etc) to an extracted object
Trainer,
/// Writer stage saves the data to the DB
Writer,
// empty stage
None
};
enum MLPipelineNotification
{
notifySkipped,
notifyProcessed
};
typedef struct _MLPipelinePerformanceProfile
{
int elapsedTime;
int itemCount;
int maxQueueCount;
int maxElapsedTime;
QAtomicInteger<int> currentThreadCount;
QAtomicInteger<int> maxThreadCount;
}
MLPipelinePerformanceProfile;
typedef SharedQueue<MLPipelinePackageFoundation*> MLPipelineQueue;
MLPipelineFoundation();
virtual ~MLPipelineFoundation();
virtual bool start();
virtual void cancel();
bool hasFinished() const;
Q_SIGNALS:
/// Emitted when processing is scheduled.
void scheduled();
/// Emitted when processing has started.
void started(const QString& message);
/// Emitted when one package begins processing.
void processing(const MLPipelinePackageNotify::Ptr& package);
/// Emitted when one package has finished processing.
void processed(const MLPipelinePackageNotify::Ptr& package);
void progressValueChanged(float progress);
/// Emitted when the last package has finished processing.
void finished();
/// Emitted when one or several packages were skipped, usually because they have already been scanned.
void skipped(const MLPipelinePackageNotify::Ptr& package);
void signalAddMoreWorkers();
void signalUpdateItemCount(const qlonglong itemCount);
private Q_SLOTS:
void slotFinished();
void slotAddMoreWorkers();
protected:
QMap<MLPipelineStage, MLPipelineQueue*> queues;
QMutex mutex;
QMutex threadStageMutex;
QAtomicInteger<int> itemsProcessed;
QAtomicInteger<int> totalItemCount;
bool cancelled = false;
QThreadPool* threadPool = nullptr;
QList<QFutureWatcher<bool>* > watchList;
QMap<MLPipelineStage, MLPipelinePerformanceProfile> performanceProfileList;
quint64 maxBufferSize = 2147483648; ///< 2 GB default
quint64 usedBufferSize = 0;
protected:
virtual bool finder() = 0;
virtual bool loader() = 0;
virtual bool extractor() = 0;
virtual bool classifier() = 0;
virtual bool trainer() = 0;
virtual bool writer() = 0;
virtual void addMoreWorkers() = 0;
// queue helper functions
MLPipelinePackageFoundation* queueEndSignal() const { return nullptr; }
void clearQueue(MLPipelineQueue* thisQueue);
void clearAllQueues();
virtual bool enqueue(MLPipelineQueue* thisQueue, MLPipelinePackageFoundation* package);
virtual MLPipelinePackageFoundation* dequeue(MLPipelineQueue* thisQueue);
bool addWorker(const MLPipelineStage& stage);
void waitForStart() { QMutexLocker lock(&mutex); }
void stageStart(QThread::Priority threadPriority, MLPipelineStage thisStage, MLPipelineStage nextStage, MLPipelineQueue*& thisQueue, MLPipelineQueue*& nextQueue);
void stageEnd(MLPipelineStage thisStage, MLPipelineStage nextStage);
void notify(MLPipelineNotification notification, const QString _name, const QString _path, int _processed, const QImage& _thumbnail);<--- Function parameter '_name' should be passed by const reference.<--- Function parameter '_path' should be passed by const reference.
void notify(MLPipelineNotification notification, const QString _name, const QString _path, int _processed, const DImg& _thumbnail);<--- Function parameter '_name' should be passed by const reference.<--- Function parameter '_path' should be passed by const reference.
void notify(MLPipelineNotification notification, const QString _name, const QString _path, int _processed, const QIcon& _thumbnail);<--- Function parameter '_name' should be passed by const reference.<--- Function parameter '_path' should be passed by const reference.
void showPipelinePerformance() const;
private:
MLPipelineFoundation(MLPipelineFoundation&) = delete;
};
}
|