Read-Write Lockパターン

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』にある
「Read-Write Lockパターン」をC++ Builder 2009で実装してみました。

プログラムの意味は『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』をご覧下さい。

さて、今回もJavaのnotifyAll()をどのように実装するかが問題でした。
C++Builder2009では、Javaのように待機中のすべてのスレッドをウェイトセットから出すnotifyAll()がありません。
できることはnotify()と同じで、ウェイトセットから出せるスレッドは1つだけです。

ガード条件を考えて、書き込み待ちスレッドがあればそれを、なければ読み込み待ちスレッドを起こすようにしました。
そのため、書き込み待ちスレッドと読み込み待ちスレッドを分けて(WaitReadメソッドとWaitWriteメソッド)管理しています。

//スレッドセーフな出力
std::unique_ptr<TCriticalSection> CriticalSection(new TCriticalSection);
void Print(const std::string& S)
{
  CriticalSection->Acquire();
  std::cout << S << std::endl;
  CriticalSection->Release();
}
//TReadWriteLockクラス
class TReadWriteLock
{
public:
  TReadWriteLock()
       : FReadingReaders(0), FWaitingWriters(0), FWritingWriters(0),
         FPreferWriter(true), FCriticalSection(new TCriticalSection) {};
  void ReadLock(TEvent* Event)
  {
    FCriticalSection->Acquire();
    while (FWritingWriters > 0 || (FPreferWriter && FWaitingWriters > 0))
    {
      WaitRead(Event);
    }
    //実際に読んでいるスレッドの数を1増やす
    FReadingReaders++;
    FCriticalSection->Release();
  };
  void ReadUnlock()
  {
    FCriticalSection->Acquire();
    //実際に読んでいるスレッドの数を1減らす
    FReadingReaders--;
    FPreferWriter = true;
    NotifyAll();
    FCriticalSection->Release();
  };
  void WriteLock(TEvent* Event)
  {
    FCriticalSection->Acquire();
    //書くのを待っているスレッドの数を1増やす
    FWaitingWriters++;
    while (FReadingReaders > 0 || FWritingWriters > 0)
    {
      WaitWrite(Event);
    }
    //書くのを待っているスレッドの数を1減らす
    FWaitingWriters--;
    //実際に書いているスレッドの数を1増やす
    FWritingWriters++;
    FCriticalSection->Release();
  };
  void WriteUnlock()
  {
    FCriticalSection->Acquire();
    //実際に書いているスレッドの数を1減らす
    FWritingWriters--;
    FPreferWriter = false;
    NotifyAll();
    FCriticalSection->Release();
  };
private:
  int FReadingReaders; //実際に読んでいる最中のスレッドの数
  int FWaitingWriters; //書くのを待っているスレッドの数
  int FWritingWriters; //実際に書いている最中のスレッドの数
  bool FPreferWriter;
  std::queue<TEvent*> FReadEvents;
  std::queue<TEvent*> FWriteEvents;
  std::unique_ptr<TCriticalSection> FCriticalSection;
  void WaitRead(TEvent* Event)
  {
    FReadEvents.push(Event);
    FCriticalSection->Release();
    Event->WaitFor(INFINITE);
    FCriticalSection->Acquire();
  }
  void WaitWrite(TEvent* Event)
  {
    FWriteEvents.push(Event);
    FCriticalSection->Release();
    Event->WaitFor(INFINITE);
    FCriticalSection->Acquire();
  }
  //すべてのイベントに対して通知を行う
  void NotifyAll()
  {
    if (!FWriteEvents.empty())
    {
      FWriteEvents.front()->SetEvent();
      FWriteEvents.pop();
    }
    else if (!FReadEvents.empty())
    {
      FReadEvents.front()->SetEvent();
      FReadEvents.pop();
    }
  };
};
//読み込むためのロックのRAII
class TReadLock
{
public:
  TReadLock(TReadWriteLock* Lock, TEvent* Event) : FLock(Lock)
  {
    FLock->ReadLock(Event);
  };
  ~TReadLock()
  {
    FLock->ReadUnlock();
  }
private:
  TReadWriteLock* FLock;
};
//書き込むためのロックのRAII
class TWriteLock
{
public:
  TWriteLock(TReadWriteLock* Lock, TEvent* Event) : FLock(Lock)
  {
    FLock->WriteLock(Event);
  };
  ~TWriteLock()
  {
    FLock->WriteUnlock();
  }
private:
  TReadWriteLock* FLock;
};
//TDataクラス
class TData
{
public:
  TData(const int Count) : FLock(new TReadWriteLock()),
  FBuffer(std::string(Count, '*'))
  {
  };
  std::string Read(TEvent* Event)
  {
    TReadLock lock(FLock, Event);
    return DoRead();
  };
  void Write(char C, TEvent* Event)
  {
    TWriteLock lock(FLock, Event);
    DoWrite(C);

  }
private:
  std::string FBuffer;
  TReadWriteLock* FLock;
  std::string DoRead()
  {
    Slowly();
    return FBuffer;
  };
  void DoWrite(char C)
  {
    for (std::string::size_type i = 0; i < FBuffer.size(); i++)
    {
      FBuffer[i] = C;
      Slowly();
    }
  };
  void Slowly()
  {
    Sleep(50);
  };
};
//TWriterThreadクラス
class TWriterThread : public TThread
{
public:
  __fastcall TWriterThread(TData* Data, std::string Filter)
       : TThread(false), FData(Data), FFilter(Filter), FIndex(0),
         FEvent(new TEvent(NULL, false, false, "", false)) {};
protected:
  void __fastcall Execute()
  {
    while (true)
    {
      char c = NextChar();
      FData->Write(c, FEvent.get());
      Sleep(random(3000));
    }
  };
private:
  TData* FData;
  std::string FFilter;
  unsigned int FIndex;
  std::unique_ptr<TEvent> FEvent;
  char NextChar()
  {
    char c = FFilter[FIndex];
    FIndex++;
    if (FIndex >= FFilter.size())
    {
      FIndex = 0;
    }
    return c;
  };
};
//TReaderThreadクラス
class TReaderThread : public TThread
{
public:
  __fastcall TReaderThread(std::string Name, TData* Data)
       : TThread(false), FName(Name), FData(Data),
         FEvent(new TEvent(NULL, false, false, "", false)) {};
protected:
  void __fastcall Execute()
  {
    while (true)
    {
      std::string readbuf = FData->Read(FEvent.get());
      Print(FName + " reads " + readbuf);
    }
  };
private:
  const std::string FName;
  TData* FData;
  std::unique_ptr<TEvent> FEvent;
};

int _tmain(int argc, _TCHAR* argv[])
{
  TData* data = new TData(10);
  new TReaderThread("TReaderThread-1", data);
  new TReaderThread("TReaderThread-2", data);
  new TReaderThread("TReaderThread-3", data);
  new TReaderThread("TReaderThread-4", data);
  new TReaderThread("TReaderThread-5", data);
  new TReaderThread("TReaderThread-6", data);
  new TWriterThread(data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
  new TWriterThread(data, "abcdefghijklmnopqrstuvwxyz");

  Sleep(INFINITE);
  return 0;
}

コメント

  1. Pingback: 増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 « 山本隆の開発日誌

  2. コメントありがとうございます。System::TMonitorクラス便利ですね。
    System::TMonitorクラスを使った処理を別の記事として書きたいと思います。

  3. ちょっと訂正です。
    System::TMonitorクラスではなくSystem::TMonitor構造体でした。

  4. Pingback: [C++Builder] TMultiReadExclusiveWriteSynchronizerでRead-Write Lockパターン « 山本隆の開発日誌

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください