1
1
using System ;
2
+ using System . Text ;
2
3
using MongoDB . Driver ;
3
4
using MongoDB . Driver . Builders ;
4
5
@@ -17,84 +18,99 @@ public class ScheduleRepository : IScheduleRepository
17
18
{
18
19
private readonly IScheduleRepositoryConfiguration configuration ;
19
20
private readonly Func < DateTime > getNow ;
20
- private readonly Lazy < MongoCollection < Schedule > > lazyCollection ;
21
- private readonly Lazy < MongoServer > lazyServer ;
21
+ private readonly Lazy < IMongoCollection < Schedule > > lazyCollection ;
22
22
23
23
public ScheduleRepository ( IScheduleRepositoryConfiguration configuration , Func < DateTime > getNow )
24
24
{
25
25
this . configuration = configuration ;
26
26
this . getNow = getNow ;
27
- lazyServer = new Lazy < MongoServer > ( Connect ) ;
28
- lazyCollection = new Lazy < MongoCollection < Schedule > > ( CreateAndIndex ) ;
27
+ lazyCollection = new Lazy < IMongoCollection < Schedule > > ( CreateAndIndex ) ;
29
28
}
30
29
31
- private MongoServer Server => lazyServer . Value ;
32
-
33
- private MongoCollection < Schedule > Collection => lazyCollection . Value ;
30
+ private IMongoCollection < Schedule > Collection => lazyCollection . Value ;
34
31
35
32
public void Store ( Schedule schedule )
36
33
{
37
- Collection . Insert ( schedule ) ;
34
+ Collection . InsertOne ( schedule ) ;
38
35
}
39
36
40
37
public void Cancel ( string cancellation )
41
38
{
42
- Collection . Remove ( Query < Schedule > . EQ ( x => x . CancellationKey , cancellation ) ) ;
39
+ Collection . DeleteOne ( x => x . CancellationKey == cancellation ) ;
43
40
}
44
41
45
42
public Schedule GetPending ( )
46
43
{
47
44
var now = getNow ( ) ;
48
- var query = Query . And (
49
- Query < Schedule > . EQ ( x => x . State , ScheduleState . Pending ) ,
50
- Query < Schedule > . LTE ( x => x . WakeTime , now ) ) ;
51
- var update = Update . Combine ( Update < Schedule > . Set ( x => x . State , ScheduleState . Publishing ) ,
52
- Update < Schedule > . Set ( x => x . PublishingTime , now ) ) ;
53
- var findAndModifyResult = Collection . FindAndModify ( new FindAndModifyArgs
54
- {
55
- Query = query ,
56
- SortBy = SortBy < Schedule > . Ascending ( x => x . WakeTime ) ,
57
- Update = update ,
58
- VersionReturned = FindAndModifyDocumentVersion . Modified
59
- } ) ;
60
- return findAndModifyResult . GetModifiedDocumentAs < Schedule > ( ) ;
45
+ var filter = Builders < Schedule > . Filter ;
46
+ var query = filter . And (
47
+ filter . Eq ( x => x . State , ScheduleState . Pending ) ,
48
+ filter . Lte ( x => x . WakeTime , now ) ) ;
49
+ var update = Builders < Schedule > . Update
50
+ . Set ( x => x . State , ScheduleState . Publishing )
51
+ . Set ( x => x . PublishingTime , now ) ; ;
52
+ var options = new FindOneAndUpdateOptions < Schedule > { Sort = Builders < Schedule > . Sort . Ascending ( x => x . WakeTime ) , ReturnDocument = ReturnDocument . After } ;
53
+ var findAndModifyResult = Collection . FindOneAndUpdate ( query , update , options ) ;
54
+
55
+ return findAndModifyResult ;
61
56
}
62
57
63
58
public void MarkAsPublished ( Guid id )
64
59
{
65
60
var now = getNow ( ) ;
66
- var query = Query . And ( Query < Schedule > . EQ ( x => x . Id , id ) ) ;
67
- var update = Update . Combine ( Update < Schedule > . Set ( x => x . State , ScheduleState . Published ) ,
68
- Update < Schedule > . Set ( x => x . PublishedTime , now ) ,
69
- Update < Schedule > . Unset ( x => x . PublishingTime )
70
- ) ;
71
- Collection . Update ( query , update ) ;
61
+ var update = Builders < Schedule > . Update
62
+ . Set ( x => x . State , ScheduleState . Published )
63
+ . Set ( x => x . PublishedTime , now )
64
+ . Unset ( x => x . PublishingTime ) ;
65
+ Collection . UpdateOne ( x => x . Id == id , update ) ;
72
66
}
73
67
74
68
public void HandleTimeout ( )
75
69
{
76
70
var publishingTimeTimeout = getNow ( ) - configuration . PublishTimeout ;
77
- var query = Query . And ( Query < Schedule > . EQ ( x => x . State , ScheduleState . Publishing ) ,
78
- Query < Schedule > . LTE ( x => x . PublishingTime , publishingTimeTimeout ) ) ;
79
- var update = Update . Combine ( Update < Schedule > . Set ( x => x . State , ScheduleState . Pending ) ,
80
- Update < Schedule > . Unset ( x => x . PublishingTime ) ) ;
81
- Collection . Update ( query , update , UpdateFlags . Multi ) ;
71
+ var filter = Builders < Schedule > . Filter ;
72
+ var query = filter . And ( filter . Eq ( x => x . State , ScheduleState . Publishing ) ,
73
+ filter . Lte ( x => x . PublishingTime , publishingTimeTimeout ) ) ;
74
+ var update = Builders < Schedule > . Update
75
+ . Set ( x => x . State , ScheduleState . Pending )
76
+ . Unset ( x => x . PublishingTime ) ;
77
+ Collection . UpdateMany ( query , update ) ;
82
78
}
83
79
84
- private MongoCollection < Schedule > CreateAndIndex ( )
80
+ private IMongoCollection < Schedule > CreateAndIndex ( )
85
81
{
86
- var collection = Server . GetDatabase ( configuration . DatabaseName )
87
- . GetCollection < Schedule > ( configuration . CollectionName ) ;
88
- collection . CreateIndex ( IndexKeys < Schedule > . Ascending ( x => x . CancellationKey ) , IndexOptions . SetSparse ( true ) ) ;
89
- collection . CreateIndex ( IndexKeys < Schedule > . Ascending ( x => x . State , x => x . WakeTime ) ) ;
90
- collection . CreateIndex ( IndexKeys < Schedule > . Ascending ( x => x . PublishedTime ) ,
91
- IndexOptions . SetTimeToLive ( configuration . DeleteTimeout ) . SetSparse ( true ) ) ;
82
+ var collection = GetICollection < Schedule > ( configuration . ConnectionString , configuration . DatabaseName , configuration . CollectionName ) ;
83
+ collection . Indexes . CreateOne ( new CreateIndexModel < Schedule > (
84
+ Builders < Schedule > . IndexKeys . Ascending ( x => x . CancellationKey ) ,
85
+ new CreateIndexOptions { Sparse = true } ) ) ;
86
+ collection . Indexes . CreateOne ( new CreateIndexModel < Schedule > (
87
+ Builders < Schedule > . IndexKeys
88
+ . Ascending ( x => x . State )
89
+ . Ascending ( x => x . WakeTime ) ) ) ;
90
+
91
+ collection . Indexes . CreateOne ( new CreateIndexModel < Schedule > (
92
+ Builders < Schedule > . IndexKeys . Ascending ( x => x . PublishedTime ) ,
93
+ new CreateIndexOptions
94
+ {
95
+ Sparse = true ,
96
+ ExpireAfter = configuration . DeleteTimeout
97
+ } ) ) ;
98
+
92
99
return collection ;
93
100
}
94
101
95
- private MongoServer Connect ( )
102
+ private static IMongoDatabase CreateDatabase ( MongoUrl connectionString , string databaseName )
103
+ {
104
+ var settings = MongoClientSettings . FromUrl ( connectionString ) ;
105
+ settings . ReadEncoding = new UTF8Encoding ( false , false ) ;
106
+ var client = new MongoClient ( settings ) ;
107
+ return client . GetDatabase ( databaseName ) ;
108
+ }
109
+
110
+ private static IMongoCollection < TDocument > GetICollection < TDocument > ( string connectionString , string databaseName , string collectionName )
96
111
{
97
- return new MongoClient ( configuration . ConnectionString ) . GetServer ( ) ;
112
+ var database = CreateDatabase ( new MongoUrl ( connectionString ) , databaseName ) ;
113
+ return database . GetCollection < TDocument > ( collectionName ) ;
98
114
}
99
115
}
100
116
}
0 commit comments