10-16-2019 10:51 AM
I have a Cypher query that calls the apoc.cypher.doIt procedure. It works fine when run in the browser. However, when it is configured to run from Kafka streams, it reports this error:
2019-10-16 15:47:23.049+0000 ERROR Error processing 1 messages
ErrorData(originalTopic=platforms.kubernetes.rolebindings, timestamp=1571239718430, partition=0, offset=0, exception=org.neo4j.graphdb.QueryExecutionException: There is no procedure with the name `apoc.cypher.doIt` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.
In the neo4j.conf file, the APOC procedures are set to unrestricted, and the Cypher for one of the Kafka topics calls an APOC procedure, as in this fragment:
dbms.security.procedures.unrestricted=algo.*,apoc.*
streams.sink.topic.cypher.platforms.kubernetes.rolebindings=call apoc.cypher.doIt("....
This Cypher was run several times in the browser to test it before it was configured for use with Kafka streams, and it still runs in the browser on the same database where Kafka streams reports the procedure as not found.
Is there some additional configuration needed to enable the APOC procedure to be found from Kafka Streams?
10-16-2019 11:27 AM
Can you show your entire Cypher query? It may be throwing an error because of the quotes.
10-17-2019 04:47 AM
@david.fauth It's a bit long. Here is the version that works fine and finds the APOC procedure when run in the browser:
WITH [{ name:"RiptideTraining", namespace:"riptide-training", cluster:"kubernetes-ctc-core-nonprod", memberships:[ { name:"henry", type:"User", role:"admin" }, { name:"svcacct-1", type:"ServiceAccount", role:"admin", namespace:"sa-namespace" }, { name : "OSFI_K8S_CSG_ADMIN", role : "admin", type : "Group" },{ name : "system:authenticated", role : "view", type : "SystemGroup" } ] },
{ name:"RiptideTraining", namespace:"riptide-training", cluster:"kubernetes-ctc-core-nonprod", memberships:[ { name:"henry", type:"User", role:"admin" }, { name:"svcacct-1", type:"ServiceAccount", role:"developer", namespace:"sa-namespace" } ] }
] AS events
UNWIND events AS event
call apoc.cypher.doIt("
WITH $event as event, randomUUID() AS uuid
MERGE (namespace:PlatformsContainersNamespace {Name:event.namespace,Cluster:event.cluster})
MERGE (cluster:PlatformsContainersCluster {Name:event.cluster})
MERGE (namespace)-[:CreatedIn]-(cluster)
FOREACH (perm in event.memberships |
FOREACH ( _ IN case when perm.type = 'User' then [1] else [] end |
MERGE (who:AccountUser {Name:perm.name})
MERGE (who)-[r:hasAccess]-(namespace)
ON CREATE set r.Runid = uuid, r.Role = perm.role
ON MATCH set r.Runid = uuid, r.Role = perm.role
)
FOREACH ( _ IN case when perm.type = 'ServiceAccount' then [1] else [] end |
MERGE (who:PlatformsContainersServiceAccount {Name:perm.name,Namespace:perm.namespace,Cluster:event.cluster})
MERGE (who)-[r:hasAccess]-(namespace)
ON CREATE set r.Runid = uuid, r.Role = perm.role
ON MATCH set r.Runid = uuid, r.Role = perm.role
MERGE (createnamespace:PlatformsContainersNamespace {Name:perm.namespace,Cluster:event.cluster})
MERGE (who)-[r2:createdIn]-(createnamespace)
)
FOREACH ( _ IN case when perm.type = 'Group' then [1] else [] end |
MERGE (who:SecurityADGroup {Name:perm.name})
MERGE (who)-[r:hasAccess]-(namespace)
ON CREATE set r.Runid = uuid, r.Role = perm.role
ON MATCH set r.Runid = uuid, r.Role = perm.role
)
FOREACH ( _ IN case when perm.type = 'SystemGroup' then [1] else [] end |
MERGE (who:PlatformsContainersSystemGroup {Name:perm.name,Cluster:event.cluster})
MERGE (who)-[r:hasAccess]-(namespace)
ON CREATE set r.Runid = uuid, r.Role = perm.role
ON MATCH set r.Runid = uuid, r.Role = perm.role
)
)
WITH namespace, uuid
MATCH ()-[rd1:hasAccess]-(namespace) WHERE rd1.Runid <> uuid DELETE rd1
",{event:event}) yield value
return value
Here is how that was configured in the neo4j.conf file to obtain the data from a Kafka topic, rather than hard-coding it as in the browser test:
streams.sink.topic.cypher.platforms.kubernetes.rolebindings=call apoc.cypher.doIt(" WITH $event as event, randomUUID() AS uuid MERGE (namespace:PlatformsContainersNamespace {Name:event.namespace,Cluster:event.cluster}) MERGE (cluster:PlatformsContainersCluster {Name:event.cluster}) MERGE (namespace)-[:CreatedIn]-(cluster) FOREACH (perm in event.memberships | FOREACH ( _ IN case when perm.type = 'User' then [1] else [] end | MERGE (who:AccountUser {Name:perm.name}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role ) FOREACH ( _ IN case when perm.type = 'ServiceAccount' then [1] else [] end | MERGE (who:PlatformsContainersServiceAccount {Name:perm.name,Namespace:perm.namespace,Cluster:event.cluster}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role MERGE (createnamespace:PlatformsContainersNamespace {Name:perm.namespace,Cluster:event.cluster}) MERGE (who)-[r2:createdIn]-(createnamespace) ) FOREACH ( _ IN case when perm.type = 'Group' then [1] else [] end | MERGE (who:SecurityADGroup {Name:perm.name}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role ) FOREACH ( _ IN case when perm.type = 'SystemGroup' then [1] else [] end | MERGE (who:PlatformsContainersSystemGroup {Name:perm.name,Cluster:event.cluster}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role ) ) WITH namespace, uuid MATCH ()-[rd1:hasAccess]-(namespace) WHERE rd1.Runid <> uuid DELETE rd1 ",{event:event}) yield value return value
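Collapsing a long query onto one config line by hand makes it easy to lose or change a quote, so the step can be done mechanically instead. The following is a minimal Python sketch of that step only and does not address the "no procedure" error itself. The function name `to_sink_property` and the shortened stand-in query are hypothetical. The sketch assumes the query contains no string literals whose internal whitespace matters, which holds for the query above.

```python
import re


def to_sink_property(topic: str, cypher: str) -> str:
    """Build a single-line streams.sink.topic.cypher.<topic> entry.

    Hypothetical helper: it only collapses whitespace and does not
    validate the Cypher or escape anything for the properties format.
    """
    # Collapse every run of whitespace (newlines included) into one space.
    one_line = re.sub(r"\s+", " ", cypher).strip()
    return f"streams.sink.topic.cypher.{topic}={one_line}"


# Shortened stand-in for the real query (illustration only).
query = """
call apoc.cypher.doIt("
WITH $event as event, randomUUID() AS uuid
MERGE (n:Example {Name:event.name})
",{event:event}) yield value
return value
"""

line = to_sink_property("platforms.kubernetes.rolebindings", query)
print(line)
```

Comparing the generated line with the hand-edited one in neo4j.conf is a quick way to rule out a quoting or spacing difference between the two versions.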