Unverified Commit d35cd2f2 authored by Jorge Izquierdo Ciges's avatar Jorge Izquierdo Ciges Committed by GitHub
Browse files

Merge pull request #189 from EGA-archive/EE-1678

Add SimpleSeekableStream in commons
parents 145dd26e b7c21eae
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
......@@ -47,6 +47,33 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- SamTools dependencies -->
<dependency>
<groupId>com.github.samtools</groupId>
<artifactId>htsjdk</artifactId>
<version>2.23.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.9.1</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.0.103-beta</version>
<scope>test</scope>
</dependency> -->
</dependencies>
<dependencyManagement>
......
......@@ -97,6 +97,24 @@
<version>2.0.103-beta</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-client-java</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-junit-rule</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
......
/*
*
* Copyright 2020 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package eu.elixir.ega.ebi.dataedge.config;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
 * Spring configuration that exposes the OkHttp {@link ConnectionPool} and {@link OkHttpClient}
 * beans, both configured from the {@code res.connection.*} properties.
 */
@Configuration
public class ResConnectionConfig {

    /** Passed to {@link ConnectionPool} as its first constructor argument. */
    @Value("${res.connection.pool.size}")
    private int connectionPoolSize;

    /** Keep-alive duration for the connection pool, in milliseconds. */
    @Value("${res.connection.pool.keepalive.ms}")
    private int connectionPoolKeepAlive;

    /** Connect timeout in milliseconds; only applied when strictly positive. */
    @Value("${res.connection.timeout.connect.ms}")
    private int resConnectionConnectTimeout;

    /** Read timeout in milliseconds; only applied when strictly positive. */
    @Value("${res.connection.timeout.read.ms}")
    private int resConnectionReadTimeout;

    /**
     * Creates the connection pool shared by the RES HTTP client.
     *
     * @return a pool built from the configured size and keep-alive (milliseconds)
     */
    @Bean
    public ConnectionPool connectionPool() {
        final ConnectionPool pool =
                new ConnectionPool(connectionPoolSize, connectionPoolKeepAlive, TimeUnit.MILLISECONDS);
        return pool;
    }

    /**
     * Creates the HTTP client, backed by the given connection pool and retrying on connection
     * failure.
     *
     * @param connectionPool the pool the client should draw its connections from
     * @return the configured client
     */
    @Bean
    public OkHttpClient resConnectionClient(@Autowired ConnectionPool connectionPool) {
        final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        clientBuilder.retryOnConnectionFailure(true);
        clientBuilder.connectionPool(connectionPool);

        // A non-positive timeout is not forwarded, so the builder keeps whatever default it has.
        final boolean hasConnectTimeout = resConnectionConnectTimeout > 0;
        if (hasConnectTimeout) {
            clientBuilder.connectTimeout(resConnectionConnectTimeout, TimeUnit.MILLISECONDS);
        }
        final boolean hasReadTimeout = resConnectionReadTimeout > 0;
        if (hasReadTimeout) {
            clientBuilder.readTimeout(resConnectionReadTimeout, TimeUnit.MILLISECONDS);
        }

        return clientBuilder.build();
    }
}
......@@ -25,6 +25,7 @@ import eu.elixir.ega.ebi.commons.exception.InternalErrorException;
import eu.elixir.ega.ebi.commons.exception.NoContentException;
import eu.elixir.ega.ebi.commons.exception.PermissionDeniedException;
import eu.elixir.ega.ebi.commons.exception.UnavailableForLegalReasonsException;
import eu.elixir.ega.ebi.dataedge.utils.SimpleSeekableStream;
import eu.elixir.ega.ebi.commons.shared.dto.DownloadEntry;
import eu.elixir.ega.ebi.commons.shared.dto.EventEntry;
import eu.elixir.ega.ebi.commons.shared.dto.File;
......@@ -33,7 +34,6 @@ import eu.elixir.ega.ebi.commons.shared.dto.MyExternalConfig;
import eu.elixir.ega.ebi.commons.shared.service.DownloaderLogService;
import eu.elixir.ega.ebi.commons.shared.service.FileInfoService;
import eu.elixir.ega.ebi.dataedge.dto.*;
import eu.elixir.ega.ebi.htsjdk.samtools.seekablestream.EgaSeekableCachedResStream;
import eu.elixir.ega.ebi.htsjdk.variant.vcf.MyVCFFileReader;
import eu.elixir.ega.ebi.dataedge.service.FileLengthService;
import eu.elixir.ega.ebi.dataedge.service.FileService;
......@@ -52,8 +52,10 @@ import htsjdk.variant.variantcontext.writer.VariantContextWriterBuilder;
import htsjdk.variant.vcf.VCFHeader;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
......@@ -91,12 +93,24 @@ import static org.apache.catalina.connector.OutputBuffer.DEFAULT_BUFFER_SIZE;
@Slf4j
public class RemoteFileServiceImpl implements FileService {
@Autowired
private OkHttpClient client;
@Autowired
private LoadBalancerClient loadBalancer;
@Autowired
private RestTemplate restTemplate;
@Value("${res.connection.chunksize.header}")
private int resHeaderChunkSize;
@Value("${res.connection.chunksize.data}")
private int resDataChunkSize;
@Value("${res.connection.chunksize.index}")
private int resIndexChunkSize;
// Database Repositories/Services
@Autowired
......@@ -352,7 +366,7 @@ public class RemoteFileServiceImpl implements FileService {
URL resURL;
try {
resURL = new URL(resURL() + "/file/archive/" + reqFile.getFileId()); // Just specify file ID
SeekableStream cIn = new EgaSeekableCachedResStream(resURL, null, null, reqFile.getFileSize()); // Deals with coordinates
SeekableStream cIn = new SimpleSeekableStream(resURL, client, resHeaderChunkSize, reqFile.getFileSize()); // Deals with coordinates
SamReader reader = (x == null) ?
(SamReaderFactory.make() // BAM File
.validationStringency(ValidationStringency.LENIENT)
......@@ -453,8 +467,17 @@ public class RemoteFileServiceImpl implements FileService {
// BAM/CRAM File
URL resURL = new URL(resURL() + "/file/archive/" + reqFile.getFileId()); // Just specify file ID
SeekableStream cIn = (new EgaSeekableCachedResStream(resURL, null, null, reqFile.getFileSize())).setExtension(extension); // Deals with coordinates
//bIn = new SeekableBufferedStream(cIn);
// HTSJDK works out if the stream is a BAM or a CRAM file from the extension on the URI but RES URIs do
// not have extensions, so override the name returned by getSource
final String finalExtension = extension;
SeekableStream cIn = new SimpleSeekableStream(resURL, client, resDataChunkSize, reqFile.getFileSize()) {
@Override
public String getSource() {
return super.getSource() + "." + finalExtension;
}
};
// BAI/CRAI File
FileIndexFile fileIndexFile = getFileIndexFile(reqFile.getFileId());
if(fileIndexFile == null || StringUtils.isEmpty(fileIndexFile.getIndexFileId())) {
......@@ -463,7 +486,7 @@ public class RemoteFileServiceImpl implements FileService {
File reqIndexFile = fileInfoService.getFileInfo(fileIndexFile.getIndexFileId());
URL indexUrl = new URL(resURL() + "/file/archive/" + fileIndexFile.getIndexFileId()); // Just specify index ID
SeekableStream cIndexIn = (new EgaSeekableCachedResStream(indexUrl, null, null, reqIndexFile.getFileSize()));
SeekableStream cIndexIn = (new SimpleSeekableStream(indexUrl, client, resIndexChunkSize, reqIndexFile.getFileSize()));
inputResource = SamInputResource.of(cIn).index(cIndexIn);
} catch (Exception ex) {
......
/*
*
* Copyright 2020 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package eu.elixir.ega.ebi.dataedge.utils;
import htsjdk.samtools.seekablestream.SeekableStream;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import java.io.IOException;
import java.net.URL;
public class SimpleSeekableStream extends SeekableStream {
// Build logger
private static final Logger logger = LoggerFactory.getLogger(SimpleSeekableStream.class);
private static final int DEFAULT_CHUNK_SIZE = 4096;
private final URL url;
private final int chunkSize;
private final long length;
private final OkHttpClient client;
private long position;
private byte[] buffer;
private long bufferPosition;
public SimpleSeekableStream(URL url, OkHttpClient client) throws IOException {
this(url, client, DEFAULT_CHUNK_SIZE);
}
public SimpleSeekableStream(URL url, OkHttpClient client, int chunkSize) throws IOException {
this(url, client, chunkSize, getContentLength(url, client));
}
protected static Long getContentLength(URL url, OkHttpClient client) throws IOException {
Response response;
do {
response = client.newCall(new Request.Builder().url(url).head().build()).execute();
} while (responseShouldRetry(response, url));
return Long.parseLong(response.header(HttpHeaders.CONTENT_LENGTH));
}
public SimpleSeekableStream(URL url, OkHttpClient client, int chunkSize, long length) throws IOException {
this.url = url;
this.chunkSize = chunkSize;
this.client = client;
this.length = length;
}
@Override
public long length() {
return length;
}
@Override
public long position() throws IOException {
return position;
}
@Override
public void seek(long position) throws IOException {
if (position < 0 || position >= length) {
throw new IOException("Invalid seek position");
}
this.position = position;
}
@Override
public int read() throws IOException {
if (eof())
return -1;
fillBufferWithChunk(position);
return buffer[(int) ((position++) - bufferPosition)];
}
protected void fillBufferWithChunk(long position) throws IOException {
if (buffer == null || position < bufferPosition || position >= bufferPosition + buffer.length) {
Request request = new Request.Builder()
.url(url)
.addHeader(HttpHeaders.RANGE, String.format("bytes=%d-%d", position, Math.min(position + chunkSize, length) - 1))
.build();
Response response;
do {
response = client.newCall(request).execute();
} while (responseShouldRetry(response, url));
buffer = response.body().bytes();
bufferPosition = position;
}
}
private static boolean responseShouldRetry(Response response, URL uri) throws IOException {
switch (HttpStatus.valueOf(response.code())) {
case OK:
case PARTIAL_CONTENT:
return false;
case SERVICE_UNAVAILABLE:
logger.warn("Got SERVICE_UNAVAILABLE when requesting {}, will retry", uri);
return true;
default:
logger.error("Unexpected status code {} when requesting {}", response.code(), uri);
throw new IOException(String.format("Unexpected HTTP response from %s: code %d", uri, response.code()));
}
}
@Override
public int read(byte[] bytes, int offset, int length) throws IOException {
if (eof())
return -1;
int bytesToRead = (int) Math.min(length, this.length - position);
int bytesRead = 0;
while (bytesRead < bytesToRead) {
fillBufferWithChunk(position);
int bytesToCopy = (int) Math.min(buffer.length - (position - bufferPosition), bytesToRead - bytesRead);
System.arraycopy(buffer, (int) (position - bufferPosition), bytes, offset + bytesRead, bytesToCopy);
position += bytesToCopy;
bytesRead += bytesToCopy;
}
return bytesRead;
}
@Override
public void close() throws IOException {
buffer = null;
bufferPosition = 0;
}
@Override
public boolean eof() throws IOException {
return position >= length;
}
@Override
public String getSource() {
return url.toString();
}
}
......@@ -32,3 +32,17 @@ spring.oauth2.resource.userInfoUri:
server.ssl.key-store-type: ${KEY_STORE_TYPE:PKCS12}
server.ssl.key-store: ${KEY_STORE:/dataedge.p12}
server.ssl.key-store-password: ${KEY_STORE_PASSWORD:changeit}
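# RES connection settings: pool keep-alive and timeouts are in milliseconds, chunk sizes are in bytes.
# A timeout of 0 is not applied, so the HTTP client keeps its own default for that timeout.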
res:
connection:
pool:
size: 16
keepalive.ms: 5000
timeout:
connect.ms: 2500
read.ms: 0
chunksize:
header: 4096
data: 20971520
index: 1048576
\ No newline at end of file
......@@ -18,6 +18,7 @@ package eu.elixir.ega.ebi.dataedge.service.internal;
import eu.elixir.ega.ebi.commons.exception.InternalErrorException;
import eu.elixir.ega.ebi.commons.exception.NoContentException;
import eu.elixir.ega.ebi.commons.exception.UnavailableForLegalReasonsException;
import eu.elixir.ega.ebi.dataedge.utils.SimpleSeekableStream;
import eu.elixir.ega.ebi.commons.shared.dto.File;
import eu.elixir.ega.ebi.commons.shared.dto.FileDataset;
import eu.elixir.ega.ebi.commons.shared.dto.FileIndexFile;
......@@ -27,7 +28,6 @@ import eu.elixir.ega.ebi.commons.shared.service.FileInfoService;
import eu.elixir.ega.ebi.dataedge.dto.*;
import eu.elixir.ega.ebi.dataedge.service.FileLengthService;
import eu.elixir.ega.ebi.dataedge.service.KeyService;
import eu.elixir.ega.ebi.htsjdk.samtools.seekablestream.EgaSeekableCachedResStream;
import eu.elixir.ega.ebi.htsjdk.variant.vcf.MyVCFFileReader;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SamInputResource;
......@@ -76,7 +76,6 @@ import static eu.elixir.ega.ebi.commons.config.Constants.FILEDATABASE_SERVICE;
/**
* Test class for {@link RemoteFileServiceImpl}.
*
* @author amohan
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({RemoteFileServiceImpl.class, SamReaderFactory.class})
......@@ -279,7 +278,7 @@ public class RemoteFileServiceImplTest {
final ResponseEntity<Long> forSize = mock(ResponseEntity.class);
final ResponseEntity<FileIndexFile[]> forResponseEntity = mock(ResponseEntity.class);
final HttpResult xferResult = mock(HttpResult.class);
final EgaSeekableCachedResStream egaSeekableCachedResStream = mock(EgaSeekableCachedResStream.class);
final SimpleSeekableStream simpleSeekableStream = mock(SimpleSeekableStream.class);
final SamReaderFactory samReaderFactory = mock(SamReaderFactory.class);
final SamReader samReader = mock(SamReader.class);
final MyVCFFileReader myVCFFileReader = mock(MyVCFFileReader.class);
......@@ -311,7 +310,7 @@ public class RemoteFileServiceImplTest {
when(myVCFFileReader.getFileHeader()).thenReturn(vcfHeader);
when(myVCFFileReader.iterator()).thenReturn(closeableIterator);
whenNew(EgaSeekableCachedResStream.class).withAnyArguments().thenReturn(egaSeekableCachedResStream);
whenNew(SimpleSeekableStream.class).withAnyArguments().thenReturn(simpleSeekableStream);
whenNew(MyVCFFileReader.class).withAnyArguments().thenReturn(myVCFFileReader);
mockStatic(SamReaderFactory.class);
......
/*
*
* Copyright 2020 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package eu.elixir.ega.ebi.dataedge.utils;
import okhttp3.OkHttpClient;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpHeaders;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockserver.client.MockServerClient;
import org.mockserver.junit.MockServerRule;
import org.mockserver.mock.action.ExpectationResponseCallback;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.verify.VerificationTimes;
import org.springframework.http.HttpRange;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockserver.model.HttpRequest.request;
public class SimpleSeekableStreamTest {
@Rule
public MockServerRule mockServerRule = new MockServerRule(this, 1337);
private MockServerClient mockServerClient;
public static final Long RESOURCE_SIZE = 10000L;
private final OkHttpClient client = new OkHttpClient();
@Before
public void setupMockResource() {
mockServerClient.reset();
mockServerClient.when(request("/test-data")).respond(new ExpectationResponseCallback() {
@Override
public HttpResponse handle(HttpRequest httpRequest) {
if (httpRequest.getMethod().equals("HEAD")) {
return HttpResponse.response().withHeader(HttpHeaders.CONTENT_LENGTH, RESOURCE_SIZE.toString());
}
String rangeHeader = httpRequest.getFirstHeader(HttpHeaders.RANGE);
List<HttpRange> ranges = HttpRange.parseRanges(rangeHeader);
assertEquals(1, ranges.size());
HttpRange range = ranges.get(0);
long rangeStart = range.getRangeStart(RESOURCE_SIZE);
long rangeEnd = range.getRangeEnd(RESOURCE_SIZE);
byte[] body = new byte[(int) (rangeEnd - rangeStart + 1)];
for (int i = 0; i < body.length; i++) {
body[i] = (byte) ((i + rangeStart) % 0xff);
}
HttpResponse response = HttpResponse.response().withBody(body);
return response;
}
});
mockServerClient.when(request("/missing-data")).respond(HttpResponse.notFoundResponse());
}
@Test
public void downloadHttpUrlInChunks() throws URISyntaxException, IOException {
// Arrange
URL uri = new URL("http://localhost:" + mockServerRule.getPort() + "/test-data");
// Act
SimpleSeekableStream stream = new SimpleSeekableStream(uri, client, 1234);
byte[] wholeFile = IOUtils.toByteArray(stream);
// Assert
assertEquals(RESOURCE_SIZE.longValue(), stream.length());
assertEquals(RESOURCE_SIZE.longValue(), stream.position());
assertTrue(stream.eof());
assertEquals(RESOURCE_SIZE.longValue(), wholeFile.length);
for (int i = 0; i < wholeFile.length; i++) {
assertEquals(i % 0xff, ((int) wholeFile[i]) & 0xff);
}
mockServerClient.verify(request().withPath("/test-data").withMethod("GET"), VerificationTimes.atLeast(2));
}
@Test
public void canReadSingleByte() throws IOException {
// Arrange
URL uri = new URL("http://localhost:" + mockServerRule.getPort() + "/test-data");
// Act
SimpleSeekableStream stream = new SimpleSeekableStream(uri, client, 100);
// Assert
assertEquals(0, stream.read());
assertEquals(1, stream.read());
assertEquals(2, stream.read());
}
@Test
public void seekReadsDataFromExpectedPosition() throws IOException {
// Arrange
URL uri = new URL("http://localhost:" + mockServerRule.getPort() + "/test-data");
// Act