This is the mail archive of the pthreads-win32@sources.redhat.com mailing list for the pthreads-win32 project.
| Index Nav: | [Date Index] [Subject Index] [Author Index] [Thread Index] | |
|---|---|---|
| Message Nav: | [Date Prev] [Date Next] | [Thread Prev] [Thread Next] |
| Other format: | [Raw text] | |
/*
Command line options are positional and must be given in this order
(trailing options may be omitted, in which case their defaults are used):
  1. Number of Sender threads
  2. Number of Receiver threads
  3. CV wait time (microsecs)
  4. Trace level
  5. Sender sleeps (bool)
  6. Receiver sleeps (bool)
  7. Monitor rate (per second)
E.g.
  condvar10.exe 8 8 1000000 -10 1 0 10
Log output is written to 'condvar10.log'.
*/
#include <stdio.h>
#include <sys/timeb.h>
#include <stdlib.h>
#include <signal.h>
#include "condvar10.h"
/* Name of the log file; it is opened and closed again around every log
   write by the OPENLOG/CLOSELOG macros. */
char * logFile = "condvar10.log";
/* Time unit conversion factors. */
const DWORD MILLISEC_PER_SEC = 1000L;
/* NOTE(review): despite its name this is used as "nanoseconds per
   microsecond": RecvData multiplies a microsecond count by it to obtain
   a tv_nsec value. */
const DWORD MICROSEC_PER_NANOSEC = 1000L;
const DWORD NANOSEC_PER_MILLISEC = 1000000L;
const DWORD MICROSEC_PER_SEC = 1000000L;
const DWORD NANOSEC_PER_SEC = 1000000000L;
/* Receiver and Sender select the first dimension of tid[] and thrState[];
   MaxThreads is the size of the second (per-thread) dimension. */
enum {
Receiver = 0,
Sender,
MaxThreads = 100
};
/* Thread handles filled in by pthread_create() in main(),
   indexed as tid[Receiver|Sender][thread number]. */
pthread_t tid[2][MaxThreads];
/* Per-thread monitoring state, shared between a worker thread and the
   monitor loop in main(). */
typedef struct thrState_t_ {
/* Bit mask of enum Operations values recording how far the thread got in
   its current send/receive pass (written via SetOp/OrOp). */
int op;
/* Set to 1 by the worker at the top of each pass (SetWatchdog) and reset
   to 0 by the monitor; still 0 at the next check means no pass started. */
int watchdog;
/* Incremented by SendData after pthread_cond_signal(); the monitor logs a
   message when it is 0 and then resets it to 0. */
int signalled;
/* Mutex taken by SetOp/OrOp/SetWatchdog and by the monitor when it
   inspects this structure. */
pthread_mutex_t opLock;
} thrState_t;
/* Indexed as thrState[Receiver|Sender][thread number]. */
thrState_t thrState[2][MaxThreads];
/* Forward declarations: thread entry points and the per-pass work
   routines they call.  SendData/RecvData are declared with their real
   parameter list (an empty "()" is not a prototype in C and declares a
   different, undefined overload when the file is built as C++). */
void* recvReq(void *arg);
void* sendReq(void *arg);
void SendData(int threadNum);
void RecvData(int threadNum);
/* The "message" flag: SendData sets it to 1 and RecvData clears it to 0,
   both while holding `lock`. */
int msg=0;
/* Trace level (option 4).  > 0: PR() writes trace lines and the monitor
   logs the per-thread counts every monitor interval.  -5000..-1: the
   monitor logs the counts once `counter` has advanced by -trace, or after
   more than one second without a log line. */
int trace=1;
/* Options 5 and 6: non-zero makes sender threads Sleep() between passes /
   receiver threads call Sleep(0) between passes. */
int sendSleep=0;
int recvSleep=0;
/* Option 7: monitor loop iterations per second, and the resulting
   Sleep() interval in milliseconds computed in main(). */
int monitorRate=10;
DWORD monitorInterval;
/* Messages sent (incremented by SendData) and the value of `counter` at
   the last time logPoint() returned TRUE. */
DWORD counter=0;
DWORD lastCount=0;
/* Messages consumed and timed-out waits, both incremented by RecvData. */
DWORD received=0;
DWORD TOs=0;
/* Mutex held by SendData/RecvData while they read or update msg and the
   counters; `sig` is the condition variable receivers wait on. */
pthread_mutex_t lock;
pthread_cond_t sig;
/* Options 1 and 2: number of sender and receiver threads to start. */
int noSthr = 1;
int noRthr = 1;
/* Option 3: condition variable wait time in microseconds. */
DWORD timeint = 5*MICROSEC_PER_SEC; // 5 sec
/* Per-thread counts since the monitor last logged them; the monitor
   resets them to 0 after writing each log line. */
int ThreadRecvCount[MaxThreads];
int ThreadTOCount[MaxThreads];
int ThreadSentCount[MaxThreads];
/* Bits stored in thrState_t.op.  SendData/RecvData set the "S" bit just
   before the corresponding pthread call and the "E" bit just after it
   returns, so the logged mask shows which call a thread was in.
   MsgFalse/MsgTrue record whether `msg` was seen as 0 or non-zero.
   NOTE(review): 0x80000000 does not fit in a signed int; whether that is
   accepted for an enumerator depends on the compiler - confirm. */
enum Operations {
SLock = 0x00000001,
ELock = 0x00000002,
SUnlock = 0x00000010,
EUnlock = 0x00000020,
SWait = 0x00000100,
EWait = 0x00000200,
WaitTimeout = 0x00000400,
SSignal = 0x00001000,
ESignal = 0x00002000,
MsgFalse = 0x40000000,
MsgTrue = 0x80000000,
};
/* One receive / send pass for the given thread number. */
void RecvData(int threadNum);
void SendData(int threadNum);
/* Thread entry points; `arg` carries the thread number cast to void *. */
void * recvReq(void *arg);
void * sendReq(void *arg);
/* Serialises access to the log file: locked by OPENLOG and unlocked by
   CLOSELOG.  Initialised in main() as an error-checking mutex. */
pthread_mutex_t LOGX;
/*
 * OPENLOG(mode) ... CLOSELOG(exitFlag)
 *
 * The two macros must always be used as a pair in the same scope: OPENLOG
 * opens two brace levels that only CLOSELOG closes.
 *
 * OPENLOG declares a local FILE * named LOGFP, locks LOGX and opens
 * logFile with the given fopen() mode.  If the open fails a message is
 * printed on stdout and everything up to the end of CLOSELOG's else-branch
 * (the caller's statements, the fclose and the optional exit) is skipped.
 * Otherwise the caller's statements run with LOGFP valid.
 */
#define OPENLOG(_openMode) \
{ \
FILE * LOGFP; \
(void)pthread_mutex_lock(&LOGX); \
if ((LOGFP=fopen(logFile, _openMode)) == NULL) \
{ \
fprintf(stdout, "Line %d: Log open error\n", __LINE__); \
fflush(stdout); \
} \
else \
{
/*
 * CLOSELOG closes LOGFP, calls exit(1) if _exitAfterClose is non-zero
 * (in which case LOGX is never unlocked), then ends the else-branch,
 * unlocks LOGX and closes the block opened by OPENLOG.
 */
#define CLOSELOG(_exitAfterClose) \
fclose(LOGFP); \
if(_exitAfterClose) exit(1); \
} \
(void)pthread_mutex_unlock(&LOGX); \
}
/*
 * LOGERR - check the result of the preceding pthread call.
 *
 * Requires an int variable named `status` in the enclosing scope.  When
 * status is non-zero, the line number of the LOGERR use and the status
 * value are appended to the log and the program exits with code 1 (via
 * CLOSELOG(1)).  If the log file cannot be opened only OPENLOG's stdout
 * message is produced and execution continues.
 */
#define LOGERR \
{ \
if(status!=0) \
{ \
OPENLOG("a"); \
fprintf(LOGFP,"Error at line %d, status %d\n",__LINE__, status); \
CLOSELOG(1); \
} \
}
/*
 * PR - append one trace line, tagged with the calling thread's id, to the
 * log file.  Does nothing unless the global trace level is positive.
 *
 *   s  text to log (a newline is added).
 */
void PR (char *s)
{
  long threadId;

  if (trace <= 0)
    return;

  threadId = GetCurrentThreadId ();
  OPENLOG("a");
  fprintf(LOGFP,"TH-%lx:%s\n",threadId,s);
  CLOSELOG(0);
}
void SetOp(int SR, int threadNum, int op)
{
int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock);
LOGERR;
thrState[SR][threadNum].op = op;
status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock);
LOGERR;
}
void OrOp(int SR, int threadNum, int op)
{
int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock);
LOGERR;
thrState[SR][threadNum].op |= op;
status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock);
LOGERR;
}
void SetWatchdog(int SR, int threadNum, int woof)
{
int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock);
LOGERR;
thrState[SR][threadNum].watchdog = woof;
status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock);
LOGERR;
}
/*
 * logPoint - report whether `counter` has advanced by at least `increment`
 * since the last time this function returned TRUE.
 *
 * On TRUE the current counter value is remembered in lastCount as the new
 * reference point; on FALSE nothing is changed.
 */
BOOL logPoint(long increment)
{
  if (counter < lastCount + increment)
    {
      return FALSE;
    }

  lastCount = counter;
  return TRUE;
}
/*
 * PrintOptions - write the current option values, one per line and in
 * command-line order, followed by a blank line.
 *
 *   fp  stream to write to (main passes stdout and the log file).
 *
 * The conversion specifiers match the argument types: %d for the int
 * options and %lu for the DWORD wait time (passing them to %ld, as the
 * code previously did, is a format/argument mismatch).
 */
void PrintOptions (FILE * fp)
{
  fprintf(fp, "Options are (in this order):\n");
  fprintf(fp, " %-30s: %8d\n", "Number of Sender threads", noSthr);
  fprintf(fp, " %-30s: %8d\n", "Number of Receiver threads", noRthr);
  fprintf(fp, " %-30s: %8lu\n", "CV wait time (microsecs)", (unsigned long)timeint);
  fprintf(fp, " %-30s: %8d\n", "Trace level", trace);
  fprintf(fp, " %-30s: %8d\n", "Sender sleeps (bool)", sendSleep);
  fprintf(fp, " %-30s: %8d\n", "Receiver sleeps (bool)", recvSleep);
  fprintf(fp, " %-30s: %8d\n", "Monitor rate (per second)", monitorRate);
  putc('\n',fp);
}
int
main(int argc , char * argv[])
{
pthread_mutexattr_t la;
int status;
int ii;
DWORD milliseconds = 0;
DWORD lastSendWatch = 0;
DWORD lastRecvWatch = 0;
int r = 0;
DWORD seconds = 0;
DWORD lastLogSeconds = 0;
char * rotor = "/-\\|";
if(argc>1)
{
noSthr=atoi(argv[1]);
if (noSthr >= MaxThreads)
{
printf("Requested too many Secnder threads = %d. Max is %d\n", noSthr, MaxThreads);
exit(1);
}
}
if(argc>2)
{
noRthr=atoi(argv[2]);
if (noRthr >= MaxThreads)
{
printf("Requested too many Receiver threads = %d. Max is %d\n", noRthr, MaxThreads);
exit(1);
}
}
if(argc>3)
{
timeint=atoi(argv[3]);
}
if(argc>4)
{
trace=atoi(argv[4]);
}
if(argc>5)
{
sendSleep=atoi(argv[5]);
}
if(argc>6)
{
recvSleep=atoi(argv[6]);
}
if(argc>7)
{
monitorRate=atoi(argv[7]);
// Round to nearest 10 so that 1000ms (1 sec) is a multiple of the interval
monitorRate=((monitorRate+5)/10)*10;
}
monitorInterval = MILLISEC_PER_SEC/monitorRate;
if (pthread_mutexattr_init(&la) != 0
|| pthread_mutexattr_settype(&la, PTHREAD_MUTEX_ERRORCHECK) != 0
|| pthread_mutex_init(&LOGX, &la) != 0)
{
printf("Line %d: Error initialising log mutex.\n", __LINE__);
exit(1);
}
status = pthread_mutex_init(&lock, &la);
LOGERR;
status = pthread_cond_init(&sig, NULL);
LOGERR;
PrintOptions(stdout);
fflush(stdout);
OPENLOG("w");
PrintOptions(LOGFP);
CLOSELOG(0);
for(ii = 0; ii < noRthr; ii++)
{
status = pthread_mutex_init(&thrState[Receiver][ii].opLock, &la);
LOGERR;
status = pthread_create(&tid[Receiver][ii], NULL, (PTHREAD_START_ROUTINE_DECL)&recvReq, (void *)ii);
LOGERR;
}
for(ii = 0; ii < noSthr; ii++)
{
status = pthread_mutex_init(&thrState[Sender][ii].opLock, &la);
LOGERR;
status = pthread_create(&tid[Sender][ii], NULL, (PTHREAD_START_ROUTINE_DECL)&sendReq, (void *)ii);
LOGERR;
}
status = pthread_mutexattr_destroy(&la);
LOGERR;
while(1) //Monitor threads until they hang
{
int stillRunning;
BOOL newSecond;
Sleep(monitorInterval);
milliseconds+=monitorInterval;
newSecond = (milliseconds >= MILLISEC_PER_SEC);
putchar(rotor[r=((r++)&0x3)]);
putchar('\b');
// Log Sends and Receives/Timeouts
if (trace > 0
|| (trace < 0 && trace >= -5000
&& (logPoint(-trace) || seconds > lastLogSeconds + 1 /* At least 1 second */)))
{
int ii;
lastLogSeconds = seconds;
OPENLOG("a");
fprintf(LOGFP,"count=%010ld, Thr/Recvd/TOs",
counter);
for (ii=0;ii<noRthr;ii++)
{
fprintf(LOGFP, " %d/%04d/%04d",
ii,
ThreadRecvCount[ii],
ThreadTOCount[ii]);
ThreadRecvCount[ii]=0;
ThreadTOCount[ii]=0;
}
fprintf(LOGFP," : Thr/Sent");
for (ii=0;ii<noSthr;ii++)
{
fprintf(LOGFP, " %d/%04d",
ii,
ThreadSentCount[ii]);
ThreadSentCount[ii]=0;
}
putc('\n',LOGFP);
CLOSELOG(0);
}
// Check for hung threads.
stillRunning=noSthr;
if (!sendSleep
|| seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))
{
for (ii=0;ii<noSthr;ii++)
{
status = pthread_mutex_lock(&thrState[Sender][ii].opLock);
LOGERR;
if (thrState[Sender][ii].signalled == 0)
{
OPENLOG("a");
fprintf(LOGFP, "Thread %2d: didn't emit signal.\n",
ii);
CLOSELOG(0);
}
thrState[Sender][ii].signalled = 0;
if (thrState[Sender][ii].watchdog == 0)
{
stillRunning--;
OPENLOG("a");
fprintf(LOGFP, "Thread %d: Sender operation trace: 0x%x\n",
ii,
thrState[Sender][ii].op);
CLOSELOG(0);
}
status = pthread_mutex_unlock(&thrState[Sender][ii].opLock);
LOGERR;
}
if (stillRunning==0)
{
OPENLOG("a");
fprintf(LOGFP,"Line %d: exit.\n", __LINE__);
CLOSELOG(0);
exit(1);
}
}
if (!(recvSleep || sendSleep)
|| (recvSleep && seconds > (lastRecvWatch + (2*timeint/MICROSEC_PER_SEC)))
|| (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC))))
{
stillRunning=noRthr;
for (ii=0;ii<noRthr;ii++)
{
status = pthread_mutex_lock(&thrState[Receiver][ii].opLock);
LOGERR;
if (thrState[Receiver][ii].watchdog == 0)
{
stillRunning--;
OPENLOG("a");
fprintf(LOGFP, "Thread %d: Receiver operation trace: 0x%x\n",
ii,
thrState[Receiver][ii].op);
CLOSELOG(0);
}
status = pthread_mutex_unlock(&thrState[Receiver][ii].opLock);
LOGERR;
}
if (stillRunning==0)
{
OPENLOG("a");
fprintf(LOGFP,"Line %d: exit.\n", __LINE__);
CLOSELOG(0);
exit(1);
}
}
if (newSecond)
{
milliseconds = 0;
seconds++;
OPENLOG("a");
fprintf(LOGFP, "==Seconds=[%ld]==Msg=[%d]==Count=[%ld]==Received=[%ld]==TOs=[%ld]====\n", \
seconds,
msg,
counter,
received,
TOs);
for (ii=0;ii<noRthr;ii++)
{
fprintf(LOGFP, " %d/%04d/%04d",
ii,
ThreadRecvCount[ii],
ThreadTOCount[ii]);
ThreadRecvCount[ii]=0;
ThreadTOCount[ii]=0;
}
fprintf(LOGFP," : Thr/Sent");
for (ii=0;ii<noSthr;ii++)
{
fprintf(LOGFP, " %d/%04d",
ii,
ThreadSentCount[ii]);
ThreadSentCount[ii]=0;
}
putc('\n',LOGFP);
for (ii=0;ii<noSthr;ii++)
{
status = pthread_mutex_lock(&thrState[Sender][ii].opLock);
LOGERR;
fprintf(LOGFP, "S%d/0x%x ",
ii,
thrState[Sender][ii].op);
status = pthread_mutex_unlock(&thrState[Sender][ii].opLock);
LOGERR;
}
putc('\n',LOGFP);
for (ii=0;ii<noRthr;ii++)
{
status = pthread_mutex_lock(&thrState[Receiver][ii].opLock);
LOGERR;
fprintf(LOGFP, "R%d/0x%x ",
ii,
thrState[Receiver][ii].op);
status = pthread_mutex_unlock(&thrState[Receiver][ii].opLock);
LOGERR;
}
putc('\n',LOGFP);
CLOSELOG(0);
// Reset watchdogs
#if 1
if (!sendSleep
|| (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC))))
{
lastSendWatch = seconds;
for (ii=0;ii<noSthr;ii++)
{
SetWatchdog(Sender,ii,0);
}
}
if (!(recvSleep || sendSleep)
|| (recvSleep && seconds > (lastRecvWatch + (2*timeint/MICROSEC_PER_SEC)))
|| (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC))))
{
lastRecvWatch = seconds;
for (ii=0;ii<noRthr;ii++)
{
SetWatchdog(Receiver,ii,0);
}
}
#endif
}
}
return 0;
}
/////////////
/*
 * sendReq - entry point of a sender thread.
 *
 *   arg  the thread number, passed as an integer cast to void *.
 *
 * Logs a start-up line, then loops forever: sets its watchdog flag, calls
 * SendData() once and, when the sendSleep option is set, sleeps for twice
 * the CV wait time.  Never returns in practice.
 */
void *
sendReq(void *arg)
{
  int thr = (int)arg;
  /* Twice the CV wait time, converted from microseconds to milliseconds.
     The conversion divides by (microseconds per millisecond); multiplying
     by MILLISEC_PER_SEC first overflowed a 32-bit DWORD for wait times
     above roughly 2.1 seconds, which includes the 5 second default. */
  DWORD sleepTime_ms = (2*timeint)/(MICROSEC_PER_SEC/MILLISEC_PER_SEC);

  OPENLOG("a");
  fprintf(LOGFP, "Sender Thread %2d id %lx started:", thr, GetCurrentThreadId());
  if (sendSleep)
    {
      fprintf(LOGFP, " sleep time %lu ms", (unsigned long)sleepTime_ms);
    }
  putc('\n', LOGFP);
  CLOSELOG(0);

  thrState[Sender][thr].watchdog = 1;
  thrState[Sender][thr].signalled = 0;

  while(1)
    {
      SetWatchdog(Sender,thr,1);
      SendData(thr);
      // Note: If timeint < 500 microseconds we just yield the CPU
      // with Sleep(0).
      if (sendSleep)
        {
          Sleep(sleepTime_ms);
        }
    }
  return 0;
}
/////////////
/*
 * SendData - one send pass of a sender thread.
 *
 *   threadNum  number of the calling sender thread.
 *
 * Takes `lock`; if no message is pending (msg == 0) it posts one
 * (msg = 1), bumps the send counters, releases the lock and signals the
 * condition variable.  If a message is already pending it just releases
 * the lock.  Each step is bracketed by S.../E... bits in the thread's
 * operation trace so the monitor can show where a thread stopped.
 * Any failing pthread call terminates the program through LOGERR.
 */
void
SendData(int threadNum)
{
  int status;   /* this name is required by LOGERR */

  SetOp(Sender,threadNum,SLock);
  status = pthread_mutex_lock(&lock);
  OrOp(Sender,threadNum,ELock);
  LOGERR;
  PR("lock -01");
  if(msg==0)
    {
      OrOp(Sender,threadNum,MsgFalse);
      msg=1;
      counter++;
      ThreadSentCount[threadNum]++;
      PR("unlock -01");
      OrOp(Sender,threadNum,SUnlock);
      status=pthread_mutex_unlock(&lock);
      OrOp(Sender,threadNum,EUnlock);
      LOGERR;
      PR("signal -01");
      OrOp(Sender,threadNum,SSignal);
      status = pthread_cond_signal(&sig);
      OrOp(Sender,threadNum,ESignal);
      LOGERR;
      /* The monitor reads and resets `signalled` while holding opLock, so
         the increment must take the same lock to avoid a data race. */
      status = pthread_mutex_lock(&thrState[Sender][threadNum].opLock);
      LOGERR;
      thrState[Sender][threadNum].signalled++;
      status = pthread_mutex_unlock(&thrState[Sender][threadNum].opLock);
      LOGERR;
    }
  else
    {
      OrOp(Sender,threadNum,MsgTrue);
      PR("unlock -01");
      OrOp(Sender,threadNum,SUnlock);
      status=pthread_mutex_unlock(&lock);
      OrOp(Sender,threadNum,EUnlock);
      LOGERR;
    }
}
///////////////
/*
 * recvReq - entry point of a receiver thread.
 *
 *   arg  the thread number, passed as an integer cast to void *.
 *
 * Logs a start-up line, clears this thread's receive/timeout counters and
 * then loops forever: sets its watchdog flag, calls RecvData() once and,
 * when the recvSleep option is set, yields with Sleep(0).
 */
void *
recvReq(void *arg)
{
  int idx = (int)arg;

  OPENLOG("a");
  fprintf(LOGFP, "Receiver Thread %2d id %lx started\n", idx, GetCurrentThreadId());
  CLOSELOG(0);

  thrState[Receiver][idx].watchdog = 1;
  ThreadRecvCount[idx]=0;
  ThreadTOCount[idx]=0;

  for (;;)
    {
      SetWatchdog(Receiver,idx,1);
      RecvData(idx);
      if (!recvSleep)
        {
          continue;
        }
      Sleep(0);
    }
  return 0;
}
///////////////
void
RecvData(int threadNum)
{
int status;
SetOp(Receiver,threadNum,SLock);
status = pthread_mutex_lock(&lock);
OrOp(Receiver,threadNum,ELock);
LOGERR;
PR("lock -11");
while (msg == 0)
{
struct timespec abstime;
struct _timeb currSysTime;
OrOp(Receiver,threadNum,MsgFalse);
_ftime(&currSysTime);
abstime.tv_sec = currSysTime.time;
abstime.tv_nsec = NANOSEC_PER_MILLISEC * currSysTime.millitm;
// printf("Now: %ld.%ld\n", abstime.tv_sec, abstime.tv_nsec);
// fflush(stdout);
abstime.tv_nsec += (timeint%MICROSEC_PER_SEC)*MICROSEC_PER_NANOSEC;
if (abstime.tv_nsec >= NANOSEC_PER_SEC)
{
abstime.tv_nsec -= NANOSEC_PER_SEC;
abstime.tv_sec++;
}
abstime.tv_sec += timeint/MICROSEC_PER_SEC;
// printf("TO : %ld.%ld\n", abstime.tv_sec, abstime.tv_nsec);
// fflush(stdout);
PR("wait/unlock -11");
OrOp(Receiver,threadNum,SWait);
status = pthread_cond_timedwait(&sig, &lock, &abstime);
OrOp(Receiver,threadNum,EWait);
PR("lock/awake -11");
if (status == ETIMEDOUT)
{
ThreadTOCount[threadNum]++;
TOs++;
PR("timeout -11");
PR("unlock -11");
OrOp(Receiver,threadNum,WaitTimeout);
OrOp(Receiver,threadNum,SUnlock);
status=pthread_mutex_unlock(&lock);
OrOp(Receiver,threadNum,EUnlock);
LOGERR;
return ;
}
LOGERR;
}
if (msg==1)
{
OrOp(Receiver,threadNum,MsgTrue);
}
ThreadRecvCount[threadNum]++;
msg=0;
received++;
PR("unlock -11");
OrOp(Receiver,threadNum,SUnlock);
status=pthread_mutex_unlock(&lock);
OrOp(Receiver,threadNum,EUnlock);
LOGERR;
return ;
}
Attachment:
condvar10.h
Description: type
CP = copy
RM = erase
MKDIR = mkdir
TOUCH = echo Passed >
ECHO = @echo

CPHDR = pthread.h semaphore.h sched.h

#OPTIM = /O2 /Ob2
OPTIM =

# C++ Exceptions
VCEFLAGS = /GX /TP /DPtW32NoCatchWarn /D__CLEANUP_CXX
VCELIB = ../lib/pthreadVCE.lib
VCEDLL = pthreadVCE.dll
# Structured Exceptions
VSEFLAGS = /D__CLEANUP_SEH
VSELIB = ../lib/pthreadVSE.lib
VSEDLL = pthreadVSE.dll
# C cleanup code
VCFLAGS = /D__CLEANUP_C
VCLIB = ../lib/pthreadVC.lib
VCDLL = pthreadVC.dll
# C++ Exceptions in application - using VC version of pthreads dll
VCXFLAGS = /GX /TP /D__CLEANUP_C

CFLAGS= $(OPTIM) /W3 /WX /MD /nologo /Yd /Zi -D_WIN32_WINNT=0x400
LFLAGS= /INCREMENTAL:NO
INCLUDES=-I. -I../include

COPYFILES = $(CPHDR) $(CPLIB) $(CPDLL)

TEST = condvar10.exe

EHFLAGS =

default: VC

VCE:
	@ nmake CPLIB="$(VCELIB)" CPDLL="$(VCEDLL)" EHFLAGS="$(VCEFLAGS)" $(TEST)

VSE:
	@ nmake CPLIB="$(VSELIB)" CPDLL="$(VSEDLL)" EHFLAGS="$(VSEFLAGS)" $(TEST)

VC:
	@ nmake CPLIB="$(VCLIB)" CPDLL="$(VCDLL)" EHFLAGS="$(VCFLAGS)" $(TEST)

VCX:
	@ nmake CPLIB="$(VCLIB)" CPDLL="$(VCDLL)" EHFLAGS="$(VCXFLAGS)" $(TEST)

.c.exe:
	@ $(ECHO) $(CC) $(EHFLAGS) $(CFLAGS) $(INCLUDES) $< /Fe$@ /link $(LFLAGS) $(CPLIB)
	@ $(CC) $(EHFLAGS) $(CFLAGS) $(INCLUDES) $< /Fe$@ /link $(LFLAGS) $(CPLIB)

.c.i:
	@ $(CC) /P $(VCEFLAGS) $(CFLAGS) $(INCLUDES) $<

$(COPYFILES):
	@ $(ECHO) Copying $@
	@ $(CP) $(BUILD_DIR)\$@ .

pthread.dll:
	@ $(CP) $(CPDLL) $*.dll
	@ $(CP) $(CPLIB) $*.lib

clean:
	- $(RM) *.e
	- $(RM) *.i
	- $(RM) *.obj
	- $(RM) *.pdb
	- $(RM) *.o
	- $(RM) *.asm
	- $(RM) *.exe
	- $(RM) *.log
CP = copy
MV = rename
RM = erase
MKDIR = mkdir
TOUCH = echo Passed >
ECHO = @echo
MAKE = make

#
# Mingw32
#
GLANG = c++
CC = gcc
XXCFLAGS =
CFLAGS = -O3 -g -UNDEBUG -Wall $(XXCFLAGS)
#CFLAGS = -g -O0 -UNDEBUG -Wall $(XXCFLAGS)
INCLUDES = -I. -I../include
LIBDIRS = -L. -L../lib

GCX = DUMMY

HDR = pthread.h semaphore.h sched.h
LIB = ../lib/libpthread$(GCX).a
DLL = pthread$(GCX).dll

# If a test case returns a non-zero exit code to the shell, make will
# stop.

TEST = condvar10.exe

GC:
	$(MAKE) GCX=GC XXCFLAGS="-x c -D__CLEANUP_C" $(TEST)

GCE:
	$(MAKE) GCX=GCE XXCFLAGS="-mthreads -x c++ -D__CLEANUP_CXX" $(TEST)

GCX:
	$(MAKE) GCX=GC XXCFLAGS="-mthreads -x c++ -D__CLEANUP_C" $(TEST)

%.exe: %.c
	@ $(ECHO) Compiling $@
	@ $(ECHO) $(CC) $(CFLAGS) -o $@ $^ $(INCLUDES) $(LIBDIRS) -lpthread$(GCX)
	@ $(CC) $(CFLAGS) -o $@ $^ $(INCLUDES) $(LIBDIRS) -lpthread$(GCX)

%.pre: %.c
	@ $(CC) -E $(CFLAGS) -o $@ $^ $(INCLUDES)

%.s: %.c
	@ $(CC) -S $(CFLAGS) -o $@ $^ $(INCLUDES)

clean:
	- $(RM) *.a
	- $(RM) *.e
	- $(RM) *.i
	- $(RM) *.obj
	- $(RM) *.pdb
	- $(RM) *.exe
	- $(RM) *.log
| Index Nav: | [Date Index] [Subject Index] [Author Index] [Thread Index] | |
|---|---|---|
| Message Nav: | [Date Prev] [Date Next] | [Thread Prev] [Thread Next] |