C#消息隊列應(yīng)用程序 -1
發(fā)表時間:2024-06-14 來源:明輝站整理相關(guān)軟件相關(guān)文章人氣:
[摘要]簡介 Microsoft近期推出一種用于生成集成應(yīng)用程序的新平臺——Microsoft .NET框架。.NET 框架允許開發(fā)人員使用任何編程語言迅速生成和部署Web 服務(wù)和應(yīng)用程序。Microsoft Intermediate Language (MSIL)和實時 (JIT )編譯器使這種不依...
簡介
Microsoft近期推出一種用于生成集成應(yīng)用程序的新平臺——Microsoft
.NET框架。.NET 框架允許開發(fā)人員使用任何編程語言迅速生成和部署Web
服務(wù)和應(yīng)用程序。Microsoft Intermediate Language (MSIL)和實時
(JIT )編譯器使這種不依賴語言的框架得以實現(xiàn)。
與.NET框架同時面世的還有一種新的編程語言C#(讀“C sharp”)。
C#是一種簡單、新穎、面向?qū)ο蠛皖愋桶踩木幊陶Z言。利用 .NET 框架
和 C# (除 Microsoft? Visual Basic ?和 Managed C++之外),用戶
可以編寫功能強大的 Microsoft Windows?和 Web應(yīng)用程序及服務(wù)。本文
提供了這樣的一個解決方案,它的重點是 .NET 框架和 C# 而不是編程語
言。C#語言的介紹可以在“ C# 簡介和概述(英文)”找到。
近期的文章“MSMQ:可伸縮、高可用性的負(fù)載平衡解決方案(英文)”
介紹了一種解決方案,用于高可用性消息隊列(MSMQ)的可伸縮負(fù)載平衡
解決方案體系結(jié)構(gòu)。此解決方案中涉及了一種將 Windows服務(wù)用作智能消
息路由器的開發(fā)方案。這樣的解決方案以前只有 Microsoft Visual C++
程序員才能實現(xiàn),而 .NET 框架的出現(xiàn)改變了這種情況。從下面的解決方
案中,您可以看到這一點。
.NET 框架應(yīng)用程序
這里介紹的解決方案是一種用來處理若干消息隊列的 Windows服務(wù);
其中每個隊列都是由多個線程進(jìn)行處理(接收和處理消息)。處理程序使
用循環(huán)法技術(shù)或應(yīng)用程序特定值(消息 AppSpecific屬性)從目的隊列列
表中路由消息,并使用消息屬性來調(diào)用組件方法。(示例進(jìn)程也屬于這種
情況。)在后一種情況下,組件的要求是它能夠?qū)崿F(xiàn)給定的接口IWeb
Message要處理錯誤,應(yīng)用程序需要將不能處理的消息發(fā)送到錯誤隊列中。
消息應(yīng)用程序的結(jié)構(gòu)與以前的活動模板庫(ATL )應(yīng)用程序相似,它
們之間的主要不同在于用于管理服務(wù)的代碼的封裝和 .NET 框架組件的使
用。要創(chuàng)建Windows服務(wù),.NET框架用戶僅僅需要創(chuàng)建一個從 ServiceBase
(來自System.ServiceControl程序集)繼承的類。這毫不奇怪,因為.NET
框架是面向?qū)ο蟮摹?
應(yīng)用程序結(jié)構(gòu)
應(yīng)用程序中主要的類是 ServiceControl ,它是從 ServiceBase繼承
的。因而,它必須實現(xiàn) OnStart和 OnStop 方法,以及可選的 OnPause和
OnContinue方法。事實上,類是在靜態(tài)方法 Main 內(nèi)構(gòu)造的:
using System;
using System.ServiceProcess;
public class ServiceControl: ServiceBase
{
// 創(chuàng)建服務(wù)對象的主入口點
public static void Main()
{
ServiceBase.Run(new ServiceControl());
}
// 定義服務(wù)參數(shù)的構(gòu)造對象
public ServiceControl()
{
CanPauseAndContinue = true;
ServiceName = "MSDNMessageService";
AutoLog = false;
}
protected override void OnStart(string[] args) {...}
protected override void OnStop() {...}
protected override void OnPause() {...}
protected override void OnContinue() {...}
}
ServiceControl類創(chuàng)建一系列 CWorker對象,即,為需要處理的每個
消息隊列創(chuàng)建 CWorker類的一個實例。根據(jù)定義中處理隊列所需的線程數(shù)
目,CWorker 類依次創(chuàng)建了一系列的 CWorkerThread對象。CWorkerThread
類創(chuàng)建的一個處理線程將執(zhí)行實際的服務(wù)工作。
使用 CWorker和 CWorkerThread類的主要目的是確認(rèn)服務(wù)控件 Start、
Stop、Pause 和 Continue 命令。因為這些進(jìn)程必須是無阻塞的,命令操
作最終將在后臺處理線程上執(zhí)行。
CWorkerThread 是一個抽象類,被 CWorkerThreadAppSpecific 、
CWorkerThreadRoundRobin 和 CWorkerThreadAssembly繼承。這些類以不
同的方式處理消息。前兩個類通過給另一隊列發(fā)送消息來處理消息(其不
同之處在于確定接收隊列路徑的方式),最后一個類則使用消息屬性來調(diào)
用組件方法。
.NET 框架內(nèi)部的錯誤處理是以基類 Exception為基礎(chǔ)的。當(dāng)系統(tǒng)引
發(fā)或捕獲錯誤時,這些錯誤必須是從 Exception中導(dǎo)出的類。CWorker
ThreadException 類就是這樣一種實現(xiàn),它通過附加額外屬性(用于定義
服務(wù)是否應(yīng)繼續(xù)運行)來擴展基類。
最后,應(yīng)用程序包含兩種結(jié)構(gòu)。這些值類型定義了輔助進(jìn)程或線程的
運行時參數(shù),以簡化 CWorker和 CWorkerThread對象的結(jié)構(gòu)。使用值類型
結(jié)構(gòu)(而不是引用類型類)能夠確保這些運行時參數(shù)維護(hù)的是數(shù)值(而不
是引用)。
IWebMessage 接口
CWorkerThread 的實現(xiàn)之一是一個調(diào)用組件方法的類。這個名為
CWorkerThreadAssembly 的類使用 IWebMessage接口來定義服務(wù)和組件之
間的約定。
與當(dāng)前版本的 Microsoft Visual Studio?不同,C#接口可以在任何
語言中顯式定義,而不需要創(chuàng)建和編譯 IDL文件。C# IWebMessage接口的
定義如下:
public interface IWebMessage
{
WebMessageReturn Process(string sMessageLabel, string sMessage
Body, int iAppSpecific);
void Release();
}
ATL 代碼中的 Process 方法是為處理消息而指定的。Process 方法的返
回代碼定義為枚舉類型 WebMessageReturn:
public enum WebMessageReturn
{
ReturnGood,
ReturnBad,
ReturnAbort
}
枚舉的定義如下:Good表示繼續(xù)處理,Bad 表示將消息寫入錯誤隊列,
Abort 表示終止處理。Release 方法為服務(wù)提供了輕松清除類實例的途徑。
因為僅在垃圾回收的過程中才調(diào)用類實例的析構(gòu)函數(shù),所以確保所有占用
昂貴資源(例如數(shù)據(jù)庫連接)的類都有一個能夠在析構(gòu)之前被調(diào)用的方法,
用來釋放這些資源,這是一種非常好的構(gòu)思。
名稱空間
在這里先簡單介紹一下名稱空間。名稱空間允許在內(nèi)部和外部表示中
將應(yīng)用程序組織成為邏輯元素。服務(wù)內(nèi)的所有代碼都包含在 MSDNMessage
Service.Service 名稱空間內(nèi)。盡管服務(wù)代碼包含在若干文件中,但是由
于它們包含在同一名稱空間中,因此用戶不需要引用其他文件。
由于 IWebMessage接口包含在 MSDNMessageService.Interface 名稱
空間中,因此使用此接口的線程類具有一個接口名稱空間。
服務(wù)類
應(yīng)用程序的目的是監(jiān)視和處理消息隊列,每一隊列在收到消息時都執(zhí)
行不同的進(jìn)程。應(yīng)用程序是作為 Windows服務(wù)來實現(xiàn)的。
ServiceBase 類
如前所述,服務(wù)的基本結(jié)構(gòu)是從 ServiceBase繼承的類。重要的方法
包括 OnStart、OnStop、OnPause 和 OnContinue ,每一個替代方法都與
一個服務(wù)控制操作直接對應(yīng)。OnStart 方法的目的是創(chuàng)建 CWorker對象,
而 CWorker類又創(chuàng)建 CWorkerThread對象,然后在該對象中創(chuàng)建執(zhí)行服務(wù)
工作的線程。
服務(wù)的運行時配置(以及 CWorker和 CWorkerThread對象的屬性)是
在基于 XML的配置文件中維護(hù)的。它的名稱與創(chuàng)建的 .exe 文件相同,但
帶有一個 .cfg 后綴。配置示例如下:
〈?xml version="1.0"?〉
〈configuration〉
〈ProcessList〉
〈ProcessDefinition
ProcessName="Worker1"
ProcessDesc="Message Worker with 2 Threads"
ProcessType="AppSpecific"
ProcessThreads="2"
InputQueue=".\private$\test_load1"
ErrorQueue=".\private$\test_error"〉
〈OutputList〉
〈OutputDefinition OutputName=".\private$\test_out11" /〉
〈OutputDefinition OutputName=".\private$\test_out12" /〉
〈/OutputList〉
〈/ProcessDefinition〉
〈ProcessDefinition
ProcessName="Worker2"
ProcessDesc="Assembly Worker with 1 Thread"
ProcessType="Assembly"
ProcessThreads="1"
InputQueue=".\private$\test_load2"
ErrorQueue=".\private$\test_error"〉
〈OutputList〉
〈OutputDefinition OutputName="C:\MSDNMessageService\Message
Example.dll" /〉
〈OutputDefinition OutputName="MSDNMessageService.Message
Sample.ExampleClass"/〉
〈/OutputList〉
〈/ProcessDefinition〉
〈/ProcessList〉
〈/configuration〉
對此信息的訪問通過來自 System.Configuration 程序集的 Config
Manager 類來管理。靜態(tài) Get方法返回信息的集合,這些集合將被枚舉以
獲得單個屬性。這些屬性集的設(shè)置決定了輔助對象的運行時特征。除了這
一配置文件,您還應(yīng)該創(chuàng)建定義 XML文件結(jié)構(gòu)的圖元文件,并在其中引用
位于服務(wù)器 machine.cfg配置文件中的圖元文件:
〈?xml version ="1.0"?〉
〈MetaData xmlns="x-schema:CatMeta.xms"〉
〈DatabaseMeta InternalName="MessageService"〉
〈ServerWiring Interceptor="Core_XMLInterceptor"/〉
〈Collection
InternalName="Process" PublicName="ProcessList"
PublicRowName="ProcessDefinition"
SchemaGeneratorFlags="EMITXMLSCHEMA"〉
〈Property InternalName="ProcessName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈Property InternalName="ProcessDesc" Type="String" /〉
〈Property InternalName="ProcessType" Type="Int32" Default
Value="RoundRobin" 〉
〈Enum InternalName="RoundRobin" Value="0"/〉
〈Enum InternalName="AppSpecific" Value="1"/〉
〈Enum InternalName="Assembly" Value="2"/〉
〈/Property〉
〈Property InternalName="ProcessThreads" Type="Int32"
DefaultValue="1" /〉
〈Property InternalName="InputQueue" Type="String" /〉
〈Property InternalName="ErrorQueue" Type="String" /〉
〈Property InternalName="OutputName" Type="String" /〉
〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
Operator="EQUAL" /〉
〈/Collection〉
〈Collection
InternalName="Output" PublicName="OutputList"
PublicRowName="OutputDefinition"
SchemaGeneratorFlags="EMITXMLSCHEMA"〉
〈Property InternalName="ProcessName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈Property InternalName="OutputName" Type="String" Meta
Flags="PRIMARYKEY" /〉
〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
Operator="EQUAL" /〉
〈/Collection〉
〈/DatabaseMeta〉
〈RelationMeta
PrimaryTable="Process" PrimaryColumns="ProcessName"
ForeignTable="Output" ForeignColumns="ProcessName"
MetaFlags="USECONTAINMENT"/〉
〈/MetaData〉
由于 Service類必須維護(hù)一個已創(chuàng)建輔助對象的列表,因此使用了
Hashtable 集合,用于保持類型對象的名稱/ 數(shù)值對列表。Hashtable 不
僅支持枚舉,還允許通過關(guān)鍵字來查詢值。在應(yīng)用程序中,XML 進(jìn)程名稱
是唯一的關(guān)鍵字:
private Hashtable htWorkers = new Hashtable();
IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new
AppDomainSelector());
foreach (IConfigItem ciWorker in cWorkers)
{
WorkerFormatter sfWorker = new WorkerFormatter();
sfWorker.ProcessName = (string)ciWorker["ProcessName"];
sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"];
sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"];
sfWorker.InputQueue = (string)ciWorker["InputQueue"];
sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"];
// 計算并定義進(jìn)程類型
switch ((int)ciWorker["ProcessType"])
{
case 0:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessRoundRobin;
break;
case 1:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessAppSpecific;
break;
case 2:
sfWorker.ProcessType = WorkerFormatter.SFProcessType.
ProcessAssembly;
break;
default:
throw new Exception("Unknown Processing Type");
}
// 執(zhí)行更多的工作以讀取輸出信息
string sProcessName = (string)ciWorker["ProcessName"];
if (htWorkers.ContainsKey(sProcessName))
throw new ArgumentException("Process Name Must be Unique: "
+ sProcessName);
htWorkers.Add(sProcessName, new CWorker(sfWorker));
}
在這段代碼中沒有包含的主要信息是輸出數(shù)據(jù)的獲取。每一個進(jìn)程定
義中都有一組相應(yīng)的輸出定義項。該信息是通過如下的簡單查詢讀取的:
string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +
sfWorker.ProcessName + " AND Selector=appdomain://";
ConfigQuery qQuery = new ConfigQuery(sQuery);
IConfigCollection cOutputs = ConfigManager.Get("OutputList",
qQuery);
int iSize = cOutputs.Count, iLoop = 0;
sfWorker.OutputName = new string[iSize];
foreach (IConfigItem ciOutput in cOutputs)
sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];
CWorkerThread 和 Cworker類都有相應(yīng)的服務(wù)控制方法,根據(jù)服務(wù)控
制操作進(jìn)行調(diào)用。由于 Hashtable中引用了每一個 CWorker對象,因此需
要枚舉 Hashtable的內(nèi)容,以調(diào)用適當(dāng)?shù)姆⻊?wù)控制方法:
foreach (CWorker cWorker in htWorkers.Values)
cWorker.Start();
類似地,實現(xiàn)的 OnPause、OnContinue和 OnStop 方法是通過調(diào)用
CWorker 對象上的相應(yīng)方法來執(zhí)行操作的。
CWorker 類
CWorker 類的主要功能是創(chuàng)建和管理 CWorkerThread對象。Start 、
Stop、Pause 和 Continue 方法調(diào)用相應(yīng)的 CWorkerThread方法。實際的
CWorkerThread 對象是在Start 方法中創(chuàng)建的。與使用 Hashtable管理輔
助對象引用的 Service類相似,CWorker 使用 ArrayList(簡單的動態(tài)數(shù)
組)來維護(hù)線程對象的列表。