Worker-Threadパターン

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』にある
「Worker-Threadパターン」をC++ Builder 2009で実装してみました。

プログラムの意味は『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』をご覧下さい。

実装のポイントは、仕事のリクエストの受け渡しとワーカースレッドの保持とを行うTChannelクラスです。
JavaのnotifyAll()と同じ機能がC++Builder2009にはないため、工夫が必要です。
Producer-Consumerパターンと同じように、リクエストを出すスレッドとリクエストを受け取るスレッドを別々に管理しました。

//実行関数
int _tmain(int argc, _TCHAR* argv[])
{
  TChannel* channel = new TChannel(5); //ワーカースレッドの個数
  channel->StartWorkers();
  new TClientThread("Alice", channel);
  new TClientThread("Bobby", channel);
  new TClientThread("Chris", channel);

  Sleep(INFINITE);
  return 0;
}

TRequest.h

//仕事のリクエストを表しているTRequestクラス
class TRequest
{
public:
  TRequest(std::string Name, int Number) : FName(Name), FNumber(Number) {};
  /**
   * @param Name 呼び出したスレッドのスレッド名
   */
  void Execute(std::string Name)
  {
    boost::format fmt("%s executes [ Request From %s No. %d]");
    Print((fmt % Name % FName % FNumber).str());
  };
private:
  const std::string FName;
  const int FNumber;
};

TClientThread.h

//仕事のリクエストを出すTClientThreadクラス
class TClientThread : public TThread
{
public:
  /**
   * @param Name スレッド名
   * @param Channel Channel
   */
  __fastcall TClientThread(std::string Name, TChannel* Channel)
    : TThread(false), FName(Name), FChannel(Channel),
      FEvent(new TEvent(NULL, false, false, "", false)) {};
protected:
  void __fastcall Execute();
private:
  const std::string FName;
  TChannel* FChannel;
  std::unique_ptr<TEvent> FEvent;
};

TClientThread.cpp

void __fastcall TClientThread::Execute()
{
  randomize();
  for (int i = 0; true; i++)
  {
    TRequest request(FName, i);
    FChannel->PutRequest(request, FEvent.get());
    Sleep(random(1000));
  }
}

TChannel.h

//仕事のリクエストの受け渡しと、ワーカースレッドの保持とを行うTChannelクラス
class TChannel
{
public:
  TChannel(int Threads);
  void StartWorkers();
  void PutRequest(TRequest Request, TEvent* Event);
  TRequest TakeRequest(TEvent* Event);
private:
  static const int MAX_REQUEST = 100;
  std::queue<TRequest> FRequestQueue;
  std::vector<TWorkerThred*> FThreadPool;
  std::unique_ptr<TCriticalSection> FPutCriticalSection;
  std::unique_ptr<TCriticalSection> FTakeCriticalSection;
  std::queue<TEvent*> FPutEvent;
  std::queue<TEvent*> FTakeEvent;
  void WaitPut(TEvent* Event);
  void WaitTake(TEvent* Event);
  void NotifyPut();
  void NotifyTake();
};

TChannel.cpp

TChannel::TChannel(int Threads)
  : FPutCriticalSection(new TCriticalSection),
    FTakeCriticalSection(new TCriticalSection)
{
  boost::format fmt("Worker-%d");
  for (int i = 0; i < Threads; i++)
  {
    FThreadPool.push_back(new TWorkerThred((fmt % i).str(), this));
  }
}
void TChannel::StartWorkers()
{
  for (std::vector<TWorkerThred*>::size_type i = 0; i < FThreadPool.size(); ++i)
  {
    FThreadPool[i]->Resume();
  }
}
void TChannel::PutRequest(TRequest Request, TEvent* Event)
{
  FPutCriticalSection->Acquire();
  while (FRequestQueue.size() >= MAX_REQUEST)
  {
    WaitPut(Event);
  }
  FRequestQueue.push(Request);
  NotifyTake();
  FPutCriticalSection->Release();
}
TRequest TChannel::TakeRequest(TEvent* Event)
{
  FTakeCriticalSection->Acquire();
  while (FRequestQueue.empty())
  {
    WaitTake(Event);
  }
  TRequest request = FRequestQueue.front();
  FRequestQueue.pop();
  NotifyPut();
  FTakeCriticalSection->Release();
  return request;
}
void TChannel::WaitPut(TEvent* Event)
{
  FPutEvent.push(Event);
  FPutCriticalSection->Release();
  Event->WaitFor(INFINITE);
  FPutCriticalSection->Acquire();
};
void TChannel::WaitTake(TEvent* Event)
{
  FTakeEvent.push(Event);
  FTakeCriticalSection->Release();
  Event->WaitFor(INFINITE);
  FTakeCriticalSection->Acquire();
}
void TChannel::NotifyPut()
{
  if (!FPutEvent.empty())
  {
    TEvent* event = FPutEvent.front();;
    FPutEvent.pop();
    event->SetEvent();
  }
}
void TChannel::NotifyTake()
{
  if (!FTakeEvent.empty())
  {
    TEvent* event = FTakeEvent.front();
    FTakeEvent.pop();
    event->SetEvent();
  }
}

TWorkerThred.h

//ワーカースレッドを表すTWorkerThredクラス
class TWorkerThred : public TThread
{
public:
  TWorkerThred(std::string Name, TChannel* Channel)
    : TThread(true), FName(Name), FChannel(Channel),
      FEvent(new TEvent(NULL, false, false, "", false)) {};
protected:
  void __fastcall Execute();
private:
  std::string FName;
  TChannel* FChannel;
  std::unique_ptr<TEvent> FEvent;
};

TWorkerThred.cpp

void __fastcall TWorkerThred::Execute()
{
  while (true)
  {
    TRequest request = FChannel->TakeRequest(FEvent.get());
    request.Execute(FName);
  }
}

コメント

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください