001package com.box.sdk; 002 003import com.eclipsesource.json.Json; 004import com.eclipsesource.json.JsonArray; 005import com.eclipsesource.json.JsonObject; 006import com.eclipsesource.json.JsonValue; 007import java.util.ArrayList; 008import java.util.Collection; 009 010/** 011 * Receives real-time events from the API and forwards them to {@link EventListener EventListeners}. 012 * 013 * <p>This class handles long polling the Box events endpoint in order to receive real-time user events. 014 * When an EventStream is started, it begins long polling on a separate thread until the {@link #stop} method 015 * is called. 016 * Since the API may return duplicate events, EventStream also maintains a small cache of the most recently received 017 * event IDs in order to automatically deduplicate events.</p> 018 * <p>Note: Enterprise Events can be accessed by admin users with the EventLog.getEnterpriseEvents method</p> 019 */ 020public class EventStream { 021 022 private static final int LIMIT = 800; 023 /** 024 * Events URL. 025 */ 026 public static final URLTemplate EVENT_URL = new URLTemplate("events?limit=" + LIMIT + "&stream_position=%s"); 027 private static final int STREAM_POSITION_NOW = -1; 028 private static final int DEFAULT_POLLING_DELAY = 1000; 029 private final BoxAPIConnection api; 030 private final long startingPosition; 031 private final int pollingDelay; 032 private final Collection<EventListener> listeners; 033 private final Object listenerLock; 034 035 private LRUCache<String> receivedEvents; 036 private boolean started; 037 private Poller poller; 038 private Thread pollerThread; 039 040 /** 041 * Constructs an EventStream using an API connection. 042 * 043 * @param api the API connection to use. 044 */ 045 public EventStream(BoxAPIConnection api) { 046 this(api, STREAM_POSITION_NOW, DEFAULT_POLLING_DELAY); 047 } 048 049 /** 050 * Constructs an EventStream using an API connection and a starting initial position. 051 * 052 * @param api the API connection to use. 053 * @param startingPosition the starting position of the event stream. 054 */ 055 public EventStream(BoxAPIConnection api, long startingPosition) { 056 this(api, startingPosition, DEFAULT_POLLING_DELAY); 057 } 058 059 /** 060 * Constructs an EventStream using an API connection and a starting initial position with custom polling delay. 061 * 062 * @param api the API connection to use. 063 * @param startingPosition the starting position of the event stream. 064 * @param pollingDelay the delay in milliseconds between successive calls to get more events. 065 */ 066 public EventStream(BoxAPIConnection api, long startingPosition, int pollingDelay) { 067 this.api = api; 068 this.startingPosition = startingPosition; 069 this.listeners = new ArrayList<>(); 070 this.listenerLock = new Object(); 071 this.pollingDelay = pollingDelay; 072 } 073 074 /** 075 * Adds a listener that will be notified when an event is received. 076 * 077 * @param listener the listener to add. 078 */ 079 public void addListener(EventListener listener) { 080 synchronized (this.listenerLock) { 081 this.listeners.add(listener); 082 } 083 } 084 085 /** 086 * Indicates whether or not this EventStream has been started. 087 * 088 * @return true if this EventStream has been started; otherwise false. 089 */ 090 public boolean isStarted() { 091 return this.started; 092 } 093 094 /** 095 * Stops this EventStream and disconnects from the API. 096 * 097 * @throws IllegalStateException if the EventStream is already stopped. 098 */ 099 public void stop() { 100 if (!this.started) { 101 throw new IllegalStateException("Cannot stop the EventStream because it isn't started."); 102 } 103 104 this.started = false; 105 this.pollerThread.interrupt(); 106 } 107 108 /** 109 * Starts this EventStream and begins long polling the API. 110 * 111 * @throws IllegalStateException if the EventStream is already started. 112 */ 113 public void start() { 114 if (this.started) { 115 throw new IllegalStateException("Cannot start the EventStream because it isn't stopped."); 116 } 117 118 final long initialPosition; 119 120 if (this.startingPosition == STREAM_POSITION_NOW) { 121 BoxJSONRequest request = new BoxJSONRequest(this.api, 122 EVENT_URL.buildAlpha(this.api.getBaseURL(), "now"), "GET" 123 ); 124 try (BoxJSONResponse response = request.send()) { 125 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 126 initialPosition = jsonObject.get("next_stream_position").asLong(); 127 } 128 } else { 129 assert this.startingPosition >= 0 : "Starting position must be non-negative"; 130 initialPosition = this.startingPosition; 131 } 132 133 this.poller = new Poller(initialPosition); 134 135 this.pollerThread = new Thread(this.poller); 136 this.pollerThread.setUncaughtExceptionHandler((t, e) -> EventStream.this.notifyException(e)); 137 this.pollerThread.start(); 138 139 this.started = true; 140 } 141 142 /** 143 * Indicates whether or not an event ID is a duplicate. 144 * 145 * <p>This method can be overridden by a subclass in order to provide custom de-duping logic.</p> 146 * 147 * @param eventID the event ID. 148 * @return true if the event is a duplicate; otherwise false. 149 */ 150 protected boolean isDuplicate(String eventID) { 151 if (this.receivedEvents == null) { 152 this.receivedEvents = new LRUCache<>(); 153 } 154 155 return !this.receivedEvents.add(eventID); 156 } 157 158 private void notifyNextPosition(long position) { 159 synchronized (this.listenerLock) { 160 for (EventListener listener : this.listeners) { 161 listener.onNextPosition(position); 162 } 163 } 164 } 165 166 private void notifyEvent(BoxEvent event) { 167 synchronized (this.listenerLock) { 168 boolean isDuplicate = this.isDuplicate(event.getID()); 169 if (!isDuplicate) { 170 for (EventListener listener : this.listeners) { 171 listener.onEvent(event); 172 } 173 } 174 } 175 } 176 177 private void notifyException(Throwable e) { 178 if (e instanceof InterruptedException && !this.started) { 179 return; 180 } 181 182 this.stop(); 183 synchronized (this.listenerLock) { 184 for (EventListener listener : this.listeners) { 185 if (listener.onException(e)) { 186 return; 187 } 188 } 189 } 190 } 191 192 private class Poller implements Runnable { 193 private final long initialPosition; 194 195 private RealtimeServerConnection server; 196 197 Poller(long initialPosition) { 198 this.initialPosition = initialPosition; 199 this.server = new RealtimeServerConnection(EventStream.this.api); 200 } 201 202 @Override 203 public void run() { 204 long position = this.initialPosition; 205 while (!Thread.interrupted()) { 206 if (this.server.getRemainingRetries() == 0) { 207 this.server = new RealtimeServerConnection(EventStream.this.api); 208 } 209 210 if (this.server.waitForChange(position)) { 211 if (Thread.interrupted()) { 212 return; 213 } 214 215 BoxJSONRequest request = new BoxJSONRequest(EventStream.this.api, 216 EVENT_URL.buildAlpha(EventStream.this.api.getBaseURL(), position), "GET" 217 ); 218 try (BoxJSONResponse response = request.send()) { 219 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 220 JsonArray entriesArray = jsonObject.get("entries").asArray(); 221 for (JsonValue entry : entriesArray) { 222 BoxEvent event = new BoxEvent(EventStream.this.api, entry.asObject()); 223 EventStream.this.notifyEvent(event); 224 } 225 position = jsonObject.get("next_stream_position").asLong(); 226 EventStream.this.notifyNextPosition(position); 227 try { 228 // Delay re-polling to avoid making too many API calls 229 // Since duplicate events may appear in the stream, without any delay added 230 // the stream can make 3-5 requests per second and not produce any new 231 // events. A short delay between calls balances latency for new events 232 // and the risk of hitting rate limits. 233 Thread.sleep(EventStream.this.pollingDelay); 234 } catch (InterruptedException ex) { 235 return; 236 } 237 } 238 } 239 } 240 } 241 } 242}