.NET Core 2.0 Event Bus A simple memory event bus implementation

1. First define an event interface

public interface IEvent
{

}

2. Define an event handling interface

 public  interface IEventHandler: IEvent
{
Task Handle(IEvent e);
}

3. Define a publishing interface

public < span style="color: #0000ff">interface IEventPublisher
{
Task Publish
(TEvent e) < span style="color: #0000ff">where TEvent: IEvent;
}

4. Define a subscription interface

public interface< /span> IEventSubscriber 
{

Task Subscribe
() where TEvent: IEvent where EH: class, IEventHandler, new();
}

5. Create a class to store Events

 public static class MemoryMq
{
public static ConcurrentDictionary<string, IEvent> eventQueueDict {get; set; }

}< /span>

6. Implement the release class

 public class InMemoryEventPublisher: IEventPublisher
{
public Task Publish(TEvent @event) where TEvent: IEvent
{
if (@event == null) return Task.CompletedTask;
if (MemoryMq.eventQueueDict == null)
{
MemoryMq.eventQueueDict
= new ConcurrentDictionary<string< /span>, IEvent>();
}
MemoryMq.eventQueueD ict.GetOrAdd(Guid.NewGuid().ToString(),@event);
return Task.CompletedTask;
}
}

7. Implement the subscription class

< pre>public class InMemoryEventSubscriber: IEventSubscriber
{
private readonly ConcurrentDictionary<string, Task> taskDict = new ConcurrentDictionary<string , Task>();

public< /span> Task Subscribe() where TEvent: IEvent
where EH: class, IEventHandler, new()
{

EH state = new EH();

Task.Run(() =>
{
while (true)
{
if (MemoryMq.eventQueueDict != null)
{
foreach (< span style="color: #0000ff">var a in MemoryMq.eventQueueDict) {
state.Handle(a.Value
as IEvent);
IEvent o;
MemoryMq.eventQueueDict.TryRemove(a.Key ,
out o);
}
}

}

});< br />
return Task.CompletedTask;
}
}

9. Test cases

namespace MemoryMqTest
{
public class EventHandler: IE ventHandler
{
public Task Handle(IEvent e, MessagingHelper h)
{
switch (e)
{

case Order value: Console.WriteLine(value.name); break ;

}
return< span style="color: #000000"> Task.CompletedTask;

}

}
public class Order: IEvent
{
< span style="color: #0000ff">public string name {get; set; }
}
class< /span> Program
{
static void Main(string[] args)
{
var servicecollection = new ServiceCollection();
servicecollection.AddSingleton
();
servicecollection.AddSingleton
();
var provider = servicecollection.BuildServiceProvider();

var eventPub = provider.GetService();

var _eventSub = provider.GetService();

_eventSub.Subscribe
();


var order = new Order();
order.name
= "test";
eventPub.Publish(order);

Console.WriteLine(
"Hello World!");

Console.ReadKey();
}
}
}

10. Test results

share picture

public interface IEvent
{

}

 public interface IEventHandler : IEvent
{
Task Handle(IEvent e);
}

public interface IEventPublisher
{
Task Publish
(TEvent e) where TEvent: IEvent;
}

public interface IEventSubscriber 
{

Task Subscribe
() where TEvent: IEvent where EH: class, IEventHandler, new< /span>();
}

 public static class MemoryMq 
{
public static ConcurrentDictionary<string, IEvent> eventQueueDict {get; set; }

}

 public class InMemoryEventPublisher: IEventPublisher
{
public Task Publish(TEvent @event) where TEvent: IEvent
{
if (@event == null) return Task.CompletedTask;
if (MemoryMq.eventQueueDict == null)
{
MemoryMq.eventQueueDict
= new ConcurrentDictionary<string, IEvent>();
}
MemoryMq.eventQueueDict.GetOrAdd (Guid.NewGuid().ToString(),@event);
return Task.CompletedTask;
}
}

public class InMemoryEventSubscriber: IEventSubscriber 
{
private readonly ConcurrentDictionary<string, Task> taskDict = new ConcurrentDictionary<string span>, Task>();



public Task Subscribe() where TEvent: IEvent
where EH: class, IEventHandler, new()
{

EH state
= new EH();



Task.Run(()
=>
{
while (true)
{
if (MemoryMq.eventQueueDict != null)
{
foreach (var a in MemoryMq.eventQueueDict)
{
state.Handle(a.Value
as< /span> IEvent);
IEvent o;
MemoryMq.eventQueueDict.TryRemove(a .Key ,
out o);
}
}

}

});

return Task.CompletedTask;
}
}

namespace  MemoryMqTest
{
public class EventHandler: IEventHandler
{
public Task Handle(IEvent e, MessagingHelper h)
{
switch (e)
{

case Order value: Console.WriteLine(value.name); break ;

}
return Task.CompletedTask;

}

}
public span> class Order: IEvent
{
public string name {get; set; }
}
class Program
{
static void Main(string[] args)
{
var< /span> servicecollection = new ServiceCollection();
servicecollection.AddSingleton
();
servicecollection.AddSingleton
();
var provider = servicecollection.BuildServiceProvider();

var eventPub = provider.GetService();

var< /span> _eventSub = provider.GetService();

_eventSub.Subscribe
();


var order = new Order();
order.name
= "test";
eventPub.Publish(order);

Console.WriteLine(
" Hello World!");

Console.ReadKey();
}
}
}

WordPress database error: [Table 'yf99682.wp_s6mz6tyggq_comments' doesn't exist]
SELECT SQL_CALC_FOUND_ROWS wp_s6mz6tyggq_comments.comment_ID FROM wp_s6mz6tyggq_comments WHERE ( comment_approved = '1' ) AND comment_post_ID = 307 ORDER BY wp_s6mz6tyggq_comments.comment_date_gmt ASC, wp_s6mz6tyggq_comments.comment_ID ASC

Leave a Comment

Your email address will not be published.