001/**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.fusesource.hawtdispatch.transport;
019
020import org.fusesource.hawtdispatch.Dispatch;
021import org.fusesource.hawtdispatch.Task;
022
023import java.util.concurrent.TimeUnit;
024
025/**
026 * <p>A HeartBeatMonitor can be used to watch the read and write
027 * activity of a transport and raise events when the write side
028 * or read side has been idle too long.</p>
029 *
030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031 */
032public class HeartBeatMonitor {
033
034    Transport transport;
035    long initialWriteCheckDelay;
036    long initialReadCheckDelay;
037    long writeInterval;
038    long readInterval;
039
040    Task onKeepAlive = Dispatch.NOOP;
041    Task onDead = Dispatch.NOOP;
042
043    short session = 0;
044
045    boolean readSuspendedInterval;
046    short readSuspendCount;
047
048    public void suspendRead() {
049        readSuspendCount++;
050        readSuspendedInterval = true;
051    }
052
053    public void resumeRead() {
054        readSuspendCount--;
055    }
056
057    private void schedule(final short session, long interval, final Task func) {
058        if (this.session == session) {
059            transport.getDispatchQueue().executeAfter(interval, TimeUnit.MILLISECONDS, new Task() {
060                public void run() {
061                    if (HeartBeatMonitor.this.session == session) {
062                        func.run();
063                    }
064                }
065            });
066        }
067    }
068
069    private void scheduleCheckWrites(final short session) {
070        final ProtocolCodec codec = transport.getProtocolCodec();
071        Task func;
072        if (codec == null) {
073            func = new Task() {
074                public void run() {
075                    scheduleCheckWrites(session);
076                }
077            };
078        } else {
079            final long lastWriteCounter = codec.getWriteCounter();
080            func = new Task() {
081                public void run() {
082                    if (lastWriteCounter == codec.getWriteCounter()) {
083                        onKeepAlive.run();
084                    }
085                    scheduleCheckWrites(session);
086                }
087            };
088        }
089        schedule(session, writeInterval, func);
090    }
091
092    private void scheduleCheckReads(final short session) {
093        final ProtocolCodec codec = transport.getProtocolCodec();
094        Task func;
095        if (codec == null) {
096            func = new Task() {
097                public void run() {
098                    scheduleCheckReads(session);
099                }
100            };
101        } else {
102            final long lastReadCounter = codec.getReadCounter();
103            func = new Task() {
104                public void run() {
105                    if (lastReadCounter == codec.getReadCounter() && !readSuspendedInterval && readSuspendCount == 0) {
106                        onDead.run();
107                    }
108                    readSuspendedInterval = false;
109                    scheduleCheckReads(session);
110                }
111            };
112        }
113        schedule(session, readInterval, func);
114    }
115
116    public void start() {
117        session++;
118        readSuspendedInterval = false;
119        if (writeInterval != 0) {
120            if (initialWriteCheckDelay != 0) {
121                transport.getDispatchQueue().executeAfter(initialWriteCheckDelay, TimeUnit.MILLISECONDS, new Task() {
122                    public void run() {
123                        scheduleCheckWrites(session);
124                    }
125                });
126            } else {
127                scheduleCheckWrites(session);
128            }
129        }
130        if (readInterval != 0) {
131            if (initialReadCheckDelay != 0) {
132                transport.getDispatchQueue().executeAfter(initialReadCheckDelay, TimeUnit.MILLISECONDS, new Task() {
133                    public void run() {
134                        scheduleCheckReads(session);
135                    }
136                });
137            } else {
138                scheduleCheckReads(session);
139            }
140        }
141    }
142
143    public void stop() {
144        session++;
145    }
146
147
148    public long getInitialReadCheckDelay() {
149        return initialReadCheckDelay;
150    }
151
152    public void setInitialReadCheckDelay(long initialReadCheckDelay) {
153        this.initialReadCheckDelay = initialReadCheckDelay;
154    }
155
156    public long getInitialWriteCheckDelay() {
157        return initialWriteCheckDelay;
158    }
159
160    public void setInitialWriteCheckDelay(long initialWriteCheckDelay) {
161        this.initialWriteCheckDelay = initialWriteCheckDelay;
162    }
163
164    public Task getOnDead() {
165        return onDead;
166    }
167
168    public void setOnDead(Task onDead) {
169        this.onDead = onDead;
170    }
171
172    public Task getOnKeepAlive() {
173        return onKeepAlive;
174    }
175
176    public void setOnKeepAlive(Task onKeepAlive) {
177        this.onKeepAlive = onKeepAlive;
178    }
179
180    public long getWriteInterval() {
181        return writeInterval;
182    }
183
184    public void setWriteInterval(long writeInterval) {
185        this.writeInterval = writeInterval;
186    }
187
188    public Transport getTransport() {
189        return transport;
190    }
191
192    public void setTransport(Transport transport) {
193        this.transport = transport;
194    }
195
196    public long getReadInterval() {
197        return readInterval;
198    }
199
200    public void setReadInterval(long readInterval) {
201        this.readInterval = readInterval;
202    }
203}