001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
020import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
021
022import java.io.Closeable;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.util.zip.CRC32;
028import java.util.zip.Deflater;
029
030/**
031 * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams.
032 * Currently {@link java.util.zip.ZipEntry#DEFLATED} and {@link java.util.zip.ZipEntry#STORED} are the only
033 * supported compression methods.
034 *
035 * @since 1.10
036 */
037public abstract class StreamCompressor implements Closeable {
038
039    /*
040     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs
041     * when it gets handed a really big buffer.  See
042     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
043     *
044     * Using a buffer size of 8 kB proved to be a good compromise
045     */
046    private static final int DEFLATER_BLOCK_SIZE = 8192;
047
048    private final Deflater def;
049
050    private final CRC32 crc = new CRC32();
051
052    private long writtenToOutputStreamForLastEntry = 0;
053    private long sourcePayloadLength = 0;
054    private long totalWrittenToOutputStream = 0;
055
056    private static final int bufferSize = 4096;
057    private final byte[] outputBuffer = new byte[bufferSize];
058    private final byte[] readerBuf = new byte[bufferSize];
059
060    StreamCompressor(Deflater deflater) {
061        this.def = deflater;
062    }
063
064    /**
065     * Create a stream compressor with the given compression level.
066     *
067     * @param os       The stream to receive output
068     * @param deflater The deflater to use
069     * @return A stream compressor
070     */
071    static StreamCompressor create(OutputStream os, Deflater deflater) {
072        return new OutputStreamCompressor(deflater, os);
073    }
074
075    /**
076     * Create a stream compressor with the default compression level.
077     *
078     * @param os The stream to receive output
079     * @return A stream compressor
080     */
081    static StreamCompressor create(OutputStream os) {
082        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
083    }
084
085    /**
086     * Create a stream compressor with the given compression level.
087     *
088     * @param os       The DataOutput to receive output
089     * @param deflater The deflater to use for the compressor
090     * @return A stream compressor
091     */
092    static StreamCompressor create(DataOutput os, Deflater deflater) {
093        return new DataOutputCompressor(deflater, os);
094    }
095
096    /**
097     * Create a stream compressor with the given compression level.
098     *
099     * @param compressionLevel The {@link Deflater}  compression level
100     * @param bs               The ScatterGatherBackingStore to receive output
101     * @return A stream compressor
102     */
103    public static StreamCompressor create(int compressionLevel, ScatterGatherBackingStore bs) {
104        final Deflater deflater = new Deflater(compressionLevel, true);
105        return new ScatterGatherBackingStoreCompressor(deflater, bs);
106    }
107
108    /**
109     * Create a stream compressor with the default compression level.
110     *
111     * @param bs The ScatterGatherBackingStore to receive output
112     * @return A stream compressor
113     */
114    public static StreamCompressor create(ScatterGatherBackingStore bs) {
115        return create(Deflater.DEFAULT_COMPRESSION, bs);
116    }
117
118    /**
119     * The crc32 of the last deflated file
120     *
121     * @return the crc32
122     */
123
124    public long getCrc32() {
125        return crc.getValue();
126    }
127
128    /**
129     * Return the number of bytes read from the source stream
130     *
131     * @return The number of bytes read, never negative
132     */
133    public long getBytesRead() {
134        return sourcePayloadLength;
135    }
136
137    /**
138     * The number of bytes written to the output for the last entry
139     *
140     * @return The number of bytes, never negative
141     */
142    public long getBytesWrittenForLastEntry() {
143        return writtenToOutputStreamForLastEntry;
144    }
145
146    /**
147     * The total number of bytes written to the output for all files
148     *
149     * @return The number of bytes, never negative
150     */
151    public long getTotalBytesWritten() {
152        return totalWrittenToOutputStream;
153    }
154
155
156    /**
157     * Deflate the given source using the supplied compression method
158     *
159     * @param source The source to compress
160     * @param method The #ZipArchiveEntry compression method
161     * @throws IOException When failures happen
162     */
163
164    public void deflate(InputStream source, int method) throws IOException {
165        reset();
166        int length;
167
168        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
169            write(readerBuf, 0, length, method);
170        }
171        if (method == ZipArchiveEntry.DEFLATED) {
172            flushDeflater();
173        }
174    }
175
176    /**
177     * Writes bytes to ZIP entry.
178     *
179     * @param b      the byte array to write
180     * @param offset the start position to write from
181     * @param length the number of bytes to write
182     * @param method the comrpession method to use
183     * @return the number of bytes written to the stream this time
184     * @throws IOException on error
185     */
186    long write(byte[] b, int offset, int length, int method) throws IOException {
187        long current = writtenToOutputStreamForLastEntry;
188        crc.update(b, offset, length);
189        if (method == ZipArchiveEntry.DEFLATED) {
190            writeDeflated(b, offset, length);
191        } else {
192            writeCounted(b, offset, length);
193        }
194        sourcePayloadLength += length;
195        return writtenToOutputStreamForLastEntry - current;
196    }
197
198
199    void reset() {
200        crc.reset();
201        def.reset();
202        sourcePayloadLength = 0;
203        writtenToOutputStreamForLastEntry = 0;
204    }
205
206    public void close() throws IOException {
207        def.end();
208    }
209
210    void flushDeflater() throws IOException {
211        def.finish();
212        while (!def.finished()) {
213            deflate();
214        }
215    }
216
217    private void writeDeflated(byte[] b, int offset, int length)
218            throws IOException {
219        if (length > 0 && !def.finished()) {
220            if (length <= DEFLATER_BLOCK_SIZE) {
221                def.setInput(b, offset, length);
222                deflateUntilInputIsNeeded();
223            } else {
224                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
225                for (int i = 0; i < fullblocks; i++) {
226                    def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE,
227                            DEFLATER_BLOCK_SIZE);
228                    deflateUntilInputIsNeeded();
229                }
230                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
231                if (done < length) {
232                    def.setInput(b, offset + done, length - done);
233                    deflateUntilInputIsNeeded();
234                }
235            }
236        }
237    }
238
239    private void deflateUntilInputIsNeeded() throws IOException {
240        while (!def.needsInput()) {
241            deflate();
242        }
243    }
244
245    void deflate() throws IOException {
246        int len = def.deflate(outputBuffer, 0, outputBuffer.length);
247        if (len > 0) {
248            writeCounted(outputBuffer, 0, len);
249        }
250    }
251
252    public void writeCounted(byte[] data) throws IOException {
253        writeCounted(data, 0, data.length);
254    }
255
256    public void writeCounted(byte[] data, int offset, int length) throws IOException {
257        writeOut(data, offset, length);
258        writtenToOutputStreamForLastEntry += length;
259        totalWrittenToOutputStream += length;
260    }
261
262    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
263
264    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
265        private final ScatterGatherBackingStore bs;
266
267        public ScatterGatherBackingStoreCompressor(Deflater deflater, ScatterGatherBackingStore bs) {
268            super(deflater);
269            this.bs = bs;
270        }
271
272        protected final void writeOut(byte[] data, int offset, int length)
273                throws IOException {
274            bs.writeOut(data, offset, length);
275        }
276    }
277
278    private static final class OutputStreamCompressor extends StreamCompressor {
279        private final OutputStream os;
280
281        public OutputStreamCompressor(Deflater deflater, OutputStream os) {
282            super(deflater);
283            this.os = os;
284        }
285
286        protected final void writeOut(byte[] data, int offset, int length)
287                throws IOException {
288            os.write(data, offset, length);
289        }
290    }
291
292    private static final class DataOutputCompressor extends StreamCompressor {
293        private final DataOutput raf;
294
295        public DataOutputCompressor(Deflater deflater, DataOutput raf) {
296            super(deflater);
297            this.raf = raf;
298        }
299
300        protected final void writeOut(byte[] data, int offset, int length)
301                throws IOException {
302            raf.write(data, offset, length);
303        }
304    }
305}