Skip to content

Commit 4602b9f

Browse files
author
Lei Shi
committed
OPSAPS-32613 [DR] (PART VI) Add python APIs for hdfs s3 replication
1. added class ApiHdfsCloudReplicationArguments; 2. changed class ApiReplicationSchedule by adding field 'hdfsCloudArguments'; 3. changed ApiService.create_replication_schedule to accept cloud replication arguments, enabling cloud schedule creation; 4. added unit tests.
1 parent 142a24f commit 4602b9f

File tree

3 files changed

+114
-2
lines changed

3 files changed

+114
-2
lines changed

python/src/cm_api/endpoints/services.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,9 +1473,12 @@ def create_replication_schedule(self,
14731473
alertOnFail=alert_on_fail, alertOnAbort=alert_on_abort)
14741474

14751475
if self.type == 'HDFS':
1476-
if not isinstance(arguments, ApiHdfsReplicationArguments):
1476+
if isinstance(arguments, ApiHdfsCloudReplicationArguments):
1477+
schedule.hdfsCloudArguments = arguments
1478+
elif isinstance(arguments, ApiHdfsReplicationArguments):
1479+
schedule.hdfsArguments = arguments
1480+
else:
14771481
raise TypeError, 'Unexpected type for HDFS replication argument.'
1478-
schedule.hdfsArguments = arguments
14791482
elif self.type == 'HIVE':
14801483
if not isinstance(arguments, ApiHiveReplicationArguments):
14811484
raise TypeError, 'Unexpected type for Hive replication argument.'

python/src/cm_api/endpoints/types.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,18 @@ class ApiHdfsReplicationArguments(BaseApiObject):
708708
'exclusionFilters' : None,
709709
}
710710

711+
class ApiHdfsCloudReplicationArguments(ApiHdfsReplicationArguments):
  """
  Replication arguments for HDFS <-> cloud storage (e.g. S3) schedules.

  Extends ApiHdfsReplicationArguments with:
    - cloudAccount: name of the configured cloud account to use.
    - cloudService: presumably indicates whether the cloud endpoint is the
      source or the target of the copy (tests use 'TARGET') -- TODO confirm.
  """
  @classmethod
  def _get_attributes(cls):
    # Lazily build and memoize the attribute map. The check must look at
    # cls.__dict__ (not hasattr) so this subclass gets its own _ATTRIBUTES
    # rather than inheriting the parent's.
    # NOTE: 'in' replaces the deprecated dict.has_key(), which was removed
    # in Python 3; behavior is identical.
    if '_ATTRIBUTES' not in cls.__dict__:
      attrs = {
        'cloudAccount' : None,
        'cloudService' : None,
      }
      # Merge in the inherited HDFS replication attributes.
      attrs.update(ApiHdfsReplicationArguments._get_attributes())
      cls._ATTRIBUTES = attrs
    return cls._ATTRIBUTES
722+
711723
class ApiHdfsReplicationResult(BaseApiObject):
712724
_ATTRIBUTES = {
713725
'progress' : ROAttr(),
@@ -797,6 +809,7 @@ class ApiReplicationSchedule(BaseApiObject):
797809
'paused' : None,
798810
'hdfsArguments' : Attr(ApiHdfsReplicationArguments),
799811
'hiveArguments' : Attr(ApiHiveReplicationArguments),
812+
'hdfsCloudArguments' : Attr(ApiHdfsCloudReplicationArguments),
800813
'alertOnStart' : None,
801814
'alertOnSuccess' : None,
802815
'alertOnFail' : None,

python/src/cm_api_tests/test_replication.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,54 @@ def test_hdfs_arguments(self):
7272
self.assertEquals('DYNAMIC', args.replicationStrategy)
7373
self.assertFalse(args.preserveXAttrs)
7474

75+
def test_hdfs_cloud_arguments(self):
  """Deserialize ApiHdfsCloudReplicationArguments from JSON and verify
  every field in the fixture round-trips, including the cloud-specific
  cloudAccount and cloudService attributes."""
  RAW = '''{
    "sourceService" : {
      "peerName" : "vst2",
      "clusterName" : "Cluster 1 - CDH4",
      "serviceName" : "HDFS-1"
    },
    "sourcePath" : "/data",
    "destinationPath" : "/copy/data2",
    "mapreduceServiceName" : "MAPREDUCE-1",
    "schedulerPoolName" : "medium",
    "userName" : "systest",
    "dryRun" : false,
    "abortOnError" : true,
    "removeMissingFiles" : false,
    "preserveReplicationCount" : true,
    "preserveBlockSize" : true,
    "preservePermissions" : false,
    "skipTrash" : false,
    "replicationStrategy" : "DYNAMIC",
    "logPath" : "/tmp",
    "bandwidthPerMap" : "20",
    "preserveXAttrs" : false,
    "exclusionFilters" : ["ac"],
    "cloudAccount" : "someTestAccount",
    "cloudService" : "TARGET"
  }'''
  args = utils.deserialize(RAW, ApiHdfsCloudReplicationArguments)
  self.assertEquals('vst2', args.sourceService.peerName)
  self.assertEquals('Cluster 1 - CDH4', args.sourceService.clusterName)
  self.assertEquals('HDFS-1', args.sourceService.serviceName)
  self.assertEquals('/data', args.sourcePath)
  self.assertEquals('/copy/data2', args.destinationPath)
  self.assertEquals('MAPREDUCE-1', args.mapreduceServiceName)
  self.assertEquals('medium', args.schedulerPoolName)
  self.assertEquals('systest', args.userName)
  self.assertFalse(args.dryRun)
  self.assertTrue(args.abortOnError)
  self.assertFalse(args.removeMissingFiles)
  self.assertTrue(args.preserveBlockSize)
  self.assertFalse(args.preservePermissions)
  self.assertTrue(args.preserveReplicationCount)
  self.assertFalse(args.skipTrash)
  self.assertEquals('DYNAMIC', args.replicationStrategy)
  # These three were present in the fixture but previously unasserted.
  self.assertEquals('/tmp', args.logPath)
  self.assertEquals('20', args.bandwidthPerMap)
  self.assertEquals(['ac'], args.exclusionFilters)
  self.assertFalse(args.preserveXAttrs)
  # Cloud-specific fields introduced by ApiHdfsCloudReplicationArguments.
  self.assertEquals('someTestAccount', args.cloudAccount)
  self.assertEquals('TARGET', args.cloudService)
75123
def test_hive_arguments(self):
76124
RAW = '''{
77125
"sourceService" : {
@@ -321,5 +369,53 @@ def test_replication_crud(self):
321369
retdata=return_sched.to_json_dict())
322370
service.delete_replication_schedule(1)
323371

372+
def test_hdfs_cloud_replication_crud(self):
  """Exercise the full create/list/get/update/delete cycle of an HDFS
  cloud replication schedule against the mocked REST resource."""
  service = ApiService(self.resource, 'hdfs1', 'HDFS')
  service.__dict__['clusterRef'] = ApiClusterRef(self.resource, clusterName='cluster1')

  # Build the cloud replication arguments the schedule will carry.
  args = ApiHdfsCloudReplicationArguments(self.resource)
  args.sourceService = ApiServiceRef('cluster2', 'hdfs2')
  args.sourcePath = '/src'
  args.destinationPath = 's3a://somebucket/dst'
  args.cloudAccount = 'someTestAccount'
  args.cloudService = 'TARGET'

  # Canned schedule the mocked server will hand back.
  expected = ApiReplicationSchedule(self.resource,
      interval=2, intervalUnit='DAY')
  expected.hdfsCloudArguments = args
  expected.__dict__['id'] = 1
  expected_list = ApiList([ expected ]).to_json_dict()

  # Create: the schedule returned must mirror what the server sent.
  self.resource.expect("POST",
      "/clusters/cluster1/services/hdfs1/replications",
      retdata=expected_list)
  sched = service.create_replication_schedule(
      None, None, 'DAY', 2, True, args, alert_on_fail=True)
  self.assertEqual(expected.intervalUnit, sched.intervalUnit)
  self.assertEqual(expected.interval, sched.interval)
  self.assertIsInstance(sched.hdfsCloudArguments, ApiHdfsCloudReplicationArguments)

  # List all schedules.
  self.resource.expect("GET",
      "/clusters/cluster1/services/hdfs1/replications",
      retdata=expected_list)
  service.get_replication_schedules()

  # Fetch the single schedule by id.
  self.resource.expect("GET",
      "/clusters/cluster1/services/hdfs1/replications/1",
      retdata=expected.to_json_dict())
  service.get_replication_schedule(1)

  # Update it in place.
  self.resource.expect("PUT",
      "/clusters/cluster1/services/hdfs1/replications/1",
      data=expected,
      retdata=expected.to_json_dict())
  service.update_replication_schedule(1, expected)

  # And finally delete it.
  self.resource.expect("DELETE",
      "/clusters/cluster1/services/hdfs1/replications/1",
      retdata=expected.to_json_dict())
  service.delete_replication_schedule(1)
324420
if __name__ == '__main__':
325421
unittest.main()

0 commit comments

Comments
 (0)