代码拉取完成,页面将自动刷新
C#语言中,用abstract 关键字来修饰一个类时,这个类叫作抽象类。抽象类是它的所有子类的公共属性的集合,是包含一个或多个抽象方法的类。抽象类可以看作是对类的进一步抽象。在面向对象领域,抽象类主要用来进行类型隐藏
经过以上抽象和泛型思想,就有了以下产物,参数可以传入非固定类型,父类处理公共数据,子类处理业务节点细节数据;
父类:
/// <summary>
/// 实现基类
/// </summary>
/// <typeparam name="TEvent"></typeparam>
public abstract class BaseRealization<TEvent> : IEventHandler<TEvent> where TEvent : Event
{
public Task HandleAsync(TEvent @event)
{
try
{
return Task.Run(() => Receive(@event));
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 调度处理中心
/// </summary>
/// <param name="eEvent"></param>
public void Handle(TEvent @Event)
{
try
{
Receive(@Event);
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 节点处理消息
/// </summary>
/// <param name="eEvent">消息体</param>
protected abstract void Receive(TEvent eEvent);
}
子类实现各自的细节行为逻辑
子类代码:
/// <summary>
/// 测试实现
/// </summary>
[FlowNode("FlowNode1")]
public class TestRealization : BaseRealization<TestParameter>
{
/// <summary>
/// Test接收处理实现
/// </summary>
/// <param name="eEvent"></param>
protected override void Receive(TestParameter eEvent)
{
var json = JsonConvert.SerializeObject(eEvent);
Console.WriteLine($"**********Test**********");
Console.WriteLine($"Test 被实现了,参数name为{eEvent.TestName}");
Console.WriteLine($"**********Test**********\r\n");
}
}
为了使我们泛型类代码有对应公共参数,于是添加了泛型约束类
泛型约束类代码:
/// <summary>
/// 事件
/// </summary>
public class Event : IEvent
{
/// <summary>
/// 初始化事件
/// </summary>
public Event()
{
Id = Guid.NewGuid().ToString();
Time = DateTime.Now;
}
/// <summary>
/// 事件标识
/// </summary>
public string Id { get; set; }
/// <summary>
/// 事件时间
/// </summary>
public DateTime Time { get; }
}
为了提供给外部使用,于是我们给抽象类提供了调度接口:
抽象泛型接口代码 :
/// <summary>
/// 事件处理器
/// </summary>
/// <typeparam name="TEvent">事件类型</typeparam>
public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEvent
{
/// <summary>
/// 异步处理事件
/// </summary>
/// <param name="event">事件</param>
Task HandleAsync(TEvent @event);
/// <summary>
/// 处理事件
/// </summary>
/// <param name="event">事件</param>
void Handle(TEvent @event);
}
首先,我们需要知道哪些类属于我们业务需要调度的类,于是我们在反射时添加了特性,用于查找当前业务的节点实现,然后通过在首次启动创建,并在特性中设置节点名称FlowNodeName,方便后续可用于调度,Index表示同节点有多个流程,可使用index进行排序调度,当然也可以根据数据库中进行配置排序
特性代码:
/// <summary>
/// 节点特性
/// </summary>
public class FlowNodeAttribute : Attribute
{
/// <summary>
/// 节点名称
/// </summary>
public string FlowNodeName { get; set; }
/// <summary>
/// 当节点下有排序实现
/// </summary>
public int Index { get; set; } = 0;
/// <summary>
///
/// </summary>
/// <param name="queueName"></param>
public FlowNodeAttribute(string flowNodeName, int index = 0)
{
FlowNodeName = flowNodeName;
Index = index;
}
}
通过以上步骤,相当于实现了知道哪些节点需要执行的,那么我们通过反射加载对应的实现到内存之中,后续可通过节点调度对应实现
反射创建对象代码:
/// <summary>
/// 节点对应实现字典
/// 使用线程安全字典存放,防止多线程问题
/// </summary>
public static ConcurrentDictionary<string, ReflectionClassInfo> RealizationDictionary = new ConcurrentDictionary<string, ReflectionClassInfo>();
/// <summary>
/// 参数对应字典
/// 使用线程安全字典存放,防止多线程问题
/// </summary>
public static ConcurrentDictionary<string, List<ParameterClassInfo>> ParameterDictionary = new ConcurrentDictionary<string, List<ParameterClassInfo>>();
/// <summary>
/// 初始化
/// </summary>
private void BaseBusInit()
{
System.Reflection.Assembly[] allAssembly = AppDomain.CurrentDomain.GetAssemblies();
System.Reflection.Assembly.GetExecutingAssembly();
List<Type> realizationTypes = new List<Type>();
Assembly.GetAssembly(typeof(IEventHandler<>));
foreach (Assembly assembly in allAssembly)
{
var types = assembly.GetTypes().Where(x => (x?.BaseType?.IsGenericType ?? false) && x.GetInterfaces().Count() > 0).ToList();
realizationTypes.AddRange(types);
}
foreach (var realizationType in realizationTypes)
{
var genericArguments = realizationType.BaseType.GetGenericArguments();
if (genericArguments == null || !genericArguments.Any())
continue;
var attributions = realizationType.GetCustomAttributes(typeof(FlowNodeAttribute), false);
if (attributions == null || attributions.Length == 0)
continue;
var attribution = attributions[0] as FlowNodeAttribute;
if (attribution == null || string.IsNullOrWhiteSpace(attribution.FlowNodeName))
continue;
var flowNodeName = attribution.FlowNodeName;
var index = attribution.Index;
var reflectionClassInfo = ReflectionClassInfo.CreateInstance(realizationType, index);
if (!RealizationDictionary.ContainsKey(flowNodeName))
RealizationDictionary.TryAdd(flowNodeName, reflectionClassInfo);
var argFirst = reflectionClassInfo.GenericArguments.FirstOrDefault();
if (argFirst != null)
{
var parameterClassInfos = new List<ParameterClassInfo>()
{
new ParameterClassInfo()
{
FlowNodeName = flowNodeName,
ReflectionClassInfo = reflectionClassInfo
}
};
var argFirstName = argFirst.FullName;
if (ParameterDictionary.ContainsKey(argFirstName))
{
List<ParameterClassInfo> value = ParameterDictionary[argFirstName];
if (value == null)
continue;
var valueFirst = value.FirstOrDefault(x => x.FlowNodeName == flowNodeName);
if (valueFirst != null)
continue;
value.AddRange(parameterClassInfos);
ParameterDictionary[argFirstName] = value;
}
else
ParameterDictionary.TryAdd(argFirstName, parameterClassInfos);
}
}
}
经过以上步骤,那我们就可以根据传入参数进行调度了
static void Main(string[] args)
{
try
{
var baseBus = BaseBus.GetInstance();
baseBus.Publish(new TestParameter() { TestName = "123" });
baseBus.Publish(new Test2Parameter() { TestName = "234" });
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.WriteLine("Hello World!");
}
/// <summary>
/// 消息发布
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="msg"></param>
public void Publish<T>(T msg) where T : Event
{
try
{
var argName = typeof(T).FullName;
if (ParameterDictionary.ContainsKey(argName))
{
List<ParameterClassInfo> value = ParameterDictionary[argName];
if (value == null || !value.Any()) return;
var parameters = value.OrderBy(x => x.ReflectionClassInfo.Index).ToList();
foreach (var parameter in parameters)
{
var type = new Type[] { typeof(T), parameter.ReflectionClassInfo.ClassType };
this.FastInvoke(type,
x => x.ConsumerTo<T, IEventHandler<Event>>(msg, parameter.FlowNodeName),
new object[] { msg, parameter.FlowNodeName });
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.AppendLine($"*********调度中心数据************");
stringBuilder.AppendLine($"获取到节点{parameter.FlowNodeName}");
stringBuilder.AppendLine($"获取到节点请求参数{JsonConvert.SerializeObject(msg)}");
stringBuilder.AppendLine($"节点运行结束,状态正常");
stringBuilder.AppendLine($"*********调度中心数据************");
Console.WriteLine(stringBuilder);
}
}
//TODO:此处可以获取节点运行的情况,获取对应节点是否执行完成
//也可以根据节点自动更新节点组数组
}
catch (Exception e)
{
Console.WriteLine("节点运行异常\r\n");
///记录节点运行异常
throw e;
}
}
运行结果中,我们可以看到只需要一行代码,我们可实现对应的细节实现调度,并且我们可以监控到对应实现的运行状态;
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。