public interface EzyEventLoopEvent { boolean call(); default void onFinished() {} default void onRemoved() {} }
Ví dụ về EvenLoopGroup
EventLoopGroup sẽ cho phép chúng ta quản lý các EvenLoop và phân bổ các Event vào các EvenLoop sao cho đồng đều và đảm bảo 1 event sẽ được thực thi trên 1 thread duy nhất để tránh lỗi concurrent:
design-pattern
Remain: 5
1 Answer
tvd12
Enlightened
tvd12
Enlightened
public class EzyEventLoopGroup { private final EzyRoundRobin eventLoops; private final Map eventLoopByEvent; public static final int DEFAULT_MAX_SLEEP_TIME = 3; public EzyEventLoopGroup(int numberOfThreads) { this( numberOfThreads, EzyNettyThreadFactory.create("ezy-event-loop") ); } public EzyEventLoopGroup( int numberOfThreads, ThreadFactory threadFactory ) { this( DEFAULT_MAX_SLEEP_TIME, numberOfThreads, threadFactory ); } public EzyEventLoopGroup( int maxSleepTime, int numberOfThreads, ThreadFactory threadFactory ) { eventLoopByEvent = new ConcurrentHashMap(); eventLoops = new EzyRoundRobin( () -> new EventLoop(maxSleepTime, threadFactory), numberOfThreads ); for (int i = 0; i < numberOfThreads; ++i) { eventLoops.get().start(); } } public void addEvent(EzyEventLoopEvent event) { final EventLoop eventLoop = eventLoops.get(); eventLoopByEvent.put( event instanceof ScheduledEvent ? ((ScheduledEvent) event).event : event, eventLoop ); eventLoop.addEvent(event); } public void addScheduleEvent( EzyEventLoopEvent event, long period ) { addScheduleEvent(event, 0, period); } public void addScheduleEvent( EzyEventLoopEvent event, long delayTime, long period ) { addEvent(new ScheduledEvent(event, delayTime, period)); } public void addOneTimeEvent( Runnable event, long delayTime ) { final EzyEventLoopEvent wrapper = new EzyEventLoopEvent() { @Override public boolean call() { event.run(); return false; } @Override public void onFinished() { eventLoopByEvent.remove(this); } }; addEvent( new ScheduledEvent( wrapper, delayTime, delayTime ) ); } public void removeEvent(EzyEventLoopEvent event) { final EventLoop eventLoop = eventLoopByEvent.remove(event); if (eventLoop != null) { eventLoop.removeEvent(event); } } public void shutdown() { eventLoops.forEach(EventLoop::shutdownAndGet); } public List shutdownAndGet() { final List unfinishedEvents = new ArrayList(); eventLoops.forEach(it -> unfinishedEvents.addAll(it.shutdownAndGet()) ); return unfinishedEvents; } private static final class EventLoop extends EzyLoggable { private final int maxSleepTime; private final AtomicBoolean active; private final AtomicBoolean stopped; private final EzyFuture shutdownFuture; private final ThreadFactory threadFactory; private final List removeEvents; private final Map events; private EventLoop( int maxSleepTime, ThreadFactory threadFactory ) { this.maxSleepTime = maxSleepTime; this.threadFactory = threadFactory; this.active = new AtomicBoolean(); this.stopped = new AtomicBoolean(); this.events = new ConcurrentHashMap(); this.removeEvents = new ArrayList(); this.shutdownFuture = new EzyFutureTask(); } public void addEvent(EzyEventLoopEvent event) { if (!active.get()) { throw new IllegalStateException("event loop has stopped"); } events.put( event instanceof ScheduledEvent ? ((ScheduledEvent) event).event : event, event ); } public void removeEvent(EzyEventLoopEvent event) { synchronized (removeEvents) { removeEvents.add(event); } } private void doRemoveEvent(EzyEventLoopEvent event) { events.remove( event instanceof ScheduledEvent ? ((ScheduledEvent) event).event : event ); processWithLogException(event::onRemoved, true); } public void start() { threadFactory.newThread(this::doStart) .start(); } private void doStart() { active.set(true); final List eventBuffers = new ArrayList(); while (active.get()) { final long startTime = System.currentTimeMillis(); eventBuffers.addAll(events.values()); for (EzyEventLoopEvent event : eventBuffers) { try { if (event instanceof ScheduledEvent) { final ScheduledEvent scheduledEvent = (ScheduledEvent) event; if (scheduledEvent.isNotFireTime()) { continue; } } if (!event.call()) { synchronized (removeEvents) { removeEvents.add(event); } event.onFinished(); } } catch (Throwable e) { logger.error("fatal error on event loop with event: {}", event, e); } } eventBuffers.clear(); synchronized (removeEvents) { for (EzyEventLoopEvent event : removeEvents) { doRemoveEvent(event); } removeEvents.clear(); } final long elapsedTime = System.currentTimeMillis() - startTime; final long sleepTime = maxSleepTime - elapsedTime; if (sleepTime > 0) { try { //noinspection BusyWait Thread.sleep(sleepTime); } catch (InterruptedException e) { break; } } } synchronized (this) { stopped.set(true); shutdownFuture.setResult(true); } } public List shutdownAndGet() { active.set(false); synchronized (this) { if (stopped.get()) { shutdownFuture.setResult(true); } } processSilently(shutdownFuture::get); return new ArrayList(events.values()); } } private static final class ScheduledEvent implements EzyEventLoopEvent { private final long period; private final EzyEventLoopEvent event; private final AtomicLong nextFireTime = new AtomicLong(); private ScheduledEvent( EzyEventLoopEvent event, long delayTime, long period ) { this.period = period; this.event = event; this.nextFireTime.set( System.currentTimeMillis() + (delayTime <= 0 ? 0 : period) ); } public boolean isNotFireTime() { return System.currentTimeMillis() < nextFireTime.get(); } @Override public boolean call() { this.nextFireTime.addAndGet(period); return event.call(); } @Override public void onFinished() { event.onFinished(); } @Override public void onRemoved() { event.onRemoved(); } } }
-
0