明輝手游網(wǎng)中心:是一個(gè)免費(fèi)提供流行視頻軟件教程、在線學(xué)習(xí)分享的學(xué)習(xí)平臺(tái)!

C#消息隊(duì)列應(yīng)用程序 -2

[摘要]在這個(gè)數(shù)組內(nèi)部,CWorker 類創(chuàng)建了 CWorkerThread類的一個(gè)實(shí)現(xiàn)版 本。CWorkerThread 類(將在下面討論)是一個(gè)必須繼承的抽象類。導(dǎo)出 類定義了消息的處理方式: aThreads = new ArrayList(); for (int idx=0; idx〈sfWork...

  在這個(gè)數(shù)組內(nèi)部,CWorker 類創(chuàng)建了 CWorkerThread類的一個(gè)實(shí)現(xiàn)版
本。CWorkerThread 類(將在下面討論)是一個(gè)必須繼承的抽象類。導(dǎo)出
類定義了消息的處理方式:
aThreads = new ArrayList();
for (int idx=0; idx〈sfWorker.NumberThreads; idx++)
{
  WorkerThreadFormatter wfThread = new WorkerThreadFormatter();
  wfThread.ProcessName = sfWorker.ProcessName;
  wfThread.ProcessDesc = sfWorker.ProcessDesc;
  wfThread.ThreadNumber = idx;
  wfThread.InputQueue = sfWorker.InputQueue;
  wfThread.ErrorQueue = sfWorker.ErrorQueue;
  wfThread.OutputName = sfWorker.OutputName;
  // 定義輔助類型,并將其插入輔助線程結(jié)構(gòu)
  CWorkerThread wtBase;
  switch (sfWorker.ProcessType)
  {
   case WorkerFormatter.SFProcessType.ProcessRoundRobin:
     wtBase = new CWorkerThreadRoundRobin(this, wfThread);
     break;
   case WorkerFormatter.SFProcessType.ProcessAppSpecific:
     wtBase = new CWorkerThreadAppSpecific(this, wfThread);
     break;
   case WorkerFormatter.SFProcessType.ProcessAssembly:
     wtBase = new CWorkerThreadAssembly(this, wfThread);
     break;
   default:
     throw new Exception("Unknown Processing Type");
  }
  // 添加對(duì)數(shù)組的調(diào)用
  aThreads.Insert(idx, wtBase);
}

  一旦所有的對(duì)象都已創(chuàng)建,就可以通過(guò)調(diào)用每個(gè)線程對(duì)象的 Start方
法來(lái)啟動(dòng)它們:
foreach(CWorkerThread cThread in aThreads)
  cThread.Start();

  Stop、Pause 和 Continue 方法在 foreach循環(huán)里執(zhí)行的操作類似。
Stop方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this);

  在類析構(gòu)函數(shù)中將調(diào)用 Stop 方法,這樣,在沒(méi)有顯式調(diào)用 Stop 方
法的情況下也可以正確地終止對(duì)象。如果調(diào)用了 Stop 方法,將不需要析
構(gòu)函數(shù)。SuppressFinalize方法能夠防止調(diào)用對(duì)象的 Finalize 方法(析
構(gòu)函數(shù)的實(shí)際實(shí)現(xiàn))。

CWorkerThread 抽象類

  CWorkerThread 是一個(gè)由 CWorkerThreadAppSpecifc、CWorkerThread
RoundRobin 和 CWorkerThreadAssembly繼承的抽象類。無(wú)論如何處理消
息,隊(duì)列的大部分處理是相同的,所以 CWorkerThread類提供了這一功能。
這個(gè)類提供了抽象方法(必須被實(shí)際方法替代)以管理資源和處理消息。

  類的工作再一次通過(guò) Start、Stop、Pause 和 Continue 方法來(lái)實(shí)現(xiàn)。
在 Start方法中引用了輸入和錯(cuò)誤隊(duì)列。在 .NET 框架中,消息由 System.
Messaging 名稱空間處理:
// 嘗試打開(kāi)隊(duì)列,并設(shè)置默認(rèn)的讀寫(xiě)屬性
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// 如果使用 MSMQ COM,則將格式化程序設(shè)置為 ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();

  一旦定義了消息隊(duì)列引用,即會(huì)創(chuàng)建一個(gè)線程用于實(shí)際的處理函數(shù)
(稱為 ProcessMessages)。在 .NET 框架中,使用 System.Threading
名稱空間很容易實(shí)現(xiàn)線程處理:
procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();

  ProcessMessages 函數(shù)是基于 Boolean值的處理循環(huán)。當(dāng)數(shù)值設(shè)為
False,處理循環(huán)將終止。因此,線程對(duì)象的 Stop 方法只設(shè)置這一Boolean
值,然后關(guān)閉打開(kāi)的消息隊(duì)列,并加入帶有主線程的線程:
// 加入服務(wù)線程和處理線程
bRun = false;
procMessage.Join();
// 關(guān)閉打開(kāi)的消息隊(duì)列
mqInput.Close();
mqError.Close();

Pause 方法只設(shè)置一個(gè) Boolean 值,使處理線程休眠半秒鐘:

if (bPause)
  Thread.Sleep(500);

  最后,每一個(gè) Start、Stop、Pause 和 Continue 方法將調(diào)用抽象的
OnStart 、OnStop、OnPause 和 OnContinue 方法。這些抽象方法為實(shí)現(xiàn)
的類提供了掛鉤,以捕獲和釋放所需的資源。

  ProcessMessages 循環(huán)具有如下基本結(jié)構(gòu):
●接收Message。
●如果Message具有成功的Receive,則調(diào)用抽象ProcessMessage方法。
●如果Receive或ProcessMessage失敗,將Message發(fā)送至錯(cuò)誤隊(duì)列中。

Message mInput;
try
{
  // 從隊(duì)列中讀取,并等候 1 秒
  mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
  // 將消息設(shè)置為 null
  mInput = null;
  // 查看錯(cuò)誤代碼,了解是否超時(shí)
  if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
  {
   // 如果未超時(shí),發(fā)出一個(gè)錯(cuò)誤并記錄錯(cuò)誤號(hào)
   LogError("Error: " + mqe.Message);
   throw mqe;
  }
}
if (mInput != null)
{
  // 得到一個(gè)要處理的消息,調(diào)用處理消息抽象方法
  try
  {
   ProcessMessage(mInput);
  }
  // 捕獲已知異常狀態(tài)的錯(cuò)誤
  catch (CWorkerThreadException ex)
  {
   ProcessError(mInput, ex.Terminate);
  }
  // 捕獲未知異常,并調(diào)用 Terminate
  catch
  {
   ProcessError(mInput, true);
  }
}

  ProcessError方法將錯(cuò)誤的消息發(fā)送至錯(cuò)誤隊(duì)列。另外,它也可能引
發(fā)異常來(lái)終止線程。如果ProcessMessage方法引發(fā)了終止錯(cuò)誤或 CWorker
ThreadException類型,它將執(zhí)行此操作。

CworkerThread 導(dǎo)出類

  任何從 CWorkerThread中繼承的類都必須提供 OnStart、OnStop、On
Pause、OnContinue和 ProcessMessage 方法。OnStart 和 OnStop方法獲
取并釋放處理資源。OnPause 和 OnContinue 方法允許臨時(shí)釋放和重新獲
取這些資源。ProcessMessage方法應(yīng)該處理消息,并在出現(xiàn)失敗事件時(shí)引
發(fā) CWorkerThreadException 異常。

  由于 CWorkerThread構(gòu)造函數(shù)定義運(yùn)行時(shí)參數(shù),導(dǎo)出類必須調(diào)用基類
構(gòu)造函數(shù):
public CWorkerThreadDerived(CWorker v_cParent, WorkerThread
Formatter v_wfThread)
  : base (v_cParent, v_wfThread) {}

  導(dǎo)出類提供了兩種類型的處理:將消息發(fā)送至另一隊(duì)列,或者調(diào)用組
件方法。接收和發(fā)送消息的兩種實(shí)現(xiàn)使用了循環(huán)技術(shù)或應(yīng)用程序偏移(保
留在消息 AppSpecific屬性中),作為使用哪一隊(duì)列的決定因素。此方案
中的配置文件應(yīng)該包括隊(duì)列路徑的列表。實(shí)現(xiàn)的 OnStart和 OnStop 方法
應(yīng)該打開(kāi)和關(guān)閉對(duì)這些隊(duì)列的引用:
iQueues = wfThread.OutputName.Length;
mqOutput = new MessageQueue[iQueues];
for (int idx=0; idx〈iQueues; idx++)
{
  mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);
  mqOutput[idx].Formatter = new ActiveXMessageFormatter();
}

  在這些方案中,消息的處理很簡(jiǎn)單:將消息發(fā)送必要的輸出隊(duì)列。在
循環(huán)情況下,這個(gè)進(jìn)程為:
try
{
  mqOutput[iNextQueue].Send(v_mInput);
}
catch (Exception ex)
{
  // 如果錯(cuò)誤強(qiáng)制終止異常
  throw new CWorkerThreadException(ex.Message, true);
}
// 計(jì)算下一個(gè)隊(duì)列號(hào)
iNextQueue++;
iNextQueue %= iQueues;

  后一種調(diào)用帶消息參數(shù)的組件的實(shí)現(xiàn)方法比較有趣。ProcessMessage
方法使用 IWebMessage接口調(diào)入一個(gè) .NET 組件。OnStart 和 OnStop 方
法獲取和釋放此組件的引用。

  此方案中的配置文件應(yīng)該包含兩個(gè)項(xiàng)目:完整的類名和類所在文件的
位置。按照 IWebMessage接口中的定義,在組件上調(diào)用 Process方法。

  要獲取對(duì)象引用,需要使用 Activator.CreateInstance 方法。此函
數(shù)需要一個(gè)程序集類型。在這里,它是從程序集文件路徑和類名中導(dǎo)出的。
一旦獲取對(duì)象引用,它將被放入合適的接口:
private IWebMessage iwmSample;
private string sFilePath, sTypeName;
// 保存程序集路徑和類型名稱
sFilePath = wfThread.OutputName[0];
sTypeName = wfThread.OutputName[1];
// 獲取對(duì)必要對(duì)象的引用
Assembly asmSample = Assembly.LoadFrom(sFilePath);
Type typSample = asmSample.GetType(sTypeName);
object objSample = Activator.CreateInstance(typSample);
// 定義給對(duì)象的必要接口
iwmSample = (IWebMessage)objSample;

  獲取對(duì)象引用后,ProcessMessage方法將在 IWebMessage接口上調(diào)用
Process 方法:
WebMessageReturn wbrSample;
try
{
  // 定義方法調(diào)用的參數(shù)
  string sLabel = v_mInput.Label;
  string sBody = (string)v_mInput.Body;
  int iAppSpecific = v_mInput.AppSpecific;
  // 調(diào)用方法并捕捉返回代碼
  wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);
}
catch (InvalidCastException ex)
{
  // 如果在消息內(nèi)容中發(fā)生錯(cuò)誤,則強(qiáng)制發(fā)出一個(gè)非終止異常
  throw new CWorkerThreadException(ex.Message, false);
}
catch (Exception ex)
{
  // 如果錯(cuò)誤調(diào)用程序集,則強(qiáng)制發(fā)出終止異常
  throw new CWorkerThreadException(ex.Message, true);
}
// 如果沒(méi)有錯(cuò)誤,則檢查對(duì)象調(diào)用的返回狀態(tài)
switch (wbrSample)
{
  case WebMessageReturn.ReturnBad:
   throw new CWorkerThreadException
     ("Unable to process message: Message marked bad", false);
  case WebMessageReturn.ReturnAbort:
   throw new CWorkerThreadException
     ("Unable to process message: Process terminating", true);
  default:
   break;
}

  提供的示例組件將消息正文寫(xiě)入數(shù)據(jù)庫(kù)表。如果捕獲到嚴(yán)重?cái)?shù)據(jù)庫(kù)錯(cuò)
誤,您可能希望終止處理過(guò)程,但是在這里,僅僅將消息標(biāo)記為錯(cuò)誤的消
息。

  由于此示例中創(chuàng)建的類實(shí)例可能會(huì)獲取并保留昂貴的數(shù)據(jù)庫(kù)資源,所
以用 OnPause和 OnContinue 方法釋放和重新獲取對(duì)象引用。

檢測(cè)設(shè)備

  就象在所有優(yōu)秀的應(yīng)用程序中一樣,檢測(cè)設(shè)備用于監(jiān)測(cè)應(yīng)用程序的狀
態(tài)。。NET 框架大大簡(jiǎn)化了將事件日志、性能計(jì)數(shù)器和 Windows管理檢測(cè)
設(shè)備(WMI )納入應(yīng)用程序的過(guò)程。消息應(yīng)用程序使用時(shí)間日志和性能計(jì)
數(shù)器,二者都是來(lái)自 System.Diagnostics 程序集。

  在 ServiceBase類中,您可以自動(dòng)啟用事件日志。另外,ServiceBase
EventLog成員支持寫(xiě)入應(yīng)用程序事件日志:
EventLog.WriteEntry(sMyMessage, EventLogEntryType.Information);

  對(duì)于寫(xiě)入事件日志而不是應(yīng)用程序日志的應(yīng)用程序,它能夠很容易地
創(chuàng)建和獲取 EventLog 資源的引用(正如在 CWorker類中所做的一樣),
并能夠使用 WriteEntry 方法記錄日志項(xiàng):
private EventLog cLog;
string sSource = ServiceControl.ServiceControlName;
string sLog = "Application";
// 查看源是否存在,如果不存在,則創(chuàng)建源
if (!EventLog.SourceExists(sSource))
  EventLog.CreateEventSource(sSource, sLog);
// 創(chuàng)建日志對(duì)象,并引用現(xiàn)在定義的源
cLog = new EventLog();
cLog.Source = sSource;
// 在日志中寫(xiě)入條目,表明創(chuàng)建成功
cLog.WriteEntry("已成功創(chuàng)建", EventLogEntryType.Information);

  .NET 框架大大簡(jiǎn)化了性能計(jì)數(shù)器。對(duì)于每一個(gè)處理線程、線程導(dǎo)出
的用戶和整個(gè)應(yīng)用程序,這一消息應(yīng)用程序都能提供計(jì)數(shù)器,用于跟蹤消
息數(shù)量和每秒鐘處理消息的數(shù)量。要提供此功能,您需要定義性能計(jì)數(shù)器
的類別,然后增加相應(yīng)的計(jì)數(shù)器實(shí)例。

  性能計(jì)數(shù)器的類別在服務(wù) OnStart方法中定義。這些類別代表兩種計(jì)
數(shù)器——消息總數(shù)和每秒鐘處理的消息數(shù):
CounterCreationData[] cdMessage = new CounterCreationData[2];
cdMessage[0] = new CounterCreationData("Messages/Total", "Total
Messages Processed",
PerformanceCounterType.NumberOfItems64);
cdMessage[1] = new CounterCreationData("Messages/Second",
"Messages Processed a Second",
PerformanceCounterType.RateOfChangePerSecond32);
PerformanceCounterCategory.Create("MSDN Message Service", "MSDN
Message Service Counters", cdMessage);

  一旦定義了性能計(jì)數(shù)器類別,將創(chuàng)建 PerformanceCounter 對(duì)象以訪
問(wèn)計(jì)數(shù)器實(shí)例功能。PerformanceCounter對(duì)象需要類別、計(jì)數(shù)器名稱和一
個(gè)可選的實(shí)例名稱。對(duì)于輔助進(jìn)程,將使用來(lái)自 XML文件的進(jìn)程名稱,代
碼如下:
pcMsgTotWorker = new PerformanceCounter("MSDN Message Service",
"Messages/Total", sProcessName);
pcMsgSecWorker = new PerformanceCounter("MSDN Message Service",
"Messages/Second", sProcessName);
pcMsgTotWorker.RawValue = 0;
pcMsgSecWorker.RawValue = 0;

要增加計(jì)數(shù)器的值,僅僅需要調(diào)用適當(dāng)?shù)姆椒ǎ?

pcMsgTotWorker.IncrementBy(1);
pcMsgSecWorker.IncrementBy(1);

最后說(shuō)明一點(diǎn),服務(wù)終止時(shí),安裝的性能計(jì)數(shù)器類別應(yīng)該從系統(tǒng)中刪除:

PerformanceCounterCategory.Delete("MSDN Message Service");

  由于性能計(jì)數(shù)器在 .NET 框架中工作,因此需要運(yùn)行一項(xiàng)特殊的服務(wù)。
此服務(wù)(PerfCounterService)提供了共享內(nèi)存。計(jì)數(shù)器信息將寫(xiě)入共享
內(nèi)存,并被性能計(jì)數(shù)器系統(tǒng)讀取。

安裝

  在結(jié)束以前,我們來(lái)簡(jiǎn)要介紹一下安裝以及稱為 installutil.exe的
安裝工具。由于此應(yīng)用程序是 Windows服務(wù),它必須使用installutil.exe
來(lái)安裝。因此,需要使用一個(gè)從 System.Configuration.Install 程序集
中繼承的 Installer類:
public class ServiceRegister: Installer
{
  private ServiceInstaller serviceInstaller;
  private ServiceProcessInstaller processInstaller;
  public ServiceRegister()
  {
   // 創(chuàng)建服務(wù)安裝程序
   serviceInstaller = new ServiceInstaller();
   serviceInstaller.StartType = ServiceStart.Manual;
   serviceInstaller.ServiceName = ServiceControl.ServiceControl
   Name;
   serviceInstaller.DisplayName = ServiceControl.ServiceControl
   Desc;
   Installers.Add(serviceInstaller);
   // 創(chuàng)建進(jìn)程安裝程序
   processInstaller = new ServiceProcessInstaller();
   processInstaller.RunUnderSystemAccount = true;
   Installers.Add(processInstaller);
  }
}

  如此示例類所示,對(duì)于一個(gè) Windows服務(wù),服務(wù)和服務(wù)進(jìn)程各需要一
個(gè)安裝程序,以定義運(yùn)行服務(wù)的帳戶。其他安裝程序允許注冊(cè)事件日志和
性能計(jì)數(shù)器等資源。

總結(jié)

  從這個(gè) .NET 框架應(yīng)用程序示例中可以看出,以前只有 Visual C++
程序員能夠編寫(xiě)的應(yīng)用程序,現(xiàn)在使用簡(jiǎn)單的面向?qū)ο蟪绦蚣纯蓪?shí)現(xiàn)。盡
管我們的重點(diǎn)是 C# ,但本文所述的內(nèi)容也同樣適用于 Visual Basic 和
Managed C++.新的 .NET 框架使開(kāi)發(fā)人員能夠使用任何編程語(yǔ)言來(lái)創(chuàng)建功
能強(qiáng)大、可伸縮的 Windows應(yīng)用程序和服務(wù)。

  新的 .NET 框架不僅簡(jiǎn)化和擴(kuò)展了編程的種種可能,還能夠輕松地將
人們經(jīng)常遺忘的應(yīng)用程序檢測(cè)設(shè)備(例如性能監(jiān)測(cè)計(jì)數(shù)器和事件日志通知)
合并到應(yīng)用程序中。盡管這里的應(yīng)用程序沒(méi)有使用 Windows管理檢測(cè)設(shè)備
(WMI ),但 .NET 框架同樣也可以應(yīng)用它。