/**
 * A unit of work that an event loop invokes repeatedly.
 *
 * <p>The loop calls {@link #call()} on every pass for as long as it returns
 * {@code true}. The two lifecycle hooks are optional and do nothing unless
 * overridden.
 */
public interface EzyEventLoopEvent {

    /**
     * Executes one step of this event.
     *
     * @return {@code true} to stay registered and be invoked again on a later
     *         pass; {@code false} to signal that the event is finished
     */
    boolean call();

    /**
     * Lifecycle hook invoked after {@link #call()} has returned {@code false}.
     * The default implementation is a no-op.
     */
    default void onFinished() {
        // intentionally empty
    }

    /**
     * Lifecycle hook invoked once the event has been taken out of its event
     * loop. The default implementation is a no-op.
     */
    default void onRemoved() {
        // intentionally empty
    }
}
Ví dụ về EventLoopGroup
EventLoopGroup cho phép chúng ta quản lý các EventLoop và phân bổ các event vào các EventLoop sao cho đồng đều, đồng thời đảm bảo mỗi event chỉ được thực thi trên một thread duy nhất để tránh lỗi concurrent:
design-pattern
Remain: 5
1 Answer
tvd12
Enlightened
tvd12
Enlightened
/**
 * A fixed-size group of event loops, each running on its own thread.
 *
 * <p>Every event handed to the group is assigned to exactly one loop (chosen
 * through the round-robin selector) and is only ever invoked from that loop's
 * thread. The group remembers which loop owns which event so that
 * {@link #removeEvent(EzyEventLoopEvent)} can route the removal to the right
 * loop.
 */
public class EzyEventLoopGroup {

    /**
     * Default upper bound, in milliseconds, on how long a loop sleeps between
     * two passes over its events.
     */
    public static final int DEFAULT_MAX_SLEEP_TIME = 3;

    /** Selector that hands out the loops of this group in turn. */
    private final EzyRoundRobin<EventLoop> eventLoops;

    /** Owning loop of every registered event, keyed by the user's event. */
    private final Map<EzyEventLoopEvent, EventLoop> eventLoopByEvent;

    /**
     * Creates a group whose threads come from a factory named
     * {@code "ezy-event-loop"} and which uses {@link #DEFAULT_MAX_SLEEP_TIME}.
     *
     * @param numberOfThreads number of event loops (one thread each)
     */
    public EzyEventLoopGroup(int numberOfThreads) {
        this(
            numberOfThreads,
            EzyNettyThreadFactory.create("ezy-event-loop")
        );
    }

    /**
     * Creates a group that uses {@link #DEFAULT_MAX_SLEEP_TIME}.
     *
     * @param numberOfThreads number of event loops (one thread each)
     * @param threadFactory   factory used to create each loop's thread
     */
    public EzyEventLoopGroup(
        int numberOfThreads,
        ThreadFactory threadFactory
    ) {
        this(
            DEFAULT_MAX_SLEEP_TIME,
            numberOfThreads,
            threadFactory
        );
    }

    /**
     * Creates the group and starts all of its loops.
     *
     * @param maxSleepTime    maximum time in milliseconds a loop sleeps
     *                        between two passes
     * @param numberOfThreads number of event loops (one thread each)
     * @param threadFactory   factory used to create each loop's thread
     */
    public EzyEventLoopGroup(
        int maxSleepTime,
        int numberOfThreads,
        ThreadFactory threadFactory
    ) {
        // A local is captured by the lambda so the supplier never reads a
        // field of a partially constructed instance.
        final Map<EzyEventLoopEvent, EventLoop> loopByEvent =
            new ConcurrentHashMap<>();
        this.eventLoopByEvent = loopByEvent;
        this.eventLoops = new EzyRoundRobin<>(
            () -> new EventLoop(maxSleepTime, threadFactory, loopByEvent),
            numberOfThreads
        );
        // NOTE(review): relies on get() visiting each of the numberOfThreads
        // loops exactly once over numberOfThreads calls - confirm against
        // EzyRoundRobin.
        for (int i = 0; i < numberOfThreads; ++i) {
            eventLoops.get().start();
        }
    }

    /**
     * Registers an event with the next loop of the group.
     *
     * @param event the event to run; it is invoked on every pass of its loop
     *              until {@code call()} returns {@code false} or it is removed
     * @throws IllegalStateException if the selected loop is no longer active
     */
    public void addEvent(EzyEventLoopEvent event) {
        final EventLoop eventLoop = eventLoops.get();
        final EzyEventLoopEvent key = keyOf(event);
        // Register the owner first so a concurrent removeEvent() can already
        // find the loop once the event becomes runnable.
        eventLoopByEvent.put(key, eventLoop);
        try {
            eventLoop.addEvent(event);
        } catch (RuntimeException e) {
            // Do not leave a mapping behind for an event the loop rejected.
            eventLoopByEvent.remove(key, eventLoop);
            throw e;
        }
    }

    /**
     * Registers a periodic event that fires for the first time immediately.
     *
     * @param event  the event to run
     * @param period time in milliseconds between two invocations
     */
    public void addScheduleEvent(
        EzyEventLoopEvent event,
        long period
    ) {
        addScheduleEvent(event, 0, period);
    }

    /**
     * Registers a periodic event.
     *
     * @param event     the event to run
     * @param delayTime time in milliseconds before the first invocation;
     *                  values {@code <= 0} mean "immediately"
     * @param period    time in milliseconds between two invocations
     */
    public void addScheduleEvent(
        EzyEventLoopEvent event,
        long delayTime,
        long period
    ) {
        addEvent(new ScheduledEvent(event, delayTime, period));
    }

    /**
     * Registers a task that runs once after the given delay.
     *
     * @param event     the task to run
     * @param delayTime time in milliseconds before the task runs
     */
    public void addOneTimeEvent(
        Runnable event,
        long delayTime
    ) {
        final EzyEventLoopEvent wrapper = new EzyEventLoopEvent() {
            @Override
            public boolean call() {
                event.run();
                // Returning false makes the loop drop the event after one run.
                return false;
            }

            @Override
            public void onFinished() {
                eventLoopByEvent.remove(this);
            }
        };
        addEvent(
            new ScheduledEvent(
                wrapper,
                delayTime,
                delayTime
            )
        );
    }

    /**
     * Asks the owning loop to drop an event. The removal is carried out by the
     * loop's own thread at the end of its current pass.
     *
     * @param event the event previously passed to one of the add methods;
     *              unknown events are ignored
     */
    public void removeEvent(EzyEventLoopEvent event) {
        final EventLoop eventLoop = eventLoopByEvent.remove(event);
        if (eventLoop != null) {
            eventLoop.removeEvent(event);
        }
    }

    /** Stops every loop of the group and waits for each of them to finish. */
    public void shutdown() {
        eventLoops.forEach(EventLoop::shutdownAndGet);
    }

    /**
     * Stops every loop of the group and waits for each of them to finish.
     *
     * @return the events that were still registered when the loops stopped
     */
    public List<EzyEventLoopEvent> shutdownAndGet() {
        final List<EzyEventLoopEvent> unfinishedEvents = new ArrayList<>();
        eventLoops.forEach(it ->
            unfinishedEvents.addAll(it.shutdownAndGet())
        );
        return unfinishedEvents;
    }

    /**
     * Returns the object under which an event is indexed: the wrapped user
     * event for a {@link ScheduledEvent}, otherwise the event itself.
     */
    private static EzyEventLoopEvent keyOf(EzyEventLoopEvent event) {
        return event instanceof ScheduledEvent
            ? ((ScheduledEvent) event).event
            : event;
    }

    /** A single loop: one thread repeatedly invoking its registered events. */
    private static final class EventLoop extends EzyLoggable {
        private final int maxSleepTime;
        private final AtomicBoolean active;
        private final AtomicBoolean stopped;
        private final EzyFuture shutdownFuture;
        private final ThreadFactory threadFactory;
        /** Removals requested since the last pass; guarded by its own monitor. */
        private final List<EzyEventLoopEvent> removeEvents;
        /** Registered events: user event -> object actually invoked. */
        private final Map<EzyEventLoopEvent, EzyEventLoopEvent> events;
        /** The group's event -> loop index, cleaned up when an event leaves. */
        private final Map<EzyEventLoopEvent, EventLoop> loopRegistry;

        private EventLoop(
            int maxSleepTime,
            ThreadFactory threadFactory,
            Map<EzyEventLoopEvent, EventLoop> loopRegistry
        ) {
            this.maxSleepTime = maxSleepTime;
            this.threadFactory = threadFactory;
            this.loopRegistry = loopRegistry;
            this.active = new AtomicBoolean();
            this.stopped = new AtomicBoolean();
            this.events = new ConcurrentHashMap<>();
            this.removeEvents = new ArrayList<>();
            this.shutdownFuture = new EzyFutureTask();
        }

        /**
         * Registers an event with this loop.
         *
         * @throws IllegalStateException if the loop is not active
         */
        public void addEvent(EzyEventLoopEvent event) {
            if (!active.get()) {
                throw new IllegalStateException("event loop has stopped");
            }
            events.put(keyOf(event), event);
        }

        /** Queues an event for removal at the end of the current pass. */
        public void removeEvent(EzyEventLoopEvent event) {
            synchronized (removeEvents) {
                removeEvents.add(event);
            }
        }

        /** Drops one event from this loop and from the group's index. */
        private void doRemoveEvent(EzyEventLoopEvent event) {
            final EzyEventLoopEvent key = keyOf(event);
            events.remove(key);
            // Conditional remove: leave the mapping alone if the event has
            // meanwhile been re-registered with another loop.
            loopRegistry.remove(key, this);
            processWithLogException(event::onRemoved, true);
        }

        /** Marks the loop active and starts its thread. */
        public void start() {
            // Activate before the thread exists: otherwise addEvent() could be
            // rejected, or a shutdown request overwritten, while the new
            // thread has not been scheduled yet.
            active.set(true);
            threadFactory.newThread(this::doStart)
                .start();
        }

        /** Thread body: runs the loop and always publishes its termination. */
        private void doStart() {
            try {
                runLoop();
            } finally {
                // Reached on shutdown, interruption and unexpected failure
                // alike, so shutdownAndGet() can never wait forever.
                active.set(false);
                synchronized (this) {
                    stopped.set(true);
                    shutdownFuture.setResult(true);
                }
            }
        }

        /** Repeats passes over the events until deactivated or interrupted. */
        private void runLoop() {
            final List<EzyEventLoopEvent> eventBuffers = new ArrayList<>();
            final List<EzyEventLoopEvent> removalBuffers = new ArrayList<>();
            while (active.get()) {
                final long startTime = System.currentTimeMillis();
                // Work on a snapshot so events added during the pass are
                // picked up on the next one.
                eventBuffers.addAll(events.values());
                for (EzyEventLoopEvent event : eventBuffers) {
                    processEvent(event);
                }
                eventBuffers.clear();
                // Drain the queue under the lock, but run the callbacks
                // outside of it: onRemoved() is user code and may itself call
                // removeEvent(), which must not mutate a list being iterated.
                synchronized (removeEvents) {
                    removalBuffers.addAll(removeEvents);
                    removeEvents.clear();
                }
                for (EzyEventLoopEvent event : removalBuffers) {
                    doRemoveEvent(event);
                }
                removalBuffers.clear();
                final long elapsedTime = System.currentTimeMillis() - startTime;
                final long sleepTime = maxSleepTime - elapsedTime;
                if (sleepTime > 0) {
                    try {
                        //noinspection BusyWait
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever owns the thread.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }

        /**
         * Invokes one event if it is due and queues it for removal when it
         * reports that it is finished. Failures are logged, never propagated.
         */
        private void processEvent(EzyEventLoopEvent event) {
            try {
                if (event instanceof ScheduledEvent
                    && ((ScheduledEvent) event).isNotFireTime()) {
                    return;
                }
                if (!event.call()) {
                    synchronized (removeEvents) {
                        removeEvents.add(event);
                    }
                    event.onFinished();
                }
            } catch (Throwable e) {
                logger.error("fatal error on event loop with event: {}", event, e);
            }
        }

        /**
         * Deactivates the loop and waits until its thread has finished.
         *
         * @return the events still registered with this loop
         */
        public List<EzyEventLoopEvent> shutdownAndGet() {
            active.set(false);
            synchronized (this) {
                if (stopped.get()) {
                    shutdownFuture.setResult(true);
                }
            }
            processSilently(shutdownFuture::get);
            return new ArrayList<>(events.values());
        }
    }

    /** Decorator that only lets the wrapped event run when it is due. */
    private static final class ScheduledEvent implements EzyEventLoopEvent {
        private final long period;
        private final EzyEventLoopEvent event;
        private final AtomicLong nextFireTime = new AtomicLong();

        /**
         * @param event     the event to decorate
         * @param delayTime milliseconds before the first invocation; values
         *                  {@code <= 0} mean "immediately"
         * @param period    milliseconds between two invocations
         */
        private ScheduledEvent(
            EzyEventLoopEvent event,
            long delayTime,
            long period
        ) {
            this.period = period;
            this.event = event;
            // The first invocation is governed by the delay, not the period.
            this.nextFireTime.set(
                System.currentTimeMillis() + Math.max(delayTime, 0L)
            );
        }

        /** Returns {@code true} while the next fire time lies in the future. */
        public boolean isNotFireTime() {
            return System.currentTimeMillis() < nextFireTime.get();
        }

        @Override
        public boolean call() {
            // Advance from the scheduled time rather than from "now", so the
            // schedule does not drift when a pass runs late.
            this.nextFireTime.addAndGet(period);
            return event.call();
        }

        @Override
        public void onFinished() {
            event.onFinished();
        }

        @Override
        public void onRemoved() {
            event.onRemoved();
        }
    }
}
-
0