-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathEDQueueStorageEngine.m
More file actions
executable file
·224 lines (186 loc) · 5.73 KB
/
EDQueueStorageEngine.m
File metadata and controls
executable file
·224 lines (186 loc) · 5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
//
// EDQueueStorage.m
// queue
//
// Created by Andrew Sliwinski on 9/17/12.
// Copyright (c) 2012 DIY, Co. All rights reserved.
//
#import "EDQueueStorageEngine.h"
#import "FMDatabase.h"
#import "FMDatabaseAdditions.h"
#import "FMDatabasePool.h"
#import "FMDatabaseQueue.h"
@implementation EDQueueStorageEngine
#pragma mark - Init
- (id)initWithPath:(NSString *)inputPath
{
self = [super init];
if (self) {
// Database path
NSArray *paths = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask,YES);
NSString *documentsDirectory = [paths objectAtIndex:0];
NSString *path = [documentsDirectory stringByAppendingPathComponent:inputPath];
// Allocate the queue
_queue = [[FMDatabaseQueue alloc] initWithPath:path];
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY, task TEXT NOT NULL, data TEXT NOT NULL, attempts INTEGER DEFAULT 0, stamp STRING DEFAULT (strftime('%s','now')) NOT NULL, udef_1 TEXT, udef_2 TEXT)"];
[self _databaseHadError:[db hadError] fromDatabase:db];
}];
}
return self;
}
- (void)dealloc
{
_queue = nil;
}
#pragma mark - Public methods
/**
* Creates a new job within the datastore.
*
* @param {NSString} Data (JSON string)
* @param {NSString} Task name
*
* @return {void}
*/
- (void)createJob:(id)data forTask:(id)task
{
NSString *dataString = [[NSString alloc] initWithData:[NSJSONSerialization dataWithJSONObject:data options:NSJSONWritingPrettyPrinted error:nil] encoding:NSUTF8StringEncoding];
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"INSERT INTO queue (task, data) VALUES (?, ?)", task, dataString];
[self _databaseHadError:[db hadError] fromDatabase:db];
}];
}
/**
* Tells if a job exists for the specified task name.
*
* @param {NSString} Task name
*
* @return {BOOL}
*/
- (BOOL)jobExistsForTask:(id)task
{
__block BOOL jobExists = NO;
[self.queue inDatabase:^(FMDatabase *db) {
FMResultSet *rs = [db executeQuery:@"SELECT count(id) AS count FROM queue WHERE task = ?", task];
[self _databaseHadError:[db hadError] fromDatabase:db];
while ([rs next]) {
jobExists |= ([rs intForColumn:@"count"] > 0);
}
[rs close];
}];
return jobExists;
}
/**
* Increments the "attempts" column for a specified job.
*
* @param {NSNumber} Job id
*
* @return {void}
*/
- (void)incrementAttemptForJob:(NSNumber *)jid
{
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"UPDATE queue SET attempts = attempts + 1 WHERE id = ?", jid];
[self _databaseHadError:[db hadError] fromDatabase:db];
}];
}
/**
* Removes a job from the datastore using a specified id.
*
* @param {NSNumber} Job id
*
* @return {void}
*/
- (void)removeJob:(NSNumber *)jid
{
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"DELETE FROM queue WHERE id = ?", jid];
[self _databaseHadError:[db hadError] fromDatabase:db];
}];
}
/**
* Removes all pending jobs from the datastore
*
* @return {void}
*
*/
- (void)removeAllJobs {
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"DELETE FROM queue"];
[self _databaseHadError:[db hadError] fromDatabase:db];
}];
}
/**
* Returns the total number of jobs within the datastore.
*
* @return {uint}
*/
- (NSUInteger)fetchJobCount
{
__block NSUInteger count = 0;
[self.queue inDatabase:^(FMDatabase *db) {
FMResultSet *rs = [db executeQuery:@"SELECT count(id) AS count FROM queue"];
[self _databaseHadError:[db hadError] fromDatabase:db];
while ([rs next]) {
count = [rs intForColumn:@"count"];
}
[rs close];
}];
return count;
}
/**
* Returns the oldest job from the datastore.
*
* @return {NSDictionary}
*/
- (NSDictionary *)fetchJob
{
__block id job;
[self.queue inDatabase:^(FMDatabase *db) {
FMResultSet *rs = [db executeQuery:@"SELECT * FROM queue ORDER BY id ASC LIMIT 1"];
[self _databaseHadError:[db hadError] fromDatabase:db];
while ([rs next]) {
job = [self _jobFromResultSet:rs];
}
[rs close];
}];
return job;
}
/**
* Returns the oldest job for the task from the datastore.
*
* @param {id} Task label
*
* @return {NSDictionary}
*/
- (NSDictionary *)fetchJobForTask:(id)task
{
__block id job;
[self.queue inDatabase:^(FMDatabase *db) {
FMResultSet *rs = [db executeQuery:@"SELECT * FROM queue WHERE task = ? ORDER BY id ASC LIMIT 1", task];
[self _databaseHadError:[db hadError] fromDatabase:db];
while ([rs next]) {
job = [self _jobFromResultSet:rs];
}
[rs close];
}];
return job;
}
#pragma mark - Private methods
- (NSDictionary *)_jobFromResultSet:(FMResultSet *)rs
{
NSDictionary *job = @{
@"id": [NSNumber numberWithInt:[rs intForColumn:@"id"]],
@"task": [rs stringForColumn:@"task"],
@"data": [NSJSONSerialization JSONObjectWithData:[[rs stringForColumn:@"data"] dataUsingEncoding:NSUTF8StringEncoding] options:NSJSONReadingMutableContainers error:nil],
@"attempts": [NSNumber numberWithInt:[rs intForColumn:@"attempts"]],
@"stamp": [rs stringForColumn:@"stamp"]
};
return job;
}
- (BOOL)_databaseHadError:(BOOL)flag fromDatabase:(FMDatabase *)db
{
if (flag) NSLog(@"Queue Database Error %d: %@", [db lastErrorCode], [db lastErrorMessage]);
return flag;
}
@end