『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』にある
「Read-Write Lockパターン」をC++ Builder 2009で実装してみました。
プログラムの意味は『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』をご覧下さい。
さて、今回もJavaのnotifyAll()をどのように実装するかが問題でした。
C++Builder2009では、Javaのように待機中のすべてのスレッドをウェイトセットから出すnotifyAll()がありません。
できることはnotify()と同じで、ウェイトセットから出せるスレッドは1つだけです。
ガード条件を考えて、書き込み待ちスレッドがあればそれを、なければ読み込み待ちスレッドを起こすようにしました。
そのため、書き込み待ちスレッドと読み込み待ちスレッドを分けて(WaitReadメソッドとWaitWriteメソッド)管理しています。
//スレッドセーフな出力
std::unique_ptr<TCriticalSection> CriticalSection(new TCriticalSection);
void Print(const std::string& S)
{
CriticalSection->Acquire();
std::cout << S << std::endl;
CriticalSection->Release();
}
//TReadWriteLockクラス
class TReadWriteLock
{
public:
TReadWriteLock()
: FReadingReaders(0), FWaitingWriters(0), FWritingWriters(0),
FPreferWriter(true), FCriticalSection(new TCriticalSection) {};
void ReadLock(TEvent* Event)
{
FCriticalSection->Acquire();
while (FWritingWriters > 0 || (FPreferWriter && FWaitingWriters > 0))
{
WaitRead(Event);
}
//実際に読んでいるスレッドの数を1増やす
FReadingReaders++;
FCriticalSection->Release();
};
void ReadUnlock()
{
FCriticalSection->Acquire();
//実際に読んでいるスレッドの数を1減らす
FReadingReaders--;
FPreferWriter = true;
NotifyAll();
FCriticalSection->Release();
};
void WriteLock(TEvent* Event)
{
FCriticalSection->Acquire();
//書くのを待っているスレッドの数を1増やす
FWaitingWriters++;
while (FReadingReaders > 0 || FWritingWriters > 0)
{
WaitWrite(Event);
}
//書くのを待っているスレッドの数を1減らす
FWaitingWriters--;
//実際に書いているスレッドの数を1増やす
FWritingWriters++;
FCriticalSection->Release();
};
void WriteUnlock()
{
FCriticalSection->Acquire();
//実際に書いているスレッドの数を1減らす
FWritingWriters--;
FPreferWriter = false;
NotifyAll();
FCriticalSection->Release();
};
private:
int FReadingReaders; //実際に読んでいる最中のスレッドの数
int FWaitingWriters; //書くのを待っているスレッドの数
int FWritingWriters; //実際に書いている最中のスレッドの数
bool FPreferWriter;
std::queue<TEvent*> FReadEvents;
std::queue<TEvent*> FWriteEvents;
std::unique_ptr<TCriticalSection> FCriticalSection;
void WaitRead(TEvent* Event)
{
FReadEvents.push(Event);
FCriticalSection->Release();
Event->WaitFor(INFINITE);
FCriticalSection->Acquire();
}
void WaitWrite(TEvent* Event)
{
FWriteEvents.push(Event);
FCriticalSection->Release();
Event->WaitFor(INFINITE);
FCriticalSection->Acquire();
}
//すべてのイベントに対して通知を行う
void NotifyAll()
{
if (!FWriteEvents.empty())
{
FWriteEvents.front()->SetEvent();
FWriteEvents.pop();
}
else if (!FReadEvents.empty())
{
FReadEvents.front()->SetEvent();
FReadEvents.pop();
}
};
};
//読み込むためのロックのRAII
class TReadLock
{
public:
TReadLock(TReadWriteLock* Lock, TEvent* Event) : FLock(Lock)
{
FLock->ReadLock(Event);
};
~TReadLock()
{
FLock->ReadUnlock();
}
private:
TReadWriteLock* FLock;
};
//書き込むためのロックのRAII
class TWriteLock
{
public:
TWriteLock(TReadWriteLock* Lock, TEvent* Event) : FLock(Lock)
{
FLock->WriteLock(Event);
};
~TWriteLock()
{
FLock->WriteUnlock();
}
private:
TReadWriteLock* FLock;
};
//TDataクラス
class TData
{
public:
TData(const int Count) : FLock(new TReadWriteLock()),
FBuffer(std::string(Count, '*'))
{
};
std::string Read(TEvent* Event)
{
TReadLock lock(FLock, Event);
return DoRead();
};
void Write(char C, TEvent* Event)
{
TWriteLock lock(FLock, Event);
DoWrite(C);
}
private:
std::string FBuffer;
TReadWriteLock* FLock;
std::string DoRead()
{
Slowly();
return FBuffer;
};
void DoWrite(char C)
{
for (std::string::size_type i = 0; i < FBuffer.size(); i++)
{
FBuffer[i] = C;
Slowly();
}
};
void Slowly()
{
Sleep(50);
};
};
//TWriterThreadクラス
class TWriterThread : public TThread
{
public:
__fastcall TWriterThread(TData* Data, std::string Filter)
: TThread(false), FData(Data), FFilter(Filter), FIndex(0),
FEvent(new TEvent(NULL, false, false, "", false)) {};
protected:
void __fastcall Execute()
{
while (true)
{
char c = NextChar();
FData->Write(c, FEvent.get());
Sleep(random(3000));
}
};
private:
TData* FData;
std::string FFilter;
unsigned int FIndex;
std::unique_ptr<TEvent> FEvent;
char NextChar()
{
char c = FFilter[FIndex];
FIndex++;
if (FIndex >= FFilter.size())
{
FIndex = 0;
}
return c;
};
};
//TReaderThreadクラス
class TReaderThread : public TThread
{
public:
__fastcall TReaderThread(std::string Name, TData* Data)
: TThread(false), FName(Name), FData(Data),
FEvent(new TEvent(NULL, false, false, "", false)) {};
protected:
void __fastcall Execute()
{
while (true)
{
std::string readbuf = FData->Read(FEvent.get());
Print(FName + " reads " + readbuf);
}
};
private:
const std::string FName;
TData* FData;
std::unique_ptr<TEvent> FEvent;
};
int _tmain(int argc, _TCHAR* argv[])
{
TData* data = new TData(10);
new TReaderThread("TReaderThread-1", data);
new TReaderThread("TReaderThread-2", data);
new TReaderThread("TReaderThread-3", data);
new TReaderThread("TReaderThread-4", data);
new TReaderThread("TReaderThread-5", data);
new TReaderThread("TReaderThread-6", data);
new TWriterThread(data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
new TWriterThread(data, "abcdefghijklmnopqrstuvwxyz");
Sleep(INFINITE);
return 0;
}
Pingback: 増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 « 山本隆の開発日誌
> C++Builder2009では、Javaのように待機中のすべてのスレッドをウェイトセットから出すnotifyAll()がありません。
Delphi 2009(C++Builder 2009)で導入された System::TMonitorクラスとそのPulseAllメソッドが使えませんか?
http://blogs.embarcadero.com/teamj/2011/05/30/2012/
コメントありがとうございます。System::TMonitorクラス便利ですね。
System::TMonitorクラスを使った処理を別の記事として書きたいと思います。
ちょっと訂正です。
System::TMonitorクラスではなくSystem::TMonitor構造体でした。
Pingback: [C++Builder] TMultiReadExclusiveWriteSynchronizerでRead-Write Lockパターン « 山本隆の開発日誌