
Commit eb12bd5

Added the new mappers
1 parent 8d1f615 commit eb12bd5

File tree

org/biojava/spark/mappers/MapperUtils.java
org/biojava/spark/mappers/PdbIdToMmtf.java
org/biojava/spark/mappers/StringByteToTextByteWriter.java

3 files changed, +182 -0 lines changed
org/biojava/spark/mappers/MapperUtils.java
Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
package org.biojava.spark.mappers;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.biojava.nbio.structure.Structure;
import org.biojava.nbio.structure.StructureException;
import org.biojava.nbio.structure.StructureIO;
import org.biojava.nbio.structure.StructureImpl;
import org.biojava.nbio.structure.io.mmtf.MmtfStructureReader;
import org.biojava.nbio.structure.io.mmtf.MmtfStructureWriter;
import org.biojava.nbio.structure.io.mmtf.MmtfUtils;
import org.rcsb.mmtf.dataholders.MmtfStructure;
import org.rcsb.mmtf.decoder.GenericDecoder;
import org.rcsb.mmtf.decoder.StructureDataToAdapter;
import org.rcsb.mmtf.encoder.AdapterToStructureData;
import org.rcsb.mmtf.encoder.GenericEncoder;
import org.rcsb.mmtf.encoder.WriterUtils;
import org.rcsb.mmtf.serialization.MessagePackSerialization;
import org.rcsb.mmtf.spark.utils.SparkUtils;

import scala.Tuple2;

/**
 * A class to preserve the logic of the functions used in mappers.
 * Mappers should not contain logic, as they are hard to test.
 * @author Anthony Bradley
 *
 */
public class MapperUtils implements Serializable {

    private static final long serialVersionUID = -4717807367698811030L;

    /**
     * Converts a byte array of MessagePack (MMTF) data to a BioJava structure.
     * @param pdbCodePlus the PDB code is the first four characters; additional characters can be used
     * @param inputByteArr the MessagePack byte array to be decoded
     * @return the decoded and inflated structure
     * @throws IOException
     */
    public static Structure byteArrToBiojavaStruct(String pdbCodePlus, byte[] inputByteArr) throws IOException {
        try {
            return MapperUtils.getFromByteArray(inputByteArr);
        }
        catch (Exception e) {
            // Log the failure and return an empty structure rather than failing the whole job.
            System.err.println(e);
            System.err.println(pdbCodePlus);
            return new StructureImpl();
        }
    }

    /**
     * PDB RDD generator. Converts a list of PDB ids to a {@link JavaPairRDD}
     * with key {@link Text} and value {@link BytesWritable}.
     * @param inputList a {@link List} of Strings of the input PDB ids
     * @param ccdUrl the URL of the chemical component dictionary used to set up BioJava
     * @return a {@link JavaPairRDD} with key {@link Text} and value {@link BytesWritable}
     */
    public static JavaPairRDD<Text, BytesWritable> generateRdd(List<String> inputList, String ccdUrl) {
        MmtfUtils.setUpBioJava(ccdUrl);
        return SparkUtils.getSparkContext().parallelize(inputList)
                .mapToPair(t -> MapperUtils.getByteArray(t))
                .mapToPair(t -> new Tuple2<String, byte[]>(t._1, WriterUtils.gzipCompress(t._2)))
                .mapToPair(new StringByteToTextByteWriter());
    }

    /**
     * Get the available data as a byte array from an input PDB id.
     * @param pdbId the input PDB id
     * @return the PDB id and the data as a byte array
     * @throws StructureException an error parsing the {@link Structure} using BioJava
     * @throws IOException an error accessing the file
     */
    public static Tuple2<String, byte[]> getByteArray(String pdbId) throws IOException, StructureException {
        Structure structure = StructureIO.getStructure(pdbId);
        byte[] outByteArr = produceByteArray(structure, "Biojava-spark default");
        return new Tuple2<String, byte[]>(structure.getPDBCode(), outByteArr);
    }

    /**
     * Get the available data as a byte array from an input PDB id, setting the producer.
     * @param pdbId the input PDB id
     * @param producer a string describing the producer of the data
     * @return the PDB id and the data as a byte array
     * @throws StructureException an error parsing the {@link Structure} using BioJava
     * @throws IOException an error accessing the file
     */
    public static Tuple2<String, byte[]> getByteArray(String pdbId, String producer) throws IOException, StructureException {
        Structure structure = StructureIO.getStructure(pdbId);
        byte[] outByteArr = produceByteArray(structure, producer);
        return new Tuple2<String, byte[]>(structure.getPDBCode(), outByteArr);
    }

    private static byte[] produceByteArray(Structure structure, String mmtfProducer) {
        MmtfStructure mmtfStructure = encodeStructure(structure);
        mmtfStructure.setMmtfProducer(mmtfProducer);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new MessagePackSerialization().serialize(mmtfStructure, bos);
        return bos.toByteArray();
    }

    private static MmtfStructure encodeStructure(Structure structure) {
        // Walk the structure through the writer to populate the inflator, then encode it.
        AdapterToStructureData inflatorToGet = new AdapterToStructureData();
        new MmtfStructureWriter(structure, inflatorToGet);
        return new GenericEncoder(inflatorToGet).getMmtfEncodedStructure();
    }

    private static Structure getFromByteArray(byte[] inputByteArr) {
        // Deserialize the MessagePack data and feed it through the decoder into the reader.
        MmtfStructureReader mmtfStructureReader = new MmtfStructureReader();
        new StructureDataToAdapter(new GenericDecoder(new MessagePackSerialization().deserialize(new ByteArrayInputStream(inputByteArr))), mmtfStructureReader);
        return mmtfStructureReader.getStructure();
    }

}
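
For context, here is a minimal round-trip sketch (not part of the commit) showing the encode and decode halves of MapperUtils together. The PDB id "4CUP", the class name, and the printed chain count are illustrative assumptions, and the sketch assumes network access so StructureIO can fetch the entry.

import org.biojava.nbio.structure.Structure;
import org.biojava.spark.mappers.MapperUtils;

import scala.Tuple2;

public class MapperUtilsRoundTrip {
    public static void main(String[] args) throws Exception {
        // Encode: fetch the structure and serialise it to a MessagePack (MMTF) byte array.
        Tuple2<String, byte[]> encoded = MapperUtils.getByteArray("4CUP");
        // Decode: inflate the byte array back into a BioJava Structure.
        Structure structure = MapperUtils.byteArrToBiojavaStruct(encoded._1, encoded._2);
        System.out.println(encoded._1 + " decoded with " + structure.getChains().size() + " chains");
    }
}

Note that generateRdd gzips each byte array before writing, so bytes read back from a sequence file would need to be gunzipped before being passed to byteArrToBiojavaStruct.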
org/biojava/spark/mappers/PdbIdToMmtf.java
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
package org.biojava.spark.mappers;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Generate the internal data structure (using BioJava) from a PDB code.
 * @author Anthony Bradley
 *
 */
public class PdbIdToMmtf implements PairFunction<String, String, byte[]> {

    private static final long serialVersionUID = 786599975302506694L;

    private String producer = "Biojava spark";

    /**
     * Constructor to provide the producer.
     * @param producer a string describing the producer
     */
    public PdbIdToMmtf(String producer) {
        this.producer = producer;
    }

    @Override
    public Tuple2<String, byte[]> call(String t) throws Exception {
        return MapperUtils.getByteArray(t, this.producer);
    }

}
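
A sketch of how this mapper might be driven from a Spark job (not part of the commit). The local master, the PDB ids, and the producer string "my-pipeline" are all assumptions for illustration.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.biojava.spark.mappers.PdbIdToMmtf;

public class PdbIdToMmtfExample {
    public static void main(String[] args) {
        // A local Spark context for the sketch; production settings will differ.
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[*]").setAppName("PdbIdToMmtfExample"));
        // Map each PDB id to a (pdbId, mmtfBytes) pair, tagging the data with a custom producer.
        JavaPairRDD<String, byte[]> mmtfRdd = sc
                .parallelize(Arrays.asList("4CUP", "1STP"))
                .mapToPair(new PdbIdToMmtf("my-pipeline"));
        System.out.println("Encoded structures: " + mmtfRdd.count());
        sc.close();
    }
}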
org/biojava/spark/mappers/StringByteToTextByteWriter.java
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
package org.biojava.spark.mappers;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Converts a tuple of a String and a byte array to a {@link Text} and a {@link BytesWritable}.
 * This is required for writing Hadoop sequence files of data in this format.
 * @author Anthony Bradley
 *
 */
public class StringByteToTextByteWriter implements PairFunction<Tuple2<String, byte[]>, Text, BytesWritable> {

    private static final long serialVersionUID = 8149053011560186912L;

    @Override
    public Tuple2<Text, BytesWritable> call(Tuple2<String, byte[]> t) throws Exception {
        return new Tuple2<Text, BytesWritable>(new Text(t._1), new BytesWritable(t._2));
    }

}
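
A downstream sketch (not part of the commit) showing where this writer fits: it bridges the (String, byte[]) pairs produced above to the Hadoop-writable types a sequence file needs. The class name and method are hypothetical, and MapperUtils.generateRdd already applies this same conversion internally; the sketch shows the step in isolation.

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.biojava.spark.mappers.StringByteToTextByteWriter;

public class SequenceFileWriterExample {
    /** Persists (pdbId, mmtfBytes) pairs as a Hadoop sequence file at the given path. */
    public static void writeSequenceFile(JavaPairRDD<String, byte[]> mmtfRdd, String path) {
        // Convert keys and values to Hadoop-writable types.
        JavaPairRDD<Text, BytesWritable> writableRdd =
                mmtfRdd.mapToPair(new StringByteToTextByteWriter());
        // Write out as a sequence file of PDB id -> MMTF bytes.
        writableRdd.saveAsHadoopFile(path, Text.class, BytesWritable.class,
                SequenceFileOutputFormat.class);
    }
}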
