DarkPlaces
Game engine based on the Quake 1 engine by id Software, developed by LadyHavoc
 
taskqueue.c
Go to the documentation of this file.
1#include "quakedef.h"
2#include "taskqueue.h"
3
4cvar_t taskqueue_minthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"};
5cvar_t taskqueue_maxthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
6cvar_t taskqueue_tasksperthread = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};
7
8#define MAXTHREADS 1024
9#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need
10#define THREADTASKS 256 // thread can hold this many tasks in its own queue
11#define THREADBATCH 64 // thread will run this many tasks before checking status again
12#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do
13
14typedef struct taskqueue_state_thread_s
15{
16 void *handle;
17 unsigned int quit;
18 unsigned int thread_index;
19 unsigned int tasks_completed;
20
21 unsigned int enqueueposition;
22 unsigned int dequeueposition;
24}
26
27typedef struct taskqueue_state_s
28{
29 // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
30 unsigned int enqueuethread;
33
34 // synchronization point for enqueue and some other memory access
36
37 // distributor queue (not assigned to threads yet, or waiting on other tasks)
40 unsigned int queue_size;
42
43 // metrics to balance workload vs cpu resources
45 unsigned int tasks_recentframes[RECENTFRAMES];
46 unsigned int tasks_thisframe;
48}
50
52
59
61{
63 TaskQueue_Frame(true);
64}
65
67{
68 // see if t is waiting on something
69 if (t->preceding && t->preceding->done == 0)
71 else
72 t->func(t);
73}
74
75// FIXME: don't use mutex
76// FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
77static int TaskQueue_ThreadFunc(void *d)
78{
80 unsigned int sleepcounter = 0;
81 for (;;)
82 {
83 qbool quit;
84 while (s->dequeueposition != s->enqueueposition)
85 {
88 // when we advance, also clear the pointer for good measure
90 sleepcounter = 0;
91 }
93 quit = s->quit != 0;
95 if (quit)
96 break;
97 sleepcounter++;
98 if (sleepcounter >= THREADSLEEPCOUNT)
99 Sys_Sleep(0.001);
100 sleepcounter = 0;
101 }
102 return 0;
103}
104
131
132// if the task can not be completed due yet to preconditions, just enqueue it again...
134{
135 t->yieldcount++;
136 TaskQueue_Enqueue(1, t);
137}
138
140{
141 return !!t->done;
142}
143
145{
148 {
149 unsigned int attempts = taskqueue_state.numthreads;
151 {
153 if (t->preceding && t->preceding->done == 0)
154 {
155 // task is waiting on something
156 // first dequeue it properly
161 // now put it back in the distributor queue - we know there is room because we just made room
166 // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
167 }
168 else
169 {
172 {
173 // add the task to the thread's queue
174 s->queue[(s->enqueueposition++) % THREADTASKS] = t;
175 // since we succeeded in assigning the task, advance the distributor queue
180 // refresh our attempt counter because we did manage to assign something to a thread
181 attempts = taskqueue_state.numthreads;
182 }
183 }
184 }
185 }
187 // execute one pending task on the distributor queue, this matters if numthreads is 0
189 {
194 if (t)
196 }
197}
198
200{
201 qbool done = false;
202 for (;;)
203 {
205 done = t->done != 0;
207 if (done)
208 break;
210 }
211}
212
213void TaskQueue_Frame(qbool shutdown)
214{
215 int i;
216 unsigned long long int avg;
217 int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
218 int numthreads = maxthreads;
219 int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
220#ifdef THREADDISABLE
221 numthreads = 0;
222#endif
223
228 avg = 0;
229 for (i = 0; i < RECENTFRAMES; i++)
233
234 numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
236
237 if (shutdown)
238 numthreads = 0;
239
240 // check if we need to close some threads
241 if (taskqueue_state.numthreads > numthreads)
242 {
243 // tell extra threads to quit
245 for (i = numthreads; i < taskqueue_state.numthreads; i++)
248 for (i = numthreads; i < taskqueue_state.numthreads; i++)
249 {
253 }
254 // okay we're at the new state now
255 taskqueue_state.numthreads = numthreads;
256 }
257
258 // check if we need to start more threads
259 if (taskqueue_state.numthreads < numthreads)
260 {
261 // make sure we're not telling new threads to just quit on startup
263 for (i = taskqueue_state.numthreads; i < numthreads; i++)
266
267 // start new threads
268 for (i = taskqueue_state.numthreads; i < numthreads; i++)
269 {
272 }
273
274 // okay we're at the new state now
275 taskqueue_state.numthreads = numthreads;
276 }
277
278 // just for good measure, distribute any pending tasks that span across frames
280}
281
282void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
283{
284 memset(t, 0, sizeof(*t));
285 t->preceding = preceding;
286 t->func = func;
287 t->i[0] = i0;
288 t->i[1] = i1;
289 t->p[0] = p0;
290 t->p[1] = p1;
291}
292
294{
295 size_t numtasks = t->i[0];
296 taskqueue_task_t *tasks = (taskqueue_task_t *)t->p[0];
297 while (numtasks > 0)
298 {
299 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
300 if (!tasks[numtasks - 1].done)
301 {
302 // update our partial progress, then yield to another pending task.
303 t->i[0] = numtasks;
304 // set our preceding task to one of the ones we are watching for
305 t->preceding = &tasks[numtasks - 1];
307 return;
308 }
309 numtasks--;
310 }
311 t->done = 1;
312}
#define CF_SERVER
cvar/command that only the server can change/execute
Definition cmd.h:49
#define CF_CLIENT
cvar/command that only the client can change/execute
Definition cmd.h:48
#define CF_ARCHIVE
cvar should have its set value saved to config.cfg and persist across sessions
Definition cmd.h:53
void Cvar_RegisterVariable(cvar_t *variable)
registers a cvar that already has the name, string, and optionally the archive elements set.
Definition cvar.c:599
#define bound(min, num, max)
Definition mathlib.h:34
int i
#define NULL
Definition qtypes.h:12
bool qbool
Definition qtypes.h:9
Definition cvar.h:66
int integer
Definition cvar.h:73
unsigned int tasks_thisframe
Definition taskqueue.c:46
unsigned int queue_enqueueposition
Definition taskqueue.c:38
unsigned int tasks_averageperframe
Definition taskqueue.c:47
taskqueue_state_thread_t threads[MAXTHREADS]
Definition taskqueue.c:32
taskqueue_task_t ** queue_data
Definition taskqueue.c:41
Thread_SpinLock command_lock
Definition taskqueue.c:35
unsigned int queue_size
Definition taskqueue.c:40
unsigned int tasks_recentframes[RECENTFRAMES]
Definition taskqueue.c:45
unsigned int tasks_recentframesindex
Definition taskqueue.c:44
unsigned int enqueuethread
Definition taskqueue.c:30
unsigned int queue_dequeueposition
Definition taskqueue.c:39
unsigned int enqueueposition
Definition taskqueue.c:21
unsigned int dequeueposition
Definition taskqueue.c:22
unsigned int tasks_completed
Definition taskqueue.c:19
unsigned int thread_index
Definition taskqueue.c:18
taskqueue_task_t * queue[THREADTASKS]
Definition taskqueue.c:23
void * p[2]
Definition taskqueue.h:19
size_t i[2]
Definition taskqueue.h:20
struct taskqueue_task_s * preceding
Definition taskqueue.h:11
volatile int done
Definition taskqueue.h:14
unsigned int yieldcount
Definition taskqueue.h:22
void(* func)(struct taskqueue_task_s *task)
Definition taskqueue.h:17
double Sys_Sleep(double time)
called to yield for a little bit so as not to hog cpu when paused or debugging
Definition sys_shared.c:500
cvar_t taskqueue_minthreads
Definition taskqueue.c:4
void TaskQueue_Frame(qbool shutdown)
Definition taskqueue.c:213
void TaskQueue_Yield(taskqueue_task_t *t)
Definition taskqueue.c:133
#define THREADTASKS
Definition taskqueue.c:10
static int TaskQueue_ThreadFunc(void *d)
Definition taskqueue.c:77
cvar_t taskqueue_maxthreads
Definition taskqueue.c:5
void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
Definition taskqueue.c:199
qbool TaskQueue_IsDone(taskqueue_task_t *t)
Definition taskqueue.c:139
#define RECENTFRAMES
Definition taskqueue.c:9
void TaskQueue_Shutdown(void)
Definition taskqueue.c:60
void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
Definition taskqueue.c:282
static taskqueue_state_t taskqueue_state
Definition taskqueue.c:51
#define MAXTHREADS
Definition taskqueue.c:8
#define THREADSLEEPCOUNT
Definition taskqueue.c:12
void TaskQueue_Init(void)
Definition taskqueue.c:53
void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
Definition taskqueue.c:105
static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
Definition taskqueue.c:66
static void TaskQueue_DistributeTasks(void)
Definition taskqueue.c:144
void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
Definition taskqueue.c:293
cvar_t taskqueue_tasksperthread
Definition taskqueue.c:6
#define Thread_AtomicUnlock(lock)
Definition thread.h:36
int Thread_SpinLock
Definition thread.h:12
#define Thread_WaitThread(thread, retval)
Definition thread.h:25
#define Thread_AtomicLock(lock)
Definition thread.h:35
#define Thread_CreateThread(fn, data)
Definition thread.h:24
mempool_t * zonemempool
Definition zone.c:796
#define Mem_Realloc(pool, data, size)
Definition zone.h:94