DarkPlaces
Game engine based on the Quake 1 engine by id Software, developed by LadyHavoc
taskqueue.c File Reference
#include "quakedef.h"
#include "taskqueue.h"
Include dependency graph for taskqueue.c:

Go to the source code of this file.

Data Structures

struct  taskqueue_state_t
struct  taskqueue_state_thread_t

Macros

#define MAXTHREADS   1024
#define RECENTFRAMES   64
#define THREADBATCH   64
#define THREADSLEEPCOUNT   1000
#define THREADTASKS   256

Functions

static void TaskQueue_DistributeTasks (void)
void TaskQueue_Enqueue (int numtasks, taskqueue_task_t *tasks)
static void TaskQueue_ExecuteTask (taskqueue_task_t *t)
void TaskQueue_Frame (qbool shutdown)
void TaskQueue_Init (void)
qbool TaskQueue_IsDone (taskqueue_task_t *t)
void TaskQueue_Setup (taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
void TaskQueue_Shutdown (void)
void TaskQueue_Task_CheckTasksDone (taskqueue_task_t *t)
static int TaskQueue_ThreadFunc (void *d)
void TaskQueue_WaitForTaskDone (taskqueue_task_t *t)
void TaskQueue_Yield (taskqueue_task_t *t)

Variables

cvar_t taskqueue_maxthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"}
cvar_t taskqueue_minthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"}
static taskqueue_state_t taskqueue_state
cvar_t taskqueue_tasksperthread = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"}

Macro Definition Documentation

◆ MAXTHREADS

#define MAXTHREADS   1024

Definition at line 8 of file taskqueue.c.

Referenced by TaskQueue_Frame().

◆ RECENTFRAMES

#define RECENTFRAMES   64

Definition at line 9 of file taskqueue.c.

Referenced by TaskQueue_Frame().

◆ THREADBATCH

#define THREADBATCH   64

Definition at line 11 of file taskqueue.c.

◆ THREADSLEEPCOUNT

#define THREADSLEEPCOUNT   1000

Definition at line 12 of file taskqueue.c.

Referenced by TaskQueue_ThreadFunc().

◆ THREADTASKS

#define THREADTASKS   256

Definition at line 10 of file taskqueue.c.

Referenced by TaskQueue_DistributeTasks(), and TaskQueue_ThreadFunc().

Function Documentation

◆ TaskQueue_DistributeTasks()

void TaskQueue_DistributeTasks ( void )
static

Definition at line 144 of file taskqueue.c.

145{
146 Thread_AtomicLock(&taskqueue_state.command_lock);
147 if (taskqueue_state.numthreads > 0)
148 {
149 unsigned int attempts = taskqueue_state.numthreads;
150 while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)
151 {
152 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
153 if (t->preceding && t->preceding->done == 0)
154 {
155 // task is waiting on something
156 // first dequeue it properly
157 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
158 taskqueue_state.queue_dequeueposition++;
159 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
160 taskqueue_state.queue_dequeueposition = 0;
161 // now put it back in the distributor queue - we know there is room because we just made room
162 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;
163 taskqueue_state.queue_enqueueposition++;
164 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
165 taskqueue_state.queue_enqueueposition = 0;
166 // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks
167 }
168 else
169 {
170 taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];
171 if (s->enqueueposition - s->dequeueposition < THREADTASKS)
172 {
173 // add the task to the thread's queue
174 s->queue[(s->enqueueposition++) % THREADTASKS] = t;
175 // since we succeeded in assigning the task, advance the distributor queue
176 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;
177 taskqueue_state.queue_dequeueposition++;
178 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
179 taskqueue_state.queue_dequeueposition = 0;
180 // refresh our attempt counter because we did manage to assign something to a thread
181 attempts = taskqueue_state.numthreads;
182 }
183 }
184 }
185 }
186 Thread_AtomicUnlock(&taskqueue_state.command_lock);
187 // execute one pending task on the distributor queue, this matters if numthreads is 0
188 if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)
189 {
190 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];
191 taskqueue_state.queue_dequeueposition++;
192 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)
193 taskqueue_state.queue_dequeueposition = 0;
194 if (t)
195 TaskQueue_ExecuteTask(t);
196 }
197}
#define NULL
Definition qtypes.h:12
unsigned int enqueueposition
Definition taskqueue.c:21
unsigned int dequeueposition
Definition taskqueue.c:22
taskqueue_task_t * queue[THREADTASKS]
Definition taskqueue.c:23
struct taskqueue_task_s * preceding
Definition taskqueue.h:11
#define THREADTASKS
Definition taskqueue.c:10
static taskqueue_state_t taskqueue_state
Definition taskqueue.c:51
static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
Definition taskqueue.c:66
#define Thread_AtomicUnlock(lock)
Definition thread.h:36
#define Thread_AtomicLock(lock)
Definition thread.h:35

References taskqueue_state_thread_t::dequeueposition, taskqueue_state_thread_t::enqueueposition, NULL, taskqueue_task_t::preceding, taskqueue_state_thread_t::queue, TaskQueue_ExecuteTask(), taskqueue_state, Thread_AtomicLock, Thread_AtomicUnlock, and THREADTASKS.

Referenced by TaskQueue_Frame(), and TaskQueue_WaitForTaskDone().

◆ TaskQueue_Enqueue()

void TaskQueue_Enqueue ( int numtasks,
taskqueue_task_t * tasks )

Definition at line 105 of file taskqueue.c.

106{
107 int i;
108 Thread_AtomicLock(&taskqueue_state.command_lock);
109 if (taskqueue_state.queue_size <
110 (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +
111 taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)
112 {
113 // we have to grow the queue...
114 unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;
115 if (newsize < 1024)
116 newsize = 1024;
117 taskqueue_state.queue_data = (taskqueue_task_t **)Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);
118 taskqueue_state.queue_size = newsize;
119 }
120 for (i = 0; i < numtasks; i++)
121 {
122 if (tasks[i].yieldcount == 0)
123 taskqueue_state.tasks_thisframe++;
124 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];
125 taskqueue_state.queue_enqueueposition++;
126 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)
127 taskqueue_state.queue_enqueueposition = 0;
128 }
129 Thread_AtomicUnlock(&taskqueue_state.command_lock);
130}
mempool_t * zonemempool
Definition zone.c:796
#define Mem_Realloc(pool, data, size)
Definition zone.h:94

References Mem_Realloc, taskqueue_state, Thread_AtomicLock, Thread_AtomicUnlock, and zonemempool.

Referenced by R_Shadow_BounceGrid_EnqueuePhotons_Task(), R_Shadow_BounceGrid_EnqueueSlices_Task(), R_Shadow_UpdateBounceGridTexture(), and TaskQueue_Yield().

◆ TaskQueue_ExecuteTask()

void TaskQueue_ExecuteTask ( taskqueue_task_t * t)
static

Definition at line 66 of file taskqueue.c.

67{
68 // see if t is waiting on something
69 if (t->preceding && t->preceding->done == 0)
70 TaskQueue_Yield(t);
71 else
72 t->func(t);
73}
void(* func)(struct taskqueue_task_s *task)
Definition taskqueue.h:17
void TaskQueue_Yield(taskqueue_task_t *t)
Definition taskqueue.c:133

References taskqueue_task_t::func, taskqueue_task_t::preceding, and TaskQueue_Yield().

Referenced by TaskQueue_DistributeTasks(), and TaskQueue_ThreadFunc().

◆ TaskQueue_Frame()

void TaskQueue_Frame ( qbool shutdown)

Definition at line 213 of file taskqueue.c.

214{
215 int i;
216 unsigned long long int avg;
217 int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);
218 int numthreads = maxthreads;
219 int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);
220#ifdef THREADDISABLE
221 numthreads = 0;
222#endif
223
224 Thread_AtomicLock(&taskqueue_state.command_lock);
225 taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;
226 taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;
227 taskqueue_state.tasks_thisframe = 0;
228 avg = 0;
229 for (i = 0; i < RECENTFRAMES; i++)
230 avg += taskqueue_state.tasks_recentframes[i];
231 taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;
232 Thread_AtomicUnlock(&taskqueue_state.command_lock);
233
234 numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;
235 numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);
236
237 if (shutdown)
238 numthreads = 0;
239
240 // check if we need to close some threads
241 if (taskqueue_state.numthreads > numthreads)
242 {
243 // tell extra threads to quit
244 Thread_AtomicLock(&taskqueue_state.command_lock);
245 for (i = numthreads; i < taskqueue_state.numthreads; i++)
246 taskqueue_state.threads[i].quit = 1;
247 Thread_AtomicUnlock(&taskqueue_state.command_lock);
248 for (i = numthreads; i < taskqueue_state.numthreads; i++)
249 {
250 if (taskqueue_state.threads[i].handle)
251 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
252 taskqueue_state.threads[i].handle = NULL;
253 }
254 // okay we're at the new state now
255 taskqueue_state.numthreads = numthreads;
256 }
257
258 // check if we need to start more threads
259 if (taskqueue_state.numthreads < numthreads)
260 {
261 // make sure we're not telling new threads to just quit on startup
262 Thread_AtomicLock(&taskqueue_state.command_lock);
263 for (i = taskqueue_state.numthreads; i < numthreads; i++)
264 taskqueue_state.threads[i].quit = 0;
265 Thread_AtomicUnlock(&taskqueue_state.command_lock);
266
267 // start new threads
268 for (i = taskqueue_state.numthreads; i < numthreads; i++)
269 {
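270 taskqueue_state.threads[i].thread_index = i;
271 taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);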
272 }
273
274 // okay we're at the new state now
275 taskqueue_state.numthreads = numthreads;
276 }
277
278 // just for good measure, distribute any pending tasks that span across frames
279 TaskQueue_DistributeTasks();
280}
#define bound(min, num, max)
Definition mathlib.h:34
cvar_t taskqueue_minthreads
Definition taskqueue.c:4
static int TaskQueue_ThreadFunc(void *d)
Definition taskqueue.c:77
cvar_t taskqueue_maxthreads
Definition taskqueue.c:5
#define RECENTFRAMES
Definition taskqueue.c:9
#define MAXTHREADS
Definition taskqueue.c:8
static void TaskQueue_DistributeTasks(void)
Definition taskqueue.c:144
cvar_t taskqueue_tasksperthread
Definition taskqueue.c:6
#define Thread_WaitThread(thread, retval)
Definition thread.h:25
#define Thread_CreateThread(fn, data)
Definition thread.h:24

References bound, MAXTHREADS, NULL, RECENTFRAMES, TaskQueue_DistributeTasks(), taskqueue_maxthreads, taskqueue_minthreads, taskqueue_state, taskqueue_tasksperthread, TaskQueue_ThreadFunc(), Thread_AtomicLock, Thread_AtomicUnlock, Thread_CreateThread, and Thread_WaitThread.

Referenced by Host_Frame(), and TaskQueue_Shutdown().

◆ TaskQueue_Init()

void TaskQueue_Init ( void )

Definition at line 53 of file taskqueue.c.

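54{
55 Cvar_RegisterVariable(&taskqueue_minthreads);
56 Cvar_RegisterVariable(&taskqueue_maxthreads);
57 Cvar_RegisterVariable(&taskqueue_tasksperthread);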
58}
void Cvar_RegisterVariable(cvar_t *variable)
registers a cvar that already has the name, string, and optionally the archive elements set.
Definition cvar.c:599

References Cvar_RegisterVariable(), taskqueue_maxthreads, taskqueue_minthreads, and taskqueue_tasksperthread.

Referenced by Host_Init().

◆ TaskQueue_IsDone()

qbool TaskQueue_IsDone ( taskqueue_task_t * t)

Definition at line 139 of file taskqueue.c.

140{
141 return !!t->done;
142}
volatile int done
Definition taskqueue.h:14

References taskqueue_task_t::done.

◆ TaskQueue_Setup()

void TaskQueue_Setup ( taskqueue_task_t * t,
taskqueue_task_t * preceding,
void(* func )(taskqueue_task_t *),
size_t i0,
size_t i1,
void * p0,
void * p1 )

Definition at line 282 of file taskqueue.c.

283{
284 memset(t, 0, sizeof(*t));
285 t->preceding = preceding;
286 t->func = func;
287 t->i[0] = i0;
288 t->i[1] = i1;
289 t->p[0] = p0;
290 t->p[1] = p1;
291}
void * p[2]
Definition taskqueue.h:19
size_t i[2]
Definition taskqueue.h:20

References taskqueue_task_t::func, taskqueue_task_t::i, taskqueue_task_t::p, and taskqueue_task_t::preceding.

Referenced by R_Shadow_BounceGrid_EnqueuePhotons_Task(), R_Shadow_BounceGrid_EnqueueSlices_Task(), and R_Shadow_UpdateBounceGridTexture().

◆ TaskQueue_Shutdown()

void TaskQueue_Shutdown ( void )

Definition at line 60 of file taskqueue.c.

61{
62 if (taskqueue_state.numthreads)
63 TaskQueue_Frame(true);
64}
void TaskQueue_Frame(qbool shutdown)
Definition taskqueue.c:213

References TaskQueue_Frame(), and taskqueue_state.

Referenced by Host_Shutdown().

◆ TaskQueue_Task_CheckTasksDone()

void TaskQueue_Task_CheckTasksDone ( taskqueue_task_t * t)

Definition at line 293 of file taskqueue.c.

294{
295 size_t numtasks = t->i[0];
296 taskqueue_task_t *tasks = (taskqueue_task_t *)t->p[0];
297 while (numtasks > 0)
298 {
299 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
300 if (!tasks[numtasks - 1].done)
301 {
302 // update our partial progress, then yield to another pending task.
303 t->i[0] = numtasks;
304 // set our preceding task to one of the ones we are watching for
305 t->preceding = &tasks[numtasks - 1];
306 TaskQueue_Yield(t);
307 return;
308 }
309 numtasks--;
310 }
311 t->done = 1;
312}

References taskqueue_task_t::done, taskqueue_task_t::i, taskqueue_task_t::p, taskqueue_task_t::preceding, and TaskQueue_Yield().

Referenced by R_Shadow_BounceGrid_EnqueuePhotons_Task(), and R_Shadow_BounceGrid_EnqueueSlices_Task().

◆ TaskQueue_ThreadFunc()

int TaskQueue_ThreadFunc ( void * d)
static

Definition at line 77 of file taskqueue.c.

78{
79 taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;
80 unsigned int sleepcounter = 0;
81 for (;;)
82 {
83 qbool quit;
84 while (s->dequeueposition != s->enqueueposition)
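85 {
86 taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];
87 TaskQueue_ExecuteTask(t);
88 // when we advance, also clear the pointer for good measure
89 s->queue[s->dequeueposition++ % THREADTASKS] = NULL;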
90 sleepcounter = 0;
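91 }
92 Thread_AtomicLock(&taskqueue_state.command_lock);
93 quit = s->quit != 0;
94 Thread_AtomicUnlock(&taskqueue_state.command_lock);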
95 if (quit)
96 break;
97 sleepcounter++;
98 if (sleepcounter >= THREADSLEEPCOUNT)
99 Sys_Sleep(0.001);
100 sleepcounter = 0;
101 }
102 return 0;
103}
bool qbool
Definition qtypes.h:9
double Sys_Sleep(double time)
called to yield for a little bit so as not to hog cpu when paused or debugging
Definition sys_shared.c:500
#define THREADSLEEPCOUNT
Definition taskqueue.c:12

References taskqueue_state_thread_t::dequeueposition, taskqueue_state_thread_t::enqueueposition, NULL, taskqueue_state_thread_t::queue, taskqueue_state_thread_t::quit, Sys_Sleep(), TaskQueue_ExecuteTask(), taskqueue_state, Thread_AtomicLock, Thread_AtomicUnlock, THREADSLEEPCOUNT, and THREADTASKS.

Referenced by TaskQueue_Frame().

◆ TaskQueue_WaitForTaskDone()

void TaskQueue_WaitForTaskDone ( taskqueue_task_t * t)

Definition at line 199 of file taskqueue.c.

200{
201 qbool done = false;
202 for (;;)
203 {
204 Thread_AtomicLock(&taskqueue_state.command_lock);
205 done = t->done != 0;
206 Thread_AtomicUnlock(&taskqueue_state.command_lock);
207 if (done)
208 break;
209 TaskQueue_DistributeTasks();
210 }
211}

References taskqueue_task_t::done, TaskQueue_DistributeTasks(), taskqueue_state, Thread_AtomicLock, and Thread_AtomicUnlock.

Referenced by R_Shadow_UpdateBounceGridTexture().

◆ TaskQueue_Yield()

void TaskQueue_Yield ( taskqueue_task_t * t)

Definition at line 133 of file taskqueue.c.

134{
135 t->yieldcount++;
136 TaskQueue_Enqueue(1, t);
137}
unsigned int yieldcount
Definition taskqueue.h:22
void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
Definition taskqueue.c:105

References TaskQueue_Enqueue(), and taskqueue_task_t::yieldcount.

Referenced by R_Shadow_BounceGrid_EnqueueSlices_Task(), TaskQueue_ExecuteTask(), and TaskQueue_Task_CheckTasksDone().

Variable Documentation

◆ taskqueue_maxthreads

cvar_t taskqueue_maxthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"}

Definition at line 5 of file taskqueue.c.

5{CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};
#define CF_SERVER
cvar/command that only the server can change/execute
Definition cmd.h:49
#define CF_CLIENT
cvar/command that only the client can change/execute
Definition cmd.h:48
#define CF_ARCHIVE
cvar should have its set value saved to config.cfg and persist across sessions
Definition cmd.h:53

Referenced by TaskQueue_Frame(), and TaskQueue_Init().

◆ taskqueue_minthreads

cvar_t taskqueue_minthreads = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"}

Definition at line 4 of file taskqueue.c.

4{CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_minthreads", "0", "minimum number of threads to keep active for executing tasks"};

Referenced by TaskQueue_Frame(), and TaskQueue_Init().

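◆ taskqueue_state

static taskqueue_state_t taskqueue_state

Definition at line 51 of file taskqueue.c.

Referenced by TaskQueue_DistributeTasks(), TaskQueue_Enqueue(), TaskQueue_Frame(), TaskQueue_Shutdown(), TaskQueue_ThreadFunc(), and TaskQueue_WaitForTaskDone().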

◆ taskqueue_tasksperthread

cvar_t taskqueue_tasksperthread = {CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"}

Definition at line 6 of file taskqueue.c.

6{CF_CLIENT | CF_SERVER | CF_ARCHIVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};

Referenced by TaskQueue_Frame(), and TaskQueue_Init().