Skip to content

Commit 4b50106

Browse files
committed
Added missing files
1 parent 10bc86e commit 4b50106

24 files changed

+344
-73
lines changed
Lines changed: 178 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,205 @@
11
package ldbc.snb.datagen.generator;
22

33
import ldbc.snb.datagen.dictionary.Dictionaries;
4+
import ldbc.snb.datagen.generator.tools.GraphUtils;
5+
import ldbc.snb.datagen.generator.tools.MinHash;
6+
import ldbc.snb.datagen.generator.tools.PersonGraph;
47
import ldbc.snb.datagen.objects.Knows;
58
import ldbc.snb.datagen.objects.Person;
69
import ldbc.snb.datagen.util.RandomGeneratorFarm;
710

8-
import java.util.ArrayList;
11+
import java.util.*;
912

1013
/**
1114
* Created by aprat on 11/15/14.
1215
*/
1316
public class ClusteringKnowsGenerator implements KnowsGenerator {
1417

15-
private RandomGeneratorFarm randomFarm;
18+
DistanceKnowsGenerator distanceKnowsGenerator_;
19+
private double targetCC_ = 0.5;
20+
private int maxIterations_ = 100;
21+
private int numMinHashes_ = 10;
22+
private Map<Long, Integer> personPosition;
1623

1724
public ClusteringKnowsGenerator() {
18-
this.randomFarm = new RandomGeneratorFarm();
25+
this.distanceKnowsGenerator_ = new DistanceKnowsGenerator();
26+
}
27+
28+
public class MinHashComparator implements Comparator<MinHashTuple> {
29+
private int function_;
30+
MinHashComparator(int function) {
31+
function_ = function;
32+
}
33+
@Override
34+
public int compare(MinHashTuple o1, MinHashTuple o2) {
35+
long a = o1.minHashes_.get(function_);
36+
long b = o1.minHashes_.get(function_);
37+
if( a < b ) return -1;
38+
if( a > b ) return 1;
39+
return 0;
40+
}
41+
}
42+
43+
protected class MinHashTuple {
44+
public long index_;
45+
ArrayList<Long> minHashes_;
46+
MinHashTuple( long index, ArrayList<Long> minHashes ){
47+
index_ = index;
48+
minHashes_ = minHashes;
49+
}
50+
}
51+
52+
protected class PersonTuple implements Comparable<PersonTuple> {
53+
public long a_;
54+
public long b_;
55+
public double score_;
56+
57+
PersonTuple(long a, long b, double score) {
58+
a_=Math.min(a,b);
59+
b_=Math.max(a,b);
60+
score_ = score;
61+
}
62+
63+
@Override
64+
public int compareTo(PersonTuple b) {
65+
if( score_ < b.score_) return 1;
66+
if( score_ > b.score_) return -1;
67+
if( a_ < b.a_) return -1;
68+
if( a_ > b.a_) return 1;
69+
if( b_ < b.b_) return -1;
70+
if( b_ > b.b_) return 1;
71+
return 0;
72+
}
1973
}
2074

2175
public void generateKnows( ArrayList<Person> persons, int seed, float upperBound ) {
22-
randomFarm.resetRandomGenerators(seed);
76+
Random random = new Random();
77+
Map<Long, Integer> personPosition = new HashMap<Long, Integer>();
2378
for( int i = 0; i < persons.size(); ++i ) {
24-
Person p = persons.get(i);
25-
for( int j = i+1; (p.maxNumKnows()*upperBound > p.knows().size()) /*&& (j < (i + 1000))*/ && (j < persons.size()); ++j ) {
26-
if( know(p, persons.get(j), j - i, upperBound)) {
27-
createKnow(p, persons.get(j));
79+
personPosition.put(persons.get(i).accountId(), i);
80+
}
81+
distanceKnowsGenerator_.generateKnows(persons,seed,upperBound);
82+
PersonGraph bestGraph = new PersonGraph(persons);
83+
MinHash minHash = new MinHash(numMinHashes_, 0);
84+
double bestCC = GraphUtils.ClusteringCoefficient(bestGraph);
85+
int numIterations = 0;
86+
ArrayList<MinHashTuple> minHashes = new ArrayList<MinHashTuple>();
87+
PersonGraph currentGraph = new PersonGraph(bestGraph);
88+
double currentCC = bestCC;
89+
while(bestCC < targetCC_ && numIterations < maxIterations_){
90+
System.out.println("Starting Refining Iteration "+bestCC+" "+currentCC+" "+numIterations);
91+
minHashes.clear();
92+
for(Long l : currentGraph.persons()) {
93+
minHashes.add(new MinHashTuple(l, minHash.minHash(currentGraph.neighbors(l))));
94+
}
95+
PriorityQueue<PersonTuple> pq = new PriorityQueue<PersonTuple>();
96+
for(int k = 0; k < numMinHashes_; ++k ) {
97+
Collections.sort(minHashes, new MinHashComparator(k));
98+
for(int i = 0; i < minHashes.size() - 1; ++i) {
99+
long personA = minHashes.get(i).index_;
100+
long personB = minHashes.get(i+1).index_;
101+
double score = computeScore(currentGraph,personA,personB);
102+
if(score > 0.5) {
103+
pq.add(new PersonTuple(personA, personB, score));
104+
}
105+
}
106+
}
107+
random.setSeed(numIterations);
108+
HashSet<Long> touched = new HashSet<Long>();
109+
while(pq.size() > 0) {
110+
PersonTuple t = pq.poll();
111+
long personA = t.a_;
112+
long personB = t.b_;
113+
HashSet<Long> candidatesA = new HashSet<Long>(currentGraph.neighbors(personA));
114+
HashSet<Long> candidatesB = new HashSet<Long>(currentGraph.neighbors(personB));
115+
HashSet<Long> intersection = new HashSet<Long>(candidatesA);
116+
intersection.retainAll(candidatesB);
117+
candidatesA.removeAll(intersection);
118+
candidatesB.removeAll(intersection);
119+
if(candidatesA.size() > 0 && candidatesB.size() > 0) {
120+
List<Long> candidatesListA = new ArrayList<Long>(candidatesA);
121+
List<Long> candidatesListB = new ArrayList<Long>(candidatesB);
122+
Collections.shuffle(candidatesListA,random);
123+
Collections.shuffle(candidatesListB,random);
124+
long kq = candidatesListA.get(random.nextInt(candidatesListA.size()));
125+
long kt = candidatesListB.get(random.nextInt(candidatesListB.size()));
126+
if( !touched.contains(personA) &&
127+
!touched.contains(personB) &&
128+
!touched.contains(kq) &&
129+
!touched.contains(kt)
130+
) {
131+
touched.add(personA);
132+
touched.add(personB);
133+
touched.add(kq);
134+
touched.add(kt);
135+
/*HashSet<Long> auxA = new HashSet<Long>(currentGraph.neighbors(personA));
136+
auxA.retainAll(currentGraph.neighbors(kq));
137+
long degreeAuxA = Math.min(currentGraph.neighbors(personA).size(), currentGraph.neighbors(kq).size());
138+
double scoreAq = 0.0;
139+
if(degreeAuxA > 1)
140+
scoreAq = (2*auxA.size()) / (double)(degreeAuxA*(degreeAuxA-1));
141+
142+
HashSet<Long> auxB = new HashSet<Long>(currentGraph.neighbors(personB));
143+
long degreeAuxB = Math.min(currentGraph.neighbors(personB).size(), currentGraph.neighbors(kt).size());
144+
auxB.retainAll(currentGraph.neighbors(kt));
145+
double scoreBt = 0.0;
146+
if(degreeAuxB > 1)
147+
scoreBt = (2*auxA.size()) / (double)(degreeAuxB*(degreeAuxB-1));
148+
if( scoreBt > bestCC && scoreAq > bestCC)
149+
*/
150+
rewire(currentGraph, t.a_, t.b_, kq, kt);
151+
}
28152
}
29-
}
153+
}
154+
currentCC = GraphUtils.ClusteringCoefficient(currentGraph);
155+
numIterations++;
156+
if(currentCC > bestCC ) {
157+
numIterations = 0;
158+
bestGraph = currentGraph;
159+
bestCC = currentCC;
160+
}
30161
}
31-
}
32162

33-
boolean know( Person personA, Person personB, int dist, float upperBound ) {
34-
if((float)(personA.knows().size()) >= (float)(personA.maxNumKnows())*upperBound ||
35-
personB.knows().size() >= (float)(personB.maxNumKnows())*upperBound ) return false;
36-
double randProb = randomFarm.get(RandomGeneratorFarm.Aspect.UNIFORM).nextDouble();
37-
double prob = Math.pow(DatagenParams.baseProbCorrelated, dist);
38-
if ((randProb < prob) || (randProb < DatagenParams.limitProCorrelated)) {
39-
return true;
163+
for(Person p : persons) {
164+
p.knows().clear();
165+
}
166+
for(Long l : bestGraph.persons()) {
167+
Person p = persons.get(personPosition.get(l));
168+
Set<Long> neighbors = bestGraph.neighbors(l);
169+
for( Long n: neighbors) {
170+
if( l < n ) {
171+
Person other = persons.get(personPosition.get(n));
172+
p.knows().add(new Knows(other, 0, 0));
173+
other.knows().add(new Knows(p, 0, 0));
174+
}
175+
}
40176
}
41-
return false;
42177
}
43178

44-
void createKnow( Person personA, Person personB ) {
45-
long creationDate = Dictionaries.dates.randomKnowsCreationDate(
46-
randomFarm.get(RandomGeneratorFarm.Aspect.DATE),
47-
personA,
48-
personB);
49-
creationDate = creationDate - personA.creationDate() >= DatagenParams.deltaTime ? creationDate : creationDate + (DatagenParams.deltaTime - (creationDate - personA.creationDate()));
50-
creationDate = creationDate - personB.creationDate() >= DatagenParams.deltaTime ? creationDate : creationDate + (DatagenParams.deltaTime - (creationDate - personB.creationDate()));
51-
if( creationDate <= Dictionaries.dates.getEndDateTime() ) {
52-
float similarity = Person.Similarity(personA,personB);
53-
personB.knows().add(new Knows(personA, creationDate, similarity));
54-
personA.knows().add(new Knows(personB, creationDate, similarity));
55-
}
179+
void rewire(PersonGraph graph, long i , long j, long q, long t) {
180+
graph.neighbors(i).add(j);
181+
graph.neighbors(j).add(i);
182+
graph.neighbors(i).remove(q);
183+
graph.neighbors(q).remove(i);
184+
graph.neighbors(j).remove(t);
185+
graph.neighbors(t).remove(j);
186+
graph.neighbors(q).add(t);
187+
graph.neighbors(t).add(q);
56188
}
189+
190+
double computeScore(PersonGraph graph, long personA, long personB){
191+
if(graph.neighbors(personA).contains(personB)) return 0.0;
192+
HashSet<Long> intersection = new HashSet<Long>(graph.neighbors(personA));
193+
intersection.retainAll(graph.neighbors(personB));
194+
if(intersection.size() == 0) return 0.0;
195+
int degreeA = graph.neighbors(personA).size();
196+
double score = 0.0;
197+
if(degreeA > 1)
198+
score += intersection.size() / (double) (degreeA * (degreeA - 1));
199+
int degreeB = graph.neighbors(personB).size();
200+
if(degreeB > 1)
201+
score += intersection.size() / (double) (degreeB * (degreeB - 1));
202+
return score;
203+
}
204+
57205
}

src/main/java/ldbc/snb/datagen/generator/DistanceKnowsGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
/**
1111
* Created by aprat on 11/15/14.
1212
*/
13-
public class DistanceKnowsGenerator {
13+
public class DistanceKnowsGenerator implements KnowsGenerator {
1414

1515
private RandomGeneratorFarm randomFarm;
1616

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
package ldbc.snb.datagen.generator;
22

3+
import ldbc.snb.datagen.objects.Person;
4+
5+
import java.util.ArrayList;
6+
37
/**
48
* Created by aprat on 11/06/15.
59
*/
610
public interface KnowsGenerator {
11+
public void generateKnows( ArrayList<Person> persons, int seed, float upperBound );
712
}

src/main/java/ldbc/snb/datagen/generator/LDBCDatagen.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,13 @@ public int runGenerateJob(Configuration conf) throws Exception {
103103
fs.delete(new Path(personsFileName1), true);
104104
long endRandom= System.currentTimeMillis();
105105

106-
/* printProgress("Creating edges to fill the degree gap");
106+
/*printProgress("Creating edges to fill the degree gap");
107107
long startGap = System.currentTimeMillis();
108108
knowsGenerator = new HadoopKnowsGenerator(conf,null, "ldbc.snb.datagen.hadoop.DegreeGapKeySetter", 1.0f);
109109
knowsGenerator.run(personsFileName2,personsFileName1);
110110
fs.delete(new Path(personsFileName2), true);
111-
long endGap = System.currentTimeMillis();*/
111+
long endGap = System.currentTimeMillis();
112+
*/
112113

113114
printProgress("Serializing persons");
114115
long startPersonSerializing= System.currentTimeMillis();
@@ -139,7 +140,6 @@ public int runGenerateJob(Configuration conf) throws Exception {
139140
long startSortingUpdateStreams= System.currentTimeMillis();
140141
if(conf.getBoolean("ldbc.snb.datagen.serializer.updateStreams", false)) {
141142
printProgress("Sorting update streams ");
142-
}
143143

144144
int blockSize = DatagenParams.blockSize;
145145
int numBlocks = (int)Math.ceil(DatagenParams.numPersons / (double)blockSize);
@@ -148,23 +148,19 @@ public int runGenerateJob(Configuration conf) throws Exception {
148148
int numPartitions = conf.getInt("ldbc.snb.datagen.serializer.numUpdatePartitions", 1);
149149
if( i < numBlocks ) {
150150
for (int j = 0; j < numPartitions; ++j) {
151-
if (conf.getBoolean("ldbc.snb.datagen.serializer.updateStreams", false)) {
152-
HadoopFileSorter updateStreamSorter = new HadoopFileSorter(conf, LongWritable.class, Text.class);
153-
updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j);
154-
updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j);
155-
}
151+
HadoopFileSorter updateStreamSorter = new HadoopFileSorter(conf, LongWritable.class, Text.class);
152+
updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j);
153+
updateStreamSorter.run(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j, DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j);
156154

157155
fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_person_" + i + "_" + j), true);
158156
fs.delete(new Path(DatagenParams.hadoopDir + "/temp_updateStream_forum_" + i + "_" + j), true);
159157

160-
if (conf.getBoolean("ldbc.snb.datagen.serializer.updateStreams", false)) {
161-
HadoopUpdateStreamSerializer updateSerializer = new HadoopUpdateStreamSerializer(conf);
162-
updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j, i, j, "person");
163-
updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j, i, j, "forum");
158+
HadoopUpdateStreamSerializer updateSerializer = new HadoopUpdateStreamSerializer(conf);
159+
updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j, i, j, "person");
160+
updateSerializer.run(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j, i, j, "forum");
164161

165-
fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j), true);
166-
fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j), true);
167-
}
162+
fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_person_" + i + "_" + j), true);
163+
fs.delete(new Path(DatagenParams.hadoopDir + "/updateStream_forum_" + i + "_" + j), true);
168164
}
169165
} else {
170166
for (int j = 0; j < numPartitions; ++j) {
@@ -174,7 +170,6 @@ public int runGenerateJob(Configuration conf) throws Exception {
174170
}
175171
}
176172

177-
if(conf.getBoolean("ldbc.snb.datagen.serializer.updateStreams", false)) {
178173
long minDate = Long.MAX_VALUE;
179174
long maxDate = Long.MIN_VALUE;
180175
long count = 0;
@@ -234,8 +229,8 @@ public int runGenerateJob(Configuration conf) throws Exception {
234229
+ " total seconds");
235230
System.out.println("Person generation time: "+((endPerson - startPerson) / 1000));
236231
System.out.println("University correlated edge generation time: "+((endUniversity - startUniversity) / 1000));
237-
System.out.println("Interest correlated edge generation time: "+((endInterest - startInterest) / 1000));
238-
System.out.println("Random correlated edge generation time: "+((endRandom - startRandom) / 1000));
232+
// System.out.println("Interest correlated edge generation time: "+((endInterest - startInterest) / 1000));
233+
// System.out.println("Random correlated edge generation time: "+((endRandom - startRandom) / 1000));
239234
System.out.println("Person serialization time: "+((endPersonSerializing - startPersonSerializing) / 1000));
240235
System.out.println("Person activity generation and serialization time: "+((endPersonActivity - startPersonActivity) / 1000));
241236
System.out.println("Sorting update streams time: "+((endSortingUpdateStreams - startSortingUpdateStreams) / 1000));

src/main/java/ldbc/snb/datagen/generator/PersonActivityGenerator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ public void generateActivityForBlock( int seed, ArrayList<Person> block, Context
209209
int counter = 0;
210210
for( Person p : block ) {
211211
generateActivity(p, block);
212-
updateSerializer_.changePartition();
212+
if( DatagenParams.updateStreams ) {
213+
updateSerializer_.changePartition();
214+
}
213215
if( counter % 100 == 0 ) {
214216
context.setStatus("Generating activity of person "+counter+" of block"+seed);
215217
}

src/main/java/ldbc/snb/datagen/generator/distribution/ZipfDistribution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class ZipfDistribution implements DegreeDistribution {
1414
private double ALPHA_ = 1.7;
1515

1616
public void initialize(Configuration conf) {
17-
ALPHA_ = conf.getDouble("ldbc.snb.datagen.generator.distribution.MOEZipfDistribution.alpha",ALPHA_);
17+
ALPHA_ = conf.getDouble("ldbc.snb.datagen.generator.distribution.ZipfDistribution.alpha",ALPHA_);
1818
zipf_ = new org.apache.commons.math3.distribution.ZipfDistribution(10000, ALPHA_);
1919
}
2020

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,30 @@
11
package ldbc.snb.datagen.generator.tools;
22

3+
import ldbc.snb.datagen.objects.Knows;
4+
import ldbc.snb.datagen.objects.Person;
5+
6+
import java.util.*;
7+
38
/**
49
* Created by aprat on 17/06/15.
510
*/
611
public class GraphUtils {
12+
13+
public static double ClusteringCoefficient( PersonGraph graph ) {
14+
double CC = 0.0;
15+
for( Long l : graph.persons()) {
16+
int triangles = 0;
17+
Set<Long> neighbors = graph.neighbors(l);
18+
for( Long n : neighbors){
19+
Set<Long> neighbors2 = graph.neighbors(n);
20+
Set<Long> aux = new HashSet<Long>(neighbors);
21+
aux.retainAll(neighbors2);
22+
triangles+=aux.size();
23+
}
24+
int degree = neighbors.size();
25+
if(triangles > 0)
26+
CC+=triangles / (double)(degree*(degree-1));
27+
}
28+
return CC / graph.persons().size();
29+
}
730
}

0 commit comments

Comments
 (0)